Compare commits

...

39 Commits

Author SHA1 Message Date
c9a3877b92
Lower logging level for connection error messages in I2PDialer 2023-12-24 01:44:08 +02:00
d7ee73843e
Adjust pylint design checker parameters:
raise max-args to 8, add max-attributes with the same value.
2023-10-14 03:43:22 +03:00
9bcaea12cf
Specifically skip B311 in manager by bandit 2023-10-14 01:06:31 +03:00
e4c2c1be16
Make load_data a static method in manager,
use ascii while loading nodes csv.
2023-10-14 01:06:31 +03:00
a7187d8dfd
Suppress some too-many-* pylint design warnings in parse_arguments() 2023-10-14 01:06:31 +03:00
ddf07fd506
Set object tag for object types supporting it 2023-10-12 19:50:18 +03:00
2145f5839e
Cover the main proofofwork call and worker procedure 2023-10-12 19:50:15 +03:00
b806906af4
Add Error message class, handle fatal 2023-10-12 19:49:32 +03:00
3f61bd694b
Define a helper function to read a varint and trim payload 2023-10-12 19:49:32 +03:00
7812e4bbc2
Use shared.stream when assembling i2p_dest object instead of hardcoded 1 2023-10-12 19:49:32 +03:00
fda6ecfe01
Unify and improve message.Version:
- from_message() decoding method as in other messages;
  - support multiple streams and move stream check to connection;
  - use shared.stream instead of hardcoded 1;
  - replace values from shared with the instance attributes in to_bytes(),
    put conventional 1 as services of a remote host.
2023-10-12 19:49:32 +03:00
428580a980
Add a test for version message 2023-10-12 19:49:32 +03:00
399fc6f21f
Improve structure.Object:
- use shared.stream instead of hardcoded 1;
  - reuse pow_initial_hash() in is_valid().
2023-10-12 19:49:31 +03:00
218905739c
Add a test for object covering also proofofwork 2023-10-12 19:48:52 +03:00
e4887734a0
Send ping into inactive connection, not pong 2023-10-07 17:55:49 +03:00
ae40a3d0b8
Update copyright notes 2023-10-07 17:53:11 +03:00
fe508c176b
Update the Specification link 2023-08-23 02:47:03 +03:00
58a80bb4a4
Remove unreachable except clause for ConnectionResetError
- handled in socket.error branch. TODO: follow PEP 3151
2023-08-20 01:19:11 +03:00
8755e56167
Replace Manager.clean_objects() by the extended version from main
and call it upon the Manager start.
2023-08-20 01:14:17 +03:00
45a4a8fd31
Manifest disconnecting 2023-08-16 03:42:16 +03:00
644a09ba0b
Set maximum args to 7 for pylint design checker 2023-08-15 00:13:36 +03:00
67ecbf95d3
Suppress false positives on unsubscriptable-object in connection 2023-08-14 23:59:57 +03:00
b38e00c0a3
Handle pylint warnings in test_process, suppress fixme globally 2023-08-14 06:05:29 +03:00
e249e501cc
Fix formatting lint issues 2023-08-14 05:53:20 +03:00
4f1e14da2a
Finish test_address() using a sample data, generated with pybitmessage 2023-08-14 05:15:09 +03:00
dd2b0b89af
Improve docstrings in message and structure and add more 2023-08-14 05:15:08 +03:00
3788b12a28
Complete test_packet()
with parsing a prepared message and assertion of validation,
including message.Header; use magic_bytes from shared
assuming the value from the Spec.

Cover message.Message except for repr
2023-08-14 05:14:31 +03:00
c6d8bd64b2
Bump version to 0.3.2 2023-08-12 06:36:17 +03:00
6558245a32
Minimal implementation of anti-intersection delay to wait before getdata 2023-08-12 06:35:15 +03:00
1dfe98cf1f
Test shutting down minode --i2p if there is no running i2pd 2023-08-10 03:55:48 +03:00
e8dc62f08b
Allow shutting down while starting I2P listener 2023-08-10 03:52:16 +03:00
761c95d561
Fix SSLError in incoming connection with python 3.10 2023-08-04 00:27:38 +03:00
42995c5ca7
Check host and port of I2P connections 2023-08-02 23:02:21 +03:00
c6d0160001
Move s and state into the base 2023-08-02 05:23:33 +03:00
82c4062325
Inherit I2P classes from base util.I2PThread() 2023-08-02 05:23:33 +03:00
0584956d13
Install and start i2pd in buildbot 2023-08-02 05:23:33 +03:00
7113916347
Try to test with i2pd:
- TestProcessI2P runs minode with i2p args with _connection_limit = 4
 - TestProcess waits for connections _wait_time sec (120 for TestProcessI2P)
2023-08-02 05:23:33 +03:00
1d82774c96
Fix unused variable in test_process 2023-08-02 05:17:58 +03:00
5abaa94d42
Rename pylint section in tox.ini, suppress f-string suggestion for now 2023-08-02 05:11:59 +03:00
18 changed files with 464 additions and 210 deletions

View File

@ -2,8 +2,15 @@ FROM ubuntu:focal
RUN apt-get update
RUN apt-get install -yq software-properties-common
RUN apt-add-repository ppa:purplei2p/i2pd
RUN apt-get update
RUN apt-get install -yq --no-install-suggests --no-install-recommends \
python3-dev python3-pip python3.9 python3.9-dev python3.9-venv
python3-dev python3-pip python3.9 python3.9-dev python3.9-venv sudo i2pd
RUN echo 'builder ALL=(ALL) NOPASSWD:ALL' >> /etc/sudoers
RUN python3.9 -m pip install setuptools wheel
RUN python3.9 -m pip install --upgrade pip tox virtualenv

3
.buildbot/ubuntu/build.sh Executable file
View File

@ -0,0 +1,3 @@
#!/bin/sh
sudo service i2pd start

View File

@ -1,6 +1,7 @@
The MIT License (MIT)
Copyright (c) 2016-2017 Krzysztof Oziomek
Copyright (c) 2020-2023 The Bitmessage Developers
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal

View File

@ -86,4 +86,4 @@ will allow you to use it anonymously over I2P with MiNode acting as a bridge.
## Links
- [Bitmessage project website](https://bitmessage.org)
- [Protocol specification](https://bitmessage.org/wiki/Protocol_specification)
- [Protocol specification](https://pybitmessage.rtfd.io/en/v0.6/protocol.html)

View File

@ -3,6 +3,7 @@
import base64
import errno
import logging
import math
import random
import select
import socket
@ -37,7 +38,7 @@ class Connection(threading.Thread):
self.vectors_to_get = set()
self.vectors_to_send = set()
self.vectors_requested = dict()
self.vectors_requested = {}
self.status = 'ready'
@ -64,6 +65,7 @@ class Connection(threading.Thread):
self.last_message_received = time.time()
self.last_message_sent = time.time()
self.wait_until = 0
def run(self):
if self.s is None:
@ -108,11 +110,7 @@ class Connection(threading.Thread):
'Disconnecting from %s:%s. Reason: %s',
self.host_print, self.port, e)
data = None
except ConnectionResetError:
logging.debug(
'Disconnecting from %s:%s. Reason: ConnectionResetError',
self.host_print, self.port)
self.status = 'disconnecting'
self._process_buffer_receive()
self._process_queue()
self._send_data()
@ -137,13 +135,13 @@ class Connection(threading.Thread):
time.time() - self.last_message_sent > 300
and self.status == 'fully_established'
):
self.send_queue.put(message.Message(b'pong', b''))
self.send_queue.put(message.Message(b'ping', b''))
if self.status == 'disconnecting' or shared.shutting_down:
data = None
if not data:
self.status = 'disconnected'
self.s.close()
logging.debug(
logging.info(
'Disconnected from %s:%s', self.host_print, self.port)
break
time.sleep(0.2)
@ -190,7 +188,10 @@ class Connection(threading.Thread):
'Initializing TLS connection with %s:%s',
self.host_print, self.port)
context = ssl.create_default_context()
context = ssl.create_default_context(
purpose=ssl.Purpose.CLIENT_AUTH if self.server
else ssl.Purpose.SERVER_AUTH
)
context.check_hostname = False
context.verify_mode = ssl.CERT_NONE
@ -250,6 +251,8 @@ class Connection(threading.Thread):
structure.NetAddr(c.remote_version.services, c.host, c.port)
for c in shared.connections if c.network != 'i2p'
and c.server is False and c.status == 'fully_established'}
# pylint: disable=unsubscriptable-object
# https://github.com/pylint-dev/pylint/issues/3637
if len(shared.node_pool) > 10:
addr.update({
structure.NetAddr(1, a[0], a[1])
@ -332,7 +335,9 @@ class Connection(threading.Thread):
def _process_message(self, m):
if m.command == b'version':
version = message.Version.from_bytes(m.to_bytes())
version = message.Version.from_message(m)
if shared.stream not in version.streams:
raise ValueError('message not for stream %i' % shared.stream)
logging.debug('%s:%s -> %s', self.host_print, self.port, version)
if (
version.protocol_version != shared.protocol_version
@ -417,8 +422,12 @@ class Connection(threading.Thread):
self.send_queue.put(message.Message(b'pong', b''))
elif m.command == b'error':
error = message.Error.from_message(m)
logging.warning(
'%s:%s -> error: %s', self.host_print, self.port, m.payload)
'%s:%s -> %s', self.host_print, self.port, error)
if error.fatal == 2:
# reduce probability to connect soon
shared.unchecked_node_pool.discard((self.host, self.port))
else:
logging.debug('%s:%s -> %s', self.host_print, self.port, m)
@ -426,7 +435,14 @@ class Connection(threading.Thread):
def _request_objects(self):
if self.vectors_to_get and len(self.vectors_requested) < 100:
self.vectors_to_get.difference_update(shared.objects.keys())
if self.vectors_to_get:
if not self.wait_until:
nodes_count = (
len(shared.node_pool) + len(shared.unchecked_node_pool))
logging.debug('Nodes count is %i', nodes_count)
delay = math.ceil(math.log(nodes_count + 2, 20)) * 5.2
self.wait_until = time.time() + delay
logging.debug('Skip sending getdata for %.2fs', delay)
if self.vectors_to_get and self.wait_until < time.time():
logging.info(
'Queued %s vectors to get', len(self.vectors_to_get))
if len(self.vectors_to_get) > 64:

View File

@ -3,22 +3,22 @@ import base64
import logging
import os
import socket
import threading
import time
from .util import receive_line, pub_from_priv
from .util import I2PThread, pub_from_priv
class I2PController(threading.Thread):
class I2PController(I2PThread):
def __init__(self, state, host='127.0.0.1', port=7656, dest_priv=b''):
super().__init__(name='I2P Controller')
super().__init__(state, name='I2P Controller')
self.state = state
self.host = host
self.port = port
self.nick = b'MiNode_' + base64.b16encode(os.urandom(4)).lower()
while True:
if state.shutting_down:
return
try:
self.s = socket.create_connection((self.host, self.port))
break
@ -41,15 +41,6 @@ class I2PController(threading.Thread):
self.create_session()
def _receive_line(self):
line = receive_line(self.s)
# logging.debug('I2PController <- %s', line)
return line
def _send(self, command):
# logging.debug('I2PController -> %s', command)
self.s.sendall(command)
def init_connection(self):
self._send(b'HELLO VERSION MIN=3.0 MAX=3.3\n')
self.version_reply = self._receive_line().split()

View File

@ -1,23 +1,22 @@
# -*- coding: utf-8 -*-
import logging
import socket
import threading
from .util import receive_line
from .util import I2PThread
class I2PDialer(threading.Thread):
class I2PDialer(I2PThread):
def __init__(
self, state, destination, nick, sam_host='127.0.0.1', sam_port=7656
):
self.state = state
self.sam_host = sam_host
self.sam_port = sam_port
self.nick = nick
self.destination = destination
super().__init__(name='I2P Dial to {}'.format(self.destination))
super().__init__(state, name='I2P Dial to {}'.format(self.destination))
self.s = socket.create_connection((self.sam_host, self.sam_port))
@ -34,20 +33,11 @@ class I2PDialer(threading.Thread):
c.start()
self.state.connections.add(c)
def _receive_line(self):
line = receive_line(self.s)
# logging.debug('I2PDialer <- %s', line)
return line
def _send(self, command):
# logging.debug('I2PDialer -> %s', command)
self.s.sendall(command)
def _connect(self):
self._send(b'HELLO VERSION MIN=3.0 MAX=3.3\n')
self.version_reply = self._receive_line().split()
if b'RESULT=OK' not in self.version_reply:
logging.warning('Error while connecting to %s', self.destination)
logging.debug('Error while connecting to %s', self.destination)
self.success = False
self._send(
@ -55,6 +45,5 @@ class I2PDialer(threading.Thread):
+ self.destination + b'\n')
reply = self._receive_line().split(b' ')
if b'RESULT=OK' not in reply:
logging.warning(
'Error while connecting to %s', self.destination)
logging.debug('Error while connecting to %s', self.destination)
self.success = False

View File

@ -1,35 +1,22 @@
# -*- coding: utf-8 -*-
import logging
import socket
import threading
from .util import receive_line
from .util import I2PThread
class I2PListener(threading.Thread):
class I2PListener(I2PThread):
def __init__(self, state, nick, host='127.0.0.1', port=7656):
super().__init__(name='I2P Listener')
super().__init__(state, name='I2P Listener')
self.state = state
self.host = host
self.port = port
self.nick = nick
self.s = None
self.version_reply = []
self.new_socket()
def _receive_line(self):
line = receive_line(self.s)
# logging.debug('I2PListener <- %s', line)
return line
def _send(self, command):
# logging.debug('I2PListener -> %s', command)
self.s.sendall(command)
def new_socket(self):
self.s = socket.create_connection((self.host, self.port))
self._send(b'HELLO VERSION MIN=3.0 MAX=3.3\n')

View File

@ -1,6 +1,7 @@
# -*- coding: utf-8 -*-
import base64
import hashlib
import threading
def receive_line(s):
@ -14,16 +15,35 @@ def receive_line(s):
return data[0]
class I2PThread(threading.Thread):
"""
Abstract I2P thread with _receive_line() and _send() methods,
reused in I2PDialer, I2PListener and I2PController
"""
def __init__(self, state, name=''):
super().__init__(name=name)
self.state = state
self.s = None
def _receive_line(self):
line = receive_line(self.s)
# logging.debug('I2PListener <- %s', line)
return line
def _send(self, command):
# logging.debug('I2PListener -> %s', command)
self.s.sendall(command)
def pub_from_priv(priv):
priv = base64.b64decode(priv, altchars=b'-~')
# 256 for public key + 128 for signing key + 3 for certificate header
# + value of bytes priv[385:387]
pub = priv[:387 + int.from_bytes(priv[385:387], byteorder='big')]
pub = base64.b64encode(pub, altchars=b'-~')
return pub
return base64.b64encode(pub, altchars=b'-~')
def b32_from_pub(pub):
return base64.b32encode(
hashlib.sha256(base64.b64decode(pub, b'-~')).digest()
).replace(b"=", b"").lower() + b'.b32.i2p'
).replace(b'=', b'').lower() + b'.b32.i2p'

View File

@ -2,11 +2,9 @@
"""Functions for starting the program"""
import argparse
import base64
import csv
import logging
import multiprocessing
import os
import pickle
import signal
import socket
@ -22,7 +20,7 @@ def handler(s, f): # pylint: disable=unused-argument
shared.shutting_down = True
def parse_arguments():
def parse_arguments(): # pylint: disable=too-many-branches,too-many-statements
"""Parsing arguments"""
parser = argparse.ArgumentParser()
parser.add_argument('-p', '--port', help='Port to listen on', type=int)
@ -102,56 +100,6 @@ def parse_arguments():
shared.i2p_transient = True
def load_data():
"""Loads initial nodes and data, stored in files between sessions"""
try:
with open(
os.path.join(shared.data_directory, 'objects.pickle'), 'br'
) as src:
shared.objects = pickle.load(src)
except FileNotFoundError:
pass # first start
except Exception:
logging.warning(
'Error while loading objects from disk.', exc_info=True)
try:
with open(
os.path.join(shared.data_directory, 'nodes.pickle'), 'br'
) as src:
shared.node_pool = pickle.load(src)
except FileNotFoundError:
pass
except Exception:
logging.warning('Error while loading nodes from disk.', exc_info=True)
try:
with open(
os.path.join(shared.data_directory, 'i2p_nodes.pickle'), 'br'
) as src:
shared.i2p_node_pool = pickle.load(src)
except FileNotFoundError:
pass
except Exception:
logging.warning('Error while loading nodes from disk.', exc_info=True)
with open(
os.path.join(shared.source_directory, 'core_nodes.csv'),
'r', newline=''
) as src:
reader = csv.reader(src)
shared.core_nodes = {tuple(row) for row in reader}
shared.node_pool.update(shared.core_nodes)
with open(
os.path.join(shared.source_directory, 'i2p_core_nodes.csv'),
'r', newline=''
) as f:
reader = csv.reader(f)
shared.i2p_core_nodes = {(row[0].encode(), 'i2p') for row in reader}
shared.i2p_node_pool.update(shared.i2p_core_nodes)
def bootstrap_from_dns():
"""Addes addresses of bootstrap servers to known nodes"""
try:
@ -197,7 +145,7 @@ def start_ip_listener():
'Error while starting IPv4 listener on port %s.'
' However the IPv6 one seems to be working'
' and will probably accept IPv4 connections.', # 48 on macos
shared.listening_port, exc_info=(e.errno not in (48, 98)))
shared.listening_port, exc_info=e.errno not in (48, 98))
else:
logging.warning(
'Error while starting IPv4 listener on port %s.'
@ -290,8 +238,6 @@ def main():
'Error while creating data directory in: %s',
shared.data_directory, exc_info=True)
load_data()
if shared.ip_enabled and not shared.trusted_peer:
bootstrap_from_dns()
@ -300,18 +246,6 @@ def main():
# so we can collect I2P destination objects
start_i2p_listener()
for vector in set(shared.objects):
if not shared.objects[vector].is_valid():
if shared.objects[vector].is_expired():
logging.debug(
'Deleted expired object: %s',
base64.b16encode(vector).decode())
else:
logging.warning(
'Deleted invalid object: %s',
base64.b16encode(vector).decode())
del shared.objects[vector]
manager = Manager()
manager.start()

View File

@ -1,6 +1,7 @@
# -*- coding: utf-8 -*-
"""The main thread, managing connections, nodes and objects"""
import base64
import csv
import logging
import os
import pickle
@ -25,9 +26,11 @@ class Manager(threading.Thread):
self.last_pickled_nodes = time.time()
# Publish destination 5-15 minutes after start
self.last_published_i2p_destination = \
time.time() - 50 * 60 + random.uniform(-1, 1) * 300 # nosec
time.time() - 50 * 60 + random.uniform(-1, 1) * 300 # nosec B311
def run(self):
self.load_data()
self.clean_objects()
while True:
time.sleep(0.8)
now = time.time()
@ -53,12 +56,17 @@ class Manager(threading.Thread):
@staticmethod
def clean_objects():
for vector in set(shared.objects):
if shared.objects[vector].is_expired():
if not shared.objects[vector].is_valid():
if shared.objects[vector].is_expired():
logging.debug(
'Deleted expired object: %s',
base64.b16encode(vector).decode())
else:
logging.warning(
'Deleted invalid object: %s',
base64.b16encode(vector).decode())
with shared.objects_lock:
del shared.objects[vector]
logging.debug(
'Deleted expired object: %s',
base64.b16encode(vector).decode())
@staticmethod
def manage_connections():
@ -138,6 +146,59 @@ class Manager(threading.Thread):
shared.connections.add(c)
shared.hosts = hosts
@staticmethod
def load_data():
"""Loads initial nodes and data, stored in files between sessions"""
try:
with open(
os.path.join(shared.data_directory, 'objects.pickle'), 'br'
) as src:
shared.objects = pickle.load(src)
except FileNotFoundError:
pass # first start
except Exception:
logging.warning(
'Error while loading objects from disk.', exc_info=True)
try:
with open(
os.path.join(shared.data_directory, 'nodes.pickle'), 'br'
) as src:
shared.node_pool = pickle.load(src)
except FileNotFoundError:
pass
except Exception:
logging.warning(
'Error while loading nodes from disk.', exc_info=True)
try:
with open(
os.path.join(shared.data_directory, 'i2p_nodes.pickle'), 'br'
) as src:
shared.i2p_node_pool = pickle.load(src)
except FileNotFoundError:
pass
except Exception:
logging.warning(
'Error while loading nodes from disk.', exc_info=True)
with open(
os.path.join(shared.source_directory, 'core_nodes.csv'),
'r', newline='', encoding='ascii'
) as src:
reader = csv.reader(src)
shared.core_nodes = {tuple(row) for row in reader}
shared.node_pool.update(shared.core_nodes)
with open(
os.path.join(shared.source_directory, 'i2p_core_nodes.csv'),
'r', newline='', encoding='ascii'
) as f:
reader = csv.reader(f)
shared.i2p_core_nodes = {
(row[0].encode(), 'i2p') for row in reader}
shared.i2p_node_pool.update(shared.i2p_core_nodes)
@staticmethod
def pickle_objects():
try:
@ -187,5 +248,5 @@ class Manager(threading.Thread):
obj = structure.Object(
b'\x00' * 8, int(time.time() + 2 * 3600),
shared.i2p_dest_obj_type, shared.i2p_dest_obj_version,
1, dest_pub_raw)
shared.stream, dest_pub_raw)
proofofwork.do_pow_and_publish(obj)

View File

@ -9,7 +9,7 @@ from . import shared, structure
class Header():
"""Common message header"""
"""Message header structure"""
def __init__(self, command, payload_length, payload_checksum):
self.command = command
self.payload_length = payload_length
@ -24,6 +24,7 @@ class Header():
base64.b16encode(self.payload_checksum).decode())
def to_bytes(self):
"""Serialize to bytes"""
b = b''
b += shared.magic_bytes
b += self.command.ljust(12, b'\x00')
@ -33,6 +34,7 @@ class Header():
@classmethod
def from_bytes(cls, b):
"""Parse from bytes"""
magic_bytes, command, payload_length, payload_checksum = struct.unpack(
'>4s12sL4s', b)
@ -59,6 +61,7 @@ class Message():
base64.b16encode(self.payload_checksum).decode())
def to_bytes(self):
"""Serialize to bytes"""
b = Header(
self.command, self.payload_length, self.payload_checksum
).to_bytes()
@ -67,6 +70,7 @@ class Message():
@classmethod
def from_bytes(cls, b):
"""Parse from bytes"""
h = Header.from_bytes(b[:24])
payload = b[24:]
@ -87,12 +91,19 @@ class Message():
return cls(h.command, payload)
def _payload_read_int(data):
varint_length = structure.VarInt.length(data[0])
return (
structure.VarInt.from_bytes(data[:varint_length]).n,
data[varint_length:])
class Version():
"""The version message"""
"""The version message payload"""
def __init__(
self, host, port, protocol_version=shared.protocol_version,
services=shared.services, nonce=shared.nonce,
user_agent=shared.user_agent
user_agent=shared.user_agent, streams=None
):
self.host = host
self.port = port
@ -101,6 +112,9 @@ class Version():
self.services = services
self.nonce = nonce
self.user_agent = user_agent
self.streams = streams or [shared.stream]
if len(self.streams) > 160000:
self.streams = self.streams[:160000]
def __repr__(self):
return (
@ -115,20 +129,20 @@ class Version():
payload += struct.pack('>Q', self.services)
payload += struct.pack('>Q', int(time.time()))
payload += structure.NetAddrNoPrefix(
shared.services, self.host, self.port).to_bytes()
1, self.host, self.port).to_bytes()
payload += structure.NetAddrNoPrefix(
shared.services, '127.0.0.1', 8444).to_bytes()
self.services, '127.0.0.1', 8444).to_bytes()
payload += self.nonce
payload += structure.VarInt(len(shared.user_agent)).to_bytes()
payload += shared.user_agent
payload += 2 * structure.VarInt(1).to_bytes()
payload += structure.VarInt(len(self.user_agent)).to_bytes()
payload += self.user_agent
payload += structure.VarInt(len(self.streams)).to_bytes()
for stream in self.streams:
payload += structure.VarInt(stream).to_bytes()
return Message(b'version', payload).to_bytes()
@classmethod
def from_bytes(cls, b):
m = Message.from_bytes(b)
def from_message(cls, m):
payload = m.payload
( # unused: timestamp, net_addr_local
@ -142,24 +156,28 @@ class Version():
payload = payload[80:]
user_agent_varint_length = structure.VarInt.length(payload[0])
user_agent_length = structure.VarInt.from_bytes(
payload[:user_agent_varint_length]).n
payload = payload[user_agent_varint_length:]
user_agent_length, payload = _payload_read_int(payload)
user_agent = payload[:user_agent_length]
payload = payload[user_agent_length:]
if payload != b'\x01\x01':
raise ValueError('message not for stream 1')
streams_count, payload = _payload_read_int(payload)
if streams_count > 160000:
raise ValueError('malformed Version message, to many streams')
streams = []
return cls(host, port, protocol_version, services, nonce, user_agent)
while payload:
stream, payload = _payload_read_int(payload)
streams.append(stream)
if streams_count != len(streams):
raise ValueError('malformed Version message, wrong streams_count')
return cls(
host, port, protocol_version, services, nonce, user_agent, streams)
class Inv():
"""The inv message"""
"""The inv message payload"""
def __init__(self, vectors):
self.vectors = set(vectors)
@ -176,11 +194,7 @@ class Inv():
def from_message(cls, m):
payload = m.payload
vector_count_varint_length = structure.VarInt.length(payload[0])
vector_count = structure.VarInt.from_bytes(
payload[:vector_count_varint_length]).n
payload = payload[vector_count_varint_length:]
vector_count, payload = _payload_read_int(payload)
vectors = set()
@ -195,7 +209,7 @@ class Inv():
class GetData():
"""The getdata message"""
"""The getdata message payload"""
def __init__(self, vectors):
self.vectors = set(vectors)
@ -212,11 +226,7 @@ class GetData():
def from_message(cls, m):
payload = m.payload
vector_count_varint_length = structure.VarInt.length(payload[0])
vector_count = structure.VarInt.from_bytes(
payload[:vector_count_varint_length]).n
payload = payload[vector_count_varint_length:]
vector_count, payload = _payload_read_int(payload)
vectors = set()
@ -231,7 +241,7 @@ class GetData():
class Addr():
"""The addr message"""
"""The addr message payload"""
def __init__(self, addresses):
self.addresses = addresses
@ -248,11 +258,8 @@ class Addr():
def from_message(cls, m):
payload = m.payload
addr_count_varint_length = structure.VarInt.length(payload[0])
# addr_count = structure.VarInt.from_bytes(
# payload[:addr_count_varint_length]).n
payload = payload[addr_count_varint_length:]
# not validating addr_count
_, payload = _payload_read_int(payload)
addresses = set()
@ -261,3 +268,37 @@ class Addr():
payload = payload[38:]
return cls(addresses)
class Error():
"""The error message payload"""
def __init__(self, error_text=b'', fatal=0, ban_time=0, vector=b''):
self.error_text = error_text
self.fatal = fatal
self.ban_time = ban_time
self.vector = vector
def __repr__(self):
return 'error, text: {}'.format(self.error_text)
def to_bytes(self):
return Message(
b'error', structure.VarInt(self.fatal).to_bytes()
+ structure.VarInt(self.ban_time).to_bytes()
+ structure.VarInt(len(self.vector)).to_bytes() + self.vector
+ structure.VarInt(len(self.error_text)).to_bytes()
+ self.error_text
).to_bytes()
@classmethod
def from_message(cls, m):
payload = m.payload
fatal, payload = _payload_read_int(payload)
ban_time, payload = _payload_read_int(payload)
vector_length, payload = _payload_read_int(payload)
vector = payload[:vector_length]
payload = payload[vector_length:]
error_text_length, payload = _payload_read_int(payload)
error_text = payload[:error_text_length]
return cls(error_text, fatal, ban_time, vector)

View File

@ -21,7 +21,7 @@ protocol_version = 3
services = 3 # NODE_NETWORK, NODE_SSL
stream = 1
nonce = os.urandom(8)
user_agent = b'/MiNode:0.3.1/'
user_agent = b'/MiNode:0.3.2/'
timeout = 600
header_length = 24
i2p_dest_obj_type = 0x493250

View File

@ -46,7 +46,7 @@ class VarInt():
class Object():
"""object message"""
"""The 'object' message payload"""
def __init__(
self, nonce, expires_time, object_type, version,
stream_number, object_payload
@ -60,12 +60,19 @@ class Object():
self.vector = hashlib.sha512(hashlib.sha512(
self.to_bytes()).digest()).digest()[:32]
self.tag = (
# broadcast from version 5 and pubkey/getpukey from version 4
self.object_payload[:32] if object_type == 3 and version == 5
or (object_type in (0, 1) and version == 4)
else None)
def __repr__(self):
return 'object, vector: {}'.format(
base64.b16encode(self.vector).decode())
@classmethod
def from_message(cls, m):
"""Decode message payload"""
payload = m.payload
nonce, expires_time, object_type = struct.unpack('>8sQL', payload[:20])
payload = payload[20:]
@ -80,6 +87,7 @@ class Object():
nonce, expires_time, object_type, version, stream_number, payload)
def to_bytes(self):
"""Serialize to bytes"""
payload = b''
payload += self.nonce
payload += struct.pack('>QL', self.expires_time, self.object_type)
@ -90,9 +98,11 @@ class Object():
return payload
def is_expired(self):
"""Check if object's TTL is expired"""
return self.expires_time + 3 * 3600 < time.time()
def is_valid(self):
"""Checks the object validity"""
if self.is_expired():
logging.debug(
'Invalid object %s, reason: expired',
@ -108,18 +118,16 @@ class Object():
'Invalid object %s, reason: payload is too long',
base64.b16encode(self.vector).decode())
return False
if self.stream_number != 1:
if self.stream_number != shared.stream:
logging.warning(
'Invalid object %s, reason: not in stream 1',
base64.b16encode(self.vector).decode())
'Invalid object %s, reason: not in stream %i',
base64.b16encode(self.vector).decode(), shared.stream)
return False
data = self.to_bytes()[8:]
# length = len(data) + 8 + shared.payload_length_extra_bytes
# dt = max(self.expires_time - time.time(), 0)
h = hashlib.sha512(data).digest()
pow_value = int.from_bytes(
hashlib.sha512(hashlib.sha512(
self.nonce + h).digest()).digest()[:8], 'big')
self.nonce + self.pow_initial_hash()
).digest()).digest()[:8], 'big')
target = self.pow_target()
if target < pow_value:
logging.warning(
@ -129,6 +137,7 @@ class Object():
return True
def pow_target(self):
"""Compute PoW target"""
data = self.to_bytes()[8:]
length = len(data) + 8 + shared.payload_length_extra_bytes
dt = max(self.expires_time - time.time(), 0)
@ -138,6 +147,7 @@ class Object():
length + (dt * length) / (2 ** 16))))
def pow_initial_hash(self):
"""Compute the initial hash for PoW"""
return hashlib.sha512(self.to_bytes()[8:]).digest()

View File

@ -3,10 +3,9 @@ import unittest
from binascii import unhexlify
from minode import message
from minode.shared import magic_bytes
MAGIC = 0xE9BEB4D9
# 500 identical peers:
# import ipaddress
# from hyperbit import net, packet
@ -14,26 +13,56 @@ MAGIC = 0xE9BEB4D9
# 1626611891, 1, 1, net.ipv6(ipaddress.ip_address('127.0.0.1')).packed,
# 8444
# ) for _ in range(1000)]
sample_data = unhexlify(
sample_addr_data = unhexlify(
'fd01f4' + (
'0000000060f420b30000000'
'1000000000000000100000000000000000000ffff7f00000120fc'
) * 500
)
# protocol.CreatePacket(b'ping', b'test')
sample_ping_msg = unhexlify(
'e9beb4d970696e67000000000000000000000004ee26b0dd74657374')
# from pybitmessage import pathmagic
# pathmagic.setup()
# import protocol
# msg = protocol.assembleVersionMessage('127.0.0.1', 8444, [1, 2, 3])
sample_version_msg = unhexlify(
'e9beb4d976657273696f6e00000000000000006b1b06b182000000030000000000000003'
'0000000064fdd3e1000000000000000100000000000000000000ffff7f00000120fc0000'
'00000000000300000000000000000000ffff7f00000120fc00c0b6c3eefb2adf162f5079'
'4269746d6573736167653a302e362e332e322f03010203'
)
#
sample_error_data = \
b'\x02\x00\x006Too many connections from your IP. Closing connection.'
class TestMessage(unittest.TestCase):
"""Test assembling and disassembling of network mesages"""
def test_packet(self):
"""Check the packet created by message.Message()"""
head = unhexlify(b'%x' % MAGIC)
self.assertEqual(
message.Message(b'ping', b'').to_bytes()[:len(head)], head)
"""Check packet creation and parsing by message.Message"""
msg = message.Message(b'ping', b'test').to_bytes()
self.assertEqual(msg[:len(magic_bytes)], magic_bytes)
with self.assertRaises(ValueError):
# wrong magic
message.Message.from_bytes(msg[1:])
with self.assertRaises(ValueError):
# wrong length
message.Message.from_bytes(msg[:-1])
with self.assertRaises(ValueError):
# wrong checksum
message.Message.from_bytes(msg[:-1] + b'\x00')
msg = message.Message.from_bytes(sample_ping_msg)
self.assertEqual(msg.command, b'ping')
self.assertEqual(msg.payload, b'test')
def test_addr(self):
"""Test addr messages"""
msg = message.Message(b'addr', sample_data)
msg = message.Message(b'addr', sample_addr_data)
addr_packet = message.Addr.from_message(msg)
self.assertEqual(len(addr_packet.addresses), 500)
address = addr_packet.addresses.pop()
@ -41,3 +70,32 @@ class TestMessage(unittest.TestCase):
self.assertEqual(address.services, 1)
self.assertEqual(address.port, 8444)
self.assertEqual(address.host, '127.0.0.1')
def test_version(self):
"""Test version message"""
msg = message.Message.from_bytes(sample_version_msg)
self.assertEqual(msg.command, b'version')
version_packet = message.Version.from_message(msg)
self.assertEqual(version_packet.host, '127.0.0.1')
self.assertEqual(version_packet.port, 8444)
self.assertEqual(version_packet.protocol_version, 3)
self.assertEqual(version_packet.services, 3)
self.assertEqual(version_packet.user_agent, b'/PyBitmessage:0.6.3.2/')
self.assertEqual(version_packet.streams, [1, 2, 3])
msg = version_packet.to_bytes()
# omit header and timestamp
self.assertEqual(msg[24:36], sample_version_msg[24:36])
self.assertEqual(msg[44:], sample_version_msg[44:])
def test_error(self):
"""Test error message"""
msg = message.Error.from_message(
message.Message(b'error', sample_error_data))
self.assertEqual(msg.fatal, 2)
self.assertEqual(msg.ban_time, 0)
self.assertEqual(msg.vector, b'')
msg = message.Error(
b'Too many connections from your IP. Closing connection.', 2)
self.assertEqual(msg.to_bytes()[24:], sample_error_data)

View File

@ -1,6 +1,7 @@
"""Blind tests, starting the minode process"""
import unittest
import signal
import socket
import subprocess
import sys
import tempfile
@ -8,6 +9,12 @@ import time
import psutil
try:
socket.socket().bind(('127.0.0.1', 7656))
i2p_port_free = True
except (OSError, socket.error):
i2p_port_free = False
class TestProcessProto(unittest.TestCase):
"""Test process attributes, common flow"""
@ -53,6 +60,12 @@ class TestProcessProto(unittest.TestCase):
except psutil.NoSuchProcess:
pass
def connections(self):
"""All process' established connections"""
return [
c for c in self.process.connections()
if c.status == 'ESTABLISHED']
class TestProcessShutdown(TestProcessProto):
"""Separate test case for SIGTERM"""
@ -73,15 +86,10 @@ class TestProcess(TestProcessProto):
"""Check minode process connections"""
_started = time.time()
def connections():
return [
c for c in self.process.connections()
if c.status == 'ESTABLISHED']
def continue_check_limit(extra_time):
for t in range(extra_time * 2):
for _ in range(extra_time * 2):
self.assertLessEqual(
len(connections()),
len(self.connections()),
# shared.outgoing_connections, one listening
# TODO: find the cause of one extra
(min(self._connection_limit, 8) if not self._listen
@ -90,8 +98,8 @@ class TestProcess(TestProcessProto):
' by --connection-limit')
time.sleep(1)
for t in range(self._wait_time * 2):
if len(connections()) > self._connection_limit / 2:
for _ in range(self._wait_time * 2):
if len(self.connections()) > self._connection_limit / 2:
_time_to_connect = round(time.time() - _started)
break
time.sleep(0.5)
@ -106,10 +114,33 @@ class TestProcess(TestProcessProto):
for c in self.process.connections():
if c.status == 'LISTEN':
if self._listen is False:
return self.fail(
'Listening while started with --no-incoming')
self.fail('Listening while started with --no-incoming')
return
self.assertEqual(c.laddr[1], self._listening_port or 8444)
break
else:
if self._listen:
self.fail('No listening connection found')
@unittest.skipIf(i2p_port_free, 'No running i2pd detected')
class TestProcessI2P(TestProcess):
"""Test minode process with --i2p and no IP"""
_process_cmd = ['minode', '--i2p', '--no-ip']
_connection_limit = 4
_wait_time = 120
_listen = True
_listening_port = 8448
def test_connections(self):
"""Ensure all connections are I2P"""
super().test_connections()
for c in self.connections():
self.assertEqual(c.raddr[0], '127.0.0.1')
self.assertEqual(c.raddr[1], 7656)
@unittest.skipUnless(i2p_port_free, 'Detected running i2pd')
class TestProcessNoI2P(TestProcessShutdown):
"""Test minode process shutdown with --i2p and no IP"""
_process_cmd = ['minode', '--i2p', '--no-ip']

View File

@ -1,14 +1,40 @@
"""Tests for structures"""
import unittest
import base64
import logging
import queue
import struct
import time
import unittest
from binascii import unhexlify
from minode import structure
from minode import message, proofofwork, shared, structure
# host pregenerated by pybitmessage.protocol.encodeHost()
# for one of bootstrap servers, port 8080,
# everything else is like in test_message: 1626611891, 1, 1
sample_addr_data = unhexlify(
'0000000060f420b3000000010000000000000001'
'260753000201300000000000000057ae1f90')
# data for an object with expires_time 1697063939
# structure.Object(
# b'\x00' * 8, expires_time, 42, 1, 2, b'HELLO').to_bytes()
sample_object_data = unhexlify(
'000000000000000000000000652724030000002a010248454c4c4f')
logging.basicConfig(
level=shared.log_level,
format='[%(asctime)s] [%(levelname)s] %(message)s')
class TestStructure(unittest.TestCase):
"""Testing structures serializing and deserializing"""
@classmethod
def setUpClass(cls):
shared.objects = {}
def test_varint(self):
"""Test varint serializing and deserializing"""
s = structure.VarInt(0)
@ -68,3 +94,80 @@ class TestStructure(unittest.TestCase):
self.assertEqual(
addr.to_bytes()[8:24],
unhexlify('0102030405060708090a0b0c0d0e0f10'))
addr = structure.NetAddr.from_bytes(sample_addr_data)
self.assertEqual(addr.host, '2607:5300:201:3000::57ae')
self.assertEqual(addr.port, 8080)
self.assertEqual(addr.stream, 1)
self.assertEqual(addr.services, 1)
addr = structure.NetAddr(1, '2607:5300:201:3000::57ae', 8080, 1)
self.assertEqual(addr.to_bytes()[8:], sample_addr_data[8:])
def test_object(self):
"""Create and check objects"""
obj = structure.Object.from_message(
message.Message(b'object', sample_object_data))
self.assertEqual(obj.object_type, 42)
self.assertEqual(obj.stream_number, 2)
self.assertEqual(obj.expires_time, 1697063939)
self.assertEqual(obj.object_payload, b'HELLO')
obj = structure.Object(
b'\x00' * 8, int(time.time() + 3000000), 42, 1, 1, b'HELLO')
self.assertFalse(obj.is_valid())
obj.expires_time = int(time.time() - 11000)
self.assertFalse(obj.is_valid())
obj = structure.Object(
b'\x00' * 8, int(time.time() + 300), 42, 1, 2, b'HELLO')
vector = obj.vector
proofofwork._worker(obj) # pylint: disable=protected-access
obj = shared.objects.popitem()[1]
self.assertNotEqual(obj.vector, vector)
self.assertFalse(obj.is_expired())
self.assertFalse(obj.is_valid())
shared.stream = 2
self.assertTrue(obj.is_valid())
obj.object_payload = \
b'TIGER, tiger, burning bright. In the forests of the night'
self.assertFalse(obj.is_valid())
def test_proofofwork(self):
"""Check the main proofofwork call and worker"""
shared.vector_advertise_queue = queue.Queue()
obj = structure.Object(
b'\x00' * 8, int(time.time() + 300), 42, 1,
shared.stream, b'HELLO')
start_time = time.time()
proofofwork.do_pow_and_publish(obj)
try:
vector = shared.vector_advertise_queue.get(timeout=300)
except queue.Empty:
self.fail("Couldn't make work in 300 sec")
else:
time.sleep(1)
try:
result = shared.objects[vector]
except KeyError:
self.fail(
"Couldn't found object with vector %s"
" %s sec after pow start" % (
base64.b16encode(vector), time.time() - start_time))
self.assertTrue(result.is_valid())
self.assertEqual(result.object_type, 42)
self.assertEqual(result.object_payload, b'HELLO')
q = queue.Queue()
# pylint: disable=protected-access
proofofwork._pow_worker(obj.pow_target(), obj.pow_initial_hash(), q)
try:
nonce = q.get(timeout=5)
except queue.Empty:
self.fail("No nonce found in the queue")
obj = structure.Object(
nonce, obj.expires_time, obj.object_type, obj.version,
obj.stream_number, obj.object_payload)
self.assertTrue(obj.is_valid())

View File

@ -39,5 +39,7 @@ omit =
[coverage:report]
ignore_errors = true
[MESSAGES CONTROL]
disable = invalid-name
[pylint.main]
disable = invalid-name,consider-using-f-string,fixme
max-args = 8
max-attributes = 8