
Commit 2c5ca1a

body fetching flow control
1 parent 6652afe

File tree (1 file changed, +33 −52 lines):
pyethapp/synchronizer.py

Lines changed: 33 additions & 52 deletions
@@ -146,7 +146,6 @@ def fetch_hashchain(self):
 
             if skeleton_fetch and not skeleton:
                 remaining = self.fetch_headers(self.skeleton_peer,from0)
-
                 skeleton_fetch = False
                 if not remaining:
                     log_st.warn('no more skeleton received')
@@ -155,20 +154,18 @@ def fetch_hashchain(self):
                     #should not continuew??
 
             if skeleton_fetch:
-
                 self.fetch_headerbatch(from0,skeleton)
-                log_st.debug('header batch', headerbatch= self.batch_result)
+                # log_st.debug('header batch', headerbatch= self.batch_result)
                 # check result
                 if self.header_processed>0 :
-                    # self.fetch_blocks(header_batch)
-                    #from0 = from0 + self.max_skeleton_size*self.max_blockheaders_per_request
                     from0 += self.header_processed
+                    remaining =self.batch_result[self.header_processed:]
                 else:
                     return self.exit(success=False)
-
+            #scheduling block download for unprocessed headers in the skeleton or last header batch
            if remaining:
-                log_st.debug('fetching new skeletons')
-                # self.fetch_blocks(remaining)
+                log_st.debug('scheduling new headers',count= len(remaining), start=from0)
+                self.synchronizer.blockheader_queue.put(remaining)
                from0+=len(remaining)
 
 
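The hunk above is the producer side of the new flow control: headers that have been fetched but still need bodies are pushed onto the synchronizer's blockheader_queue instead of being downloaded inline. A minimal, self-contained sketch of that hand-off (not pyethapp itself; the header_batches input and the greenlet wiring are illustrative):

import gevent
from gevent.queue import Queue

# Bounded queue, as introduced in this commit: put() blocks once it is full.
blockheader_queue = Queue(maxsize=8192)

def schedule_headers(header_batches):
    """Producer: hand each processed header batch to the body fetcher."""
    from0 = 0
    for remaining in header_batches:
        if remaining:
            blockheader_queue.put(remaining)   # blocks when the consumer lags
            from0 += len(remaining)
    return from0

def fetch_bodies():
    """Consumer: drain header batches and request the matching block bodies."""
    while True:
        batch = blockheader_queue.get()
        gevent.sleep(0)                        # placeholder for the real body requests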
@@ -233,7 +230,7 @@ def fetch_headerbatch(self,origin,skeleton):
                     self.pending_headerRequests[proto] = HeaderRequest(start)
                     proto.idle = False
                     fetching = True
-                    log_st.debug('sent header request',request= start , proto=proto)
+                    log_st.debug('sent header request',request=start , proto=proto)
                 else:
                     task_empty= True
                     break
@@ -331,15 +328,19 @@ def verify_headers(self,proto,headers):
 
     def fetch_headers(self,proto, fromx):
         deferred = AsyncResult()
-        blockheaders_batch=[]
-        proto.send_getblockheaders(fromx,self.max_blockheaders_per_request)
+        blockheaders_batch=None
+        proto.send_getblockheaders(fromx,self.max_blockheaders_per_request,0,1)
         try:
+            self.requests[proto] = deferred
             blockheaders_batch = deferred.get(block=True,timeout=self.blockheaders_request_timeout)
         except gevent.Timeout:
             log_st.warn('syncing batch hashchain timed out')
-            return []
+            proto.stop()
+            return self.exit(success=False)
         finally:
-            return blockheaders_batch
+            del self.requests[proto]
+
+        return blockheaders_batch
 
 
     def receive_blockheaders(self, proto, blockheaders):
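The rewritten fetch_headers registers the pending AsyncResult in self.requests before blocking on it, stops the peer on a timeout, and moves the return out of the finally block (a return inside finally silently overrides whatever the try/except branches returned, which is what the old code did). A stand-alone sketch of that request/timeout pattern with gevent; the proto object, send_request call and REQUEST_TIMEOUT constant are illustrative names, not pyethapp API:

import gevent
from gevent.event import AsyncResult

requests = {}            # proto -> AsyncResult, resolved by the receive handler
REQUEST_TIMEOUT = 15.0   # seconds

def fetch_with_timeout(proto, payload):
    deferred = AsyncResult()
    requests[proto] = deferred      # register before blocking on the reply
    proto.send_request(payload)     # illustrative peer call
    try:
        # the receive handler calls requests[proto].set(result) when data arrives
        return deferred.get(block=True, timeout=REQUEST_TIMEOUT)
    except gevent.Timeout:
        proto.stop()                # drop the unresponsive peer
        return None
    finally:
        del requests[proto]         # cleanup only; no return statement here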
@@ -405,7 +406,7 @@ def run(self):
     def schedule_block_fetch(self):
         batch_header = []
         log_st.debug('start sheduleing blocks')
-        self.synchronizer.blockheader_queue = Queue()
+        self.synchronizer.blockheader_queue = Queue(maxsize=8192)
 
         while True:
             batch_header= self.synchronizer.blockheader_queue.get()
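Bounding blockheader_queue at 8192 entries is what actually provides the flow control named in the commit message: a gevent Queue with maxsize set makes put() cooperatively block, so the header fetcher cannot run arbitrarily far ahead of body fetching and block import. A tiny runnable demonstration of that behaviour (the sizes and sleeps are arbitrary):

import gevent
from gevent.queue import Queue

q = Queue(maxsize=2)

def producer():
    for i in range(5):
        q.put(i)                    # blocks while the queue already holds 2 items
        print('queued', i)

def consumer():
    for _ in range(5):
        item = q.get()
        gevent.sleep(0.01)          # simulate slow body fetching / block import
        print('processed', item)

gevent.joinall([gevent.spawn(producer), gevent.spawn(consumer)])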
@@ -436,32 +437,18 @@ def fetch_blocks(self):
         num_blocks = 0
         num_fetched = 0
         retry = 0
-        last_block = None
-        # batch_header = []
-        #while not self.blockheaders_queue.empty():
-        # self.synchronizer.blockheader_queue = Queue()
-        # while True:
-        # batch_header= self.synchronizer.blockheader_queue.get()
-        # num_blocks = len(batch_header)
-        # log_body_st.debug('delivered headers', delivered_heades=batch_header)
-        # gevent.sleep(0)
-        # while batch_header:
-        # limit = len(batch_header) if len(batch_header) < self.max_blocks_process else self.max_blocks_process
-        # blockbody_batch = batch_header[:limit]
-        # for header in blockbody_batch:
-        #check chain order
-        #self.block_requests_pool[header.hash]= header
-        # self.block_requests_pool.append(header)
-        # self.bodytask_queue.put((header.number,header))
-        #check if length block_requests_pool is equal to blockhashes_batch
-        # batch_header = batch_header[limit:]
-
+        last_block = None
+        throttled = False
         while True:
             try:
                 result = self.fetch_ready.get()
                 log_st.debug('start fetching blocks')
+                if throttled:
+                    gevent.sleep(0.1)
+                num_blocks = len(self.block_requests_pool)
                 deferred = AsyncResult()
                 self.body_request=deferred
+                # throttled = False
 
                 #check timed out pending requests
                 for proto in list(self.pending_bodyRequests):
@@ -477,8 +464,6 @@ def fetch_blocks(self):
                         # proto.body_idle = True
                         proto.stop()
 
-                # log_st.debug('header task queue size, pending queue size, batch_requestsize',size=self.bodytask_queue.qsize(),pending=len(self.pending_blockRequests),batch_request=len(self.block_requests_pool))
-                #if self.headertask_queue.qsize == 0 and len(self.pending_headerRequests)==0 and len(self.batch_requests)==0 :
                 if len(self.block_requests_pool) == 0:
                     log_body_st.debug('block body fetching completed!')
                     # return True
@@ -491,6 +476,14 @@ def fetch_blocks(self):
                     # assert proto not in self.requests
                     if proto.is_stopped:
                         continue
+
+                    #if self.chainservice.block_queue.qsize()> 1010:
+                    if pending>8192-num_fetched:
+                        throttled = True
+                        log_body_st.debug('throttled')
+                        break
+                    else:
+                        throttled = False
                     proto_deferred = AsyncResult()
                     # check if it's finished
                     block_batch=[]
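This is the request-side back-pressure: before handing work to an idle peer, the body fetcher checks whether the outstanding body requests plus the blocks already fetched (but not yet imported) would exceed the same 8192 budget; if so it sets throttled, stops issuing requests, and sleeps 0.1s at the top of the next iteration (the lines added in the @@ -436 hunk above). A condensed sketch of that decision with the peer loop reduced to a callback; BLOCK_BUDGET, idle_protos and issue_request are illustrative names:

import gevent

BLOCK_BUDGET = 8192

def request_more_bodies(pending, num_fetched, idle_protos, issue_request):
    """Return True when throttled, i.e. no new body requests were issued."""
    if pending > BLOCK_BUDGET - num_fetched:
        gevent.sleep(0.1)           # back off; block imports drain the backlog meanwhile
        return True
    for proto in idle_protos:
        issue_request(proto)        # e.g. sends a GetBlockBodies request to the peer
    return False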
@@ -515,10 +508,10 @@ def fetch_blocks(self):
                         break
                 # check if there are protos available for header fetching
                 #if not fetching and not self.headertask_queue.empty() and pending == len(self.pending_headerRequests):
-                if not fetching and not task_empty:
+                if not fetching and not task_empty and not throttled:
                     log_body_st.warn('no protocols available')
                     return self.exit(success=False)
-                elif task_empty and pending==len(self.pending_bodyRequests):
+                if task_empty or throttled and pending==len(self.pending_bodyRequests):
                     continue
                 try:
                     proto_received = deferred.get(timeout=self.blocks_request_timeout)['proto']
@@ -558,7 +551,7 @@ def fetch_blocks(self):
                 # add received t_blocks
                 num_fetched += len(bodies)
                 log_body_st.debug('received block bodies',
-                                  num=len(bodies),blockbody=bodies,num_fetched=num_fetched,
+                                  num=len(bodies),num_fetched=num_fetched,
                                   total=num_blocks, missing=num_blocks - num_fetched)
                 proto_received.body_idle=True
                 del self.requests[proto_received]
@@ -581,18 +574,6 @@ def fetch_blocks(self):
             except Exception as ex:
                 log_body_st.error(error = ex)
 
-
-
-            # for body in bodies:
-            # try:
-            # h = headers.pop(0)
-            # t_block = TransientBlock(h, body.transactions, body.uncles)
-            # self.chainservice.add_block(t_block, proto) # this blocks if the queue is full
-            # self.block_requests_pool.remove(h)
-            # except IndexError as e:
-            # log_st.error('headers and bodies mismatch', error=e)
-            # self.exit(success=False)
-            # log_st.debug('adding blocks done', took=time.time() - ts)
 
             # done
             last_block = t_block
@@ -665,7 +646,7 @@ def __init__(self, chainservice, force_sync=None):
        self._protocols = dict() # proto: chain_difficulty
        self.synctask = None
        self.syncbody = None
-       self.blockheader_queue = Queue()
+       self.blockheader_queue = Queue(maxsize=8192)
 
    def synctask_exited(self, success=False):
        # note: synctask broadcasts best block
