From e0fd798d2245227700e6d1a4cd0b2ce66a2cce00 Mon Sep 17 00:00:00 2001 From: Lee Miller Date: Sun, 13 Oct 2024 05:09:47 +0300 Subject: [PATCH] Make objects.vectors_to_send() a generator of chunks --- minode/connection.py | 7 ++----- minode/sql.py | 15 +++++++++++---- minode/tests/test_objects.py | 6 ++++-- 3 files changed, 17 insertions(+), 11 deletions(-) diff --git a/minode/connection.py b/minode/connection.py index 5d35017..3dd5f9a 100644 --- a/minode/connection.py +++ b/minode/connection.py @@ -278,14 +278,11 @@ class ConnectionBase(threading.Thread): self.send_queue.put(message.Addr(addr)) if shared.objects: - to_send = shared.objects.vectors_to_send() - offset = 0 - while offset < len(to_send): + for chunk in shared.objects.vectors_to_send(10000): # We limit size of inv messaged to 10000 entries # because they might time out # in very slow networks (I2P) - self.send_queue.put(message.Inv(to_send[offset:offset+10000])) - offset += 10000 + self.send_queue.put(message.Inv(chunk)) self.status = 'fully_established' def _process_queue(self): diff --git a/minode/sql.py b/minode/sql.py index cf69faf..3232c5e 100644 --- a/minode/sql.py +++ b/minode/sql.py @@ -111,13 +111,20 @@ class Inventory(): self._pending.update(vectors) return vectors - def vectors_to_send(self, stream=None): + def vectors_to_send(self, chunk_size=10000, stream=None): + if stream is None: + stream = shared.stream + now = int(time.time()) cur = self._db.execute( 'SELECT vector FROM objects WHERE expires > ? AND stream = ?' - ' ORDER BY random()', - (int(time.time()), stream or shared.stream) + ' ORDER BY random()', (now, stream) ) - return [v for v, in cur] + cur.arraysize = chunk_size + while True: + vectors = cur.fetchmany() + if not vectors: + return + yield [v for v, in vectors] def get(self, vector, default=None): try: diff --git a/minode/tests/test_objects.py b/minode/tests/test_objects.py index e869a6a..dddbc18 100644 --- a/minode/tests/test_objects.py +++ b/minode/tests/test_objects.py @@ -65,8 +65,10 @@ class TestObjects(): self.objects[obj.vector] = obj needed.add(obj.vector) - self.assertEqual(set(self.objects.vectors_to_send(4)), needed) - self.assertTrue(set(self.objects.vectors_to_send()).difference(needed)) + self.assertEqual( + set(next(self.objects.vectors_to_send(stream=4))), needed) + self.assertTrue( + set(next(self.objects.vectors_to_send())).difference(needed)) def test_filter(self): """Check the objects filtering"""