Compare commits

...

6 Commits

Author SHA1 Message Date
f09509893f
Test bootstrapping with a minode process
All checks were successful
Testing / default (push) Successful in 8m45s
2024-09-23 04:35:49 +03:00
144c3240db
Ensure main.bootstrap_from_dns() adds IPv6 addresses to the core nodes 2024-09-23 04:35:49 +03:00
80ca750da2
Added a test for bootstrapping 2024-09-23 04:35:48 +03:00
ce8bef45b8
Reduce number of simultaneous bootstrappers, refill the bootstrap pool 2024-09-23 04:33:22 +03:00
7053ac84f7
Try not to add core nodes to pool 2024-09-23 04:33:21 +03:00
05fcbdb45c
A rough implementation of proper bootstrapping:
added a Bootstrapper connection class, connect() and bootstrap() closures
in Manager.manage_connections(). The later is called while
shared.unchecked_node_pool is empty.
2024-09-23 04:28:33 +03:00
4 changed files with 127 additions and 22 deletions

View File

@ -411,6 +411,7 @@ class ConnectionBase(threading.Thread):
addr = message.Addr.from_message(m) addr = message.Addr.from_message(m)
logging.debug('%s:%s -> %s', self.host_print, self.port, addr) logging.debug('%s:%s -> %s', self.host_print, self.port, addr)
for a in addr.addresses: for a in addr.addresses:
if (a.host, a.port) not in shared.core_nodes:
shared.unchecked_node_pool.add((a.host, a.port)) shared.unchecked_node_pool.add((a.host, a.port))
def _request_objects(self): def _request_objects(self):
@ -507,4 +508,13 @@ class Connection(ConnectionBase):
self.vectors_to_send.update(getdata.vectors) self.vectors_to_send.update(getdata.vectors)
class Bootstrapper(ConnectionBase):
"""A special type of connection to find IP nodes"""
def _process_msg_addr(self, m):
super()._process_msg_addr(m)
shared.node_pool.discard((self.host, self.port))
self.status = 'disconnecting'
self.send_queue.put(None)
shared.connection = Connection shared.connection = Connection

View File

@ -101,18 +101,22 @@ def parse_arguments(): # pylint: disable=too-many-branches,too-many-statements
def bootstrap_from_dns(): def bootstrap_from_dns():
"""Addes addresses of bootstrap servers to known nodes""" """Addes addresses of bootstrap servers to core nodes"""
try: try:
for item in socket.getaddrinfo('bootstrap8080.bitmessage.org', 80): for port in (8080, 8444):
shared.unchecked_node_pool.add((item[4][0], 8080)) for item in socket.getaddrinfo(
logging.debug( 'bootstrap{}.bitmessage.org'.format(port), 80,
'Adding %s to unchecked_node_pool' proto=socket.IPPROTO_TCP
' based on DNS bootstrap method', item[4][0]) ):
for item in socket.getaddrinfo('bootstrap8444.bitmessage.org', 80): try:
shared.unchecked_node_pool.add((item[4][0], 8444)) addr = item[4][0]
logging.debug( socket.inet_pton(item[0], addr)
'Adding %s to unchecked_node_pool' except (TypeError, socket.error):
' based on DNS bootstrap method', item[4][0]) continue
else:
shared.core_nodes.add((addr, port))
except socket.gaierror:
logging.info('Failed to do a DNS query')
except Exception: except Exception:
logging.info('Error during DNS bootstrap', exc_info=True) logging.info('Error during DNS bootstrap', exc_info=True)

View File

@ -11,7 +11,7 @@ import threading
import time import time
from . import proofofwork, shared, structure from . import proofofwork, shared, structure
from .connection import Connection from .connection import Bootstrapper, Connection
from .i2p import I2PDialer from .i2p import I2PDialer
@ -20,6 +20,7 @@ class Manager(threading.Thread):
def __init__(self): def __init__(self):
super().__init__(name='Manager') super().__init__(name='Manager')
self.q = queue.Queue() self.q = queue.Queue()
self.bootstrap_pool = []
self.last_cleaned_objects = time.time() self.last_cleaned_objects = time.time()
self.last_cleaned_connections = time.time() self.last_cleaned_connections = time.time()
self.last_pickled_objects = time.time() self.last_pickled_objects = time.time()
@ -28,9 +29,15 @@ class Manager(threading.Thread):
self.last_published_i2p_destination = \ self.last_published_i2p_destination = \
time.time() - 50 * 60 + random.uniform(-1, 1) * 300 # nosec B311 time.time() - 50 * 60 + random.uniform(-1, 1) * 300 # nosec B311
def fill_bootstrap_pool(self):
"""Populate the bootstrap pool by core nodes and checked ones"""
self.bootstrap_pool = list(shared.core_nodes.union(shared.node_pool))
random.shuffle(self.bootstrap_pool)
def run(self): def run(self):
self.load_data() self.load_data()
self.clean_objects() self.clean_objects()
self.fill_bootstrap_pool()
while True: while True:
time.sleep(0.8) time.sleep(0.8)
now = time.time() now = time.time()
@ -68,9 +75,32 @@ class Manager(threading.Thread):
with shared.objects_lock: with shared.objects_lock:
del shared.objects[vector] del shared.objects[vector]
@staticmethod def manage_connections(self):
def manage_connections(): """Open new connections if needed, remove closed ones"""
hosts = set() hosts = set()
def connect(target, connection_class=Connection):
"""
Open a connection of *connection_class*
to the *target* (host, port)
"""
c = connection_class(*target)
c.start()
with shared.connections_lock:
shared.connections.add(c)
def bootstrap():
"""Bootstrap from DNS seed-nodes and known nodes"""
try:
target = self.bootstrap_pool.pop()
except IndexError:
logging.warning(
'Ran out of bootstrap nodes, refilling')
self.fill_bootstrap_pool()
return
logging.info('Starting a bootstrapper for %s:%s', *target)
connect(target, Bootstrapper)
outgoing_connections = 0 outgoing_connections = 0
for c in shared.connections.copy(): for c in shared.connections.copy():
if not c.is_alive() or c.status == 'disconnected': if not c.is_alive() or c.status == 'disconnected':
@ -101,6 +131,8 @@ class Manager(threading.Thread):
tuple(shared.unchecked_node_pool), 16)) tuple(shared.unchecked_node_pool), 16))
else: else:
to_connect.update(shared.unchecked_node_pool) to_connect.update(shared.unchecked_node_pool)
if outgoing_connections < shared.outgoing_connections / 2:
bootstrap()
shared.unchecked_node_pool.difference_update(to_connect) shared.unchecked_node_pool.difference_update(to_connect)
if len(shared.node_pool) > 8: if len(shared.node_pool) > 8:
to_connect.update(random.sample( to_connect.update(random.sample(
@ -142,11 +174,8 @@ class Manager(threading.Thread):
else: else:
continue continue
else: else:
c = Connection(host, port) connect((host, port))
c.start()
hosts.add(group) hosts.add(group)
with shared.connections_lock:
shared.connections.add(c)
shared.hosts = hosts shared.hosts = hosts
@staticmethod @staticmethod
@ -190,7 +219,7 @@ class Manager(threading.Thread):
'r', newline='', encoding='ascii' 'r', newline='', encoding='ascii'
) as src: ) as src:
reader = csv.reader(src) reader = csv.reader(src)
shared.core_nodes = {tuple(row) for row in reader} shared.core_nodes = {(row[0], int(row[1])) for row in reader}
shared.node_pool.update(shared.core_nodes) shared.node_pool.update(shared.core_nodes)
with open( with open(

View File

@ -1,4 +1,5 @@
"""Tests for network connections""" """Tests for network connections"""
import ipaddress
import logging import logging
import os import os
import random import random
@ -73,10 +74,43 @@ class TestNetwork(unittest.TestCase):
def _make_initial_nodes(self): def _make_initial_nodes(self):
Manager.load_data() Manager.load_data()
self.assertGreaterEqual(len(shared.core_nodes), 3) core_nodes_len = len(shared.core_nodes)
self.assertGreaterEqual(core_nodes_len, 3)
main.bootstrap_from_dns() main.bootstrap_from_dns()
self.assertGreaterEqual(len(shared.unchecked_node_pool), 3) self.assertGreaterEqual(len(shared.core_nodes), core_nodes_len)
for host, _ in shared.core_nodes:
try:
ipaddress.IPv4Address(host)
except ipaddress.AddressValueError:
try:
ipaddress.IPv6Address(host)
except ipaddress.AddressValueError:
self.fail('Found not an IP address in the core nodes')
break
else:
self.fail('No IPv6 address found in the core nodes')
def test_bootstrap(self):
"""Start bootstrappers and check node pool"""
if shared.core_nodes:
shared.core_nodes = set()
if shared.unchecked_node_pool:
shared.unchecked_node_pool = set()
self._make_initial_nodes()
self.assertEqual(len(shared.unchecked_node_pool), 0)
for node in shared.core_nodes:
c = connection.Bootstrapper(*node)
c.start()
c.join()
if len(shared.unchecked_node_pool) > 2:
break
else:
self.fail(
'Failed to find at least 3 nodes'
' after running %s bootstrappers' % len(shared.core_nodes))
def test_connection(self): def test_connection(self):
"""Check a normal connection - should receive objects""" """Check a normal connection - should receive objects"""
@ -215,3 +249,31 @@ class TestListener(TestProcessProto):
if c.status == 'fully_established': if c.status == 'fully_established':
self.fail('Established a connection') self.fail('Established a connection')
time.sleep(0.5) time.sleep(0.5)
class TestBootstrapProcess(TestProcessProto):
"""A separate test case for bootstrapping with a minode process"""
_listen = True
_connection_limit = 24
def test_bootstrap(self):
"""Start a bootstrapper for the local process and check node pool"""
if shared.unchecked_node_pool:
shared.unchecked_node_pool = set()
started = time.time()
while not self.connections():
if time.time() - started > 60:
self.fail('Failed to establish a connection')
time.sleep(1)
for _ in range(3):
c = connection.Bootstrapper('127.0.0.1', 8444)
c.start()
c.join()
if len(shared.unchecked_node_pool) > 2:
break
else:
self.fail(
'Failed to find at least 3 nodes'
' after 3 tries to bootstrap with the local process')