Compare commits

..

4 Commits

Author SHA1 Message Date
Lee Miller 652aaccfd6
Move s and state into the base 2023-08-02 04:43:25 +03:00
Dmitri Bogomolov 0d221317b7
Inherit I2P classes from base util.I2PThread() 2023-08-02 04:43:25 +03:00
Lee Miller ebfe9ebf9f
Install and start i2pd in buildbot 2023-08-02 04:43:25 +03:00
Dmitri Bogomolov e417bc2752
Try to test with i2pd:
- TestProcessI2P runs minode with i2p args with _connection_limit = 4
 - TestProcess waits for connections _wait_time sec (120 for TestProcessI2P)
2023-08-02 04:42:51 +03:00
14 changed files with 162 additions and 403 deletions

View File

@ -1,7 +1,6 @@
The MIT License (MIT)
Copyright (c) 2016-2017 Krzysztof Oziomek
Copyright (c) 2020-2023 The Bitmessage Developers
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal

View File

@ -86,4 +86,4 @@ will allow you to use it anonymously over I2P with MiNode acting as a bridge.
## Links
- [Bitmessage project website](https://bitmessage.org)
- [Protocol specification](https://pybitmessage.rtfd.io/en/v0.6/protocol.html)
- [Protocol specification](https://bitmessage.org/wiki/Protocol_specification)

View File

@ -3,7 +3,6 @@
import base64
import errno
import logging
import math
import random
import select
import socket
@ -38,7 +37,7 @@ class Connection(threading.Thread):
self.vectors_to_get = set()
self.vectors_to_send = set()
self.vectors_requested = {}
self.vectors_requested = dict()
self.status = 'ready'
@ -65,7 +64,6 @@ class Connection(threading.Thread):
self.last_message_received = time.time()
self.last_message_sent = time.time()
self.wait_until = 0
def run(self):
if self.s is None:
@ -110,7 +108,11 @@ class Connection(threading.Thread):
'Disconnecting from %s:%s. Reason: %s',
self.host_print, self.port, e)
data = None
except ConnectionResetError:
logging.debug(
'Disconnecting from %s:%s. Reason: ConnectionResetError',
self.host_print, self.port)
self.status = 'disconnecting'
self._process_buffer_receive()
self._process_queue()
self._send_data()
@ -135,13 +137,13 @@ class Connection(threading.Thread):
time.time() - self.last_message_sent > 300
and self.status == 'fully_established'
):
self.send_queue.put(message.Message(b'ping', b''))
self.send_queue.put(message.Message(b'pong', b''))
if self.status == 'disconnecting' or shared.shutting_down:
data = None
if not data:
self.status = 'disconnected'
self.s.close()
logging.info(
logging.debug(
'Disconnected from %s:%s', self.host_print, self.port)
break
time.sleep(0.2)
@ -188,10 +190,7 @@ class Connection(threading.Thread):
'Initializing TLS connection with %s:%s',
self.host_print, self.port)
context = ssl.create_default_context(
purpose=ssl.Purpose.CLIENT_AUTH if self.server
else ssl.Purpose.SERVER_AUTH
)
context = ssl.create_default_context()
context.check_hostname = False
context.verify_mode = ssl.CERT_NONE
@ -251,8 +250,6 @@ class Connection(threading.Thread):
structure.NetAddr(c.remote_version.services, c.host, c.port)
for c in shared.connections if c.network != 'i2p'
and c.server is False and c.status == 'fully_established'}
# pylint: disable=unsubscriptable-object
# https://github.com/pylint-dev/pylint/issues/3637
if len(shared.node_pool) > 10:
addr.update({
structure.NetAddr(1, a[0], a[1])
@ -335,9 +332,7 @@ class Connection(threading.Thread):
def _process_message(self, m):
if m.command == b'version':
version = message.Version.from_message(m)
if shared.stream not in version.streams:
raise ValueError('message not for stream %i' % shared.stream)
version = message.Version.from_bytes(m.to_bytes())
logging.debug('%s:%s -> %s', self.host_print, self.port, version)
if (
version.protocol_version != shared.protocol_version
@ -422,12 +417,8 @@ class Connection(threading.Thread):
self.send_queue.put(message.Message(b'pong', b''))
elif m.command == b'error':
error = message.Error.from_message(m)
logging.warning(
'%s:%s -> %s', self.host_print, self.port, error)
if error.fatal == 2:
# reduce probability to connect soon
shared.unchecked_node_pool.discard((self.host, self.port))
'%s:%s -> error: %s', self.host_print, self.port, m.payload)
else:
logging.debug('%s:%s -> %s', self.host_print, self.port, m)
@ -435,14 +426,7 @@ class Connection(threading.Thread):
def _request_objects(self):
if self.vectors_to_get and len(self.vectors_requested) < 100:
self.vectors_to_get.difference_update(shared.objects.keys())
if not self.wait_until:
nodes_count = (
len(shared.node_pool) + len(shared.unchecked_node_pool))
logging.debug('Nodes count is %i', nodes_count)
delay = math.ceil(math.log(nodes_count + 2, 20)) * 5.2
self.wait_until = time.time() + delay
logging.debug('Skip sending getdata for %.2fs', delay)
if self.vectors_to_get and self.wait_until < time.time():
if self.vectors_to_get:
logging.info(
'Queued %s vectors to get', len(self.vectors_to_get))
if len(self.vectors_to_get) > 64:

View File

@ -17,8 +17,6 @@ class I2PController(I2PThread):
self.nick = b'MiNode_' + base64.b16encode(os.urandom(4)).lower()
while True:
if state.shutting_down:
return
try:
self.s = socket.create_connection((self.host, self.port))
break

View File

@ -37,7 +37,7 @@ class I2PDialer(I2PThread):
self._send(b'HELLO VERSION MIN=3.0 MAX=3.3\n')
self.version_reply = self._receive_line().split()
if b'RESULT=OK' not in self.version_reply:
logging.debug('Error while connecting to %s', self.destination)
logging.warning('Error while connecting to %s', self.destination)
self.success = False
self._send(
@ -45,5 +45,6 @@ class I2PDialer(I2PThread):
+ self.destination + b'\n')
reply = self._receive_line().split(b' ')
if b'RESULT=OK' not in reply:
logging.debug('Error while connecting to %s', self.destination)
logging.warning(
'Error while connecting to %s', self.destination)
self.success = False

View File

@ -2,9 +2,11 @@
"""Functions for starting the program"""
import argparse
import base64
import csv
import logging
import multiprocessing
import os
import pickle
import signal
import socket
@ -20,7 +22,7 @@ def handler(s, f): # pylint: disable=unused-argument
shared.shutting_down = True
def parse_arguments(): # pylint: disable=too-many-branches,too-many-statements
def parse_arguments():
"""Parsing arguments"""
parser = argparse.ArgumentParser()
parser.add_argument('-p', '--port', help='Port to listen on', type=int)
@ -100,6 +102,56 @@ def parse_arguments(): # pylint: disable=too-many-branches,too-many-statements
shared.i2p_transient = True
def load_data():
"""Loads initial nodes and data, stored in files between sessions"""
try:
with open(
os.path.join(shared.data_directory, 'objects.pickle'), 'br'
) as src:
shared.objects = pickle.load(src)
except FileNotFoundError:
pass # first start
except Exception:
logging.warning(
'Error while loading objects from disk.', exc_info=True)
try:
with open(
os.path.join(shared.data_directory, 'nodes.pickle'), 'br'
) as src:
shared.node_pool = pickle.load(src)
except FileNotFoundError:
pass
except Exception:
logging.warning('Error while loading nodes from disk.', exc_info=True)
try:
with open(
os.path.join(shared.data_directory, 'i2p_nodes.pickle'), 'br'
) as src:
shared.i2p_node_pool = pickle.load(src)
except FileNotFoundError:
pass
except Exception:
logging.warning('Error while loading nodes from disk.', exc_info=True)
with open(
os.path.join(shared.source_directory, 'core_nodes.csv'),
'r', newline=''
) as src:
reader = csv.reader(src)
shared.core_nodes = {tuple(row) for row in reader}
shared.node_pool.update(shared.core_nodes)
with open(
os.path.join(shared.source_directory, 'i2p_core_nodes.csv'),
'r', newline=''
) as f:
reader = csv.reader(f)
shared.i2p_core_nodes = {(row[0].encode(), 'i2p') for row in reader}
shared.i2p_node_pool.update(shared.i2p_core_nodes)
def bootstrap_from_dns():
"""Addes addresses of bootstrap servers to known nodes"""
try:
@ -145,7 +197,7 @@ def start_ip_listener():
'Error while starting IPv4 listener on port %s.'
' However the IPv6 one seems to be working'
' and will probably accept IPv4 connections.', # 48 on macos
shared.listening_port, exc_info=e.errno not in (48, 98))
shared.listening_port, exc_info=(e.errno not in (48, 98)))
else:
logging.warning(
'Error while starting IPv4 listener on port %s.'
@ -238,6 +290,8 @@ def main():
'Error while creating data directory in: %s',
shared.data_directory, exc_info=True)
load_data()
if shared.ip_enabled and not shared.trusted_peer:
bootstrap_from_dns()
@ -246,6 +300,18 @@ def main():
# so we can collect I2P destination objects
start_i2p_listener()
for vector in set(shared.objects):
if not shared.objects[vector].is_valid():
if shared.objects[vector].is_expired():
logging.debug(
'Deleted expired object: %s',
base64.b16encode(vector).decode())
else:
logging.warning(
'Deleted invalid object: %s',
base64.b16encode(vector).decode())
del shared.objects[vector]
manager = Manager()
manager.start()

View File

@ -1,7 +1,6 @@
# -*- coding: utf-8 -*-
"""The main thread, managing connections, nodes and objects"""
import base64
import csv
import logging
import os
import pickle
@ -26,11 +25,9 @@ class Manager(threading.Thread):
self.last_pickled_nodes = time.time()
# Publish destination 5-15 minutes after start
self.last_published_i2p_destination = \
time.time() - 50 * 60 + random.uniform(-1, 1) * 300 # nosec B311
time.time() - 50 * 60 + random.uniform(-1, 1) * 300 # nosec
def run(self):
self.load_data()
self.clean_objects()
while True:
time.sleep(0.8)
now = time.time()
@ -56,17 +53,12 @@ class Manager(threading.Thread):
@staticmethod
def clean_objects():
for vector in set(shared.objects):
if not shared.objects[vector].is_valid():
if shared.objects[vector].is_expired():
logging.debug(
'Deleted expired object: %s',
base64.b16encode(vector).decode())
else:
logging.warning(
'Deleted invalid object: %s',
base64.b16encode(vector).decode())
if shared.objects[vector].is_expired():
with shared.objects_lock:
del shared.objects[vector]
logging.debug(
'Deleted expired object: %s',
base64.b16encode(vector).decode())
@staticmethod
def manage_connections():
@ -146,59 +138,6 @@ class Manager(threading.Thread):
shared.connections.add(c)
shared.hosts = hosts
@staticmethod
def load_data():
"""Loads initial nodes and data, stored in files between sessions"""
try:
with open(
os.path.join(shared.data_directory, 'objects.pickle'), 'br'
) as src:
shared.objects = pickle.load(src)
except FileNotFoundError:
pass # first start
except Exception:
logging.warning(
'Error while loading objects from disk.', exc_info=True)
try:
with open(
os.path.join(shared.data_directory, 'nodes.pickle'), 'br'
) as src:
shared.node_pool = pickle.load(src)
except FileNotFoundError:
pass
except Exception:
logging.warning(
'Error while loading nodes from disk.', exc_info=True)
try:
with open(
os.path.join(shared.data_directory, 'i2p_nodes.pickle'), 'br'
) as src:
shared.i2p_node_pool = pickle.load(src)
except FileNotFoundError:
pass
except Exception:
logging.warning(
'Error while loading nodes from disk.', exc_info=True)
with open(
os.path.join(shared.source_directory, 'core_nodes.csv'),
'r', newline='', encoding='ascii'
) as src:
reader = csv.reader(src)
shared.core_nodes = {tuple(row) for row in reader}
shared.node_pool.update(shared.core_nodes)
with open(
os.path.join(shared.source_directory, 'i2p_core_nodes.csv'),
'r', newline='', encoding='ascii'
) as f:
reader = csv.reader(f)
shared.i2p_core_nodes = {
(row[0].encode(), 'i2p') for row in reader}
shared.i2p_node_pool.update(shared.i2p_core_nodes)
@staticmethod
def pickle_objects():
try:
@ -248,5 +187,5 @@ class Manager(threading.Thread):
obj = structure.Object(
b'\x00' * 8, int(time.time() + 2 * 3600),
shared.i2p_dest_obj_type, shared.i2p_dest_obj_version,
shared.stream, dest_pub_raw)
1, dest_pub_raw)
proofofwork.do_pow_and_publish(obj)

View File

@ -9,7 +9,7 @@ from . import shared, structure
class Header():
"""Message header structure"""
"""Common message header"""
def __init__(self, command, payload_length, payload_checksum):
self.command = command
self.payload_length = payload_length
@ -24,7 +24,6 @@ class Header():
base64.b16encode(self.payload_checksum).decode())
def to_bytes(self):
"""Serialize to bytes"""
b = b''
b += shared.magic_bytes
b += self.command.ljust(12, b'\x00')
@ -34,7 +33,6 @@ class Header():
@classmethod
def from_bytes(cls, b):
"""Parse from bytes"""
magic_bytes, command, payload_length, payload_checksum = struct.unpack(
'>4s12sL4s', b)
@ -61,7 +59,6 @@ class Message():
base64.b16encode(self.payload_checksum).decode())
def to_bytes(self):
"""Serialize to bytes"""
b = Header(
self.command, self.payload_length, self.payload_checksum
).to_bytes()
@ -70,7 +67,6 @@ class Message():
@classmethod
def from_bytes(cls, b):
"""Parse from bytes"""
h = Header.from_bytes(b[:24])
payload = b[24:]
@ -91,19 +87,12 @@ class Message():
return cls(h.command, payload)
def _payload_read_int(data):
varint_length = structure.VarInt.length(data[0])
return (
structure.VarInt.from_bytes(data[:varint_length]).n,
data[varint_length:])
class Version():
"""The version message payload"""
"""The version message"""
def __init__(
self, host, port, protocol_version=shared.protocol_version,
services=shared.services, nonce=shared.nonce,
user_agent=shared.user_agent, streams=None
user_agent=shared.user_agent
):
self.host = host
self.port = port
@ -112,9 +101,6 @@ class Version():
self.services = services
self.nonce = nonce
self.user_agent = user_agent
self.streams = streams or [shared.stream]
if len(self.streams) > 160000:
self.streams = self.streams[:160000]
def __repr__(self):
return (
@ -129,20 +115,20 @@ class Version():
payload += struct.pack('>Q', self.services)
payload += struct.pack('>Q', int(time.time()))
payload += structure.NetAddrNoPrefix(
1, self.host, self.port).to_bytes()
shared.services, self.host, self.port).to_bytes()
payload += structure.NetAddrNoPrefix(
self.services, '127.0.0.1', 8444).to_bytes()
shared.services, '127.0.0.1', 8444).to_bytes()
payload += self.nonce
payload += structure.VarInt(len(self.user_agent)).to_bytes()
payload += self.user_agent
payload += structure.VarInt(len(self.streams)).to_bytes()
for stream in self.streams:
payload += structure.VarInt(stream).to_bytes()
payload += structure.VarInt(len(shared.user_agent)).to_bytes()
payload += shared.user_agent
payload += 2 * structure.VarInt(1).to_bytes()
return Message(b'version', payload).to_bytes()
@classmethod
def from_message(cls, m):
def from_bytes(cls, b):
m = Message.from_bytes(b)
payload = m.payload
( # unused: timestamp, net_addr_local
@ -156,28 +142,24 @@ class Version():
payload = payload[80:]
user_agent_length, payload = _payload_read_int(payload)
user_agent_varint_length = structure.VarInt.length(payload[0])
user_agent_length = structure.VarInt.from_bytes(
payload[:user_agent_varint_length]).n
payload = payload[user_agent_varint_length:]
user_agent = payload[:user_agent_length]
payload = payload[user_agent_length:]
streams_count, payload = _payload_read_int(payload)
if streams_count > 160000:
raise ValueError('malformed Version message, to many streams')
streams = []
if payload != b'\x01\x01':
raise ValueError('message not for stream 1')
while payload:
stream, payload = _payload_read_int(payload)
streams.append(stream)
if streams_count != len(streams):
raise ValueError('malformed Version message, wrong streams_count')
return cls(
host, port, protocol_version, services, nonce, user_agent, streams)
return cls(host, port, protocol_version, services, nonce, user_agent)
class Inv():
"""The inv message payload"""
"""The inv message"""
def __init__(self, vectors):
self.vectors = set(vectors)
@ -194,7 +176,11 @@ class Inv():
def from_message(cls, m):
payload = m.payload
vector_count, payload = _payload_read_int(payload)
vector_count_varint_length = structure.VarInt.length(payload[0])
vector_count = structure.VarInt.from_bytes(
payload[:vector_count_varint_length]).n
payload = payload[vector_count_varint_length:]
vectors = set()
@ -209,7 +195,7 @@ class Inv():
class GetData():
"""The getdata message payload"""
"""The getdata message"""
def __init__(self, vectors):
self.vectors = set(vectors)
@ -226,7 +212,11 @@ class GetData():
def from_message(cls, m):
payload = m.payload
vector_count, payload = _payload_read_int(payload)
vector_count_varint_length = structure.VarInt.length(payload[0])
vector_count = structure.VarInt.from_bytes(
payload[:vector_count_varint_length]).n
payload = payload[vector_count_varint_length:]
vectors = set()
@ -241,7 +231,7 @@ class GetData():
class Addr():
"""The addr message payload"""
"""The addr message"""
def __init__(self, addresses):
self.addresses = addresses
@ -258,8 +248,11 @@ class Addr():
def from_message(cls, m):
payload = m.payload
# not validating addr_count
_, payload = _payload_read_int(payload)
addr_count_varint_length = structure.VarInt.length(payload[0])
# addr_count = structure.VarInt.from_bytes(
# payload[:addr_count_varint_length]).n
payload = payload[addr_count_varint_length:]
addresses = set()
@ -268,37 +261,3 @@ class Addr():
payload = payload[38:]
return cls(addresses)
class Error():
"""The error message payload"""
def __init__(self, error_text=b'', fatal=0, ban_time=0, vector=b''):
self.error_text = error_text
self.fatal = fatal
self.ban_time = ban_time
self.vector = vector
def __repr__(self):
return 'error, text: {}'.format(self.error_text)
def to_bytes(self):
return Message(
b'error', structure.VarInt(self.fatal).to_bytes()
+ structure.VarInt(self.ban_time).to_bytes()
+ structure.VarInt(len(self.vector)).to_bytes() + self.vector
+ structure.VarInt(len(self.error_text)).to_bytes()
+ self.error_text
).to_bytes()
@classmethod
def from_message(cls, m):
payload = m.payload
fatal, payload = _payload_read_int(payload)
ban_time, payload = _payload_read_int(payload)
vector_length, payload = _payload_read_int(payload)
vector = payload[:vector_length]
payload = payload[vector_length:]
error_text_length, payload = _payload_read_int(payload)
error_text = payload[:error_text_length]
return cls(error_text, fatal, ban_time, vector)

View File

@ -21,7 +21,7 @@ protocol_version = 3
services = 3 # NODE_NETWORK, NODE_SSL
stream = 1
nonce = os.urandom(8)
user_agent = b'/MiNode:0.3.2/'
user_agent = b'/MiNode:0.3.1/'
timeout = 600
header_length = 24
i2p_dest_obj_type = 0x493250

View File

@ -46,7 +46,7 @@ class VarInt():
class Object():
"""The 'object' message payload"""
"""object message"""
def __init__(
self, nonce, expires_time, object_type, version,
stream_number, object_payload
@ -60,19 +60,12 @@ class Object():
self.vector = hashlib.sha512(hashlib.sha512(
self.to_bytes()).digest()).digest()[:32]
self.tag = (
# broadcast from version 5 and pubkey/getpukey from version 4
self.object_payload[:32] if object_type == 3 and version == 5
or (object_type in (0, 1) and version == 4)
else None)
def __repr__(self):
return 'object, vector: {}'.format(
base64.b16encode(self.vector).decode())
@classmethod
def from_message(cls, m):
"""Decode message payload"""
payload = m.payload
nonce, expires_time, object_type = struct.unpack('>8sQL', payload[:20])
payload = payload[20:]
@ -87,7 +80,6 @@ class Object():
nonce, expires_time, object_type, version, stream_number, payload)
def to_bytes(self):
"""Serialize to bytes"""
payload = b''
payload += self.nonce
payload += struct.pack('>QL', self.expires_time, self.object_type)
@ -98,11 +90,9 @@ class Object():
return payload
def is_expired(self):
"""Check if object's TTL is expired"""
return self.expires_time + 3 * 3600 < time.time()
def is_valid(self):
"""Checks the object validity"""
if self.is_expired():
logging.debug(
'Invalid object %s, reason: expired',
@ -118,16 +108,18 @@ class Object():
'Invalid object %s, reason: payload is too long',
base64.b16encode(self.vector).decode())
return False
if self.stream_number != shared.stream:
if self.stream_number != 1:
logging.warning(
'Invalid object %s, reason: not in stream %i',
base64.b16encode(self.vector).decode(), shared.stream)
'Invalid object %s, reason: not in stream 1',
base64.b16encode(self.vector).decode())
return False
data = self.to_bytes()[8:]
# length = len(data) + 8 + shared.payload_length_extra_bytes
# dt = max(self.expires_time - time.time(), 0)
h = hashlib.sha512(data).digest()
pow_value = int.from_bytes(
hashlib.sha512(hashlib.sha512(
self.nonce + self.pow_initial_hash()
).digest()).digest()[:8], 'big')
self.nonce + h).digest()).digest()[:8], 'big')
target = self.pow_target()
if target < pow_value:
logging.warning(
@ -137,7 +129,6 @@ class Object():
return True
def pow_target(self):
"""Compute PoW target"""
data = self.to_bytes()[8:]
length = len(data) + 8 + shared.payload_length_extra_bytes
dt = max(self.expires_time - time.time(), 0)
@ -147,7 +138,6 @@ class Object():
length + (dt * length) / (2 ** 16))))
def pow_initial_hash(self):
"""Compute the initial hash for PoW"""
return hashlib.sha512(self.to_bytes()[8:]).digest()

View File

@ -3,9 +3,10 @@ import unittest
from binascii import unhexlify
from minode import message
from minode.shared import magic_bytes
MAGIC = 0xE9BEB4D9
# 500 identical peers:
# import ipaddress
# from hyperbit import net, packet
@ -13,56 +14,26 @@ from minode.shared import magic_bytes
# 1626611891, 1, 1, net.ipv6(ipaddress.ip_address('127.0.0.1')).packed,
# 8444
# ) for _ in range(1000)]
sample_addr_data = unhexlify(
sample_data = unhexlify(
'fd01f4' + (
'0000000060f420b30000000'
'1000000000000000100000000000000000000ffff7f00000120fc'
) * 500
)
# protocol.CreatePacket(b'ping', b'test')
sample_ping_msg = unhexlify(
'e9beb4d970696e67000000000000000000000004ee26b0dd74657374')
# from pybitmessage import pathmagic
# pathmagic.setup()
# import protocol
# msg = protocol.assembleVersionMessage('127.0.0.1', 8444, [1, 2, 3])
sample_version_msg = unhexlify(
'e9beb4d976657273696f6e00000000000000006b1b06b182000000030000000000000003'
'0000000064fdd3e1000000000000000100000000000000000000ffff7f00000120fc0000'
'00000000000300000000000000000000ffff7f00000120fc00c0b6c3eefb2adf162f5079'
'4269746d6573736167653a302e362e332e322f03010203'
)
#
sample_error_data = \
b'\x02\x00\x006Too many connections from your IP. Closing connection.'
class TestMessage(unittest.TestCase):
"""Test assembling and disassembling of network mesages"""
def test_packet(self):
"""Check packet creation and parsing by message.Message"""
msg = message.Message(b'ping', b'test').to_bytes()
self.assertEqual(msg[:len(magic_bytes)], magic_bytes)
with self.assertRaises(ValueError):
# wrong magic
message.Message.from_bytes(msg[1:])
with self.assertRaises(ValueError):
# wrong length
message.Message.from_bytes(msg[:-1])
with self.assertRaises(ValueError):
# wrong checksum
message.Message.from_bytes(msg[:-1] + b'\x00')
msg = message.Message.from_bytes(sample_ping_msg)
self.assertEqual(msg.command, b'ping')
self.assertEqual(msg.payload, b'test')
"""Check the packet created by message.Message()"""
head = unhexlify(b'%x' % MAGIC)
self.assertEqual(
message.Message(b'ping', b'').to_bytes()[:len(head)], head)
def test_addr(self):
"""Test addr messages"""
msg = message.Message(b'addr', sample_addr_data)
msg = message.Message(b'addr', sample_data)
addr_packet = message.Addr.from_message(msg)
self.assertEqual(len(addr_packet.addresses), 500)
address = addr_packet.addresses.pop()
@ -70,32 +41,3 @@ class TestMessage(unittest.TestCase):
self.assertEqual(address.services, 1)
self.assertEqual(address.port, 8444)
self.assertEqual(address.host, '127.0.0.1')
def test_version(self):
"""Test version message"""
msg = message.Message.from_bytes(sample_version_msg)
self.assertEqual(msg.command, b'version')
version_packet = message.Version.from_message(msg)
self.assertEqual(version_packet.host, '127.0.0.1')
self.assertEqual(version_packet.port, 8444)
self.assertEqual(version_packet.protocol_version, 3)
self.assertEqual(version_packet.services, 3)
self.assertEqual(version_packet.user_agent, b'/PyBitmessage:0.6.3.2/')
self.assertEqual(version_packet.streams, [1, 2, 3])
msg = version_packet.to_bytes()
# omit header and timestamp
self.assertEqual(msg[24:36], sample_version_msg[24:36])
self.assertEqual(msg[44:], sample_version_msg[44:])
def test_error(self):
"""Test error message"""
msg = message.Error.from_message(
message.Message(b'error', sample_error_data))
self.assertEqual(msg.fatal, 2)
self.assertEqual(msg.ban_time, 0)
self.assertEqual(msg.vector, b'')
msg = message.Error(
b'Too many connections from your IP. Closing connection.', 2)
self.assertEqual(msg.to_bytes()[24:], sample_error_data)

View File

@ -60,12 +60,6 @@ class TestProcessProto(unittest.TestCase):
except psutil.NoSuchProcess:
pass
def connections(self):
"""All process' established connections"""
return [
c for c in self.process.connections()
if c.status == 'ESTABLISHED']
class TestProcessShutdown(TestProcessProto):
"""Separate test case for SIGTERM"""
@ -86,10 +80,15 @@ class TestProcess(TestProcessProto):
"""Check minode process connections"""
_started = time.time()
def connections():
return [
c for c in self.process.connections()
if c.status == 'ESTABLISHED']
def continue_check_limit(extra_time):
for _ in range(extra_time * 2):
for t in range(extra_time * 2):
self.assertLessEqual(
len(self.connections()),
len(connections()),
# shared.outgoing_connections, one listening
# TODO: find the cause of one extra
(min(self._connection_limit, 8) if not self._listen
@ -98,8 +97,8 @@ class TestProcess(TestProcessProto):
' by --connection-limit')
time.sleep(1)
for _ in range(self._wait_time * 2):
if len(self.connections()) > self._connection_limit / 2:
for t in range(self._wait_time * 2):
if len(connections()) > self._connection_limit / 2:
_time_to_connect = round(time.time() - _started)
break
time.sleep(0.5)
@ -114,8 +113,8 @@ class TestProcess(TestProcessProto):
for c in self.process.connections():
if c.status == 'LISTEN':
if self._listen is False:
self.fail('Listening while started with --no-incoming')
return
return self.fail(
'Listening while started with --no-incoming')
self.assertEqual(c.laddr[1], self._listening_port or 8444)
break
else:
@ -131,16 +130,3 @@ class TestProcessI2P(TestProcess):
_wait_time = 120
_listen = True
_listening_port = 8448
def test_connections(self):
"""Ensure all connections are I2P"""
super().test_connections()
for c in self.connections():
self.assertEqual(c.raddr[0], '127.0.0.1')
self.assertEqual(c.raddr[1], 7656)
@unittest.skipUnless(i2p_port_free, 'Detected running i2pd')
class TestProcessNoI2P(TestProcessShutdown):
"""Test minode process shutdown with --i2p and no IP"""
_process_cmd = ['minode', '--i2p', '--no-ip']

View File

@ -1,40 +1,14 @@
"""Tests for structures"""
import base64
import logging
import queue
import struct
import time
import unittest
import struct
from binascii import unhexlify
from minode import message, proofofwork, shared, structure
# host pregenerated by pybitmessage.protocol.encodeHost()
# for one of bootstrap servers, port 8080,
# everything else is like in test_message: 1626611891, 1, 1
sample_addr_data = unhexlify(
'0000000060f420b3000000010000000000000001'
'260753000201300000000000000057ae1f90')
# data for an object with expires_time 1697063939
# structure.Object(
# b'\x00' * 8, expires_time, 42, 1, 2, b'HELLO').to_bytes()
sample_object_data = unhexlify(
'000000000000000000000000652724030000002a010248454c4c4f')
logging.basicConfig(
level=shared.log_level,
format='[%(asctime)s] [%(levelname)s] %(message)s')
from minode import structure
class TestStructure(unittest.TestCase):
"""Testing structures serializing and deserializing"""
@classmethod
def setUpClass(cls):
shared.objects = {}
def test_varint(self):
"""Test varint serializing and deserializing"""
s = structure.VarInt(0)
@ -94,80 +68,3 @@ class TestStructure(unittest.TestCase):
self.assertEqual(
addr.to_bytes()[8:24],
unhexlify('0102030405060708090a0b0c0d0e0f10'))
addr = structure.NetAddr.from_bytes(sample_addr_data)
self.assertEqual(addr.host, '2607:5300:201:3000::57ae')
self.assertEqual(addr.port, 8080)
self.assertEqual(addr.stream, 1)
self.assertEqual(addr.services, 1)
addr = structure.NetAddr(1, '2607:5300:201:3000::57ae', 8080, 1)
self.assertEqual(addr.to_bytes()[8:], sample_addr_data[8:])
def test_object(self):
"""Create and check objects"""
obj = structure.Object.from_message(
message.Message(b'object', sample_object_data))
self.assertEqual(obj.object_type, 42)
self.assertEqual(obj.stream_number, 2)
self.assertEqual(obj.expires_time, 1697063939)
self.assertEqual(obj.object_payload, b'HELLO')
obj = structure.Object(
b'\x00' * 8, int(time.time() + 3000000), 42, 1, 1, b'HELLO')
self.assertFalse(obj.is_valid())
obj.expires_time = int(time.time() - 11000)
self.assertFalse(obj.is_valid())
obj = structure.Object(
b'\x00' * 8, int(time.time() + 300), 42, 1, 2, b'HELLO')
vector = obj.vector
proofofwork._worker(obj) # pylint: disable=protected-access
obj = shared.objects.popitem()[1]
self.assertNotEqual(obj.vector, vector)
self.assertFalse(obj.is_expired())
self.assertFalse(obj.is_valid())
shared.stream = 2
self.assertTrue(obj.is_valid())
obj.object_payload = \
b'TIGER, tiger, burning bright. In the forests of the night'
self.assertFalse(obj.is_valid())
def test_proofofwork(self):
"""Check the main proofofwork call and worker"""
shared.vector_advertise_queue = queue.Queue()
obj = structure.Object(
b'\x00' * 8, int(time.time() + 300), 42, 1,
shared.stream, b'HELLO')
start_time = time.time()
proofofwork.do_pow_and_publish(obj)
try:
vector = shared.vector_advertise_queue.get(timeout=300)
except queue.Empty:
self.fail("Couldn't make work in 300 sec")
else:
time.sleep(1)
try:
result = shared.objects[vector]
except KeyError:
self.fail(
"Couldn't found object with vector %s"
" %s sec after pow start" % (
base64.b16encode(vector), time.time() - start_time))
self.assertTrue(result.is_valid())
self.assertEqual(result.object_type, 42)
self.assertEqual(result.object_payload, b'HELLO')
q = queue.Queue()
# pylint: disable=protected-access
proofofwork._pow_worker(obj.pow_target(), obj.pow_initial_hash(), q)
try:
nonce = q.get(timeout=5)
except queue.Empty:
self.fail("No nonce found in the queue")
obj = structure.Object(
nonce, obj.expires_time, obj.object_type, obj.version,
obj.stream_number, obj.object_payload)
self.assertTrue(obj.is_valid())

View File

@ -39,7 +39,5 @@ omit =
[coverage:report]
ignore_errors = true
[pylint.main]
disable = invalid-name,consider-using-f-string,fixme
max-args = 8
max-attributes = 8
[MESSAGES CONTROL]
disable = invalid-name