Take into account pending objects in Inventory.cleanup()
This commit is contained in:
parent
db30c1ed7b
commit
2aa5f0039b
|
@ -408,7 +408,7 @@ class ConnectionBase(threading.Thread):
|
||||||
|
|
||||||
def _request_objects(self):
|
def _request_objects(self):
|
||||||
if self.vectors_to_get and len(self.vectors_requested) < 100:
|
if self.vectors_to_get and len(self.vectors_requested) < 100:
|
||||||
self.vectors_to_get.difference_update(shared.objects.keys())
|
self.vectors_to_get = shared.objects.select(self.vectors_to_get)
|
||||||
if not self.wait_until:
|
if not self.wait_until:
|
||||||
nodes_count = (
|
nodes_count = (
|
||||||
len(shared.node_pool) + len(shared.unchecked_node_pool))
|
len(shared.node_pool) + len(shared.unchecked_node_pool))
|
||||||
|
@ -466,9 +466,7 @@ class Connection(ConnectionBase):
|
||||||
def _process_msg_inv(self, m):
|
def _process_msg_inv(self, m):
|
||||||
inv = message.Inv.from_message(m)
|
inv = message.Inv.from_message(m)
|
||||||
logging.debug('%s:%s -> %s', self.host_print, self.port, inv)
|
logging.debug('%s:%s -> %s', self.host_print, self.port, inv)
|
||||||
to_get = inv.vectors.copy()
|
self.vectors_to_get.update(shared.objects.select(inv.vectors))
|
||||||
to_get.difference_update(shared.objects.keys())
|
|
||||||
self.vectors_to_get.update(to_get)
|
|
||||||
# Do not send objects they already have.
|
# Do not send objects they already have.
|
||||||
self.vectors_to_send.difference_update(inv.vectors)
|
self.vectors_to_send.difference_update(inv.vectors)
|
||||||
|
|
||||||
|
|
|
@ -15,6 +15,7 @@ class Inventory():
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self._lock = threading.Lock()
|
self._lock = threading.Lock()
|
||||||
self._deleted = 0
|
self._deleted = 0
|
||||||
|
self._pending = set()
|
||||||
self._db = sqlite3.connect(
|
self._db = sqlite3.connect(
|
||||||
os.path.join(shared.data_directory, 'objects.dat'),
|
os.path.join(shared.data_directory, 'objects.dat'),
|
||||||
check_same_thread=False
|
check_same_thread=False
|
||||||
|
@ -54,6 +55,10 @@ class Inventory():
|
||||||
tag=tag, vector=vector)
|
tag=tag, vector=vector)
|
||||||
|
|
||||||
def cleanup(self):
|
def cleanup(self):
|
||||||
|
if len(self._pending) > 100:
|
||||||
|
logging.warning(
|
||||||
|
'Not cleaning up, %s objects pending', len(self._pending))
|
||||||
|
return
|
||||||
with self._lock:
|
with self._lock:
|
||||||
cur = self._db.execute(
|
cur = self._db.execute(
|
||||||
'DELETE FROM objects WHERE expires < ?',
|
'DELETE FROM objects WHERE expires < ?',
|
||||||
|
@ -70,7 +75,9 @@ class Inventory():
|
||||||
(int(time.time()),))
|
(int(time.time()),))
|
||||||
self._db.commit()
|
self._db.commit()
|
||||||
self._deleted = 0
|
self._deleted = 0
|
||||||
logging.info('Deleted %s expired objects', cur.rowcount)
|
logging.info(
|
||||||
|
'Deleted %s expired objects, %s pending',
|
||||||
|
cur.rowcount, len(self._pending))
|
||||||
|
|
||||||
def filter(self, stream=None, object_type=None, tag=None):
|
def filter(self, stream=None, object_type=None, tag=None):
|
||||||
clauses = []
|
clauses = []
|
||||||
|
@ -87,6 +94,20 @@ class Inventory():
|
||||||
'SELECT * FROM objects WHERE ' + ' AND '.join(clauses), params)
|
'SELECT * FROM objects WHERE ' + ' AND '.join(clauses), params)
|
||||||
return cur
|
return cur
|
||||||
|
|
||||||
|
def select(self, vectors):
|
||||||
|
chunk_size = 999
|
||||||
|
keys = tuple(vectors)
|
||||||
|
with self._lock:
|
||||||
|
for i in range(0, len(vectors), chunk_size):
|
||||||
|
chunk = keys[i:i+chunk_size]
|
||||||
|
cur = self._db.execute(
|
||||||
|
'SELECT vector FROM objects WHERE vector IN ({})'.format(
|
||||||
|
','.join('?' * len(chunk))), chunk)
|
||||||
|
for v, in cur:
|
||||||
|
vectors.remove(v)
|
||||||
|
self._pending.update(vectors)
|
||||||
|
return vectors
|
||||||
|
|
||||||
def vectors_to_send(self, stream=None):
|
def vectors_to_send(self, stream=None):
|
||||||
cur = self._db.execute(
|
cur = self._db.execute(
|
||||||
'SELECT vector FROM objects WHERE expires > ? AND stream = ?'
|
'SELECT vector FROM objects WHERE expires > ? AND stream = ?'
|
||||||
|
@ -151,6 +172,7 @@ class Inventory():
|
||||||
return
|
return
|
||||||
self._db.commit()
|
self._db.commit()
|
||||||
self.rowid = cur.lastrowid
|
self.rowid = cur.lastrowid
|
||||||
|
self._pending.discard(vector)
|
||||||
|
|
||||||
def __bool__(self):
|
def __bool__(self):
|
||||||
return self._db.execute(
|
return self._db.execute(
|
||||||
|
|
Loading…
Reference in New Issue
Block a user