Compare commits

...

17 Commits
v0.3 ... tor

Author SHA1 Message Date
Lee Miller 2712d21214
Bump version to 0.3.4 2023-10-17 06:05:21 +03:00
Lee Miller 750cfdf4cc
Test process with --tor, set env variable HOME for tor 2023-10-17 06:05:21 +03:00
Lee Miller 377010c9b7
Update command line dump in README 2023-10-17 06:05:21 +03:00
Lee Miller d3d05d0084
Complete help string on the --tor arg 2023-10-17 06:05:21 +03:00
Lee Miller d3a9f128a9
Skip TLS also for incoming connections using tor 2023-10-17 06:05:21 +03:00
Lee Miller 8f83634068
Add a stem requirement for testing and a new extra - 'tor' 2023-10-17 06:05:20 +03:00
Lee Miller 3d52b5488d
A rough implementation of onion service based on pybitmessage plugin 2023-10-17 06:05:20 +03:00
Lee Miller 6c432d38ac
A test for encoding and decoding of onion peer object 2023-10-17 06:05:20 +03:00
Lee Miller f8e0eefb39
Parse socks_proxy arg with urllib.parse and support more parameters 2023-10-17 06:05:00 +03:00
Lee Miller 7b62a7b14a
Resolve the conflict between socks proxy and i2p
by preventing their simultaneous use.
2023-10-17 06:04:50 +03:00
Lee Miller b26cf7322d
Reduce logging verbosity for socks.GeneralProxyError 2023-10-17 06:04:50 +03:00
Lee Miller 4c63a1be40
Install and start tor in buildbot 2023-10-17 06:04:50 +03:00
Lee Miller 8fc40a9d7d
Add a simple blind test for process running with --socks-proxy 2023-10-17 06:04:50 +03:00
Lee Miller dadde74d24
Add an extra 'proxy' with a requirement of PySocks 2023-10-17 06:04:50 +03:00
Lee Miller cee80ca34e
Do not start TLS in onion connections 2023-10-17 06:04:50 +03:00
Lee Miller a1427290c1
Implement decoding and connection to onion peer:
make a separate a bit controversial class structure.OnionPeer(),
having .from_object() and .to_object() instead of .from_bytes() etc.
2023-10-17 06:04:23 +03:00
Lee Miller 8fd34d879d
A minimal implementation of proxy for outgoing connections using PySocks,
special arg --tor currently just sets host and port for the socks_proxy.
2023-10-17 05:41:33 +03:00
14 changed files with 406 additions and 23 deletions

View File

@ -8,7 +8,8 @@ RUN apt-add-repository ppa:purplei2p/i2pd
RUN apt-get update
RUN apt-get install -yq --no-install-suggests --no-install-recommends \
python3-dev python3-pip python3.9 python3.9-dev python3.9-venv sudo i2pd
python3-dev python3-pip python3.9 python3.9-dev python3.9-venv \
sudo i2pd tor
RUN echo 'builder ALL=(ALL) NOPASSWD:ALL' >> /etc/sudoers

View File

@ -1,3 +1,4 @@
#!/bin/sh
sudo service i2pd start
sudo service tor start

View File

@ -30,7 +30,7 @@ usage: main.py [-h] [-p PORT] [--host HOST] [--debug] [--data-dir DATA_DIR]
[--connection-limit CONNECTION_LIMIT] [--i2p]
[--i2p-tunnel-length I2P_TUNNEL_LENGTH]
[--i2p-sam-host I2P_SAM_HOST] [--i2p-sam-port I2P_SAM_PORT]
[--i2p-transient]
[--i2p-transient] [--socks-proxy SOCKS_PROXY] [--tor]
optional arguments:
-h, --help show this help message and exit
@ -53,6 +53,10 @@ optional arguments:
--i2p-sam-port I2P_SAM_PORT
Port of I2P SAMv3 bridge
--i2p-transient Generate new I2P destination on start
--socks-proxy SOCKS_PROXY
SOCKS proxy address in the form <HOST>:<PORT>
--tor The SOCKS proxy is tor, use 127.0.0.1:9050 if not
specified, start tor and setup a hidden service
```

View File

@ -5,6 +5,7 @@ import errno
import logging
import math
import random
import re
import select
import socket
import ssl
@ -75,7 +76,11 @@ class Connection(threading.Thread):
self.s.settimeout(0)
if not self.server:
if self.network == 'ip':
self.send_queue.put(message.Version(self.host, self.port))
version_kwargs = (
{'services': 1} if self.host.endswith('.onion') else {})
self.send_queue.put(message.Version(
('127.0.0.1' if shared.socks_proxy else self.host),
self.port, **version_kwargs))
else:
self.send_queue.put(message.Version('127.0.0.1', 7656))
while True:
@ -150,15 +155,26 @@ class Connection(threading.Thread):
peer_str = '{0.host_print}:{0.port}'.format(self)
logging.debug('Connecting to %s', peer_str)
timeout = 30 if shared.tor else 10
try:
self.s = socket.create_connection((self.host, self.port), 10)
self.s = socket.create_connection((self.host, self.port), timeout)
self.status = 'connected'
logging.debug('Established TCP connection to %s', peer_str)
except socket.timeout:
pass
except OSError as e:
try: # possible socks.GeneralProxyError
e = e.socket_err
if isinstance(e, socket.timeout) or (
# general failure, unreachable, refused
not e.errno and re.match(r'^0x0[1,4,5].*', e.msg)
):
e.errno = 0
except AttributeError:
pass
# unreachable, refused, no route
(logging.info if e.errno not in (101, 111, 113)
(logging.info if e.errno not in (0, 101, 111, 113)
else logging.debug)(
'Connection to %s failed. Reason: %s', peer_str, e)
except Exception:
@ -244,12 +260,17 @@ class Connection(threading.Thread):
'Established Bitmessage protocol connection to %s:%s',
self.host_print, self.port)
self.on_connection_fully_established_scheduled = False
if self.remote_version.services & 2 and self.network == 'ip':
self._do_tls_handshake() # NODE_SSL
if ( # NODE_SSL
self.remote_version.services & 2 and self.network == 'ip'
and not self.host.endswith('.onion')
and not (self.server and shared.tor)
):
self._do_tls_handshake()
addr = {
structure.NetAddr(c.remote_version.services, c.host, c.port)
for c in shared.connections if c.network != 'i2p'
and not c.host.endswith('.onion')
and c.server is False and c.status == 'fully_established'}
# pylint: disable=unsubscriptable-object
# https://github.com/pylint-dev/pylint/issues/3637
@ -355,9 +376,14 @@ class Connection(threading.Thread):
if not self.server:
self.send_queue.put('fully_established')
if self.network == 'ip':
shared.address_advertise_queue.put(structure.NetAddr(
version.services, self.host, self.port))
shared.node_pool.add((self.host, self.port))
if self.host.endswith('.onion'):
shared.onion_pool.add(
(self.host, self.port))
else:
shared.node_pool.add((self.host, self.port))
shared.address_advertise_queue.put(
structure.NetAddr(
version.services, self.host, self.port))
elif self.network == 'i2p':
shared.i2p_node_pool.add((self.host, 'i2p'))
if self.network == 'ip':
@ -365,8 +391,10 @@ class Connection(threading.Thread):
shared.services, version.host, shared.listening_port))
if self.server:
if self.network == 'ip':
self.send_queue.put(
message.Version(self.host, self.port))
version_kwargs = (
{'services': 1} if shared.tor else {})
self.send_queue.put(message.Version(
self.host, self.port, **version_kwargs))
else:
self.send_queue.put(message.Version('127.0.0.1', 7656))
@ -404,6 +432,13 @@ class Connection(threading.Thread):
' adding to i2p_unchecked_node_pool')
logging.debug(dest)
shared.i2p_unchecked_node_pool.add((dest, 'i2p'))
elif (
obj.object_type == shared.onion_obj_type
and obj.version == shared.onion_obj_version
):
peer = structure.OnionPeer.from_object(obj)
logging.debug('Received onion peer object: %s', peer)
shared.onion_unchecked_pool.add((peer.host, peer.port))
shared.vector_advertise_queue.put(obj.vector)
elif m.command == b'getdata':

View File

@ -5,8 +5,15 @@ import base64
import logging
import multiprocessing
import os
import re
import signal
import socket
from urllib import parse
try:
import socks
except ImportError:
socks = None
from . import i2p, shared
from .advertiser import Advertiser
@ -52,6 +59,16 @@ def parse_arguments(): # pylint: disable=too-many-branches,too-many-statements
'--i2p-transient', action='store_true',
help='Generate new I2P destination on start')
if socks is not None:
parser.add_argument(
'--socks-proxy',
help='SOCKS proxy address in the form <HOST>:<PORT>')
parser.add_argument(
'--tor', action='store_true',
help='The SOCKS proxy is tor, use 127.0.0.1:9050 if not specified,'
' start tor and setup a hidden service'
)
args = parser.parse_args()
if args.port:
shared.listening_port = args.port
@ -71,7 +88,8 @@ def parse_arguments(): # pylint: disable=too-many-branches,too-many-statements
if args.no_ip:
shared.ip_enabled = False
if args.trusted_peer:
if len(args.trusted_peer) > 50:
if len(args.trusted_peer
) > 50 and not args.trusted_peer.endswith('onion'):
# I2P
shared.trusted_peer = (args.trusted_peer.encode(), 'i2p')
else:
@ -98,6 +116,14 @@ def parse_arguments(): # pylint: disable=too-many-branches,too-many-statements
shared.i2p_sam_port = args.i2p_sam_port
if args.i2p_transient:
shared.i2p_transient = True
if args.tor:
shared.tor = True
if not args.socks_proxy:
args.socks_proxy = '127.0.0.1:9050'
if args.socks_proxy:
if not re.match(r'^.*://', args.socks_proxy):
args.socks_proxy = '//' + args.socks_proxy
shared.socks_proxy = parse.urlparse(args.socks_proxy, scheme='socks5')
def bootstrap_from_dns():
@ -238,6 +264,44 @@ def main():
'Error while creating data directory in: %s',
shared.data_directory, exc_info=True)
if shared.socks_proxy:
if shared.i2p_enabled:
logging.error(
'Unfortunately you cannot use both I2P and SOCKS proxy.')
return
try:
proxy_type = socks.PROXY_TYPES[shared.socks_proxy.scheme.upper()]
except KeyError:
logging.error('Unsupported proxy schema!')
return
# FIXME: the Connection() code would be too complex
# without this monkeypatching
def create_connection(
dst_pair, timeout=None, source_address=None,
proxy_type=proxy_type,
proxy_addr=shared.socks_proxy.hostname,
proxy_port=shared.socks_proxy.port, proxy_rdns=True,
proxy_username=shared.socks_proxy.username,
proxy_password=shared.socks_proxy.password,
socket_options=None
):
return socks.create_connection(
dst_pair, timeout, source_address, proxy_type, proxy_addr,
proxy_port, proxy_rdns, proxy_username, proxy_password,
socket_options)
socket.create_connection = create_connection
if shared.tor:
try:
from . import tor
except ImportError:
logging.info('Failed to import tor module.', exc_info=True)
else:
if not tor.start_tor_service():
logging.warning('Failed to start tor service.')
if shared.ip_enabled and not shared.trusted_peer:
bootstrap_from_dns()

View File

@ -25,7 +25,7 @@ class Manager(threading.Thread):
self.last_pickled_objects = time.time()
self.last_pickled_nodes = time.time()
# Publish destination 5-15 minutes after start
self.last_published_i2p_destination = \
self.last_published_destination = \
time.time() - 50 * 60 + random.uniform(-1, 1) * 300 # nosec B311
def run(self):
@ -49,9 +49,10 @@ class Manager(threading.Thread):
if now - self.last_pickled_nodes > 60:
self.pickle_nodes()
self.last_pickled_nodes = now
if now - self.last_published_i2p_destination > 3600:
if now - self.last_published_destination > 3600:
self.publish_i2p_destination()
self.last_published_i2p_destination = now
self.publish_onion_peer()
self.last_published_destination = now
@staticmethod
def clean_objects():
@ -96,14 +97,28 @@ class Manager(threading.Thread):
):
if shared.ip_enabled:
if len(shared.unchecked_node_pool) > 16:
sample_length = 16
if shared.tor:
if len(shared.onion_unchecked_pool) > 4:
to_connect.update(random.sample(
shared.onion_unchecked_pool, 4))
else:
to_connect.update(shared.onion_unchecked_pool)
shared.onion_unchecked_pool.difference_update(to_connect)
if len(shared.onion_pool) > 2:
to_connect.update(random.sample(shared.onion_pool, 2))
else:
to_connect.update(shared.onion_pool)
sample_length = 8
if len(shared.unchecked_node_pool) > sample_length:
to_connect.update(random.sample(
shared.unchecked_node_pool, 16))
shared.unchecked_node_pool, sample_length))
else:
to_connect.update(shared.unchecked_node_pool)
shared.unchecked_node_pool.difference_update(to_connect)
if len(shared.node_pool) > 8:
to_connect.update(random.sample(shared.node_pool, 8))
if len(shared.node_pool) > sample_length / 2:
to_connect.update(random.sample(
shared.node_pool, int(sample_length / 2)))
else:
to_connect.update(shared.node_pool)
@ -182,12 +197,23 @@ class Manager(threading.Thread):
logging.warning(
'Error while loading nodes from disk.', exc_info=True)
try:
with open(
os.path.join(shared.data_directory, 'onion_nodes.pickle'), 'br'
) as src:
shared.onion_pool = pickle.load(src)
except FileNotFoundError:
pass
except Exception:
logging.warning(
'Error while loading nodes from disk.', exc_info=True)
with open(
os.path.join(shared.source_directory, 'core_nodes.csv'),
'r', newline='', encoding='ascii'
) as src:
reader = csv.reader(src)
shared.core_nodes = {tuple(row) for row in reader}
shared.core_nodes = {(row[0], int(row[1])) for row in reader}
shared.node_pool.update(shared.core_nodes)
with open(
@ -226,6 +252,13 @@ class Manager(threading.Thread):
shared.i2p_unchecked_node_pool = set(
random.sample(shared.i2p_unchecked_node_pool, 100))
if len(shared.onion_pool) > 1000:
shared.onion_pool = set(
random.sample(shared.onion_pool, 1000))
if len(shared.onion_unchecked_pool) > 100:
shared.onion_unchecked_pool = set(
random.sample(shared.onion_unchecked_pool, 100))
try:
with open(
os.path.join(shared.data_directory, 'nodes.pickle'), 'bw'
@ -235,7 +268,11 @@ class Manager(threading.Thread):
os.path.join(shared.data_directory, 'i2p_nodes.pickle'), 'bw'
) as dst:
pickle.dump(shared.i2p_node_pool, dst, protocol=3)
logging.debug('Saved nodes')
with open(
os.path.join(shared.data_directory, 'onion_nodes.pickle'), 'bw'
) as dst:
pickle.dump(shared.onion_pool, dst, protocol=3)
logging.debug('Saved nodes')
except Exception:
logging.warning('Error while saving nodes', exc_info=True)
@ -250,3 +287,11 @@ class Manager(threading.Thread):
shared.i2p_dest_obj_type, shared.i2p_dest_obj_version,
shared.stream, dest_pub_raw)
proofofwork.do_pow_and_publish(obj)
@staticmethod
def publish_onion_peer():
if shared.tor:
logging.info('Publishing our onion peer')
obj = structure.OnionPeer(
shared.onion_hostname, shared.listening_port).to_object()
proofofwork.do_pow_and_publish(obj)

View File

@ -21,11 +21,17 @@ protocol_version = 3
services = 3 # NODE_NETWORK, NODE_SSL
stream = 1
nonce = os.urandom(8)
user_agent = b'/MiNode:0.3.2/'
user_agent = b'/MiNode:0.3.4/'
timeout = 600
header_length = 24
i2p_dest_obj_type = 0x493250
i2p_dest_obj_version = 1
onion_obj_type = 0x746f72
onion_obj_version = 3
socks_proxy = None
tor = False
onion_hostname = ''
i2p_enabled = False
i2p_transient = False
@ -59,6 +65,9 @@ i2p_core_nodes = set()
i2p_node_pool = set()
i2p_unchecked_node_pool = set()
onion_pool = set()
onion_unchecked_pool = set()
outgoing_connections = 8
connection_limit = 250

View File

@ -1,8 +1,10 @@
# -*- coding: utf-8 -*-
"""Protocol structures"""
import base64
import binascii
import hashlib
import logging
import re
import socket
import struct
import time
@ -208,3 +210,39 @@ class NetAddr():
stream, net_addr = struct.unpack('>QI26s', b)[1:]
n = NetAddrNoPrefix.from_bytes(net_addr)
return cls(n.services, n.host, n.port, stream)
class OnionPeer():
def __init__(self, host, port=8444, stream=None, dest_pub=None):
self.stream = stream or shared.stream
self.host = host
self.port = port
try:
self.dest_pub = dest_pub or base64.b32decode(
re.search(r'(.*)\.onion', host).groups()[0], True)
except (AttributeError, binascii.Error) as e:
raise ValueError('Malformed hostname') from e
def __repr__(self):
return 'onion_peer, stream: {}, host: {}, port {}'.format(
self.stream, self.host, self.port)
def to_object(self):
payload = b''
payload += VarInt(self.port).to_bytes()
payload += b'\xfd\x87\xd8\x7e\xeb\x43'
payload += self.dest_pub
return Object(
b'\x00' * 8, int(time.time() + 8 * 3600), shared.onion_obj_type,
shared.onion_obj_version, self.stream, payload)
@classmethod
def from_object(cls, obj):
payload = obj.object_payload
port_length = VarInt.length(payload[0])
port = VarInt.from_bytes(payload[:port_length]).n
if payload[port_length:port_length + 6] != b'\xfd\x87\xd8\x7e\xeb\x43':
raise ValueError('Malformed onion peer object')
dest_pub = payload[port_length + 6:]
host = base64.b32encode(dest_pub).lower().decode() + '.onion'
return cls(host, port, obj.stream_number, dest_pub)

View File

@ -14,6 +14,11 @@ try:
i2p_port_free = True
except (OSError, socket.error):
i2p_port_free = False
try:
socket.socket().bind(('127.0.0.1', 9050))
tor_port_free = True
except (OSError, socket.error):
tor_port_free = False
class TestProcessProto(unittest.TestCase):
@ -144,3 +149,21 @@ class TestProcessI2P(TestProcess):
class TestProcessNoI2P(TestProcessShutdown):
"""Test minode process shutdown with --i2p and no IP"""
_process_cmd = ['minode', '--i2p', '--no-ip']
@unittest.skipIf(tor_port_free, 'No running tor detected')
class TestProcessTor(TestProcessProto):
"""A test case for minode process running with tor enabled"""
_process_cmd = ['minode', '--tor']
_wait_time = 60
def test_connections(self):
"""Check minode process connections"""
for _ in range(self._wait_time):
time.sleep(0.5)
connections = self.connections()
for c in connections:
self.assertEqual(c.raddr[0], '127.0.0.1')
self.assertEqual(c.raddr[1], 9050)
if len(connections) > self._connection_limit / 2:
break

View File

@ -23,6 +23,9 @@ sample_addr_data = unhexlify(
sample_object_data = unhexlify(
'000000000000000000000000652724030000002a010248454c4c4f')
sample_onion_host = \
'bmtestlmgmvpbsg7kzmrxu47chs3cdou2tj4t5iloocgujzsf3e7rbqd.onion'
logging.basicConfig(
level=shared.log_level,
format='[%(asctime)s] [%(levelname)s] %(message)s')
@ -171,3 +174,21 @@ class TestStructure(unittest.TestCase):
nonce, obj.expires_time, obj.object_type, obj.version,
obj.stream_number, obj.object_payload)
self.assertTrue(obj.is_valid())
def test_onion_peer(self):
"""Make an onion peer object and decode it back"""
with self.assertRaises(ValueError):
onion_peer = structure.OnionPeer('testing2')
with self.assertRaises(ValueError):
onion_peer = structure.OnionPeer('testing.onion')
onion_peer = structure.OnionPeer(sample_onion_host)
self.assertEqual(onion_peer.stream, shared.stream)
obj = onion_peer.to_object()
self.assertEqual(obj.object_type, shared.onion_obj_type)
self.assertEqual(obj.version, shared.onion_obj_version)
decoded = structure.OnionPeer.from_object(obj)
self.assertEqual(decoded.dest_pub, onion_peer.dest_pub)
self.assertEqual(decoded.port, onion_peer.port)
obj.object_payload = obj.object_payload[0:1] + obj.object_payload[2:]
with self.assertRaises(ValueError):
structure.OnionPeer.from_object(obj)

137
minode/tor.py Normal file
View File

@ -0,0 +1,137 @@
"""Tor specific procedures"""
import logging
import os
import stat
import random
import tempfile
import stem
import stem.control
import stem.process
import stem.util
import stem.version
from . import shared
def logwrite(line):
"""A simple log writing handler for tor messages"""
try:
level, line = line.split('[', 1)[1].split(']', 1)
except (IndexError, ValueError):
logging.warning(line)
else:
if level in ('err', 'warn'):
logging.info('(tor)%s', line)
def start_tor_service():
"""Start own tor instance and configure a hidden service"""
try:
socket_dir = os.path.join(shared.data_directory, 'tor')
os.makedirs(socket_dir, exist_ok=True)
except OSError:
try:
socket_dir = tempfile.mkdtemp()
except OSError:
logging.info('Failed to create a temp dir.')
return
try:
present_permissions = os.stat(socket_dir)[0]
disallowed_permissions = stat.S_IRWXG | stat.S_IRWXO
allowed_permissions = ((1 << 32) - 1) ^ disallowed_permissions
os.chmod(socket_dir, allowed_permissions & present_permissions)
except OSError:
logging.debug('Failed to set dir permissions.')
return
stem.util.log.get_logger().setLevel(logging.WARNING)
control_socket = os.path.abspath(os.path.join(socket_dir, 'tor_control'))
tor_config = {
'SocksPort': str(shared.socks_proxy.port),
'ControlSocket': control_socket}
for attempt in range(50):
if attempt > 0:
port = random.randint(32767, 65535) # nosec B311
tor_config['SocksPort'] = str(port)
try:
stem.process.launch_tor_with_config(
tor_config, take_ownership=True, timeout=20,
init_msg_handler=logwrite)
except OSError:
if not attempt:
try:
stem.version.get_system_tor_version()
except IOError:
return
continue
else:
logging.info('Started tor on port %s', port)
break
else:
logging.debug('Failed to start tor.')
return
try:
controller = stem.control.Controller.from_socket_file(control_socket)
controller.authenticate()
except stem.SocketError:
logging.debug('Failed to instantiate or authenticate on controller.')
return
onionkey = onionkeytype = None
try:
with open(
os.path.join(shared.data_directory, 'onion_dest_priv.key'),
'r', encoding='ascii'
) as src:
onionkey = src.read()
logging.debug('Loaded onion service private key.')
onionkeytype = 'ED25519-V3'
except FileNotFoundError:
pass
except Exception:
logging.info(
'Error while loading onion service private key.', exc_info=True)
response = controller.create_ephemeral_hidden_service(
shared.listening_port, key_type=onionkeytype or 'NEW',
key_content=onionkey or 'BEST'
)
if not response.is_ok():
logging.info('Bad response from controller ):')
return
shared.onion_hostname = '{}.onion'.format(response.service_id)
logging.info('Started hidden service %s', shared.onion_hostname)
if onionkey:
return True
try:
with open(
os.path.join(shared.data_directory, 'onion_dest_priv.key'),
'w', encoding='ascii'
) as src:
src.write(response.private_key)
logging.debug('Saved onion service private key.')
except Exception:
logging.warning(
'Error while saving onion service private key.', exc_info=True)
try:
with open(
os.path.join(shared.data_directory, 'onion_dest.pub'),
'w', encoding='ascii'
) as src:
src.write(response.service_id)
logging.debug('Saved onion service public key.')
except Exception:
logging.warning(
'Error while saving onion service public key.', exc_info=True)
return True

View File

@ -1,2 +1,4 @@
coverage
psutil
PySocks
stem

View File

@ -24,6 +24,7 @@ setup(
packages=find_packages(exclude=('*tests',)),
package_data={'': ['*.csv', 'tls/*.pem']},
entry_points={'console_scripts': ['minode = minode.main:main']},
extras_require={'proxy': ['PySocks'], 'tor': ['PySocks', 'stem']},
classifiers=[
"License :: OSI Approved :: MIT License"
"Operating System :: OS Independent",

View File

@ -3,6 +3,8 @@ envlist = reset,py{36,37,38,39,310},stats
skip_missing_interpreters = true
[testenv]
setenv =
HOME = {envtmpdir}
deps = -rrequirements.txt
commands =
coverage run -a -m tests