diff --git a/minode/connection.py b/minode/connection.py index fbd0306..80bceca 100644 --- a/minode/connection.py +++ b/minode/connection.py @@ -408,7 +408,7 @@ class ConnectionBase(threading.Thread): def _request_objects(self): if self.vectors_to_get and len(self.vectors_requested) < 100: - self.vectors_to_get.difference_update(shared.objects.keys()) + self.vectors_to_get = shared.objects.select(self.vectors_to_get) if not self.wait_until: nodes_count = ( len(shared.node_pool) + len(shared.unchecked_node_pool)) @@ -466,9 +466,7 @@ class Connection(ConnectionBase): def _process_msg_inv(self, m): inv = message.Inv.from_message(m) logging.debug('%s:%s -> %s', self.host_print, self.port, inv) - to_get = inv.vectors.copy() - to_get.difference_update(shared.objects.keys()) - self.vectors_to_get.update(to_get) + self.vectors_to_get.update(shared.objects.select(inv.vectors)) # Do not send objects they already have. self.vectors_to_send.difference_update(inv.vectors) diff --git a/minode/sql.py b/minode/sql.py index ba41fb3..ec84533 100644 --- a/minode/sql.py +++ b/minode/sql.py @@ -16,6 +16,7 @@ class Inventory(): def __init__(self): self._lock = threading.Lock() self._deleted = 0 + self._pending = set() self._db = sqlite3.connect( os.path.join(shared.data_directory, 'objects.dat'), check_same_thread=False @@ -56,6 +57,10 @@ class Inventory(): return cur def cleanup(self): + if len(self._pending) > 100: + logging.warning( + 'Not cleaning up, %s objects pending', len(self._pending)) + return with self._lock: cur = self._db.execute( 'DELETE FROM objects WHERE expires < ?', @@ -72,7 +77,9 @@ class Inventory(): (int(time.time()),)) self._db.commit() self._deleted = 0 - logging.info('Deleted %s expired objects', cur.rowcount) + logging.info( + 'Deleted %s expired objects, %s pending', + cur.rowcount, len(self._pending)) def filter(self, stream=None, object_type=None, tag=None): clauses = [] @@ -89,6 +96,20 @@ class Inventory(): 'SELECT * FROM objects WHERE ' + ' AND '.join(clauses), params) return self.__objects(cur) + def select(self, vectors): + chunk_size = 999 + keys = tuple(vectors) + with self._lock: + for i in range(0, len(vectors), chunk_size): + chunk = keys[i:i+chunk_size] + cur = self._db.execute( + 'SELECT vector FROM objects WHERE vector IN ({})'.format( + ','.join('?' * len(chunk))), chunk) + for v, in cur: + vectors.remove(v) + self._pending.update(vectors) + return vectors + def vectors_to_send(self, stream=None): cur = self._db.execute( 'SELECT vector FROM objects WHERE expires > ? AND stream = ?' @@ -154,6 +175,7 @@ class Inventory(): return self._db.commit() self.rowid = cur.lastrowid + self._pending.discard(vector) def __len__(self): cur = self._db.execute('SELECT count(*) FROM objects')