From 05fcbdb45c8c45c0d00218282cda97fca4b30203 Mon Sep 17 00:00:00 2001 From: Lee Miller Date: Sat, 19 Aug 2023 03:02:19 +0300 Subject: [PATCH 1/6] A rough implementation of proper bootstrapping: added a Bootstrapper connection class, connect() and bootstrap() closures in Manager.manage_connections(). The later is called while shared.unchecked_node_pool is empty. --- minode/connection.py | 9 +++++++ minode/main.py | 26 +++++++++++--------- minode/manager.py | 56 ++++++++++++++++++++++++++++++-------------- 3 files changed, 62 insertions(+), 29 deletions(-) diff --git a/minode/connection.py b/minode/connection.py index b7f9a75..ed727ec 100644 --- a/minode/connection.py +++ b/minode/connection.py @@ -507,4 +507,13 @@ class Connection(ConnectionBase): self.vectors_to_send.update(getdata.vectors) +class Bootstrapper(ConnectionBase): + """A special type of connection to find IP nodes""" + def _process_msg_addr(self, m): + super()._process_msg_addr(m) + shared.node_pool.discard((self.host, self.port)) + self.status = 'disconnecting' + self.send_queue.put(None) + + shared.connection = Connection diff --git a/minode/main.py b/minode/main.py index 7b52796..72cebe0 100644 --- a/minode/main.py +++ b/minode/main.py @@ -101,18 +101,22 @@ def parse_arguments(): # pylint: disable=too-many-branches,too-many-statements def bootstrap_from_dns(): - """Addes addresses of bootstrap servers to known nodes""" + """Addes addresses of bootstrap servers to core nodes""" try: - for item in socket.getaddrinfo('bootstrap8080.bitmessage.org', 80): - shared.unchecked_node_pool.add((item[4][0], 8080)) - logging.debug( - 'Adding %s to unchecked_node_pool' - ' based on DNS bootstrap method', item[4][0]) - for item in socket.getaddrinfo('bootstrap8444.bitmessage.org', 80): - shared.unchecked_node_pool.add((item[4][0], 8444)) - logging.debug( - 'Adding %s to unchecked_node_pool' - ' based on DNS bootstrap method', item[4][0]) + for port in (8080, 8444): + for item in socket.getaddrinfo( + 'bootstrap{}.bitmessage.org'.format(port), 80, + proto=socket.IPPROTO_TCP + ): + try: + addr = item[4][0] + socket.inet_pton(item[0], addr) + except (TypeError, socket.error): + continue + else: + shared.core_nodes.add((addr, port)) + except socket.gaierror: + logging.info('Failed to do a DNS query') except Exception: logging.info('Error during DNS bootstrap', exc_info=True) diff --git a/minode/manager.py b/minode/manager.py index a03f9d6..b425c4f 100644 --- a/minode/manager.py +++ b/minode/manager.py @@ -11,7 +11,7 @@ import threading import time from . import proofofwork, shared, structure -from .connection import Connection +from .connection import Bootstrapper, Connection from .i2p import I2PDialer @@ -20,6 +20,7 @@ class Manager(threading.Thread): def __init__(self): super().__init__(name='Manager') self.q = queue.Queue() + self.bootstrap_pool = [] self.last_cleaned_objects = time.time() self.last_cleaned_connections = time.time() self.last_pickled_objects = time.time() @@ -31,6 +32,8 @@ class Manager(threading.Thread): def run(self): self.load_data() self.clean_objects() + self.bootstrap_pool = list(shared.core_nodes.union(shared.node_pool)) + random.shuffle(self.bootstrap_pool) while True: time.sleep(0.8) now = time.time() @@ -68,9 +71,26 @@ class Manager(threading.Thread): with shared.objects_lock: del shared.objects[vector] - @staticmethod - def manage_connections(): + def manage_connections(self): + """Open new connections if needed, remove closed ones""" hosts = set() + + def connect(target, connection_class=Connection): + """ + Open a connection of *connection_class* + to the *target* (host, port) + """ + c = connection_class(*target) + c.start() + with shared.connections_lock: + shared.connections.add(c) + + def bootstrap(): + """Bootstrap from DNS seed-nodes and known nodes""" + target = self.bootstrap_pool.pop() + logging.info('Starting a bootstrapper for %s:%s', *target) + connect(target, Bootstrapper) + outgoing_connections = 0 for c in shared.connections.copy(): if not c.is_alive() or c.status == 'disconnected': @@ -96,17 +116,20 @@ class Manager(threading.Thread): ): if shared.ip_enabled: - if len(shared.unchecked_node_pool) > 16: - to_connect.update(random.sample( - tuple(shared.unchecked_node_pool), 16)) + if shared.unchecked_node_pool: + if len(shared.unchecked_node_pool) > 16: + to_connect.update(random.sample( + tuple(shared.unchecked_node_pool), 16)) + else: + to_connect.update(shared.unchecked_node_pool) + shared.unchecked_node_pool.difference_update(to_connect) + if len(shared.node_pool) > 8: + to_connect.update(random.sample( + tuple(shared.node_pool), 8)) + else: + to_connect.update(shared.node_pool) else: - to_connect.update(shared.unchecked_node_pool) - shared.unchecked_node_pool.difference_update(to_connect) - if len(shared.node_pool) > 8: - to_connect.update(random.sample( - tuple(shared.node_pool), 8)) - else: - to_connect.update(shared.node_pool) + bootstrap() if shared.i2p_enabled: if len(shared.i2p_unchecked_node_pool) > 16: @@ -142,11 +165,8 @@ class Manager(threading.Thread): else: continue else: - c = Connection(host, port) - c.start() + connect((host, port)) hosts.add(group) - with shared.connections_lock: - shared.connections.add(c) shared.hosts = hosts @staticmethod @@ -190,7 +210,7 @@ class Manager(threading.Thread): 'r', newline='', encoding='ascii' ) as src: reader = csv.reader(src) - shared.core_nodes = {tuple(row) for row in reader} + shared.core_nodes = {(row[0], int(row[1])) for row in reader} shared.node_pool.update(shared.core_nodes) with open( -- 2.45.1 From 7053ac84f745d90a281405b5b27f0ce655df7e1c Mon Sep 17 00:00:00 2001 From: Lee Miller Date: Sat, 19 Aug 2023 04:14:05 +0300 Subject: [PATCH 2/6] Try not to add core nodes to pool --- minode/connection.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/minode/connection.py b/minode/connection.py index ed727ec..05d552a 100644 --- a/minode/connection.py +++ b/minode/connection.py @@ -411,7 +411,8 @@ class ConnectionBase(threading.Thread): addr = message.Addr.from_message(m) logging.debug('%s:%s -> %s', self.host_print, self.port, addr) for a in addr.addresses: - shared.unchecked_node_pool.add((a.host, a.port)) + if (a.host, a.port) not in shared.core_nodes: + shared.unchecked_node_pool.add((a.host, a.port)) def _request_objects(self): if self.vectors_to_get and len(self.vectors_requested) < 100: -- 2.45.1 From ce8bef45b84309fdd18b7fbd086af4ddd93b5b13 Mon Sep 17 00:00:00 2001 From: Lee Miller Date: Sat, 19 Aug 2023 05:40:10 +0300 Subject: [PATCH 3/6] Reduce number of simultaneous bootstrappers, refill the bootstrap pool --- minode/manager.py | 41 +++++++++++++++++++++++++---------------- 1 file changed, 25 insertions(+), 16 deletions(-) diff --git a/minode/manager.py b/minode/manager.py index b425c4f..71f5873 100644 --- a/minode/manager.py +++ b/minode/manager.py @@ -29,11 +29,15 @@ class Manager(threading.Thread): self.last_published_i2p_destination = \ time.time() - 50 * 60 + random.uniform(-1, 1) * 300 # nosec B311 + def fill_bootstrap_pool(self): + """Populate the bootstrap pool by core nodes and checked ones""" + self.bootstrap_pool = list(shared.core_nodes.union(shared.node_pool)) + random.shuffle(self.bootstrap_pool) + def run(self): self.load_data() self.clean_objects() - self.bootstrap_pool = list(shared.core_nodes.union(shared.node_pool)) - random.shuffle(self.bootstrap_pool) + self.fill_bootstrap_pool() while True: time.sleep(0.8) now = time.time() @@ -87,7 +91,13 @@ class Manager(threading.Thread): def bootstrap(): """Bootstrap from DNS seed-nodes and known nodes""" - target = self.bootstrap_pool.pop() + try: + target = self.bootstrap_pool.pop() + except IndexError: + logging.warning( + 'Ran out of bootstrap nodes, refilling') + self.fill_bootstrap_pool() + return logging.info('Starting a bootstrapper for %s:%s', *target) connect(target, Bootstrapper) @@ -116,20 +126,19 @@ class Manager(threading.Thread): ): if shared.ip_enabled: - if shared.unchecked_node_pool: - if len(shared.unchecked_node_pool) > 16: - to_connect.update(random.sample( - tuple(shared.unchecked_node_pool), 16)) - else: - to_connect.update(shared.unchecked_node_pool) - shared.unchecked_node_pool.difference_update(to_connect) - if len(shared.node_pool) > 8: - to_connect.update(random.sample( - tuple(shared.node_pool), 8)) - else: - to_connect.update(shared.node_pool) + if len(shared.unchecked_node_pool) > 16: + to_connect.update(random.sample( + tuple(shared.unchecked_node_pool), 16)) else: - bootstrap() + to_connect.update(shared.unchecked_node_pool) + if outgoing_connections < shared.outgoing_connections / 2: + bootstrap() + shared.unchecked_node_pool.difference_update(to_connect) + if len(shared.node_pool) > 8: + to_connect.update(random.sample( + tuple(shared.node_pool), 8)) + else: + to_connect.update(shared.node_pool) if shared.i2p_enabled: if len(shared.i2p_unchecked_node_pool) > 16: -- 2.45.1 From 80ca750da210dce765f5459e4c667320ef87a658 Mon Sep 17 00:00:00 2001 From: Lee Miller Date: Sun, 20 Aug 2023 06:48:05 +0300 Subject: [PATCH 4/6] Added a test for bootstrapping --- minode/tests/test_network.py | 26 ++++++++++++++++++++++++-- 1 file changed, 24 insertions(+), 2 deletions(-) diff --git a/minode/tests/test_network.py b/minode/tests/test_network.py index c8cdd31..bd7b60e 100644 --- a/minode/tests/test_network.py +++ b/minode/tests/test_network.py @@ -73,10 +73,32 @@ class TestNetwork(unittest.TestCase): def _make_initial_nodes(self): Manager.load_data() - self.assertGreaterEqual(len(shared.core_nodes), 3) + core_nodes_len = len(shared.core_nodes) + self.assertGreaterEqual(core_nodes_len, 3) main.bootstrap_from_dns() - self.assertGreaterEqual(len(shared.unchecked_node_pool), 3) + self.assertGreaterEqual(len(shared.core_nodes), core_nodes_len) + + def test_bootstrap(self): + """Start bootstrappers and check node pool""" + if shared.core_nodes: + shared.core_nodes = set() + if shared.unchecked_node_pool: + shared.unchecked_node_pool = set() + + self._make_initial_nodes() + self.assertEqual(len(shared.unchecked_node_pool), 0) + + for node in shared.core_nodes: + c = connection.Bootstrapper(*node) + c.start() + c.join() + if len(shared.unchecked_node_pool) > 2: + break + else: + self.fail( + 'Failed to find at least 3 nodes' + ' after running %s bootstrappers' % len(shared.core_nodes)) def test_connection(self): """Check a normal connection - should receive objects""" -- 2.45.1 From 144c3240db873393b774ae819dc02c93027ec2a6 Mon Sep 17 00:00:00 2001 From: Lee Miller Date: Wed, 17 Jul 2024 12:57:05 +0300 Subject: [PATCH 5/6] Ensure main.bootstrap_from_dns() adds IPv6 addresses to the core nodes --- minode/tests/test_network.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/minode/tests/test_network.py b/minode/tests/test_network.py index bd7b60e..f1ae7df 100644 --- a/minode/tests/test_network.py +++ b/minode/tests/test_network.py @@ -1,4 +1,5 @@ """Tests for network connections""" +import ipaddress import logging import os import random @@ -78,6 +79,17 @@ class TestNetwork(unittest.TestCase): main.bootstrap_from_dns() self.assertGreaterEqual(len(shared.core_nodes), core_nodes_len) + for host, _ in shared.core_nodes: + try: + ipaddress.IPv4Address(host) + except ipaddress.AddressValueError: + try: + ipaddress.IPv6Address(host) + except ipaddress.AddressValueError: + self.fail('Found not an IP address in the core nodes') + break + else: + self.fail('No IPv6 address found in the core nodes') def test_bootstrap(self): """Start bootstrappers and check node pool""" -- 2.45.1 From f09509893ff575b93e1a597904c02c5f152a5a8f Mon Sep 17 00:00:00 2001 From: Lee Miller Date: Wed, 17 Jul 2024 13:31:26 +0300 Subject: [PATCH 6/6] Test bootstrapping with a minode process --- minode/tests/test_network.py | 28 ++++++++++++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/minode/tests/test_network.py b/minode/tests/test_network.py index f1ae7df..c683637 100644 --- a/minode/tests/test_network.py +++ b/minode/tests/test_network.py @@ -249,3 +249,31 @@ class TestListener(TestProcessProto): if c.status == 'fully_established': self.fail('Established a connection') time.sleep(0.5) + + +class TestBootstrapProcess(TestProcessProto): + """A separate test case for bootstrapping with a minode process""" + _listen = True + _connection_limit = 24 + + def test_bootstrap(self): + """Start a bootstrapper for the local process and check node pool""" + if shared.unchecked_node_pool: + shared.unchecked_node_pool = set() + + started = time.time() + while not self.connections(): + if time.time() - started > 60: + self.fail('Failed to establish a connection') + time.sleep(1) + + for _ in range(3): + c = connection.Bootstrapper('127.0.0.1', 8444) + c.start() + c.join() + if len(shared.unchecked_node_pool) > 2: + break + else: + self.fail( + 'Failed to find at least 3 nodes' + ' after 3 tries to bootstrap with the local process') -- 2.45.1