Compare commits

..

8 Commits
v0.3 ... test

Author SHA1 Message Date
Lee Miller dba3e318f3
A test case for incoming connection, needs debugging 2023-07-26 05:31:13 +03:00
Lee Miller 5804d2b2db
Catch RuntimeError for multiple invocation of main() 2023-07-26 05:14:30 +03:00
Lee Miller e505ea2047
Edit docstring mentioning pybitmessage 2023-07-26 02:50:37 +03:00
Lee Miller b34a39829f
Preparing to test with pybitmessage 2023-07-24 23:49:28 +03:00
Lee Miller 45dc9ed376
Copy connections before check 2023-03-15 14:25:12 +02:00
Lee Miller 354c975b40
Try to improve connection limit handling. 2023-03-14 23:34:32 +02:00
Lee Miller 83ddc63418
Unify main:
- same multiprocessing start method for start.sh and setuptools entry point,
 - make it possible to run outside of the main thread.
2023-01-24 06:34:05 +02:00
Lee Miller 1368f4c85c
Recreating blind tests running minode in the same process 2023-01-24 06:29:28 +02:00
24 changed files with 364 additions and 503 deletions

View File

@ -2,15 +2,18 @@ FROM ubuntu:focal
RUN apt-get update
RUN apt-get install -yq software-properties-common
RUN apt-add-repository ppa:purplei2p/i2pd
RUN apt-get update
RUN apt-get install -yq --no-install-suggests --no-install-recommends \
python3-dev python3-pip python3.9 python3.9-dev python3.9-venv
RUN apt-get install -yq --no-install-suggests --no-install-recommends \
python3-dev python3-pip python3.9 python3.9-dev python3.9-venv sudo i2pd
RUN echo 'builder ALL=(ALL) NOPASSWD:ALL' >> /etc/sudoers
software-properties-common build-essential libcap-dev libffi-dev \
libssl-dev python-all-dev python-setuptools python-six git
RUN python3.9 -m pip install setuptools wheel
RUN python3.9 -m pip install --upgrade pip tox virtualenv
RUN git clone https://github.com/Bitmessage/PyBitmessage.git
RUN cd PyBitmessage; python2 setup.py install; python3 setup.py install
ADD . .

View File

@ -1,3 +0,0 @@
#!/bin/sh
sudo service i2pd start

View File

@ -1,7 +1,6 @@
The MIT License (MIT)
Copyright (c) 2016-2017 Krzysztof Oziomek
Copyright (c) 2020-2023 The Bitmessage Developers
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal

View File

@ -11,7 +11,7 @@ objects inside the network.
## Running
```
git clone https://git.bitmessage.org/lee.miller/MiNode.git
git clone https://github.com/g1itch/MiNode.git
```
```
cd MiNode
@ -30,7 +30,6 @@ usage: main.py [-h] [-p PORT] [--host HOST] [--debug] [--data-dir DATA_DIR]
[--connection-limit CONNECTION_LIMIT] [--i2p]
[--i2p-tunnel-length I2P_TUNNEL_LENGTH]
[--i2p-sam-host I2P_SAM_HOST] [--i2p-sam-port I2P_SAM_PORT]
[--i2p-transient]
optional arguments:
-h, --help show this help message and exit
@ -52,7 +51,6 @@ optional arguments:
Host of I2P SAMv3 bridge
--i2p-sam-port I2P_SAM_PORT
Port of I2P SAMv3 bridge
--i2p-transient Generate new I2P destination on start
```
@ -82,8 +80,8 @@ If you add `trustedpeer = 127.0.0.1:8444` to `keys.dat` file in PyBitmessage it
will allow you to use it anonymously over I2P with MiNode acting as a bridge.
## Contact
- lee.miller: BM-2cX1pX2goWAuZB5bLqj17x23EFjufHmygv
- g1itch: BM-NC4h7r3HGcJgqNuwSEpGcSiVij3BKuXa
## Links
- [Bitmessage project website](https://bitmessage.org)
- [Protocol specification](https://pybitmessage.rtfd.io/en/v0.6/protocol.html)
- [Protocol specification](https://bitmessage.org/wiki/Protocol_specification)

View File

@ -1,6 +1,3 @@
"""
Advertiser thread advertises new addresses and objects among all connections
"""
import logging
import threading
import time
@ -9,7 +6,6 @@ from . import message, shared
class Advertiser(threading.Thread):
"""The advertiser thread"""
def __init__(self):
super().__init__(name='Advertiser')

View File

@ -1,9 +1,7 @@
# -*- coding: utf-8 -*-
"""The logic and behaviour of a single connection"""
import base64
import errno
import logging
import math
import random
import select
import socket
@ -16,7 +14,6 @@ from . import message, shared, structure
class Connection(threading.Thread):
"""The connection object"""
def __init__(
self, host, port, s=None, network='ip', server=False,
i2p_remote_dest=b''
@ -38,7 +35,7 @@ class Connection(threading.Thread):
self.vectors_to_get = set()
self.vectors_to_send = set()
self.vectors_requested = {}
self.vectors_requested = dict()
self.status = 'ready'
@ -65,7 +62,6 @@ class Connection(threading.Thread):
self.last_message_received = time.time()
self.last_message_sent = time.time()
self.wait_until = 0
def run(self):
if self.s is None:
@ -110,7 +106,11 @@ class Connection(threading.Thread):
'Disconnecting from %s:%s. Reason: %s',
self.host_print, self.port, e)
data = None
except ConnectionResetError:
logging.debug(
'Disconnecting from %s:%s. Reason: ConnectionResetError',
self.host_print, self.port)
self.status = 'disconnecting'
self._process_buffer_receive()
self._process_queue()
self._send_data()
@ -135,13 +135,13 @@ class Connection(threading.Thread):
time.time() - self.last_message_sent > 300
and self.status == 'fully_established'
):
self.send_queue.put(message.Message(b'ping', b''))
self.send_queue.put(message.Message(b'pong', b''))
if self.status == 'disconnecting' or shared.shutting_down:
data = None
if not data:
self.status = 'disconnected'
self.s.close()
logging.info(
logging.debug(
'Disconnected from %s:%s', self.host_print, self.port)
break
time.sleep(0.2)
@ -188,10 +188,7 @@ class Connection(threading.Thread):
'Initializing TLS connection with %s:%s',
self.host_print, self.port)
context = ssl.create_default_context(
purpose=ssl.Purpose.CLIENT_AUTH if self.server
else ssl.Purpose.SERVER_AUTH
)
context = ssl.create_default_context()
context.check_hostname = False
context.verify_mode = ssl.CERT_NONE
@ -251,8 +248,6 @@ class Connection(threading.Thread):
structure.NetAddr(c.remote_version.services, c.host, c.port)
for c in shared.connections if c.network != 'i2p'
and c.server is False and c.status == 'fully_established'}
# pylint: disable=unsubscriptable-object
# https://github.com/pylint-dev/pylint/issues/3637
if len(shared.node_pool) > 10:
addr.update({
structure.NetAddr(1, a[0], a[1])
@ -335,9 +330,7 @@ class Connection(threading.Thread):
def _process_message(self, m):
if m.command == b'version':
version = message.Version.from_message(m)
if shared.stream not in version.streams:
raise ValueError('message not for stream %i' % shared.stream)
version = message.Version.from_bytes(m.to_bytes())
logging.debug('%s:%s -> %s', self.host_print, self.port, version)
if (
version.protocol_version != shared.protocol_version
@ -422,12 +415,8 @@ class Connection(threading.Thread):
self.send_queue.put(message.Message(b'pong', b''))
elif m.command == b'error':
error = message.Error.from_message(m)
logging.warning(
'%s:%s -> %s', self.host_print, self.port, error)
if error.fatal == 2:
# reduce probability to connect soon
shared.unchecked_node_pool.discard((self.host, self.port))
'%s:%s -> error: %s', self.host_print, self.port, m.payload)
else:
logging.debug('%s:%s -> %s', self.host_print, self.port, m)
@ -435,14 +424,7 @@ class Connection(threading.Thread):
def _request_objects(self):
if self.vectors_to_get and len(self.vectors_requested) < 100:
self.vectors_to_get.difference_update(shared.objects.keys())
if not self.wait_until:
nodes_count = (
len(shared.node_pool) + len(shared.unchecked_node_pool))
logging.debug('Nodes count is %i', nodes_count)
delay = math.ceil(math.log(nodes_count + 2, 20)) * 5.2
self.wait_until = time.time() + delay
logging.debug('Skip sending getdata for %.2fs', delay)
if self.vectors_to_get and self.wait_until < time.time():
if self.vectors_to_get:
logging.info(
'Queued %s vectors to get', len(self.vectors_to_get))
if len(self.vectors_to_get) > 64:

View File

@ -1,4 +1,3 @@
"""A package for working with I2P"""
from .controller import I2PController
from .dialer import I2PDialer
from .listener import I2PListener

View File

@ -3,22 +3,22 @@ import base64
import logging
import os
import socket
import threading
import time
from .util import I2PThread, pub_from_priv
from .util import receive_line, pub_from_priv
class I2PController(I2PThread):
class I2PController(threading.Thread):
def __init__(self, state, host='127.0.0.1', port=7656, dest_priv=b''):
super().__init__(state, name='I2P Controller')
super().__init__(name='I2P Controller')
self.state = state
self.host = host
self.port = port
self.nick = b'MiNode_' + base64.b16encode(os.urandom(4)).lower()
while True:
if state.shutting_down:
return
try:
self.s = socket.create_connection((self.host, self.port))
break
@ -41,6 +41,15 @@ class I2PController(I2PThread):
self.create_session()
def _receive_line(self):
line = receive_line(self.s)
# logging.debug('I2PController <- %s', line)
return line
def _send(self, command):
# logging.debug('I2PController -> %s', command)
self.s.sendall(command)
def init_connection(self):
self._send(b'HELLO VERSION MIN=3.0 MAX=3.3\n')
self.version_reply = self._receive_line().split()

View File

@ -1,22 +1,23 @@
# -*- coding: utf-8 -*-
import logging
import socket
import threading
from .util import I2PThread
from .util import receive_line
class I2PDialer(I2PThread):
class I2PDialer(threading.Thread):
def __init__(
self, state, destination, nick, sam_host='127.0.0.1', sam_port=7656
):
self.state = state
self.sam_host = sam_host
self.sam_port = sam_port
self.nick = nick
self.destination = destination
super().__init__(state, name='I2P Dial to {}'.format(self.destination))
super().__init__(name='I2P Dial to {}'.format(self.destination))
self.s = socket.create_connection((self.sam_host, self.sam_port))
@ -33,11 +34,20 @@ class I2PDialer(I2PThread):
c.start()
self.state.connections.add(c)
def _receive_line(self):
line = receive_line(self.s)
# logging.debug('I2PDialer <- %s', line)
return line
def _send(self, command):
# logging.debug('I2PDialer -> %s', command)
self.s.sendall(command)
def _connect(self):
self._send(b'HELLO VERSION MIN=3.0 MAX=3.3\n')
self.version_reply = self._receive_line().split()
if b'RESULT=OK' not in self.version_reply:
logging.debug('Error while connecting to %s', self.destination)
logging.warning('Error while connecting to %s', self.destination)
self.success = False
self._send(
@ -45,5 +55,6 @@ class I2PDialer(I2PThread):
+ self.destination + b'\n')
reply = self._receive_line().split(b' ')
if b'RESULT=OK' not in reply:
logging.debug('Error while connecting to %s', self.destination)
logging.warning(
'Error while connecting to %s', self.destination)
self.success = False

View File

@ -1,22 +1,35 @@
# -*- coding: utf-8 -*-
import logging
import socket
import threading
from .util import I2PThread
from .util import receive_line
class I2PListener(I2PThread):
class I2PListener(threading.Thread):
def __init__(self, state, nick, host='127.0.0.1', port=7656):
super().__init__(state, name='I2P Listener')
super().__init__(name='I2P Listener')
self.state = state
self.host = host
self.port = port
self.nick = nick
self.s = None
self.version_reply = []
self.new_socket()
def _receive_line(self):
line = receive_line(self.s)
# logging.debug('I2PListener <- %s', line)
return line
def _send(self, command):
# logging.debug('I2PListener -> %s', command)
self.s.sendall(command)
def new_socket(self):
self.s = socket.create_connection((self.host, self.port))
self._send(b'HELLO VERSION MIN=3.0 MAX=3.3\n')

View File

@ -1,7 +1,6 @@
# -*- coding: utf-8 -*-
import base64
import hashlib
import threading
def receive_line(s):
@ -15,35 +14,16 @@ def receive_line(s):
return data[0]
class I2PThread(threading.Thread):
"""
Abstract I2P thread with _receive_line() and _send() methods,
reused in I2PDialer, I2PListener and I2PController
"""
def __init__(self, state, name=''):
super().__init__(name=name)
self.state = state
self.s = None
def _receive_line(self):
line = receive_line(self.s)
# logging.debug('I2PListener <- %s', line)
return line
def _send(self, command):
# logging.debug('I2PListener -> %s', command)
self.s.sendall(command)
def pub_from_priv(priv):
priv = base64.b64decode(priv, altchars=b'-~')
# 256 for public key + 128 for signing key + 3 for certificate header
# + value of bytes priv[385:387]
pub = priv[:387 + int.from_bytes(priv[385:387], byteorder='big')]
return base64.b64encode(pub, altchars=b'-~')
pub = base64.b64encode(pub, altchars=b'-~')
return pub
def b32_from_pub(pub):
return base64.b32encode(
hashlib.sha256(base64.b64decode(pub, b'-~')).digest()
).replace(b'=', b'').lower() + b'.b32.i2p'
).replace(b"=", b"").lower() + b'.b32.i2p'

View File

@ -1,5 +1,4 @@
# -*- coding: utf-8 -*-
"""Listener thread creates connection objects for incoming connections"""
import logging
import socket
import threading
@ -9,7 +8,6 @@ from .connection import Connection
class Listener(threading.Thread):
"""The listener thread"""
def __init__(self, host, port, family=socket.AF_INET):
super().__init__(name='Listener')
self.host = host

View File

@ -1,10 +1,11 @@
# -*- coding: utf-8 -*-
"""Functions for starting the program"""
import argparse
import base64
import csv
import logging
import multiprocessing
import os
import pickle
import signal
import socket
@ -15,13 +16,11 @@ from .listener import Listener
def handler(s, f): # pylint: disable=unused-argument
"""Signal handler"""
logging.info('Gracefully shutting down MiNode')
shared.shutting_down = True
def parse_arguments(): # pylint: disable=too-many-branches,too-many-statements
"""Parsing arguments"""
def parse_arguments():
parser = argparse.ArgumentParser()
parser.add_argument('-p', '--port', help='Port to listen on', type=int)
parser.add_argument('--host', help='Listening host')
@ -100,8 +99,56 @@ def parse_arguments(): # pylint: disable=too-many-branches,too-many-statements
shared.i2p_transient = True
def load_data():
try:
with open(
os.path.join(shared.data_directory, 'objects.pickle'), 'br'
) as src:
shared.objects = pickle.load(src)
except FileNotFoundError:
pass # first start
except Exception:
logging.warning(
'Error while loading objects from disk.', exc_info=True)
try:
with open(
os.path.join(shared.data_directory, 'nodes.pickle'), 'br'
) as src:
shared.node_pool = pickle.load(src)
except FileNotFoundError:
pass
except Exception:
logging.warning('Error while loading nodes from disk.', exc_info=True)
try:
with open(
os.path.join(shared.data_directory, 'i2p_nodes.pickle'), 'br'
) as src:
shared.i2p_node_pool = pickle.load(src)
except FileNotFoundError:
pass
except Exception:
logging.warning('Error while loading nodes from disk.', exc_info=True)
with open(
os.path.join(shared.source_directory, 'core_nodes.csv'),
'r', newline=''
) as src:
reader = csv.reader(src)
shared.core_nodes = {tuple(row) for row in reader}
shared.node_pool.update(shared.core_nodes)
with open(
os.path.join(shared.source_directory, 'i2p_core_nodes.csv'),
'r', newline=''
) as f:
reader = csv.reader(f)
shared.i2p_core_nodes = {(row[0].encode(), 'i2p') for row in reader}
shared.i2p_node_pool.update(shared.i2p_core_nodes)
def bootstrap_from_dns():
"""Addes addresses of bootstrap servers to known nodes"""
try:
for item in socket.getaddrinfo('bootstrap8080.bitmessage.org', 80):
shared.unchecked_node_pool.add((item[4][0], 8080))
@ -118,7 +165,6 @@ def bootstrap_from_dns():
def start_ip_listener():
"""Starts `.listener.Listener`"""
listener_ipv4 = None
listener_ipv6 = None
@ -145,7 +191,7 @@ def start_ip_listener():
'Error while starting IPv4 listener on port %s.'
' However the IPv6 one seems to be working'
' and will probably accept IPv4 connections.', # 48 on macos
shared.listening_port, exc_info=e.errno not in (48, 98))
shared.listening_port, exc_info=(e.errno not in (48, 98)))
else:
logging.warning(
'Error while starting IPv4 listener on port %s.'
@ -155,7 +201,6 @@ def start_ip_listener():
def start_i2p_listener():
"""Starts I2P threads"""
# Grab I2P destinations from old object file
for obj in shared.objects.values():
if obj.object_type == shared.i2p_dest_obj_type:
@ -218,10 +263,10 @@ def start_i2p_listener():
def main():
"""Script entry point"""
signal.signal(signal.SIGINT, handler)
signal.signal(signal.SIGTERM, handler)
try:
multiprocessing.set_start_method('spawn')
except RuntimeError:
pass
parse_arguments()
logging.basicConfig(
@ -229,6 +274,12 @@ def main():
format='[%(asctime)s] [%(levelname)s] %(message)s')
logging.info('Starting MiNode')
try:
signal.signal(signal.SIGINT, handler)
signal.signal(signal.SIGTERM, handler)
except ValueError:
logging.warning('Working outside of the main thread!')
logging.info('Data directory: %s', shared.data_directory)
if not os.path.exists(shared.data_directory):
try:
@ -238,6 +289,8 @@ def main():
'Error while creating data directory in: %s',
shared.data_directory, exc_info=True)
load_data()
if shared.ip_enabled and not shared.trusted_peer:
bootstrap_from_dns()
@ -246,6 +299,18 @@ def main():
# so we can collect I2P destination objects
start_i2p_listener()
for vector in set(shared.objects):
if not shared.objects[vector].is_valid():
if shared.objects[vector].is_expired():
logging.debug(
'Deleted expired object: %s',
base64.b16encode(vector).decode())
else:
logging.warning(
'Deleted invalid object: %s',
base64.b16encode(vector).decode())
del shared.objects[vector]
manager = Manager()
manager.start()
@ -257,5 +322,4 @@ def main():
if __name__ == '__main__':
multiprocessing.set_start_method('spawn')
main()

View File

@ -1,7 +1,5 @@
# -*- coding: utf-8 -*-
"""The main thread, managing connections, nodes and objects"""
import base64
import csv
import logging
import os
import pickle
@ -16,7 +14,6 @@ from .i2p import I2PDialer
class Manager(threading.Thread):
"""The manager thread"""
def __init__(self):
super().__init__(name='Manager')
self.q = queue.Queue()
@ -26,11 +23,9 @@ class Manager(threading.Thread):
self.last_pickled_nodes = time.time()
# Publish destination 5-15 minutes after start
self.last_published_i2p_destination = \
time.time() - 50 * 60 + random.uniform(-1, 1) * 300 # nosec B311
time.time() - 50 * 60 + random.uniform(-1, 1) * 300 # nosec
def run(self):
self.load_data()
self.clean_objects()
while True:
time.sleep(0.8)
now = time.time()
@ -56,17 +51,12 @@ class Manager(threading.Thread):
@staticmethod
def clean_objects():
for vector in set(shared.objects):
if not shared.objects[vector].is_valid():
if shared.objects[vector].is_expired():
logging.debug(
'Deleted expired object: %s',
base64.b16encode(vector).decode())
else:
logging.warning(
'Deleted invalid object: %s',
base64.b16encode(vector).decode())
if shared.objects[vector].is_expired():
with shared.objects_lock:
del shared.objects[vector]
logging.debug(
'Deleted expired object: %s',
base64.b16encode(vector).decode())
@staticmethod
def manage_connections():
@ -138,7 +128,7 @@ class Manager(threading.Thread):
' an I2P connection', exc_info=True)
else:
continue
else:
elif outgoing_connections < shared.outgoing_connections:
c = Connection(addr[0], addr[1])
c.start()
hosts.add(c.host)
@ -146,59 +136,6 @@ class Manager(threading.Thread):
shared.connections.add(c)
shared.hosts = hosts
@staticmethod
def load_data():
"""Loads initial nodes and data, stored in files between sessions"""
try:
with open(
os.path.join(shared.data_directory, 'objects.pickle'), 'br'
) as src:
shared.objects = pickle.load(src)
except FileNotFoundError:
pass # first start
except Exception:
logging.warning(
'Error while loading objects from disk.', exc_info=True)
try:
with open(
os.path.join(shared.data_directory, 'nodes.pickle'), 'br'
) as src:
shared.node_pool = pickle.load(src)
except FileNotFoundError:
pass
except Exception:
logging.warning(
'Error while loading nodes from disk.', exc_info=True)
try:
with open(
os.path.join(shared.data_directory, 'i2p_nodes.pickle'), 'br'
) as src:
shared.i2p_node_pool = pickle.load(src)
except FileNotFoundError:
pass
except Exception:
logging.warning(
'Error while loading nodes from disk.', exc_info=True)
with open(
os.path.join(shared.source_directory, 'core_nodes.csv'),
'r', newline='', encoding='ascii'
) as src:
reader = csv.reader(src)
shared.core_nodes = {tuple(row) for row in reader}
shared.node_pool.update(shared.core_nodes)
with open(
os.path.join(shared.source_directory, 'i2p_core_nodes.csv'),
'r', newline='', encoding='ascii'
) as f:
reader = csv.reader(f)
shared.i2p_core_nodes = {
(row[0].encode(), 'i2p') for row in reader}
shared.i2p_node_pool.update(shared.i2p_core_nodes)
@staticmethod
def pickle_objects():
try:
@ -248,5 +185,5 @@ class Manager(threading.Thread):
obj = structure.Object(
b'\x00' * 8, int(time.time() + 2 * 3600),
shared.i2p_dest_obj_type, shared.i2p_dest_obj_version,
shared.stream, dest_pub_raw)
1, dest_pub_raw)
proofofwork.do_pow_and_publish(obj)

View File

@ -1,5 +1,4 @@
# -*- coding: utf-8 -*-
"""Protocol message objects"""
import base64
import hashlib
import struct
@ -9,7 +8,6 @@ from . import shared, structure
class Header():
"""Message header structure"""
def __init__(self, command, payload_length, payload_checksum):
self.command = command
self.payload_length = payload_length
@ -24,7 +22,6 @@ class Header():
base64.b16encode(self.payload_checksum).decode())
def to_bytes(self):
"""Serialize to bytes"""
b = b''
b += shared.magic_bytes
b += self.command.ljust(12, b'\x00')
@ -34,7 +31,6 @@ class Header():
@classmethod
def from_bytes(cls, b):
"""Parse from bytes"""
magic_bytes, command, payload_length, payload_checksum = struct.unpack(
'>4s12sL4s', b)
@ -47,7 +43,6 @@ class Header():
class Message():
"""Common message structure"""
def __init__(self, command, payload):
self.command = command
self.payload = payload
@ -61,7 +56,6 @@ class Message():
base64.b16encode(self.payload_checksum).decode())
def to_bytes(self):
"""Serialize to bytes"""
b = Header(
self.command, self.payload_length, self.payload_checksum
).to_bytes()
@ -70,7 +64,6 @@ class Message():
@classmethod
def from_bytes(cls, b):
"""Parse from bytes"""
h = Header.from_bytes(b[:24])
payload = b[24:]
@ -91,19 +84,11 @@ class Message():
return cls(h.command, payload)
def _payload_read_int(data):
varint_length = structure.VarInt.length(data[0])
return (
structure.VarInt.from_bytes(data[:varint_length]).n,
data[varint_length:])
class Version():
"""The version message payload"""
def __init__(
self, host, port, protocol_version=shared.protocol_version,
services=shared.services, nonce=shared.nonce,
user_agent=shared.user_agent, streams=None
user_agent=shared.user_agent
):
self.host = host
self.port = port
@ -112,9 +97,6 @@ class Version():
self.services = services
self.nonce = nonce
self.user_agent = user_agent
self.streams = streams or [shared.stream]
if len(self.streams) > 160000:
self.streams = self.streams[:160000]
def __repr__(self):
return (
@ -129,20 +111,20 @@ class Version():
payload += struct.pack('>Q', self.services)
payload += struct.pack('>Q', int(time.time()))
payload += structure.NetAddrNoPrefix(
1, self.host, self.port).to_bytes()
shared.services, self.host, self.port).to_bytes()
payload += structure.NetAddrNoPrefix(
self.services, '127.0.0.1', 8444).to_bytes()
shared.services, '127.0.0.1', 8444).to_bytes()
payload += self.nonce
payload += structure.VarInt(len(self.user_agent)).to_bytes()
payload += self.user_agent
payload += structure.VarInt(len(self.streams)).to_bytes()
for stream in self.streams:
payload += structure.VarInt(stream).to_bytes()
payload += structure.VarInt(len(shared.user_agent)).to_bytes()
payload += shared.user_agent
payload += 2 * structure.VarInt(1).to_bytes()
return Message(b'version', payload).to_bytes()
@classmethod
def from_message(cls, m):
def from_bytes(cls, b):
m = Message.from_bytes(b)
payload = m.payload
( # unused: timestamp, net_addr_local
@ -156,28 +138,23 @@ class Version():
payload = payload[80:]
user_agent_length, payload = _payload_read_int(payload)
user_agent_varint_length = structure.VarInt.length(payload[0])
user_agent_length = structure.VarInt.from_bytes(
payload[:user_agent_varint_length]).n
payload = payload[user_agent_varint_length:]
user_agent = payload[:user_agent_length]
payload = payload[user_agent_length:]
streams_count, payload = _payload_read_int(payload)
if streams_count > 160000:
raise ValueError('malformed Version message, to many streams')
streams = []
if payload != b'\x01\x01':
raise ValueError('message not for stream 1')
while payload:
stream, payload = _payload_read_int(payload)
streams.append(stream)
if streams_count != len(streams):
raise ValueError('malformed Version message, wrong streams_count')
return cls(
host, port, protocol_version, services, nonce, user_agent, streams)
return cls(host, port, protocol_version, services, nonce, user_agent)
class Inv():
"""The inv message payload"""
def __init__(self, vectors):
self.vectors = set(vectors)
@ -194,7 +171,11 @@ class Inv():
def from_message(cls, m):
payload = m.payload
vector_count, payload = _payload_read_int(payload)
vector_count_varint_length = structure.VarInt.length(payload[0])
vector_count = structure.VarInt.from_bytes(
payload[:vector_count_varint_length]).n
payload = payload[vector_count_varint_length:]
vectors = set()
@ -209,7 +190,6 @@ class Inv():
class GetData():
"""The getdata message payload"""
def __init__(self, vectors):
self.vectors = set(vectors)
@ -226,7 +206,11 @@ class GetData():
def from_message(cls, m):
payload = m.payload
vector_count, payload = _payload_read_int(payload)
vector_count_varint_length = structure.VarInt.length(payload[0])
vector_count = structure.VarInt.from_bytes(
payload[:vector_count_varint_length]).n
payload = payload[vector_count_varint_length:]
vectors = set()
@ -241,7 +225,6 @@ class GetData():
class Addr():
"""The addr message payload"""
def __init__(self, addresses):
self.addresses = addresses
@ -258,8 +241,11 @@ class Addr():
def from_message(cls, m):
payload = m.payload
# not validating addr_count
_, payload = _payload_read_int(payload)
addr_count_varint_length = structure.VarInt.length(payload[0])
# addr_count = structure.VarInt.from_bytes(
# payload[:addr_count_varint_length]).n
payload = payload[addr_count_varint_length:]
addresses = set()
@ -268,37 +254,3 @@ class Addr():
payload = payload[38:]
return cls(addresses)
class Error():
"""The error message payload"""
def __init__(self, error_text=b'', fatal=0, ban_time=0, vector=b''):
self.error_text = error_text
self.fatal = fatal
self.ban_time = ban_time
self.vector = vector
def __repr__(self):
return 'error, text: {}'.format(self.error_text)
def to_bytes(self):
return Message(
b'error', structure.VarInt(self.fatal).to_bytes()
+ structure.VarInt(self.ban_time).to_bytes()
+ structure.VarInt(len(self.vector)).to_bytes() + self.vector
+ structure.VarInt(len(self.error_text)).to_bytes()
+ self.error_text
).to_bytes()
@classmethod
def from_message(cls, m):
payload = m.payload
fatal, payload = _payload_read_int(payload)
ban_time, payload = _payload_read_int(payload)
vector_length, payload = _payload_read_int(payload)
vector = payload[:vector_length]
payload = payload[vector_length:]
error_text_length, payload = _payload_read_int(payload)
error_text = payload[:error_text_length]
return cls(error_text, fatal, ban_time, vector)

View File

@ -1,4 +1,3 @@
"""Doing proof of work"""
import base64
import hashlib
import logging

View File

@ -1,5 +1,4 @@
# -*- coding: utf-8 -*-
"""Common variables and structures, referred in different threads"""
import logging
import os
import queue
@ -21,7 +20,7 @@ protocol_version = 3
services = 3 # NODE_NETWORK, NODE_SSL
stream = 1
nonce = os.urandom(8)
user_agent = b'/MiNode:0.3.2/'
user_agent = b'/MiNode:0.3.1/'
timeout = 600
header_length = 24
i2p_dest_obj_type = 0x493250

View File

@ -1,5 +1,4 @@
# -*- coding: utf-8 -*-
"""Protocol structures"""
import base64
import hashlib
import logging
@ -11,7 +10,6 @@ from . import shared
class VarInt():
"""varint object"""
def __init__(self, n):
self.n = n
@ -46,7 +44,6 @@ class VarInt():
class Object():
"""The 'object' message payload"""
def __init__(
self, nonce, expires_time, object_type, version,
stream_number, object_payload
@ -60,19 +57,12 @@ class Object():
self.vector = hashlib.sha512(hashlib.sha512(
self.to_bytes()).digest()).digest()[:32]
self.tag = (
# broadcast from version 5 and pubkey/getpukey from version 4
self.object_payload[:32] if object_type == 3 and version == 5
or (object_type in (0, 1) and version == 4)
else None)
def __repr__(self):
return 'object, vector: {}'.format(
base64.b16encode(self.vector).decode())
@classmethod
def from_message(cls, m):
"""Decode message payload"""
payload = m.payload
nonce, expires_time, object_type = struct.unpack('>8sQL', payload[:20])
payload = payload[20:]
@ -87,7 +77,6 @@ class Object():
nonce, expires_time, object_type, version, stream_number, payload)
def to_bytes(self):
"""Serialize to bytes"""
payload = b''
payload += self.nonce
payload += struct.pack('>QL', self.expires_time, self.object_type)
@ -98,11 +87,9 @@ class Object():
return payload
def is_expired(self):
"""Check if object's TTL is expired"""
return self.expires_time + 3 * 3600 < time.time()
def is_valid(self):
"""Checks the object validity"""
if self.is_expired():
logging.debug(
'Invalid object %s, reason: expired',
@ -118,16 +105,18 @@ class Object():
'Invalid object %s, reason: payload is too long',
base64.b16encode(self.vector).decode())
return False
if self.stream_number != shared.stream:
if self.stream_number != 1:
logging.warning(
'Invalid object %s, reason: not in stream %i',
base64.b16encode(self.vector).decode(), shared.stream)
'Invalid object %s, reason: not in stream 1',
base64.b16encode(self.vector).decode())
return False
data = self.to_bytes()[8:]
# length = len(data) + 8 + shared.payload_length_extra_bytes
# dt = max(self.expires_time - time.time(), 0)
h = hashlib.sha512(data).digest()
pow_value = int.from_bytes(
hashlib.sha512(hashlib.sha512(
self.nonce + self.pow_initial_hash()
).digest()).digest()[:8], 'big')
self.nonce + h).digest()).digest()[:8], 'big')
target = self.pow_target()
if target < pow_value:
logging.warning(
@ -137,7 +126,6 @@ class Object():
return True
def pow_target(self):
"""Compute PoW target"""
data = self.to_bytes()[8:]
length = len(data) + 8 + shared.payload_length_extra_bytes
dt = max(self.expires_time - time.time(), 0)
@ -147,12 +135,10 @@ class Object():
length + (dt * length) / (2 ** 16))))
def pow_initial_hash(self):
"""Compute the initial hash for PoW"""
return hashlib.sha512(self.to_bytes()[8:]).digest()
class NetAddrNoPrefix():
"""Network address"""
def __init__(self, services, host, port):
self.services = services
self.host = host
@ -185,7 +171,6 @@ class NetAddrNoPrefix():
class NetAddr():
"""Network address with time and stream"""
def __init__(self, services, host, port, stream=shared.stream):
self.stream = stream
self.services = services

83
minode/tests/test_app.py Normal file
View File

@ -0,0 +1,83 @@
import unittest
import sys
import tempfile
import time
import threading
from minode import shared
from minode.main import main as app
class TestAppProto(unittest.TestCase):
"""Import and start the application"""
_process_cmd = ['minode']
_connection_limit = 4 if sys.platform.startswith('win') else 6
_listen = False
_listening_port = None
home = None
@classmethod
def _build_app_args(cls):
if not cls.home:
cls.home = tempfile.gettempdir()
args = cls._process_cmd + [
'--data-dir', cls.home,
'--connection-limit', str(cls._connection_limit)
]
if not cls._listen:
args += ['--no-incoming']
elif cls._listening_port:
args += ['-p', str(cls._listening_port)]
return args
def _connections(self):
return [
c for c in shared.connections.copy()
if c.status == 'fully_established']
@classmethod
def setUpClass(cls):
sys.argv = cls._build_app_args()
cls.app = threading.Thread(name="minode", target=app, daemon=True)
cls.app.start()
@classmethod
def tearDownClass(cls):
shared.shutting_down = True
class TestApp(TestAppProto):
"""Check the app parameters"""
_wait_time = 120
_check_limit = True
def test_connections(self):
"""Check connections"""
_started = time.time()
def continue_check_limit(extra_time):
for t in range(extra_time * 4):
self.assertLessEqual(
len(self._connections()),
# shared.outgoing_connections, one listening
# TODO: find the cause of one extra
(min(self._connection_limit, 8) if not self._listen
else self._connection_limit) + 1,
'Opened more connections than required'
' by --connection-limit')
time.sleep(0.5)
for t in range(self._wait_time * 2):
if len(self._connections()) > self._connection_limit / 2:
_time_to_connect = round(time.time() - _started)
break
time.sleep(0.5)
else:
self.fail(
'Failed establish at least %s connections in %s sec'
% (self._connection_limit / 2, self._wait_time))
if self._check_limit:
continue_check_limit(_time_to_connect)

View File

@ -1,11 +1,11 @@
"""Tests for messages"""
import unittest
from binascii import unhexlify
from minode import message
from minode.shared import magic_bytes
magic = 0xE9BEB4D9
# 500 identical peers:
# import ipaddress
# from hyperbit import net, packet
@ -13,56 +13,26 @@ from minode.shared import magic_bytes
# 1626611891, 1, 1, net.ipv6(ipaddress.ip_address('127.0.0.1')).packed,
# 8444
# ) for _ in range(1000)]
sample_addr_data = unhexlify(
sample_data = unhexlify(
'fd01f4' + (
'0000000060f420b30000000'
'1000000000000000100000000000000000000ffff7f00000120fc'
) * 500
)
# protocol.CreatePacket(b'ping', b'test')
sample_ping_msg = unhexlify(
'e9beb4d970696e67000000000000000000000004ee26b0dd74657374')
# from pybitmessage import pathmagic
# pathmagic.setup()
# import protocol
# msg = protocol.assembleVersionMessage('127.0.0.1', 8444, [1, 2, 3])
sample_version_msg = unhexlify(
'e9beb4d976657273696f6e00000000000000006b1b06b182000000030000000000000003'
'0000000064fdd3e1000000000000000100000000000000000000ffff7f00000120fc0000'
'00000000000300000000000000000000ffff7f00000120fc00c0b6c3eefb2adf162f5079'
'4269746d6573736167653a302e362e332e322f03010203'
)
#
sample_error_data = \
b'\x02\x00\x006Too many connections from your IP. Closing connection.'
class TestMessage(unittest.TestCase):
"""Test assembling and disassembling of network mesages"""
def test_packet(self):
"""Check packet creation and parsing by message.Message"""
msg = message.Message(b'ping', b'test').to_bytes()
self.assertEqual(msg[:len(magic_bytes)], magic_bytes)
with self.assertRaises(ValueError):
# wrong magic
message.Message.from_bytes(msg[1:])
with self.assertRaises(ValueError):
# wrong length
message.Message.from_bytes(msg[:-1])
with self.assertRaises(ValueError):
# wrong checksum
message.Message.from_bytes(msg[:-1] + b'\x00')
msg = message.Message.from_bytes(sample_ping_msg)
self.assertEqual(msg.command, b'ping')
self.assertEqual(msg.payload, b'test')
"""Check the packet created by message.Message()"""
head = unhexlify(b'%x' % magic)
self.assertEqual(
message.Message(b'ping', b'').to_bytes()[:len(head)], head)
def test_addr(self):
"""Test addr messages"""
msg = message.Message(b'addr', sample_addr_data)
msg = message.Message(b'addr', sample_data)
addr_packet = message.Addr.from_message(msg)
self.assertEqual(len(addr_packet.addresses), 500)
address = addr_packet.addresses.pop()
@ -70,32 +40,3 @@ class TestMessage(unittest.TestCase):
self.assertEqual(address.services, 1)
self.assertEqual(address.port, 8444)
self.assertEqual(address.host, '127.0.0.1')
def test_version(self):
"""Test version message"""
msg = message.Message.from_bytes(sample_version_msg)
self.assertEqual(msg.command, b'version')
version_packet = message.Version.from_message(msg)
self.assertEqual(version_packet.host, '127.0.0.1')
self.assertEqual(version_packet.port, 8444)
self.assertEqual(version_packet.protocol_version, 3)
self.assertEqual(version_packet.services, 3)
self.assertEqual(version_packet.user_agent, b'/PyBitmessage:0.6.3.2/')
self.assertEqual(version_packet.streams, [1, 2, 3])
msg = version_packet.to_bytes()
# omit header and timestamp
self.assertEqual(msg[24:36], sample_version_msg[24:36])
self.assertEqual(msg[44:], sample_version_msg[44:])
def test_error(self):
"""Test error message"""
msg = message.Error.from_message(
message.Message(b'error', sample_error_data))
self.assertEqual(msg.fatal, 2)
self.assertEqual(msg.ban_time, 0)
self.assertEqual(msg.vector, b'')
msg = message.Error(
b'Too many connections from your IP. Closing connection.', 2)
self.assertEqual(msg.to_bytes()[24:], sample_error_data)

View File

@ -1,19 +1,17 @@
"""Blind tests, starting the minode process"""
import unittest
import os
import shutil
import signal
import socket
import subprocess
import sys
import tempfile
import threading
import time
import psutil
try:
socket.socket().bind(('127.0.0.1', 7656))
i2p_port_free = True
except (OSError, socket.error):
i2p_port_free = False
from minode import shared
from minode.main import main as app
class TestProcessProto(unittest.TestCase):
@ -26,18 +24,23 @@ class TestProcessProto(unittest.TestCase):
home = None
@classmethod
def setUpClass(cls):
def _build_app_args(cls, extra=None):
if not cls.home:
cls.home = tempfile.gettempdir()
cmd = cls._process_cmd + [
args = cls._process_cmd + [
'--data-dir', cls.home,
'--connection-limit', str(cls._connection_limit)
]
if not cls._listen:
cmd += ['--no-incoming']
args += ['--no-incoming']
elif cls._listening_port:
cmd += ['-p', str(cls._listening_port)]
cls.process = psutil.Popen(cmd, stderr=subprocess.STDOUT) # nosec
args += ['-p', str(cls._listening_port)]
if extra:
args += extra
print('ARGS: %r' % args)
return args
@classmethod
def _stop_process(cls, timeout=5):
@ -48,9 +51,14 @@ class TestProcessProto(unittest.TestCase):
return False
return True
@classmethod
def setUpClass(cls):
cmd = cls._build_app_args()
cls.process = psutil.Popen(cmd, stderr=subprocess.STDOUT) # nosec
@classmethod
def tearDownClass(cls):
"""Ensures that process stopped and removes files"""
"""Ensures the process is stopped and removes files"""
try:
if not cls._stop_process(10):
try:
@ -60,12 +68,6 @@ class TestProcessProto(unittest.TestCase):
except psutil.NoSuchProcess:
pass
def connections(self):
"""All process' established connections"""
return [
c for c in self.process.connections()
if c.status == 'ESTABLISHED']
class TestProcessShutdown(TestProcessProto):
"""Separate test case for SIGTERM"""
@ -86,10 +88,15 @@ class TestProcess(TestProcessProto):
"""Check minode process connections"""
_started = time.time()
def connections():
return [
c for c in self.process.connections()
if c.status == 'ESTABLISHED']
def continue_check_limit(extra_time):
for _ in range(extra_time * 2):
for t in range(extra_time * 2):
self.assertLessEqual(
len(self.connections()),
len(connections()),
# shared.outgoing_connections, one listening
# TODO: find the cause of one extra
(min(self._connection_limit, 8) if not self._listen
@ -98,8 +105,8 @@ class TestProcess(TestProcessProto):
' by --connection-limit')
time.sleep(1)
for _ in range(self._wait_time * 2):
if len(self.connections()) > self._connection_limit / 2:
for t in range(self._wait_time * 2):
if len(connections()) > self._connection_limit / 2:
_time_to_connect = round(time.time() - _started)
break
time.sleep(0.5)
@ -114,8 +121,8 @@ class TestProcess(TestProcessProto):
for c in self.process.connections():
if c.status == 'LISTEN':
if self._listen is False:
self.fail('Listening while started with --no-incoming')
return
return self.fail(
'Listening while started with --no-incoming')
self.assertEqual(c.laddr[1], self._listening_port or 8444)
break
else:
@ -123,24 +130,39 @@ class TestProcess(TestProcessProto):
self.fail('No listening connection found')
@unittest.skipIf(i2p_port_free, 'No running i2pd detected')
class TestProcessI2P(TestProcess):
"""Test minode process with --i2p and no IP"""
_process_cmd = ['minode', '--i2p', '--no-ip']
_connection_limit = 4
_wait_time = 120
_listen = True
_listening_port = 8448
class TestConnectivity(TestProcessProto):
"""Check connectivity between instances"""
_process_cmd = [
'minode', '--trusted-peer', '127.0.0.1:8445']
# _listen = True
@classmethod
def setUpClass(cls):
super(TestConnectivity, cls).setUpClass()
cls._listen = True
cls._listening_port = 8445
cls.home = os.path.join(cls.home, 'client')
os.mkdir(cls.home)
# sys.argv = cls._build_app_args(['--trusted-peer', '127.0.0.1:8444'])
cls._process_cmd = ['minode', '--no-outgoing']
sys.argv = cls._build_app_args()
cls.app = threading.Thread(name="minode", target=app, daemon=True)
cls.app.start()
@classmethod
def tearDownClass(cls):
super(TestConnectivity, cls).tearDownClass()
shared.shutting_down = True
shutil.rmtree(cls.home)
def test_connections(self):
"""Ensure all connections are I2P"""
super().test_connections()
for c in self.connections():
self.assertEqual(c.raddr[0], '127.0.0.1')
self.assertEqual(c.raddr[1], 7656)
@unittest.skipUnless(i2p_port_free, 'Detected running i2pd')
class TestProcessNoI2P(TestProcessShutdown):
"""Test minode process shutdown with --i2p and no IP"""
_process_cmd = ['minode', '--i2p', '--no-ip']
"""Check the connection with trusted peer"""
time.sleep(5)
for t in range(10):
time.sleep(1)
connection_count = len([
c for c in shared.connections.copy()
if c.status == 'fully_established'])
if connection_count != 1:
self.fail("Unexpected connection count: %i" % connection_count)

View File

@ -1,40 +1,13 @@
"""Tests for structures"""
import base64
import logging
import queue
import struct
import time
import unittest
import struct
from binascii import unhexlify
from minode import message, proofofwork, shared, structure
# host pregenerated by pybitmessage.protocol.encodeHost()
# for one of bootstrap servers, port 8080,
# everything else is like in test_message: 1626611891, 1, 1
sample_addr_data = unhexlify(
'0000000060f420b3000000010000000000000001'
'260753000201300000000000000057ae1f90')
# data for an object with expires_time 1697063939
# structure.Object(
# b'\x00' * 8, expires_time, 42, 1, 2, b'HELLO').to_bytes()
sample_object_data = unhexlify(
'000000000000000000000000652724030000002a010248454c4c4f')
logging.basicConfig(
level=shared.log_level,
format='[%(asctime)s] [%(levelname)s] %(message)s')
from minode import structure
class TestStructure(unittest.TestCase):
"""Testing structures serializing and deserializing"""
@classmethod
def setUpClass(cls):
shared.objects = {}
def test_varint(self):
"""Test varint serializing and deserializing"""
s = structure.VarInt(0)
@ -94,80 +67,3 @@ class TestStructure(unittest.TestCase):
self.assertEqual(
addr.to_bytes()[8:24],
unhexlify('0102030405060708090a0b0c0d0e0f10'))
addr = structure.NetAddr.from_bytes(sample_addr_data)
self.assertEqual(addr.host, '2607:5300:201:3000::57ae')
self.assertEqual(addr.port, 8080)
self.assertEqual(addr.stream, 1)
self.assertEqual(addr.services, 1)
addr = structure.NetAddr(1, '2607:5300:201:3000::57ae', 8080, 1)
self.assertEqual(addr.to_bytes()[8:], sample_addr_data[8:])
def test_object(self):
"""Create and check objects"""
obj = structure.Object.from_message(
message.Message(b'object', sample_object_data))
self.assertEqual(obj.object_type, 42)
self.assertEqual(obj.stream_number, 2)
self.assertEqual(obj.expires_time, 1697063939)
self.assertEqual(obj.object_payload, b'HELLO')
obj = structure.Object(
b'\x00' * 8, int(time.time() + 3000000), 42, 1, 1, b'HELLO')
self.assertFalse(obj.is_valid())
obj.expires_time = int(time.time() - 11000)
self.assertFalse(obj.is_valid())
obj = structure.Object(
b'\x00' * 8, int(time.time() + 300), 42, 1, 2, b'HELLO')
vector = obj.vector
proofofwork._worker(obj) # pylint: disable=protected-access
obj = shared.objects.popitem()[1]
self.assertNotEqual(obj.vector, vector)
self.assertFalse(obj.is_expired())
self.assertFalse(obj.is_valid())
shared.stream = 2
self.assertTrue(obj.is_valid())
obj.object_payload = \
b'TIGER, tiger, burning bright. In the forests of the night'
self.assertFalse(obj.is_valid())
def test_proofofwork(self):
"""Check the main proofofwork call and worker"""
shared.vector_advertise_queue = queue.Queue()
obj = structure.Object(
b'\x00' * 8, int(time.time() + 300), 42, 1,
shared.stream, b'HELLO')
start_time = time.time()
proofofwork.do_pow_and_publish(obj)
try:
vector = shared.vector_advertise_queue.get(timeout=300)
except queue.Empty:
self.fail("Couldn't make work in 300 sec")
else:
time.sleep(1)
try:
result = shared.objects[vector]
except KeyError:
self.fail(
"Couldn't found object with vector %s"
" %s sec after pow start" % (
base64.b16encode(vector), time.time() - start_time))
self.assertTrue(result.is_valid())
self.assertEqual(result.object_type, 42)
self.assertEqual(result.object_payload, b'HELLO')
q = queue.Queue()
# pylint: disable=protected-access
proofofwork._pow_worker(obj.pow_target(), obj.pow_initial_hash(), q)
try:
nonce = q.get(timeout=5)
except queue.Empty:
self.fail("No nonce found in the queue")
obj = structure.Object(
nonce, obj.expires_time, obj.object_type, obj.version,
obj.stream_number, obj.object_payload)
self.assertTrue(obj.is_valid())

View File

@ -20,7 +20,7 @@ setup(
long_description=README,
license='MIT',
author='Krzysztof Oziomek',
url='https://git.bitmessage.org/lee.miller/MiNode',
url='https://github.com/g1itch/MiNode',
packages=find_packages(exclude=('*tests',)),
package_data={'': ['*.csv', 'tls/*.pem']},
entry_points={'console_scripts': ['minode = minode.main:main']},

View File

@ -39,7 +39,5 @@ omit =
[coverage:report]
ignore_errors = true
[pylint.main]
disable = invalid-name,consider-using-f-string,fixme
max-args = 8
max-attributes = 8
[MESSAGES CONTROL]
disable = invalid-name