From dbe15d0b99249bd26df7c6883029dd3288d63c1d Mon Sep 17 00:00:00 2001 From: Peter Surda Date: Sun, 15 Jan 2017 19:50:28 +0100 Subject: [PATCH] Objects to be downloaded fixes - tries to avoid calling senddata it it would block receiveDataThread, allowing fore more asynchronous operation - request objects in chunks of 100 (CPU performance optimisation) --- src/class_receiveDataThread.py | 20 ++++++++++---------- src/inventory.py | 28 +++++++++++++++++----------- 2 files changed, 27 insertions(+), 21 deletions(-) diff --git a/src/class_receiveDataThread.py b/src/class_receiveDataThread.py index 40add534..c3373efe 100644 --- a/src/class_receiveDataThread.py +++ b/src/class_receiveDataThread.py @@ -217,16 +217,16 @@ class receiveDataThread(threading.Thread): self.data = self.data[payloadLength + protocol.Header.size:] # take this message out and then process the next message if self.data == '': # if there are no more messages - while Missing().len() > 0 and not self.sendDataThreadQueue.full(): - objectHash = Missing().pull() - if objectHash is None: - break - if objectHash in Inventory(): - logger.debug('Inventory already has object listed in inv message.') - Missing().delete(objectHash) - else: - # We don't have the object in our inventory. Let's request it. - self.sendgetdata(objectHash) + if Missing().len() > 0 and self.sendDataThreadQueue.qsize() < 10: + for objectHash in Missing().pull(100): + if self.sendDataThreadQueue.full(): + break + if objectHash in Inventory(): + logger.debug('Inventory already has object listed in inv message.') + Missing().delete(objectHash) + else: + # We don't have the object in our inventory. Let's request it. + self.sendgetdata(objectHash) self.processData() diff --git a/src/inventory.py b/src/inventory.py index 3e00822a..e2a3eb01 100644 --- a/src/inventory.py +++ b/src/inventory.py @@ -101,23 +101,29 @@ class Missing(object): with self.lock: return len(self.hashes) - def pull(self): + def pull(self, count=1): + if count < 1: + raise ValueError("Must be at least one") with self.lock: now = time.time() since = now - 300 # once every 5 minutes try: - objectHash = random.choice({k:v for k, v in self.hashes.iteritems() if current_thread().peer in self.hashes[k]['peers'] and self.hashes[k]['requested'] < since}.keys()) + matchingHashes = {k:v for k, v in self.hashes.iteritems() if current_thread().peer in self.hashes[k]['peers'] and self.hashes[k]['requested'] < since} + if count > len(matchingHashes): + count = len(matchingHashes) + objectHashes = random.sample(matchingHashes, count) except (IndexError, KeyError): # list is empty return None - try: - self.hashes[objectHash]['peers'].remove(current_thread().peer) - except ValueError: - pass - if len(self.hashes[objectHash]['peers']) == 0: - self.delete(objectHash) - else: - self.hashes[objectHash]['requested'] = now - return objectHash + for objectHash in objectHashes: + try: + self.hashes[objectHash]['peers'].remove(current_thread().peer) + except ValueError: + pass + if len(self.hashes[objectHash]['peers']) == 0: + self.delete(objectHash) + else: + self.hashes[objectHash]['requested'] = now + return objectHashes def delete(self, objectHash): with self.lock: