This repository was archived by the owner on Aug 8, 2018. It is now read-only.

Commit 850f78f

reconstructing header delivering
1 parent dcfe518 commit 850f78f


pyethapp/synchronizer.py

Lines changed: 41 additions & 41 deletions
@@ -47,7 +47,7 @@ class SyncTask(object):
     initial_blockheaders_per_request = 32
     max_blockheaders_per_request = 192
     max_skeleton_size = 128
-    blockheaders_request_timeout = 15.
+    blockheaders_request_timeout = 20.
     max_retries = 3
     retry_delay = 2.
     blocks_request_timeout = 16.
@@ -210,10 +210,15 @@ def fetch_headerbatch(self, origin, skeleton):
                     self.headertask_queue.put((request.start, request.start))
                     log_st.debug('timeouted request',
                                  start=request.start, proto=proto)
+                    # if the failed request has more than 2 headers, set the proto idle and try one more time; else stop it
+                    # if len(request.headers) > 2:
+                    proto.idle = True
+                    # else:
+                    #     proto.stop()
                    del self.pending_headerRequests[proto]
-                    # if overtimes > 2 else set it idle try one more time,
-                    # proto.idle = True
-                    proto.stop()
+
+
+

            log_st.debug('header task queue size, pending queue size, batch_requestsize', size=self.headertask_queue.qsize(), pending=len(self.pending_headerRequests), batch_request=len(self.batch_requests))
            #if self.headertask_queue.qsize == 0 and len(self.pending_headerRequests) == 0 and len(self.batch_requests) == 0:
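The hunk above changes how a timed-out header request is handled: the requested range goes back on the task queue and the peer is marked idle so it can be retried, rather than being stopped outright. A minimal, self-contained sketch of that pattern with simplified stand-in names (Peer, HeaderRequest, and expire_header_requests are illustrative, not the module's real API):

import time
from collections import namedtuple
from queue import PriorityQueue

# Illustrative stand-ins; SyncTask keeps the equivalent state on itself.
HeaderRequest = namedtuple('HeaderRequest', ['start', 'sent_at'])

class Peer(object):
    def __init__(self, name):
        self.name = name
        self.idle = True

def expire_header_requests(pending, task_queue, timeout=20.0):
    """Re-queue timed-out header requests and return their peers to the idle pool."""
    now = time.time()
    for peer, request in list(pending.items()):
        if now - request.sent_at > timeout:
            task_queue.put((request.start, request.start))  # retry this skeleton slot later
            peer.idle = True                                 # keep the peer instead of stopping it
            del pending[peer]

# usage sketch
tasks = PriorityQueue()
pending = {Peer('p1'): HeaderRequest(start=4096, sent_at=time.time() - 30)}
expire_header_requests(pending, tasks, timeout=20.0)
assert tasks.qsize() == 1 and not pending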
@@ -249,8 +254,8 @@ def fetch_headerbatch(self, origin, skeleton):
                continue
            try:
                proto_received = deferred.get(timeout=self.blockheaders_request_timeout)['proto']
-                header = deferred.get(timeout=self.blockheaders_request_timeout)['headers']
-                log_st.debug('headers batch received from proto', header=header)
+                headers = deferred.get(timeout=self.blockheaders_request_timeout)['headers']
+                log_st.debug('headers batch received from proto', header=headers)
            except gevent.Timeout:
                log_st.warn('syncing batch hashchain timed out')
                retry += 1
@@ -266,34 +271,35 @@ def fetch_headerbatch(self, origin, skeleton):
            del self.header_request

            # check if header is empty
-
-            if header[0] not in self.batch_requests:
-                continue
            if proto_received not in self.pending_headerRequests:
-                continue
-            start_header = self.pending_headerRequests[proto_received].start
-            del self.pending_headerRequests[proto_received]
-            verified = self.verify_headers(proto_received, header)
+                continue
+            self.deliver_headers(origin, proto_received, headers)
+            proto_received.idle = True
+
+
+    def deliver_headers(self, origin, proto, header):
+        if header[0] not in self.batch_requests:
+            return
+        start_header = self.pending_headerRequests[proto].start
+        del self.pending_headerRequests[proto]
+        verified = self.verify_headers(proto, header)
+
        if not verified:
            self.headertask_queue.put((start_header, start_header))
-            continue
-
-
+            return
        batch_header = header[::-1]  # in height-ascending order
        self.batch_result[(batch_header[0].number - origin - 1):batch_header[0].number - origin - 1 + len(batch_header)] = batch_header
        # log_st.debug('batch result', batch_result=self.batch_result)
        self.batch_requests.remove(header[0])
-        proto_received.set_idle()
-        del self.requests[proto_received]
-
+        del self.requests[proto]
        header_ready = 0
        while (self.header_processed + header_ready) < len(self.batch_result) and self.batch_result[self.header_processed + header_ready]:
            header_ready += self.max_blockheaders_per_request

        if header_ready > 0:
            # Headers are ready for delivery, gather them
            processed = self.batch_result[self.header_processed:self.header_processed + header_ready]
-            # log_st.debug('issue fetching blocks', header_processed=self.header_processed, blocks=processed, proto=proto_received, count=len(processed), start=processed[0].number)
+            # log_st.debug('issue fetching blocks', header_processed=self.header_processed, blocks=processed, proto=proto, count=len(processed), start=processed[0].number)

            count = len(processed)
            self.synchronizer.blockheader_queue.put(processed)
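The slice assignment into self.batch_result above is the dense part of the new deliver_headers: headers arrive highest-first from the peer, are reversed into ascending order, and are written into a preallocated result list at an offset relative to origin. A small self-contained illustration (FakeHeader and place_batch are hypothetical names used only for this example):

from collections import namedtuple

FakeHeader = namedtuple('FakeHeader', ['number'])

def place_batch(batch_result, origin, headers):
    batch = headers[::-1]                  # flip into height-ascending order
    offset = batch[0].number - origin - 1  # slot of the first header relative to origin
    batch_result[offset:offset + len(batch)] = batch
    return batch_result

origin = 100
batch_result = [None] * 6                  # room for headers 101..106
wire_headers = [FakeHeader(104), FakeHeader(103), FakeHeader(102)]  # highest first
place_batch(batch_result, origin, wire_headers)
assert [h.number if h else None for h in batch_result] == [None, 102, 103, 104, None, None]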
@@ -303,7 +309,7 @@ def fetch_headerbatch(self, origin, skeleton):
        # return self.batch_result[:self.header_processed]
        log_st.debug('remaining headers', num=len(self.batch_requests), headers=self.batch_requests)

-
+


    def idle_protocols(self):
@@ -340,7 +346,7 @@ def fetch_headers(self, proto, fromx):
            self.requests[proto] = deferred
            blockheaders_batch = deferred.get(block=True, timeout=self.blockheaders_request_timeout)
        except gevent.Timeout:
-            log_st.warn('syncing batch hashchain timed out')
+            log_st.warn('fetch_headers syncing batch hashchain timed out')
            proto.stop()
            return self.exit(success=False)
        finally:
@@ -462,7 +468,7 @@ def fetch_blocks(self):
        while True:
            try:
                result = self.fetch_ready.get()
-                log_st.debug('start fetching blocks')
+                log_body_st.debug('start fetching blocks')
                num_blocks = len(self.block_requests_pool)
                deferred = AsyncResult()
                self.body_request = deferred
@@ -477,10 +483,12 @@ def fetch_blocks(self):
                    self.bodytask_queue.put((h.number, h))
                    log_body_st.debug('timeouted request',
                                      start=request.start, proto=proto)
+                    # if failed headers > 2, set it idle
+                    if len(request.headers) > 2:
+                        proto.body_idle = True
+                    else:
+                        proto.stop()
                    del self.pending_bodyRequests[proto]
-                    # if overtimes > 2 else set it idle try one more time,
-                    # proto.body_idle = True
-                    proto.stop()

            if len(self.block_requests_pool) == 0:
                log_body_st.debug('block body fetching completed!')
@@ -495,14 +503,6 @@ def fetch_blocks(self):
                # assert proto not in self.requests
                if proto.is_stopped:
                    continue
-
-                if pending > 8192 - num_fetched:
-                    throttled = True
-                    log_body_st.debug('throttled')
-                    break
-                else:
-                    throttled = False
-
                if not self.reserve_blocks(proto, self.max_blocks_per_request):
                    log_body_st.debug('reserve blocks failed')
                    break
@@ -523,13 +523,13 @@ def fetch_blocks(self):
            except gevent.Timeout:
                log_body_st.warn('syncing batch block body timed out')
                retry += 1
-                if retry >= self.max_retries:
-                    log_body_st.warn('headers sync failed with all peers',
-                                     num_protos=len(self.body_idle_protocols()))
-                    return self.exit(success=False)
-                else:
-                    log_body_st.info('headers sync failed with peers, retry', retry=retry)
-                    gevent.sleep(self.retry_delay)
+                # if retry >= self.max_retries:
+                #     log_body_st.warn('headers sync failed with all peers',
+                #                      num_protos=len(self.body_idle_protocols()))
+                #     return self.exit(success=False)
+                # else:
+                log_body_st.info('headers sync failed with peers, retry', retry=retry)
+                gevent.sleep(self.retry_delay)
                continue
            finally:
                del self.body_request
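With the max_retries exit commented out above, a body-fetch timeout now always logs, sleeps, and loops again instead of aborting the sync after a fixed number of attempts. A hedged sketch of the resulting control flow, where fetch_once is a hypothetical callable standing in for one round of block-body fetching:

import gevent

def fetch_with_retry(fetch_once, retry_delay=2.0):
    """Keep retrying after timeouts; there is no max_retries bail-out any more."""
    retry = 0
    while True:
        try:
            return fetch_once()
        except gevent.Timeout:
            retry += 1
            gevent.sleep(retry_delay)  # back off, then try again with whatever peers are idle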
