Prevent stuck vectors in objects._pending

This commit is contained in:
Lee Miller 2024-10-15 00:11:51 +03:00
parent db5142c7dd
commit ac5b6fa608
Signed by: lee.miller
GPG Key ID: 4F97A5EA88F4AB63
3 changed files with 8 additions and 1 deletions

View File

@ -485,6 +485,7 @@ class Connection(ConnectionBase):
logging.debug(dest) logging.debug(dest)
shared.i2p_unchecked_node_pool.add((dest, 'i2p')) shared.i2p_unchecked_node_pool.add((dest, 'i2p'))
shared.vector_advertise_queue.put(obj.vector) shared.vector_advertise_queue.put(obj.vector)
shared.objects.check(obj.vector)
def _process_msg_getdata(self, m): def _process_msg_getdata(self, m):
getdata = message.GetData.from_message(m) getdata = message.GetData.from_message(m)

View File

@ -56,6 +56,8 @@ class Manager(threading.Thread):
outgoing_connections = 0 outgoing_connections = 0
for c in shared.connections.copy(): for c in shared.connections.copy():
if not c.is_alive() or c.status == 'disconnected': if not c.is_alive() or c.status == 'disconnected':
shared.objects.check(
*(c.vectors_to_get | c.vectors_requested.keys()))
with shared.connections_lock: with shared.connections_lock:
shared.connections.remove(c) shared.connections.remove(c)
else: else:

View File

@ -56,6 +56,11 @@ class Inventory():
expires, obj_type, version, stream, data, offset, expires, obj_type, version, stream, data, offset,
tag=tag, vector=vector) tag=tag, vector=vector)
def check(self, *vectors):
with self._lock:
for vector in vectors:
self._pending.discard(vector)
def cleanup(self): def cleanup(self):
if len(self._pending) > 100: if len(self._pending) > 100:
logging.warning( logging.warning(
@ -230,7 +235,6 @@ class Inventory():
return return
with self._lock: with self._lock:
self._last[vector] = obj self._last[vector] = obj
self._pending.discard(vector)
def __bool__(self): def __bool__(self):
if self._last: if self._last: