diff --git a/minode/connection.py b/minode/connection.py index c18a619..6eeec6e 100644 --- a/minode/connection.py +++ b/minode/connection.py @@ -408,7 +408,7 @@ class ConnectionBase(threading.Thread): def _request_objects(self): if self.vectors_to_get and len(self.vectors_requested) < 100: - self.vectors_to_get.difference_update(shared.objects.keys()) + self.vectors_to_get = shared.objects.select(self.vectors_to_get) if not self.wait_until: nodes_count = ( len(shared.node_pool) + len(shared.unchecked_node_pool)) @@ -466,9 +466,7 @@ class Connection(ConnectionBase): def _process_msg_inv(self, m): inv = message.Inv.from_message(m) logging.debug('%s:%s -> %s', self.host_print, self.port, inv) - to_get = inv.vectors.copy() - to_get.difference_update(shared.objects.keys()) - self.vectors_to_get.update(to_get) + self.vectors_to_get.update(shared.objects.select(inv.vectors)) # Do not send objects they already have. self.vectors_to_send.difference_update(inv.vectors) @@ -490,6 +488,7 @@ class Connection(ConnectionBase): logging.debug(dest) shared.i2p_unchecked_node_pool.add((dest, 'i2p')) shared.vector_advertise_queue.put(obj.vector) + shared.objects.check(obj.vector) def _process_msg_getdata(self, m): getdata = message.GetData.from_message(m) diff --git a/minode/manager.py b/minode/manager.py index b4631ed..c5ba1c7 100644 --- a/minode/manager.py +++ b/minode/manager.py @@ -55,6 +55,8 @@ class Manager(threading.Thread): outgoing_connections = 0 for c in shared.connections.copy(): if not c.is_alive() or c.status == 'disconnected': + shared.objects.check( + *(c.vectors_to_get | c.vectors_requested.keys())) with shared.connections_lock: shared.connections.remove(c) else: diff --git a/minode/sql.py b/minode/sql.py index dbac43e..8f24fcc 100644 --- a/minode/sql.py +++ b/minode/sql.py @@ -16,6 +16,7 @@ class Inventory(): def __init__(self): self._lock = threading.Lock() self._deleted = 0 + self._pending = set() self._db = sqlite3.connect( os.path.join(shared.data_directory, 'objects.dat'), check_same_thread=False @@ -53,14 +54,25 @@ class Inventory(): expires, obj_type, version, stream, data, offset, tag=tag, vector=vector) + def check(self, *vectors): + with self._lock: + for vector in vectors: + self._pending.discard(vector) + def cleanup(self): + if len(self._pending) > 100: + logging.warning( + 'Not cleaning up, %s objects pending', len(self._pending)) + return with self._lock: now = int(time.time()) cur = self._db.execute( 'DELETE FROM objects WHERE expires < ?', (now - 3 * 3600,)) self._db.commit() self._deleted += cur.rowcount - logging.info('Deleted %s expired objects', cur.rowcount) + logging.info( + 'Deleted %s expired objects, %s pending', + cur.rowcount, len(self._pending)) # conditional vacuum and validity check (TODO) # every 24 hours or after deleting a lot of items if self._deleted > 10000 or self.lastvacuumtime < now - 86400: @@ -88,6 +100,21 @@ class Inventory(): + ' AND '.join(clauses), params) return cur + def select(self, vectors): + chunk_size = 999 + keys = tuple(vectors) + with self._lock: + for i in range(0, len(vectors), chunk_size): + chunk = keys[i:i+chunk_size] + cur = self._db.execute( + 'SELECT vector FROM objects WHERE vector IN' # nosec B608 + ' ({})'.format(','.join('?' * len(chunk))), + chunk) + for v, in cur: + vectors.remove(v) + self._pending.update(vectors) + return vectors + def vectors_to_send(self, stream=None): cur = self._db.execute( 'SELECT vector FROM objects WHERE expires > ? AND stream = ?'