diff --git a/src/inventory.py b/src/inventory.py index b4fcce6d..77291a5c 100644 --- a/src/inventory.py +++ b/src/inventory.py @@ -1,5 +1,4 @@ import collections -import random from threading import current_thread, RLock import time @@ -38,6 +37,7 @@ class Inventory(collections.MutableMapping): value = self.InventoryItem(*value) self._inventory[hash] = value self._streams[value.stream].add(hash) + Missing().delete(hash) def __delitem__(self, hash): raise NotImplementedError @@ -101,18 +101,27 @@ class Missing(object): with self.lock: return len(self.hashes) + def removeObjectFromCurrentThread(self, objectHash): + with self.lock: + try: + self.hashes[objectHash]['peers'].remove(current_thread().peer) + except ValueError: + pass + if len(self.hashes[objectHash]['peers']) == 0: + self.delete(objectHash) + else: + self.hashes[objectHash]['requested'] = time.time() + def pull(self, count=1): if count < 1: raise ValueError("Must be at least one") with self.lock: - now = time.time() - since = now - 60 # once every minute + since = time.time() - 60 # once every minute objectHashes = [] for objectHash in self.hashes.keys(): if current_thread().peer in self.hashes[objectHash]['peers'] and self.hashes[objectHash]['requested'] < since: objectHashes.append(objectHash) - self.hashes[objectHash]['peers'].remove(current_thread().peer) - self.hashes[objectHash]['requested'] = now + self.removeObjectFromCurrentThread(objectHash) if len(objectHashes) >= count: break return objectHashes @@ -125,9 +134,4 @@ class Missing(object): def threadEnd(self): with self.lock: for objectHash in self.hashes: - try: - self.hashes[objectHash]['peers'].remove(current_thread().peer) - except ValueError: - pass - -# current_thread().peer + self.removeObjectFromCurrentThread(objectHash) diff --git a/src/shared.py b/src/shared.py index 33b1dc33..022fa4b3 100644 --- a/src/shared.py +++ b/src/shared.py @@ -27,7 +27,7 @@ import highlevelcrypto #import helper_startup from helper_sql import * from helper_threading import * -from inventory import Inventory, Missing +from inventory import Inventory import protocol import state @@ -474,7 +474,6 @@ def _checkAndShareUndefinedObjectWithPeers(data): return inventoryHash = calculateInventoryHash(data) - Missing().delete(inventoryHash) if inventoryHash in Inventory(): logger.debug('We have already received this undefined object. Ignoring.') return @@ -498,7 +497,6 @@ def _checkAndShareMsgWithPeers(data): return readPosition += streamNumberLength inventoryHash = calculateInventoryHash(data) - Missing().delete(inventoryHash) if inventoryHash in Inventory(): logger.debug('We have already received this msg message. Ignoring.') return @@ -531,7 +529,6 @@ def _checkAndShareGetpubkeyWithPeers(data): readPosition += streamNumberLength inventoryHash = calculateInventoryHash(data) - Missing().delete(inventoryHash) if inventoryHash in Inventory(): logger.debug('We have already received this getpubkey request. Ignoring it.') return @@ -567,7 +564,6 @@ def _checkAndSharePubkeyWithPeers(data): tag = '' inventoryHash = calculateInventoryHash(data) - Missing().delete(inventoryHash) if inventoryHash in Inventory(): logger.debug('We have already received this pubkey. Ignoring it.') return @@ -603,7 +599,6 @@ def _checkAndShareBroadcastWithPeers(data): else: tag = '' inventoryHash = calculateInventoryHash(data) - Missing().delete(inventoryHash) if inventoryHash in Inventory(): logger.debug('We have already received this broadcast object. Ignoring.') return