From 1f64719a56f82b692dd567c884a444ab3d03d754 Mon Sep 17 00:00:00 2001
From: Lee Miller
Date: Sun, 8 Sep 2024 06:48:28 +0300
Subject: [PATCH 01/11] A rough implementation of the objects stored in sqlite
 db; structure.Object was rewritten to store the object data.

---
 minode/connection.py           |  38 +++-----
 minode/main.py                 |  11 +--
 minode/manager.py              |  48 +---------
 minode/proofofwork.py          |  10 +--
 minode/sql.py                  | 153 +++++++++++++++++++++++++++++++++
 minode/structure.py            |  55 +++++++-----
 minode/tests/test_network.py   |   6 +-
 minode/tests/test_structure.py |  12 +--
 8 files changed, 223 insertions(+), 110 deletions(-)
 create mode 100644 minode/sql.py

diff --git a/minode/connection.py b/minode/connection.py
index 05d552a..fba349c 100644
--- a/minode/connection.py
+++ b/minode/connection.py
@@ -277,22 +277,15 @@ class ConnectionBase(threading.Thread):
         if len(addr) != 0:
             self.send_queue.put(message.Addr(addr))
 
-        with shared.objects_lock:
-            if len(shared.objects) > 0:
-                to_send = {
-                    vector for vector in shared.objects.keys()
-                    if shared.objects[vector].expires_time > time.time()}
-                while len(to_send) > 0:
-                    if len(to_send) > 10000:
-                        # We limit size of inv messaged to 10000 entries
-                        # because they might time out
-                        # in very slow networks (I2P)
-                        pack = random.sample(tuple(to_send), 10000)
-                        self.send_queue.put(message.Inv(pack))
-                        to_send.difference_update(pack)
-                    else:
-                        self.send_queue.put(message.Inv(to_send))
-                        to_send.clear()
+        if shared.objects:
+            to_send = shared.objects.vectors_to_send()
+            offset = 0
+            while offset < len(to_send):
+                # We limit size of inv messaged to 10000 entries
+                # because they might time out
+                # in very slow networks (I2P)
+                self.send_queue.put(message.Inv(to_send[offset:offset+10000]))
+                offset += 10000
         self.status = 'fully_established'
 
     def _process_queue(self):
@@ -463,12 +456,10 @@ class ConnectionBase(threading.Thread):
         else:
             to_send = self.vectors_to_send.copy()
             self.vectors_to_send.clear()
-        with shared.objects_lock:
-            for vector in to_send:
-                obj = shared.objects.get(vector, None)
-                if obj:
-                    self.send_queue.put(
-                        message.Message(b'object', obj.to_bytes()))
+        for vector in to_send:
+            obj = shared.objects.get(vector)
+            if obj:
+                self.send_queue.put(message.Message(b'object', obj.data))
 
 
 class Connection(ConnectionBase):
@@ -488,8 +479,7 @@ class Connection(ConnectionBase):
         self.vectors_requested.pop(obj.vector, None)
         self.vectors_to_get.discard(obj.vector)
         if obj.is_valid() and obj.vector not in shared.objects:
-            with shared.objects_lock:
-                shared.objects[obj.vector] = obj
+            shared.objects[obj.vector] = obj
             if (
                 obj.object_type == shared.i2p_dest_obj_type
                 and obj.version == shared.i2p_dest_obj_version
diff --git a/minode/main.py b/minode/main.py
index 72cebe0..868c3f0 100644
--- a/minode/main.py
+++ b/minode/main.py
@@ -8,7 +8,7 @@ import os
 import signal
 import socket
 
-from . import i2p, shared
+from . import i2p, shared, sql
 from .advertiser import Advertiser
 from .manager import Manager
 from .listener import Listener
@@ -161,10 +161,9 @@ def start_ip_listener():
 def start_i2p_listener():
     """Starts I2P threads"""
     # Grab I2P destinations from old object file
-    for obj in shared.objects.values():
-        if obj.object_type == shared.i2p_dest_obj_type:
-            shared.i2p_unchecked_node_pool.add((
-                base64.b64encode(obj.object_payload, altchars=b'-~'), 'i2p'))
+    for obj in shared.objects.filter(object_type=shared.i2p_dest_obj_type):
+        shared.i2p_unchecked_node_pool.add((
+            base64.b64encode(obj.object_payload, altchars=b'-~'), 'i2p'))
 
     dest_priv = b''
 
@@ -245,6 +244,8 @@ def main():
     if shared.ip_enabled and not shared.trusted_peer:
         bootstrap_from_dns()
 
+    shared.objects = sql.Inventory()
+
     if shared.i2p_enabled:
         # We are starting it before cleaning expired objects
         # so we can collect I2P destination objects
diff --git a/minode/manager.py b/minode/manager.py
index caf223b..9d0dc63 100644
--- a/minode/manager.py
+++ b/minode/manager.py
@@ -23,7 +23,6 @@ class Manager(threading.Thread):
         self.bootstrap_pool = []
         self.last_cleaned_objects = time.time()
         self.last_cleaned_connections = time.time()
-        self.last_pickled_objects = time.time()
        self.last_pickled_nodes = time.time()
         # Publish destination 5-15 minutes after start
         self.last_published_i2p_destination = \
@@ -36,7 +35,7 @@ class Manager(threading.Thread):
 
     def run(self):
         self.load_data()
-        self.clean_objects()
+        shared.objects.cleanup()
         self.fill_bootstrap_pool()
         while True:
             time.sleep(0.8)
@@ -45,14 +44,11 @@ class Manager(threading.Thread):
                 logging.debug('Shutting down Manager')
                 break
             if now - self.last_cleaned_objects > 90:
-                self.clean_objects()
+                shared.objects.cleanup()
                 self.last_cleaned_objects = now
             if now - self.last_cleaned_connections > 2:
                 self.manage_connections()
                 self.last_cleaned_connections = now
-            if now - self.last_pickled_objects > 100:
-                self.pickle_objects()
-                self.last_pickled_objects = now
             if now - self.last_pickled_nodes > 60:
                 self.pickle_nodes()
                 self.last_pickled_nodes = now
@@ -60,18 +56,6 @@ class Manager(threading.Thread):
                 self.publish_i2p_destination()
                 self.last_published_i2p_destination = now
 
-    @staticmethod
-    def clean_objects():
-        """Remove expired objects"""
-        for vector in set(shared.objects):
-            # FIXME: no need to check is_valid() here
-            if shared.objects[vector].is_expired():
-                logging.debug(
-                    'Deleted expired object: %s',
-                    base64.b16encode(vector).decode())
-                with shared.objects_lock:
-                    del shared.objects[vector]
-
     def manage_connections(self):
         """Open new connections if needed, remove closed ones"""
         hosts = set()
@@ -177,17 +161,6 @@ class Manager(threading.Thread):
     @staticmethod
     def load_data():
         """Load initial nodes and data, stored in files between sessions"""
-        try:
-            with open(
-                os.path.join(shared.data_directory, 'objects.pickle'), 'br'
-            ) as src:
-                shared.objects = pickle.load(src)
-        except FileNotFoundError:
-            pass  # first start
-        except Exception:
-            logging.warning(
-                'Error while loading objects from disk.', exc_info=True)
-
         try:
             with open(
                 os.path.join(shared.data_directory, 'nodes.pickle'), 'br'
@@ -227,19 +200,6 @@ class Manager(threading.Thread):
                     (row[0].encode(), 'i2p') for row in reader}
             shared.i2p_node_pool.update(shared.i2p_core_nodes)
 
-    @staticmethod
-    def pickle_objects():
-        """Save objects into a file objects.pickle in the data directory"""
-        try:
-            with open(
-                os.path.join(shared.data_directory, 'objects.pickle'), 'bw'
-            ) as dst:
-                with shared.objects_lock:
-                    pickle.dump(shared.objects, dst, protocol=3)
-            logging.debug('Saved objects')
-        except Exception:
-            logging.warning('Error while saving objects', exc_info=True)
-
     @staticmethod
     def pickle_nodes():
         """Save nodes into files in the data directory"""
@@ -278,7 +238,7 @@ class Manager(threading.Thread):
             dest_pub_raw = base64.b64decode(
                 shared.i2p_dest_pub, altchars=b'-~')
         obj = structure.Object(
-            b'\x00' * 8, int(time.time() + 2 * 3600),
+            int(time.time() + 2 * 3600),
             shared.i2p_dest_obj_type, shared.i2p_dest_obj_version,
-            shared.stream, dest_pub_raw)
+            shared.stream, object_payload=dest_pub_raw)
         proofofwork.do_pow_and_publish(obj)
diff --git a/minode/proofofwork.py b/minode/proofofwork.py
index c878b2a..c0a503d 100644
--- a/minode/proofofwork.py
+++ b/minode/proofofwork.py
@@ -39,14 +39,14 @@ def _worker(obj):
     logging.debug(
         'Finished doing POW, nonce: %s, time: %ss', nonce, time.time() - t)
     obj = structure.Object(
-        nonce, obj.expires_time, obj.object_type, obj.version,
-        obj.stream_number, obj.object_payload)
+        obj.expires_time, obj.object_type, obj.version, obj.stream_number,
+        object_payload=obj.object_payload, nonce=nonce
+    )
     logging.debug(
         'Object vector is %s', base64.b16encode(obj.vector).decode())
-    with shared.objects_lock:
-        shared.objects[obj.vector] = obj
-        shared.vector_advertise_queue.put(obj.vector)
+    shared.objects[obj.vector] = obj
+    shared.vector_advertise_queue.put(obj.vector)
 
 
 def do_pow_and_publish(obj):
diff --git a/minode/sql.py b/minode/sql.py
new file mode 100644
index 0000000..4ccb6d9
--- /dev/null
+++ b/minode/sql.py
@@ -0,0 +1,153 @@
+"""Inventory implementation using sqlite"""
+
+import os
+import sqlite3
+import time
+
+from . import shared, structure
+
+sqlite3.threadsafety = 3
+
+
+class Inventory():
+    """sqlite inventory"""
+    def __init__(self):
+        self._db = sqlite3.connect(
+            os.path.join(shared.data_directory, 'objects.dat'),
+            check_same_thread=False
+        )
+        self._db.executescript("""
+            BEGIN;
+            CREATE TABLE IF NOT EXISTS status
+            (key text, value integer, UNIQUE(key) ON CONFLICT REPLACE);
+            INSERT INTO status VALUES ('version', 1);
+            CREATE TABLE IF NOT EXISTS objects
+            (vector unique, expires integer, type integer, version integer,
+             stream integer, tag, data, offset integer);
+            COMMIT;
+        """)
+        self.rowid = len(self) or None
+        cur = self._db.cursor()
+        cur.execute("SELECT value FROM status WHERE key='lastvacuumtime'")
+        now = int(time.time())
+        try:
+            vacuumed = cur.fetchone()[0]
+        except TypeError:
+            pass
+        else:
+            if vacuumed < now - 86400:  # 24 hours
+                cur.execute('VACUUM')
+                cur.execute("INSERT INTO status VALUES ('lastvacuumtime', ?)", (now,))
+        self._db.commit()
+
+    def __objects(self, cur):
+        return (
+            structure.Object(
+                expires, obj_type, version, stream, data, offset,
+                tag=tag, vector=vector)
+            for (vector, expires, obj_type, version, stream, tag, data, offset)
+            in cur.fetchall()
+        )
+
+    def cleanup(self):
+        """Remove expired objects"""
+        with shared.objects_lock:
+            self._db.execute(
+                'DELETE FROM objects WHERE expires < ?',
+                (int(time.time()) - 3 * 3600,)
+            )
+            self._db.commit()
+        # conditional vacuum and validity check
+
+    def filter(self, stream=None, object_type=None, tag=None):
+        """Generator of objects with the given parameters"""
+        clauses = []
+        if stream:
+            clauses.append(('stream = ?', stream))
+        if object_type is not None:
+            clauses.append(('type = ?', object_type))
+        if tag:
+            clauses.append(('tag = ?', tag))
+
+        clauses, params = zip(*clauses)
+
+        cur = self._db.execute(
+            'SELECT * FROM objects WHERE '  # nosec B608
+            + ' AND '.join(clauses), params)
+        return self.__objects(cur)
+
+    def vectors_to_send(self, stream=None):
+        cur = self._db.execute(
+            'SELECT vector FROM objects WHERE expires > ? AND stream = ?'
+            ' ORDER BY random()',
+            (int(time.time()), stream or shared.stream)
+        )
+        return [v for v, in cur.fetchall()]
+
+    def get(self, vector, default=None):
+        try:
+            return self[vector]
+        except KeyError:
+            return default
+
+    def keys(self):
+        cur = self._db.execute('SELECT vector FROM objects')
+        return (v for v, in cur.fetchall())
+
+    def values(self):
+        cur = self._db.execute('SELECT * FROM objects')
+        return self.__objects(cur)
+
+    def popitem(self):
+        if not self.rowid:
+            raise KeyError('empty')
+        cur = self._db.execute(
+            'SELECT vector FROM objects WHERE ROWID = ?', (self.rowid,))
+        vector = cur.fetchone()[0]
+        obj = self.get(vector)
+        del self[vector]
+        return (vector, obj)
+
+    def __contains__(self, vector):
+        cur = self._db.execute(
+            'SELECT vector FROM objects WHERE vector = ?', (vector,))
+        return cur.fetchone()
+
+    def __getitem__(self, vector):
+        cur = self._db.execute(
+            'SELECT * FROM objects WHERE vector = ?', (vector,))
+        item = cur.fetchone()
+        if item is None:
+            raise KeyError(vector)
+        vector, expires, obj_type, version, stream, tag, data, offset = item
+        return structure.Object(
+            expires, obj_type, version, stream, data, offset,
+            tag=tag, vector=vector
+        )
+
+    def __delitem__(self, vector):
+        with shared.objects_lock:  # KeyError
+            self._db.execute('DELETE FROM objects WHERE vector = ?', (vector,))
+            self._db.commit()
+            self.rowid = len(self)
+
+    def __setitem__(self, vector, obj):
+        with shared.objects_lock:
+            cur = self._db.execute(
+                'INSERT INTO objects VALUES (?,?,?,?,?,?,?,?)', (
+                    vector, obj.expires_time, obj.object_type, obj.version,
+                    obj.stream_number, obj.tag, obj.data, obj.offset
+                ))
+            self._db.commit()
+            self.rowid = cur.lastrowid
+
+    def __bool__(self):
+        return self._db.execute(
+            'SELECT vector from objects LIMIT 1').fetchone() is not None
+
+    def __len__(self):
+        cur = self._db.execute('SELECT count(*) FROM objects')
+        return cur.fetchone()[0]
+
+    def __del__(self):
+        self._db.close()
diff --git a/minode/structure.py b/minode/structure.py
index 3c01537..657b6ac 100644
--- a/minode/structure.py
+++ b/minode/structure.py
@@ -62,21 +62,26 @@ class VarInt(IStructure):
 class Object():
     """The 'object' message payload"""
     def __init__(
-        self, nonce, expires_time, object_type, version,
-        stream_number, object_payload
+        self, expires_time, object_type, version, stream_number,
+        data=None, offset=None, object_payload=None,
+        tag=None, nonce=b'\x00' * 8, vector=None
     ):
         self.nonce = nonce
         self.expires_time = expires_time
         self.object_type = object_type
         self.version = version
         self.stream_number = stream_number
-        self.object_payload = object_payload
-        self.vector = hashlib.sha512(hashlib.sha512(
-            self.to_bytes()).digest()).digest()[:32]
+        if not data:
+            data, offset = self.to_bytes(object_payload)
+        self.data = data
+        self.offset = offset
+        self.vector = vector or hashlib.sha512(hashlib.sha512(
+            self.data).digest()).digest()[:32]
 
-        self.tag = (
+        self.tag = tag or (
             # broadcast from version 5 and pubkey/getpukey from version 4
-            self.object_payload[:32] if object_type == 3 and version == 5
+            (object_payload or self.object_payload)[:32]
+            if object_type == 3 and version == 5
             or (object_type in (0, 1) and version == 4)
             else None)
 
@@ -87,29 +92,33 @@ class Object():
     @classmethod
     def from_message(cls, m):
         """Decode message payload"""
-        payload = m.payload
-        nonce, expires_time, object_type = struct.unpack('>8sQL', payload[:20])
-        payload = payload[20:]
-        version_varint_length = VarInt.length(payload[0])
-        version = VarInt.from_bytes(payload[:version_varint_length]).n
-        payload = payload[version_varint_length:]
-        stream_number_varint_length = VarInt.length(payload[0])
+        data = m.payload
+        nonce, expires_time, object_type = struct.unpack('>8sQL', data[:20])
+        version_varint_length = VarInt.length(data[20])
+        offset = 20 + version_varint_length
+        version = VarInt.from_bytes(data[20:offset]).n
+        stream_number_varint_length = VarInt.length(data[offset])
         stream_number = VarInt.from_bytes(
-            payload[:stream_number_varint_length]).n
-        payload = payload[stream_number_varint_length:]
+            data[offset:offset+stream_number_varint_length]).n
+        offset += stream_number_varint_length
         return cls(
-            nonce, expires_time, object_type, version, stream_number, payload)
+            expires_time, object_type, version, stream_number,
+            data, offset, nonce=nonce
+        )
 
-    def to_bytes(self):
-        """Serialize to bytes object payload"""
+    @property
+    def object_payload(self):
+        return self.data[self.offset:]
+
+    def to_bytes(self, object_payload):
+        """Serialize to bytes"""
         payload = b''
         payload += self.nonce
         payload += struct.pack('>QL', self.expires_time, self.object_type)
         payload += (
             VarInt(self.version).to_bytes()
             + VarInt(self.stream_number).to_bytes())
-        payload += self.object_payload
-        return payload
+        return payload + object_payload, len(payload)
 
     def is_expired(self):
         """Check if object's TTL is expired"""
@@ -152,7 +161,7 @@ class Object():
 
     def pow_target(self):
         """Compute PoW target"""
-        data = self.to_bytes()[8:]
+        data = self.data[8:]
         length = len(data) + 8 + shared.payload_length_extra_bytes
         dt = max(self.expires_time - time.time(), 0)
         return int(
@@ -162,7 +171,7 @@ class Object():
 
     def pow_initial_hash(self):
         """Compute the initial hash for PoW"""
-        return hashlib.sha512(self.to_bytes()[8:]).digest()
+        return hashlib.sha512(self.data[8:]).digest()
 
 
 class NetAddrNoPrefix(IStructure):
diff --git a/minode/tests/test_network.py b/minode/tests/test_network.py
index c683637..d588022 100644
--- a/minode/tests/test_network.py
+++ b/minode/tests/test_network.py
@@ -8,7 +8,7 @@ import tempfile
 import time
 from contextlib import contextmanager
 
-from minode import connection, main, shared
+from minode import connection, main, shared, sql
 from minode.listener import Listener
 from minode.manager import Manager
 
@@ -66,11 +66,11 @@ class TestNetwork(unittest.TestCase):
     def setUp(self):
         shared.core_nodes.clear()
         shared.unchecked_node_pool.clear()
-        shared.objects = {}
         try:
-            os.remove(os.path.join(shared.data_directory, 'objects.pickle'))
+            os.remove(os.path.join(shared.data_directory, 'objects.dat'))
         except FileNotFoundError:
             pass
+        shared.objects = sql.Inventory()
 
     def _make_initial_nodes(self):
         Manager.load_data()
diff --git a/minode/tests/test_structure.py b/minode/tests/test_structure.py
index 970c152..c45d2d4 100644
--- a/minode/tests/test_structure.py
+++ b/minode/tests/test_structure.py
@@ -135,13 +135,13 @@ class TestStructure(unittest.TestCase):
         self.assertEqual(obj.object_payload, b'HELLO')
 
         obj = structure.Object(
-            b'\x00' * 8, int(time.time() + 3000000), 42, 1, 1, b'HELLO')
+            int(time.time() + 3000000), 42, 1, 1, object_payload=b'HELLO')
         self.assertFalse(obj.is_valid())
         obj.expires_time = int(time.time() - 11000)
         self.assertFalse(obj.is_valid())
 
         obj = structure.Object(
-            b'\x00' * 8, int(time.time() + 300), 42, 1, 2, b'HELLO')
+            int(time.time() + 300), 42, 1, 2, object_payload=b'HELLO')
         vector = obj.vector
         proofofwork._worker(obj)  # pylint: disable=protected-access
         obj = shared.objects.popitem()[1]
@@ -159,8 +159,8 @@ class TestStructure(unittest.TestCase):
         """Check the main proofofwork call and worker"""
         shared.vector_advertise_queue = queue.Queue()
         obj = structure.Object(
-            b'\x00' * 8, int(time.time() + 300), 42, 1,
-            shared.stream, b'HELLO')
+            int(time.time() + 300), 42, 1, shared.stream,
+            object_payload=b'HELLO')
         start_time = time.time()
         proofofwork.do_pow_and_publish(obj)
         try:
@@ -189,6 +189,6 @@ class TestStructure(unittest.TestCase):
             self.fail("No nonce found in the queue")
 
         obj = structure.Object(
-            nonce, obj.expires_time, obj.object_type, obj.version,
-            obj.stream_number, obj.object_payload)
+            obj.expires_time, obj.object_type, obj.version, obj.stream_number,
+            object_payload=obj.object_payload, nonce=nonce)
         self.assertTrue(obj.is_valid())
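
A minimal usage sketch of the sqlite-backed inventory this patch introduces
(illustrative, not one of the patches); it uses only methods defined above,
and the data directory value is an assumption for the example:

    import time
    from minode import shared, sql, structure

    shared.data_directory = '/tmp'    # assumed location for objects.dat
    shared.objects = sql.Inventory()  # as main() now does

    # Storing an object: the vector (first 32 bytes of the double SHA-512
    # of the serialized object) is computed by structure.Object itself.
    obj = structure.Object(
        int(time.time()) + 3600, 42, 1, shared.stream,
        object_payload=b'HELLO')
    shared.objects[obj.vector] = obj

    assert obj.vector in shared.objects
    for o in shared.objects.filter(object_type=42):
        print(o.vector.hex())
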
From 8ffcf92380d7e9bccf13057902fd6ed8803d1810 Mon Sep 17 00:00:00 2001
From: Lee Miller
Date: Mon, 9 Sep 2024 00:29:55 +0300
Subject: [PATCH 02/11] Update test object for the new Object structure,
 comment outdated setting object_payload

---
 minode/tests/test_structure.py | 21 +++++++++++++++------
 1 file changed, 15 insertions(+), 6 deletions(-)

diff --git a/minode/tests/test_structure.py b/minode/tests/test_structure.py
index c45d2d4..0d35275 100644
--- a/minode/tests/test_structure.py
+++ b/minode/tests/test_structure.py
@@ -18,11 +18,12 @@ sample_addr_data = unhexlify(
     '260753000201300000000000000057ae1f90')
 
 # data for an object with expires_time 1697063939
-# structure.Object(
-#     b'\x00' * 8, expires_time, 42, 1, 2, b'HELLO').to_bytes()
+# structure.Object(expires_time, 42, 1, 2, object_payload=b'HELLO').data
 sample_object_data = unhexlify(
     '000000000000000000000000652724030000002a010248454c4c4f')
 
+sample_object_expires = 1697063939
+
 logging.basicConfig(
     level=shared.log_level,
     format='[%(asctime)s] [%(levelname)s] %(message)s')
@@ -131,9 +132,16 @@ class TestStructure(unittest.TestCase):
             message.Message(b'object', sample_object_data))
         self.assertEqual(obj.object_type, 42)
         self.assertEqual(obj.stream_number, 2)
-        self.assertEqual(obj.expires_time, 1697063939)
+        self.assertEqual(obj.expires_time, sample_object_expires)
         self.assertEqual(obj.object_payload, b'HELLO')
 
+        obj = structure.Object(
+            sample_object_expires, 42, 1, 2, object_payload=b'HELLO')
+        self.assertEqual(obj.data, sample_object_data)
+        self.assertEqual(obj.offset, 22)
+        self.assertEqual(obj.nonce, b'\x00' * 8)
+        self.assertTrue(obj.is_expired())
+
         obj = structure.Object(
             int(time.time() + 3000000), 42, 1, 1, object_payload=b'HELLO')
         self.assertFalse(obj.is_valid())
@@ -151,9 +159,10 @@ class TestStructure(unittest.TestCase):
         shared.stream = 2
         self.assertTrue(obj.is_valid())
 
-        obj.object_payload = \
-            b'TIGER, tiger, burning bright. In the forests of the night'
-        self.assertFalse(obj.is_valid())
+        # obj.data = struct.pack(...
+        # obj.object_payload = \
+        #     b'TIGER, tiger, burning bright. In the forests of the night'
+        # self.assertFalse(obj.is_valid())
 
     def test_proofofwork(self):
         """Check the main proofofwork call and worker"""

From 73893dabf7b1524141d859b5e7c34750784c93fb Mon Sep 17 00:00:00 2001
From: Lee Miller
Date: Wed, 11 Sep 2024 02:35:09 +0300
Subject: [PATCH 03/11] Set a row factory instead of nesting the iterator,
 iterate through the cursor

---
 minode/sql.py | 37 ++++++++++++++++---------------------
 1 file changed, 16 insertions(+), 21 deletions(-)

diff --git a/minode/sql.py b/minode/sql.py
index 4ccb6d9..755f194 100644
--- a/minode/sql.py
+++ b/minode/sql.py
@@ -39,15 +39,16 @@ class Inventory():
                 cur.execute('VACUUM')
                 cur.execute("INSERT INTO status VALUES ('lastvacuumtime', ?)", (now,))
         self._db.commit()
+        self._db.row_factory = self.__object
 
-    def __objects(self, cur):
-        return (
-            structure.Object(
-                expires, obj_type, version, stream, data, offset,
-                tag=tag, vector=vector)
-            for (vector, expires, obj_type, version, stream, tag, data, offset)
-            in cur.fetchall()
-        )
+    @staticmethod
+    def __object(cursor, row):
+        if len(cursor.description) != 8:
+            return row
+        vector, expires, obj_type, version, stream, tag, data, offset = row
+        return structure.Object(
+            expires, obj_type, version, stream, data, offset,
+            tag=tag, vector=vector)
 
     def cleanup(self):
         """Remove expired objects"""
@@ -74,7 +75,7 @@ class Inventory():
         cur = self._db.execute(
             'SELECT * FROM objects WHERE '  # nosec B608
             + ' AND '.join(clauses), params)
-        return self.__objects(cur)
+        return cur
 
     def vectors_to_send(self, stream=None):
         cur = self._db.execute(
@@ -82,7 +83,7 @@ class Inventory():
             ' ORDER BY random()',
             (int(time.time()), stream or shared.stream)
         )
-        return [v for v, in cur.fetchall()]
+        return [v for v, in cur]
 
     def get(self, vector, default=None):
         try:
@@ -92,11 +93,10 @@ class Inventory():
 
     def keys(self):
         cur = self._db.execute('SELECT vector FROM objects')
-        return (v for v, in cur.fetchall())
+        return (v for v, in cur)
 
     def values(self):
-        cur = self._db.execute('SELECT * FROM objects')
-        return self.__objects(cur)
+        return self._db.execute('SELECT * FROM objects')
 
     def popitem(self):
         if not self.rowid:
@@ -114,16 +114,11 @@ class Inventory():
         return cur.fetchone()
 
     def __getitem__(self, vector):
-        cur = self._db.execute(
-            'SELECT * FROM objects WHERE vector = ?', (vector,))
-        item = cur.fetchone()
+        item = self._db.execute(
+            'SELECT * FROM objects WHERE vector = ?', (vector,)).fetchone()
         if item is None:
             raise KeyError(vector)
-        vector, expires, obj_type, version, stream, tag, data, offset = item
-        return structure.Object(
-            expires, obj_type, version, stream, data, offset,
-            tag=tag, vector=vector
-        )
+        return item
 
     def __delitem__(self, vector):
         with shared.objects_lock:  # KeyError
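
Patch 03 relies on sqlite3's row_factory hook: once set on the connection,
it is invoked for every fetched row and may return an arbitrary object. A
self-contained sketch of the mechanism (with a hypothetical table, not the
objects schema above):

    import sqlite3

    def as_dict(cursor, row):
        # cursor.description lists the selected columns, name first
        return dict(zip([d[0] for d in cursor.description], row))

    db = sqlite3.connect(':memory:')
    db.execute('CREATE TABLE t (a, b)')
    db.execute('INSERT INTO t VALUES (1, 2)')
    db.row_factory = as_dict
    print(db.execute('SELECT a, b FROM t').fetchone())  # {'a': 1, 'b': 2}
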
From c2abf2879a135d42d9263b5949ec9ea828d0b317 Mon Sep 17 00:00:00 2001
From: Lee Miller
Date: Mon, 9 Sep 2024 00:11:58 +0300
Subject: [PATCH 04/11] Move objects_lock into the Inventory object

---
 minode/shared.py | 1 -
 minode/sql.py    | 8 +++++---
 2 files changed, 5 insertions(+), 4 deletions(-)

diff --git a/minode/shared.py b/minode/shared.py
index 72864ec..a964dca 100644
--- a/minode/shared.py
+++ b/minode/shared.py
@@ -63,4 +63,3 @@ outgoing_connections = 8
 connection_limit = 250
 
 objects = {}
-objects_lock = threading.Lock()
diff --git a/minode/sql.py b/minode/sql.py
index 755f194..f1b3cd3 100644
--- a/minode/sql.py
+++ b/minode/sql.py
@@ -2,6 +2,7 @@
 
 import os
 import sqlite3
+import threading
 import time
 
 from . import shared, structure
@@ -12,6 +13,7 @@ sqlite3.threadsafety = 3
 
 class Inventory():
     """sqlite inventory"""
     def __init__(self):
+        self._lock = threading.Lock()
         self._db = sqlite3.connect(
             os.path.join(shared.data_directory, 'objects.dat'),
             check_same_thread=False
@@ -52,7 +54,7 @@ class Inventory():
 
     def cleanup(self):
         """Remove expired objects"""
-        with shared.objects_lock:
+        with self._lock:
             self._db.execute(
                 'DELETE FROM objects WHERE expires < ?',
                 (int(time.time()) - 3 * 3600,)
@@ -121,13 +123,13 @@ class Inventory():
         return item
 
     def __delitem__(self, vector):
-        with shared.objects_lock:  # KeyError
+        with self._lock:  # KeyError
            self._db.execute('DELETE FROM objects WHERE vector = ?', (vector,))
             self._db.commit()
             self.rowid = len(self)
 
     def __setitem__(self, vector, obj):
-        with shared.objects_lock:
+        with self._lock:
             cur = self._db.execute(
                 'INSERT INTO objects VALUES (?,?,?,?,?,?,?,?)', (
                     vector, obj.expires_time, obj.object_type, obj.version,

From 22f0282ef950c78f7cff9e1aaba51f4e41697de1 Mon Sep 17 00:00:00 2001
From: Lee Miller
Date: Wed, 11 Sep 2024 05:33:10 +0300
Subject: [PATCH 05/11] Add VACUUM after deleting 10000 objects, do it in
 cleanup()

---
 minode/sql.py | 39 +++++++++++++++++++++++++--------------
 1 file changed, 25 insertions(+), 14 deletions(-)

diff --git a/minode/sql.py b/minode/sql.py
index f1b3cd3..a513aef 100644
--- a/minode/sql.py
+++ b/minode/sql.py
@@ -1,5 +1,6 @@
 """Inventory implementation using sqlite"""
 
+import logging
 import os
 import sqlite3
 import threading
@@ -14,6 +15,7 @@ class Inventory():
     """sqlite inventory"""
     def __init__(self):
         self._lock = threading.Lock()
+        self._deleted = 0
         self._db = sqlite3.connect(
             os.path.join(shared.data_directory, 'objects.dat'),
             check_same_thread=False
@@ -29,17 +31,16 @@ class Inventory():
             COMMIT;
         """)
         self.rowid = len(self) or None
-        cur = self._db.cursor()
-        cur.execute("SELECT value FROM status WHERE key='lastvacuumtime'")
-        now = int(time.time())
         try:
-            vacuumed = cur.fetchone()[0]
+            self.lastvacuumtime = self._db.execute(
+                "SELECT value FROM status WHERE key='lastvacuumtime'"
+            ).fetchone()[0]
         except TypeError:
-            pass
-        else:
-            if vacuumed < now - 86400:  # 24 hours
-                cur.execute('VACUUM')
-                cur.execute("INSERT INTO status VALUES ('lastvacuumtime', ?)", (now,))
+            self.lastvacuumtime = int(time.time())
+            self._db.execute(
+                "INSERT INTO status VALUES ('lastvacuumtime', ?)",
+                (self.lastvacuumtime,)
+            )
         self._db.commit()
         self._db.row_factory = self.__object
 
@@ -55,12 +56,22 @@ class Inventory():
 
     def cleanup(self):
         """Remove expired objects"""
         with self._lock:
-            self._db.execute(
-                'DELETE FROM objects WHERE expires < ?',
-                (int(time.time()) - 3 * 3600,)
-            )
+            now = int(time.time())
+            cur = self._db.execute(
+                'DELETE FROM objects WHERE expires < ?', (now - 3 * 3600,))
             self._db.commit()
-        # conditional vacuum and validity check
+            self._deleted += cur.rowcount
+            logging.debug('Deleted %s expired objects', cur.rowcount)
+            # conditional vacuum and validity check (TODO)
+            # every 24 hours or after deleting a lot of items
+            if self._deleted > 10000 or self.lastvacuumtime < now - 86400:
+                logging.info('Doing VACUUM for objects')
+                cur.execute('VACUUM')
+                cur.execute(
+                    "INSERT INTO status VALUES ('lastvacuumtime', ?)", (now,))
+                self._db.commit()
+                self._deleted = 0
+                self.lastvacuumtime = now
From 260c839e264610167f1fce872c615aa2fcae17f9 Mon Sep 17 00:00:00 2001
From: Lee Miller
Date: Thu, 12 Sep 2024 20:26:57 +0300
Subject: [PATCH 06/11] Take into account pending objects in
 Inventory.cleanup()

---
 minode/connection.py |  7 +++----
 minode/manager.py    |  2 ++
 minode/sql.py        | 31 ++++++++++++++++++++++++++++++-
 3 files changed, 35 insertions(+), 5 deletions(-)

diff --git a/minode/connection.py b/minode/connection.py
index fba349c..700d2e3 100644
--- a/minode/connection.py
+++ b/minode/connection.py
@@ -409,7 +409,7 @@ class ConnectionBase(threading.Thread):
 
     def _request_objects(self):
         if self.vectors_to_get and len(self.vectors_requested) < 100:
-            self.vectors_to_get.difference_update(shared.objects.keys())
+            self.vectors_to_get = shared.objects.select(self.vectors_to_get)
             if not self.wait_until:
                 nodes_count = (
                     len(shared.node_pool) + len(shared.unchecked_node_pool))
@@ -467,9 +467,7 @@ class Connection(ConnectionBase):
     def _process_msg_inv(self, m):
         inv = message.Inv.from_message(m)
         logging.debug('%s:%s -> %s', self.host_print, self.port, inv)
-        to_get = inv.vectors.copy()
-        to_get.difference_update(shared.objects.keys())
-        self.vectors_to_get.update(to_get)
+        self.vectors_to_get.update(shared.objects.select(inv.vectors))
         # Do not send objects they already have.
         self.vectors_to_send.difference_update(inv.vectors)
@@ -491,6 +489,7 @@ class Connection(ConnectionBase):
                 logging.debug(dest)
                 shared.i2p_unchecked_node_pool.add((dest, 'i2p'))
             shared.vector_advertise_queue.put(obj.vector)
+        shared.objects.check(obj.vector)
 
     def _process_msg_getdata(self, m):
         getdata = message.GetData.from_message(m)
diff --git a/minode/manager.py b/minode/manager.py
index 9d0dc63..fc7b2e2 100644
--- a/minode/manager.py
+++ b/minode/manager.py
@@ -85,6 +85,8 @@ class Manager(threading.Thread):
         outgoing_connections = 0
         for c in shared.connections.copy():
             if not c.is_alive() or c.status == 'disconnected':
+                shared.objects.check(
+                    *(c.vectors_to_get | c.vectors_requested.keys()))
                 with shared.connections_lock:
                     shared.connections.remove(c)
             else:
diff --git a/minode/sql.py b/minode/sql.py
index a513aef..0aa0c4a 100644
--- a/minode/sql.py
+++ b/minode/sql.py
@@ -16,6 +16,7 @@ class Inventory():
     def __init__(self):
         self._lock = threading.Lock()
         self._deleted = 0
+        self._pending = set()
         self._db = sqlite3.connect(
             os.path.join(shared.data_directory, 'objects.dat'),
             check_same_thread=False
@@ -53,15 +54,27 @@ class Inventory():
             expires, obj_type, version, stream, data, offset,
             tag=tag, vector=vector)
 
+    def check(self, *vectors):
+        """Remove given vectors from pending"""
+        with self._lock:
+            for vector in vectors:
+                self._pending.discard(vector)
+
     def cleanup(self):
         """Remove expired objects"""
+        if len(self._pending) > 100:
+            logging.info(
+                'Not cleaning up, %s objects pending', len(self._pending))
+            return
         with self._lock:
             now = int(time.time())
             cur = self._db.execute(
                 'DELETE FROM objects WHERE expires < ?', (now - 3 * 3600,))
             self._db.commit()
             self._deleted += cur.rowcount
-            logging.debug('Deleted %s expired objects', cur.rowcount)
+            (logging.info if self._pending else logging.debug)(
+                'Deleted %s expired objects, %s pending',
+                cur.rowcount, len(self._pending))
             # conditional vacuum and validity check (TODO)
             # every 24 hours or after deleting a lot of items
             if self._deleted > 10000 or self.lastvacuumtime < now - 86400:
                 logging.info('Doing VACUUM for objects')
@@ -90,6 +103,22 @@ class Inventory():
             + ' AND '.join(clauses), params)
         return cur
 
+    def select(self, vectors):
+        """Select new vectors from the given set"""
+        chunk_size = 999
+        keys = tuple(vectors)
+        with self._lock:
+            for i in range(0, len(vectors), chunk_size):
+                chunk = keys[i:i+chunk_size]
+                cur = self._db.execute(
+                    'SELECT vector FROM objects WHERE vector IN'  # nosec B608
+                    ' ({})'.format(','.join('?' * len(chunk))),
+                    chunk)
+                for v, in cur:
+                    vectors.remove(v)
+        self._pending.update(vectors)
+        return vectors
+
     def vectors_to_send(self, stream=None):
         cur = self._db.execute(
             'SELECT vector FROM objects WHERE expires > ? AND stream = ?'
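
A small demonstration of the select()/check() bookkeeping contract this
patch introduces, run against a throwaway inventory (using the temporary
directory is an assumption of the example): select() filters a set of inv
vectors down to the unknown ones and marks them pending, so cleanup()
defers deletion while downloads are in flight; check() clears the marks.

    import tempfile
    from minode import shared, sql

    shared.data_directory = tempfile.gettempdir()
    inv = sql.Inventory()

    wanted = {b'a' * 32, b'b' * 32}  # vectors from an inv message
    new = inv.select(set(wanted))    # unknown vectors, now pending
    assert new == wanted             # nothing is stored yet
    inv.check(*wanted)               # downloads finished or abandoned
    inv.cleanup()                    # no longer deferred by pending items
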
From dba455464a53b5dfe2a20c0d92bc35e7d9c340c3 Mon Sep 17 00:00:00 2001
From: Lee Miller
Date: Sun, 29 Sep 2024 17:04:34 +0300
Subject: [PATCH 07/11] Started a test case for the objects

---
 minode/tests/test_objects.py | 163 +++++++++++++++++++++++++++++++++++
 1 file changed, 163 insertions(+)
 create mode 100644 minode/tests/test_objects.py

diff --git a/minode/tests/test_objects.py b/minode/tests/test_objects.py
new file mode 100644
index 0000000..e869a6a
--- /dev/null
+++ b/minode/tests/test_objects.py
@@ -0,0 +1,163 @@
+"""Tests for the Inventory implementation"""
+import os
+import random
+import tempfile
+import time
+import unittest
+
+from minode import sql, shared, structure
+
+
+# + __bool__
+# + __contains__
+# + __getitem__
+# + __setitem__
+# = cleanup
+# + get
+# + filter
+# = select
+# + vectors_to_send
+
+
+class TestObjects():
+    """
+    A base class for the test case for shared.objects,
+    containing tests for all the methods directly used in code.
+    """
+    # pylint: disable=no-member
+    # A possibility of abstract test cases was rejected:
+    # https://bugs.python.org/issue17519
+
+    def test_set_get(self):
+        """Put some objects and check presence and getting"""
+        obj = structure.Object(
+            int(time.time()), 42, 1, 1, object_payload=b'HELLO')
+        self.assertFalse(obj.vector in self.objects)
+        with self.assertRaises(KeyError):
+            self.objects[obj.vector]  # pylint: disable=pointless-statement
+        self.assertIsNone(self.objects.get(obj.vector))
+        prev_len = len(self.objects)
+        self.objects[obj.vector] = obj
+        self.objects[obj.vector] = obj
+        self.assertTrue(self.objects)
+        self.assertEqual(len(self.objects), prev_len + 1)
+        self.assertTrue(obj.vector in self.objects)
+        obj1 = self.objects[obj.vector]
+        self.assertEqual(obj.vector, obj1.vector)
+        self.assertEqual(obj.data, obj1.data)
+
+    def test_vectors_to_send(self):
+        """Check vectors_to_send method"""
+        needed = set()
+        for _ in range(10):
+            # wrong stream
+            obj = structure.Object(
+                int(time.time()) + 10, 42, 1, random.randint(1, 3),
+                object_payload=os.urandom(32))
+            self.objects[obj.vector] = obj
+            # expired
+            obj = structure.Object(
+                int(time.time()) - 10, 42, 1, 4, object_payload=os.urandom(32))
+            self.objects[obj.vector] = obj
+            # interesting
+            obj = structure.Object(
+                int(time.time()) + 10, 42, 1, 4, object_payload=os.urandom(32))
+            self.objects[obj.vector] = obj
+            needed.add(obj.vector)
+
+        self.assertEqual(set(self.objects.vectors_to_send(4)), needed)
+        self.assertTrue(set(self.objects.vectors_to_send()).difference(needed))
+
+    def test_filter(self):
+        """Check the objects filtering"""
+        needed = set()
+        tagged = set()
+        tag = b'@' * 32
+        for _ in range(10):
+            # wrong type
+            obj = structure.Object(
+                int(time.time()), 0, 1, 5, object_payload=os.urandom(64))
+            self.objects[obj.vector] = obj
+            # wrong type, but the proper tag
+            obj = structure.Object(
+                int(time.time()) - 11000, 0, 4, random.choice([1, 2, 3, 5]),
+                object_payload=tag + os.urandom(32))
+            self.objects[obj.vector] = obj
+            tagged.add(obj.vector)
+            # wrong stream
+            obj = structure.Object(
+                int(time.time()), 33, 1, 1, object_payload=os.urandom(32))
+            self.objects[obj.vector] = obj
+            # interesting
+            obj = structure.Object(
+                int(time.time()) - 11000, 33, 1, 5,
+                object_payload=os.urandom(32))
+            self.objects[obj.vector] = obj
+            needed.add(obj.vector)
+
+        # stream and type
+        self.assertTrue(needed)
+        for obj in self.objects.filter(5, 33):
+            needed.remove(obj.vector)
+        self.assertFalse(needed)
+
+        # tag
+        self.assertTrue(tagged)
+        for obj in self.objects.filter(tag=tag):
+            tagged.remove(obj.vector)
+        self.assertFalse(tagged)
+
+    def test_cleanup(self):
+        """Check cleaning up"""
+        for _ in range(10):
+            obj = structure.Object(
+                int(time.time()) - random.randint(4, 5) * 3600,
+                42, 1, 6, object_payload=os.urandom(32))
+            self.objects[obj.vector] = obj
+        obj = structure.Object(
+            int(time.time()) - 2 * 3600,
+            42, 1, 6, object_payload=os.urandom(32))
+        self.objects[obj.vector] = obj
+
+        for obj in self.objects.values():
+            if obj.is_expired():
+                break
+        else:
+            self.fail('No objects found to delete')
+
+        self.objects.cleanup()
+        self.assertTrue(self.objects)
+        for obj in self.objects.values():
+            self.assertFalse(obj.is_expired())
+
+    def test_select(self):
+        """Check the select method"""
+        pending = set()
+        questionable = set()
+
+        for _ in range(5):
+            obj = structure.Object(
+                int(time.time()) - 10, 42, 1, 7, object_payload=os.urandom(32))
+            questionable.add(obj.vector)
+            self.objects[obj.vector] = obj
+            obj = structure.Object(
+                int(time.time()) + 10, 42, 1, 7, object_payload=os.urandom(32))
+            questionable.add(obj.vector)
+            pending.add(obj.vector)
+
+        self.assertEqual(self.objects.select(questionable), pending)
+
+
+class TestObjectsSQL(TestObjects, unittest.TestCase):
+    """A test case for the sqlite inventory"""
+
+    @classmethod
+    def setUpClass(cls):
+        shared.data_directory = tempfile.gettempdir()
+        cls.tearDownClass()
+        cls.objects = sql.Inventory()
+
+    @classmethod
+    def tearDownClass(cls):
+        cls.objects = None
+        os.remove(os.path.join(shared.data_directory, 'objects.dat'))
From 34d2e54d56d24ea00033470baafe5766622eac82 Mon Sep 17 00:00:00 2001
From: Lee Miller
Date: Sun, 13 Oct 2024 05:09:47 +0300
Subject: [PATCH 08/11] Make objects.vectors_to_send() a generator of chunks

---
 minode/connection.py         |  7 ++-----
 minode/sql.py                | 15 +++++++++++----
 minode/tests/test_objects.py |  6 ++++--
 3 files changed, 17 insertions(+), 11 deletions(-)

diff --git a/minode/connection.py b/minode/connection.py
index 700d2e3..60cced1 100644
--- a/minode/connection.py
+++ b/minode/connection.py
@@ -278,14 +278,11 @@ class ConnectionBase(threading.Thread):
             self.send_queue.put(message.Addr(addr))
 
         if shared.objects:
-            to_send = shared.objects.vectors_to_send()
-            offset = 0
-            while offset < len(to_send):
+            for chunk in shared.objects.vectors_to_send(10000):
                 # We limit size of inv messaged to 10000 entries
                 # because they might time out
                 # in very slow networks (I2P)
-                self.send_queue.put(message.Inv(to_send[offset:offset+10000]))
-                offset += 10000
+                self.send_queue.put(message.Inv(chunk))
         self.status = 'fully_established'
 
     def _process_queue(self):
diff --git a/minode/sql.py b/minode/sql.py
index 0aa0c4a..27c2471 100644
--- a/minode/sql.py
+++ b/minode/sql.py
@@ -119,13 +119,20 @@ class Inventory():
         self._pending.update(vectors)
         return vectors
 
-    def vectors_to_send(self, stream=None):
+    def vectors_to_send(self, chunk_size=10000, stream=None):
+        if stream is None:
+            stream = shared.stream
+        now = int(time.time())
         cur = self._db.execute(
             'SELECT vector FROM objects WHERE expires > ? AND stream = ?'
-            ' ORDER BY random()',
-            (int(time.time()), stream or shared.stream)
+            ' ORDER BY random()', (now, stream)
         )
-        return [v for v, in cur]
+        cur.arraysize = chunk_size
+        while True:
+            vectors = cur.fetchmany()
+            if not vectors:
+                return
+            yield [v for v, in vectors]
 
     def get(self, vector, default=None):
         try:
diff --git a/minode/tests/test_objects.py b/minode/tests/test_objects.py
index e869a6a..dddbc18 100644
--- a/minode/tests/test_objects.py
+++ b/minode/tests/test_objects.py
@@ -65,8 +65,10 @@ class TestObjects():
             self.objects[obj.vector] = obj
             needed.add(obj.vector)
 
-        self.assertEqual(set(self.objects.vectors_to_send(4)), needed)
-        self.assertTrue(set(self.objects.vectors_to_send()).difference(needed))
+        self.assertEqual(
+            set(next(self.objects.vectors_to_send(stream=4))), needed)
+        self.assertTrue(
+            set(next(self.objects.vectors_to_send())).difference(needed))
 
     def test_filter(self):
         """Check the objects filtering"""
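
A self-contained sketch of the cursor.fetchmany pattern the new generator
uses (with a hypothetical table): after setting arraysize, each fetchmany()
call returns at most that many rows until the result set is exhausted:

    import sqlite3

    db = sqlite3.connect(':memory:')
    db.execute('CREATE TABLE v (vector)')
    db.executemany('INSERT INTO v VALUES (?)', ((i,) for i in range(25)))

    cur = db.execute('SELECT vector FROM v')
    cur.arraysize = 10             # the chunk_size of the generator above
    while True:
        rows = cur.fetchmany()     # 10, 10, then 5 rows, then []
        if not rows:
            break
        print([v for v, in rows])
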
From ecfafcc3859a0082fa12b5c875ecad489e27ef6a Mon Sep 17 00:00:00 2001
From: Lee Miller
Date: Mon, 14 Oct 2024 16:49:05 +0300
Subject: [PATCH 09/11] Add caching objects in a dict

---
 minode/connection.py         |  2 +-
 minode/manager.py            |  1 +
 minode/sql.py                | 95 +++++++++++++++++++++++++++++-------
 minode/tests/test_network.py |  3 +++
 4 files changed, 82 insertions(+), 19 deletions(-)

diff --git a/minode/connection.py b/minode/connection.py
index 60cced1..fc470b0 100644
--- a/minode/connection.py
+++ b/minode/connection.py
@@ -473,7 +473,7 @@ class Connection(ConnectionBase):
         logging.debug('%s:%s -> %s', self.host_print, self.port, obj)
         self.vectors_requested.pop(obj.vector, None)
         self.vectors_to_get.discard(obj.vector)
-        if obj.is_valid() and obj.vector not in shared.objects:
+        if obj.is_valid():
             shared.objects[obj.vector] = obj
             if (
                 obj.object_type == shared.i2p_dest_obj_type
diff --git a/minode/manager.py b/minode/manager.py
index fc7b2e2..efbbd12 100644
--- a/minode/manager.py
+++ b/minode/manager.py
@@ -42,6 +42,7 @@ class Manager(threading.Thread):
             now = time.time()
             if shared.shutting_down:
                 logging.debug('Shutting down Manager')
+                shared.objects.flush()
                 break
             if now - self.last_cleaned_objects > 90:
                 shared.objects.cleanup()
diff --git a/minode/sql.py b/minode/sql.py
index 27c2471..55536d0 100644
--- a/minode/sql.py
+++ b/minode/sql.py
@@ -1,5 +1,6 @@
 """Inventory implementation using sqlite"""
 
+import base64
 import logging
 import os
 import sqlite3
@@ -16,6 +17,7 @@ class Inventory():
     def __init__(self):
         self._lock = threading.Lock()
         self._deleted = 0
+        self._last = {}
         self._pending = set()
         self._db = sqlite3.connect(
             os.path.join(shared.data_directory, 'objects.dat'),
@@ -66,6 +68,16 @@ class Inventory():
             logging.info(
                 'Not cleaning up, %s objects pending', len(self._pending))
             return
+        for vector in set(self._last):
+            if self._last[vector].is_expired():
+                logging.debug(
+                    'Deleted expired object: %s',
+                    base64.b16encode(vector).decode())
+                with self._lock:
+                    del self._last[vector]
+        if len(self._last) > 1000:
+            self.flush()
+            return
         with self._lock:
             now = int(time.time())
             cur = self._db.execute(
@@ -86,8 +98,31 @@ class Inventory():
             self._deleted = 0
             self.lastvacuumtime = now
 
+    def flush(self):
+        """Write cached objects to the database"""
+        with self._lock:
+            cur = self._db.executemany(
+                'INSERT INTO objects VALUES (?,?,?,?,?,?,?,?)',
+                ((obj.vector, obj.expires_time, obj.object_type,
+                  obj.version, obj.stream_number, obj.tag, obj.data,
+                  obj.offset) for obj in self._last.values()))
+            self._db.commit()
+            self.rowid = cur.lastrowid
+            self._last = {}
+
     def filter(self, stream=None, object_type=None, tag=None):
         """Generator of objects with the given parameters"""
+        def fits(obj):
+            if stream and obj.stream_number != stream:
+                return False
+            if object_type is not None and obj.object_type != object_type:
+                return False
+            if tag and obj.tag != tag:
+                return False
+            return True
+
+        yield from filter(fits, self._last.values())
+
         clauses = []
         if stream:
             clauses.append(('stream = ?', stream))
@@ -98,16 +133,16 @@ class Inventory():
 
         clauses, params = zip(*clauses)
 
-        cur = self._db.execute(
+        yield from self._db.execute(
             'SELECT * FROM objects WHERE '  # nosec B608
             + ' AND '.join(clauses), params)
-        return cur
 
     def select(self, vectors):
         """Select new vectors from the given set"""
         chunk_size = 999
-        keys = tuple(vectors)
         with self._lock:
+            vectors.difference_update(self._last)
+            keys = tuple(vectors)
             for i in range(0, len(vectors), chunk_size):
                 chunk = keys[i:i+chunk_size]
                 cur = self._db.execute(
@@ -116,7 +151,7 @@ class Inventory():
                     chunk)
                 for v, in cur:
                     vectors.remove(v)
-        self._pending.update(vectors)
+            self._pending.update(vectors)
         return vectors
 
     def vectors_to_send(self, chunk_size=10000, stream=None):
@@ -131,6 +166,14 @@ class Inventory():
         while True:
             vectors = cur.fetchmany()
             if not vectors:
+                # TODO: append to the last short result,
+                # check that _last is shorter than the chunk_size
+                # (should be < 1000)
+                if self._last:
+                    yield [
+                        obj.vector for obj in self._last.values()
+                        if obj.stream_number == stream
+                        and obj.expires_time > now]
                 return
             yield [v for v, in vectors]
 
@@ -141,13 +184,19 @@ class Inventory():
             return default
 
     def keys(self):
-        cur = self._db.execute('SELECT vector FROM objects')
-        return (v for v, in cur)
+        yield from self._last
+        for vector, in self._db.execute('SELECT vector FROM objects'):
+            yield vector
 
     def values(self):
-        return self._db.execute('SELECT * FROM objects')
+        yield from self._last.values()
+        yield from self._db.execute('SELECT * FROM objects')
 
     def popitem(self):
+        try:
+            return self._last.popitem()
+        except KeyError:
+            pass
         if not self.rowid:
             raise KeyError('empty')
         cur = self._db.execute(
@@ -158,11 +207,17 @@ class Inventory():
         return (vector, obj)
 
     def __contains__(self, vector):
-        cur = self._db.execute(
-            'SELECT vector FROM objects WHERE vector = ?', (vector,))
-        return cur.fetchone()
+        if vector in self._last:
+            return True
+        return self._db.execute(
+            'SELECT vector FROM objects WHERE vector = ?', (vector,)
+        ).fetchone() is not None
 
     def __getitem__(self, vector):
+        try:
+            return self._last[vector]
+        except KeyError:
+            pass
         item = self._db.execute(
             'SELECT * FROM objects WHERE vector = ?', (vector,)).fetchone()
         if item is None:
@@ -170,28 +225,32 @@ class Inventory():
         return item
 
     def __delitem__(self, vector):
+        try:
+            del self._last[vector]
+            return
+        except KeyError:
+            pass
         with self._lock:  # KeyError
             self._db.execute('DELETE FROM objects WHERE vector = ?', (vector,))
             self._db.commit()
             self.rowid = len(self)
 
     def __setitem__(self, vector, obj):
+        if vector in self:
+            return
         with self._lock:
-            cur = self._db.execute(
-                'INSERT INTO objects VALUES (?,?,?,?,?,?,?,?)', (
-                    vector, obj.expires_time, obj.object_type, obj.version,
-                    obj.stream_number, obj.tag, obj.data, obj.offset
-                ))
-            self._db.commit()
-            self.rowid = cur.lastrowid
+            self._last[vector] = obj
 
     def __bool__(self):
+        if self._last:
+            return True
         return self._db.execute(
             'SELECT vector from objects LIMIT 1').fetchone() is not None
 
     def __len__(self):
         cur = self._db.execute('SELECT count(*) FROM objects')
-        return cur.fetchone()[0]
+        return cur.fetchone()[0] + len(self._last)
 
     def __del__(self):
+        self.flush()
         self._db.close()
diff --git a/minode/tests/test_network.py b/minode/tests/test_network.py
index d588022..1a9085e 100644
--- a/minode/tests/test_network.py
+++ b/minode/tests/test_network.py
@@ -72,6 +72,9 @@ class TestNetwork(unittest.TestCase):
             pass
         shared.objects = sql.Inventory()
 
+    def tearDown(self):
+        shared.objects.flush()
+
     def _make_initial_nodes(self):
         Manager.load_data()
         core_nodes_len = len(shared.core_nodes)
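
The cache added above is a write-behind scheme: __setitem__ only fills the
_last dict, and rows reach sqlite in flush(), which the Manager calls on
shutdown (and which cleanup() triggers past 1000 cached objects). A reduced
sketch of the idea, with a hypothetical class and without locking or the
read paths:

    class WriteBehindCache:
        def __init__(self, db):
            self._db = db      # sqlite3.Connection with an objects table
            self._cache = {}

        def __setitem__(self, vector, obj):
            self._cache[vector] = obj    # no disk I/O on the hot path

        def flush(self):
            self._db.executemany(
                'INSERT INTO objects VALUES (?, ?)',
                ((k, v.data) for k, v in self._cache.items()))
            self._db.commit()
            self._cache.clear()
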
From 1aca3e69311ea0513fe6a8a1fdfbbcdafd14cd13 Mon Sep 17 00:00:00 2001
From: Lee Miller
Date: Sun, 13 Oct 2024 03:24:36 +0300
Subject: [PATCH 10/11] Bump version to 0.3.5

---
 minode/shared.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/minode/shared.py b/minode/shared.py
index a964dca..cc892d4 100644
--- a/minode/shared.py
+++ b/minode/shared.py
@@ -21,7 +21,7 @@ protocol_version = 3
 services = 3  # NODE_NETWORK, NODE_SSL
 stream = 1
 nonce = os.urandom(8)
-user_agent = b'/MiNode:0.3.3/'
+user_agent = b'/MiNode:0.3.5/'
 timeout = 600
 header_length = 24
 i2p_dest_obj_type = 0x493250
From 6372578bb628dcf78675e2eeb9c799a9a1428844 Mon Sep 17 00:00:00 2001
From: Lee Miller
Date: Thu, 31 Oct 2024 07:22:41 +0200
Subject: [PATCH 11/11] Rename .vectors_to_send() to .biginv_chunks()
 preventing ambiguity

---
 minode/connection.py         | 7 +++----
 minode/sql.py                | 3 ++-
 minode/tests/test_objects.py | 8 ++++----
 3 files changed, 9 insertions(+), 9 deletions(-)

diff --git a/minode/connection.py b/minode/connection.py
index fc470b0..23a2d90 100644
--- a/minode/connection.py
+++ b/minode/connection.py
@@ -278,10 +278,9 @@ class ConnectionBase(threading.Thread):
             self.send_queue.put(message.Addr(addr))
 
         if shared.objects:
-            for chunk in shared.objects.vectors_to_send(10000):
-                # We limit size of inv messaged to 10000 entries
-                # because they might time out
-                # in very slow networks (I2P)
+            for chunk in shared.objects.biginv_chunks(10000):
+                # We limit size of inv messages to 10000 entries
+                # because they might time out in very slow networks (I2P)
                 self.send_queue.put(message.Inv(chunk))
         self.status = 'fully_established'
 
diff --git a/minode/sql.py b/minode/sql.py
index 55536d0..47b2509 100644
--- a/minode/sql.py
+++ b/minode/sql.py
@@ -154,7 +154,8 @@ class Inventory():
             self._pending.update(vectors)
         return vectors
 
-    def vectors_to_send(self, chunk_size=10000, stream=None):
+    def biginv_chunks(self, chunk_size=10000, stream=None):
+        """Generator of vector lists for making the biginv"""
         if stream is None:
             stream = shared.stream
         now = int(time.time())
diff --git a/minode/tests/test_objects.py b/minode/tests/test_objects.py
index dddbc18..fe2b825 100644
--- a/minode/tests/test_objects.py
+++ b/minode/tests/test_objects.py
@@ -16,7 +16,7 @@ from minode import sql, shared, structure
 # + get
 # + filter
 # = select
-# + vectors_to_send
+# + biginv_chunks
 
 
 class TestObjects():
@@ -46,7 +46,7 @@ class TestObjects():
         self.assertEqual(obj.vector, obj1.vector)
         self.assertEqual(obj.data, obj1.data)
 
-    def test_vectors_to_send(self):
+    def test_biginv_chunks(self):
         """Check vectors_to_send method"""
         needed = set()
         for _ in range(10):
@@ -66,9 +66,9 @@ class TestObjects():
             needed.add(obj.vector)
 
         self.assertEqual(
-            set(next(self.objects.vectors_to_send(stream=4))), needed)
+            set(next(self.objects.biginv_chunks(stream=4))), needed)
         self.assertTrue(
-            set(next(self.objects.vectors_to_send())).difference(needed))
+            set(next(self.objects.biginv_chunks())).difference(needed))
 
     def test_filter(self):
         """Check the objects filtering"""