From 9f89df6d1cbcafc5feaf4271cfacb4abdc6d740c Mon Sep 17 00:00:00 2001 From: Peter Surda Date: Mon, 16 Jan 2017 17:08:47 +0100 Subject: [PATCH] Expire objects that we're unable to receive - sometimes a node would send an "inv" about an object but then not provide it when requested. This could be that it expired in the meantime or it was an attack or a bug. This patch will forget that the object exists if was requested too many times and not received. --- src/inventory.py | 26 ++++++++++++++++++++++---- 1 file changed, 22 insertions(+), 4 deletions(-) diff --git a/src/inventory.py b/src/inventory.py index 8b2f6e26..8a747d4b 100644 --- a/src/inventory.py +++ b/src/inventory.py @@ -91,7 +91,10 @@ class Missing(object): self.lock = RLock() self.hashes = {} self.stopped = False + # don't request the same object more frequently than this self.frequency = 60 + # after requesting and not receiving an object more than this times, consider it expired + self.maxRequestCount = 6 self.pending = {} def add(self, objectHash): @@ -99,7 +102,7 @@ class Missing(object): return with self.lock: if objectHash not in self.hashes: - self.hashes[objectHash] = {'peers':[], 'requested':0} + self.hashes[objectHash] = {'peers':[], 'requested':0, 'requestedCount':0} self.hashes[objectHash]['peers'].append(current_thread().peer) def addPending(self, objectHash=None): @@ -144,16 +147,31 @@ class Missing(object): self.pending[current_thread().peer]['received'] >= time.time() - self.frequency) and \ len(self.pending[current_thread().peer]['objects']) >= count: break - # requested too long ago or not at all + # requested too long ago or not at all from any thread if self.hashes[objectHash]['requested'] < time.time() - self.frequency: - # already requested from this thread but haven't received yet + # ready requested from this thread but haven't received yet if objectHash in self.pending[current_thread().peer]['objects']: - if self.pending[current_thread().peer]['requested'] >= time.time() - self.frequency or self.pending[current_thread().peer]['received'] >= time.time() - self.frequency: + # if still sending or receiving, request next + if self.pending[current_thread().peer]['received'] >= time.time() - self.frequency or \ + self.pending[current_thread().peer]['requested'] >= time.time() - self.frequency: continue + # haven't requested or received anything recently, re-request (i.e. continue) + # the current node doesn't have the object elif current_thread().peer not in self.hashes[objectHash]['peers']: continue + # already requested too many times, remove all signs of this object + if self.hashes[objectHash]['requestedCount'] >= self.maxRequestCount: + with self.lock: + del self.hashes[objectHash] + for thread in self.pending.keys(): + if objectHash in self.pending[thread]['objects']: + self.pending[thread]['objects'].remove(objectHash) + continue + # all ok, request objectHashes.append(objectHash) self.hashes[objectHash]['requested'] = time.time() + with self.lock: + self.hashes[objectHash]['requestedCount'] += 1 self.pending[current_thread().peer]['requested'] = time.time() self.addPending(objectHash) except (RuntimeError, KeyError, ValueError):