From 2ce73f43217322d9f5a161a80887a829c6425171 Mon Sep 17 00:00:00 2001 From: Lee Miller Date: Thu, 12 Sep 2024 20:26:57 +0300 Subject: [PATCH] Take into account pending objects in Inventory.cleanup() --- minode/connection.py | 6 ++---- minode/sql.py | 25 ++++++++++++++++++++++++- 2 files changed, 26 insertions(+), 5 deletions(-) diff --git a/minode/connection.py b/minode/connection.py index 115eef5..5d35017 100644 --- a/minode/connection.py +++ b/minode/connection.py @@ -408,7 +408,7 @@ class ConnectionBase(threading.Thread): def _request_objects(self): if self.vectors_to_get and len(self.vectors_requested) < 100: - self.vectors_to_get.difference_update(shared.objects.keys()) + self.vectors_to_get = shared.objects.select(self.vectors_to_get) if not self.wait_until: nodes_count = ( len(shared.node_pool) + len(shared.unchecked_node_pool)) @@ -466,9 +466,7 @@ class Connection(ConnectionBase): def _process_msg_inv(self, m): inv = message.Inv.from_message(m) logging.debug('%s:%s -> %s', self.host_print, self.port, inv) - to_get = inv.vectors.copy() - to_get.difference_update(shared.objects.keys()) - self.vectors_to_get.update(to_get) + self.vectors_to_get.update(shared.objects.select(inv.vectors)) # Do not send objects they already have. self.vectors_to_send.difference_update(inv.vectors) diff --git a/minode/sql.py b/minode/sql.py index 3df8423..906d6ea 100644 --- a/minode/sql.py +++ b/minode/sql.py @@ -16,6 +16,7 @@ class Inventory(): def __init__(self): self._lock = threading.Lock() self._deleted = 0 + self._pending = set() self._db = sqlite3.connect( os.path.join(shared.data_directory, 'objects.dat'), check_same_thread=False @@ -55,6 +56,10 @@ class Inventory(): tag=tag, vector=vector) def cleanup(self): + if len(self._pending) > 100: + logging.warning( + 'Not cleaning up, %s objects pending', len(self._pending)) + return with self._lock: cur = self._db.execute( 'DELETE FROM objects WHERE expires < ?', @@ -71,7 +76,9 @@ class Inventory(): (int(time.time()),)) self._db.commit() self._deleted = 0 - logging.info('Deleted %s expired objects', cur.rowcount) + logging.info( + 'Deleted %s expired objects, %s pending', + cur.rowcount, len(self._pending)) def filter(self, stream=None, object_type=None, tag=None): clauses = [] @@ -89,6 +96,21 @@ class Inventory(): + ' AND '.join(clauses), params) return cur + def select(self, vectors): + chunk_size = 999 + keys = tuple(vectors) + with self._lock: + for i in range(0, len(vectors), chunk_size): + chunk = keys[i:i+chunk_size] + cur = self._db.execute( + 'SELECT vector FROM objects WHERE vector IN' # nosec B608 + ' ({})'.format(','.join('?' * len(chunk))), + chunk) + for v, in cur: + vectors.remove(v) + self._pending.update(vectors) + return vectors + def vectors_to_send(self, stream=None): cur = self._db.execute( 'SELECT vector FROM objects WHERE expires > ? AND stream = ?' @@ -153,6 +175,7 @@ class Inventory(): return self._db.commit() self.rowid = cur.lastrowid + self._pending.discard(vector) def __bool__(self): return self._db.execute(