Compare commits

...

43 Commits

Author SHA1 Message Date
Lee Miller c9a3877b92
Lower logging level for connection error messages in I2PDialer 2023-12-24 01:44:08 +02:00
Lee Miller d7ee73843e
Adjust pylint design checker parameters:
raise max-args to 8, add max-attributes with the same value.
2023-10-14 03:43:22 +03:00
Lee Miller 9bcaea12cf
Specifically skip B311 in manager by bandit 2023-10-14 01:06:31 +03:00
Lee Miller e4c2c1be16
Make load_data a static method in manager,
use ascii while loading nodes csv.
2023-10-14 01:06:31 +03:00
Lee Miller a7187d8dfd
Suppress some too-many-* pylint design warnings in parse_arguments() 2023-10-14 01:06:31 +03:00
Lee Miller ddf07fd506
Set object tag for object types supporting it 2023-10-12 19:50:18 +03:00
Lee Miller 2145f5839e
Cover the main proofofwork call and worker procedure 2023-10-12 19:50:15 +03:00
Lee Miller b806906af4
Add Error message class, handle fatal 2023-10-12 19:49:32 +03:00
Lee Miller 3f61bd694b
Define a helper function to read a varint and trim payload 2023-10-12 19:49:32 +03:00
Lee Miller 7812e4bbc2
Use shared.stream when assembling i2p_dest object instead of hardcoded 1 2023-10-12 19:49:32 +03:00
Lee Miller fda6ecfe01
Unify and improve message.Version:
- from_message() decoding method as in other messages;
  - support multiple streams and move stream check to connection;
  - use shared.stream instead of hardcoded 1;
  - replace values from shared with the instance attributes in to_bytes(),
    put conventional 1 as services of a remote host.
2023-10-12 19:49:32 +03:00
Lee Miller 428580a980
Add a test for version message 2023-10-12 19:49:32 +03:00
Lee Miller 399fc6f21f
Improve structure.Object:
- use shared.stream instead of hardcoded 1;
  - reuse pow_initial_hash() in is_valid().
2023-10-12 19:49:31 +03:00
Lee Miller 218905739c
Add a test for object covering also proofofwork 2023-10-12 19:48:52 +03:00
Lee Miller e4887734a0
Send ping into inactive connection, not pong 2023-10-07 17:55:49 +03:00
Lee Miller ae40a3d0b8
Update copyright notes 2023-10-07 17:53:11 +03:00
Lee Miller fe508c176b
Update the Specification link 2023-08-23 02:47:03 +03:00
Lee Miller 58a80bb4a4
Remove unreachable except clause for ConnectionResetError
- handled in socket.error branch. TODO: follow PEP 3151
2023-08-20 01:19:11 +03:00
Lee Miller 8755e56167
Replace Manager.clean_objects() by the extended version from main
and call it upon the Manager start.
2023-08-20 01:14:17 +03:00
Lee Miller 45a4a8fd31
Manifest disconnecting 2023-08-16 03:42:16 +03:00
Lee Miller 644a09ba0b
Set maximum args to 7 for pylint design checker 2023-08-15 00:13:36 +03:00
Lee Miller 67ecbf95d3
Suppress false positives on unsubscriptable-object in connection 2023-08-14 23:59:57 +03:00
Lee Miller b38e00c0a3
Handle pylint warnings in test_process, suppress fixme globally 2023-08-14 06:05:29 +03:00
Lee Miller e249e501cc
Fix formatting lint issues 2023-08-14 05:53:20 +03:00
Lee Miller 4f1e14da2a
Finish test_address() using a sample data, generated with pybitmessage 2023-08-14 05:15:09 +03:00
Lee Miller dd2b0b89af
Improve docstrings in message and structure and add more 2023-08-14 05:15:08 +03:00
Lee Miller 3788b12a28
Complete test_packet()
with parsing a prepared message and assertion of validation,
including message.Header; use magic_bytes from shared
assuming the value from the Spec.

Cover message.Message except for repr
2023-08-14 05:14:31 +03:00
Lee Miller c6d8bd64b2
Bump version to 0.3.2 2023-08-12 06:36:17 +03:00
Lee Miller 6558245a32
Minimal implementation of anti-intersection delay to wait before getdata 2023-08-12 06:35:15 +03:00
Lee Miller 1dfe98cf1f
Test shutting down minode --i2p if there is no running i2pd 2023-08-10 03:55:48 +03:00
Lee Miller e8dc62f08b
Allow shutting down while starting I2P listener 2023-08-10 03:52:16 +03:00
Lee Miller 761c95d561
Fix SSLError in incoming connection with python 3.10 2023-08-04 00:27:38 +03:00
Lee Miller 42995c5ca7
Check host and port of I2P connections 2023-08-02 23:02:21 +03:00
Lee Miller c6d0160001
Move s and state into the base 2023-08-02 05:23:33 +03:00
Dmitri Bogomolov 82c4062325
Inherit I2P classes from base util.I2PThread() 2023-08-02 05:23:33 +03:00
Lee Miller 0584956d13
Install and start i2pd in buildbot 2023-08-02 05:23:33 +03:00
Dmitri Bogomolov 7113916347
Try to test with i2pd:
- TestProcessI2P runs minode with i2p args with _connection_limit = 4
 - TestProcess waits for connections _wait_time sec (120 for TestProcessI2P)
2023-08-02 05:23:33 +03:00
Lee Miller 1d82774c96
Fix unused variable in test_process 2023-08-02 05:17:58 +03:00
Lee Miller 5abaa94d42
Rename pylint section in tox.ini, suppress f-string suggestion for now 2023-08-02 05:11:59 +03:00
Lee Miller 1a1db393c1
Fix some pylint warnings in the tests 2023-08-02 00:17:06 +03:00
Lee Miller 5fc10563f6
Update command line dump 2023-07-30 03:46:04 +03:00
Lee Miller e95f2b522a
Add basic docstrings 2023-07-30 03:45:59 +03:00
Lee Miller 5a4d6b686e
Replace contact and URLs 2023-07-28 00:11:30 +03:00
23 changed files with 499 additions and 207 deletions

View File

@ -2,8 +2,15 @@ FROM ubuntu:focal
RUN apt-get update
RUN apt-get install -yq software-properties-common
RUN apt-add-repository ppa:purplei2p/i2pd
RUN apt-get update
RUN apt-get install -yq --no-install-suggests --no-install-recommends \
python3-dev python3-pip python3.9 python3.9-dev python3.9-venv
python3-dev python3-pip python3.9 python3.9-dev python3.9-venv sudo i2pd
RUN echo 'builder ALL=(ALL) NOPASSWD:ALL' >> /etc/sudoers
RUN python3.9 -m pip install setuptools wheel
RUN python3.9 -m pip install --upgrade pip tox virtualenv

3
.buildbot/ubuntu/build.sh Executable file
View File

@ -0,0 +1,3 @@
#!/bin/sh
sudo service i2pd start

View File

@ -1,6 +1,7 @@
The MIT License (MIT)
Copyright (c) 2016-2017 Krzysztof Oziomek
Copyright (c) 2020-2023 The Bitmessage Developers
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal

View File

@ -11,7 +11,7 @@ objects inside the network.
## Running
```
git clone https://github.com/g1itch/MiNode.git
git clone https://git.bitmessage.org/lee.miller/MiNode.git
```
```
cd MiNode
@ -30,6 +30,7 @@ usage: main.py [-h] [-p PORT] [--host HOST] [--debug] [--data-dir DATA_DIR]
[--connection-limit CONNECTION_LIMIT] [--i2p]
[--i2p-tunnel-length I2P_TUNNEL_LENGTH]
[--i2p-sam-host I2P_SAM_HOST] [--i2p-sam-port I2P_SAM_PORT]
[--i2p-transient]
optional arguments:
-h, --help show this help message and exit
@ -51,6 +52,7 @@ optional arguments:
Host of I2P SAMv3 bridge
--i2p-sam-port I2P_SAM_PORT
Port of I2P SAMv3 bridge
--i2p-transient Generate new I2P destination on start
```
@ -80,8 +82,8 @@ If you add `trustedpeer = 127.0.0.1:8444` to `keys.dat` file in PyBitmessage it
will allow you to use it anonymously over I2P with MiNode acting as a bridge.
## Contact
- g1itch: BM-NC4h7r3HGcJgqNuwSEpGcSiVij3BKuXa
- lee.miller: BM-2cX1pX2goWAuZB5bLqj17x23EFjufHmygv
## Links
- [Bitmessage project website](https://bitmessage.org)
- [Protocol specification](https://bitmessage.org/wiki/Protocol_specification)
- [Protocol specification](https://pybitmessage.rtfd.io/en/v0.6/protocol.html)

View File

@ -1,3 +1,6 @@
"""
Advertiser thread advertises new addresses and objects among all connections
"""
import logging
import threading
import time
@ -6,6 +9,7 @@ from . import message, shared
class Advertiser(threading.Thread):
"""The advertiser thread"""
def __init__(self):
super().__init__(name='Advertiser')

View File

@ -1,7 +1,9 @@
# -*- coding: utf-8 -*-
"""The logic and behaviour of a single connection"""
import base64
import errno
import logging
import math
import random
import select
import socket
@ -14,6 +16,7 @@ from . import message, shared, structure
class Connection(threading.Thread):
"""The connection object"""
def __init__(
self, host, port, s=None, network='ip', server=False,
i2p_remote_dest=b''
@ -35,7 +38,7 @@ class Connection(threading.Thread):
self.vectors_to_get = set()
self.vectors_to_send = set()
self.vectors_requested = dict()
self.vectors_requested = {}
self.status = 'ready'
@ -62,6 +65,7 @@ class Connection(threading.Thread):
self.last_message_received = time.time()
self.last_message_sent = time.time()
self.wait_until = 0
def run(self):
if self.s is None:
@ -106,11 +110,7 @@ class Connection(threading.Thread):
'Disconnecting from %s:%s. Reason: %s',
self.host_print, self.port, e)
data = None
except ConnectionResetError:
logging.debug(
'Disconnecting from %s:%s. Reason: ConnectionResetError',
self.host_print, self.port)
self.status = 'disconnecting'
self._process_buffer_receive()
self._process_queue()
self._send_data()
@ -135,13 +135,13 @@ class Connection(threading.Thread):
time.time() - self.last_message_sent > 300
and self.status == 'fully_established'
):
self.send_queue.put(message.Message(b'pong', b''))
self.send_queue.put(message.Message(b'ping', b''))
if self.status == 'disconnecting' or shared.shutting_down:
data = None
if not data:
self.status = 'disconnected'
self.s.close()
logging.debug(
logging.info(
'Disconnected from %s:%s', self.host_print, self.port)
break
time.sleep(0.2)
@ -188,7 +188,10 @@ class Connection(threading.Thread):
'Initializing TLS connection with %s:%s',
self.host_print, self.port)
context = ssl.create_default_context()
context = ssl.create_default_context(
purpose=ssl.Purpose.CLIENT_AUTH if self.server
else ssl.Purpose.SERVER_AUTH
)
context.check_hostname = False
context.verify_mode = ssl.CERT_NONE
@ -248,6 +251,8 @@ class Connection(threading.Thread):
structure.NetAddr(c.remote_version.services, c.host, c.port)
for c in shared.connections if c.network != 'i2p'
and c.server is False and c.status == 'fully_established'}
# pylint: disable=unsubscriptable-object
# https://github.com/pylint-dev/pylint/issues/3637
if len(shared.node_pool) > 10:
addr.update({
structure.NetAddr(1, a[0], a[1])
@ -330,7 +335,9 @@ class Connection(threading.Thread):
def _process_message(self, m):
if m.command == b'version':
version = message.Version.from_bytes(m.to_bytes())
version = message.Version.from_message(m)
if shared.stream not in version.streams:
raise ValueError('message not for stream %i' % shared.stream)
logging.debug('%s:%s -> %s', self.host_print, self.port, version)
if (
version.protocol_version != shared.protocol_version
@ -415,8 +422,12 @@ class Connection(threading.Thread):
self.send_queue.put(message.Message(b'pong', b''))
elif m.command == b'error':
error = message.Error.from_message(m)
logging.warning(
'%s:%s -> error: %s', self.host_print, self.port, m.payload)
'%s:%s -> %s', self.host_print, self.port, error)
if error.fatal == 2:
# reduce probability to connect soon
shared.unchecked_node_pool.discard((self.host, self.port))
else:
logging.debug('%s:%s -> %s', self.host_print, self.port, m)
@ -424,7 +435,14 @@ class Connection(threading.Thread):
def _request_objects(self):
if self.vectors_to_get and len(self.vectors_requested) < 100:
self.vectors_to_get.difference_update(shared.objects.keys())
if self.vectors_to_get:
if not self.wait_until:
nodes_count = (
len(shared.node_pool) + len(shared.unchecked_node_pool))
logging.debug('Nodes count is %i', nodes_count)
delay = math.ceil(math.log(nodes_count + 2, 20)) * 5.2
self.wait_until = time.time() + delay
logging.debug('Skip sending getdata for %.2fs', delay)
if self.vectors_to_get and self.wait_until < time.time():
logging.info(
'Queued %s vectors to get', len(self.vectors_to_get))
if len(self.vectors_to_get) > 64:

View File

@ -1,3 +1,4 @@
"""A package for working with I2P"""
from .controller import I2PController
from .dialer import I2PDialer
from .listener import I2PListener

View File

@ -3,22 +3,22 @@ import base64
import logging
import os
import socket
import threading
import time
from .util import receive_line, pub_from_priv
from .util import I2PThread, pub_from_priv
class I2PController(threading.Thread):
class I2PController(I2PThread):
def __init__(self, state, host='127.0.0.1', port=7656, dest_priv=b''):
super().__init__(name='I2P Controller')
super().__init__(state, name='I2P Controller')
self.state = state
self.host = host
self.port = port
self.nick = b'MiNode_' + base64.b16encode(os.urandom(4)).lower()
while True:
if state.shutting_down:
return
try:
self.s = socket.create_connection((self.host, self.port))
break
@ -41,15 +41,6 @@ class I2PController(threading.Thread):
self.create_session()
def _receive_line(self):
line = receive_line(self.s)
# logging.debug('I2PController <- %s', line)
return line
def _send(self, command):
# logging.debug('I2PController -> %s', command)
self.s.sendall(command)
def init_connection(self):
self._send(b'HELLO VERSION MIN=3.0 MAX=3.3\n')
self.version_reply = self._receive_line().split()

View File

@ -1,23 +1,22 @@
# -*- coding: utf-8 -*-
import logging
import socket
import threading
from .util import receive_line
from .util import I2PThread
class I2PDialer(threading.Thread):
class I2PDialer(I2PThread):
def __init__(
self, state, destination, nick, sam_host='127.0.0.1', sam_port=7656
):
self.state = state
self.sam_host = sam_host
self.sam_port = sam_port
self.nick = nick
self.destination = destination
super().__init__(name='I2P Dial to {}'.format(self.destination))
super().__init__(state, name='I2P Dial to {}'.format(self.destination))
self.s = socket.create_connection((self.sam_host, self.sam_port))
@ -34,20 +33,11 @@ class I2PDialer(threading.Thread):
c.start()
self.state.connections.add(c)
def _receive_line(self):
line = receive_line(self.s)
# logging.debug('I2PDialer <- %s', line)
return line
def _send(self, command):
# logging.debug('I2PDialer -> %s', command)
self.s.sendall(command)
def _connect(self):
self._send(b'HELLO VERSION MIN=3.0 MAX=3.3\n')
self.version_reply = self._receive_line().split()
if b'RESULT=OK' not in self.version_reply:
logging.warning('Error while connecting to %s', self.destination)
logging.debug('Error while connecting to %s', self.destination)
self.success = False
self._send(
@ -55,6 +45,5 @@ class I2PDialer(threading.Thread):
+ self.destination + b'\n')
reply = self._receive_line().split(b' ')
if b'RESULT=OK' not in reply:
logging.warning(
'Error while connecting to %s', self.destination)
logging.debug('Error while connecting to %s', self.destination)
self.success = False

View File

@ -1,35 +1,22 @@
# -*- coding: utf-8 -*-
import logging
import socket
import threading
from .util import receive_line
from .util import I2PThread
class I2PListener(threading.Thread):
class I2PListener(I2PThread):
def __init__(self, state, nick, host='127.0.0.1', port=7656):
super().__init__(name='I2P Listener')
super().__init__(state, name='I2P Listener')
self.state = state
self.host = host
self.port = port
self.nick = nick
self.s = None
self.version_reply = []
self.new_socket()
def _receive_line(self):
line = receive_line(self.s)
# logging.debug('I2PListener <- %s', line)
return line
def _send(self, command):
# logging.debug('I2PListener -> %s', command)
self.s.sendall(command)
def new_socket(self):
self.s = socket.create_connection((self.host, self.port))
self._send(b'HELLO VERSION MIN=3.0 MAX=3.3\n')

View File

@ -1,6 +1,7 @@
# -*- coding: utf-8 -*-
import base64
import hashlib
import threading
def receive_line(s):
@ -14,16 +15,35 @@ def receive_line(s):
return data[0]
class I2PThread(threading.Thread):
"""
Abstract I2P thread with _receive_line() and _send() methods,
reused in I2PDialer, I2PListener and I2PController
"""
def __init__(self, state, name=''):
super().__init__(name=name)
self.state = state
self.s = None
def _receive_line(self):
line = receive_line(self.s)
# logging.debug('I2PListener <- %s', line)
return line
def _send(self, command):
# logging.debug('I2PListener -> %s', command)
self.s.sendall(command)
def pub_from_priv(priv):
priv = base64.b64decode(priv, altchars=b'-~')
# 256 for public key + 128 for signing key + 3 for certificate header
# + value of bytes priv[385:387]
pub = priv[:387 + int.from_bytes(priv[385:387], byteorder='big')]
pub = base64.b64encode(pub, altchars=b'-~')
return pub
return base64.b64encode(pub, altchars=b'-~')
def b32_from_pub(pub):
return base64.b32encode(
hashlib.sha256(base64.b64decode(pub, b'-~')).digest()
).replace(b"=", b"").lower() + b'.b32.i2p'
).replace(b'=', b'').lower() + b'.b32.i2p'

View File

@ -1,4 +1,5 @@
# -*- coding: utf-8 -*-
"""Listener thread creates connection objects for incoming connections"""
import logging
import socket
import threading
@ -8,6 +9,7 @@ from .connection import Connection
class Listener(threading.Thread):
"""The listener thread"""
def __init__(self, host, port, family=socket.AF_INET):
super().__init__(name='Listener')
self.host = host

View File

@ -1,11 +1,10 @@
# -*- coding: utf-8 -*-
"""Functions for starting the program"""
import argparse
import base64
import csv
import logging
import multiprocessing
import os
import pickle
import signal
import socket
@ -16,11 +15,13 @@ from .listener import Listener
def handler(s, f): # pylint: disable=unused-argument
"""Signal handler"""
logging.info('Gracefully shutting down MiNode')
shared.shutting_down = True
def parse_arguments():
def parse_arguments(): # pylint: disable=too-many-branches,too-many-statements
"""Parsing arguments"""
parser = argparse.ArgumentParser()
parser.add_argument('-p', '--port', help='Port to listen on', type=int)
parser.add_argument('--host', help='Listening host')
@ -99,56 +100,8 @@ def parse_arguments():
shared.i2p_transient = True
def load_data():
try:
with open(
os.path.join(shared.data_directory, 'objects.pickle'), 'br'
) as src:
shared.objects = pickle.load(src)
except FileNotFoundError:
pass # first start
except Exception:
logging.warning(
'Error while loading objects from disk.', exc_info=True)
try:
with open(
os.path.join(shared.data_directory, 'nodes.pickle'), 'br'
) as src:
shared.node_pool = pickle.load(src)
except FileNotFoundError:
pass
except Exception:
logging.warning('Error while loading nodes from disk.', exc_info=True)
try:
with open(
os.path.join(shared.data_directory, 'i2p_nodes.pickle'), 'br'
) as src:
shared.i2p_node_pool = pickle.load(src)
except FileNotFoundError:
pass
except Exception:
logging.warning('Error while loading nodes from disk.', exc_info=True)
with open(
os.path.join(shared.source_directory, 'core_nodes.csv'),
'r', newline=''
) as src:
reader = csv.reader(src)
shared.core_nodes = {tuple(row) for row in reader}
shared.node_pool.update(shared.core_nodes)
with open(
os.path.join(shared.source_directory, 'i2p_core_nodes.csv'),
'r', newline=''
) as f:
reader = csv.reader(f)
shared.i2p_core_nodes = {(row[0].encode(), 'i2p') for row in reader}
shared.i2p_node_pool.update(shared.i2p_core_nodes)
def bootstrap_from_dns():
"""Addes addresses of bootstrap servers to known nodes"""
try:
for item in socket.getaddrinfo('bootstrap8080.bitmessage.org', 80):
shared.unchecked_node_pool.add((item[4][0], 8080))
@ -165,6 +118,7 @@ def bootstrap_from_dns():
def start_ip_listener():
"""Starts `.listener.Listener`"""
listener_ipv4 = None
listener_ipv6 = None
@ -191,7 +145,7 @@ def start_ip_listener():
'Error while starting IPv4 listener on port %s.'
' However the IPv6 one seems to be working'
' and will probably accept IPv4 connections.', # 48 on macos
shared.listening_port, exc_info=(e.errno not in (48, 98)))
shared.listening_port, exc_info=e.errno not in (48, 98))
else:
logging.warning(
'Error while starting IPv4 listener on port %s.'
@ -201,6 +155,7 @@ def start_ip_listener():
def start_i2p_listener():
"""Starts I2P threads"""
# Grab I2P destinations from old object file
for obj in shared.objects.values():
if obj.object_type == shared.i2p_dest_obj_type:
@ -263,6 +218,7 @@ def start_i2p_listener():
def main():
"""Script entry point"""
signal.signal(signal.SIGINT, handler)
signal.signal(signal.SIGTERM, handler)
@ -282,8 +238,6 @@ def main():
'Error while creating data directory in: %s',
shared.data_directory, exc_info=True)
load_data()
if shared.ip_enabled and not shared.trusted_peer:
bootstrap_from_dns()
@ -292,18 +246,6 @@ def main():
# so we can collect I2P destination objects
start_i2p_listener()
for vector in set(shared.objects):
if not shared.objects[vector].is_valid():
if shared.objects[vector].is_expired():
logging.debug(
'Deleted expired object: %s',
base64.b16encode(vector).decode())
else:
logging.warning(
'Deleted invalid object: %s',
base64.b16encode(vector).decode())
del shared.objects[vector]
manager = Manager()
manager.start()

View File

@ -1,5 +1,7 @@
# -*- coding: utf-8 -*-
"""The main thread, managing connections, nodes and objects"""
import base64
import csv
import logging
import os
import pickle
@ -14,6 +16,7 @@ from .i2p import I2PDialer
class Manager(threading.Thread):
"""The manager thread"""
def __init__(self):
super().__init__(name='Manager')
self.q = queue.Queue()
@ -23,9 +26,11 @@ class Manager(threading.Thread):
self.last_pickled_nodes = time.time()
# Publish destination 5-15 minutes after start
self.last_published_i2p_destination = \
time.time() - 50 * 60 + random.uniform(-1, 1) * 300 # nosec
time.time() - 50 * 60 + random.uniform(-1, 1) * 300 # nosec B311
def run(self):
self.load_data()
self.clean_objects()
while True:
time.sleep(0.8)
now = time.time()
@ -51,12 +56,17 @@ class Manager(threading.Thread):
@staticmethod
def clean_objects():
for vector in set(shared.objects):
if shared.objects[vector].is_expired():
if not shared.objects[vector].is_valid():
if shared.objects[vector].is_expired():
logging.debug(
'Deleted expired object: %s',
base64.b16encode(vector).decode())
else:
logging.warning(
'Deleted invalid object: %s',
base64.b16encode(vector).decode())
with shared.objects_lock:
del shared.objects[vector]
logging.debug(
'Deleted expired object: %s',
base64.b16encode(vector).decode())
@staticmethod
def manage_connections():
@ -136,6 +146,59 @@ class Manager(threading.Thread):
shared.connections.add(c)
shared.hosts = hosts
@staticmethod
def load_data():
"""Loads initial nodes and data, stored in files between sessions"""
try:
with open(
os.path.join(shared.data_directory, 'objects.pickle'), 'br'
) as src:
shared.objects = pickle.load(src)
except FileNotFoundError:
pass # first start
except Exception:
logging.warning(
'Error while loading objects from disk.', exc_info=True)
try:
with open(
os.path.join(shared.data_directory, 'nodes.pickle'), 'br'
) as src:
shared.node_pool = pickle.load(src)
except FileNotFoundError:
pass
except Exception:
logging.warning(
'Error while loading nodes from disk.', exc_info=True)
try:
with open(
os.path.join(shared.data_directory, 'i2p_nodes.pickle'), 'br'
) as src:
shared.i2p_node_pool = pickle.load(src)
except FileNotFoundError:
pass
except Exception:
logging.warning(
'Error while loading nodes from disk.', exc_info=True)
with open(
os.path.join(shared.source_directory, 'core_nodes.csv'),
'r', newline='', encoding='ascii'
) as src:
reader = csv.reader(src)
shared.core_nodes = {tuple(row) for row in reader}
shared.node_pool.update(shared.core_nodes)
with open(
os.path.join(shared.source_directory, 'i2p_core_nodes.csv'),
'r', newline='', encoding='ascii'
) as f:
reader = csv.reader(f)
shared.i2p_core_nodes = {
(row[0].encode(), 'i2p') for row in reader}
shared.i2p_node_pool.update(shared.i2p_core_nodes)
@staticmethod
def pickle_objects():
try:
@ -185,5 +248,5 @@ class Manager(threading.Thread):
obj = structure.Object(
b'\x00' * 8, int(time.time() + 2 * 3600),
shared.i2p_dest_obj_type, shared.i2p_dest_obj_version,
1, dest_pub_raw)
shared.stream, dest_pub_raw)
proofofwork.do_pow_and_publish(obj)

View File

@ -1,4 +1,5 @@
# -*- coding: utf-8 -*-
"""Protocol message objects"""
import base64
import hashlib
import struct
@ -8,6 +9,7 @@ from . import shared, structure
class Header():
"""Message header structure"""
def __init__(self, command, payload_length, payload_checksum):
self.command = command
self.payload_length = payload_length
@ -22,6 +24,7 @@ class Header():
base64.b16encode(self.payload_checksum).decode())
def to_bytes(self):
"""Serialize to bytes"""
b = b''
b += shared.magic_bytes
b += self.command.ljust(12, b'\x00')
@ -31,6 +34,7 @@ class Header():
@classmethod
def from_bytes(cls, b):
"""Parse from bytes"""
magic_bytes, command, payload_length, payload_checksum = struct.unpack(
'>4s12sL4s', b)
@ -43,6 +47,7 @@ class Header():
class Message():
"""Common message structure"""
def __init__(self, command, payload):
self.command = command
self.payload = payload
@ -56,6 +61,7 @@ class Message():
base64.b16encode(self.payload_checksum).decode())
def to_bytes(self):
"""Serialize to bytes"""
b = Header(
self.command, self.payload_length, self.payload_checksum
).to_bytes()
@ -64,6 +70,7 @@ class Message():
@classmethod
def from_bytes(cls, b):
"""Parse from bytes"""
h = Header.from_bytes(b[:24])
payload = b[24:]
@ -84,11 +91,19 @@ class Message():
return cls(h.command, payload)
def _payload_read_int(data):
varint_length = structure.VarInt.length(data[0])
return (
structure.VarInt.from_bytes(data[:varint_length]).n,
data[varint_length:])
class Version():
"""The version message payload"""
def __init__(
self, host, port, protocol_version=shared.protocol_version,
services=shared.services, nonce=shared.nonce,
user_agent=shared.user_agent
user_agent=shared.user_agent, streams=None
):
self.host = host
self.port = port
@ -97,6 +112,9 @@ class Version():
self.services = services
self.nonce = nonce
self.user_agent = user_agent
self.streams = streams or [shared.stream]
if len(self.streams) > 160000:
self.streams = self.streams[:160000]
def __repr__(self):
return (
@ -111,20 +129,20 @@ class Version():
payload += struct.pack('>Q', self.services)
payload += struct.pack('>Q', int(time.time()))
payload += structure.NetAddrNoPrefix(
shared.services, self.host, self.port).to_bytes()
1, self.host, self.port).to_bytes()
payload += structure.NetAddrNoPrefix(
shared.services, '127.0.0.1', 8444).to_bytes()
self.services, '127.0.0.1', 8444).to_bytes()
payload += self.nonce
payload += structure.VarInt(len(shared.user_agent)).to_bytes()
payload += shared.user_agent
payload += 2 * structure.VarInt(1).to_bytes()
payload += structure.VarInt(len(self.user_agent)).to_bytes()
payload += self.user_agent
payload += structure.VarInt(len(self.streams)).to_bytes()
for stream in self.streams:
payload += structure.VarInt(stream).to_bytes()
return Message(b'version', payload).to_bytes()
@classmethod
def from_bytes(cls, b):
m = Message.from_bytes(b)
def from_message(cls, m):
payload = m.payload
( # unused: timestamp, net_addr_local
@ -138,23 +156,28 @@ class Version():
payload = payload[80:]
user_agent_varint_length = structure.VarInt.length(payload[0])
user_agent_length = structure.VarInt.from_bytes(
payload[:user_agent_varint_length]).n
payload = payload[user_agent_varint_length:]
user_agent_length, payload = _payload_read_int(payload)
user_agent = payload[:user_agent_length]
payload = payload[user_agent_length:]
if payload != b'\x01\x01':
raise ValueError('message not for stream 1')
streams_count, payload = _payload_read_int(payload)
if streams_count > 160000:
raise ValueError('malformed Version message, to many streams')
streams = []
return cls(host, port, protocol_version, services, nonce, user_agent)
while payload:
stream, payload = _payload_read_int(payload)
streams.append(stream)
if streams_count != len(streams):
raise ValueError('malformed Version message, wrong streams_count')
return cls(
host, port, protocol_version, services, nonce, user_agent, streams)
class Inv():
"""The inv message payload"""
def __init__(self, vectors):
self.vectors = set(vectors)
@ -171,11 +194,7 @@ class Inv():
def from_message(cls, m):
payload = m.payload
vector_count_varint_length = structure.VarInt.length(payload[0])
vector_count = structure.VarInt.from_bytes(
payload[:vector_count_varint_length]).n
payload = payload[vector_count_varint_length:]
vector_count, payload = _payload_read_int(payload)
vectors = set()
@ -190,6 +209,7 @@ class Inv():
class GetData():
"""The getdata message payload"""
def __init__(self, vectors):
self.vectors = set(vectors)
@ -206,11 +226,7 @@ class GetData():
def from_message(cls, m):
payload = m.payload
vector_count_varint_length = structure.VarInt.length(payload[0])
vector_count = structure.VarInt.from_bytes(
payload[:vector_count_varint_length]).n
payload = payload[vector_count_varint_length:]
vector_count, payload = _payload_read_int(payload)
vectors = set()
@ -225,6 +241,7 @@ class GetData():
class Addr():
"""The addr message payload"""
def __init__(self, addresses):
self.addresses = addresses
@ -241,11 +258,8 @@ class Addr():
def from_message(cls, m):
payload = m.payload
addr_count_varint_length = structure.VarInt.length(payload[0])
# addr_count = structure.VarInt.from_bytes(
# payload[:addr_count_varint_length]).n
payload = payload[addr_count_varint_length:]
# not validating addr_count
_, payload = _payload_read_int(payload)
addresses = set()
@ -254,3 +268,37 @@ class Addr():
payload = payload[38:]
return cls(addresses)
class Error():
"""The error message payload"""
def __init__(self, error_text=b'', fatal=0, ban_time=0, vector=b''):
self.error_text = error_text
self.fatal = fatal
self.ban_time = ban_time
self.vector = vector
def __repr__(self):
return 'error, text: {}'.format(self.error_text)
def to_bytes(self):
return Message(
b'error', structure.VarInt(self.fatal).to_bytes()
+ structure.VarInt(self.ban_time).to_bytes()
+ structure.VarInt(len(self.vector)).to_bytes() + self.vector
+ structure.VarInt(len(self.error_text)).to_bytes()
+ self.error_text
).to_bytes()
@classmethod
def from_message(cls, m):
payload = m.payload
fatal, payload = _payload_read_int(payload)
ban_time, payload = _payload_read_int(payload)
vector_length, payload = _payload_read_int(payload)
vector = payload[:vector_length]
payload = payload[vector_length:]
error_text_length, payload = _payload_read_int(payload)
error_text = payload[:error_text_length]
return cls(error_text, fatal, ban_time, vector)

View File

@ -1,3 +1,4 @@
"""Doing proof of work"""
import base64
import hashlib
import logging

View File

@ -1,4 +1,5 @@
# -*- coding: utf-8 -*-
"""Common variables and structures, referred in different threads"""
import logging
import os
import queue
@ -20,7 +21,7 @@ protocol_version = 3
services = 3 # NODE_NETWORK, NODE_SSL
stream = 1
nonce = os.urandom(8)
user_agent = b'/MiNode:0.3.1/'
user_agent = b'/MiNode:0.3.2/'
timeout = 600
header_length = 24
i2p_dest_obj_type = 0x493250

View File

@ -1,4 +1,5 @@
# -*- coding: utf-8 -*-
"""Protocol structures"""
import base64
import hashlib
import logging
@ -10,6 +11,7 @@ from . import shared
class VarInt():
"""varint object"""
def __init__(self, n):
self.n = n
@ -44,6 +46,7 @@ class VarInt():
class Object():
"""The 'object' message payload"""
def __init__(
self, nonce, expires_time, object_type, version,
stream_number, object_payload
@ -57,12 +60,19 @@ class Object():
self.vector = hashlib.sha512(hashlib.sha512(
self.to_bytes()).digest()).digest()[:32]
self.tag = (
# broadcast from version 5 and pubkey/getpukey from version 4
self.object_payload[:32] if object_type == 3 and version == 5
or (object_type in (0, 1) and version == 4)
else None)
def __repr__(self):
return 'object, vector: {}'.format(
base64.b16encode(self.vector).decode())
@classmethod
def from_message(cls, m):
"""Decode message payload"""
payload = m.payload
nonce, expires_time, object_type = struct.unpack('>8sQL', payload[:20])
payload = payload[20:]
@ -77,6 +87,7 @@ class Object():
nonce, expires_time, object_type, version, stream_number, payload)
def to_bytes(self):
"""Serialize to bytes"""
payload = b''
payload += self.nonce
payload += struct.pack('>QL', self.expires_time, self.object_type)
@ -87,9 +98,11 @@ class Object():
return payload
def is_expired(self):
"""Check if object's TTL is expired"""
return self.expires_time + 3 * 3600 < time.time()
def is_valid(self):
"""Checks the object validity"""
if self.is_expired():
logging.debug(
'Invalid object %s, reason: expired',
@ -105,18 +118,16 @@ class Object():
'Invalid object %s, reason: payload is too long',
base64.b16encode(self.vector).decode())
return False
if self.stream_number != 1:
if self.stream_number != shared.stream:
logging.warning(
'Invalid object %s, reason: not in stream 1',
base64.b16encode(self.vector).decode())
'Invalid object %s, reason: not in stream %i',
base64.b16encode(self.vector).decode(), shared.stream)
return False
data = self.to_bytes()[8:]
# length = len(data) + 8 + shared.payload_length_extra_bytes
# dt = max(self.expires_time - time.time(), 0)
h = hashlib.sha512(data).digest()
pow_value = int.from_bytes(
hashlib.sha512(hashlib.sha512(
self.nonce + h).digest()).digest()[:8], 'big')
self.nonce + self.pow_initial_hash()
).digest()).digest()[:8], 'big')
target = self.pow_target()
if target < pow_value:
logging.warning(
@ -126,6 +137,7 @@ class Object():
return True
def pow_target(self):
"""Compute PoW target"""
data = self.to_bytes()[8:]
length = len(data) + 8 + shared.payload_length_extra_bytes
dt = max(self.expires_time - time.time(), 0)
@ -135,10 +147,12 @@ class Object():
length + (dt * length) / (2 ** 16))))
def pow_initial_hash(self):
"""Compute the initial hash for PoW"""
return hashlib.sha512(self.to_bytes()[8:]).digest()
class NetAddrNoPrefix():
"""Network address"""
def __init__(self, services, host, port):
self.services = services
self.host = host
@ -171,6 +185,7 @@ class NetAddrNoPrefix():
class NetAddr():
"""Network address with time and stream"""
def __init__(self, services, host, port, stream=shared.stream):
self.stream = stream
self.services = services

View File

@ -1,11 +1,11 @@
"""Tests for messages"""
import unittest
from binascii import unhexlify
from minode import message
from minode.shared import magic_bytes
magic = 0xE9BEB4D9
# 500 identical peers:
# import ipaddress
# from hyperbit import net, packet
@ -13,26 +13,56 @@ magic = 0xE9BEB4D9
# 1626611891, 1, 1, net.ipv6(ipaddress.ip_address('127.0.0.1')).packed,
# 8444
# ) for _ in range(1000)]
sample_data = unhexlify(
sample_addr_data = unhexlify(
'fd01f4' + (
'0000000060f420b30000000'
'1000000000000000100000000000000000000ffff7f00000120fc'
) * 500
)
# protocol.CreatePacket(b'ping', b'test')
sample_ping_msg = unhexlify(
'e9beb4d970696e67000000000000000000000004ee26b0dd74657374')
# from pybitmessage import pathmagic
# pathmagic.setup()
# import protocol
# msg = protocol.assembleVersionMessage('127.0.0.1', 8444, [1, 2, 3])
sample_version_msg = unhexlify(
'e9beb4d976657273696f6e00000000000000006b1b06b182000000030000000000000003'
'0000000064fdd3e1000000000000000100000000000000000000ffff7f00000120fc0000'
'00000000000300000000000000000000ffff7f00000120fc00c0b6c3eefb2adf162f5079'
'4269746d6573736167653a302e362e332e322f03010203'
)
#
sample_error_data = \
b'\x02\x00\x006Too many connections from your IP. Closing connection.'
class TestMessage(unittest.TestCase):
"""Test assembling and disassembling of network mesages"""
def test_packet(self):
"""Check the packet created by message.Message()"""
head = unhexlify(b'%x' % magic)
self.assertEqual(
message.Message(b'ping', b'').to_bytes()[:len(head)], head)
"""Check packet creation and parsing by message.Message"""
msg = message.Message(b'ping', b'test').to_bytes()
self.assertEqual(msg[:len(magic_bytes)], magic_bytes)
with self.assertRaises(ValueError):
# wrong magic
message.Message.from_bytes(msg[1:])
with self.assertRaises(ValueError):
# wrong length
message.Message.from_bytes(msg[:-1])
with self.assertRaises(ValueError):
# wrong checksum
message.Message.from_bytes(msg[:-1] + b'\x00')
msg = message.Message.from_bytes(sample_ping_msg)
self.assertEqual(msg.command, b'ping')
self.assertEqual(msg.payload, b'test')
def test_addr(self):
"""Test addr messages"""
msg = message.Message(b'addr', sample_data)
msg = message.Message(b'addr', sample_addr_data)
addr_packet = message.Addr.from_message(msg)
self.assertEqual(len(addr_packet.addresses), 500)
address = addr_packet.addresses.pop()
@ -40,3 +70,32 @@ class TestMessage(unittest.TestCase):
self.assertEqual(address.services, 1)
self.assertEqual(address.port, 8444)
self.assertEqual(address.host, '127.0.0.1')
def test_version(self):
"""Test version message"""
msg = message.Message.from_bytes(sample_version_msg)
self.assertEqual(msg.command, b'version')
version_packet = message.Version.from_message(msg)
self.assertEqual(version_packet.host, '127.0.0.1')
self.assertEqual(version_packet.port, 8444)
self.assertEqual(version_packet.protocol_version, 3)
self.assertEqual(version_packet.services, 3)
self.assertEqual(version_packet.user_agent, b'/PyBitmessage:0.6.3.2/')
self.assertEqual(version_packet.streams, [1, 2, 3])
msg = version_packet.to_bytes()
# omit header and timestamp
self.assertEqual(msg[24:36], sample_version_msg[24:36])
self.assertEqual(msg[44:], sample_version_msg[44:])
def test_error(self):
"""Test error message"""
msg = message.Error.from_message(
message.Message(b'error', sample_error_data))
self.assertEqual(msg.fatal, 2)
self.assertEqual(msg.ban_time, 0)
self.assertEqual(msg.vector, b'')
msg = message.Error(
b'Too many connections from your IP. Closing connection.', 2)
self.assertEqual(msg.to_bytes()[24:], sample_error_data)

View File

@ -1,5 +1,7 @@
"""Blind tests, starting the minode process"""
import unittest
import signal
import socket
import subprocess
import sys
import tempfile
@ -7,6 +9,12 @@ import time
import psutil
try:
socket.socket().bind(('127.0.0.1', 7656))
i2p_port_free = True
except (OSError, socket.error):
i2p_port_free = False
class TestProcessProto(unittest.TestCase):
"""Test process attributes, common flow"""
@ -42,7 +50,7 @@ class TestProcessProto(unittest.TestCase):
@classmethod
def tearDownClass(cls):
"""Ensures that pybitmessage stopped and removes files"""
"""Ensures that process stopped and removes files"""
try:
if not cls._stop_process(10):
try:
@ -52,6 +60,12 @@ class TestProcessProto(unittest.TestCase):
except psutil.NoSuchProcess:
pass
def connections(self):
"""All process' established connections"""
return [
c for c in self.process.connections()
if c.status == 'ESTABLISHED']
class TestProcessShutdown(TestProcessProto):
"""Separate test case for SIGTERM"""
@ -72,15 +86,10 @@ class TestProcess(TestProcessProto):
"""Check minode process connections"""
_started = time.time()
def connections():
return [
c for c in self.process.connections()
if c.status == 'ESTABLISHED']
def continue_check_limit(extra_time):
for t in range(extra_time * 2):
for _ in range(extra_time * 2):
self.assertLessEqual(
len(connections()),
len(self.connections()),
# shared.outgoing_connections, one listening
# TODO: find the cause of one extra
(min(self._connection_limit, 8) if not self._listen
@ -89,8 +98,8 @@ class TestProcess(TestProcessProto):
' by --connection-limit')
time.sleep(1)
for t in range(self._wait_time * 2):
if len(connections()) > self._connection_limit / 2:
for _ in range(self._wait_time * 2):
if len(self.connections()) > self._connection_limit / 2:
_time_to_connect = round(time.time() - _started)
break
time.sleep(0.5)
@ -105,10 +114,33 @@ class TestProcess(TestProcessProto):
for c in self.process.connections():
if c.status == 'LISTEN':
if self._listen is False:
return self.fail(
'Listening while started with --no-incoming')
self.fail('Listening while started with --no-incoming')
return
self.assertEqual(c.laddr[1], self._listening_port or 8444)
break
else:
if self._listen:
self.fail('No listening connection found')
@unittest.skipIf(i2p_port_free, 'No running i2pd detected')
class TestProcessI2P(TestProcess):
"""Test minode process with --i2p and no IP"""
_process_cmd = ['minode', '--i2p', '--no-ip']
_connection_limit = 4
_wait_time = 120
_listen = True
_listening_port = 8448
def test_connections(self):
"""Ensure all connections are I2P"""
super().test_connections()
for c in self.connections():
self.assertEqual(c.raddr[0], '127.0.0.1')
self.assertEqual(c.raddr[1], 7656)
@unittest.skipUnless(i2p_port_free, 'Detected running i2pd')
class TestProcessNoI2P(TestProcessShutdown):
"""Test minode process shutdown with --i2p and no IP"""
_process_cmd = ['minode', '--i2p', '--no-ip']

View File

@ -1,13 +1,40 @@
import unittest
"""Tests for structures"""
import base64
import logging
import queue
import struct
import time
import unittest
from binascii import unhexlify
from minode import structure
from minode import message, proofofwork, shared, structure
# host pregenerated by pybitmessage.protocol.encodeHost()
# for one of bootstrap servers, port 8080,
# everything else is like in test_message: 1626611891, 1, 1
sample_addr_data = unhexlify(
'0000000060f420b3000000010000000000000001'
'260753000201300000000000000057ae1f90')
# data for an object with expires_time 1697063939
# structure.Object(
# b'\x00' * 8, expires_time, 42, 1, 2, b'HELLO').to_bytes()
sample_object_data = unhexlify(
'000000000000000000000000652724030000002a010248454c4c4f')
logging.basicConfig(
level=shared.log_level,
format='[%(asctime)s] [%(levelname)s] %(message)s')
class TestStructure(unittest.TestCase):
"""Testing structures serializing and deserializing"""
@classmethod
def setUpClass(cls):
shared.objects = {}
def test_varint(self):
"""Test varint serializing and deserializing"""
s = structure.VarInt(0)
@ -67,3 +94,80 @@ class TestStructure(unittest.TestCase):
self.assertEqual(
addr.to_bytes()[8:24],
unhexlify('0102030405060708090a0b0c0d0e0f10'))
addr = structure.NetAddr.from_bytes(sample_addr_data)
self.assertEqual(addr.host, '2607:5300:201:3000::57ae')
self.assertEqual(addr.port, 8080)
self.assertEqual(addr.stream, 1)
self.assertEqual(addr.services, 1)
addr = structure.NetAddr(1, '2607:5300:201:3000::57ae', 8080, 1)
self.assertEqual(addr.to_bytes()[8:], sample_addr_data[8:])
def test_object(self):
"""Create and check objects"""
obj = structure.Object.from_message(
message.Message(b'object', sample_object_data))
self.assertEqual(obj.object_type, 42)
self.assertEqual(obj.stream_number, 2)
self.assertEqual(obj.expires_time, 1697063939)
self.assertEqual(obj.object_payload, b'HELLO')
obj = structure.Object(
b'\x00' * 8, int(time.time() + 3000000), 42, 1, 1, b'HELLO')
self.assertFalse(obj.is_valid())
obj.expires_time = int(time.time() - 11000)
self.assertFalse(obj.is_valid())
obj = structure.Object(
b'\x00' * 8, int(time.time() + 300), 42, 1, 2, b'HELLO')
vector = obj.vector
proofofwork._worker(obj) # pylint: disable=protected-access
obj = shared.objects.popitem()[1]
self.assertNotEqual(obj.vector, vector)
self.assertFalse(obj.is_expired())
self.assertFalse(obj.is_valid())
shared.stream = 2
self.assertTrue(obj.is_valid())
obj.object_payload = \
b'TIGER, tiger, burning bright. In the forests of the night'
self.assertFalse(obj.is_valid())
def test_proofofwork(self):
"""Check the main proofofwork call and worker"""
shared.vector_advertise_queue = queue.Queue()
obj = structure.Object(
b'\x00' * 8, int(time.time() + 300), 42, 1,
shared.stream, b'HELLO')
start_time = time.time()
proofofwork.do_pow_and_publish(obj)
try:
vector = shared.vector_advertise_queue.get(timeout=300)
except queue.Empty:
self.fail("Couldn't make work in 300 sec")
else:
time.sleep(1)
try:
result = shared.objects[vector]
except KeyError:
self.fail(
"Couldn't found object with vector %s"
" %s sec after pow start" % (
base64.b16encode(vector), time.time() - start_time))
self.assertTrue(result.is_valid())
self.assertEqual(result.object_type, 42)
self.assertEqual(result.object_payload, b'HELLO')
q = queue.Queue()
# pylint: disable=protected-access
proofofwork._pow_worker(obj.pow_target(), obj.pow_initial_hash(), q)
try:
nonce = q.get(timeout=5)
except queue.Empty:
self.fail("No nonce found in the queue")
obj = structure.Object(
nonce, obj.expires_time, obj.object_type, obj.version,
obj.stream_number, obj.object_payload)
self.assertTrue(obj.is_valid())

View File

@ -20,7 +20,7 @@ setup(
long_description=README,
license='MIT',
author='Krzysztof Oziomek',
url='https://github.com/g1itch/MiNode',
url='https://git.bitmessage.org/lee.miller/MiNode',
packages=find_packages(exclude=('*tests',)),
package_data={'': ['*.csv', 'tls/*.pem']},
entry_points={'console_scripts': ['minode = minode.main:main']},

View File

@ -39,5 +39,7 @@ omit =
[coverage:report]
ignore_errors = true
[MESSAGES CONTROL]
disable = invalid-name
[pylint.main]
disable = invalid-name,consider-using-f-string,fixme
max-args = 8
max-attributes = 8