From 15077c9388d89d67d571ee612da4a0e35747fd41 Mon Sep 17 00:00:00 2001 From: Peter Surda Date: Wed, 1 Mar 2017 10:05:08 +0100 Subject: [PATCH] More accurate PendingUpload tracking - works correctly when starting offline - stops tracking after after 60 seconds but only if at least 1 successful upload --- src/class_sendDataThread.py | 6 +-- src/inventory.py | 83 +++++++++++++++++++++++++------------ 2 files changed, 60 insertions(+), 29 deletions(-) diff --git a/src/class_sendDataThread.py b/src/class_sendDataThread.py index 5ddb3c11..7de63a02 100644 --- a/src/class_sendDataThread.py +++ b/src/class_sendDataThread.py @@ -187,12 +187,12 @@ class sendDataThread(threading.Thread): logger.error('send pong failed') break elif command == 'sendRawData': - hash = None + objectHash = None if type(data) in [list, tuple]: - hash, data = data + objectHash, data = data try: self.sendBytes(data) - PendingUpload().delete(hash) + PendingUpload().delete(objectHash) except: logger.error('Sending of data to ' + str(self.peer) + ' failed. sendDataThread thread ' + str(self) + ' ending now.', exc_info=True) break diff --git a/src/inventory.py b/src/inventory.py index 4424942f..1087e655 100644 --- a/src/inventory.py +++ b/src/inventory.py @@ -225,36 +225,48 @@ class PendingUpload(object): # end by this time in any case self.deadline = 0 self.maxLen = 0 + # during shutdown, wait up to 20 seconds to finish uploading + self.shutdownWait = 20 + # forget tracking objects after 60 seconds + self.objectWait = 60 + # wait 10 seconds between clears + self.clearDelay = 10 + self.lastCleared = time.time() def add(self, objectHash = None): with self.lock: # add a new object into existing thread lists if objectHash: if objectHash not in self.hashes: - self.hashes[objectHash] = [] + self.hashes[objectHash] = {'created': time.time(), 'sendCount': 0, 'peers': []} for thread in threadingEnumerate(): if thread.isAlive() and hasattr(thread, 'peer') and \ - thread.peer not in self.hashes[objectHash]: - self.hashes[objectHash].append(thread.peer) + thread.peer not in self.hashes[objectHash]['peers']: + self.hashes[objectHash]['peers'].append(thread.peer) # add all objects into the current thread else: for objectHash in self.hashes: - if current_thread().peer not in self.hashes[objectHash]: - self.hashes[objectHash].append(current_thread().peer) + if current_thread().peer not in self.hashes[objectHash]['peers']: + self.hashes[objectHash]['peers'].append(current_thread().peer) def len(self): + self.clearHashes() with self.lock: - return sum(len(self.hashes[x]) > 0 for x in self.hashes) + return sum(1 + for x in self.hashes if (self.hashes[x]['created'] + self.objectWait < time.time() or + self.hashes[x]['sendCount'] == 0)) def _progress(self): with self.lock: - return float(sum(len(self.hashes[x]) for x in self.hashes)) + return float(sum(len(self.hashes[x]['peers']) + for x in self.hashes if (self.hashes[x]['created'] + self.objectWait < time.time()) or + self.hashes[x]['sendCount'] == 0)) - def progress(self, throwDeadline=True): + def progress(self, raiseDeadline=True): if self.maxLen < self._progress(): self.maxLen = self._progress() if self.deadline < time.time(): - if self.deadline > 0 and throwDeadline: + if self.deadline > 0 and raiseDeadline: raise PendingUploadDeadlineException self.deadline = time.time() + 20 try: @@ -262,29 +274,48 @@ class PendingUpload(object): except ZeroDivisionError: return 1.0 - def delete(self, objectHash): + def clearHashes(self, objectHash=None): + if objectHash is None: + if self.lastCleared > time.time() - self.clearDelay: + return + objects = self.hashes.keys() + else: + objects = objectHash, + with self.lock: + for i in objects: + try: + if self.hashes[i]['sendCount'] > 0 and ( + len(self.hashes[i]['peers']) == 0 or + self.hashes[i]['created'] + self.objectWait < time.time()): + del self.hashes[i] + except KeyError: + pass + self.lastCleared = time.time() + + def delete(self, objectHash=None): if not hasattr(current_thread(), 'peer'): return + if objectHash is None: + return with self.lock: - if objectHash in self.hashes and current_thread().peer in self.hashes[objectHash]: - self.hashes[objectHash].remove(current_thread().peer) - if len(self.hashes[objectHash]) == 0: - del self.hashes[objectHash] + try: + if objectHash in self.hashes and current_thread().peer in self.hashes[objectHash]['peers']: + self.hashes[objectHash]['sendCount'] += 1 + self.hashes[objectHash]['peers'].remove(current_thread().peer) + except KeyError: + pass + self.clearHashes(objectHash) def stop(self): with self.lock: self.hashes = {} def threadEnd(self): - while True: - try: - with self.lock: - for objectHash in self.hashes: - if current_thread().peer in self.hashes[objectHash]: - self.hashes[objectHash].remove(current_thread().peer) - if len(self.hashes[objectHash]) == 0: - del self.hashes[objectHash] - except (KeyError, RuntimeError): - pass - else: - break + with self.lock: + for objectHash in self.hashes: + try: + if current_thread().peer in self.hashes[objectHash]['peers']: + self.hashes[objectHash]['peers'].remove(current_thread().peer) + except KeyError: + pass + self.clearHashes()