diff --git a/src/class_receiveDataThread.py b/src/class_receiveDataThread.py index c3373efe..622c71ca 100644 --- a/src/class_receiveDataThread.py +++ b/src/class_receiveDataThread.py @@ -217,16 +217,15 @@ class receiveDataThread(threading.Thread): self.data = self.data[payloadLength + protocol.Header.size:] # take this message out and then process the next message if self.data == '': # if there are no more messages - if Missing().len() > 0 and self.sendDataThreadQueue.qsize() < 10: - for objectHash in Missing().pull(100): - if self.sendDataThreadQueue.full(): - break - if objectHash in Inventory(): - logger.debug('Inventory already has object listed in inv message.') - Missing().delete(objectHash) - else: - # We don't have the object in our inventory. Let's request it. - self.sendgetdata(objectHash) + for objectHash in Missing().pull(100): + if self.sendDataThreadQueue.full(): + break + if objectHash in Inventory(): + logger.debug('Inventory already has object listed in inv message.') + Missing().delete(objectHash) + else: + # We don't have the object in our inventory. Let's request it. + self.sendgetdata(objectHash) self.processData() diff --git a/src/inventory.py b/src/inventory.py index 8530a44f..8b2f6e26 100644 --- a/src/inventory.py +++ b/src/inventory.py @@ -37,7 +37,7 @@ class Inventory(collections.MutableMapping): value = self.InventoryItem(*value) self._inventory[hash] = value self._streams[value.stream].add(hash) - Missing().delete(hash) + Missing().delete(hash, True) def __delitem__(self, hash): raise NotImplementedError @@ -92,15 +92,25 @@ class Missing(object): self.hashes = {} self.stopped = False self.frequency = 60 + self.pending = {} def add(self, objectHash): if self.stopped: return with self.lock: - if not objectHash in self.hashes: + if objectHash not in self.hashes: self.hashes[objectHash] = {'peers':[], 'requested':0} self.hashes[objectHash]['peers'].append(current_thread().peer) + def addPending(self, objectHash=None): + if self.stopped: + return + if current_thread().peer not in self.pending: + self.pending[current_thread().peer] = {'objects':[], 'requested':0, 'received':0} + if objectHash not in self.pending[current_thread().peer]['objects'] and not objectHash is None: + self.pending[current_thread().peer]['objects'].append(objectHash) + self.pending[current_thread().peer]['requested'] = time.time() + def len(self): with self.lock: return len(self.hashes) @@ -122,33 +132,54 @@ class Missing(object): if count < 1: raise ValueError("Must be at least one") objectHashes = [] + if self.stopped: + return objectHashes try: for objectHash in self.hashes.keys(): + if len(objectHashes) >= count: + break + if current_thread().peer not in self.pending: + self.addPending() + if (self.pending[current_thread().peer]['requested'] >= time.time() - self.frequency or \ + self.pending[current_thread().peer]['received'] >= time.time() - self.frequency) and \ + len(self.pending[current_thread().peer]['objects']) >= count: + break + # requested too long ago or not at all if self.hashes[objectHash]['requested'] < time.time() - self.frequency: - if len(self.hashes[objectHash]['peers']) == 0: - self.removeObjectFromCurrentThread(objectHash) + # already requested from this thread but haven't received yet + if objectHash in self.pending[current_thread().peer]['objects']: + if self.pending[current_thread().peer]['requested'] >= time.time() - self.frequency or self.pending[current_thread().peer]['received'] >= time.time() - self.frequency: + continue + elif current_thread().peer not in self.hashes[objectHash]['peers']: continue - if current_thread().peer in self.hashes[objectHash]['peers']: - objectHashes.append(objectHash) - self.hashes[objectHash]['requested'] = time.time() - self.removeObjectFromCurrentThread(objectHash) - if len(objectHashes) >= count: - break + objectHashes.append(objectHash) + self.hashes[objectHash]['requested'] = time.time() + self.pending[current_thread().peer]['requested'] = time.time() + self.addPending(objectHash) except (RuntimeError, KeyError, ValueError): # the for cycle sometimes breaks if you remove elements pass return objectHashes - def delete(self, objectHash): + def delete(self, objectHash, justReceived=False): with self.lock: if objectHash in self.hashes: del self.hashes[objectHash] + if objectHash in self.pending[current_thread().peer]['objects']: + self.pending[current_thread().peer]['objects'].remove(objectHash) + if justReceived: + self.pending[current_thread().peer]['received'] = time.time() def stop(self): with self.lock: self.hashes = {} + self.pending = {} def threadEnd(self): with self.lock: for objectHash in self.hashes: self.removeObjectFromCurrentThread(objectHash) + try: + del self.pending[current_thread().peer] + except KeyError: + pass