Compare commits
No commits in common. "f09509893ff575b93e1a597904c02c5f152a5a8f" and "d106078dac4c869dbc9d3c329d014e571924c39c" have entirely different histories.
f09509893f
...
d106078dac
|
@ -411,8 +411,7 @@ class ConnectionBase(threading.Thread):
|
||||||
addr = message.Addr.from_message(m)
|
addr = message.Addr.from_message(m)
|
||||||
logging.debug('%s:%s -> %s', self.host_print, self.port, addr)
|
logging.debug('%s:%s -> %s', self.host_print, self.port, addr)
|
||||||
for a in addr.addresses:
|
for a in addr.addresses:
|
||||||
if (a.host, a.port) not in shared.core_nodes:
|
shared.unchecked_node_pool.add((a.host, a.port))
|
||||||
shared.unchecked_node_pool.add((a.host, a.port))
|
|
||||||
|
|
||||||
def _request_objects(self):
|
def _request_objects(self):
|
||||||
if self.vectors_to_get and len(self.vectors_requested) < 100:
|
if self.vectors_to_get and len(self.vectors_requested) < 100:
|
||||||
|
@ -508,13 +507,4 @@ class Connection(ConnectionBase):
|
||||||
self.vectors_to_send.update(getdata.vectors)
|
self.vectors_to_send.update(getdata.vectors)
|
||||||
|
|
||||||
|
|
||||||
class Bootstrapper(ConnectionBase):
|
|
||||||
"""A special type of connection to find IP nodes"""
|
|
||||||
def _process_msg_addr(self, m):
|
|
||||||
super()._process_msg_addr(m)
|
|
||||||
shared.node_pool.discard((self.host, self.port))
|
|
||||||
self.status = 'disconnecting'
|
|
||||||
self.send_queue.put(None)
|
|
||||||
|
|
||||||
|
|
||||||
shared.connection = Connection
|
shared.connection = Connection
|
||||||
|
|
|
@ -101,22 +101,18 @@ def parse_arguments(): # pylint: disable=too-many-branches,too-many-statements
|
||||||
|
|
||||||
|
|
||||||
def bootstrap_from_dns():
|
def bootstrap_from_dns():
|
||||||
"""Addes addresses of bootstrap servers to core nodes"""
|
"""Addes addresses of bootstrap servers to known nodes"""
|
||||||
try:
|
try:
|
||||||
for port in (8080, 8444):
|
for item in socket.getaddrinfo('bootstrap8080.bitmessage.org', 80):
|
||||||
for item in socket.getaddrinfo(
|
shared.unchecked_node_pool.add((item[4][0], 8080))
|
||||||
'bootstrap{}.bitmessage.org'.format(port), 80,
|
logging.debug(
|
||||||
proto=socket.IPPROTO_TCP
|
'Adding %s to unchecked_node_pool'
|
||||||
):
|
' based on DNS bootstrap method', item[4][0])
|
||||||
try:
|
for item in socket.getaddrinfo('bootstrap8444.bitmessage.org', 80):
|
||||||
addr = item[4][0]
|
shared.unchecked_node_pool.add((item[4][0], 8444))
|
||||||
socket.inet_pton(item[0], addr)
|
logging.debug(
|
||||||
except (TypeError, socket.error):
|
'Adding %s to unchecked_node_pool'
|
||||||
continue
|
' based on DNS bootstrap method', item[4][0])
|
||||||
else:
|
|
||||||
shared.core_nodes.add((addr, port))
|
|
||||||
except socket.gaierror:
|
|
||||||
logging.info('Failed to do a DNS query')
|
|
||||||
except Exception:
|
except Exception:
|
||||||
logging.info('Error during DNS bootstrap', exc_info=True)
|
logging.info('Error during DNS bootstrap', exc_info=True)
|
||||||
|
|
||||||
|
|
|
@ -11,7 +11,7 @@ import threading
|
||||||
import time
|
import time
|
||||||
|
|
||||||
from . import proofofwork, shared, structure
|
from . import proofofwork, shared, structure
|
||||||
from .connection import Bootstrapper, Connection
|
from .connection import Connection
|
||||||
from .i2p import I2PDialer
|
from .i2p import I2PDialer
|
||||||
|
|
||||||
|
|
||||||
|
@ -20,7 +20,6 @@ class Manager(threading.Thread):
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
super().__init__(name='Manager')
|
super().__init__(name='Manager')
|
||||||
self.q = queue.Queue()
|
self.q = queue.Queue()
|
||||||
self.bootstrap_pool = []
|
|
||||||
self.last_cleaned_objects = time.time()
|
self.last_cleaned_objects = time.time()
|
||||||
self.last_cleaned_connections = time.time()
|
self.last_cleaned_connections = time.time()
|
||||||
self.last_pickled_objects = time.time()
|
self.last_pickled_objects = time.time()
|
||||||
|
@ -29,15 +28,9 @@ class Manager(threading.Thread):
|
||||||
self.last_published_i2p_destination = \
|
self.last_published_i2p_destination = \
|
||||||
time.time() - 50 * 60 + random.uniform(-1, 1) * 300 # nosec B311
|
time.time() - 50 * 60 + random.uniform(-1, 1) * 300 # nosec B311
|
||||||
|
|
||||||
def fill_bootstrap_pool(self):
|
|
||||||
"""Populate the bootstrap pool by core nodes and checked ones"""
|
|
||||||
self.bootstrap_pool = list(shared.core_nodes.union(shared.node_pool))
|
|
||||||
random.shuffle(self.bootstrap_pool)
|
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
self.load_data()
|
self.load_data()
|
||||||
self.clean_objects()
|
self.clean_objects()
|
||||||
self.fill_bootstrap_pool()
|
|
||||||
while True:
|
while True:
|
||||||
time.sleep(0.8)
|
time.sleep(0.8)
|
||||||
now = time.time()
|
now = time.time()
|
||||||
|
@ -75,32 +68,9 @@ class Manager(threading.Thread):
|
||||||
with shared.objects_lock:
|
with shared.objects_lock:
|
||||||
del shared.objects[vector]
|
del shared.objects[vector]
|
||||||
|
|
||||||
def manage_connections(self):
|
@staticmethod
|
||||||
"""Open new connections if needed, remove closed ones"""
|
def manage_connections():
|
||||||
hosts = set()
|
hosts = set()
|
||||||
|
|
||||||
def connect(target, connection_class=Connection):
|
|
||||||
"""
|
|
||||||
Open a connection of *connection_class*
|
|
||||||
to the *target* (host, port)
|
|
||||||
"""
|
|
||||||
c = connection_class(*target)
|
|
||||||
c.start()
|
|
||||||
with shared.connections_lock:
|
|
||||||
shared.connections.add(c)
|
|
||||||
|
|
||||||
def bootstrap():
|
|
||||||
"""Bootstrap from DNS seed-nodes and known nodes"""
|
|
||||||
try:
|
|
||||||
target = self.bootstrap_pool.pop()
|
|
||||||
except IndexError:
|
|
||||||
logging.warning(
|
|
||||||
'Ran out of bootstrap nodes, refilling')
|
|
||||||
self.fill_bootstrap_pool()
|
|
||||||
return
|
|
||||||
logging.info('Starting a bootstrapper for %s:%s', *target)
|
|
||||||
connect(target, Bootstrapper)
|
|
||||||
|
|
||||||
outgoing_connections = 0
|
outgoing_connections = 0
|
||||||
for c in shared.connections.copy():
|
for c in shared.connections.copy():
|
||||||
if not c.is_alive() or c.status == 'disconnected':
|
if not c.is_alive() or c.status == 'disconnected':
|
||||||
|
@ -131,8 +101,6 @@ class Manager(threading.Thread):
|
||||||
tuple(shared.unchecked_node_pool), 16))
|
tuple(shared.unchecked_node_pool), 16))
|
||||||
else:
|
else:
|
||||||
to_connect.update(shared.unchecked_node_pool)
|
to_connect.update(shared.unchecked_node_pool)
|
||||||
if outgoing_connections < shared.outgoing_connections / 2:
|
|
||||||
bootstrap()
|
|
||||||
shared.unchecked_node_pool.difference_update(to_connect)
|
shared.unchecked_node_pool.difference_update(to_connect)
|
||||||
if len(shared.node_pool) > 8:
|
if len(shared.node_pool) > 8:
|
||||||
to_connect.update(random.sample(
|
to_connect.update(random.sample(
|
||||||
|
@ -174,8 +142,11 @@ class Manager(threading.Thread):
|
||||||
else:
|
else:
|
||||||
continue
|
continue
|
||||||
else:
|
else:
|
||||||
connect((host, port))
|
c = Connection(host, port)
|
||||||
|
c.start()
|
||||||
hosts.add(group)
|
hosts.add(group)
|
||||||
|
with shared.connections_lock:
|
||||||
|
shared.connections.add(c)
|
||||||
shared.hosts = hosts
|
shared.hosts = hosts
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
|
@ -219,7 +190,7 @@ class Manager(threading.Thread):
|
||||||
'r', newline='', encoding='ascii'
|
'r', newline='', encoding='ascii'
|
||||||
) as src:
|
) as src:
|
||||||
reader = csv.reader(src)
|
reader = csv.reader(src)
|
||||||
shared.core_nodes = {(row[0], int(row[1])) for row in reader}
|
shared.core_nodes = {tuple(row) for row in reader}
|
||||||
shared.node_pool.update(shared.core_nodes)
|
shared.node_pool.update(shared.core_nodes)
|
||||||
|
|
||||||
with open(
|
with open(
|
||||||
|
|
|
@ -1,5 +1,4 @@
|
||||||
"""Tests for network connections"""
|
"""Tests for network connections"""
|
||||||
import ipaddress
|
|
||||||
import logging
|
import logging
|
||||||
import os
|
import os
|
||||||
import random
|
import random
|
||||||
|
@ -74,43 +73,10 @@ class TestNetwork(unittest.TestCase):
|
||||||
|
|
||||||
def _make_initial_nodes(self):
|
def _make_initial_nodes(self):
|
||||||
Manager.load_data()
|
Manager.load_data()
|
||||||
core_nodes_len = len(shared.core_nodes)
|
self.assertGreaterEqual(len(shared.core_nodes), 3)
|
||||||
self.assertGreaterEqual(core_nodes_len, 3)
|
|
||||||
|
|
||||||
main.bootstrap_from_dns()
|
main.bootstrap_from_dns()
|
||||||
self.assertGreaterEqual(len(shared.core_nodes), core_nodes_len)
|
self.assertGreaterEqual(len(shared.unchecked_node_pool), 3)
|
||||||
for host, _ in shared.core_nodes:
|
|
||||||
try:
|
|
||||||
ipaddress.IPv4Address(host)
|
|
||||||
except ipaddress.AddressValueError:
|
|
||||||
try:
|
|
||||||
ipaddress.IPv6Address(host)
|
|
||||||
except ipaddress.AddressValueError:
|
|
||||||
self.fail('Found not an IP address in the core nodes')
|
|
||||||
break
|
|
||||||
else:
|
|
||||||
self.fail('No IPv6 address found in the core nodes')
|
|
||||||
|
|
||||||
def test_bootstrap(self):
|
|
||||||
"""Start bootstrappers and check node pool"""
|
|
||||||
if shared.core_nodes:
|
|
||||||
shared.core_nodes = set()
|
|
||||||
if shared.unchecked_node_pool:
|
|
||||||
shared.unchecked_node_pool = set()
|
|
||||||
|
|
||||||
self._make_initial_nodes()
|
|
||||||
self.assertEqual(len(shared.unchecked_node_pool), 0)
|
|
||||||
|
|
||||||
for node in shared.core_nodes:
|
|
||||||
c = connection.Bootstrapper(*node)
|
|
||||||
c.start()
|
|
||||||
c.join()
|
|
||||||
if len(shared.unchecked_node_pool) > 2:
|
|
||||||
break
|
|
||||||
else:
|
|
||||||
self.fail(
|
|
||||||
'Failed to find at least 3 nodes'
|
|
||||||
' after running %s bootstrappers' % len(shared.core_nodes))
|
|
||||||
|
|
||||||
def test_connection(self):
|
def test_connection(self):
|
||||||
"""Check a normal connection - should receive objects"""
|
"""Check a normal connection - should receive objects"""
|
||||||
|
@ -249,31 +215,3 @@ class TestListener(TestProcessProto):
|
||||||
if c.status == 'fully_established':
|
if c.status == 'fully_established':
|
||||||
self.fail('Established a connection')
|
self.fail('Established a connection')
|
||||||
time.sleep(0.5)
|
time.sleep(0.5)
|
||||||
|
|
||||||
|
|
||||||
class TestBootstrapProcess(TestProcessProto):
|
|
||||||
"""A separate test case for bootstrapping with a minode process"""
|
|
||||||
_listen = True
|
|
||||||
_connection_limit = 24
|
|
||||||
|
|
||||||
def test_bootstrap(self):
|
|
||||||
"""Start a bootstrapper for the local process and check node pool"""
|
|
||||||
if shared.unchecked_node_pool:
|
|
||||||
shared.unchecked_node_pool = set()
|
|
||||||
|
|
||||||
started = time.time()
|
|
||||||
while not self.connections():
|
|
||||||
if time.time() - started > 60:
|
|
||||||
self.fail('Failed to establish a connection')
|
|
||||||
time.sleep(1)
|
|
||||||
|
|
||||||
for _ in range(3):
|
|
||||||
c = connection.Bootstrapper('127.0.0.1', 8444)
|
|
||||||
c.start()
|
|
||||||
c.join()
|
|
||||||
if len(shared.unchecked_node_pool) > 2:
|
|
||||||
break
|
|
||||||
else:
|
|
||||||
self.fail(
|
|
||||||
'Failed to find at least 3 nodes'
|
|
||||||
' after 3 tries to bootstrap with the local process')
|
|
||||||
|
|
Loading…
Reference in New Issue
Block a user