From 4f920fe64141d2d15ccaadd7aae70b0f752fdc41 Mon Sep 17 00:00:00 2001 From: Peter Surda Date: Mon, 16 Jan 2017 23:17:56 +0100 Subject: [PATCH] Fix infinite loop --- src/inventory.py | 56 +++++++++++++++++++++++++----------------------- 1 file changed, 29 insertions(+), 27 deletions(-) diff --git a/src/inventory.py b/src/inventory.py index e10c822e..579c1159 100644 --- a/src/inventory.py +++ b/src/inventory.py @@ -118,23 +118,14 @@ class Missing(object): with self.lock: return len(self.hashes) - def removeObjectFromCurrentThread(self, objectHash): - with self.lock: - try: - self.hashes[objectHash]['peers'].remove(current_thread().peer) - except KeyError: - return - except ValueError: - pass - if len(self.hashes[objectHash]['peers']) == 0: - del self.hashes[objectHash] - def pull(self, count=1): if count < 1: raise ValueError("Must be at least one") objectHashes = [] + unreachableObjects = [] if self.stopped: return objectHashes + start = time.time() try: for objectHash in self.hashes.keys(): with self.lock: @@ -146,6 +137,9 @@ class Missing(object): self.pending[current_thread().peer]['received'] >= time.time() - self.frequency) and \ len(self.pending[current_thread().peer]['objects']) >= count: break + if len(self.hashes[objectHash]['peers']) == 0: + unreachableObjects.append(objectHash) + continue # requested too long ago or not at all from any thread if self.hashes[objectHash]['requested'] < time.time() - self.frequency: # ready requested from this thread but haven't received yet @@ -174,24 +168,26 @@ class Missing(object): except (RuntimeError, KeyError, ValueError): # the for cycle sometimes breaks if you remove elements pass + for objectHash in unreachableObjects: + with self.lock: + del self.hashes[objectHash] +# logger.debug("Pull took %.3f seconds", time.time() - start) return objectHashes - def delete(self, objectHash, justReceived=False): + def delete(self, objectHash): with self.lock: if objectHash in self.hashes: del self.hashes[objectHash] self.pending[current_thread().peer]['received'] = time.time() - while True: - try: - for thread in self.pending.keys(): - with self.lock: - if objectHash in self.pending[thread]['objects']: - self.pending[thread]['objects'].remove(objectHash) - except (KeyError, RuntimeError): - pass - else: - break - + toDelete = [] + for thread in self.pending.keys(): + with self.lock: + if objectHash in self.pending[thread]['objects']: + toDelete.append(objectHash) + for thread in toDelete: + with self.lock: + if thread in self.pending: + self.pending[thread]['objects'].remove(objectHash) def stop(self): with self.lock: @@ -201,11 +197,17 @@ class Missing(object): def threadEnd(self): while True: try: - for objectHash in self.hashes: - self.removeObjectFromCurrentThread(objectHash) with self.lock: - del self.pending[current_thread().peer] - except (KeyError, RuntimeError): + if current_thread().peer in self.pending: + for objectHash in self.pending[current_thread().peer]['objects']: + if objectHash in self.hashes: + self.hashes[objectHash]['peers'].remove(current_thread().peer) + except (KeyError): pass else: break + with self.lock: + try: + del self.pending[current_thread().peer] + except KeyError: + pass