Make objects.vectors_to_send() a generator of chunks

This commit is contained in:
Lee Miller 2024-10-13 05:09:47 +03:00
parent f9e6104a56
commit 5f861cb5a5
Signed by: lee.miller
GPG Key ID: 4F97A5EA88F4AB63
3 changed files with 17 additions and 11 deletions

View File

@ -278,14 +278,11 @@ class ConnectionBase(threading.Thread):
self.send_queue.put(message.Addr(addr))
if shared.objects:
to_send = shared.objects.vectors_to_send()
offset = 0
while offset < len(to_send):
for chunk in shared.objects.vectors_to_send(10000):
# We limit size of inv messaged to 10000 entries
# because they might time out
# in very slow networks (I2P)
self.send_queue.put(message.Inv(to_send[offset:offset+10000]))
offset += 10000
self.send_queue.put(message.Inv(chunk))
self.status = 'fully_established'
def _process_queue(self):

View File

@ -115,13 +115,20 @@ class Inventory():
self._pending.update(vectors)
return vectors
def vectors_to_send(self, stream=None):
def vectors_to_send(self, chunk_size=10000, stream=None):
if stream is None:
stream = shared.stream
now = int(time.time())
cur = self._db.execute(
'SELECT vector FROM objects WHERE expires > ? AND stream = ?'
' ORDER BY random()',
(int(time.time()), stream or shared.stream)
' ORDER BY random()', (now, stream)
)
return [v for v, in cur]
cur.arraysize = chunk_size
while True:
vectors = cur.fetchmany()
if not vectors:
return
yield [v for v, in vectors]
def get(self, vector, default=None):
try:

View File

@ -65,8 +65,10 @@ class TestObjects():
self.objects[obj.vector] = obj
needed.add(obj.vector)
self.assertEqual(set(self.objects.vectors_to_send(4)), needed)
self.assertTrue(set(self.objects.vectors_to_send()).difference(needed))
self.assertEqual(
set(next(self.objects.vectors_to_send(stream=4))), needed)
self.assertTrue(
set(next(self.objects.vectors_to_send())).difference(needed))
def test_filter(self):
"""Check the objects filtering"""