Compare commits
5 Commits
Author | SHA1 | Date |
---|---|---|
Lee Miller | 210284acc9 | |
Lee Miller | 39f92fd1b2 | |
Lee Miller | 032cfb1bab | |
Lee Miller | 6f9f15e895 | |
Lee Miller | e24271c728 |
|
@ -15,8 +15,11 @@ import time
|
||||||
from . import message, shared, structure
|
from . import message, shared, structure
|
||||||
|
|
||||||
|
|
||||||
class Connection(threading.Thread):
|
class ConnectionBase(threading.Thread):
|
||||||
"""The connection object"""
|
"""
|
||||||
|
Common code for the connection thread
|
||||||
|
with minimum command handlers to reuse
|
||||||
|
"""
|
||||||
def __init__(
|
def __init__(
|
||||||
self, host, port, s=None, network='ip', server=False,
|
self, host, port, s=None, network='ip', server=False,
|
||||||
i2p_remote_dest=b''
|
i2p_remote_dest=b''
|
||||||
|
@ -334,87 +337,13 @@ class Connection(threading.Thread):
|
||||||
break
|
break
|
||||||
|
|
||||||
def _process_message(self, m):
|
def _process_message(self, m):
|
||||||
if m.command == b'version':
|
if m.command == b'verack':
|
||||||
version = message.Version.from_bytes(m.to_bytes())
|
|
||||||
logging.debug('%s:%s -> %s', self.host_print, self.port, version)
|
|
||||||
if (
|
|
||||||
version.protocol_version != shared.protocol_version
|
|
||||||
or version.nonce == shared.nonce
|
|
||||||
):
|
|
||||||
self.status = 'disconnecting'
|
|
||||||
self.send_queue.put(None)
|
|
||||||
else:
|
|
||||||
logging.info(
|
|
||||||
'%s:%s claims to be %s',
|
|
||||||
self.host_print, self.port, version.user_agent)
|
|
||||||
self.send_queue.put(message.Message(b'verack', b''))
|
|
||||||
self.verack_sent = True
|
|
||||||
self.remote_version = version
|
|
||||||
if not self.server:
|
|
||||||
self.send_queue.put('fully_established')
|
|
||||||
if self.network == 'ip':
|
|
||||||
shared.address_advertise_queue.put(structure.NetAddr(
|
|
||||||
version.services, self.host, self.port))
|
|
||||||
shared.node_pool.add((self.host, self.port))
|
|
||||||
elif self.network == 'i2p':
|
|
||||||
shared.i2p_node_pool.add((self.host, 'i2p'))
|
|
||||||
if self.network == 'ip':
|
|
||||||
shared.address_advertise_queue.put(structure.NetAddr(
|
|
||||||
shared.services, version.host, shared.listening_port))
|
|
||||||
if self.server:
|
|
||||||
if self.network == 'ip':
|
|
||||||
self.send_queue.put(
|
|
||||||
message.Version(self.host, self.port))
|
|
||||||
else:
|
|
||||||
self.send_queue.put(message.Version('127.0.0.1', 7656))
|
|
||||||
|
|
||||||
elif m.command == b'verack':
|
|
||||||
self.verack_received = True
|
self.verack_received = True
|
||||||
logging.debug(
|
logging.debug(
|
||||||
'%s:%s -> %s', self.host_print, self.port, 'verack')
|
'%s:%s -> %s', self.host_print, self.port, 'verack')
|
||||||
if self.server:
|
if self.server:
|
||||||
self.send_queue.put('fully_established')
|
self.send_queue.put('fully_established')
|
||||||
|
|
||||||
elif m.command == b'inv':
|
|
||||||
inv = message.Inv.from_message(m)
|
|
||||||
logging.debug('%s:%s -> %s', self.host_print, self.port, inv)
|
|
||||||
to_get = inv.vectors.copy()
|
|
||||||
to_get.difference_update(shared.objects.keys())
|
|
||||||
self.vectors_to_get.update(to_get)
|
|
||||||
# Do not send objects they already have.
|
|
||||||
self.vectors_to_send.difference_update(inv.vectors)
|
|
||||||
|
|
||||||
elif m.command == b'object':
|
|
||||||
obj = structure.Object.from_message(m)
|
|
||||||
logging.debug('%s:%s -> %s', self.host_print, self.port, obj)
|
|
||||||
self.vectors_requested.pop(obj.vector, None)
|
|
||||||
self.vectors_to_get.discard(obj.vector)
|
|
||||||
if obj.is_valid() and obj.vector not in shared.objects:
|
|
||||||
with shared.objects_lock:
|
|
||||||
shared.objects[obj.vector] = obj
|
|
||||||
if (
|
|
||||||
obj.object_type == shared.i2p_dest_obj_type
|
|
||||||
and obj.version == shared.i2p_dest_obj_version
|
|
||||||
):
|
|
||||||
dest = base64.b64encode(obj.object_payload, altchars=b'-~')
|
|
||||||
logging.debug(
|
|
||||||
'Received I2P destination object,'
|
|
||||||
' adding to i2p_unchecked_node_pool')
|
|
||||||
logging.debug(dest)
|
|
||||||
shared.i2p_unchecked_node_pool.add((dest, 'i2p'))
|
|
||||||
shared.vector_advertise_queue.put(obj.vector)
|
|
||||||
|
|
||||||
elif m.command == b'getdata':
|
|
||||||
getdata = message.GetData.from_message(m)
|
|
||||||
logging.debug('%s:%s -> %s', self.host_print, self.port, getdata)
|
|
||||||
self.vectors_to_send.update(getdata.vectors)
|
|
||||||
|
|
||||||
elif m.command == b'addr':
|
|
||||||
addr = message.Addr.from_message(m)
|
|
||||||
logging.debug('%s:%s -> %s', self.host_print, self.port, addr)
|
|
||||||
for a in addr.addresses:
|
|
||||||
shared.unchecked_node_pool.add((a.host, a.port))
|
|
||||||
|
|
||||||
elif m.command == b'ping':
|
elif m.command == b'ping':
|
||||||
logging.debug('%s:%s -> ping', self.host_print, self.port)
|
logging.debug('%s:%s -> ping', self.host_print, self.port)
|
||||||
self.send_queue.put(message.Message(b'pong', b''))
|
self.send_queue.put(message.Message(b'pong', b''))
|
||||||
|
@ -424,7 +353,50 @@ class Connection(threading.Thread):
|
||||||
'%s:%s -> error: %s', self.host_print, self.port, m.payload)
|
'%s:%s -> error: %s', self.host_print, self.port, m.payload)
|
||||||
|
|
||||||
else:
|
else:
|
||||||
logging.debug('%s:%s -> %s', self.host_print, self.port, m)
|
try:
|
||||||
|
getattr(self, '_process_msg_{}'.format(m.command.decode()))(m)
|
||||||
|
except (AttributeError, UnicodeDecodeError):
|
||||||
|
logging.debug('%s:%s -> %s', self.host_print, self.port, m)
|
||||||
|
|
||||||
|
def _process_msg_version(self, m):
|
||||||
|
version = message.Version.from_bytes(m.to_bytes())
|
||||||
|
logging.debug('%s:%s -> %s', self.host_print, self.port, version)
|
||||||
|
if (
|
||||||
|
version.protocol_version != shared.protocol_version
|
||||||
|
or version.nonce == shared.nonce
|
||||||
|
):
|
||||||
|
self.status = 'disconnecting'
|
||||||
|
self.send_queue.put(None)
|
||||||
|
else:
|
||||||
|
logging.info(
|
||||||
|
'%s:%s claims to be %s',
|
||||||
|
self.host_print, self.port, version.user_agent)
|
||||||
|
self.send_queue.put(message.Message(b'verack', b''))
|
||||||
|
self.verack_sent = True
|
||||||
|
self.remote_version = version
|
||||||
|
if not self.server:
|
||||||
|
self.send_queue.put('fully_established')
|
||||||
|
if self.network == 'ip':
|
||||||
|
shared.address_advertise_queue.put(structure.NetAddr(
|
||||||
|
version.services, self.host, self.port))
|
||||||
|
shared.node_pool.add((self.host, self.port))
|
||||||
|
elif self.network == 'i2p':
|
||||||
|
shared.i2p_node_pool.add((self.host, 'i2p'))
|
||||||
|
if self.network == 'ip':
|
||||||
|
shared.address_advertise_queue.put(structure.NetAddr(
|
||||||
|
shared.services, version.host, shared.listening_port))
|
||||||
|
if self.server:
|
||||||
|
if self.network == 'ip':
|
||||||
|
self.send_queue.put(message.Version(self.host, self.port))
|
||||||
|
else:
|
||||||
|
self.send_queue.put(message.Version('127.0.0.1', 7656))
|
||||||
|
|
||||||
|
def _process_msg_addr(self, m):
|
||||||
|
addr = message.Addr.from_message(m)
|
||||||
|
logging.debug('%s:%s -> %s', self.host_print, self.port, addr)
|
||||||
|
for a in addr.addresses:
|
||||||
|
if (a.host, a.port) not in shared.core_nodes:
|
||||||
|
shared.unchecked_node_pool.add((a.host, a.port))
|
||||||
|
|
||||||
def _request_objects(self):
|
def _request_objects(self):
|
||||||
if self.vectors_to_get and len(self.vectors_requested) < 100:
|
if self.vectors_to_get and len(self.vectors_requested) < 100:
|
||||||
|
@ -483,4 +455,49 @@ class Connection(threading.Thread):
|
||||||
message.Message(b'object', obj.to_bytes()))
|
message.Message(b'object', obj.to_bytes()))
|
||||||
|
|
||||||
|
|
||||||
|
class Connection(ConnectionBase):
|
||||||
|
"""The connection with all commands implementation"""
|
||||||
|
def _process_msg_inv(self, m):
|
||||||
|
inv = message.Inv.from_message(m)
|
||||||
|
logging.debug('%s:%s -> %s', self.host_print, self.port, inv)
|
||||||
|
to_get = inv.vectors.copy()
|
||||||
|
to_get.difference_update(shared.objects.keys())
|
||||||
|
self.vectors_to_get.update(to_get)
|
||||||
|
# Do not send objects they already have.
|
||||||
|
self.vectors_to_send.difference_update(inv.vectors)
|
||||||
|
|
||||||
|
def _process_msg_object(self, m):
|
||||||
|
obj = structure.Object.from_message(m)
|
||||||
|
logging.debug('%s:%s -> %s', self.host_print, self.port, obj)
|
||||||
|
self.vectors_requested.pop(obj.vector, None)
|
||||||
|
self.vectors_to_get.discard(obj.vector)
|
||||||
|
if obj.is_valid() and obj.vector not in shared.objects:
|
||||||
|
with shared.objects_lock:
|
||||||
|
shared.objects[obj.vector] = obj
|
||||||
|
if (
|
||||||
|
obj.object_type == shared.i2p_dest_obj_type
|
||||||
|
and obj.version == shared.i2p_dest_obj_version
|
||||||
|
):
|
||||||
|
dest = base64.b64encode(obj.object_payload, altchars=b'-~')
|
||||||
|
logging.debug(
|
||||||
|
'Received I2P destination object,'
|
||||||
|
' adding to i2p_unchecked_node_pool')
|
||||||
|
logging.debug(dest)
|
||||||
|
shared.i2p_unchecked_node_pool.add((dest, 'i2p'))
|
||||||
|
shared.vector_advertise_queue.put(obj.vector)
|
||||||
|
|
||||||
|
def _process_msg_getdata(self, m):
|
||||||
|
getdata = message.GetData.from_message(m)
|
||||||
|
logging.debug('%s:%s -> %s', self.host_print, self.port, getdata)
|
||||||
|
self.vectors_to_send.update(getdata.vectors)
|
||||||
|
|
||||||
|
|
||||||
|
class Bootstrapper(ConnectionBase):
|
||||||
|
def _process_msg_addr(self, m):
|
||||||
|
super()._process_msg_addr(m)
|
||||||
|
shared.node_pool.discard((self.host, self.port))
|
||||||
|
self.status = 'disconnecting'
|
||||||
|
self.send_queue.put(None)
|
||||||
|
|
||||||
|
|
||||||
shared.connection = Connection
|
shared.connection = Connection
|
||||||
|
|
|
@ -140,8 +140,7 @@ def load_data():
|
||||||
'r', newline=''
|
'r', newline=''
|
||||||
) as src:
|
) as src:
|
||||||
reader = csv.reader(src)
|
reader = csv.reader(src)
|
||||||
shared.core_nodes = {tuple(row) for row in reader}
|
shared.core_nodes = {(row[0], int(row[1])) for row in reader}
|
||||||
shared.node_pool.update(shared.core_nodes)
|
|
||||||
|
|
||||||
with open(
|
with open(
|
||||||
os.path.join(shared.source_directory, 'i2p_core_nodes.csv'),
|
os.path.join(shared.source_directory, 'i2p_core_nodes.csv'),
|
||||||
|
@ -153,18 +152,22 @@ def load_data():
|
||||||
|
|
||||||
|
|
||||||
def bootstrap_from_dns():
|
def bootstrap_from_dns():
|
||||||
"""Addes addresses of bootstrap servers to known nodes"""
|
"""Addes addresses of bootstrap servers to core nodes"""
|
||||||
try:
|
try:
|
||||||
for item in socket.getaddrinfo('bootstrap8080.bitmessage.org', 80):
|
for port in (8080, 8444):
|
||||||
shared.unchecked_node_pool.add((item[4][0], 8080))
|
for item in socket.getaddrinfo(
|
||||||
logging.debug(
|
'bootstrap{}.bitmessage.org'.format(port), 80,
|
||||||
'Adding %s to unchecked_node_pool'
|
proto=socket.IPPROTO_TCP
|
||||||
' based on DNS bootstrap method', item[4][0])
|
):
|
||||||
for item in socket.getaddrinfo('bootstrap8444.bitmessage.org', 80):
|
try:
|
||||||
shared.unchecked_node_pool.add((item[4][0], 8444))
|
addr = item[4][0]
|
||||||
logging.debug(
|
socket.inet_aton(addr)
|
||||||
'Adding %s to unchecked_node_pool'
|
except (TypeError, socket.error):
|
||||||
' based on DNS bootstrap method', item[4][0])
|
continue
|
||||||
|
else:
|
||||||
|
shared.core_nodes.add((addr, port))
|
||||||
|
except socket.gaierror:
|
||||||
|
pass
|
||||||
except Exception:
|
except Exception:
|
||||||
logging.info('Error during DNS bootstrap', exc_info=True)
|
logging.info('Error during DNS bootstrap', exc_info=True)
|
||||||
|
|
||||||
|
|
|
@ -10,7 +10,7 @@ import threading
|
||||||
import time
|
import time
|
||||||
|
|
||||||
from . import proofofwork, shared, structure
|
from . import proofofwork, shared, structure
|
||||||
from .connection import Connection
|
from .connection import Bootstrapper, Connection
|
||||||
from .i2p import I2PDialer
|
from .i2p import I2PDialer
|
||||||
|
|
||||||
|
|
||||||
|
@ -19,6 +19,7 @@ class Manager(threading.Thread):
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
super().__init__(name='Manager')
|
super().__init__(name='Manager')
|
||||||
self.q = queue.Queue()
|
self.q = queue.Queue()
|
||||||
|
self.bootstrap_pool = []
|
||||||
self.last_cleaned_objects = time.time()
|
self.last_cleaned_objects = time.time()
|
||||||
self.last_cleaned_connections = time.time()
|
self.last_cleaned_connections = time.time()
|
||||||
self.last_pickled_objects = time.time()
|
self.last_pickled_objects = time.time()
|
||||||
|
@ -27,8 +28,13 @@ class Manager(threading.Thread):
|
||||||
self.last_published_i2p_destination = \
|
self.last_published_i2p_destination = \
|
||||||
time.time() - 50 * 60 + random.uniform(-1, 1) * 300 # nosec
|
time.time() - 50 * 60 + random.uniform(-1, 1) * 300 # nosec
|
||||||
|
|
||||||
|
def fill_bootstrap_pool(self):
|
||||||
|
self.bootstrap_pool = list(shared.core_nodes.union(shared.node_pool))
|
||||||
|
random.shuffle(self.bootstrap_pool)
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
self.clean_objects()
|
self.clean_objects()
|
||||||
|
self.fill_bootstrap_pool()
|
||||||
while True:
|
while True:
|
||||||
time.sleep(0.8)
|
time.sleep(0.8)
|
||||||
now = time.time()
|
now = time.time()
|
||||||
|
@ -66,9 +72,22 @@ class Manager(threading.Thread):
|
||||||
with shared.objects_lock:
|
with shared.objects_lock:
|
||||||
del shared.objects[vector]
|
del shared.objects[vector]
|
||||||
|
|
||||||
@staticmethod
|
def manage_connections(self):
|
||||||
def manage_connections():
|
|
||||||
hosts = set()
|
hosts = set()
|
||||||
|
|
||||||
|
def connect(target, connection_class=Connection):
|
||||||
|
c = connection_class(*target)
|
||||||
|
c.start()
|
||||||
|
hosts.add(c.host)
|
||||||
|
with shared.connections_lock:
|
||||||
|
shared.connections.add(c)
|
||||||
|
|
||||||
|
def bootstrap():
|
||||||
|
"""Bootstrap from DNS seed-nodes and known nodes"""
|
||||||
|
target = self.bootstrap_pool.pop()
|
||||||
|
logging.info('Starting a bootstrapper for %s:%s', *target)
|
||||||
|
connect(target, Bootstrapper)
|
||||||
|
|
||||||
outgoing_connections = 0
|
outgoing_connections = 0
|
||||||
for c in shared.connections.copy():
|
for c in shared.connections.copy():
|
||||||
if not c.is_alive() or c.status == 'disconnected':
|
if not c.is_alive() or c.status == 'disconnected':
|
||||||
|
@ -94,16 +113,24 @@ class Manager(threading.Thread):
|
||||||
):
|
):
|
||||||
|
|
||||||
if shared.ip_enabled:
|
if shared.ip_enabled:
|
||||||
if len(shared.unchecked_node_pool) > 16:
|
if shared.unchecked_node_pool:
|
||||||
to_connect.update(random.sample(
|
if len(shared.unchecked_node_pool) > 16:
|
||||||
shared.unchecked_node_pool, 16))
|
to_connect.update(random.sample(
|
||||||
else:
|
shared.unchecked_node_pool, 16))
|
||||||
to_connect.update(shared.unchecked_node_pool)
|
else:
|
||||||
shared.unchecked_node_pool.difference_update(to_connect)
|
to_connect.update(shared.unchecked_node_pool)
|
||||||
if len(shared.node_pool) > 8:
|
shared.unchecked_node_pool.difference_update(to_connect)
|
||||||
to_connect.update(random.sample(shared.node_pool, 8))
|
if len(shared.node_pool) > 8:
|
||||||
else:
|
to_connect.update(random.sample(shared.node_pool, 8))
|
||||||
to_connect.update(shared.node_pool)
|
else:
|
||||||
|
to_connect.update(shared.node_pool)
|
||||||
|
elif outgoing_connections < shared.outgoing_connections / 2:
|
||||||
|
try:
|
||||||
|
bootstrap()
|
||||||
|
except IndexError:
|
||||||
|
logging.warning(
|
||||||
|
'Ran out of bootstrap nodes, refilling')
|
||||||
|
self.fill_bootstrap_pool()
|
||||||
|
|
||||||
if shared.i2p_enabled:
|
if shared.i2p_enabled:
|
||||||
if len(shared.i2p_unchecked_node_pool) > 16:
|
if len(shared.i2p_unchecked_node_pool) > 16:
|
||||||
|
@ -137,11 +164,8 @@ class Manager(threading.Thread):
|
||||||
else:
|
else:
|
||||||
continue
|
continue
|
||||||
else:
|
else:
|
||||||
c = Connection(addr[0], addr[1])
|
connect((addr[0], addr[1]))
|
||||||
c.start()
|
|
||||||
hosts.add(c.host)
|
|
||||||
with shared.connections_lock:
|
|
||||||
shared.connections.add(c)
|
|
||||||
shared.hosts = hosts
|
shared.hosts = hosts
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
|
|
|
@ -0,0 +1,31 @@
|
||||||
|
"""Tests for network connections"""
|
||||||
|
import unittest
|
||||||
|
|
||||||
|
from minode import connection, main, shared
|
||||||
|
|
||||||
|
|
||||||
|
class TestNetwork(unittest.TestCase):
|
||||||
|
"""Test case starting connections"""
|
||||||
|
|
||||||
|
def test_bootstrap(self):
|
||||||
|
"""Start bootstrappers and check node pool"""
|
||||||
|
if shared.core_nodes:
|
||||||
|
shared.core_nodes = set()
|
||||||
|
if shared.unchecked_node_pool:
|
||||||
|
shared.unchecked_node_pool = set()
|
||||||
|
|
||||||
|
main.bootstrap_from_dns()
|
||||||
|
|
||||||
|
self.assertGreater(len(shared.core_nodes), 1)
|
||||||
|
self.assertEqual(len(shared.unchecked_node_pool), 0)
|
||||||
|
|
||||||
|
for node in shared.core_nodes:
|
||||||
|
c = connection.Bootstrapper(*node)
|
||||||
|
c.start()
|
||||||
|
c.join()
|
||||||
|
if len(shared.unchecked_node_pool) > 2:
|
||||||
|
break
|
||||||
|
else:
|
||||||
|
self.fail(
|
||||||
|
'Failed to find at least 2 nodes'
|
||||||
|
' after running %s bootstrappers', len(shared.core_nodes))
|
Loading…
Reference in New Issue