Skip to content
This repository was archived by the owner on Aug 8, 2018. It is now read-only.

Commit c478bb0

Browse files
committed
Rewrite header verification
1 parent 68520af commit c478bb0

File tree

1 file changed

+38
-38
lines changed

1 file changed

+38
-38
lines changed

pyethapp/synchronizer.py

Lines changed: 38 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ def __init__(self, synchronizer, proto, blockhash, chain_difficulty=0, originato
6565
self.chain_difficulty = chain_difficulty
6666
self.requests = dict() # proto: Event
6767
self.header_processed = 0
68-
self.batch_requests = [] #batch header request
68+
self.batch_requests = dict() #batch header request
6969
self.batch_result= [None]*self.max_skeleton_size*self.max_blockheaders_per_request
7070
self.headertask_queue = Q.PriorityQueue()
7171
self.pending_headerRequests = dict()
@@ -129,7 +129,8 @@ def fetch_hashchain(self):
129129
else:
130130
self.skeleton_peer=self.originating_proto
131131
self.requests[self.skeleton_peer] = deferred
132-
self.skeleton_peer.send_getblockheaders(from0+self.max_blockheaders_per_request,self.max_skeleton_size,self.max_blockheaders_per_request-1,0)
132+
self.skeleton_peer.send_getblockheaders(from0+self.max_blockheaders_per_request-1,self.max_skeleton_size,self.max_blockheaders_per_request-1,0)
133+
133134
try:
134135
skeleton = deferred.get(block=True,timeout=self.blockheaders_request_timeout)
135136
# assert isinstance(skeleton,list)
@@ -185,16 +186,17 @@ def fetch_headerbatch(self,origin,skeleton):
185186
# while True
186187
self.header_processed = 0
187188
#from0=skeleton[0]
188-
self.batch_requests=[]
189+
self.batch_requests=dict()
189190
self.batch_result= [None]*self.max_skeleton_size*self.max_blockheaders_per_request
190191
headers= []
191192
proto = None
192193
proto_received=None #proto which delivered the header
193194
retry = 0
194195
received = False
195-
for header in skeleton:
196-
self.batch_requests.append(header)
197-
self.headertask_queue.put((header.number,header.number))
196+
for i, header in enumerate(skeleton):
197+
index = origin + i*self.max_blockheaders_per_request
198+
self.batch_requests[index] = header
199+
self.headertask_queue.put((index,index))
198200

199201
while True:
200202
# requests = iter(self.batch_requests)
@@ -230,14 +232,16 @@ def fetch_headerbatch(self,origin,skeleton):
230232
task_empty = False
231233
pending=len(self.pending_headerRequests)
232234
for proto in self.idle_protocols():
233-
234235
proto_deferred = AsyncResult()
235-
# check if it's finished
236-
236+
# check if the proto is already busy
237+
238+
if self.pending_headerRequests.get(proto):
239+
continue
240+
237241
if not self.headertask_queue.empty():
238242
start=self.headertask_queue.get()[1]
239243
self.requests[proto] = proto_deferred
240-
proto.send_getblockheaders(start,self.max_blockheaders_per_request)
244+
proto.send_getblockheaders(start,self.max_blockheaders_per_request,0,0)
241245
self.pending_headerRequests[proto] = HeaderRequest(start)
242246
proto.idle = False
243247
fetching = True
@@ -281,21 +285,34 @@ def fetch_headerbatch(self,origin,skeleton):
281285

282286

283287
def deliver_headers(self,origin,proto,header):
284-
if header[0] not in self.batch_requests:
288+
if header[0].number not in self.batch_requests:
285289
log_st.debug('header delivered not matching requested headers')
286290
return
287-
start_header= self.pending_headerRequests[proto].start
291+
index = self.pending_headerRequests[proto].start
292+
293+
log_st.debug('index', index=index)
288294
del self.pending_headerRequests[proto]
289-
verified = self.verify_headers(proto,header)
290295

296+
#start= self.batch_requests[index].number
297+
headerhash= self.batch_requests[index].hash
298+
verified = True
299+
if len(header) != self.max_blockheaders_per_request :
300+
verified = False
301+
if verified:
302+
if header[0].number != index:
303+
log_st.warn('First header broke chain ordering',proto=proto,number = header[0].number,start=index)
304+
verified= False
305+
elif header[len(header)-1].hash != headerhash:
306+
log_st.warn('Last header broke skeleton structure', proto= proto,
307+
number= header[len(header)-1].number, headerhash = header[len(header)-1].hash, expected= headerhash)
308+
verified= False
309+
#
291310
if not verified:
292311
log_st.debug('header delivered not verified')
293312
self.headertask_queue.put((start_header,start_header))
294313
return
295-
batch_header= header[::-1] #in hight rising order
296-
self.batch_result[(batch_header[0].number-origin-1):batch_header[0].number-origin-1+len(batch_header)]= batch_header
297-
# log_st.debug('batch result',batch_result= self.batch_result)
298-
self.batch_requests.remove(header[0])
314+
self.batch_result[(header[0].number-origin):header[0].number-origin+len(header)]=header
315+
del self.batch_requests[index]
299316
del self.requests[proto]
300317
header_ready = 0
301318
while (self.header_processed + header_ready) < len(self.batch_result) and self.batch_result[self.header_processed + header_ready]:
@@ -304,15 +321,15 @@ def deliver_headers(self,origin,proto,header):
304321
if header_ready > 0 :
305322
# Headers are ready for delivery, gather them
306323
processed = self.batch_result[self.header_processed:self.header_processed+header_ready]
307-
# log_st.debug('issue fetching blocks',header_processed=self.header_processed,blocks=processed, proto=proto,count=len(processed),start=processed[0].number)
324+
log_st.debug('issue fetching blocks',header_processed=self.header_processed,blocks=processed, proto=proto,count=len(processed),start=processed[0].number)
308325

309326
count=len(processed)
310327
self.synchronizer.blockheader_queue.put(processed)
311328
# if self.fetch_blocks(processed):
312329
self.header_processed += count
313330
#else:
314331
# return self.batch_result[:self.header_processed]
315-
log_st.debug('remaining headers',num=len(self.batch_requests),headers=self.batch_requests)
332+
log_st.debug('remaining headers',num=len(self.batch_requests),headers=self.batch_requests.items())
316333

317334

318335

@@ -325,23 +342,6 @@ def idle_protocols(self):
325342
return idle
326343

327344

328-
def verify_headers(self,proto,headers):
329-
# start = self.pending_headerRequests[proto]
330-
# headerhash= self.batch_requests[start].hash
331-
# if headers[0].number != start:
332-
# log_st.warn('First header broke chain ordering', proto=proto,number =headers[0].number,headerhash = headers[0].hash,start=start)
333-
# return False
334-
# elif headers[len(headers)-1].hash != target
335-
336-
# if request:
337-
# return 0, errNoFetchesPending
338-
if len(headers) != self.max_blockheaders_per_request:
339-
log_st.debug('headers batch count', count=len(headers))
340-
341-
return False
342-
343-
return True
344-
345345

346346
def fetch_headers(self,proto, fromx):
347347
deferred = AsyncResult()
@@ -365,7 +365,7 @@ def receive_blockheaders(self, proto, blockheaders):
365365
if proto not in self.requests:
366366
log.debug('unexpected blockheaders')
367367
return
368-
if self.batch_requests:
368+
if any(self.batch_requests):
369369
self.header_request.set({'proto':proto,'headers':blockheaders})
370370
elif proto == self.skeleton_peer: #make sure it's from the originating proto
371371
self.requests[proto].set(blockheaders)
@@ -399,7 +399,7 @@ def __init__(self, synchronizer, blockhash, chain_difficulty=0, originator_only=
399399
self.block_requests_pool = []
400400
self.bodytask_queue = Q.PriorityQueue()
401401
self.body_cache = [None]*self.body_cache_limit
402-
self.body_cache_offset= self.chain.head.number+1
402+
self.body_cache_offset= self.chain.head.number
403403
self.body_downloaded = dict()
404404
self.pending_bodyRequests = dict()
405405
self.requests = dict() # proto: Event

0 commit comments

Comments
 (0)