Skip to content
This repository was archived by the owner on Aug 8, 2018. It is now read-only.

Commit bdbf137

Browse files
committed
Asynchronize block importing
1 parent c478bb0 commit bdbf137

File tree

2 files changed

+61
-44
lines changed

2 files changed

+61
-44
lines changed

pyethapp/eth_service.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ class ChainService(WiredService):
112112
genesis = None
113113
synchronizer = None
114114
config = None
115-
block_queue_size = 1024
115+
block_queue_size = 2048
116116
processed_gas = 0
117117
processed_elapsed = 0
118118

@@ -278,6 +278,7 @@ def add_block(self, t_block, proto):
278278
if not self.add_blocks_lock:
279279
self.add_blocks_lock = True # need to lock here (ctx switch is later)
280280
gevent.spawn(self._add_blocks)
281+
log.debug('return from add blocks')
281282

282283
def add_mined_block(self, block):
283284
log.debug('adding mined block', block=block)

pyethapp/synchronizer.py

Lines changed: 59 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,17 @@
1-
import sys
1+
#import sys
2+
from __future__ import print_function
3+
from __future__ import absolute_import
4+
from builtins import str
5+
from builtins import object
26
from gevent.event import AsyncResult
37
import gevent
48
import time
5-
from eth_protocol import TransientBlockBody, TransientBlock
9+
from .eth_protocol import TransientBlockBody, TransientBlock
610
from ethereum.block import BlockHeader
711
from ethereum.slogging import get_logger
812
import ethereum.utils as utils
913
import traceback
10-
import Queue as Q
14+
import queue as Q
1115
from gevent.queue import Queue
1216

1317
log = get_logger('eth.sync')
@@ -25,7 +29,7 @@ def __init__(self, start=0, headers=[]):
2529
class BodyResponse(object):
2630
def __init__(self, header=None):
2731
self.header = header
28-
self.block = []
32+
self.block = None
2933

3034
class SyncTask(object):
3135

@@ -270,6 +274,7 @@ def fetch_headerbatch(self,origin,skeleton):
270274

271275
#else:
272276
# log_st.info('headers sync failed with peers, retry', retry=retry)
277+
self.synchronizer.blockheader_queue.put(None)
273278
gevent.sleep(self.retry_delay)
274279
continue
275280
finally:
@@ -287,7 +292,7 @@ def fetch_headerbatch(self,origin,skeleton):
287292
def deliver_headers(self,origin,proto,header):
288293
if header[0].number not in self.batch_requests:
289294
log_st.debug('header delivered not matching requested headers')
290-
return
295+
self.exit(False)
291296
index = self.pending_headerRequests[proto].start
292297

293298
log_st.debug('index', index=index)
@@ -404,7 +409,8 @@ def __init__(self, synchronizer, blockhash, chain_difficulty=0, originator_only=
404409
self.pending_bodyRequests = dict()
405410
self.requests = dict() # proto: Event
406411
self.body_request = None
407-
self.fetch_ready= AsyncResult()
412+
self.fetch_ready = AsyncResult()
413+
self.import_ready = AsyncResult()
408414
gevent.spawn(self.run)
409415
# gevent.spawn(self.schedule_block_fetch)
410416

@@ -429,12 +435,18 @@ def run(self):
429435
try:
430436
gevent.spawn(self.schedule_block_fetch)
431437
gevent.spawn(self.fetch_blocks)
438+
gevent.spawn(self.import_block)
432439
except Exception:
433440
print(traceback.format_exc())
434441
self.exit(success=False)
442+
# log_body_st.debug('syncing finished')
443+
# if self.chain_difficulty >= self.chain.get_score(self.chain.head):
444+
# self.chainservice.broadcast_newblock(last_block, self.chain_difficulty, origin=proto)
445+
446+
# self.exit(success=True)
447+
435448

436449

437-
#Body fetch scheduler
438450
#Body fetch scheduler reads from downloaded header queue, dividing headers
439451
#into batches(2048 or less), for each header batch adding the headers to the
440452
#task queue, each queue item contains a task of 128 body fetches, activate
@@ -443,7 +455,7 @@ def schedule_block_fetch(self):
443455
batch_header = []
444456
log_st.debug('start scheduling blocks')
445457
#?? maxsize wrong??
446-
self.synchronizer.blockheader_queue = Queue(maxsize=8192)
458+
self.synchronizer.blockheader_queue = Queue(maxsize=0)
447459

448460
while True:
449461
batch_header = self.synchronizer.blockheader_queue.get()
@@ -561,24 +573,16 @@ def fetch_blocks(self):
561573
num=len(bodies),num_fetched=num_fetched,
562574
total_requested=num_blocks, missing=num_blocks - num_fetched)
563575
ts= time.time()
564-
last_block = self.deliver_blocks(proto_received, bodies)
576+
self.deliver_blocks(proto_received, bodies)
565577

566-
log_body_st.debug('adding blocks done', took=time.time() - ts)
578+
# log_body_st.debug('adding blocks done', took=time.time() - ts)
567579

568580
except Exception as ex:
569581
log_body_st.error(error = ex)
570582

571583

572584
# done
573585
#last_block = t_block
574-
# assert not len(batch_header)
575-
assert last_block.header.hash == self.blockhash
576-
log_body_st.debug('syncing finished')
577-
# at this point blocks are not in the chain yet, but in the add_block queue
578-
if self.chain_difficulty >= self.chain.get_score(self.chain.head):
579-
self.chainservice.broadcast_newblock(last_block, self.chain_difficulty, origin=proto)
580-
581-
self.exit(success=True)
582586

583587
def reserve_blocks(self,proto,count):
584588
log_body_st.debug('reserving blocks')
@@ -611,7 +615,7 @@ def reserve_blocks(self,proto,count):
611615
# if isNoop(header):
612616
# if proto.lacks(header.hash)
613617
block_batch.append(header)
614-
i +=1
618+
i += 1
615619
self.requests[proto] = AsyncResult()
616620
blockhashes_batch = [h.hash for h in block_batch]
617621
proto.send_getblockbodies(*blockhashes_batch)
@@ -650,25 +654,37 @@ def deliver_blocks(self, proto, bodies):
650654
for h in headers:
651655
if h:
652656
self.bodytask_queue.put((h.number,h))
653-
654-
#import downloaded bodies into chain
655-
for i, block in enumerate(self.body_cache):
656-
if not self.body_cache[i]:
657-
break
658-
nimp = i
659-
log_body_st.debug('nimp', nimp=nimp)
660-
body_result = self.body_cache[:nimp]
661-
662-
log_body_st.debug('body downloaded',downloaded=self.body_downloaded)
663-
664-
if body_result:
665-
for result in body_result:
666-
self.chainservice.add_block(result.block,proto)
667-
del self.body_downloaded[result.header.number]
668-
self.body_cache = self.body_cache[nimp:]+[None for b in body_result]
669-
self.body_cache_offset += nimp
670-
log_body_st.debug('body cache offset', offset=self.body_cache_offset)
671-
return result.block
657+
#activate importing bodies
658+
log_body_st.debug('activate importing blocks...')
659+
self.import_ready.set(proto)
660+
661+
def import_block(self):
662+
while True:
663+
self.import_ready = AsyncResult()
664+
try:
665+
proto = self.import_ready.get()
666+
log_body_st.debug('start importing blocks...')
667+
for i, block in enumerate(self.body_cache):
668+
if not self.body_cache[i] or self.body_cache[i].block is None:
669+
break
670+
nimp = i
671+
log_body_st.debug('nimp', nimp=nimp)
672+
body_result = self.body_cache[:nimp]
673+
674+
log_body_st.debug('body downloaded',downloaded=self.body_downloaded)
675+
676+
if body_result:
677+
for result in body_result:
678+
self.chainservice.add_block(result.block,proto)
679+
del self.body_downloaded[result.header.number]
680+
self.body_cache = self.body_cache[nimp:]+[None for b in body_result]
681+
self.body_cache_offset += nimp
682+
log_body_st.debug('body cache offset', offset=self.body_cache_offset)
683+
except Exception as ex:
684+
log_body_st.error(error = ex)
685+
686+
687+
672688

673689

674690
def receive_blockbodies(self, proto, bodies):
@@ -731,7 +747,7 @@ def __init__(self, chainservice, force_sync=None):
731747
self._protocols = dict() # proto: chain_difficulty
732748
self.synctask = None
733749
self.syncbody = None
734-
self.blockheader_queue = Queue(maxsize=8192)
750+
self.blockheader_queue = Queue(maxsize=0)
735751

736752
def synctask_exited(self, success=False):
737753
# note: synctask broadcasts best block
@@ -754,8 +770,8 @@ def syncbody_exited(self, success=False):
754770
def protocols(self):
755771
"return protocols which are not stopped sorted by highest chain_difficulty"
756772
# filter and cleanup
757-
self._protocols = dict((p, cd) for p, cd in self._protocols.items() if not p.is_stopped)
758-
return sorted(self._protocols.keys(), key=lambda p: self._protocols[p], reverse=True)
773+
self._protocols = dict((p, cd) for p, cd in list(self._protocols.items()) if not p.is_stopped)
774+
return sorted(list(self._protocols.keys()), key=lambda p: self._protocols[p], reverse=True)
759775

760776

761777
def receive_newblock(self, proto, t_block, chain_difficulty):
@@ -826,7 +842,7 @@ def receive_status(self, proto, blockhash, chain_difficulty):
826842

827843
if self.force_sync:
828844
blockhash, chain_difficulty = self.force_sync
829-
log.debug('starting forced synctask', blockhash=encode_hex(blockhash))
845+
log.debug('starting forced syctask', blockhash=encode_hex(blockhash))
830846
self.synctask = SyncTask(self, proto, blockhash, chain_difficulty)
831847

832848
elif chain_difficulty > self.chain.get_score(self.chain.head):
@@ -852,7 +868,7 @@ def receive_newblockhashes(self, proto, newblockhashes):
852868
log.warn('supporting only one newblockhash', num=len(newblockhashes))
853869
if not self.synctask:
854870
blockhash = newblockhashes[0]
855-
log.debug('starting synctask for newblockhashes', blockhash=blockhash.encode('hex'))
871+
log.debug('starting synctask for newblockhashes', blockhash=encode_hex(blockhash))
856872
self.synctask = SyncTask(self, proto, blockhash, 0, originator_only=True)
857873
self.syncbody = SyncBody(self, chain_difficulty, blockhash)
858874

0 commit comments

Comments
 (0)