Skip to content
This repository was archived by the owner on Aug 8, 2018. It is now read-only.

Commit 2d19806

Browse files
committed
block sync after batch header fetching
1 parent 6272453 commit 2d19806

File tree

1 file changed

+111
-54
lines changed

1 file changed

+111
-54
lines changed

pyethapp/synchronizer.py

Lines changed: 111 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,8 @@ class SyncTask(object):
3232
initial_blockheaders_per_request = 32
3333
max_blockheaders_per_request = 192
3434
max_skeleton_size = 128
35-
max_blocks_per_request = 128
35+
#max_blocks_per_request = 128
36+
max_blocks_per_request = 192
3637
max_retries = 16
3738
retry_delay = 3.
3839
blocks_request_timeout = 8.
@@ -50,8 +51,9 @@ def __init__(self, synchronizer, proto, blockhash, chain_difficulty=0, originato
5051
self.chain_difficulty = chain_difficulty
5152
self.requests = dict() # proto: Event
5253
self.header_request = None
54+
self.header_processed = 0
5355
self.batch_requests = [] #batch header request
54-
self.batch_result= []*self.max_skeleton_size*self.max_blockheaders_per_request
56+
self.batch_result= [None]*self.max_skeleton_size*self.max_blockheaders_per_request
5557
self.start_block_number = self.chain.head.number
5658
self.end_block_number = self.start_block_number + 1 # minimum synctask
5759
self.max_block_revert = 3600*24 / self.chainservice.config['eth']['block']['DIFF_ADJUSTMENT_CUTOFF']
@@ -86,34 +88,29 @@ def fetch_hashchain(self):
8688
# from=commonancestor(self.blockhash,local_blockhash)
8789
skeleton = []
8890
header_batch = []
89-
skeleton_fetch_done=False
91+
skeleton_fetch=True
92+
remaining=[]
9093
from0 = self.chain.head.number
91-
94+
9295
log_st.debug('current block number:%u', from0, origin=self.originating_proto)
93-
while not skeleton_fetch_done:
96+
while True:
9497
# Get skeleton headers
9598
deferred = AsyncResult()
96-
protos = self.idle_protocols()
97-
# if not protos:
98-
# log_st.warn('no protocols available')
99-
# self.exit(success=False)
100-
# else
101-
10299

103100
if self.originating_proto.is_stopped:
104-
if protos:
105-
self.skeleton_peer= protos[0]
106-
else:
107-
log_st.warn('no protocols available')
101+
# if protos:
102+
# self.skeleton_peer= protos[0]
103+
# else:
104+
log_st.warn('originating_proto not available')
108105
self.exit(success=False)
109106
else:
110107
self.skeleton_peer=self.originating_proto
111108
self.requests[self.skeleton_peer] = deferred
112109
self.skeleton_peer.send_getblockheaders(from0+self.max_blockheaders_per_request,self.max_skeleton_size,self.max_blockheaders_per_request-1,0)
113110
try:
114111
skeleton = deferred.get(block=True,timeout=self.blockheaders_request_timeout)
115-
# assert isinstance(skeleton,list)
116-
# log_st.debug('skeleton received %u',len(skeleton))
112+
# assert isinstance(skeleton,list)
113+
log_st.debug('skeleton received %u',len(skeleton),skeleton=skeleton)
117114
except gevent.Timeout:
118115
log_st.warn('syncing skeleton timed out')
119116
#todo drop originating proto
@@ -126,30 +123,53 @@ def fetch_hashchain(self):
126123
del self.requests[self.skeleton_peer]
127124

128125

129-
log_st.debug('skeleton received',num= len(skeleton), skeleton=skeleton)
126+
#log_st.debug('skeleton received',num= len(skeleton), skeleton=skeleton)
130127

131-
if not skeleton:
132-
# self.fetch_headers(from0)
133-
# skeleton_fetch_done = True
134-
continue
135-
else:
136-
self.fetch_headerbatch(skeleton)
137-
# processed= process_headerbatch(batch_header)
128+
if skeleton_fetch and not skeleton:
129+
remaining = self.fetch_headers(self.skeleton_peer,from0)
130+
131+
skeleton_fetch = False
132+
if not remaining:
133+
log_st.warn('no more skeleton received')
134+
return self.exit(success=True)
135+
# self.exit(success=False)
136+
#should not continuew??
137+
138+
if skeleton_fetch:
139+
140+
header_batch = self.fetch_headerbatch(from0,skeleton)
141+
log_st.debug('header batch', headerbatch= header_batch)
142+
# check result
143+
# processed= process_headerbatch(batch_header)
138144
# self.batch_header = filled[:processed]
139-
# fetch_blocks(header_batch)
140-
from0 = from0 + self.max_skeleton_size*self.max_blockheaders_per_request
145+
if header_batch:
146+
# self.fetch_blocks(header_batch)
147+
#from0 = from0 + self.max_skeleton_size*self.max_blockheaders_per_request
148+
from0 = from0 + len(header_batch)
149+
else:
150+
return self.exit(success=False)
151+
152+
if remaining:
153+
log_st.debug('fetching new skeletons')
154+
self.fetch_blocks(remaining)
155+
from0+=len(remaining)
156+
157+
158+
159+
141160
#insert batch_header to hashchain
142161

143162

144163

145164
#send requests in batches, receive one header batch at a time
146-
def fetch_headerbatch(self, skeleton):
147-
165+
def fetch_headerbatch(self,origin,skeleton):
166+
log_st.debug('skeleton from',origin=origin)
148167

149-
# while True
150-
from0=skeleton[0]
168+
# while True
169+
self.header_processed = 0
170+
#from0=skeleton[0]
151171
self.batch_requests=[]
152-
batch_result= []*self.max_skeleton_size*self.max_blockheaders_per_request
172+
self.batch_result= [None]*self.max_skeleton_size*self.max_blockheaders_per_request
153173
headers= []
154174
proto = None
155175
proto_received=None #proto which delivered the header
@@ -158,14 +178,18 @@ def fetch_headerbatch(self, skeleton):
158178
for header in skeleton:
159179
self.batch_requests.append(header)
160180

161-
#requests = cycle(self.batch_requests)
162-
# while there are unanswered requests
163-
164181

165182
while True:
166183
requests = iter(self.batch_requests)
167184
deferred = AsyncResult()
168185
self.header_request=deferred
186+
# check if there are idle protocols
187+
protocols = self.idle_protocols()
188+
if not protocols:
189+
log_st.warn('no protocols available')
190+
return self.exit(success=False)
191+
192+
169193
for proto in self.idle_protocols():
170194

171195
proto_deferred = AsyncResult()
@@ -185,9 +209,9 @@ def fetch_headerbatch(self, skeleton):
185209
log_st.debug('batch header fetching done')
186210
return self.batch_result
187211
try:
188-
proto_received = deferred.get(timeout=self.blockheaders_request_timeout)
189-
log_st.debug('headers batch received from proto',proto=proto_received)
190-
del self.header_request
212+
proto_received = deferred.get(timeout=self.blockheaders_request_timeout)['proto']
213+
header=deferred.get(timeout=self.blockheaders_request_timeout)['headers']
214+
log_st.debug('headers batch received from proto', header=header)
191215
except gevent.Timeout:
192216
log_st.warn('syncing batch hashchain timed out')
193217
retry += 1
@@ -199,8 +223,38 @@ def fetch_headerbatch(self, skeleton):
199223
log_st.info('headers sync failed with peers, retry', retry=retry)
200224
gevent.sleep(self.retry_delay)
201225
continue
226+
finally:
227+
del self.header_request
228+
229+
# check if header is empty
202230

203-
231+
if header[0] not in self.batch_requests:
232+
continue
233+
234+
#verified = self.verify_headers(self,proto_received, header)
235+
batch_header= header[::-1] #in height ascending order
236+
self.batch_result[(batch_header[0].number-origin-1):batch_header[0].number-origin-1+len(batch_header)]= batch_header
237+
# log_st.debug('batch result',batch_result= self.batch_result)
238+
self.batch_requests.remove(header[0])
239+
proto_received.set_idle()
240+
del self.requests[proto_received]
241+
242+
header_ready = 0
243+
while (self.header_processed + header_ready) < len(self.batch_result) and self.batch_result[self.header_processed + header_ready]:
244+
header_ready += self.max_blockheaders_per_request
245+
246+
if header_ready > 0 :
247+
# Headers are ready for delivery, gather them
248+
processed = self.batch_result[self.header_processed:self.header_processed+header_ready]
249+
log_st.debug('issue fetching blocks',header_processed=self.header_processed, blocks=processed, proto=proto_received,count=len(processed),start=processed[0].number)
250+
count=len(processed)
251+
if self.fetch_blocks(processed):
252+
self.header_processed += count
253+
else:
254+
return self.batch_result[:self.header_processed]
255+
log_st.debug('remaining headers',num=len(self.batch_requests),headers=self.batch_requests)
256+
257+
204258

205259

206260
def idle_protocols(self):
@@ -220,12 +274,13 @@ def verify_headers(self,proto,headers):
220274

221275
def fetch_headers(self,proto, fromx):
222276
deferred = AsyncResult()
277+
blockheaders_batch=[]
223278
proto.send_getblockheaders(fromx,self.max_blockheaders_per_request)
224279
try:
225280
blockheaders_batch = deferred.get(block=True,timeout=self.blockheaders_request_timeout)
226281
except gevent.Timeout:
227282
log_st.warn('syncing batch hashchain timed out')
228-
self.exit()
283+
return []
229284
finally:
230285
return blockheaders_batch
231286

@@ -235,7 +290,7 @@ def fetch_blocks(self, blockheaders_chain):
235290
# fetch blocks (no parallelism here)
236291
log_st.debug('fetching blocks', num=len(blockheaders_chain))
237292
assert blockheaders_chain
238-
blockheaders_chain.reverse() # height rising order
293+
# blockheaders_chain.reverse() # height rising order
239294
num_blocks = len(blockheaders_chain)
240295
num_fetched = 0
241296
retry = 0
@@ -244,7 +299,7 @@ def fetch_blocks(self, blockheaders_chain):
244299
bodies = []
245300

246301
# try with protos
247-
protocols = self.protocols
302+
protocols = self.idle_protocols()
248303
if not protocols:
249304
log_st.warn('no protocols available')
250305
return self.exit(success=False)
@@ -304,13 +359,13 @@ def fetch_blocks(self, blockheaders_chain):
304359
# done
305360
last_block = t_block
306361
assert not len(blockheaders_chain)
307-
assert last_block.header.hash == self.blockhash
308-
log_st.debug('syncing finished')
362+
# assert last_block.header.hash == self.blockhash
363+
# log_st.debug('syncing finished')
309364
# at this point blocks are not in the chain yet, but in the add_block queue
310365
if self.chain_difficulty >= self.chain.head.chain_difficulty():
311366
self.chainservice.broadcast_newblock(last_block, self.chain_difficulty, origin=proto)
312-
313-
self.exit(success=True)
367+
return True
368+
#self.exit(success=True)
314369

315370
def receive_newblockhashes(self, proto, newblockhashes):
316371
"""
@@ -348,13 +403,15 @@ def receive_blockheaders(self, proto, blockheaders):
348403
log.debug('unexpected blockheaders')
349404
return
350405
if self.batch_requests and blockheaders:
351-
# check header validity
352-
353-
if blockheaders[0] in self.batch_requests:
354-
self.batch_requests.remove(blockheaders[0])
355-
log_st.debug('remaining headers',num=len(self.batch_requests),headers=self.batch_requests)
356-
proto.set_idle()
357-
del self.requests[proto]
406+
# check header validity
407+
# if not valid(blockheaders):
408+
# return self.exit(success=False)
409+
410+
# if blockheaders[0] in self.batch_requests:
411+
# self.batch_requests.remove(blockheaders[0])
412+
# log_st.debug('remaining headers',num=len(self.batch_requests),headers=self.batch_requests)
413+
# proto.set_idle()
414+
# del self.requests[proto]
358415
#deliver to header processor
359416

360417
#pack batch headers
@@ -365,7 +422,7 @@ def receive_blockheaders(self, proto, blockheaders):
365422

366423
#evoke next header fetching task
367424
# self.requests[proto].set(proto)
368-
self.header_request.set(proto)
425+
self.header_request.set({'proto':proto,'headers':blockheaders})
369426
elif proto == self.skeleton_peer: #make sure it's from the originating proto
370427
self.requests[proto].set(blockheaders)
371428

0 commit comments

Comments
 (0)