A rough implementation of proper bootstrapping

This commit is contained in:
Lee Miller 2023-08-19 03:02:19 +03:00
parent 70f887956c
commit cf7c03c9f5
Signed by: lee.miller
GPG Key ID: 4F97A5EA88F4AB63
3 changed files with 58 additions and 30 deletions

View File

@ -507,4 +507,12 @@ class Connection(ConnectionBase):
self.vectors_to_send.update(getdata.vectors)
class Bootstrapper(ConnectionBase):
def _process_msg_addr(self, m):
super()._process_msg_addr(m)
shared.node_pool.discard((self.host, self.port))
self.status = 'disconnecting'
self.send_queue.put(None)
shared.connection = Connection

View File

@ -101,18 +101,22 @@ def parse_arguments(): # pylint: disable=too-many-branches,too-many-statements
def bootstrap_from_dns():
"""Addes addresses of bootstrap servers to known nodes"""
"""Addes addresses of bootstrap servers to core nodes"""
try:
for item in socket.getaddrinfo('bootstrap8080.bitmessage.org', 80):
shared.unchecked_node_pool.add((item[4][0], 8080))
logging.debug(
'Adding %s to unchecked_node_pool'
' based on DNS bootstrap method', item[4][0])
for item in socket.getaddrinfo('bootstrap8444.bitmessage.org', 80):
shared.unchecked_node_pool.add((item[4][0], 8444))
logging.debug(
'Adding %s to unchecked_node_pool'
' based on DNS bootstrap method', item[4][0])
for port in (8080, 8444):
for item in socket.getaddrinfo(
'bootstrap{}.bitmessage.org'.format(port), 80,
proto=socket.IPPROTO_TCP
):
try:
addr = item[4][0]
socket.inet_aton(addr)
except (TypeError, socket.error):
continue
else:
shared.core_nodes.add((addr, port))
except socket.gaierror:
pass
except Exception:
logging.info('Error during DNS bootstrap', exc_info=True)

View File

@ -11,7 +11,7 @@ import threading
import time
from . import proofofwork, shared, structure
from .connection import Connection
from .connection import Bootstrapper, Connection
from .i2p import I2PDialer
@ -20,6 +20,7 @@ class Manager(threading.Thread):
def __init__(self):
super().__init__(name='Manager')
self.q = queue.Queue()
self.bootstrap_pool = []
self.last_cleaned_objects = time.time()
self.last_cleaned_connections = time.time()
self.last_pickled_objects = time.time()
@ -31,6 +32,8 @@ class Manager(threading.Thread):
def run(self):
self.load_data()
self.clean_objects()
self.bootstrap_pool = list(shared.core_nodes.union(shared.node_pool))
random.shuffle(self.bootstrap_pool)
while True:
time.sleep(0.8)
now = time.time()
@ -68,9 +71,22 @@ class Manager(threading.Thread):
with shared.objects_lock:
del shared.objects[vector]
@staticmethod
def manage_connections():
def manage_connections(self):
hosts = set()
def connect(target, connection_class=Connection):
c = connection_class(*target)
c.start()
hosts.add(c.host)
with shared.connections_lock:
shared.connections.add(c)
def bootstrap():
"""Bootstrap from DNS seed-nodes and known nodes"""
target = self.bootstrap_pool.pop()
logging.info('Starting a bootstrapper for %s:%s', *target)
connect(target, Bootstrapper)
outgoing_connections = 0
for c in shared.connections.copy():
if not c.is_alive() or c.status == 'disconnected':
@ -96,17 +112,20 @@ class Manager(threading.Thread):
):
if shared.ip_enabled:
if len(shared.unchecked_node_pool) > 16:
to_connect.update(random.sample(
tuple(shared.unchecked_node_pool), 16))
if shared.unchecked_node_pool:
if len(shared.unchecked_node_pool) > 16:
to_connect.update(random.sample(
tuple(shared.unchecked_node_pool), 16))
else:
to_connect.update(shared.unchecked_node_pool)
shared.unchecked_node_pool.difference_update(to_connect)
if len(shared.node_pool) > 8:
to_connect.update(random.sample(
tuple(shared.node_pool), 8))
else:
to_connect.update(shared.node_pool)
else:
to_connect.update(shared.unchecked_node_pool)
shared.unchecked_node_pool.difference_update(to_connect)
if len(shared.node_pool) > 8:
to_connect.update(random.sample(
tuple(shared.node_pool), 8))
else:
to_connect.update(shared.node_pool)
bootstrap()
if shared.i2p_enabled:
if len(shared.i2p_unchecked_node_pool) > 16:
@ -141,11 +160,8 @@ class Manager(threading.Thread):
else:
continue
else:
c = Connection(addr[0], addr[1])
c.start()
hosts.add(c.host)
with shared.connections_lock:
shared.connections.add(c)
connect((addr[0], addr[1]))
shared.hosts = hosts
@staticmethod
@ -189,7 +205,7 @@ class Manager(threading.Thread):
'r', newline='', encoding='ascii'
) as src:
reader = csv.reader(src)
shared.core_nodes = {tuple(row) for row in reader}
shared.core_nodes = {(row[0], int(row[1])) for row in reader}
shared.node_pool.update(shared.core_nodes)
with open(