Take into account pending objects in Inventory.cleanup()

This commit is contained in:
Lee Miller 2024-09-12 20:26:57 +03:00
parent c87d248283
commit 2ce73f4321
Signed by: lee.miller
GPG Key ID: 4F97A5EA88F4AB63
2 changed files with 26 additions and 5 deletions

View File

@ -408,7 +408,7 @@ class ConnectionBase(threading.Thread):
def _request_objects(self): def _request_objects(self):
if self.vectors_to_get and len(self.vectors_requested) < 100: if self.vectors_to_get and len(self.vectors_requested) < 100:
self.vectors_to_get.difference_update(shared.objects.keys()) self.vectors_to_get = shared.objects.select(self.vectors_to_get)
if not self.wait_until: if not self.wait_until:
nodes_count = ( nodes_count = (
len(shared.node_pool) + len(shared.unchecked_node_pool)) len(shared.node_pool) + len(shared.unchecked_node_pool))
@ -466,9 +466,7 @@ class Connection(ConnectionBase):
def _process_msg_inv(self, m): def _process_msg_inv(self, m):
inv = message.Inv.from_message(m) inv = message.Inv.from_message(m)
logging.debug('%s:%s -> %s', self.host_print, self.port, inv) logging.debug('%s:%s -> %s', self.host_print, self.port, inv)
to_get = inv.vectors.copy() self.vectors_to_get.update(shared.objects.select(inv.vectors))
to_get.difference_update(shared.objects.keys())
self.vectors_to_get.update(to_get)
# Do not send objects they already have. # Do not send objects they already have.
self.vectors_to_send.difference_update(inv.vectors) self.vectors_to_send.difference_update(inv.vectors)

View File

@ -16,6 +16,7 @@ class Inventory():
def __init__(self): def __init__(self):
self._lock = threading.Lock() self._lock = threading.Lock()
self._deleted = 0 self._deleted = 0
self._pending = set()
self._db = sqlite3.connect( self._db = sqlite3.connect(
os.path.join(shared.data_directory, 'objects.dat'), os.path.join(shared.data_directory, 'objects.dat'),
check_same_thread=False check_same_thread=False
@ -55,6 +56,10 @@ class Inventory():
tag=tag, vector=vector) tag=tag, vector=vector)
def cleanup(self): def cleanup(self):
if len(self._pending) > 100:
logging.warning(
'Not cleaning up, %s objects pending', len(self._pending))
return
with self._lock: with self._lock:
cur = self._db.execute( cur = self._db.execute(
'DELETE FROM objects WHERE expires < ?', 'DELETE FROM objects WHERE expires < ?',
@ -71,7 +76,9 @@ class Inventory():
(int(time.time()),)) (int(time.time()),))
self._db.commit() self._db.commit()
self._deleted = 0 self._deleted = 0
logging.info('Deleted %s expired objects', cur.rowcount) logging.info(
'Deleted %s expired objects, %s pending',
cur.rowcount, len(self._pending))
def filter(self, stream=None, object_type=None, tag=None): def filter(self, stream=None, object_type=None, tag=None):
clauses = [] clauses = []
@ -89,6 +96,21 @@ class Inventory():
+ ' AND '.join(clauses), params) + ' AND '.join(clauses), params)
return cur return cur
def select(self, vectors):
chunk_size = 999
keys = tuple(vectors)
with self._lock:
for i in range(0, len(vectors), chunk_size):
chunk = keys[i:i+chunk_size]
cur = self._db.execute(
'SELECT vector FROM objects WHERE vector IN' # nosec B608
' ({})'.format(','.join('?' * len(chunk))),
chunk)
for v, in cur:
vectors.remove(v)
self._pending.update(vectors)
return vectors
def vectors_to_send(self, stream=None): def vectors_to_send(self, stream=None):
cur = self._db.execute( cur = self._db.execute(
'SELECT vector FROM objects WHERE expires > ? AND stream = ?' 'SELECT vector FROM objects WHERE expires > ? AND stream = ?'
@ -153,6 +175,7 @@ class Inventory():
return return
self._db.commit() self._db.commit()
self.rowid = cur.lastrowid self.rowid = cur.lastrowid
self._pending.discard(vector)
def __bool__(self): def __bool__(self):
return self._db.execute( return self._db.execute(