Take into account pending objects in Inventory.cleanup()
This commit is contained in:
parent
60d08571b1
commit
319feeaa97
|
@ -409,7 +409,7 @@ class ConnectionBase(threading.Thread):
|
||||||
|
|
||||||
def _request_objects(self):
|
def _request_objects(self):
|
||||||
if self.vectors_to_get and len(self.vectors_requested) < 100:
|
if self.vectors_to_get and len(self.vectors_requested) < 100:
|
||||||
self.vectors_to_get.difference_update(shared.objects.keys())
|
self.vectors_to_get = shared.objects.select(self.vectors_to_get)
|
||||||
if not self.wait_until:
|
if not self.wait_until:
|
||||||
nodes_count = (
|
nodes_count = (
|
||||||
len(shared.node_pool) + len(shared.unchecked_node_pool))
|
len(shared.node_pool) + len(shared.unchecked_node_pool))
|
||||||
|
@ -467,9 +467,7 @@ class Connection(ConnectionBase):
|
||||||
def _process_msg_inv(self, m):
|
def _process_msg_inv(self, m):
|
||||||
inv = message.Inv.from_message(m)
|
inv = message.Inv.from_message(m)
|
||||||
logging.debug('%s:%s -> %s', self.host_print, self.port, inv)
|
logging.debug('%s:%s -> %s', self.host_print, self.port, inv)
|
||||||
to_get = inv.vectors.copy()
|
self.vectors_to_get.update(shared.objects.select(inv.vectors))
|
||||||
to_get.difference_update(shared.objects.keys())
|
|
||||||
self.vectors_to_get.update(to_get)
|
|
||||||
# Do not send objects they already have.
|
# Do not send objects they already have.
|
||||||
self.vectors_to_send.difference_update(inv.vectors)
|
self.vectors_to_send.difference_update(inv.vectors)
|
||||||
|
|
||||||
|
@ -491,6 +489,7 @@ class Connection(ConnectionBase):
|
||||||
logging.debug(dest)
|
logging.debug(dest)
|
||||||
shared.i2p_unchecked_node_pool.add((dest, 'i2p'))
|
shared.i2p_unchecked_node_pool.add((dest, 'i2p'))
|
||||||
shared.vector_advertise_queue.put(obj.vector)
|
shared.vector_advertise_queue.put(obj.vector)
|
||||||
|
shared.objects.check(obj.vector)
|
||||||
|
|
||||||
def _process_msg_getdata(self, m):
|
def _process_msg_getdata(self, m):
|
||||||
getdata = message.GetData.from_message(m)
|
getdata = message.GetData.from_message(m)
|
||||||
|
|
|
@ -85,6 +85,8 @@ class Manager(threading.Thread):
|
||||||
outgoing_connections = 0
|
outgoing_connections = 0
|
||||||
for c in shared.connections.copy():
|
for c in shared.connections.copy():
|
||||||
if not c.is_alive() or c.status == 'disconnected':
|
if not c.is_alive() or c.status == 'disconnected':
|
||||||
|
shared.objects.check(
|
||||||
|
*(c.vectors_to_get | c.vectors_requested.keys()))
|
||||||
with shared.connections_lock:
|
with shared.connections_lock:
|
||||||
shared.connections.remove(c)
|
shared.connections.remove(c)
|
||||||
else:
|
else:
|
||||||
|
|
|
@ -16,6 +16,7 @@ class Inventory():
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self._lock = threading.Lock()
|
self._lock = threading.Lock()
|
||||||
self._deleted = 0
|
self._deleted = 0
|
||||||
|
self._pending = set()
|
||||||
self._db = sqlite3.connect(
|
self._db = sqlite3.connect(
|
||||||
os.path.join(shared.data_directory, 'objects.dat'),
|
os.path.join(shared.data_directory, 'objects.dat'),
|
||||||
check_same_thread=False
|
check_same_thread=False
|
||||||
|
@ -53,15 +54,27 @@ class Inventory():
|
||||||
expires, obj_type, version, stream, data, offset,
|
expires, obj_type, version, stream, data, offset,
|
||||||
tag=tag, vector=vector)
|
tag=tag, vector=vector)
|
||||||
|
|
||||||
|
def check(self, *vectors):
|
||||||
|
"""Remove given vectors from pending"""
|
||||||
|
with self._lock:
|
||||||
|
for vector in vectors:
|
||||||
|
self._pending.discard(vector)
|
||||||
|
|
||||||
def cleanup(self):
|
def cleanup(self):
|
||||||
"""Remove expired objects"""
|
"""Remove expired objects"""
|
||||||
|
if len(self._pending) > 100:
|
||||||
|
logging.info(
|
||||||
|
'Not cleaning up, %s objects pending', len(self._pending))
|
||||||
|
return
|
||||||
with self._lock:
|
with self._lock:
|
||||||
now = int(time.time())
|
now = int(time.time())
|
||||||
cur = self._db.execute(
|
cur = self._db.execute(
|
||||||
'DELETE FROM objects WHERE expires < ?', (now - 3 * 3600,))
|
'DELETE FROM objects WHERE expires < ?', (now - 3 * 3600,))
|
||||||
self._db.commit()
|
self._db.commit()
|
||||||
self._deleted += cur.rowcount
|
self._deleted += cur.rowcount
|
||||||
logging.debug('Deleted %s expired objects', cur.rowcount)
|
(logging.info if self._pending else logging.debug)(
|
||||||
|
'Deleted %s expired objects, %s pending',
|
||||||
|
cur.rowcount, len(self._pending))
|
||||||
# conditional vacuum and validity check (TODO)
|
# conditional vacuum and validity check (TODO)
|
||||||
# every 24 hours or after deleting a lot of items
|
# every 24 hours or after deleting a lot of items
|
||||||
if self._deleted > 10000 or self.lastvacuumtime < now - 86400:
|
if self._deleted > 10000 or self.lastvacuumtime < now - 86400:
|
||||||
|
@ -90,6 +103,22 @@ class Inventory():
|
||||||
+ ' AND '.join(clauses), params)
|
+ ' AND '.join(clauses), params)
|
||||||
return cur
|
return cur
|
||||||
|
|
||||||
|
def select(self, vectors):
|
||||||
|
"""Select new vectors from the given set"""
|
||||||
|
chunk_size = 999
|
||||||
|
keys = tuple(vectors)
|
||||||
|
with self._lock:
|
||||||
|
for i in range(0, len(vectors), chunk_size):
|
||||||
|
chunk = keys[i:i+chunk_size]
|
||||||
|
cur = self._db.execute(
|
||||||
|
'SELECT vector FROM objects WHERE vector IN' # nosec B608
|
||||||
|
' ({})'.format(','.join('?' * len(chunk))),
|
||||||
|
chunk)
|
||||||
|
for v, in cur:
|
||||||
|
vectors.remove(v)
|
||||||
|
self._pending.update(vectors)
|
||||||
|
return vectors
|
||||||
|
|
||||||
def vectors_to_send(self, stream=None):
|
def vectors_to_send(self, stream=None):
|
||||||
cur = self._db.execute(
|
cur = self._db.execute(
|
||||||
'SELECT vector FROM objects WHERE expires > ? AND stream = ?'
|
'SELECT vector FROM objects WHERE expires > ? AND stream = ?'
|
||||||
|
|
Loading…
Reference in New Issue
Block a user