Implement decoding and connection to onion peer:

make a separate a bit controversial class structure.OnionPeer(),
having .from_object() and .to_object() instead of .from_bytes() etc.
This commit is contained in:
Lee Miller 2023-08-20 03:00:40 +03:00
parent 7736846646
commit a54a3fe409
Signed by: lee.miller
GPG Key ID: 4F97A5EA88F4AB63
4 changed files with 98 additions and 8 deletions

View File

@ -266,6 +266,7 @@ class ConnectionBase(threading.Thread):
addr = { addr = {
structure.NetAddr(c.remote_version.services, c.host, c.port) structure.NetAddr(c.remote_version.services, c.host, c.port)
for c in shared.connections if c.network != 'i2p' for c in shared.connections if c.network != 'i2p'
and not c.host.endswith('.onion')
and c.server is False and c.status == 'fully_established'} and c.server is False and c.status == 'fully_established'}
# pylint: disable=unsubscriptable-object # pylint: disable=unsubscriptable-object
# https://github.com/pylint-dev/pylint/issues/3637 # https://github.com/pylint-dev/pylint/issues/3637
@ -396,9 +397,12 @@ class ConnectionBase(threading.Thread):
if not self.server: if not self.server:
self.send_queue.put('fully_established') self.send_queue.put('fully_established')
if self.network == 'ip': if self.network == 'ip':
shared.address_advertise_queue.put(structure.NetAddr( if self.host.endswith('.onion'):
version.services, self.host, self.port)) shared.onion_pool.add((self.host, self.port))
shared.node_pool.add((self.host, self.port)) else:
shared.address_advertise_queue.put(structure.NetAddr(
version.services, self.host, self.port))
shared.node_pool.add((self.host, self.port))
elif self.network == 'i2p': elif self.network == 'i2p':
shared.i2p_node_pool.add((self.host, 'i2p')) shared.i2p_node_pool.add((self.host, 'i2p'))
if self.network == 'ip': if self.network == 'ip':
@ -502,6 +506,13 @@ class Connection(ConnectionBase):
' adding to i2p_unchecked_node_pool') ' adding to i2p_unchecked_node_pool')
logging.debug(dest) logging.debug(dest)
shared.i2p_unchecked_node_pool.add((dest, 'i2p')) shared.i2p_unchecked_node_pool.add((dest, 'i2p'))
elif (
obj.object_type == shared.onion_obj_type
and obj.version == shared.onion_obj_version
):
peer = structure.OnionPeer.from_object(obj)
logging.debug('Received onion peer object: %s', peer)
shared.onion_unchecked_pool.add((peer.host, peer.port))
shared.vector_advertise_queue.put(obj.vector) shared.vector_advertise_queue.put(obj.vector)
def _process_msg_getdata(self, m): def _process_msg_getdata(self, m):

View File

@ -96,15 +96,29 @@ class Manager(threading.Thread):
): ):
if shared.ip_enabled: if shared.ip_enabled:
if len(shared.unchecked_node_pool) > 16: sample_length = 16
if shared.tor:
if len(shared.onion_unchecked_pool) > 4:
to_connect.update(random.sample(
tuple(shared.onion_unchecked_pool), 4))
else:
to_connect.update(shared.onion_unchecked_pool)
shared.onion_unchecked_pool.difference_update(to_connect)
if len(shared.onion_pool) > 2:
to_connect.update(random.sample(
tuple(shared.onion_pool), 2))
else:
to_connect.update(shared.onion_pool)
sample_length = 8
if len(shared.unchecked_node_pool) > sample_length:
to_connect.update(random.sample( to_connect.update(random.sample(
tuple(shared.unchecked_node_pool), 16)) tuple(shared.unchecked_node_pool), sample_length))
else: else:
to_connect.update(shared.unchecked_node_pool) to_connect.update(shared.unchecked_node_pool)
shared.unchecked_node_pool.difference_update(to_connect) shared.unchecked_node_pool.difference_update(to_connect)
if len(shared.node_pool) > 8: if len(shared.node_pool) > sample_length / 2:
to_connect.update(random.sample( to_connect.update(random.sample(
tuple(shared.node_pool), 8)) tuple(shared.node_pool), int(sample_length / 2)))
else: else:
to_connect.update(shared.node_pool) to_connect.update(shared.node_pool)
@ -186,6 +200,17 @@ class Manager(threading.Thread):
logging.warning( logging.warning(
'Error while loading nodes from disk.', exc_info=True) 'Error while loading nodes from disk.', exc_info=True)
try:
with open(
os.path.join(shared.data_directory, 'onion_nodes.pickle'), 'br'
) as src:
shared.onion_pool = pickle.load(src)
except FileNotFoundError:
pass
except Exception:
logging.warning(
'Error while loading nodes from disk.', exc_info=True)
with open( with open(
os.path.join(shared.source_directory, 'core_nodes.csv'), os.path.join(shared.source_directory, 'core_nodes.csv'),
'r', newline='', encoding='ascii' 'r', newline='', encoding='ascii'
@ -231,6 +256,13 @@ class Manager(threading.Thread):
shared.i2p_unchecked_node_pool = set(random.sample( shared.i2p_unchecked_node_pool = set(random.sample(
tuple(shared.i2p_unchecked_node_pool), 100)) tuple(shared.i2p_unchecked_node_pool), 100))
if len(shared.onion_pool) > 1000:
shared.onion_pool = set(
random.sample(shared.onion_pool, 1000))
if len(shared.onion_unchecked_pool) > 100:
shared.onion_unchecked_pool = set(
random.sample(shared.onion_unchecked_pool, 100))
try: try:
with open( with open(
os.path.join(shared.data_directory, 'nodes.pickle'), 'bw' os.path.join(shared.data_directory, 'nodes.pickle'), 'bw'
@ -240,7 +272,11 @@ class Manager(threading.Thread):
os.path.join(shared.data_directory, 'i2p_nodes.pickle'), 'bw' os.path.join(shared.data_directory, 'i2p_nodes.pickle'), 'bw'
) as dst: ) as dst:
pickle.dump(shared.i2p_node_pool, dst, protocol=3) pickle.dump(shared.i2p_node_pool, dst, protocol=3)
logging.debug('Saved nodes') with open(
os.path.join(shared.data_directory, 'onion_nodes.pickle'), 'bw'
) as dst:
pickle.dump(shared.onion_pool, dst, protocol=3)
logging.debug('Saved nodes')
except Exception: except Exception:
logging.warning('Error while saving nodes', exc_info=True) logging.warning('Error while saving nodes', exc_info=True)

View File

@ -26,6 +26,8 @@ timeout = 600
header_length = 24 header_length = 24
i2p_dest_obj_type = 0x493250 i2p_dest_obj_type = 0x493250
i2p_dest_obj_version = 1 i2p_dest_obj_version = 1
onion_obj_type = 0x746f72
onion_obj_version = 3
socks_proxy = None socks_proxy = None
tor = False tor = False
@ -62,6 +64,9 @@ i2p_core_nodes = set()
i2p_node_pool = set() i2p_node_pool = set()
i2p_unchecked_node_pool = set() i2p_unchecked_node_pool = set()
onion_pool = set()
onion_unchecked_pool = set()
outgoing_connections = 8 outgoing_connections = 8
connection_limit = 250 connection_limit = 250

View File

@ -1,8 +1,10 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
"""Protocol structures""" """Protocol structures"""
import base64 import base64
import binascii
import hashlib import hashlib
import logging import logging
import re
import socket import socket
import struct import struct
import time import time
@ -223,3 +225,39 @@ class NetAddr():
stream, net_addr = struct.unpack('>QI26s', b)[1:] stream, net_addr = struct.unpack('>QI26s', b)[1:]
n = NetAddrNoPrefix.from_bytes(net_addr) n = NetAddrNoPrefix.from_bytes(net_addr)
return cls(n.services, n.host, n.port, stream) return cls(n.services, n.host, n.port, stream)
class OnionPeer():
def __init__(self, host, port=8444, stream=None, dest_pub=None):
self.stream = stream or shared.stream
self.host = host
self.port = port
try:
self.dest_pub = dest_pub or base64.b32decode(
re.search(r'(.*)\.onion', host).groups()[0], True)
except (AttributeError, binascii.Error) as e:
raise ValueError('Malformed hostname') from e
def __repr__(self):
return 'onion_peer, stream: {}, host: {}, port {}'.format(
self.stream, self.host, self.port)
def to_object(self):
payload = b''
payload += VarInt(self.port).to_bytes()
payload += b'\xfd\x87\xd8\x7e\xeb\x43'
payload += self.dest_pub
return Object(
b'\x00' * 8, int(time.time() + 8 * 3600), shared.onion_obj_type,
shared.onion_obj_version, self.stream, payload)
@classmethod
def from_object(cls, obj):
payload = obj.object_payload
port_length = VarInt.length(payload[0])
port = VarInt.from_bytes(payload[:port_length]).n
if payload[port_length:port_length + 6] != b'\xfd\x87\xd8\x7e\xeb\x43':
raise ValueError('Malformed onion peer object')
dest_pub = payload[port_length + 6:]
host = base64.b32encode(dest_pub).lower().decode() + '.onion'
return cls(host, port, obj.stream_number, dest_pub)