A rough implementation of proper bootstrapping:
added a Bootstrapper connection class, connect() and bootstrap() closures in Manager.manage_connections(). The later is called while shared.unchecked_node_pool is empty.
This commit is contained in:
parent
d106078dac
commit
05fcbdb45c
|
@ -507,4 +507,13 @@ class Connection(ConnectionBase):
|
|||
self.vectors_to_send.update(getdata.vectors)
|
||||
|
||||
|
||||
class Bootstrapper(ConnectionBase):
|
||||
"""A special type of connection to find IP nodes"""
|
||||
def _process_msg_addr(self, m):
|
||||
super()._process_msg_addr(m)
|
||||
shared.node_pool.discard((self.host, self.port))
|
||||
self.status = 'disconnecting'
|
||||
self.send_queue.put(None)
|
||||
|
||||
|
||||
shared.connection = Connection
|
||||
|
|
|
@ -101,18 +101,22 @@ def parse_arguments(): # pylint: disable=too-many-branches,too-many-statements
|
|||
|
||||
|
||||
def bootstrap_from_dns():
|
||||
"""Addes addresses of bootstrap servers to known nodes"""
|
||||
"""Addes addresses of bootstrap servers to core nodes"""
|
||||
try:
|
||||
for item in socket.getaddrinfo('bootstrap8080.bitmessage.org', 80):
|
||||
shared.unchecked_node_pool.add((item[4][0], 8080))
|
||||
logging.debug(
|
||||
'Adding %s to unchecked_node_pool'
|
||||
' based on DNS bootstrap method', item[4][0])
|
||||
for item in socket.getaddrinfo('bootstrap8444.bitmessage.org', 80):
|
||||
shared.unchecked_node_pool.add((item[4][0], 8444))
|
||||
logging.debug(
|
||||
'Adding %s to unchecked_node_pool'
|
||||
' based on DNS bootstrap method', item[4][0])
|
||||
for port in (8080, 8444):
|
||||
for item in socket.getaddrinfo(
|
||||
'bootstrap{}.bitmessage.org'.format(port), 80,
|
||||
proto=socket.IPPROTO_TCP
|
||||
):
|
||||
try:
|
||||
addr = item[4][0]
|
||||
socket.inet_pton(item[0], addr)
|
||||
except (TypeError, socket.error):
|
||||
continue
|
||||
else:
|
||||
shared.core_nodes.add((addr, port))
|
||||
except socket.gaierror:
|
||||
logging.info('Failed to do a DNS query')
|
||||
except Exception:
|
||||
logging.info('Error during DNS bootstrap', exc_info=True)
|
||||
|
||||
|
|
|
@ -11,7 +11,7 @@ import threading
|
|||
import time
|
||||
|
||||
from . import proofofwork, shared, structure
|
||||
from .connection import Connection
|
||||
from .connection import Bootstrapper, Connection
|
||||
from .i2p import I2PDialer
|
||||
|
||||
|
||||
|
@ -20,6 +20,7 @@ class Manager(threading.Thread):
|
|||
def __init__(self):
|
||||
super().__init__(name='Manager')
|
||||
self.q = queue.Queue()
|
||||
self.bootstrap_pool = []
|
||||
self.last_cleaned_objects = time.time()
|
||||
self.last_cleaned_connections = time.time()
|
||||
self.last_pickled_objects = time.time()
|
||||
|
@ -31,6 +32,8 @@ class Manager(threading.Thread):
|
|||
def run(self):
|
||||
self.load_data()
|
||||
self.clean_objects()
|
||||
self.bootstrap_pool = list(shared.core_nodes.union(shared.node_pool))
|
||||
random.shuffle(self.bootstrap_pool)
|
||||
while True:
|
||||
time.sleep(0.8)
|
||||
now = time.time()
|
||||
|
@ -68,9 +71,26 @@ class Manager(threading.Thread):
|
|||
with shared.objects_lock:
|
||||
del shared.objects[vector]
|
||||
|
||||
@staticmethod
|
||||
def manage_connections():
|
||||
def manage_connections(self):
|
||||
"""Open new connections if needed, remove closed ones"""
|
||||
hosts = set()
|
||||
|
||||
def connect(target, connection_class=Connection):
|
||||
"""
|
||||
Open a connection of *connection_class*
|
||||
to the *target* (host, port)
|
||||
"""
|
||||
c = connection_class(*target)
|
||||
c.start()
|
||||
with shared.connections_lock:
|
||||
shared.connections.add(c)
|
||||
|
||||
def bootstrap():
|
||||
"""Bootstrap from DNS seed-nodes and known nodes"""
|
||||
target = self.bootstrap_pool.pop()
|
||||
logging.info('Starting a bootstrapper for %s:%s', *target)
|
||||
connect(target, Bootstrapper)
|
||||
|
||||
outgoing_connections = 0
|
||||
for c in shared.connections.copy():
|
||||
if not c.is_alive() or c.status == 'disconnected':
|
||||
|
@ -96,17 +116,20 @@ class Manager(threading.Thread):
|
|||
):
|
||||
|
||||
if shared.ip_enabled:
|
||||
if len(shared.unchecked_node_pool) > 16:
|
||||
to_connect.update(random.sample(
|
||||
tuple(shared.unchecked_node_pool), 16))
|
||||
if shared.unchecked_node_pool:
|
||||
if len(shared.unchecked_node_pool) > 16:
|
||||
to_connect.update(random.sample(
|
||||
tuple(shared.unchecked_node_pool), 16))
|
||||
else:
|
||||
to_connect.update(shared.unchecked_node_pool)
|
||||
shared.unchecked_node_pool.difference_update(to_connect)
|
||||
if len(shared.node_pool) > 8:
|
||||
to_connect.update(random.sample(
|
||||
tuple(shared.node_pool), 8))
|
||||
else:
|
||||
to_connect.update(shared.node_pool)
|
||||
else:
|
||||
to_connect.update(shared.unchecked_node_pool)
|
||||
shared.unchecked_node_pool.difference_update(to_connect)
|
||||
if len(shared.node_pool) > 8:
|
||||
to_connect.update(random.sample(
|
||||
tuple(shared.node_pool), 8))
|
||||
else:
|
||||
to_connect.update(shared.node_pool)
|
||||
bootstrap()
|
||||
|
||||
if shared.i2p_enabled:
|
||||
if len(shared.i2p_unchecked_node_pool) > 16:
|
||||
|
@ -142,11 +165,8 @@ class Manager(threading.Thread):
|
|||
else:
|
||||
continue
|
||||
else:
|
||||
c = Connection(host, port)
|
||||
c.start()
|
||||
connect((host, port))
|
||||
hosts.add(group)
|
||||
with shared.connections_lock:
|
||||
shared.connections.add(c)
|
||||
shared.hosts = hosts
|
||||
|
||||
@staticmethod
|
||||
|
@ -190,7 +210,7 @@ class Manager(threading.Thread):
|
|||
'r', newline='', encoding='ascii'
|
||||
) as src:
|
||||
reader = csv.reader(src)
|
||||
shared.core_nodes = {tuple(row) for row in reader}
|
||||
shared.core_nodes = {(row[0], int(row[1])) for row in reader}
|
||||
shared.node_pool.update(shared.core_nodes)
|
||||
|
||||
with open(
|
||||
|
|
Loading…
Reference in New Issue
Block a user