Compare commits

...

43 Commits

Author SHA1 Message Date
4daf047d51
Merge branch 'tor' into testing
All checks were successful
Testing / default (push) Successful in 6m55s
2025-02-21 06:10:56 +02:00
c8510557f2
Merge branch 'shutdown' into testing
All checks were successful
Testing / default (push) Successful in 6m22s
2025-02-21 04:16:40 +02:00
fb243dd8e9
Trying to support the PoW interruption 2025-02-21 03:39:46 +02:00
5f0ab21268
Add a test for PoW interruption, make a separate test case for PoW 2025-02-21 03:39:29 +02:00
9732f0c7f5
Handle pylint too-many-branches in main.main() 2025-02-21 03:30:38 +02:00
fe6a6af1db
Started a test case for the tor service 2025-02-21 03:30:31 +02:00
56e92a747a
Merge branch 'bootstrap' into testing
All checks were successful
Testing / default (push) Successful in 7m57s
2025-02-18 08:07:49 +02:00
cb239c7d68
Bump version to 0.3.4 2025-02-18 05:25:05 +02:00
ec4e24185a
Don't advertise own peer if run with --no-incoming or reported IP is 127.0.0.1 2025-02-18 05:25:04 +02:00
d17de19aca
Improve the logic of starting tor:
set the missing initial port, exit on failure,
increase the timeout to 90s, reducing the number of attempts to 20.
2025-02-18 05:24:55 +02:00
c5a1310083
Conform tor setup to the restrictions for incoming and outgoing connections;
don't start the tor service with --no-incoming if there is a system one.
2025-02-18 05:23:07 +02:00
a0537a9e66
Don't try to resolve DNS seeds if tor is enabled 2025-02-18 05:06:14 +02:00
b1749c368c
Test process with --tor, set env variable HOME for tor 2025-02-18 05:06:13 +02:00
651a4f9681
Update command line dump in README 2025-02-18 05:06:13 +02:00
ff0df70388
Complete help string on the --tor arg 2025-02-18 05:06:13 +02:00
947607937c
Skip TLS also for incoming connections using tor 2025-02-18 05:06:12 +02:00
b465ddff85
Add a stem requirement for testing and a new extra - 'tor' 2025-02-18 05:06:12 +02:00
d6de7c8d1e
A rough implementation of an onion service based on the PyBitmessage plugin;
the publishing interval for the onionpeer object is half of its TTL,
as for the I2P destination.
2025-02-18 05:02:11 +02:00
59fcd9eb2b
A test for encoding and decoding of onion peer object 2025-02-18 04:39:29 +02:00
1400486b22
Parse socks_proxy arg with urllib.parse and support more parameters 2025-02-18 04:39:29 +02:00
fffb5e6052
Install and start tor in buildbot 2025-02-18 04:39:29 +02:00
cd6f82bc2a
Add a simple blind test for process running with --socks-proxy 2025-02-18 04:39:28 +02:00
63700885a0
Add an extra 'proxy' with a requirement of PySocks 2025-02-18 04:39:28 +02:00
a10a905407
Do not start TLS in onion connections 2025-02-18 04:39:28 +02:00
bdfd39a163
Implement decoding of and connection to an onion peer:
add a separate, somewhat controversial class structure.OnionPeer(),
having .from_object() and .to_object() instead of .from_bytes() etc.
2025-02-18 04:39:14 +02:00
452fe8d5f1
A minimal implementation of a proxy for outgoing connections using PySocks;
the special arg --tor currently just sets the host and port for the socks_proxy.
2025-02-18 04:39:07 +02:00
25cb745f05
Merge branch 'protocol' into testing 2025-02-18 03:58:42 +02:00
27e72d2027
Add a minimal sanity check for addresses in the received addr message 2025-02-18 03:52:19 +02:00
2b312c4255
Lower timestamps in the addr message by half of PyBitmessage's ADDRESS_ALIVE 2025-02-18 03:52:11 +02:00
b83262fe66
Merge branch 'memory' into testing 2025-02-18 03:17:04 +02:00
1caf3288b2
Merge branch 'network-nonce' into testing 2025-02-18 03:08:04 +02:00
fa50a844ad
Discard the disconnected node from unchecked_node_pool 2025-02-18 03:07:47 +02:00
b9e5b07e1b
Generate a new nonce for the I2P connections and improve nonce handling:
  - make the tuple (host, port) a key for shared.nonce_pool
    to differentiate incoming tor connections,
  - reserve 127.0.0.1, 8448 for self,
  - convert the nonce to a string for logging.
2025-02-18 03:07:46 +02:00
d1b0f06ac1
Log also nonce when processing the version message 2025-02-18 03:07:46 +02:00
2147a6a716
Don't connect to nodes with the same nonce 2025-02-18 03:07:46 +02:00
21fe906ac3
Rewrite duplicate connection handling in i2p.listener, correct except clause 2025-02-18 03:07:04 +02:00
2f4cb203c8
Merge branch 'network' into testing 2025-02-18 03:05:00 +02:00
fd5c2c803d
Update request time in vectors_requested to not re-request too often 2025-02-18 03:04:24 +02:00
78e14c1e62
Discard the bootstrapping target from node pool before connecting 2025-02-14 05:58:23 +02:00
13d1a94ddf
Addressed pylint too-many-positional-arguments in connection,
reduced the number of instance attributes.
2025-02-14 05:33:04 +02:00
16c8c412bf
Addressed pylint too-many-positional-arguments in structure.Object 2025-02-14 05:33:04 +02:00
5ee3eec0ae
Addressed pylint too-many-positional-arguments in i2p,
simplified I2PDialer instantiation.
2025-02-14 05:33:03 +02:00
4b38debc35
Addressed pylint too-many-positional-arguments in message.Version 2025-02-14 05:32:57 +02:00
21 changed files with 692 additions and 134 deletions

View File

@ -9,7 +9,8 @@ RUN apt-add-repository ppa:purplei2p/i2pd && apt-get update -qq
RUN apt-get install -yq --no-install-suggests --no-install-recommends \
python3-dev python3-pip python-is-python3 python3.11-dev python3.11-venv
RUN apt-get install -yq --no-install-suggests --no-install-recommends sudo i2pd
RUN apt-get install -yq --no-install-suggests --no-install-recommends \
sudo i2pd tor
RUN echo 'builder ALL=(ALL) NOPASSWD:ALL' >> /etc/sudoers

View File

@ -1,3 +1,4 @@
#!/bin/sh
sudo service i2pd start
sudo service tor start

View File

@ -30,7 +30,7 @@ usage: main.py [-h] [-p PORT] [--host HOST] [--debug] [--data-dir DATA_DIR]
[--connection-limit CONNECTION_LIMIT] [--i2p]
[--i2p-tunnel-length I2P_TUNNEL_LENGTH]
[--i2p-sam-host I2P_SAM_HOST] [--i2p-sam-port I2P_SAM_PORT]
[--i2p-transient]
[--i2p-transient] [--socks-proxy SOCKS_PROXY] [--tor]
optional arguments:
-h, --help show this help message and exit
@ -53,6 +53,10 @@ optional arguments:
--i2p-sam-port I2P_SAM_PORT
Port of I2P SAMv3 bridge
--i2p-transient Generate new I2P destination on start
--socks-proxy SOCKS_PROXY
SOCKS proxy address in the form <HOST>:<PORT>
--tor The SOCKS proxy is tor, use 127.0.0.1:9050 if not
specified, start tor and set up a hidden service
```
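
For reference, a minimal sketch (not part of the diff) of how a --socks-proxy value in the <HOST>:<PORT> form ends up in shared.socks_proxy, mirroring the urllib.parse handling added to the argument parsing further down in this diff; the address is only an example:

```
# Illustrative only: mirrors the parsing added in parse_arguments()
import re
from urllib import parse

raw = '127.0.0.1:9050'              # the <HOST>:<PORT> form from the help text
if not re.match(r'^.*://', raw):    # prepend '//' so urlparse sees a netloc
    raw = '//' + raw
proxy = parse.urlparse(raw, scheme='socks5')
print(proxy.scheme, proxy.hostname, proxy.port)   # socks5 127.0.0.1 9050
```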

View File

@ -4,7 +4,9 @@ import base64
import errno
import logging
import math
import os
import random
import re
import select
import socket
import ssl
@ -20,22 +22,19 @@ class ConnectionBase(threading.Thread):
Common code for the connection thread
with minimum command handlers to reuse
"""
def __init__(
self, host, port, s=None, network='ip', server=False,
i2p_remote_dest=b''
):
def __init__(self, host, port, s=None, server=False):
self.host = host
self.port = port
self.network = network
self.i2p_remote_dest = i2p_remote_dest
self.network = 'i2p' if port == 'i2p' else 'ip'
if self.network == 'i2p':
self.host_print = self.i2p_remote_dest[:8].decode()
else:
self.host_print = self.host
self.host_print = (
self.host[:8].decode() if self.network == 'i2p' else self.host)
super().__init__(name='Connection to {}:{}'.format(host, port))
self.s = s
self.server = server
self.send_queue = queue.Queue()
self.vectors_to_get = set()
@ -43,22 +42,13 @@ class ConnectionBase(threading.Thread):
self.vectors_requested = {}
self.status = 'ready'
self.tls = False
self.status = 'connected' if bool(s) else 'ready'
self.verack_received = False
self.verack_sent = False
self.s = s
self.remote_version = None
self.server = server
if bool(s):
self.status = 'connected'
self.buffer_receive = b''
self.buffer_send = b''
@ -78,9 +68,14 @@ class ConnectionBase(threading.Thread):
self.s.settimeout(0)
if not self.server:
if self.network == 'ip':
self.send_queue.put(message.Version(self.host, self.port))
version_kwargs = (
{'services': 1} if self.host.endswith('.onion') else {})
self.send_queue.put(message.Version(
('127.0.0.1' if shared.socks_proxy else self.host),
self.port, **version_kwargs))
else:
self.send_queue.put(message.Version('127.0.0.1', 7656))
self.send_queue.put(message.Version(
'127.0.0.1', 7656, nonce=self._get_nonce()))
while True:
if (
self.on_connection_fully_established_scheduled
@ -149,6 +144,14 @@ class ConnectionBase(threading.Thread):
break
time.sleep(0.2)
def _get_nonce(self):
nonce = shared.nonce_pool.get(('127.0.0.1', 8448))
if nonce is None:
nonce = os.urandom(8)
shared.nonce_pool[('127.0.0.1', 8448)] = nonce
return nonce
def _connect(self):
peer_str = '{0.host_print}:{0.port}'.format(self)
logging.debug('Connecting to %s', peer_str)
@ -238,7 +241,7 @@ class ConnectionBase(threading.Thread):
logging.debug('ssl.SSLError reason: %s', e.reason)
shared.node_pool.discard((self.host, self.port))
return
self.tls = True
logging.debug(
'Established TLS connection with %s:%s (%s)',
self.host_print, self.port, self.s.version())
@ -257,12 +260,17 @@ class ConnectionBase(threading.Thread):
'Established Bitmessage protocol connection to %s:%s',
self.host_print, self.port)
self.on_connection_fully_established_scheduled = False
if self.remote_version.services & 2 and self.network == 'ip':
self._do_tls_handshake() # NODE_SSL
if ( # NODE_SSL
self.remote_version.services & 2 and self.network == 'ip'
and not self.host.endswith('.onion')
and not (self.server and shared.tor)
):
self._do_tls_handshake()
addr = {
structure.NetAddr(c.remote_version.services, c.host, c.port)
for c in shared.connections if c.network != 'i2p'
and not c.host.endswith('.onion')
and c.server is False and c.status == 'fully_established'}
# pylint: disable=unsubscriptable-object
# https://github.com/pylint-dev/pylint/issues/3637
@ -377,40 +385,58 @@ class ConnectionBase(threading.Thread):
if shared.stream not in version.streams:
raise ValueError('message not for stream %i' % shared.stream)
logging.debug('%s:%s -> %s', self.host_print, self.port, version)
nonce_print = base64.b16encode(version.nonce).decode()
if (
version.protocol_version != shared.protocol_version
or version.nonce == shared.nonce
or version.nonce in shared.nonce_pool.values()
):
logging.warning(
'Disconnecting v%s node %s with nonce %s',
version.protocol_version, self.host_print, nonce_print)
shared.unchecked_node_pool.discard((self.host, self.port))
self.status = 'disconnecting'
self.send_queue.put(None)
else:
shared.nonce_pool[(self.host, self.port)] = version.nonce
logging.info(
'%s:%s claims to be %s',
self.host_print, self.port, version.user_agent)
'%s:%s claims to be %s (%s)',
self.host_print, self.port, version.user_agent, nonce_print)
self.send_queue.put(message.Message(b'verack', b''))
self.verack_sent = True
self.remote_version = version
if not self.server:
self.send_queue.put('fully_established')
if self.network == 'ip':
shared.address_advertise_queue.put(structure.NetAddr(
version.services, self.host, self.port))
shared.node_pool.add((self.host, self.port))
if self.host.endswith('.onion'):
shared.onion_pool.add((self.host, self.port))
else:
shared.address_advertise_queue.put(structure.NetAddr(
version.services, self.host, self.port))
shared.node_pool.add((self.host, self.port))
elif self.network == 'i2p':
shared.i2p_node_pool.add((self.host, 'i2p'))
if self.network == 'ip':
if (
self.network == 'ip' and shared.listen_for_connections
and version.host != '127.0.0.1'
):
shared.address_advertise_queue.put(structure.NetAddr(
shared.services, version.host, shared.listening_port))
if self.server:
if self.network == 'ip':
self.send_queue.put(message.Version(self.host, self.port))
version_kwargs = {'services': 1} if shared.tor else {}
self.send_queue.put(message.Version(
self.host, self.port, **version_kwargs))
else:
self.send_queue.put(message.Version('127.0.0.1', 7656))
self.send_queue.put(message.Version(
'127.0.0.1', 7656, nonce=self._get_nonce()))
def _process_msg_addr(self, m):
addr = message.Addr.from_message(m)
logging.debug('%s:%s -> %s', self.host_print, self.port, addr)
for a in addr.addresses:
if not a.host or a.port == 0:
continue
if (a.host, a.port) not in shared.core_nodes:
shared.unchecked_node_pool.add((a.host, a.port))
@ -432,13 +458,13 @@ class ConnectionBase(threading.Thread):
self.send_queue.put(message.GetData(pack))
self.vectors_requested.update({
vector: time.time() for vector in pack
if vector not in self.vectors_requested})
})
self.vectors_to_get.difference_update(pack)
else:
self.send_queue.put(message.GetData(self.vectors_to_get))
self.vectors_requested.update({
vector: time.time() for vector in self.vectors_to_get
if vector not in self.vectors_requested})
})
self.vectors_to_get.clear()
if self.vectors_requested:
self.vectors_requested = {
@ -500,6 +526,13 @@ class Connection(ConnectionBase):
' adding to i2p_unchecked_node_pool')
logging.debug(dest)
shared.i2p_unchecked_node_pool.add((dest, 'i2p'))
elif (
obj.object_type == shared.onion_obj_type
and obj.version == shared.onion_obj_version
):
peer = structure.OnionPeer.from_object(obj)
logging.debug('Received onion peer object: %s', peer)
shared.onion_unchecked_pool.add((peer.host, peer.port))
shared.vector_advertise_queue.put(obj.vector)
def _process_msg_getdata(self, m):
@ -517,4 +550,46 @@ class Bootstrapper(ConnectionBase):
self.send_queue.put(None)
class SocksConnection(Connection):
"""The socks proxied connection"""
def _connect(self):
peer_str = '{0.host_print}:{0.port}'.format(self)
logging.debug('Connecting to %s', peer_str)
import socks # pylint: disable=import-outside-toplevel
proxy_type = socks.PROXY_TYPES[shared.socks_proxy.scheme.upper()]
try:
self.s = socks.create_connection(
(self.host, self.port), 30, None, proxy_type,
shared.socks_proxy.hostname, shared.socks_proxy.port, True,
shared.socks_proxy.username, shared.socks_proxy.password, None)
self.status = 'connected'
logging.debug('Established SOCKS connection to %s', peer_str)
except socket.timeout:
pass
except socks.GeneralProxyError as e:
e = e.socket_err
if isinstance(e, socket.timeout) or (
# general failure, unreachable, refused
not e.errno and re.match(r'^0x0[1,4,5].*', e.msg)
):
logcall = logging.debug
else:
logcall = logging.info
logcall('Connection to %s failed. Reason: %s', peer_str, e)
except OSError as e:
# unreachable, refused, no route
(logging.info if e.errno not in (0, 101, 111, 113)
else logging.debug)(
'Connection to %s failed. Reason: %s', peer_str, e)
except Exception:
logging.info(
'Connection to %s failed.', peer_str, exc_info=True)
if self.status != 'connected':
self.status = 'failed'
shared.connection = Connection
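
An editorial aside on the services=1 overrides in this file (a sketch, not code from the diff): shared.py defines the default services = 3 (NODE_NETWORK, NODE_SSL) and the TLS handshake is gated on the NODE_SSL bit, so advertising 1 tells the peer that TLS will be skipped, which is what happens for .onion peers and for incoming connections when tor is enabled.

```
# Descriptive names chosen for this sketch, not taken from the diff
NODE_NETWORK = 1   # plain Bitmessage networking
NODE_SSL = 2       # peer supports the TLS handshake

DEFAULT_SERVICES = NODE_NETWORK | NODE_SSL   # 3, the value in shared.py
ONION_SERVICES = NODE_NETWORK                # 1, sent for .onion peers and incoming tor connections

assert DEFAULT_SERVICES & NODE_SSL           # TLS attempted
assert not ONION_SERVICES & NODE_SSL         # TLS skipped
```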

View File

@ -7,14 +7,15 @@ from .util import I2PThread
class I2PDialer(I2PThread):
def __init__(
self, state, destination, nick, sam_host='127.0.0.1', sam_port=7656
self, state, destination, nick=None, *, sam_host=None, sam_port=None
):
self.sam_host = sam_host
self.sam_port = sam_port
# Initially 127.0.0.1:7656
self.sam_host = sam_host or state.i2p_sam_host
self.sam_port = sam_port or state.i2p_sam_port
self.nick = nick
self.destination = destination
self.nick = nick or state.i2p_session_nick
super().__init__(state, name='I2P Dial to {}'.format(self.destination))
@ -27,9 +28,7 @@ class I2PDialer(I2PThread):
logging.debug('Connecting to %s', self.destination)
self._connect()
if not self.state.shutting_down and self.success:
c = self.state.connection(
self.destination, 'i2p', self.s, 'i2p',
False, self.destination)
c = self.state.connection(self.destination, 'i2p', self.s, False)
c.start()
self.state.connections.add(c)

View File

@ -15,8 +15,6 @@ class I2PListener(I2PThread):
self.version_reply = []
self.new_socket()
def new_socket(self):
self.s = socket.create_connection((self.host, self.port))
self._send(b'HELLO VERSION MIN=3.0 MAX=3.3\n')
@ -31,26 +29,31 @@ class I2PListener(I2PThread):
def run(self):
while not self.state.shutting_down:
self.new_socket()
duplicate = False
try:
destination = self._receive_line().split()[0]
logging.info(
'Incoming I2P connection from: %s', destination.decode())
hosts = set()
for c in self.state.connections.copy():
hosts.add(c.host)
for d in self.state.i2p_dialers.copy():
hosts.add(d.destination)
if destination in hosts:
logging.debug('Rejecting duplicate I2P connection.')
self.s.close()
else:
c = self.state.connection(
destination, 'i2p', self.s, 'i2p', True, destination)
c.start()
self.state.connections.add(c)
c = None
self.new_socket()
except socket.timeout:
pass
continue
for c in self.state.connections.copy():
if c.host == destination:
duplicate = True
break
else:
for d in self.state.i2p_dialers.copy():
if d.destination == destination:
duplicate = True
break
if duplicate:
logging.info('Rejecting duplicate I2P connection.')
self.s.close()
else:
c = self.state.connection(destination, 'i2p', self.s, True)
c.start()
self.state.connections.add(c)
c = None
logging.debug('Shutting down I2P Listener')

View File

@ -5,8 +5,15 @@ import base64
import logging
import multiprocessing
import os
import re
import signal
import socket
from urllib import parse
try:
import socks
except ImportError:
socks = None
from . import i2p, shared
from .advertiser import Advertiser
@ -52,6 +59,16 @@ def parse_arguments(): # pylint: disable=too-many-branches,too-many-statements
'--i2p-transient', action='store_true',
help='Generate new I2P destination on start')
if socks is not None:
parser.add_argument(
'--socks-proxy',
help='SOCKS proxy address in the form <HOST>:<PORT>')
parser.add_argument(
'--tor', action='store_true',
help='The SOCKS proxy is tor, use 127.0.0.1:9050 if not specified,'
' start tor and set up a hidden service'
)
args = parser.parse_args()
if args.port:
shared.listening_port = args.port
@ -71,7 +88,8 @@ def parse_arguments(): # pylint: disable=too-many-branches,too-many-statements
if args.no_ip:
shared.ip_enabled = False
if args.trusted_peer:
if len(args.trusted_peer) > 50:
if len(args.trusted_peer
) > 50 and not args.trusted_peer.endswith('onion'):
# I2P
shared.trusted_peer = (args.trusted_peer.encode(), 'i2p')
else:
@ -99,6 +117,17 @@ def parse_arguments(): # pylint: disable=too-many-branches,too-many-statements
if args.i2p_transient:
shared.i2p_transient = True
if socks is None:
return
if args.tor:
shared.tor = True
if not args.socks_proxy:
args.socks_proxy = '127.0.0.1:9050'
if args.socks_proxy:
if not re.match(r'^.*://', args.socks_proxy):
args.socks_proxy = '//' + args.socks_proxy
shared.socks_proxy = parse.urlparse(args.socks_proxy, scheme='socks5')
def bootstrap_from_dns():
"""Addes addresses of bootstrap servers to core nodes"""
@ -242,7 +271,30 @@ def main():
'Error while creating data directory in: %s',
shared.data_directory, exc_info=True)
if shared.ip_enabled and not shared.trusted_peer:
if shared.socks_proxy and shared.send_outgoing_connections:
try:
socks.PROXY_TYPES[shared.socks_proxy.scheme.upper()]
except KeyError:
logging.error('Unsupported proxy scheme!')
return
if shared.tor:
try:
from . import tor # pylint: disable=import-outside-toplevel
if not tor.start_tor_service():
logging.warning('The tor service has not started.')
tor = None
except ImportError:
logging.info('Failed to import tor module.', exc_info=True)
tor = None
if not tor:
try:
socket.socket().bind(('127.0.0.1', 9050))
return
except (OSError, socket.error):
pass
elif shared.ip_enabled and not shared.trusted_peer:
bootstrap_from_dns()
if shared.i2p_enabled:
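
A condensed reading of the tor branch above (an editorial sketch, not the project's code): the node first tries to launch its own tor via the new tor module; if the import or the launch fails, it checks whether something is already listening on 127.0.0.1:9050 and gives up when that port is free.

```
import socket

def system_tor_listening(host='127.0.0.1', port=9050):
    """Sketch of the fallback check: a taken port is read as
    'a system tor (or another SOCKS proxy) is already running'."""
    s = socket.socket()
    try:
        s.bind((host, port))
    except OSError:
        return True      # bind failed, the port is occupied
    else:
        return False     # bind succeeded, nothing listens there
    finally:
        s.close()
```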

View File

@ -11,7 +11,7 @@ import threading
import time
from . import proofofwork, shared, structure
from .connection import Bootstrapper, Connection
from .connection import Bootstrapper, Connection, SocksConnection
from .i2p import I2PDialer
@ -28,6 +28,10 @@ class Manager(threading.Thread):
# Publish destination 5-15 minutes after start
self.last_published_i2p_destination = \
time.time() - 50 * 60 + random.uniform(-1, 1) * 300 # nosec B311
# Publish onion 4-8 min later
self.last_published_onion_peer = \
self.last_published_i2p_destination - 3 * 3600 + \
random.uniform(1, 2) * 240 # nosec B311
def fill_bootstrap_pool(self):
"""Populate the bootstrap pool by core nodes and checked ones"""
@ -59,6 +63,9 @@ class Manager(threading.Thread):
if now - self.last_published_i2p_destination > 3600:
self.publish_i2p_destination()
self.last_published_i2p_destination = now
if now - self.last_published_onion_peer > 3600 * 4:
self.publish_onion_peer()
self.last_published_onion_peer = now
@staticmethod
def clean_objects():
@ -96,6 +103,7 @@ class Manager(threading.Thread):
self.fill_bootstrap_pool()
return
logging.info('Starting a bootstrapper for %s:%s', *target)
shared.node_pool.discard(target)
connect(target, Bootstrapper)
outgoing_connections = 0
@ -103,6 +111,10 @@ class Manager(threading.Thread):
if not c.is_alive() or c.status == 'disconnected':
with shared.connections_lock:
shared.connections.remove(c)
try:
del shared.nonce_pool[(c.host, c.port)]
except KeyError:
pass
else:
hosts.add(structure.NetAddrNoPrefix.network_group(c.host))
if not c.server:
@ -123,17 +135,31 @@ class Manager(threading.Thread):
):
if shared.ip_enabled:
if len(shared.unchecked_node_pool) > 16:
sample_length = 16
if shared.tor:
if len(shared.onion_unchecked_pool) > 4:
to_connect.update(random.sample(
tuple(shared.onion_unchecked_pool), 4))
else:
to_connect.update(shared.onion_unchecked_pool)
shared.onion_unchecked_pool.difference_update(to_connect)
if len(shared.onion_pool) > 2:
to_connect.update(random.sample(
tuple(shared.onion_pool), 2))
else:
to_connect.update(shared.onion_pool)
sample_length = 8
if len(shared.unchecked_node_pool) > sample_length:
to_connect.update(random.sample(
tuple(shared.unchecked_node_pool), 16))
tuple(shared.unchecked_node_pool), sample_length))
else:
to_connect.update(shared.unchecked_node_pool)
if outgoing_connections < shared.outgoing_connections / 2:
bootstrap()
shared.unchecked_node_pool.difference_update(to_connect)
if len(shared.node_pool) > 8:
if len(shared.node_pool) > sample_length / 2:
to_connect.update(random.sample(
tuple(shared.node_pool), 8))
tuple(shared.node_pool), int(sample_length / 2)))
else:
to_connect.update(shared.node_pool)
@ -157,10 +183,7 @@ class Manager(threading.Thread):
if port == 'i2p' and shared.i2p_enabled:
if shared.i2p_session_nick and host != shared.i2p_dest_pub:
try:
d = I2PDialer(
shared,
host, shared.i2p_session_nick,
shared.i2p_sam_host, shared.i2p_sam_port)
d = I2PDialer(shared, host)
d.start()
hosts.add(d.destination)
shared.i2p_dialers.add(d)
@ -171,7 +194,9 @@ class Manager(threading.Thread):
else:
continue
else:
connect((host, port))
connect(
(host, port),
Connection if not shared.socks_proxy else SocksConnection)
hosts.add(group)
@staticmethod
@ -210,6 +235,17 @@ class Manager(threading.Thread):
logging.warning(
'Error while loading nodes from disk.', exc_info=True)
try:
with open(
os.path.join(shared.data_directory, 'onion_nodes.pickle'), 'br'
) as src:
shared.onion_pool = pickle.load(src)
except FileNotFoundError:
pass
except Exception:
logging.warning(
'Error while loading nodes from disk.', exc_info=True)
with open(
os.path.join(shared.source_directory, 'core_nodes.csv'),
'r', newline='', encoding='ascii'
@ -257,6 +293,13 @@ class Manager(threading.Thread):
shared.i2p_unchecked_node_pool = set(random.sample(
tuple(shared.i2p_unchecked_node_pool), 100))
if len(shared.onion_pool) > 1000:
shared.onion_pool = set(
random.sample(shared.onion_pool, 1000))
if len(shared.onion_unchecked_pool) > 100:
shared.onion_unchecked_pool = set(
random.sample(shared.onion_unchecked_pool, 100))
try:
with open(
os.path.join(shared.data_directory, 'nodes.pickle'), 'bw'
@ -266,7 +309,11 @@ class Manager(threading.Thread):
os.path.join(shared.data_directory, 'i2p_nodes.pickle'), 'bw'
) as dst:
pickle.dump(shared.i2p_node_pool, dst, protocol=3)
logging.debug('Saved nodes')
with open(
os.path.join(shared.data_directory, 'onion_nodes.pickle'), 'bw'
) as dst:
pickle.dump(shared.onion_pool, dst, protocol=3)
logging.debug('Saved nodes')
except Exception:
logging.warning('Error while saving nodes', exc_info=True)
@ -278,7 +325,16 @@ class Manager(threading.Thread):
dest_pub_raw = base64.b64decode(
shared.i2p_dest_pub, altchars=b'-~')
obj = structure.Object(
b'\x00' * 8, int(time.time() + 2 * 3600),
int(time.time() + 2 * 3600),
shared.i2p_dest_obj_type, shared.i2p_dest_obj_version,
shared.stream, dest_pub_raw)
shared.stream, object_payload=dest_pub_raw)
proofofwork.do_pow_and_publish(obj)
@staticmethod
def publish_onion_peer():
"""Make and publish a `structure.OnionPeer`"""
if shared.onion_hostname:
logging.info('Publishing our onion peer')
obj = structure.OnionPeer(
shared.onion_hostname, shared.listening_port).to_object()
proofofwork.do_pow_and_publish(obj)
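
A quick cross-check of the new timing constants (editorial, with descriptive names): OnionPeer.to_object() sets the expiration 8 hours ahead and the manager republishes every 4 hours, i.e. at half of the TTL, matching the commit message.

```
ONION_OBJECT_TTL = 8 * 3600        # expires_time = int(time.time() + 8 * 3600)
ONION_PUBLISH_INTERVAL = 3600 * 4  # if now - self.last_published_onion_peer > 3600 * 4

assert ONION_PUBLISH_INTERVAL * 2 == ONION_OBJECT_TTL
```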

View File

@ -114,9 +114,10 @@ def _payload_read_int(data):
class Version(IMessage):
"""The version message payload"""
def __init__(
self, host, port, protocol_version=shared.protocol_version,
services=shared.services, nonce=shared.nonce,
user_agent=shared.user_agent, streams=None
self, host, port,
nonce=shared.nonce, services=shared.services,
*, streams=None, user_agent=shared.user_agent,
protocol_version=shared.protocol_version,
):
self.host = host
self.port = port
@ -189,7 +190,8 @@ class Version(IMessage):
raise ValueError('malformed Version message, wrong streams_count')
return cls(
host, port, protocol_version, services, nonce, user_agent, streams)
host, port, nonce, services, streams=streams,
protocol_version=protocol_version, user_agent=user_agent)
class Inv(IMessage):

View File

@ -19,8 +19,13 @@ def _pow_worker(target, initial_hash, q):
while trial_value > target:
nonce += 1
trial_value = struct.unpack('>Q', hashlib.sha512(hashlib.sha512(
struct.pack('>Q', nonce) + initial_hash).digest()).digest()[:8])[0]
try:
trial_value = struct.unpack('>Q', hashlib.sha512(hashlib.sha512(
struct.pack('>Q', nonce) + initial_hash
).digest()).digest()[:8])[0]
except KeyboardInterrupt:
q.put(None)
return
q.put(struct.pack('>Q', nonce))
@ -36,11 +41,16 @@ def _worker(obj):
nonce = q.get()
p.join()
if nonce is None:
if not shared.shutting_down:
logging.warning('Got None nonce from _pow_worker!')
return
logging.debug(
'Finished doing POW, nonce: %s, time: %ss', nonce, time.time() - t)
obj = structure.Object(
nonce, obj.expires_time, obj.object_type, obj.version,
obj.stream_number, obj.object_payload)
obj.expires_time, obj.object_type, obj.version, obj.stream_number,
object_payload=obj.object_payload, nonce=nonce)
logging.debug(
'Object vector is %s', base64.b16encode(obj.vector).decode())

View File

@ -21,11 +21,17 @@ protocol_version = 3
services = 3 # NODE_NETWORK, NODE_SSL
stream = 1
nonce = os.urandom(8)
user_agent = b'/MiNode:0.3.3/'
user_agent = b'/MiNode:0.3.4/'
timeout = 600
header_length = 24
i2p_dest_obj_type = 0x493250
i2p_dest_obj_version = 1
onion_obj_type = 0x746f72
onion_obj_version = 3
socks_proxy = None
tor = False
onion_hostname = ''
i2p_enabled = False
i2p_transient = False
@ -54,11 +60,15 @@ core_nodes = set()
node_pool = set()
unchecked_node_pool = set()
nonce_pool = {}
i2p_core_nodes = set()
i2p_node_pool = set()
i2p_unchecked_node_pool = set()
onion_pool = set()
onion_unchecked_pool = set()
outgoing_connections = 8
connection_limit = 250

View File

@ -1,8 +1,10 @@
# -*- coding: utf-8 -*-
"""Protocol structures"""
import base64
import binascii
import hashlib
import logging
import re
import socket
import struct
import time
@ -62,8 +64,8 @@ class VarInt(IStructure):
class Object():
"""The 'object' message payload"""
def __init__(
self, nonce, expires_time, object_type, version,
stream_number, object_payload
self, expires_time, object_type, version, stream_number,
*, object_payload, tag=None, nonce=b'\x00' * 8
):
self.nonce = nonce
self.expires_time = expires_time
@ -74,7 +76,7 @@ class Object():
self.vector = hashlib.sha512(hashlib.sha512(
self.to_bytes()).digest()).digest()[:32]
self.tag = (
self.tag = tag or (
# broadcast from version 5 and pubkey/getpubkey from version 4
self.object_payload[:32] if object_type == 3 and version == 5
or (object_type in (0, 1) and version == 4)
@ -98,7 +100,8 @@ class Object():
payload[:stream_number_varint_length]).n
payload = payload[stream_number_varint_length:]
return cls(
nonce, expires_time, object_type, version, stream_number, payload)
expires_time, object_type, version, stream_number,
object_payload=payload, nonce=nonce)
def to_bytes(self):
"""Serialize to bytes object payload"""
@ -227,7 +230,7 @@ class NetAddr(IStructure):
def to_bytes(self):
b = b''
b += struct.pack('>Q', int(time.time()))
b += struct.pack('>Q', int(time.time() - 5400))
b += struct.pack('>I', self.stream)
b += NetAddrNoPrefix(self.services, self.host, self.port).to_bytes()
return b
@ -237,3 +240,42 @@ class NetAddr(IStructure):
stream, net_addr = struct.unpack('>QI26s', b)[1:]
n = NetAddrNoPrefix.from_bytes(net_addr)
return cls(n.services, n.host, n.port, stream)
class OnionPeer():
"""An object, containing onion peer info"""
def __init__(self, host, port=8444, stream=None, dest_pub=None):
self.stream = stream or shared.stream
self.host = host
self.port = port
try:
self.dest_pub = dest_pub or base64.b32decode(
re.search(r'(.*)\.onion', host).groups()[0], True)
except (AttributeError, binascii.Error) as e:
raise ValueError('Malformed hostname') from e
def __repr__(self):
return 'onion_peer, stream: {}, host: {}, port {}'.format(
self.stream, self.host, self.port)
def to_object(self):
"""Encode the onion peer to the `Object`"""
payload = b''
payload += VarInt(self.port).to_bytes()
payload += b'\xfd\x87\xd8\x7e\xeb\x43'
payload += self.dest_pub
return Object(
int(time.time() + 8 * 3600), shared.onion_obj_type,
shared.onion_obj_version, self.stream, object_payload=payload)
@classmethod
def from_object(cls, obj):
"""Decode the onion peer from an `Object` instance"""
payload = obj.object_payload
port_length = VarInt.length(payload[0])
port = VarInt.from_bytes(payload[:port_length]).n
if payload[port_length:port_length + 6] != b'\xfd\x87\xd8\x7e\xeb\x43':
raise ValueError('Malformed onion peer object')
dest_pub = payload[port_length + 6:]
host = base64.b32encode(dest_pub).lower().decode() + '.onion'
return cls(host, port, obj.stream_number, dest_pub)
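
A hedged usage sketch of the new class, essentially what the test added to test_structure.py below exercises, reusing the sample host from the test suite:

```
from minode import shared, structure

host = 'bmtestlmgmvpbsg7kzmrxu47chs3cdou2tj4t5iloocgujzsf3e7rbqd.onion'
peer = structure.OnionPeer(host)        # port defaults to 8444
obj = peer.to_object()                  # an 'object' of type 0x746f72, version 3
assert obj.object_type == shared.onion_obj_type

decoded = structure.OnionPeer.from_object(obj)
assert (decoded.host, decoded.port) == (peer.host, peer.port)
```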

9
minode/tests/common.py Normal file
View File

@ -0,0 +1,9 @@
"""Common expressions for the tests"""
import socket
try:
socket.socket().bind(('127.0.0.1', 9050))
tor_port_free = True
except (OSError, socket.error):
tor_port_free = False

View File

@ -13,6 +13,8 @@ import psutil
from minode.i2p import util
from minode.structure import NetAddrNoPrefix
from .common import tor_port_free
try:
socket.socket().bind(('127.0.0.1', 7656))
i2p_port_free = True
@ -185,3 +187,21 @@ class TestProcessI2P(TestProcess):
class TestProcessNoI2P(TestProcessShutdown):
"""Test minode process shutdown with --i2p and no IP"""
_process_cmd = ['minode', '--i2p', '--no-ip']
@unittest.skipIf(tor_port_free, 'No running tor detected')
class TestProcessTor(TestProcessProto):
"""A test case for minode process running with tor enabled"""
_process_cmd = ['minode', '--tor']
_wait_time = 60
def test_connections(self):
"""Check minode process connections"""
for _ in range(self._wait_time):
time.sleep(0.5)
connections = self.connections()
for c in connections:
self.assertEqual(c.raddr[0], '127.0.0.1')
self.assertEqual(c.raddr[1], 9050)
if len(connections) > self._connection_limit / 2:
break

View File

@ -0,0 +1,77 @@
"""Special tests for PoW"""
import base64
import logging
import multiprocessing
import os
import queue
import signal
import threading
import time
import unittest
from minode import proofofwork, shared, structure
logging.basicConfig(
level=logging.DEBUG,
format='[%(asctime)s] [%(levelname)s] %(message)s')
multiprocessing.set_start_method('spawn')
class TestProofofwork(unittest.TestCase):
"""Test components of proof of work"""
def setUp(self):
shared.objects = {}
def test_proofofwork(self):
"""Check the main proofofwork call and worker"""
shared.vector_advertise_queue = queue.Queue()
obj = structure.Object(
int(time.time() + 300), 42, 1,
shared.stream, object_payload=b'HELLO')
start_time = time.time()
proofofwork.do_pow_and_publish(obj)
try:
vector = shared.vector_advertise_queue.get(timeout=300)
except queue.Empty:
self.fail("Couldn't make work in 300 sec")
else:
time.sleep(1)
try:
result = shared.objects[vector]
except KeyError:
self.fail(
"Couldn't found object with vector %s"
" %s sec after pow start" % (
base64.b16encode(vector), time.time() - start_time))
self.assertTrue(result.is_valid())
self.assertEqual(result.object_type, 42)
self.assertEqual(result.object_payload, b'HELLO')
q = queue.Queue()
# pylint: disable=protected-access
proofofwork._pow_worker(obj.pow_target(), obj.pow_initial_hash(), q)
try:
nonce = q.get(timeout=5)
except queue.Empty:
self.fail("No nonce found in the queue")
obj = structure.Object(
obj.expires_time, obj.object_type, obj.version, obj.stream_number,
object_payload=obj.object_payload, nonce=nonce)
self.assertTrue(obj.is_valid())
def test_interrupt(self):
"""Send signals to _pow_worker process"""
obj = structure.Object(
int(time.time() + 300), 42, 1, 2, object_payload=os.urandom(4096))
# pylint: disable=protected-access
threading.Thread(target=proofofwork._worker, args=(obj, )).start()
time.sleep(1)
worker = multiprocessing.active_children()[0]
# worker.terminate()
os.kill(worker.pid, signal.SIGINT)
time.sleep(1)
self.assertFalse(worker.is_alive())

View File

@ -1,7 +1,5 @@
"""Tests for structures"""
import base64
import logging
import queue
import struct
import time
import unittest
@ -19,10 +17,13 @@ sample_addr_data = unhexlify(
# data for an object with expires_time 1697063939
# structure.Object(
# b'\x00' * 8, expires_time, 42, 1, 2, b'HELLO').to_bytes()
# expires_time, 42, 1, 2, object_payload=b'HELLO').to_bytes()
sample_object_data = unhexlify(
'000000000000000000000000652724030000002a010248454c4c4f')
sample_onion_host = \
'bmtestlmgmvpbsg7kzmrxu47chs3cdou2tj4t5iloocgujzsf3e7rbqd.onion'
logging.basicConfig(
level=shared.log_level,
format='[%(asctime)s] [%(levelname)s] %(message)s')
@ -135,13 +136,13 @@ class TestStructure(unittest.TestCase):
self.assertEqual(obj.object_payload, b'HELLO')
obj = structure.Object(
b'\x00' * 8, int(time.time() + 3000000), 42, 1, 1, b'HELLO')
int(time.time() + 3000000), 42, 1, 1, object_payload=b'HELLO')
self.assertFalse(obj.is_valid())
obj.expires_time = int(time.time() - 11000)
self.assertFalse(obj.is_valid())
obj = structure.Object(
b'\x00' * 8, int(time.time() + 300), 42, 1, 2, b'HELLO')
int(time.time() + 300), 42, 1, 2, object_payload=b'HELLO')
vector = obj.vector
proofofwork._worker(obj) # pylint: disable=protected-access
obj = shared.objects.popitem()[1]
@ -155,40 +156,20 @@ class TestStructure(unittest.TestCase):
b'TIGER, tiger, burning bright. In the forests of the night'
self.assertFalse(obj.is_valid())
def test_proofofwork(self):
"""Check the main proofofwork call and worker"""
shared.vector_advertise_queue = queue.Queue()
obj = structure.Object(
b'\x00' * 8, int(time.time() + 300), 42, 1,
shared.stream, b'HELLO')
start_time = time.time()
proofofwork.do_pow_and_publish(obj)
try:
vector = shared.vector_advertise_queue.get(timeout=300)
except queue.Empty:
self.fail("Couldn't make work in 300 sec")
else:
time.sleep(1)
try:
result = shared.objects[vector]
except KeyError:
self.fail(
"Couldn't found object with vector %s"
" %s sec after pow start" % (
base64.b16encode(vector), time.time() - start_time))
self.assertTrue(result.is_valid())
self.assertEqual(result.object_type, 42)
self.assertEqual(result.object_payload, b'HELLO')
q = queue.Queue()
# pylint: disable=protected-access
proofofwork._pow_worker(obj.pow_target(), obj.pow_initial_hash(), q)
try:
nonce = q.get(timeout=5)
except queue.Empty:
self.fail("No nonce found in the queue")
obj = structure.Object(
nonce, obj.expires_time, obj.object_type, obj.version,
obj.stream_number, obj.object_payload)
self.assertTrue(obj.is_valid())
def test_onion_peer(self):
"""Make an onion peer object and decode it back"""
with self.assertRaises(ValueError):
onion_peer = structure.OnionPeer('testing2')
with self.assertRaises(ValueError):
onion_peer = structure.OnionPeer('testing.onion')
onion_peer = structure.OnionPeer(sample_onion_host)
self.assertEqual(onion_peer.stream, shared.stream)
obj = onion_peer.to_object()
self.assertEqual(obj.object_type, shared.onion_obj_type)
self.assertEqual(obj.version, shared.onion_obj_version)
decoded = structure.OnionPeer.from_object(obj)
self.assertEqual(decoded.dest_pub, onion_peer.dest_pub)
self.assertEqual(decoded.port, onion_peer.port)
obj.object_payload = obj.object_payload[0:1] + obj.object_payload[2:]
with self.assertRaises(ValueError):
structure.OnionPeer.from_object(obj)

64
minode/tests/test_tor.py Normal file
View File

@ -0,0 +1,64 @@
"""Tests for tor module"""
import collections
import os
import tempfile
import unittest
from minode import shared
from .common import tor_port_free
try:
from minode import tor
except ImportError:
tor = None
Proxy = collections.namedtuple('Proxy', ['hostname', 'port'])
@unittest.skipIf(
tor_port_free or tor is None, 'Inappropriate environment for tor service')
class TestTor(unittest.TestCase):
"""A test case running the tor service"""
tor = None
_files = ['onion_dest_priv.key', 'onion_dest.pub']
@classmethod
def cleanup(cls):
"""Remove used files"""
for f in cls._files:
try:
os.remove(os.path.join(shared.data_directory, f))
except FileNotFoundError:
pass
@classmethod
def setUpClass(cls):
shared.data_directory = tempfile.gettempdir()
shared.socks_proxy = Proxy('127.0.0.1', 9050)
cls.cleanup()
@classmethod
def tearDownClass(cls):
if cls.tor:
cls.tor.close()
cls.cleanup()
def test_tor(self):
"""Start the tor service as in main and check the environment"""
self.tor = tor.start_tor_service()
if not self.tor:
self.fail('The tor service has not started.')
with open(
os.path.join(shared.data_directory, 'onion_dest.pub'),
'r', encoding='ascii'
) as key_file:
onion_key = key_file.read()
self.assertEqual(onion_key + '.onion', shared.onion_hostname)
# with open(
# os.path.join(shared.data_directory, 'onion_dest_priv.key'), 'rb'
# ) as key_file:
# private_key = key_file.read()

147
minode/tor.py Normal file
View File

@ -0,0 +1,147 @@
"""Tor specific procedures"""
import logging
import os
import stat
import random
import tempfile
import stem
import stem.control
import stem.process
import stem.util
import stem.version
from . import shared
def logwrite(line):
"""A simple log writing handler for tor messages"""
try:
level, line = line.split('[', 1)[1].split(']', 1)
except (IndexError, ValueError):
logging.warning(line)
else:
if level in ('err', 'warn'):
logging.info('(tor)%s', line)
def start_tor_service():
"""Start own tor instance and configure a hidden service"""
try:
socket_dir = os.path.join(shared.data_directory, 'tor')
os.makedirs(socket_dir, exist_ok=True)
except OSError:
try:
socket_dir = tempfile.mkdtemp()
except OSError:
logging.info('Failed to create a temp dir.')
return
if os.getuid() == 0:
logging.info('Tor is not going to start as root')
return
try:
present_permissions = os.stat(socket_dir)[0]
disallowed_permissions = stat.S_IRWXG | stat.S_IRWXO
allowed_permissions = ((1 << 32) - 1) ^ disallowed_permissions
os.chmod(socket_dir, allowed_permissions & present_permissions)
except OSError:
logging.debug('Failed to set dir permissions.')
return
stem.util.log.get_logger().setLevel(logging.WARNING)
control_socket = os.path.abspath(os.path.join(socket_dir, 'tor_control'))
port = str(shared.socks_proxy.port)
tor_config = {
'SocksPort': port,
'ControlSocket': control_socket}
for attempt in range(20):
if attempt > 0:
port = random.randint(32767, 65535) # nosec B311
tor_config['SocksPort'] = str(port)
try:
stem.process.launch_tor_with_config(
tor_config, take_ownership=True, timeout=90,
init_msg_handler=logwrite)
except OSError:
if not attempt:
if not shared.listen_for_connections:
return
try:
stem.version.get_system_tor_version()
except IOError:
return
continue
else:
logging.info('Started tor on port %s', port)
break
else:
logging.debug('Failed to start tor.')
return
if not shared.listen_for_connections:
return True
try:
controller = stem.control.Controller.from_socket_file(control_socket)
controller.authenticate()
except stem.SocketError:
logging.debug('Failed to instantiate or authenticate on controller.')
return
onionkey = onionkeytype = None
try:
with open(
os.path.join(shared.data_directory, 'onion_dest_priv.key'),
'r', encoding='ascii'
) as src:
onionkey = src.read()
logging.debug('Loaded onion service private key.')
onionkeytype = 'ED25519-V3'
except FileNotFoundError:
pass
except Exception:
logging.info(
'Error while loading onion service private key.', exc_info=True)
response = controller.create_ephemeral_hidden_service(
shared.listening_port, key_type=onionkeytype or 'NEW',
key_content=onionkey or 'BEST'
)
if not response.is_ok():
logging.info('Bad response from controller ):')
return
shared.onion_hostname = '{}.onion'.format(response.service_id)
logging.info('Started hidden service %s', shared.onion_hostname)
if onionkey:
return controller
try:
with open(
os.path.join(shared.data_directory, 'onion_dest_priv.key'),
'w', encoding='ascii'
) as src:
src.write(response.private_key)
logging.debug('Saved onion service private key.')
except Exception:
logging.warning(
'Error while saving onion service private key.', exc_info=True)
try:
with open(
os.path.join(shared.data_directory, 'onion_dest.pub'),
'w', encoding='ascii'
) as src:
src.write(response.service_id)
logging.debug('Saved onion service public key.')
except Exception:
logging.warning(
'Error while saving onion service public key.', exc_info=True)
return controller
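
For context, a minimal driving sketch, close to what the new test_tor.py above does; shared.socks_proxy only needs a port attribute here, so a namedtuple stand-in is enough. The function returns a stem Controller when a hidden service was published, True when only the SOCKS proxy was needed (no incoming connections), or None on failure.

```
import collections
from minode import shared, tor

Proxy = collections.namedtuple('Proxy', ['hostname', 'port'])
shared.socks_proxy = Proxy('127.0.0.1', 9050)

controller = tor.start_tor_service()
if controller and controller is not True:
    print('hidden service published as', shared.onion_hostname)
    controller.close()   # drop the control connection when done
```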

View File

@ -1,2 +1,4 @@
coverage
psutil
PySocks
stem

View File

@ -24,6 +24,7 @@ setup(
packages=find_packages(exclude=('*tests',)),
package_data={'': ['*.csv', 'tls/*.pem']},
entry_points={'console_scripts': ['minode = minode.main:main']},
extras_require={'proxy': ['PySocks'], 'tor': ['PySocks', 'stem>1.8.0']},
classifiers=[
"License :: OSI Approved :: MIT License"
"Operating System :: OS Independent",

View File

@ -3,6 +3,8 @@ envlist = reset,py3{6,7,8,9,10,11},stats
skip_missing_interpreters = true
[testenv]
setenv =
HOME = {envtmpdir}
deps = -rrequirements.txt
commands =
coverage run -a -m tests