Skip to content
This repository was archived by the owner on Aug 8, 2018. It is now read-only.

Commit f3b15e6

Browse files
committed
block sync after batch header fetching
1 parent 6272453 commit f3b15e6

File tree

1 file changed

+71
-35
lines changed

1 file changed

+71
-35
lines changed

pyethapp/synchronizer.py

Lines changed: 71 additions & 35 deletions
Original file line number · Diff line number · Diff line change
@@ -50,8 +50,9 @@ def __init__(self, synchronizer, proto, blockhash, chain_difficulty=0, originato
5050
self.chain_difficulty = chain_difficulty
5151
self.requests = dict() # proto: Event
5252
self.header_request = None
53+
self.header_processed = 0
5354
self.batch_requests = [] #batch header request
54-
self.batch_result= []*self.max_skeleton_size*self.max_blockheaders_per_request
55+
self.batch_result= [None]*self.max_skeleton_size*self.max_blockheaders_per_request
5556
self.start_block_number = self.chain.head.number
5657
self.end_block_number = self.start_block_number + 1 # minimum synctask
5758
self.max_block_revert = 3600*24 / self.chainservice.config['eth']['block']['DIFF_ADJUSTMENT_CUTOFF']
@@ -101,22 +102,24 @@ def fetch_hashchain(self):
101102

102103

103104
if self.originating_proto.is_stopped:
104-
if protos:
105-
self.skeleton_peer= protos[0]
106-
else:
107-
log_st.warn('no protocols available')
105+
# if protos:
106+
# self.skeleton_peer= protos[0]
107+
# else:
108+
log_st.warn('originating_proto not available')
108109
self.exit(success=False)
109110
else:
110111
self.skeleton_peer=self.originating_proto
111112
self.requests[self.skeleton_peer] = deferred
112113
self.skeleton_peer.send_getblockheaders(from0+self.max_blockheaders_per_request,self.max_skeleton_size,self.max_blockheaders_per_request-1,0)
113114
try:
114115
skeleton = deferred.get(block=True,timeout=self.blockheaders_request_timeout)
115-
# assert isinstance(skeleton,list)
116-
# log_st.debug('skeleton received %u',len(skeleton))
116+
# assert isinstance(skeleton,list)
117+
log_st.debug('skeleton received %u',len(skeleton),skeleton=skeleton)
117118
except gevent.Timeout:
118119
log_st.warn('syncing skeleton timed out')
119120
#todo drop originating proto
121+
122+
# del self.requests[self.skeleton_peer]
120123
self.exit(success=False)
121124
finally:
122125
# # is also executed 'on the way out' when any other clause of the try statement
@@ -126,30 +129,35 @@ def fetch_hashchain(self):
126129
del self.requests[self.skeleton_peer]
127130

128131

129-
log_st.debug('skeleton received',num= len(skeleton), skeleton=skeleton)
132+
#log_st.debug('skeleton received',num= len(skeleton), skeleton=skeleton)
130133

131134
if not skeleton:
132-
# self.fetch_headers(from0)
133-
# skeleton_fetch_done = True
135+
log_st.warn('no more skeleton received')
136+
# self.fetch_headers(from0)
137+
skeleton_fetch_done = True
138+
self.exit(success=False)
134139
continue
135140
else:
136-
self.fetch_headerbatch(skeleton)
141+
header_batch = self.fetch_headerbatch(from0,skeleton)
142+
log_st.debug('header batch', headerbatch= header_batch)
137143
# processed= process_headerbatch(batch_header)
138144
# self.batch_header = filled[:processed]
139-
# fetch_blocks(header_batch)
140-
from0 = from0 + self.max_skeleton_size*self.max_blockheaders_per_request
145+
if header_batch:
146+
# self.fetch_blocks(header_batch)
147+
from0 = from0 + self.max_skeleton_size*self.max_blockheaders_per_request
141148
#insert batch_header to hashchain
142149

143150

144151

145152
#send requests in batches, receive one header batch at a time
146-
def fetch_headerbatch(self, skeleton):
147-
153+
def fetch_headerbatch(self,origin,skeleton):
154+
log_st.debug('origin from',origin=origin)
148155

149-
# while True
150-
from0=skeleton[0]
156+
# while True
157+
#header_processed = 0
158+
#from0=skeleton[0]
151159
self.batch_requests=[]
152-
batch_result= []*self.max_skeleton_size*self.max_blockheaders_per_request
160+
self.batch_result= [None]*self.max_skeleton_size*self.max_blockheaders_per_request
153161
headers= []
154162
proto = None
155163
proto_received=None #proto which delivered the header
@@ -185,9 +193,9 @@ def fetch_headerbatch(self, skeleton):
185193
log_st.debug('batch header fetching done')
186194
return self.batch_result
187195
try:
188-
proto_received = deferred.get(timeout=self.blockheaders_request_timeout)
189-
log_st.debug('headers batch received from proto',proto=proto_received)
190-
del self.header_request
196+
proto_received = deferred.get(timeout=self.blockheaders_request_timeout)['proto']
197+
header=deferred.get(timeout=self.blockheaders_request_timeout)['headers']
198+
log_st.debug('headers batch received from proto', header=header)
191199
except gevent.Timeout:
192200
log_st.warn('syncing batch hashchain timed out')
193201
retry += 1
@@ -199,8 +207,34 @@ def fetch_headerbatch(self, skeleton):
199207
log_st.info('headers sync failed with peers, retry', retry=retry)
200208
gevent.sleep(self.retry_delay)
201209
continue
210+
finally:
211+
del self.header_request
202212

203-
213+
if header[0] not in self.batch_requests:
214+
continue
215+
216+
#verified = self.verify_headers(self,proto_received, header)
217+
batch_header= header[::-1] #in hight rising order
218+
self.batch_result[(batch_header[0].number-origin-1):batch_header[0].number-origin+len(batch_header)]= batch_header
219+
# log_st.debug('batch result',batch_result= self.batch_result)
220+
self.batch_requests.remove(header[0])
221+
proto_received.set_idle()
222+
del self.requests[proto_received]
223+
224+
header_ready = 0
225+
while (self.header_processed + header_ready) < len(self.batch_result) and self.batch_result[self.header_processed + header_ready]:
226+
header_ready += self.max_blockheaders_per_request
227+
228+
if header_ready > 0 :
229+
# Headers are ready for delivery, gather them
230+
processed = self.batch_result[self.header_processed:self.header_processed+header_ready]
231+
log_st.debug('issue fetching blocks',header_processed=self.header_processed, blocks=processed, proto=proto_received,count=len(processed),start=processed[0].number)
232+
self.fetch_blocks(processed)
233+
self.header_processed += len(processed)
234+
235+
log_st.debug('remaining headers',num=len(self.batch_requests),headers=self.batch_requests)
236+
237+
204238

205239

206240
def idle_protocols(self):
@@ -235,7 +269,7 @@ def fetch_blocks(self, blockheaders_chain):
235269
# fetch blocks (no parallelism here)
236270
log_st.debug('fetching blocks', num=len(blockheaders_chain))
237271
assert blockheaders_chain
238-
blockheaders_chain.reverse() # height rising order
272+
# blockheaders_chain.reverse() # height rising order
239273
num_blocks = len(blockheaders_chain)
240274
num_fetched = 0
241275
retry = 0
@@ -244,7 +278,7 @@ def fetch_blocks(self, blockheaders_chain):
244278
bodies = []
245279

246280
# try with protos
247-
protocols = self.protocols
281+
protocols = self.idle_protocols()
248282
if not protocols:
249283
log_st.warn('no protocols available')
250284
return self.exit(success=False)
@@ -304,13 +338,13 @@ def fetch_blocks(self, blockheaders_chain):
304338
# done
305339
last_block = t_block
306340
assert not len(blockheaders_chain)
307-
assert last_block.header.hash == self.blockhash
308-
log_st.debug('syncing finished')
341+
# assert last_block.header.hash == self.blockhash
342+
# log_st.debug('syncing finished')
309343
# at this point blocks are not in the chain yet, but in the add_block queue
310344
if self.chain_difficulty >= self.chain.head.chain_difficulty():
311345
self.chainservice.broadcast_newblock(last_block, self.chain_difficulty, origin=proto)
312-
313-
self.exit(success=True)
346+
return
347+
#self.exit(success=True)
314348

315349
def receive_newblockhashes(self, proto, newblockhashes):
316350
"""
@@ -349,12 +383,14 @@ def receive_blockheaders(self, proto, blockheaders):
349383
return
350384
if self.batch_requests and blockheaders:
351385
# check header validity
352-
353-
if blockheaders[0] in self.batch_requests:
354-
self.batch_requests.remove(blockheaders[0])
355-
log_st.debug('remaining headers',num=len(self.batch_requests),headers=self.batch_requests)
356-
proto.set_idle()
357-
del self.requests[proto]
386+
# if not valid(blockheaders):
387+
# return self.exit(success=False)
388+
389+
# if blockheaders[0] in self.batch_requests:
390+
# self.batch_requests.remove(blockheaders[0])
391+
# log_st.debug('remaining headers',num=len(self.batch_requests),headers=self.batch_requests)
392+
# proto.set_idle()
393+
# del self.requests[proto]
358394
#deliver to header processer
359395

360396
#pack batch headers
@@ -365,7 +401,7 @@ def receive_blockheaders(self, proto, blockheaders):
365401

366402
#evoke next header fetching task
367403
# self.requests[proto].set(proto)
368-
self.header_request.set(proto)
404+
self.header_request.set({'proto':proto,'headers':blockheaders})
369405
elif proto == self.skeleton_peer: #make sure it's from the originating proto
370406
self.requests[proto].set(blockheaders)
371407

0 commit comments

Comments (0)