From 913b401dd0a01bc4b938a1088b73dd2ab04ee7c2 Mon Sep 17 00:00:00 2001 From: Peter Surda Date: Mon, 20 Mar 2017 01:22:37 +0100 Subject: [PATCH] PendingDownloadQueue updates - track pending hashId more accurately - add timeout and a cleanup so that the download queues don't get stuck and memory is freed - randomise download order (only works for inv commands with more than 1 entry) --- src/class_receiveDataThread.py | 8 ++++---- src/class_singleCleaner.py | 5 +++++ src/inventory.py | 30 +++++++++++++++++++++++------- 3 files changed, 32 insertions(+), 11 deletions(-) diff --git a/src/class_receiveDataThread.py b/src/class_receiveDataThread.py index df7d18b5..aeb38e78 100644 --- a/src/class_receiveDataThread.py +++ b/src/class_receiveDataThread.py @@ -240,14 +240,14 @@ class receiveDataThread(threading.Thread): if self.data == '': # if there are no more messages toRequest = [] try: - for i in range(self.downloadQueue.pendingSize, 100): + for i in range(len(self.downloadQueue.pending), 100): while True: hashId = self.downloadQueue.get(False) if not hashId in Inventory(): toRequest.append(hashId) break # don't track download for duplicates - self.downloadQueue.task_done() + self.downloadQueue.task_done(hashId) except Queue.Empty: pass if len(toRequest) > 0: @@ -484,7 +484,7 @@ class receiveDataThread(threading.Thread): def recobject(self, data): self.messageProcessingStartTime = time.time() lengthOfTimeWeShouldUseToProcessThisMessage = shared.checkAndShareObjectWithPeers(data) - self.downloadQueue.task_done() + self.downloadQueue.task_done(calculateInventoryHash(data)) """ Sleeping will help guarantee that we can process messages faster than a @@ -517,7 +517,7 @@ class receiveDataThread(threading.Thread): for stream in self.streamNumber: objectsNewToMe -= Inventory().hashes_by_stream(stream) logger.info('inv message lists %s objects. Of those %s are new to me. It took %s seconds to figure that out.', numberOfItemsInInv, len(objectsNewToMe), time.time()-startTime) - for item in objectsNewToMe: + for item in random.sample(objectsNewToMe, len(objectsNewToMe)): self.downloadQueue.put(item) # Send a getdata message to our peer to request the object with the given diff --git a/src/class_singleCleaner.py b/src/class_singleCleaner.py index 921e84ed..1ed7a5e4 100644 --- a/src/class_singleCleaner.py +++ b/src/class_singleCleaner.py @@ -108,6 +108,11 @@ class singleCleaner(threading.Thread, StoppableThread): os._exit(0) shared.needToWriteKnownNodesToDisk = False + # clear download queues + for thread in threading.enumerate(): + if thread.isAlive() and hasattr(thread, 'downloadQueue'): + thread.downloadQueue.clear() + # TODO: cleanup pending upload / download if state.shutdown == 0: diff --git a/src/inventory.py b/src/inventory.py index 1982dacf..a796968a 100644 --- a/src/inventory.py +++ b/src/inventory.py @@ -86,29 +86,44 @@ class Inventory(collections.MutableMapping): class PendingDownloadQueue(Queue.Queue): # keep a track of objects that have been advertised to us but we haven't downloaded them yet + maxWait = 300 + def __init__(self, maxsize=0): Queue.Queue.__init__(self, maxsize) self.stopped = False - self.pendingSize = 0 + self.pending = {} + self.lock = RLock() - def task_done(self): + def task_done(self, hashId): Queue.Queue.task_done(self) - if self.pendingSize > 0: - self.pendingSize -= 1 + try: + with self.lock: + del self.pending[hashId] + except KeyError: + pass def get(self, block=True, timeout=None): retval = Queue.Queue.get(self, block, timeout) # no exception was raised if not self.stopped: - self.pendingSize += 1 + with self.lock: + self.pending[retval] = time.time() return retval + def clear(self): + with self.lock: + newPending = {} + for hashId in self.pending: + if self.pending[hashId] + PendingDownloadQueue.maxWait > time.time(): + newPending[hashId] = self.pending[hashId] + self.pending = newPending + @staticmethod def totalSize(): size = 0 for thread in threadingEnumerate(): if thread.isAlive() and hasattr(thread, 'downloadQueue'): - size += thread.downloadQueue.qsize() + thread.downloadQueue.pendingSize + size += thread.downloadQueue.qsize() + len(thread.downloadQueue.pending) return size @staticmethod @@ -116,7 +131,8 @@ class PendingDownloadQueue(Queue.Queue): for thread in threadingEnumerate(): if thread.isAlive() and hasattr(thread, 'downloadQueue'): thread.downloadQueue.stopped = True - thread.downloadQueue.pendingSize = 0 + with thread.downloadQueue.lock: + thread.downloadQueue.pending = {} class PendingUploadDeadlineException(Exception):