@@ -225,66 +225,97 @@ def __init__(self):
225
225
# end by this time in any case
226
226
self .deadline = 0
227
227
self .maxLen = 0
228
+ # during shutdown, wait up to 20 seconds to finish uploading
229
+ self .shutdownWait = 20
230
+ # forget tracking objects after 60 seconds
231
+ self .objectWait = 60
232
+ # wait 10 seconds between clears
233
+ self .clearDelay = 10
234
+ self .lastCleared = time .time ()
228
235
229
236
def add (self , objectHash = None ):
230
237
with self .lock :
231
238
# add a new object into existing thread lists
232
239
if objectHash :
233
240
if objectHash not in self .hashes :
234
- self .hashes [objectHash ] = []
241
+ self .hashes [objectHash ] = { 'created' : time . time (), 'sendCount' : 0 , 'peers' : []}
235
242
for thread in threadingEnumerate ():
236
243
if thread .isAlive () and hasattr (thread , 'peer' ) and \
237
- thread .peer not in self .hashes [objectHash ]:
238
- self .hashes [objectHash ].append (thread .peer )
244
+ thread .peer not in self .hashes [objectHash ][ 'peers' ] :
245
+ self .hashes [objectHash ][ 'peers' ] .append (thread .peer )
239
246
# add all objects into the current thread
240
247
else :
241
248
for objectHash in self .hashes :
242
- if current_thread ().peer not in self .hashes [objectHash ]:
243
- self .hashes [objectHash ].append (current_thread ().peer )
249
+ if current_thread ().peer not in self .hashes [objectHash ][ 'peers' ] :
250
+ self .hashes [objectHash ][ 'peers' ] .append (current_thread ().peer )
244
251
245
252
def len (self ):
253
+ self .clearHashes ()
246
254
with self .lock :
247
- return sum (len (self .hashes [x ]) > 0 for x in self .hashes )
255
+ return sum (1
256
+ for x in self .hashes if (self .hashes [x ]['created' ] + self .objectWait < time .time () or
257
+ self .hashes [x ]['sendCount' ] == 0 ))
248
258
249
259
def _progress (self ):
250
260
with self .lock :
251
- return float (sum (len (self .hashes [x ]) for x in self .hashes ))
261
+ return float (sum (len (self .hashes [x ]['peers' ])
262
+ for x in self .hashes if (self .hashes [x ]['created' ] + self .objectWait < time .time ()) or
263
+ self .hashes [x ]['sendCount' ] == 0 ))
252
264
253
- def progress (self , throwDeadline = True ):
265
+ def progress (self , raiseDeadline = True ):
254
266
if self .maxLen < self ._progress ():
255
267
self .maxLen = self ._progress ()
256
268
if self .deadline < time .time ():
257
- if self .deadline > 0 and throwDeadline :
269
+ if self .deadline > 0 and raiseDeadline :
258
270
raise PendingUploadDeadlineException
259
271
self .deadline = time .time () + 20
260
272
try :
261
273
return 1.0 - self ._progress () / self .maxLen
262
274
except ZeroDivisionError :
263
275
return 1.0
264
276
265
- def delete (self , objectHash ):
277
+ def clearHashes (self , objectHash = None ):
278
+ if objectHash is None :
279
+ if self .lastCleared > time .time () - self .clearDelay :
280
+ return
281
+ objects = self .hashes .keys ()
282
+ else :
283
+ objects = objectHash ,
284
+ with self .lock :
285
+ for i in objects :
286
+ try :
287
+ if self .hashes [i ]['sendCount' ] > 0 and (
288
+ len (self .hashes [i ]['peers' ]) == 0 or
289
+ self .hashes [i ]['created' ] + self .objectWait < time .time ()):
290
+ del self .hashes [i ]
291
+ except KeyError :
292
+ pass
293
+ self .lastCleared = time .time ()
294
+
295
+ def delete (self , objectHash = None ):
266
296
if not hasattr (current_thread (), 'peer' ):
267
297
return
298
+ if objectHash is None :
299
+ return
268
300
with self .lock :
269
- if objectHash in self .hashes and current_thread ().peer in self .hashes [objectHash ]:
270
- self .hashes [objectHash ].remove (current_thread ().peer )
271
- if len (self .hashes [objectHash ]) == 0 :
272
- del self .hashes [objectHash ]
301
+ try :
302
+ if objectHash in self .hashes and current_thread ().peer in self .hashes [objectHash ]['peers' ]:
303
+ self .hashes [objectHash ]['sendCount' ] += 1
304
+ self .hashes [objectHash ]['peers' ].remove (current_thread ().peer )
305
+ except KeyError :
306
+ pass
307
+ self .clearHashes (objectHash )
273
308
274
309
def stop (self ):
275
310
with self .lock :
276
311
self .hashes = {}
277
312
278
313
def threadEnd (self ):
279
- while True :
280
- try :
281
- with self .lock :
282
- for objectHash in self .hashes :
283
- if current_thread ().peer in self .hashes [objectHash ]:
284
- self .hashes [objectHash ].remove (current_thread ().peer )
285
- if len (self .hashes [objectHash ]) == 0 :
286
- del self .hashes [objectHash ]
287
- except (KeyError , RuntimeError ):
288
- pass
289
- else :
290
- break
314
+ with self .lock :
315
+ for objectHash in self .hashes :
316
+ try :
317
+ if current_thread ().peer in self .hashes [objectHash ]['peers' ]:
318
+ self .hashes [objectHash ]['peers' ].remove (current_thread ().peer )
319
+ except KeyError :
320
+ pass
321
+ self .clearHashes ()
0 commit comments