Compare commits

..

No commits in common. "v0.3" and "experimental" have entirely different histories.

37 changed files with 522 additions and 1967 deletions

View File

@ -1,21 +0,0 @@
FROM ubuntu:jammy
RUN apt-get update
RUN apt-get install -yq software-properties-common
RUN apt-add-repository ppa:purplei2p/i2pd && apt-get update -qq
RUN apt-get install -yq --no-install-suggests --no-install-recommends \
python3-dev python3-pip python-is-python3 python3.11-dev python3.11-venv
RUN apt-get install -yq --no-install-suggests --no-install-recommends sudo i2pd
RUN echo 'builder ALL=(ALL) NOPASSWD:ALL' >> /etc/sudoers
RUN pip install setuptools wheel
RUN pip install --upgrade pip tox virtualenv
ADD . .
CMD .buildbot/ubuntu/build.sh && .buildbot/ubuntu/test.sh

View File

@ -1,3 +0,0 @@
#!/bin/sh
sudo service i2pd start

View File

@ -1,4 +0,0 @@
#!/bin/sh
tox -e lint-basic || exit 1
tox

View File

@ -1,3 +0,0 @@
.git
.tox
dist

View File

@ -1,20 +0,0 @@
name: Testing
on: [push]
jobs:
default:
runs-on: ubuntu-20.04
steps:
- name: Install dependencies
run: |
apt-get update
apt-get install -yq --no-install-suggests --no-install-recommends \
python3-dev python3-pip python3-venv python-is-python3
pip install setuptools wheel
pip install --upgrade pip tox virtualenv
- name: Check out repository code
uses: actions/checkout@v3
- name: Quick lint
run: tox -e lint-basic
- name: Run tests
run: tox

View File

@ -1,7 +1,6 @@
The MIT License (MIT)
Copyright (c) 2016-2017 Krzysztof Oziomek
Copyright (c) 2020-2023 The Bitmessage Developers
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal

View File

@ -1,3 +0,0 @@
include LICENSE
include README.md
include *.sh

View File

@ -1,9 +1,5 @@
# MiNode
[![Testing](https://git.bitmessage.org/Bitmessage/MiNode/actions/workflows/test.yml/badge.svg)](https://git.bitmessage.org/Bitmessage/MiNode/actions?workflow=test.yml)
Python 3 implementation of the Bitmessage protocol. Designed only to route
objects inside the network.
Python 3 implementation of the Bitmessage protocol. Designed only to route objects inside the network.
## Requirements
- python3 (or pypy3)
@ -11,16 +7,16 @@ objects inside the network.
## Running
```
git clone https://git.bitmessage.org/Bitmessage/MiNode.git
git clone https://github.com/TheKysek/MiNode.git
```
```
cd MiNode
chmod +x start.sh
./start.sh
```
It is worth noting that the `start.sh` script no longer tries to do a
`git pull` in order to update to the latest version.
Is is now done by the `update.sh` script.
It is worth noting that the `start.sh` file MiNode no longer tries to do a `git pull` in order to update to the latest version.
Is is now done by the `update.sh` file.
## Command line
```
@ -30,7 +26,6 @@ usage: main.py [-h] [-p PORT] [--host HOST] [--debug] [--data-dir DATA_DIR]
[--connection-limit CONNECTION_LIMIT] [--i2p]
[--i2p-tunnel-length I2P_TUNNEL_LENGTH]
[--i2p-sam-host I2P_SAM_HOST] [--i2p-sam-port I2P_SAM_PORT]
[--i2p-transient]
optional arguments:
-h, --help show this help message and exit
@ -52,25 +47,19 @@ optional arguments:
Host of I2P SAMv3 bridge
--i2p-sam-port I2P_SAM_PORT
Port of I2P SAMv3 bridge
--i2p-transient Generate new I2P destination on start
```
## I2P support
MiNode has support for connections over I2P network.
To use it it needs an I2P router with SAMv3 activated
(both Java I2P and i2pd are supported). Keep in mind that I2P connections
are slow and full synchronization may take a while.
To use it it needs an I2P router with SAMv3 activated (both Java I2P and i2pd are supported).
Keep in mind that I2P connections are slow and full synchronization may take a while.
### Examples
Connect to both IP and I2P networks (SAM bridge on default host and port
127.0.0.1:7656) and set tunnel length to 3 (default is 2).
Connect to both IP and I2P networks (SAM bridge on default host and port 127.0.0.1:7656) and set tunnel length to 3 (default is 2).
```
$ ./start.sh --i2p --i2p-tunnel-length 3
```
Connect only to I2P network and listen for IP connections only from local
machine.
Connect only to I2P network and listen for IP connections only from local machine.
```
$ ./start.sh --i2p --no-ip --host 127.0.0.1
```
@ -78,12 +67,10 @@ or
```
$ ./i2p_bridge.sh
```
If you add `trustedpeer = 127.0.0.1:8444` to `keys.dat` file in PyBitmessage it
will allow you to use it anonymously over I2P with MiNode acting as a bridge.
If you add `trustedpeer = 127.0.0.1:8444` to `keys.dat` file in PyBitmessage it will allow you to use it anonymously over I2P with MiNode acting as a bridge.
## Contact
- lee.miller: BM-2cX1pX2goWAuZB5bLqj17x23EFjufHmygv
- TheKysek: BM-2cVUMXVnQXmTJDmb7q1HUyEqkT92qjwGvJ
## Links
- [Bitmessage project website](https://bitmessage.org)
- [Protocol specification](https://pybitmessage.rtfd.io/en/v0.6/protocol.html)
- [Protocol specification](https://bitmessage.org/wiki/Protocol_specification)

View File

@ -1,11 +0,0 @@
#!/bin/sh
DOCKERFILE=.buildbot/ubuntu/Dockerfile
docker build -t minode/tox -f $DOCKERFILE .
if [ $? -gt 0 ]; then
docker build --no-cache -t minode/tox -f $DOCKERFILE .
fi
docker run --rm -it minode/tox

View File

@ -1,2 +1,2 @@
#!/bin/sh
python3 -m minode.main --i2p --no-ip --host 127.0.0.1 "$@"
python3 minode/main.py --i2p --no-ip --host 127.0.0.1 "$@"

View File

@ -1,15 +1,12 @@
"""
Advertiser thread advertises new addresses and objects among all connections
"""
import logging
import threading
import time
from . import message, shared
import message
import shared
class Advertiser(threading.Thread):
"""The advertiser thread"""
def __init__(self):
super().__init__(name='Advertiser')
@ -38,8 +35,7 @@ class Advertiser(threading.Thread):
while not shared.address_advertise_queue.empty():
addr = shared.address_advertise_queue.get()
if addr.port == 'i2p':
# We should not try to construct Addr messages
# with I2P destinations (yet)
# We should not try to construct Addr messages with I2P destinations (yet)
continue
addresses_to_advertise.add(addr)
if len(addresses_to_advertise) > 0:

View File

@ -1,9 +1,7 @@
# -*- coding: utf-8 -*-
"""The logic and behaviour of a single connection"""
import base64
import errno
import logging
import math
import random
import select
import socket
@ -12,18 +10,13 @@ import threading
import queue
import time
from . import message, shared, structure
import message
import shared
import structure
class ConnectionBase(threading.Thread):
"""
Common code for the connection thread
with minimum command handlers to reuse
"""
def __init__(
self, host, port, s=None, network='ip', server=False,
i2p_remote_dest=b''
):
class Connection(threading.Thread):
def __init__(self, host, port, s=None, network='ip', server=False, i2p_remote_dest=b''):
self.host = host
self.port = port
self.network = network
@ -41,7 +34,7 @@ class ConnectionBase(threading.Thread):
self.vectors_to_get = set()
self.vectors_to_send = set()
self.vectors_requested = {}
self.vectors_requested = dict()
self.status = 'ready'
@ -68,7 +61,6 @@ class ConnectionBase(threading.Thread):
self.last_message_received = time.time()
self.last_message_sent = time.time()
self.wait_until = 0
def run(self):
if self.s is None:
@ -82,10 +74,7 @@ class ConnectionBase(threading.Thread):
else:
self.send_queue.put(message.Version('127.0.0.1', 7656))
while True:
if (
self.on_connection_fully_established_scheduled
and not (self.buffer_send or self.buffer_receive)
):
if self.on_connection_fully_established_scheduled and not (self.buffer_send or self.buffer_receive):
self._on_connection_fully_established()
data = True
try:
@ -95,8 +84,7 @@ class ConnectionBase(threading.Thread):
if data and len(self.buffer_receive) < 4000000:
continue
else:
data = self.s.recv(
self.next_message_size - len(self.buffer_receive))
data = self.s.recv(self.next_message_size - len(self.buffer_receive))
self.buffer_receive += data
except ssl.SSLWantReadError:
if self.status == 'fully_established':
@ -104,71 +92,49 @@ class ConnectionBase(threading.Thread):
self._send_objects()
except socket.error as e:
err = e.args[0]
if err in (errno.EAGAIN, errno.EWOULDBLOCK):
if err == errno.EAGAIN or err == errno.EWOULDBLOCK:
if self.status == 'fully_established':
self._request_objects()
self._send_objects()
else:
logging.debug(
'Disconnecting from %s:%s. Reason: %s',
self.host_print, self.port, e)
logging.debug('Disconnecting from {}:{}. Reason: {}'.format(self.host_print, self.port, e))
data = None
except ConnectionResetError:
logging.debug('Disconnecting from {}:{}. Reason: ConnectionResetError'.format(self.host_print, self.port))
self.status = 'disconnecting'
self._process_buffer_receive()
self._process_queue()
self._send_data()
if time.time() - self.last_message_received > shared.timeout:
logging.debug(
'Disconnecting from %s:%s. Reason:'
' time.time() - self.last_message_received'
' > shared.timeout', self.host_print, self.port)
'Disconnecting from {}:{}. Reason: time.time() - self.last_message_received > shared.timeout'.format(
self.host_print, self.port))
self.status = 'disconnecting'
if (
time.time() - self.last_message_received > 30
and self.status != 'fully_established'
and self.status != 'disconnecting'
):
if time.time() - self.last_message_received > 30 and self.status != 'fully_established'and self.status != 'disconnecting':
logging.debug(
'Disconnecting from %s:%s. Reason:'
' time.time() - self.last_message_received > 30'
' and self.status != "fully_established"',
self.host_print, self.port)
'Disconnecting from {}:{}. Reason: time.time() - self.last_message_received > 30 and self.status != \'fully_established\''.format(
self.host_print, self.port))
self.status = 'disconnecting'
if (
time.time() - self.last_message_sent > 300
and self.status == 'fully_established'
):
self.send_queue.put(message.Message(b'ping', b''))
if time.time() - self.last_message_sent > 300 and self.status == 'fully_established':
self.send_queue.put(message.Message(b'pong', b''))
if self.status == 'disconnecting' or shared.shutting_down:
data = None
if not data:
self.status = 'disconnected'
self.s.close()
logging.info(
'Disconnected from %s:%s', self.host_print, self.port)
logging.info('Disconnected from {}:{}'.format(self.host_print, self.port))
break
time.sleep(0.2)
def _connect(self):
peer_str = '{0.host_print}:{0.port}'.format(self)
logging.debug('Connecting to %s', peer_str)
logging.debug('Connecting to {}:{}'.format(self.host_print, self.port))
try:
self.s = socket.create_connection((self.host, self.port), 10)
self.status = 'connected'
logging.debug('Established TCP connection to %s', peer_str)
except socket.timeout:
pass
except OSError as e:
# unreachable, refused, no route
(logging.info if e.errno not in (101, 111, 113)
else logging.debug)(
'Connection to %s failed. Reason: %s', peer_str, e)
except Exception:
logging.info(
'Connection to %s failed.', peer_str, exc_info=True)
if self.status != 'connected':
logging.info('Established TCP connection to {}:{}'.format(self.host_print, self.port))
except Exception as e:
logging.warning('Connection to {}:{} failed. Reason: {}'.format(self.host_print, self.port, e))
self.status = 'failed'
def _send_data(self):
@ -178,48 +144,27 @@ class ConnectionBase(threading.Thread):
self.buffer_send = self.buffer_send[amount:]
except (BlockingIOError, ssl.SSLWantWriteError):
pass
except (
BrokenPipeError, ConnectionResetError, ssl.SSLError, OSError
) as e:
logging.debug(
'Disconnecting from %s:%s. Reason: %s',
self.host_print, self.port, e)
except (BrokenPipeError, ConnectionResetError, ssl.SSLError, OSError) as e:
logging.debug('Disconnecting from {}:{}. Reason: {}'.format(self.host_print, self.port, e))
self.status = 'disconnecting'
def _do_tls_handshake(self):
logging.debug(
'Initializing TLS connection with %s:%s',
self.host_print, self.port)
logging.debug('Initializing TLS connection with {}:{}'.format(self.host_print, self.port))
context = ssl.create_default_context(
purpose=ssl.Purpose.CLIENT_AUTH if self.server
else ssl.Purpose.SERVER_AUTH
)
context = ssl.create_default_context()
context.check_hostname = False
context.verify_mode = ssl.CERT_NONE
if (
ssl.OPENSSL_VERSION_NUMBER >= 0x10100000
and not ssl.OPENSSL_VERSION.startswith("LibreSSL")
): # OpenSSL>=1.1
if ssl.OPENSSL_VERSION_NUMBER >= 0x10100000 and not ssl.OPENSSL_VERSION.startswith("LibreSSL"):
# OpenSSL>=1.1
context.set_ciphers('AECDH-AES256-SHA@SECLEVEL=0')
else:
context.set_ciphers('AECDH-AES256-SHA')
context.set_ecdh_curve("secp256k1")
context.options = (
ssl.OP_ALL | ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3
| ssl.OP_SINGLE_ECDH_USE | ssl.OP_CIPHER_SERVER_PREFERENCE)
# OP_NO_SSL* is deprecated since 3.6
try:
# TODO: ssl.TLSVersion.TLSv1 is deprecated
context.minimum_version = ssl.TLSVersion.TLSv1
context.maximum_version = ssl.TLSVersion.TLSv1_2
except AttributeError:
pass
context.options = ssl.OP_ALL | ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3 | ssl.OP_SINGLE_ECDH_USE | ssl.OP_CIPHER_SERVER_PREFERENCE
self.s = context.wrap_socket(
self.s, server_side=self.server, do_handshake_on_connect=False)
self.s = context.wrap_socket(self.s, server_side=self.server, do_handshake_on_connect=False)
while True:
try:
@ -230,64 +175,40 @@ class ConnectionBase(threading.Thread):
except ssl.SSLWantWriteError:
select.select([], [self.s], [])
except Exception as e:
logging.debug(
'Disconnecting from %s:%s. Reason: %s',
self.host_print, self.port, e)
logging.debug('Disconnecting from {}:{}. Reason: {}'.format(self.host_print, self.port, e))
self.status = 'disconnecting'
if isinstance(e, ssl.SSLError): # pylint: disable=no-member
logging.debug('ssl.SSLError reason: %s', e.reason)
shared.node_pool.discard((self.host, self.port))
return
break
self.tls = True
logging.debug(
'Established TLS connection with %s:%s (%s)',
self.host_print, self.port, self.s.version())
logging.debug('Established TLS connection with {}:{}'.format(self.host_print, self.port))
def _send_message(self, m):
if isinstance(m, message.Message) and m.command == b'object':
logging.debug(
'%s:%s <- %s',
self.host_print, self.port, structure.Object.from_message(m))
if type(m) == message.Message and m.command == b'object':
logging.debug('{}:{} <- {}'.format(self.host_print, self.port, structure.Object.from_message(m)))
else:
logging.debug('%s:%s <- %s', self.host_print, self.port, m)
logging.debug('{}:{} <- {}'.format(self.host_print, self.port, m))
self.buffer_send += m.to_bytes()
def _on_connection_fully_established(self):
logging.info(
'Established Bitmessage protocol connection to %s:%s',
self.host_print, self.port)
logging.info('Established Bitmessage protocol connection to {}:{}'.format(self.host_print, self.port))
self.on_connection_fully_established_scheduled = False
if self.remote_version.services & 2 and self.network == 'ip':
self._do_tls_handshake() # NODE_SSL
if self.remote_version.services & 2 and self.network == 'ip': # NODE_SSL
self._do_tls_handshake()
addr = {
structure.NetAddr(c.remote_version.services, c.host, c.port)
for c in shared.connections if c.network != 'i2p'
and c.server is False and c.status == 'fully_established'}
# pylint: disable=unsubscriptable-object
# https://github.com/pylint-dev/pylint/issues/3637
addr = {structure.NetAddr(c.remote_version.services, c.host, c.port) for c in shared.connections if c.network != 'i2p' and c.server is False and c.status == 'fully_established'}
if len(shared.node_pool) > 10:
addr.update({
structure.NetAddr(1, a[0], a[1])
for a in random.sample(tuple(shared.node_pool), 10)})
addr.update({structure.NetAddr(1, a[0], a[1]) for a in random.sample(shared.node_pool, 10)})
if len(shared.unchecked_node_pool) > 10:
addr.update({
structure.NetAddr(1, a[0], a[1])
for a in random.sample(tuple(shared.unchecked_node_pool), 10)})
addr.update({structure.NetAddr(1, a[0], a[1]) for a in random.sample(shared.unchecked_node_pool, 10)})
if len(addr) != 0:
self.send_queue.put(message.Addr(addr))
with shared.objects_lock:
if len(shared.objects) > 0:
to_send = {
vector for vector in shared.objects.keys()
if shared.objects[vector].expires_time > time.time()}
to_send = {vector for vector in shared.objects.keys() if shared.objects[vector].expires_time > time.time()}
while len(to_send) > 0:
if len(to_send) > 10000:
# We limit size of inv messaged to 10000 entries
# because they might time out
# in very slow networks (I2P)
pack = random.sample(tuple(to_send), 10000)
# We limit size of inv messaged to 10000 entries because they might time out in very slow networks (I2P)
pack = random.sample(to_send, 10000)
self.send_queue.put(message.Inv(pack))
to_send.difference_update(pack)
else:
@ -313,152 +234,131 @@ class ConnectionBase(threading.Thread):
if self.next_header:
self.next_header = False
try:
h = message.Header.from_bytes(
self.buffer_receive[:shared.header_length])
h = message.Header.from_bytes(self.buffer_receive[:shared.header_length])
except ValueError as e:
self.status = 'disconnecting'
logging.warning(
'Received malformed message from %s:%s: %s',
self.host_print, self.port, e)
logging.warning('Received malformed message from {}:{}: {}'.format(self.host_print, self.port, e))
break
self.next_message_size += h.payload_length
else:
try:
m = message.Message.from_bytes(
self.buffer_receive[:self.next_message_size])
m = message.Message.from_bytes(self.buffer_receive[:self.next_message_size])
except ValueError as e:
self.status = 'disconnecting'
logging.warning(
'Received malformed message from %s:%s, %s',
self.host_print, self.port, e)
logging.warning('Received malformed message from {}:{}, {}'.format(self.host_print, self.port, e))
break
self.next_header = True
self.buffer_receive = self.buffer_receive[
self.next_message_size:]
self.buffer_receive = self.buffer_receive[self.next_message_size:]
self.next_message_size = shared.header_length
self.last_message_received = time.time()
try:
self._process_message(m)
except ValueError as e:
self.status = 'disconnecting'
logging.warning(
'Received malformed message from %s:%s: %s',
self.host_print, self.port, e)
logging.warning('Received malformed message from {}:{}: {}'.format(self.host_print, self.port, e))
break
def _process_message(self, m):
if m.command == b'verack':
self.verack_received = True
logging.debug(
'%s:%s -> %s', self.host_print, self.port, 'verack')
if self.server:
self.send_queue.put('fully_established')
elif m.command == b'ping':
logging.debug('%s:%s -> ping', self.host_print, self.port)
self.send_queue.put(message.Message(b'pong', b''))
elif m.command == b'error':
error = message.Error.from_message(m)
logging.warning(
'%s:%s -> %s', self.host_print, self.port, error)
if error.fatal == 2:
# reduce probability to connect soon
shared.unchecked_node_pool.discard((self.host, self.port))
else:
try:
getattr(self, '_process_msg_{}'.format(m.command.decode()))(m)
except (AttributeError, UnicodeDecodeError):
logging.debug('%s:%s -> %s', self.host_print, self.port, m)
def _process_msg_version(self, m):
version = message.Version.from_message(m)
if shared.stream not in version.streams:
raise ValueError('message not for stream %i' % shared.stream)
logging.debug('%s:%s -> %s', self.host_print, self.port, version)
if (
version.protocol_version != shared.protocol_version
or version.nonce == shared.nonce
):
if m.command == b'version':
version = message.Version.from_bytes(m.to_bytes())
logging.debug('{}:{} -> {}'.format(self.host_print, self.port, str(version)))
if version.protocol_version != shared.protocol_version or version.nonce == shared.nonce:
self.status = 'disconnecting'
self.send_queue.put(None)
else:
logging.info(
'%s:%s claims to be %s',
self.host_print, self.port, version.user_agent)
self.send_queue.put(message.Message(b'verack', b''))
self.verack_sent = True
self.remote_version = version
if not self.server:
self.send_queue.put('fully_established')
if self.network == 'ip':
shared.address_advertise_queue.put(structure.NetAddr(
version.services, self.host, self.port))
shared.address_advertise_queue.put(structure.NetAddr(version.services, self.host, self.port))
shared.node_pool.add((self.host, self.port))
elif self.network == 'i2p':
shared.i2p_node_pool.add((self.host, 'i2p'))
if self.network == 'ip':
shared.address_advertise_queue.put(structure.NetAddr(
shared.services, version.host, shared.listening_port))
shared.address_advertise_queue.put(structure.NetAddr(shared.services, version.host, shared.listening_port))
if self.server:
if self.network == 'ip':
self.send_queue.put(message.Version(self.host, self.port))
else:
self.send_queue.put(message.Version('127.0.0.1', 7656))
def _process_msg_addr(self, m):
elif m.command == b'verack':
self.verack_received = True
logging.debug('{}:{} -> {}'.format(self.host_print, self.port, 'verack'))
if self.server:
self.send_queue.put('fully_established')
elif m.command == b'inv':
inv = message.Inv.from_message(m)
logging.debug('{}:{} -> {}'.format(self.host_print, self.port, inv))
to_get = inv.vectors.copy()
to_get.difference_update(shared.objects.keys())
self.vectors_to_get.update(to_get)
# Do not send objects they already have.
self.vectors_to_send.difference_update(inv.vectors)
elif m.command == b'object':
obj = structure.Object.from_message(m)
logging.debug('{}:{} -> {}'.format(self.host_print, self.port, obj))
self.vectors_requested.pop(obj.vector, None)
self.vectors_to_get.discard(obj.vector)
if obj.is_valid() and obj.vector not in shared.objects:
with shared.objects_lock:
shared.objects[obj.vector] = obj
if obj.object_type == shared.i2p_dest_obj_type and obj.version == shared.i2p_dest_obj_version:
dest = base64.b64encode(obj.object_payload, altchars=b'-~')
logging.debug('Received I2P destination object, adding to i2p_unchecked_node_pool')
logging.debug(dest)
shared.i2p_unchecked_node_pool.add((dest, 'i2p'))
shared.vector_advertise_queue.put(obj.vector)
elif m.command == b'getdata':
getdata = message.GetData.from_message(m)
logging.debug('{}:{} -> {}'.format(self.host_print, self.port, getdata))
self.vectors_to_send.update(getdata.vectors)
elif m.command == b'addr':
addr = message.Addr.from_message(m)
logging.debug('%s:%s -> %s', self.host_print, self.port, addr)
logging.debug('{}:{} -> {}'.format(self.host_print, self.port, addr))
for a in addr.addresses:
if (a.host, a.port) not in shared.core_nodes:
shared.unchecked_node_pool.add((a.host, a.port))
elif m.command == b'ping':
logging.debug('{}:{} -> ping'.format(self.host_print, self.port))
self.send_queue.put(message.Message(b'pong', b''))
elif m.command == b'error':
logging.error('{}:{} -> error: {}'.format(self.host_print, self.port, m.payload))
else:
logging.debug('{}:{} -> {}'.format(self.host_print, self.port, m))
def _request_objects(self):
if self.vectors_to_get and len(self.vectors_requested) < 100:
self.vectors_to_get.difference_update(shared.objects.keys())
if not self.wait_until:
nodes_count = (
len(shared.node_pool) + len(shared.unchecked_node_pool))
logging.debug('Nodes count is %i', nodes_count)
delay = math.ceil(math.log(nodes_count + 2, 20)) * 5.2
self.wait_until = time.time() + delay
logging.debug('Skip sending getdata for %.2fs', delay)
if self.vectors_to_get and self.wait_until < time.time():
logging.info(
'Queued %s vectors to get', len(self.vectors_to_get))
if self.vectors_to_get:
if len(self.vectors_to_get) > 64:
pack = random.sample(tuple(self.vectors_to_get), 64)
pack = random.sample(self.vectors_to_get, 64)
self.send_queue.put(message.GetData(pack))
self.vectors_requested.update({
vector: time.time() for vector in pack
if vector not in self.vectors_requested})
self.vectors_requested.update({vector: time.time() for vector in pack if vector not in self.vectors_requested})
self.vectors_to_get.difference_update(pack)
else:
self.send_queue.put(message.GetData(self.vectors_to_get))
self.vectors_requested.update({
vector: time.time() for vector in self.vectors_to_get
if vector not in self.vectors_requested})
self.vectors_requested.update({vector: time.time() for vector in self.vectors_to_get if vector not in self.vectors_requested})
self.vectors_to_get.clear()
if self.vectors_requested:
self.vectors_requested = {
vector: t for vector, t in self.vectors_requested.items()
if vector not in shared.objects and t > time.time() - 15 * 60}
to_re_request = {
vector for vector, t in self.vectors_requested.items()
if t < time.time() - 10 * 60}
self.vectors_requested = {vector: t for vector, t in self.vectors_requested.items() if vector not in shared.objects and t > time.time() - 15 * 60}
to_re_request = {vector for vector, t in self.vectors_requested.items() if t < time.time() - 10 * 60}
if to_re_request:
self.vectors_to_get.update(to_re_request)
logging.info(
'Re-requesting %i objects from %s:%s',
len(to_re_request), self.host_print, self.port)
logging.debug('Re-requesting {} objects from {}:{}'.format(len(to_re_request), self.host_print, self.port))
def _send_objects(self):
if self.vectors_to_send:
logging.info(
'Preparing to send %s objects', len(self.vectors_to_send))
if len(self.vectors_to_send) > 16:
to_send = random.sample(tuple(self.vectors_to_send), 16)
to_send = random.sample(self.vectors_to_send, 16)
self.vectors_to_send.difference_update(to_send)
else:
to_send = self.vectors_to_send.copy()
@ -467,54 +367,4 @@ class ConnectionBase(threading.Thread):
for vector in to_send:
obj = shared.objects.get(vector, None)
if obj:
self.send_queue.put(
message.Message(b'object', obj.to_bytes()))
class Connection(ConnectionBase):
"""The connection with all commands implementation"""
def _process_msg_inv(self, m):
inv = message.Inv.from_message(m)
logging.debug('%s:%s -> %s', self.host_print, self.port, inv)
to_get = inv.vectors.copy()
to_get.difference_update(shared.objects.keys())
self.vectors_to_get.update(to_get)
# Do not send objects they already have.
self.vectors_to_send.difference_update(inv.vectors)
def _process_msg_object(self, m):
obj = structure.Object.from_message(m)
logging.debug('%s:%s -> %s', self.host_print, self.port, obj)
self.vectors_requested.pop(obj.vector, None)
self.vectors_to_get.discard(obj.vector)
if obj.is_valid() and obj.vector not in shared.objects:
with shared.objects_lock:
shared.objects[obj.vector] = obj
if (
obj.object_type == shared.i2p_dest_obj_type
and obj.version == shared.i2p_dest_obj_version
):
dest = base64.b64encode(obj.object_payload, altchars=b'-~')
logging.debug(
'Received I2P destination object,'
' adding to i2p_unchecked_node_pool')
logging.debug(dest)
shared.i2p_unchecked_node_pool.add((dest, 'i2p'))
shared.vector_advertise_queue.put(obj.vector)
def _process_msg_getdata(self, m):
getdata = message.GetData.from_message(m)
logging.debug('%s:%s -> %s', self.host_print, self.port, getdata)
self.vectors_to_send.update(getdata.vectors)
class Bootstrapper(ConnectionBase):
"""A special type of connection to find IP nodes"""
def _process_msg_addr(self, m):
super()._process_msg_addr(m)
shared.node_pool.discard((self.host, self.port))
self.status = 'disconnecting'
self.send_queue.put(None)
shared.connection = Connection
self.send_queue.put(message.Message(b'object', obj.to_bytes()))

View File

@ -1,6 +0,0 @@
"""A package for working with I2P"""
from .controller import I2PController
from .dialer import I2PDialer
from .listener import I2PListener
__all__ = ["I2PController", "I2PDialer", "I2PListener"]

View File

@ -3,28 +3,27 @@ import base64
import logging
import os
import socket
import threading
import time
from .util import I2PThread, pub_from_priv
from i2p.util import receive_line, pub_from_priv
import shared
class I2PController(I2PThread):
def __init__(self, state, host='127.0.0.1', port=7656, dest_priv=b''):
super().__init__(state, name='I2P Controller')
class I2PController(threading.Thread):
def __init__(self, host='127.0.0.1', port=7656, dest_priv=b''):
super().__init__(name='I2P Controller')
self.host = host
self.port = port
self.nick = b'MiNode_' + base64.b16encode(os.urandom(4)).lower()
while True:
if state.shutting_down:
return
try:
self.s = socket.create_connection((self.host, self.port))
break
except ConnectionRefusedError:
logging.error(
'Error while connecting to I2P SAM bridge. Retrying.')
logging.error("Error while connecting to I2P SAM bridge. Retrying.")
time.sleep(10)
self.version_reply = []
@ -41,6 +40,15 @@ class I2PController(I2PThread):
self.create_session()
def _receive_line(self):
line = receive_line(self.s)
# logging.debug('I2PController <- ' + str(line))
return line
def _send(self, command):
# logging.debug('I2PController -> ' + str(command))
self.s.sendall(command)
def init_connection(self):
self._send(b'HELLO VERSION MIN=3.0 MAX=3.3\n')
self.version_reply = self._receive_line().split()
@ -62,23 +70,21 @@ class I2PController(I2PThread):
assert self.dest_priv
def create_session(self):
self._send(
b'SESSION CREATE STYLE=STREAM ID=' + self.nick
+ b' inbound.length=' + str(self.state.i2p_tunnel_length).encode()
+ b' outbound.length=' + str(self.state.i2p_tunnel_length).encode()
+ b' DESTINATION=' + self.dest_priv + b'\n')
self._send(b'SESSION CREATE STYLE=STREAM ID=' + self.nick +
b' inbound.length=' + str(shared.i2p_tunnel_length).encode() +
b' outbound.length=' + str(shared.i2p_tunnel_length).encode() +
b' DESTINATION=' + self.dest_priv + b'\n')
reply = self._receive_line().split()
if b'RESULT=OK' not in reply:
logging.warning(reply)
logging.warning(
'We could not create I2P session, retrying in 5 seconds.')
logging.warning('We could not create I2P session, retrying in 5 seconds.')
time.sleep(5)
self.create_session()
def run(self):
self.s.settimeout(1)
while True:
if not self.state.shutting_down:
if not shared.shutting_down:
try:
msg = self._receive_line().split(b' ')
if msg[0] == b'PING':

View File

@ -1,22 +1,22 @@
# -*- coding: utf-8 -*-
import logging
import socket
import threading
from .util import I2PThread
import shared
from connection import Connection
from i2p.util import receive_line
class I2PDialer(I2PThread):
def __init__(
self, state, destination, nick, sam_host='127.0.0.1', sam_port=7656
):
class I2PDialer(threading.Thread):
def __init__(self, destination, nick, sam_host='127.0.0.1', sam_port=7656):
self.sam_host = sam_host
self.sam_port = sam_port
self.nick = nick
self.destination = destination
super().__init__(state, name='I2P Dial to {}'.format(self.destination))
super().__init__(name='I2P Dial to {}'.format(self.destination))
self.s = socket.create_connection((self.sam_host, self.sam_port))
@ -24,26 +24,31 @@ class I2PDialer(I2PThread):
self.success = True
def run(self):
logging.debug('Connecting to %s', self.destination)
logging.debug('Connecting to {}'.format(self.destination))
self._connect()
if not self.state.shutting_down and self.success:
c = self.state.connection(
self.destination, 'i2p', self.s, 'i2p',
False, self.destination)
if not shared.shutting_down and self.success:
c = Connection(self.destination, 'i2p', self.s, 'i2p', False, self.destination)
c.start()
self.state.connections.add(c)
shared.connections.add(c)
def _receive_line(self):
line = receive_line(self.s)
# logging.debug('I2PDialer <- ' + str(line))
return line
def _send(self, command):
# logging.debug('I2PDialer -> ' + str(command))
self.s.sendall(command)
def _connect(self):
self._send(b'HELLO VERSION MIN=3.0 MAX=3.3\n')
self.version_reply = self._receive_line().split()
if b'RESULT=OK' not in self.version_reply:
logging.debug('Error while connecting to %s', self.destination)
logging.warning('Error while connecting to {}'.format(self.destination))
self.success = False
self._send(
b'STREAM CONNECT ID=' + self.nick + b' DESTINATION='
+ self.destination + b'\n')
self._send(b'STREAM CONNECT ID=' + self.nick + b' DESTINATION=' + self.destination + b'\n')
reply = self._receive_line().split(b' ')
if b'RESULT=OK' not in reply:
logging.debug('Error while connecting to %s', self.destination)
logging.warning('Error while connecting to {}'.format(self.destination))
self.success = False

View File

@ -1,22 +1,36 @@
# -*- coding: utf-8 -*-
import logging
import socket
import threading
from .util import I2PThread
from connection import Connection
from i2p.util import receive_line
import shared
class I2PListener(I2PThread):
def __init__(self, state, nick, host='127.0.0.1', port=7656):
super().__init__(state, name='I2P Listener')
class I2PListener(threading.Thread):
def __init__(self, nick, host='127.0.0.1', port=7656):
super().__init__(name='I2P Listener')
self.host = host
self.port = port
self.nick = nick
self.s = None
self.version_reply = []
self.new_socket()
def _receive_line(self):
line = receive_line(self.s)
# logging.debug('I2PListener <- ' + str(line))
return line
def _send(self, command):
# logging.debug('I2PListener -> ' + str(command))
self.s.sendall(command)
def new_socket(self):
self.s = socket.create_connection((self.host, self.port))
self._send(b'HELLO VERSION MIN=3.0 MAX=3.3\n')
@ -30,26 +44,23 @@ class I2PListener(I2PThread):
self.s.settimeout(1)
def run(self):
while not self.state.shutting_down:
while not shared.shutting_down:
try:
destination = self._receive_line().split()[0]
logging.info(
'Incoming I2P connection from: %s', destination.decode())
logging.info('Incoming I2P connection from: {}'.format(destination.decode()))
hosts = set()
for c in self.state.connections.copy():
for c in shared.connections.copy():
hosts.add(c.host)
for d in self.state.i2p_dialers.copy():
for d in shared.i2p_dialers.copy():
hosts.add(d.destination)
if destination in hosts:
logging.debug('Rejecting duplicate I2P connection.')
self.s.close()
else:
c = self.state.connection(
destination, 'i2p', self.s, 'i2p', True, destination)
c = Connection(destination, 'i2p', self.s, 'i2p', True, destination)
c.start()
self.state.connections.add(c)
c = None
shared.connections.add(c)
self.new_socket()
except socket.timeout:
pass

View File

@ -1,7 +1,6 @@
# -*- coding: utf-8 -*-
import base64
import hashlib
import threading
def receive_line(s):
@ -15,35 +14,13 @@ def receive_line(s):
return data[0]
class I2PThread(threading.Thread):
"""
Abstract I2P thread with _receive_line() and _send() methods,
reused in I2PDialer, I2PListener and I2PController
"""
def __init__(self, state, name=''):
super().__init__(name=name)
self.state = state
self.s = None
def _receive_line(self):
line = receive_line(self.s)
# logging.debug('I2PListener <- %s', line)
return line
def _send(self, command):
# logging.debug('I2PListener -> %s', command)
self.s.sendall(command)
def pub_from_priv(priv):
priv = base64.b64decode(priv, altchars=b'-~')
# 256 for public key + 128 for signing key + 3 for certificate header
# + value of bytes priv[385:387]
# 256 for public key + 128 for signing key + 3 for certificate header + value of bytes priv[385:387]
pub = priv[:387 + int.from_bytes(priv[385:387], byteorder='big')]
return base64.b64encode(pub, altchars=b'-~')
pub = base64.b64encode(pub, altchars=b'-~')
return pub
def b32_from_pub(pub):
return base64.b32encode(
hashlib.sha256(base64.b64decode(pub, b'-~')).digest()
).replace(b'=', b'').lower() + b'.b32.i2p'
return base64.b32encode(hashlib.sha256(base64.b64decode(pub, b'-~')).digest()).replace(b"=", b"").lower() + b'.b32.i2p'

View File

@ -1,5 +1,4 @@
IPHBFm1bfQ9HrUkq07aomTAGn~W1wChE53xprAqIftsF18cuoUCJbMYhdJl~pljhvAXHKDSePdsSWecg8yP3st0Ib0h429XaOdrxpoFJ6MI1ofkg-KFtnZ6sX~Yp5GD-z-Nqdu6H0YBlf~y18ToOT6vTUvyE5Jsb105LmRMUAP0pDon4-da9r2wD~rxGOuvkrT83CftfxAIIT1z3M6ouAFI3UBq-guEyiZszM-01yQ-IgVBXsvnou8DXrlysYeeaimL6LoLhJgTnXIDfHCfUsHbgYK0JvRdimu-eMs~BRTT7-o4N5RJjVDfsS4CUHa6JwuWYg3JNSfaJoGFlM2xeGjNSJUs5e7PkcXeqCTKZQERbdIJcFz~rGcTfvc-OfXjMf6VfU2XORKcYiA21zkHMOkQvmE1dATP8VpQTKcYYZrQrRAc5Wxn7ayf9Gdwtq0EZXeydZv36RVJ03E4CZUGQMxXOFGUXwLFXQ9QCbsbXSoukd3rAGoPgE~GboO1YJh3hAAAA
lSf~Ut81pZVxDwteIXobR7G7qpnZz2eirvyKJgDFMYuOvXZVNA8bgA5qNhXR8lmOlCBCzKsfm-KIo0QndfRZhMYsFXHTxWBiF9SvPPF8c220l-c0s7cCFQwTdJ5UwZciOexsvNxBLv~1GN2DdMEgEJeUVmLvIaynzQuNgWMvr9AVo6rox2x88FZWT8kdZ2Nt0fiBm-UEd5TpZcK4q4U80t1giuqVeJPqWZjhBV-ctVLeGQC5fp~v3Ev2UeCH~43Z5olrFcCWsVL8vxc4wzMWrVZbi95f1gLW1kQ~sgV8fV~G4JanYLV5yRePC19i5VTkvtcq4HN2cEq~BKryP3AxR7m4Msqwe4gTeNEJE9D6jCmaGjZywujCTFUKo3eTbwCr7sMqZcZGzBPIBcK5syzJL05BMX16pP0ziXJmP-KZT1iEjdY3DIGHJ~mfa2lPNxuwceERnHv3BDyNvo0S0AbEQuJdqmdwFWWHL6mfo9uNYaShMYQmy38O3t6nZP9YOO~YBQAEAAcAAA==
SIMipBxbRtEfnWaI5Kx31scSP82trKfA~JVTmd1i6b3gj8QNcBeIMpc1ZVJp4B338mH8pCqeqcS-nlZdMd7uNqCsfQXFZ-bDvM6-qsKwbacM-nLA84UYqgDKMWHHrAbapoyQWVIF5JZzidiyQndunGVBvC1KrTp0Bghn9IXifhAt-EWiWC8JG6QrCreghtK5VQxr0jLcznFbLPUu8vsBvjWripLvnWSaHKK55DClQE85k5bwiFwdbJgg04H2Ku9Zzndg~g01o-iW4IsaEOtRYbjYlTIVpCClbVdlca2zQvQbfc8EkqYZHYHNDOxWsxJPjDv5sDK1Z70pyiSweOuDl27N-moM0pFDBelNSJdzFJl1uo5SN0iMH4xMUfNEEF94IGpxaUDMt2WRrpY2ivIj1pM1byu6uh7lOlGV2XdtFIiZZyIDD6hHeA29CsvwsYKJfYmZu73TRJ7lc2dXqwQe54prt~GyfL5q1-wRZ-5ic3T4~YdrsJ5X6h4iHztHDawwBQAEAAcAAA==
71w249Tlf0-ImX3S5zWA3RC8zfZY7dBY8C-3Bll4yHqJT7YZwmR0u2dxe9Xi7gFN8Q0mhUBtjuNJPwL8XjiwbOPeTGytc5xazmlxI~HcZoY32YBSiushex0sr0VnJOEaa~UduhNRY7X12atmhoSOXWKJ-R9YebWQy9eDvLZinX~ytFCttl9xpxyo5WvqrC6~KTMHjA2xHALhSp5FhLf9fRWCCPl7GRY-7kHuWpa7WVs~P2pe~Uux0RTbJzLH8gwmtp-jTgtisef7mr7l9e~MQ3k3dAO0i~ifXhoBSWrSRUgprwjpmlfm3O85tBSLMsgSJK-TFxtlo-iBCMLaJSZamurOx52uX-QO7WlWaX~S93ELXMd1B0wS~PKtomyRt7i67IFDfPde0FvF~YBxEKIgy3N8Sw4S4W3AqZBIaFMM8-q5x9R08t6jOA5WbdUq7IVjSimecM~AKVNyYCShoxn3pe10bj4Xmc~sU7sDYhTI7bmR~WYYaY1NVMlBrm-Fao4MBQAEAAcAAA==
BAPHJe7Gxr4bwrmJrPFKUSvxuFXqGxbno1NunoKccVklLqWxcfjrpBjVFp8OSKooc-89t-N5syP61YWw2DcL~gm0Z7NOtw9hFaPIa2ooS3an8FhnpX7~2etA-oxG1y-3pKMAMu--b443EHV3u1qaBR33azE5-GQn1qvKjoLGWi-nOZk2ogeF0o2pn6VVG-sXrlQByAF7TsolqrDOPI3P3RaRg09UK06y8CASHIr4yKsmrajO1~hnjpfrqwK1N0Y5DSCs4RXAlM5RPxf4mw116-hCGIJN30xnW2BSOaHHuA5U7Oaj-wbTdXICFpYi0M5sigogMeE1wUNB-vNcpc-hcl44BgBf-JOnWOsFxHTy2TGxkR31ockdoKjeEPOm34wVV2wNxCSGHzO25yW9PaoNdjKeplbQP~PrrRrlrTDXWEL7ZxqF6QOlCSO1UFV8HaPtqTMC13JMpX4PeAkeChtTgGQ1Pw2cQWFDBUwi2Z3x8na5MK8QNdIO2p4VYmJmZ8wgBQAEAAcAAA==
kSDy6eL1pVybvIz6yYzlwz7nqBSgjPQV-YiRHygWFl8r1s60p1vSSfurkPqn8JSaopV~zgQpt5CMK8XxOv9L4NP0cfTAfvY3wCKXb~BbuBGXAXcdh-oSAQ65nXP3rpJ7g-TcdUbqYOhJVeKskHRu6Uv~ZTQyEM23rlI638bWNImy5f9bGw9ff-Fb5xj5IYjNNTWYvXmnB2GvP4TZZMRubGBauByWDDfPVg~0et7UOd7RwcnxTfpygKX41EZOJc05G6A4uBgMJjWQK6RjRa30YJ9M4nwR2xUYLb9y6IAaOZEc0khKjYbUp8KxcaG7spqnMogJR~xgWhRn~lV7b9PVsDL-0vDRuIunG5IGLU~pfviUCiv9H4mNiYft2GVvoJhRQLxWSlibJqmHzrXIE1741qX0NZkX9O9zI2gYG1Yw~t4xqdlVJYtBGMrBsRye0gBbImHhEcKo396yrz3~aZdqXiPNgisamx9tj485RgyO-JCp7NJ6WQ4cZmg6DIlNv-JZAAAA

1 IPHBFm1bfQ9HrUkq07aomTAGn~W1wChE53xprAqIftsF18cuoUCJbMYhdJl~pljhvAXHKDSePdsSWecg8yP3st0Ib0h429XaOdrxpoFJ6MI1ofkg-KFtnZ6sX~Yp5GD-z-Nqdu6H0YBlf~y18ToOT6vTUvyE5Jsb105LmRMUAP0pDon4-da9r2wD~rxGOuvkrT83CftfxAIIT1z3M6ouAFI3UBq-guEyiZszM-01yQ-IgVBXsvnou8DXrlysYeeaimL6LoLhJgTnXIDfHCfUsHbgYK0JvRdimu-eMs~BRTT7-o4N5RJjVDfsS4CUHa6JwuWYg3JNSfaJoGFlM2xeGjNSJUs5e7PkcXeqCTKZQERbdIJcFz~rGcTfvc-OfXjMf6VfU2XORKcYiA21zkHMOkQvmE1dATP8VpQTKcYYZrQrRAc5Wxn7ayf9Gdwtq0EZXeydZv36RVJ03E4CZUGQMxXOFGUXwLFXQ9QCbsbXSoukd3rAGoPgE~GboO1YJh3hAAAA
2 lSf~Ut81pZVxDwteIXobR7G7qpnZz2eirvyKJgDFMYuOvXZVNA8bgA5qNhXR8lmOlCBCzKsfm-KIo0QndfRZhMYsFXHTxWBiF9SvPPF8c220l-c0s7cCFQwTdJ5UwZciOexsvNxBLv~1GN2DdMEgEJeUVmLvIaynzQuNgWMvr9AVo6rox2x88FZWT8kdZ2Nt0fiBm-UEd5TpZcK4q4U80t1giuqVeJPqWZjhBV-ctVLeGQC5fp~v3Ev2UeCH~43Z5olrFcCWsVL8vxc4wzMWrVZbi95f1gLW1kQ~sgV8fV~G4JanYLV5yRePC19i5VTkvtcq4HN2cEq~BKryP3AxR7m4Msqwe4gTeNEJE9D6jCmaGjZywujCTFUKo3eTbwCr7sMqZcZGzBPIBcK5syzJL05BMX16pP0ziXJmP-KZT1iEjdY3DIGHJ~mfa2lPNxuwceERnHv3BDyNvo0S0AbEQuJdqmdwFWWHL6mfo9uNYaShMYQmy38O3t6nZP9YOO~YBQAEAAcAAA==
3 SIMipBxbRtEfnWaI5Kx31scSP82trKfA~JVTmd1i6b3gj8QNcBeIMpc1ZVJp4B338mH8pCqeqcS-nlZdMd7uNqCsfQXFZ-bDvM6-qsKwbacM-nLA84UYqgDKMWHHrAbapoyQWVIF5JZzidiyQndunGVBvC1KrTp0Bghn9IXifhAt-EWiWC8JG6QrCreghtK5VQxr0jLcznFbLPUu8vsBvjWripLvnWSaHKK55DClQE85k5bwiFwdbJgg04H2Ku9Zzndg~g01o-iW4IsaEOtRYbjYlTIVpCClbVdlca2zQvQbfc8EkqYZHYHNDOxWsxJPjDv5sDK1Z70pyiSweOuDl27N-moM0pFDBelNSJdzFJl1uo5SN0iMH4xMUfNEEF94IGpxaUDMt2WRrpY2ivIj1pM1byu6uh7lOlGV2XdtFIiZZyIDD6hHeA29CsvwsYKJfYmZu73TRJ7lc2dXqwQe54prt~GyfL5q1-wRZ-5ic3T4~YdrsJ5X6h4iHztHDawwBQAEAAcAAA==
4 71w249Tlf0-ImX3S5zWA3RC8zfZY7dBY8C-3Bll4yHqJT7YZwmR0u2dxe9Xi7gFN8Q0mhUBtjuNJPwL8XjiwbOPeTGytc5xazmlxI~HcZoY32YBSiushex0sr0VnJOEaa~UduhNRY7X12atmhoSOXWKJ-R9YebWQy9eDvLZinX~ytFCttl9xpxyo5WvqrC6~KTMHjA2xHALhSp5FhLf9fRWCCPl7GRY-7kHuWpa7WVs~P2pe~Uux0RTbJzLH8gwmtp-jTgtisef7mr7l9e~MQ3k3dAO0i~ifXhoBSWrSRUgprwjpmlfm3O85tBSLMsgSJK-TFxtlo-iBCMLaJSZamurOx52uX-QO7WlWaX~S93ELXMd1B0wS~PKtomyRt7i67IFDfPde0FvF~YBxEKIgy3N8Sw4S4W3AqZBIaFMM8-q5x9R08t6jOA5WbdUq7IVjSimecM~AKVNyYCShoxn3pe10bj4Xmc~sU7sDYhTI7bmR~WYYaY1NVMlBrm-Fao4MBQAEAAcAAA==
BAPHJe7Gxr4bwrmJrPFKUSvxuFXqGxbno1NunoKccVklLqWxcfjrpBjVFp8OSKooc-89t-N5syP61YWw2DcL~gm0Z7NOtw9hFaPIa2ooS3an8FhnpX7~2etA-oxG1y-3pKMAMu--b443EHV3u1qaBR33azE5-GQn1qvKjoLGWi-nOZk2ogeF0o2pn6VVG-sXrlQByAF7TsolqrDOPI3P3RaRg09UK06y8CASHIr4yKsmrajO1~hnjpfrqwK1N0Y5DSCs4RXAlM5RPxf4mw116-hCGIJN30xnW2BSOaHHuA5U7Oaj-wbTdXICFpYi0M5sigogMeE1wUNB-vNcpc-hcl44BgBf-JOnWOsFxHTy2TGxkR31ockdoKjeEPOm34wVV2wNxCSGHzO25yW9PaoNdjKeplbQP~PrrRrlrTDXWEL7ZxqF6QOlCSO1UFV8HaPtqTMC13JMpX4PeAkeChtTgGQ1Pw2cQWFDBUwi2Z3x8na5MK8QNdIO2p4VYmJmZ8wgBQAEAAcAAA==
kSDy6eL1pVybvIz6yYzlwz7nqBSgjPQV-YiRHygWFl8r1s60p1vSSfurkPqn8JSaopV~zgQpt5CMK8XxOv9L4NP0cfTAfvY3wCKXb~BbuBGXAXcdh-oSAQ65nXP3rpJ7g-TcdUbqYOhJVeKskHRu6Uv~ZTQyEM23rlI638bWNImy5f9bGw9ff-Fb5xj5IYjNNTWYvXmnB2GvP4TZZMRubGBauByWDDfPVg~0et7UOd7RwcnxTfpygKX41EZOJc05G6A4uBgMJjWQK6RjRa30YJ9M4nwR2xUYLb9y6IAaOZEc0khKjYbUp8KxcaG7spqnMogJR~xgWhRn~lV7b9PVsDL-0vDRuIunG5IGLU~pfviUCiv9H4mNiYft2GVvoJhRQLxWSlibJqmHzrXIE1741qX0NZkX9O9zI2gYG1Yw~t4xqdlVJYtBGMrBsRye0gBbImHhEcKo396yrz3~aZdqXiPNgisamx9tj485RgyO-JCp7NJ6WQ4cZmg6DIlNv-JZAAAA

View File

@ -1,15 +1,13 @@
# -*- coding: utf-8 -*-
"""Listener thread creates connection objects for incoming connections"""
import logging
import socket
import threading
from . import shared
from .connection import Connection
from connection import Connection
import shared
class Listener(threading.Thread):
"""The listener thread"""
def __init__(self, host, port, family=socket.AF_INET):
super().__init__(name='Listener')
self.host = host
@ -28,15 +26,13 @@ class Listener(threading.Thread):
break
try:
conn, addr = self.s.accept()
except socket.timeout:
continue
logging.info('Incoming connection from: %s:%i', *addr[:2])
logging.info('Incoming connection from: {}:{}'.format(addr[0], addr[1]))
with shared.connections_lock:
if len(shared.connections) > shared.connection_limit:
conn.close()
else:
c = Connection(*addr[:2], conn, server=True)
c = Connection(addr[0], addr[1], conn, 'ip', True)
c.start()
shared.connections.add(c)
c = None
except socket.timeout:
pass

View File

@ -1,56 +1,43 @@
# -*- coding: utf-8 -*-
"""Functions for starting the program"""
import argparse
import base64
import csv
import logging
import multiprocessing
import os
import pickle
import signal
import socket
from . import i2p, shared
from .advertiser import Advertiser
from .manager import Manager
from .listener import Listener
from advertiser import Advertiser
from manager import Manager
from listener import Listener
import i2p.controller
import i2p.listener
import shared
def handler(s, f): # pylint: disable=unused-argument
"""Signal handler"""
def handler(s, f):
logging.info('Gracefully shutting down MiNode')
shared.shutting_down = True
def parse_arguments(): # pylint: disable=too-many-branches,too-many-statements
"""Parsing arguments"""
def parse_arguments():
parser = argparse.ArgumentParser()
parser.add_argument('-p', '--port', help='Port to listen on', type=int)
parser.add_argument('--host', help='Listening host')
parser.add_argument(
'--debug', action='store_true', help='Enable debug logging')
parser.add_argument('--debug', help='Enable debug logging', action='store_true')
parser.add_argument('--data-dir', help='Path to data directory')
parser.add_argument(
'--no-incoming', action='store_true',
help='Do not listen for incoming connections')
parser.add_argument(
'--no-outgoing', action='store_true',
help='Do not send outgoing connections')
parser.add_argument(
'--no-ip', action='store_true', help='Do not use IP network')
parser.add_argument(
'--trusted-peer', help='Specify a trusted peer we should connect to')
parser.add_argument(
'--connection-limit', type=int, help='Maximum number of connections')
parser.add_argument(
'--i2p', action='store_true', help='Enable I2P support (uses SAMv3)')
parser.add_argument(
'--i2p-tunnel-length', type=int, help='Length of I2P tunnels')
parser.add_argument(
'--i2p-sam-host', help='Host of I2P SAMv3 bridge')
parser.add_argument(
'--i2p-sam-port', type=int, help='Port of I2P SAMv3 bridge')
parser.add_argument(
'--i2p-transient', action='store_true',
help='Generate new I2P destination on start')
parser.add_argument('--no-incoming', help='Do not listen for incoming connections', action='store_true')
parser.add_argument('--no-outgoing', help='Do not send outgoing connections', action='store_true')
parser.add_argument('--no-ip', help='Do not use IP network', action='store_true')
parser.add_argument('--trusted-peer', help='Specify a trusted peer we should connect to')
parser.add_argument('--connection-limit', help='Maximum number of connections', type=int)
parser.add_argument('--i2p', help='Enable I2P support (uses SAMv3)', action='store_true')
parser.add_argument('--i2p-tunnel-length', help='Length of I2P tunnels', type=int)
parser.add_argument('--i2p-sam-host', help='Host of I2P SAMv3 bridge')
parser.add_argument('--i2p-sam-port', help='Port of I2P SAMv3 bridge', type=int)
parser.add_argument('--i2p-transient', help='Generate new I2P destination on start', action='store_true')
args = parser.parse_args()
if args.port:
@ -100,156 +87,160 @@ def parse_arguments(): # pylint: disable=too-many-branches,too-many-statements
shared.i2p_transient = True
def load_data():
try:
with open(shared.data_directory + 'objects.pickle', mode='br') as file:
shared.objects = pickle.load(file)
except Exception as e:
logging.warning('Error while loading objects from disk.')
logging.warning(e)
try:
with open(shared.data_directory + 'nodes.pickle', mode='br') as file:
shared.node_pool = pickle.load(file)
except Exception as e:
logging.warning('Error while loading nodes from disk.')
logging.warning(e)
try:
with open(shared.data_directory + 'i2p_nodes.pickle', mode='br') as file:
shared.i2p_node_pool = pickle.load(file)
except Exception as e:
logging.warning('Error while loading nodes from disk.')
logging.warning(e)
with open(os.path.join(shared.source_directory, 'core_nodes.csv'), mode='r', newline='') as f:
reader = csv.reader(f)
shared.core_nodes = {tuple(row) for row in reader}
shared.node_pool.update(shared.core_nodes)
with open(os.path.join(shared.source_directory, 'i2p_core_nodes.csv'), mode='r', newline='') as f:
reader = csv.reader(f)
shared.i2p_core_nodes = {(row[0].encode(), 'i2p') for row in reader}
shared.i2p_node_pool.update(shared.i2p_core_nodes)
def bootstrap_from_dns():
"""Addes addresses of bootstrap servers to core nodes"""
try:
for port in (8080, 8444):
for item in socket.getaddrinfo(
'bootstrap{}.bitmessage.org'.format(port), 80,
proto=socket.IPPROTO_TCP
):
try:
addr = item[4][0]
socket.inet_pton(item[0], addr)
except (TypeError, socket.error):
continue
else:
shared.core_nodes.add((addr, port))
except socket.gaierror:
logging.info('Failed to do a DNS query')
except Exception:
logging.info('Error during DNS bootstrap', exc_info=True)
for item in socket.getaddrinfo('bootstrap8080.bitmessage.org', 80):
shared.unchecked_node_pool.add((item[4][0], 8080))
logging.debug('Adding ' + item[4][0] + ' to unchecked_node_pool based on DNS bootstrap method')
for item in socket.getaddrinfo('bootstrap8444.bitmessage.org', 80):
shared.unchecked_node_pool.add((item[4][0], 8444))
logging.debug('Adding ' + item[4][0] + ' to unchecked_node_pool based on DNS bootstrap method')
except Exception as e:
logging.error('Error during DNS bootstrap')
logging.error(e)
def start_ip_listener():
"""Starts `.listener.Listener`"""
listener_ipv4 = None
listener_ipv6 = None
if socket.has_ipv6:
try:
listener_ipv6 = Listener(
shared.listening_host,
shared.listening_port, family=socket.AF_INET6)
listener_ipv6 = Listener(shared.listening_host, shared.listening_port, family=socket.AF_INET6)
listener_ipv6.start()
except socket.gaierror as e:
if e.errno == -9:
logging.info('IPv6 is not supported.')
except Exception:
logging.info(
'Error while starting IPv6 listener on port %s',
shared.listening_port, exc_info=True)
except Exception as e:
logging.warning('Error while starting IPv6 listener on port {}'.format(shared.listening_port))
logging.warning(e)
try:
listener_ipv4 = Listener(shared.listening_host, shared.listening_port)
listener_ipv4.start()
except OSError as e:
except Exception as e:
if listener_ipv6:
logging.info(
'Error while starting IPv4 listener on port %s.'
' However the IPv6 one seems to be working'
' and will probably accept IPv4 connections.', # 48 on macos
shared.listening_port, exc_info=e.errno not in (48, 98))
logging.warning('Error while starting IPv4 listener on port {}. '.format(shared.listening_port) +
'However the IPv6 one seems to be working and will probably accept IPv4 connections.')
else:
logging.warning(
'Error while starting IPv4 listener on port %s.'
'You will not receive incoming connections.'
' Please check your port configuration',
shared.listening_port, exc_info=True)
logging.error('Error while starting IPv4 listener on port {}. '.format(shared.listening_port) +
'You will not receive incoming connections. Please check your port configuration')
logging.error(e)
def start_i2p_listener():
"""Starts I2P threads"""
# Grab I2P destinations from old object file
for obj in shared.objects.values():
if obj.object_type == shared.i2p_dest_obj_type:
shared.i2p_unchecked_node_pool.add((
base64.b64encode(obj.object_payload, altchars=b'-~'), 'i2p'))
shared.i2p_unchecked_node_pool.add((base64.b64encode(obj.object_payload, altchars=b'-~'), 'i2p'))
dest_priv = b''
if not shared.i2p_transient:
try:
with open(
os.path.join(shared.data_directory, 'i2p_dest_priv.key'), 'br'
) as src:
dest_priv = src.read()
with open(shared.data_directory + 'i2p_dest_priv.key', mode='br') as file:
dest_priv = file.read()
logging.debug('Loaded I2P destination private key.')
except FileNotFoundError:
pass
except Exception:
logging.info(
'Error while loading I2P destination private key.',
exc_info=True)
except Exception as e:
logging.warning('Error while loading I2P destination private key.')
logging.warning(e)
logging.info(
'Starting I2P Controller and creating tunnels. This may take a while.')
i2p_controller = i2p.I2PController(
shared, shared.i2p_sam_host, shared.i2p_sam_port, dest_priv)
logging.info('Starting I2P Controller and creating tunnels. This may take a while.')
i2p_controller = i2p.controller.I2PController(shared.i2p_sam_host, shared.i2p_sam_port, dest_priv)
i2p_controller.start()
shared.i2p_dest_pub = i2p_controller.dest_pub
shared.i2p_session_nick = i2p_controller.nick
logging.info('Local I2P destination: %s', shared.i2p_dest_pub.decode())
logging.info('I2P session nick: %s', shared.i2p_session_nick.decode())
logging.info('Local I2P destination: {}'.format(shared.i2p_dest_pub.decode()))
logging.info('I2P session nick: {}'.format(shared.i2p_session_nick.decode()))
logging.info('Starting I2P Listener')
i2p_listener = i2p.I2PListener(shared, i2p_controller.nick)
i2p_listener = i2p.listener.I2PListener(i2p_controller.nick)
i2p_listener.start()
if not shared.i2p_transient:
try:
with open(
os.path.join(shared.data_directory, 'i2p_dest_priv.key'), 'bw'
) as src:
src.write(i2p_controller.dest_priv)
with open(shared.data_directory + 'i2p_dest_priv.key', mode='bw') as file:
file.write(i2p_controller.dest_priv)
logging.debug('Saved I2P destination private key.')
except Exception:
logging.warning(
'Error while saving I2P destination private key.',
exc_info=True)
except Exception as e:
logging.warning('Error while saving I2P destination private key.')
logging.warning(e)
try:
with open(
os.path.join(shared.data_directory, 'i2p_dest.pub'), 'bw'
) as src:
src.write(shared.i2p_dest_pub)
with open(shared.data_directory + 'i2p_dest.pub', mode='bw') as file:
file.write(shared.i2p_dest_pub)
logging.debug('Saved I2P destination public key.')
except Exception:
logging.warning(
'Error while saving I2P destination public key.', exc_info=True)
except Exception as e:
logging.warning('Error while saving I2P destination public key.')
logging.warning(e)
def main():
"""Script entry point"""
signal.signal(signal.SIGINT, handler)
signal.signal(signal.SIGTERM, handler)
parse_arguments()
logging.basicConfig(
level=shared.log_level,
format='[%(asctime)s] [%(levelname)s] %(message)s')
logging.basicConfig(level=shared.log_level, format='[%(asctime)s] [%(levelname)s] %(message)s')
logging.info('Starting MiNode')
logging.info('Data directory: %s', shared.data_directory)
logging.info('Data directory: {}'.format(shared.data_directory))
if not os.path.exists(shared.data_directory):
try:
os.makedirs(shared.data_directory)
except Exception:
logging.warning(
'Error while creating data directory in: %s',
shared.data_directory, exc_info=True)
except Exception as e:
logging.warning('Error while creating data directory in: {}'.format(shared.data_directory))
logging.warning(e)
load_data()
if shared.ip_enabled and not shared.trusted_peer:
bootstrap_from_dns()
if shared.i2p_enabled:
# We are starting it before cleaning expired objects
# so we can collect I2P destination objects
# We are starting it before cleaning expired objects so we can collect I2P destination objects
start_i2p_listener()
for vector in set(shared.objects):
if not shared.objects[vector].is_valid():
if shared.objects[vector].is_expired():
logging.debug('Deleted expired object: {}'.format(base64.b16encode(vector).decode()))
else:
logging.warning('Deleted invalid object: {}'.format(base64.b16encode(vector).decode()))
del shared.objects[vector]
manager = Manager()
manager.start()

View File

@ -1,43 +1,30 @@
# -*- coding: utf-8 -*-
"""The main thread, managing connections, nodes and objects"""
import base64
import csv
import logging
import os
import pickle
import queue
import random
import threading
import time
from . import proofofwork, shared, structure
from .connection import Bootstrapper, Connection
from .i2p import I2PDialer
from connection import Connection
from i2p.dialer import I2PDialer
import pow
import shared
import structure
class Manager(threading.Thread):
"""The manager thread"""
def __init__(self):
super().__init__(name='Manager')
self.q = queue.Queue()
self.bootstrap_pool = []
self.last_cleaned_objects = time.time()
self.last_cleaned_connections = time.time()
self.last_pickled_objects = time.time()
self.last_pickled_nodes = time.time()
# Publish destination 5-15 minutes after start
self.last_published_i2p_destination = \
time.time() - 50 * 60 + random.uniform(-1, 1) * 300 # nosec B311
def fill_bootstrap_pool(self):
"""Populate the bootstrap pool by core nodes and checked ones"""
self.bootstrap_pool = list(shared.core_nodes.union(shared.node_pool))
random.shuffle(self.bootstrap_pool)
self.last_published_i2p_destination = time.time() - 50 * 60 + random.uniform(-1, 1) * 300 # Publish destination 5-15 minutes after start
def run(self):
self.load_data()
self.clean_objects()
self.fill_bootstrap_pool()
while True:
time.sleep(0.8)
now = time.time()
@ -63,47 +50,21 @@ class Manager(threading.Thread):
@staticmethod
def clean_objects():
for vector in set(shared.objects):
# FIXME: no need to check is_valid() here
if shared.objects[vector].is_expired():
logging.debug(
'Deleted expired object: %s',
base64.b16encode(vector).decode())
with shared.objects_lock:
del shared.objects[vector]
logging.debug('Deleted expired object: {}'.format(base64.b16encode(vector).decode()))
def manage_connections(self):
"""Open new connections if needed, remove closed ones"""
@staticmethod
def manage_connections():
hosts = set()
def connect(target, connection_class=Connection):
"""
Open a connection of *connection_class*
to the *target* (host, port)
"""
c = connection_class(*target)
c.start()
with shared.connections_lock:
shared.connections.add(c)
def bootstrap():
"""Bootstrap from DNS seed-nodes and known nodes"""
try:
target = self.bootstrap_pool.pop()
except IndexError:
logging.warning(
'Ran out of bootstrap nodes, refilling')
self.fill_bootstrap_pool()
return
logging.info('Starting a bootstrapper for %s:%s', *target)
connect(target, Bootstrapper)
outgoing_connections = 0
for c in shared.connections.copy():
if not c.is_alive() or c.status == 'disconnected':
with shared.connections_lock:
shared.connections.remove(c)
else:
hosts.add(structure.NetAddrNoPrefix.network_group(c.host))
hosts.add(c.host)
if not c.server:
outgoing_connections += 1
@ -116,166 +77,90 @@ class Manager(threading.Thread):
if shared.trusted_peer:
to_connect.add(shared.trusted_peer)
if (
outgoing_connections < shared.outgoing_connections
and shared.send_outgoing_connections and not shared.trusted_peer
):
if outgoing_connections < shared.outgoing_connections and shared.send_outgoing_connections and not shared.trusted_peer:
if shared.ip_enabled:
if len(shared.unchecked_node_pool) > 16:
to_connect.update(random.sample(
tuple(shared.unchecked_node_pool), 16))
to_connect.update(random.sample(shared.unchecked_node_pool, 16))
else:
to_connect.update(shared.unchecked_node_pool)
if outgoing_connections < shared.outgoing_connections / 2:
bootstrap()
shared.unchecked_node_pool.difference_update(to_connect)
if len(shared.node_pool) > 8:
to_connect.update(random.sample(
tuple(shared.node_pool), 8))
to_connect.update(random.sample(shared.node_pool, 8))
else:
to_connect.update(shared.node_pool)
if shared.i2p_enabled:
if len(shared.i2p_unchecked_node_pool) > 16:
to_connect.update(random.sample(
tuple(shared.i2p_unchecked_node_pool), 16))
to_connect.update(random.sample(shared.i2p_unchecked_node_pool, 16))
else:
to_connect.update(shared.i2p_unchecked_node_pool)
shared.i2p_unchecked_node_pool.difference_update(to_connect)
if len(shared.i2p_node_pool) > 8:
to_connect.update(random.sample(
tuple(shared.i2p_node_pool), 8))
to_connect.update(random.sample(shared.i2p_node_pool, 8))
else:
to_connect.update(shared.i2p_node_pool)
for host, port in to_connect:
group = structure.NetAddrNoPrefix.network_group(host)
if group in hosts:
for addr in to_connect:
if addr[0] in hosts:
continue
if port == 'i2p' and shared.i2p_enabled:
if shared.i2p_session_nick and host != shared.i2p_dest_pub:
if addr[1] == 'i2p' and shared.i2p_enabled:
if shared.i2p_session_nick and addr[0] != shared.i2p_dest_pub:
try:
d = I2PDialer(
shared,
host, shared.i2p_session_nick,
shared.i2p_sam_host, shared.i2p_sam_port)
d = I2PDialer(addr[0], shared.i2p_session_nick, shared.i2p_sam_host, shared.i2p_sam_port)
d.start()
hosts.add(d.destination)
shared.i2p_dialers.add(d)
except Exception:
logging.warning(
'Exception while trying to establish'
' an I2P connection', exc_info=True)
except Exception as e:
logging.warning('Exception while trying to establish an I2P connection')
logging.warning(e)
else:
continue
else:
connect((host, port))
hosts.add(group)
c = Connection(addr[0], addr[1])
c.start()
hosts.add(c.host)
with shared.connections_lock:
shared.connections.add(c)
shared.hosts = hosts
@staticmethod
def load_data():
"""Loads initial nodes and data, stored in files between sessions"""
try:
with open(
os.path.join(shared.data_directory, 'objects.pickle'), 'br'
) as src:
shared.objects = pickle.load(src)
except FileNotFoundError:
pass # first start
except Exception:
logging.warning(
'Error while loading objects from disk.', exc_info=True)
try:
with open(
os.path.join(shared.data_directory, 'nodes.pickle'), 'br'
) as src:
shared.node_pool = pickle.load(src)
except FileNotFoundError:
pass
except Exception:
logging.warning(
'Error while loading nodes from disk.', exc_info=True)
try:
with open(
os.path.join(shared.data_directory, 'i2p_nodes.pickle'), 'br'
) as src:
shared.i2p_node_pool = pickle.load(src)
except FileNotFoundError:
pass
except Exception:
logging.warning(
'Error while loading nodes from disk.', exc_info=True)
with open(
os.path.join(shared.source_directory, 'core_nodes.csv'),
'r', newline='', encoding='ascii'
) as src:
reader = csv.reader(src)
shared.core_nodes = {(row[0], int(row[1])) for row in reader}
shared.node_pool.update(shared.core_nodes)
with open(
os.path.join(shared.source_directory, 'i2p_core_nodes.csv'),
'r', newline='', encoding='ascii'
) as f:
reader = csv.reader(f)
shared.i2p_core_nodes = {
(row[0].encode(), 'i2p') for row in reader}
shared.i2p_node_pool.update(shared.i2p_core_nodes)
@staticmethod
def pickle_objects():
try:
with open(
os.path.join(shared.data_directory, 'objects.pickle'), 'bw'
) as dst:
with open(shared.data_directory + 'objects.pickle', mode='bw') as file:
with shared.objects_lock:
pickle.dump(shared.objects, dst, protocol=3)
pickle.dump(shared.objects, file, protocol=3)
logging.debug('Saved objects')
except Exception:
logging.warning('Error while saving objects', exc_info=True)
except Exception as e:
logging.warning('Error while saving objects')
logging.warning(e)
@staticmethod
def pickle_nodes():
if len(shared.node_pool) > 10000:
shared.node_pool = set(random.sample(
tuple(shared.node_pool), 10000))
shared.node_pool = set(random.sample(shared.node_pool, 10000))
if len(shared.unchecked_node_pool) > 1000:
shared.unchecked_node_pool = set(random.sample(
tuple(shared.unchecked_node_pool), 1000))
shared.unchecked_node_pool = set(random.sample(shared.unchecked_node_pool, 1000))
if len(shared.i2p_node_pool) > 1000:
shared.i2p_node_pool = set(random.sample(
tuple(shared.i2p_node_pool), 1000))
shared.i2p_node_pool = set(random.sample(shared.i2p_node_pool, 1000))
if len(shared.i2p_unchecked_node_pool) > 100:
shared.i2p_unchecked_node_pool = set(random.sample(
tuple(shared.i2p_unchecked_node_pool), 100))
shared.i2p_unchecked_node_pool = set(random.sample(shared.i2p_unchecked_node_pool, 100))
try:
with open(
os.path.join(shared.data_directory, 'nodes.pickle'), 'bw'
) as dst:
pickle.dump(shared.node_pool, dst, protocol=3)
with open(
os.path.join(shared.data_directory, 'i2p_nodes.pickle'), 'bw'
) as dst:
pickle.dump(shared.i2p_node_pool, dst, protocol=3)
with open(shared.data_directory + 'nodes.pickle', mode='bw') as file:
pickle.dump(shared.node_pool, file, protocol=3)
with open(shared.data_directory + 'i2p_nodes.pickle', mode='bw') as file:
pickle.dump(shared.i2p_node_pool, file, protocol=3)
logging.debug('Saved nodes')
except Exception:
logging.warning('Error while saving nodes', exc_info=True)
except Exception as e:
logging.warning('Error while saving nodes')
logging.warning(e)
@staticmethod
def publish_i2p_destination():
if shared.i2p_session_nick and not shared.i2p_transient:
logging.info('Publishing our I2P destination')
dest_pub_raw = base64.b64decode(
shared.i2p_dest_pub, altchars=b'-~')
obj = structure.Object(
b'\x00' * 8, int(time.time() + 2 * 3600),
shared.i2p_dest_obj_type, shared.i2p_dest_obj_version,
shared.stream, dest_pub_raw)
proofofwork.do_pow_and_publish(obj)
dest_pub_raw = base64.b64decode(shared.i2p_dest_pub, altchars=b'-~')
obj = structure.Object(b'\x00' * 8, int(time.time() + 2 * 3600), shared.i2p_dest_obj_type, shared.i2p_dest_obj_version, 1, dest_pub_raw)
pow.do_pow_and_publish(obj)

View File

@ -1,30 +1,24 @@
# -*- coding: utf-8 -*-
"""Protocol message objects"""
import base64
import hashlib
import struct
import time
from . import shared, structure
import shared
import structure
class Header():
"""Message header structure"""
class Header(object):
def __init__(self, command, payload_length, payload_checksum):
self.command = command
self.payload_length = payload_length
self.payload_checksum = payload_checksum
def __repr__(self):
return (
'type: header, command: "{}", payload_length: {},'
' payload_checksum: {}'
).format(
self.command.decode(), self.payload_length,
base64.b16encode(self.payload_checksum).decode())
return 'type: header, command: "{}", payload_length: {}, payload_checksum: {}'\
.format(self.command.decode(), self.payload_length, base64.b16encode(self.payload_checksum).decode())
def to_bytes(self):
"""Serialize to bytes"""
b = b''
b += shared.magic_bytes
b += self.command.ljust(12, b'\x00')
@ -34,9 +28,7 @@ class Header():
@classmethod
def from_bytes(cls, b):
"""Parse from bytes"""
magic_bytes, command, payload_length, payload_checksum = struct.unpack(
'>4s12sL4s', b)
magic_bytes, command, payload_length, payload_checksum = struct.unpack('>4s12sL4s', b)
if magic_bytes != shared.magic_bytes:
raise ValueError('magic_bytes do not match')
@ -46,8 +38,7 @@ class Header():
return cls(command, payload_length, payload_checksum)
class Message():
"""Common message structure"""
class Message(object):
def __init__(self, command, payload):
self.command = command
self.payload = payload
@ -56,55 +47,35 @@ class Message():
self.payload_checksum = hashlib.sha512(payload).digest()[:4]
def __repr__(self):
return '{}, payload_length: {}, payload_checksum: {}'.format(
self.command.decode(), self.payload_length,
base64.b16encode(self.payload_checksum).decode())
return '{}, payload_length: {}, payload_checksum: {}'\
.format(self.command.decode(), self.payload_length, base64.b16encode(self.payload_checksum).decode())
def to_bytes(self):
"""Serialize to bytes"""
b = Header(
self.command, self.payload_length, self.payload_checksum
).to_bytes()
b = Header(self.command, self.payload_length, self.payload_checksum).to_bytes()
b += self.payload
return b
@classmethod
def from_bytes(cls, b):
"""Parse from bytes"""
h = Header.from_bytes(b[:24])
payload = b[24:]
payload_length = len(payload)
if payload_length != h.payload_length:
raise ValueError(
'wrong payload length, expected {}, got {}'.format(
h.payload_length, payload_length))
raise ValueError('wrong payload length, expected {}, got {}'.format(h.payload_length, payload_length))
payload_checksum = hashlib.sha512(payload).digest()[:4]
if payload_checksum != h.payload_checksum:
raise ValueError(
'wrong payload checksum, expected {}, got {}'.format(
h.payload_checksum, payload_checksum))
raise ValueError('wrong payload checksum, expected {}, got {}'.format(h.payload_checksum, payload_checksum))
return cls(h.command, payload)
def _payload_read_int(data):
varint_length = structure.VarInt.length(data[0])
return (
structure.VarInt.from_bytes(data[:varint_length]).n,
data[varint_length:])
class Version():
"""The version message payload"""
def __init__(
self, host, port, protocol_version=shared.protocol_version,
services=shared.services, nonce=shared.nonce,
user_agent=shared.user_agent, streams=None
):
class Version(object):
def __init__(self, host, port, protocol_version=shared.protocol_version, services=shared.services,
nonce=shared.nonce, user_agent=shared.user_agent):
self.host = host
self.port = port
@ -112,45 +83,33 @@ class Version():
self.services = services
self.nonce = nonce
self.user_agent = user_agent
self.streams = streams or [shared.stream]
if len(self.streams) > 160000:
self.streams = self.streams[:160000]
def __repr__(self):
return (
'version, protocol_version: {}, services: {}, host: {}, port: {},'
' nonce: {}, user_agent: {}').format(
self.protocol_version, self.services, self.host, self.port,
base64.b16encode(self.nonce).decode(), self.user_agent)
return 'version, protocol_version: {}, services: {}, host: {}, port: {}, nonce: {}, user_agent: {}'\
.format(self.protocol_version, self.services, self.host, self.port, base64.b16encode(self.nonce).decode(), self.user_agent)
def to_bytes(self):
payload = b''
payload += struct.pack('>I', self.protocol_version)
payload += struct.pack('>Q', self.services)
payload += struct.pack('>Q', int(time.time()))
payload += structure.NetAddrNoPrefix(
1, self.host, self.port).to_bytes()
payload += structure.NetAddrNoPrefix(
self.services, '127.0.0.1', 8444).to_bytes()
payload += structure.NetAddrNoPrefix(shared.services, self.host, self.port).to_bytes()
payload += structure.NetAddrNoPrefix(shared.services, '127.0.0.1', 8444).to_bytes()
payload += self.nonce
payload += structure.VarInt(len(self.user_agent)).to_bytes()
payload += self.user_agent
payload += structure.VarInt(len(self.streams)).to_bytes()
for stream in self.streams:
payload += structure.VarInt(stream).to_bytes()
payload += structure.VarInt(len(shared.user_agent)).to_bytes()
payload += shared.user_agent
payload += 2 * structure.VarInt(1).to_bytes()
return Message(b'version', payload).to_bytes()
@classmethod
def from_message(cls, m):
def from_bytes(cls, b):
m = Message.from_bytes(b)
payload = m.payload
( # unused: net_addr_local
protocol_version, services, timestamp, net_addr_remote, _, nonce
) = struct.unpack('>IQQ26s26s8s', payload[:80])
if abs(time.time() - timestamp) > 3600:
raise ValueError('remote time offset is too large')
protocol_version, services, t, net_addr_remote, net_addr_local, nonce = \
struct.unpack('>IQQ26s26s8s', payload[:80])
net_addr_remote = structure.NetAddrNoPrefix.from_bytes(net_addr_remote)
@ -159,28 +118,22 @@ class Version():
payload = payload[80:]
user_agent_length, payload = _payload_read_int(payload)
user_agent_varint_length = structure.VarInt.length(payload[0])
user_agent_length = structure.VarInt.from_bytes(payload[:user_agent_varint_length]).n
payload = payload[user_agent_varint_length:]
user_agent = payload[:user_agent_length]
payload = payload[user_agent_length:]
streams_count, payload = _payload_read_int(payload)
if streams_count > 160000:
raise ValueError('malformed Version message, to many streams')
streams = []
if payload != b'\x01\x01':
raise ValueError('message not for stream 1')
while payload:
stream, payload = _payload_read_int(payload)
streams.append(stream)
if streams_count != len(streams):
raise ValueError('malformed Version message, wrong streams_count')
return cls(
host, port, protocol_version, services, nonce, user_agent, streams)
return cls(host, port, protocol_version, services, nonce, user_agent)
class Inv():
"""The inv message payload"""
class Inv(object):
def __init__(self, vectors):
self.vectors = set(vectors)
@ -188,16 +141,16 @@ class Inv():
return 'inv, count: {}'.format(len(self.vectors))
def to_bytes(self):
return Message(
b'inv', structure.VarInt(len(self.vectors)).to_bytes()
+ b''.join(self.vectors)
).to_bytes()
return Message(b'inv', structure.VarInt(len(self.vectors)).to_bytes() + b''.join(self.vectors)).to_bytes()
@classmethod
def from_message(cls, m):
payload = m.payload
vector_count, payload = _payload_read_int(payload)
vector_count_varint_length = structure.VarInt.length(payload[0])
vector_count = structure.VarInt.from_bytes(payload[:vector_count_varint_length]).n
payload = payload[vector_count_varint_length:]
vectors = set()
@ -211,8 +164,7 @@ class Inv():
return cls(vectors)
class GetData():
"""The getdata message payload"""
class GetData(object):
def __init__(self, vectors):
self.vectors = set(vectors)
@ -220,16 +172,16 @@ class GetData():
return 'getdata, count: {}'.format(len(self.vectors))
def to_bytes(self):
return Message(
b'getdata', structure.VarInt(len(self.vectors)).to_bytes()
+ b''.join(self.vectors)
).to_bytes()
return Message(b'getdata', structure.VarInt(len(self.vectors)).to_bytes() + b''.join(self.vectors)).to_bytes()
@classmethod
def from_message(cls, m):
payload = m.payload
vector_count, payload = _payload_read_int(payload)
vector_count_varint_length = structure.VarInt.length(payload[0])
vector_count = structure.VarInt.from_bytes(payload[:vector_count_varint_length]).n
payload = payload[vector_count_varint_length:]
vectors = set()
@ -243,8 +195,7 @@ class GetData():
return cls(vectors)
class Addr():
"""The addr message payload"""
class Addr(object):
def __init__(self, addresses):
self.addresses = addresses
@ -252,17 +203,16 @@ class Addr():
return 'addr, count: {}'.format(len(self.addresses))
def to_bytes(self):
return Message(
b'addr', structure.VarInt(len(self.addresses)).to_bytes()
+ b''.join({addr.to_bytes() for addr in self.addresses})
).to_bytes()
return Message(b'addr', structure.VarInt(len(self.addresses)).to_bytes() + b''.join({addr.to_bytes() for addr in self.addresses})).to_bytes()
@classmethod
def from_message(cls, m):
payload = m.payload
# not validating addr_count
_, payload = _payload_read_int(payload)
addr_count_varint_length = structure.VarInt.length(payload[0])
addr_count = structure.VarInt.from_bytes(payload[:addr_count_varint_length]).n
payload = payload[addr_count_varint_length:]
addresses = set()
@ -271,37 +221,3 @@ class Addr():
payload = payload[38:]
return cls(addresses)
class Error():
"""The error message payload"""
def __init__(self, error_text=b'', fatal=0, ban_time=0, vector=b''):
self.error_text = error_text
self.fatal = fatal
self.ban_time = ban_time
self.vector = vector
def __repr__(self):
return 'error, text: {}'.format(self.error_text)
def to_bytes(self):
return Message(
b'error', structure.VarInt(self.fatal).to_bytes()
+ structure.VarInt(self.ban_time).to_bytes()
+ structure.VarInt(len(self.vector)).to_bytes() + self.vector
+ structure.VarInt(len(self.error_text)).to_bytes()
+ self.error_text
).to_bytes()
@classmethod
def from_message(cls, m):
payload = m.payload
fatal, payload = _payload_read_int(payload)
ban_time, payload = _payload_read_int(payload)
vector_length, payload = _payload_read_int(payload)
vector = payload[:vector_length]
payload = payload[vector_length:]
error_text_length, payload = _payload_read_int(payload)
error_text = payload[:error_text_length]
return cls(error_text, fatal, ban_time, vector)

46
minode/pow.py Normal file
View File

@ -0,0 +1,46 @@
import base64
import hashlib
import logging
import multiprocessing
import shared
import struct
import threading
import time
import structure
def _pow_worker(target, initial_hash, q):
nonce = 0
logging.debug("target: {}, initial_hash: {}".format(target, base64.b16encode(initial_hash).decode()))
trial_value = target + 1
while trial_value > target:
nonce += 1
trial_value = struct.unpack('>Q', hashlib.sha512(hashlib.sha512(struct.pack('>Q', nonce) + initial_hash).digest()).digest()[:8])[0]
q.put(struct.pack('>Q', nonce))
def _worker(obj):
q = multiprocessing.Queue()
p = multiprocessing.Process(target=_pow_worker, args=(obj.pow_target(), obj.pow_initial_hash(), q))
logging.debug("Starting POW process")
t = time.time()
p.start()
nonce = q.get()
p.join()
logging.debug("Finished doing POW, nonce: {}, time: {}s".format(nonce, time.time() - t))
obj = structure.Object(nonce, obj.expires_time, obj.object_type, obj.version, obj.stream_number, obj.object_payload)
logging.debug("Object vector is {}".format(base64.b16encode(obj.vector).decode()))
with shared.objects_lock:
shared.objects[obj.vector] = obj
shared.vector_advertise_queue.put(obj.vector)
def do_pow_and_publish(obj):
t = threading.Thread(target=_worker, args=(obj, ))
t.start()

View File

@ -1,54 +0,0 @@
"""Doing proof of work"""
import base64
import hashlib
import logging
import multiprocessing
import struct
import threading
import time
from . import shared, structure
def _pow_worker(target, initial_hash, q):
nonce = 0
logging.debug(
'target: %s, initial_hash: %s',
target, base64.b16encode(initial_hash).decode())
trial_value = target + 1
while trial_value > target:
nonce += 1
trial_value = struct.unpack('>Q', hashlib.sha512(hashlib.sha512(
struct.pack('>Q', nonce) + initial_hash).digest()).digest()[:8])[0]
q.put(struct.pack('>Q', nonce))
def _worker(obj):
q = multiprocessing.Queue()
p = multiprocessing.Process(
target=_pow_worker, args=(obj.pow_target(), obj.pow_initial_hash(), q))
logging.debug('Starting POW process')
t = time.time()
p.start()
nonce = q.get()
p.join()
logging.debug(
'Finished doing POW, nonce: %s, time: %ss', nonce, time.time() - t)
obj = structure.Object(
nonce, obj.expires_time, obj.object_type, obj.version,
obj.stream_number, obj.object_payload)
logging.debug(
'Object vector is %s', base64.b16encode(obj.vector).decode())
with shared.objects_lock:
shared.objects[obj.vector] = obj
shared.vector_advertise_queue.put(obj.vector)
def do_pow_and_publish(obj):
t = threading.Thread(target=_worker, args=(obj, ))
t.start()

View File

@ -1,5 +1,4 @@
# -*- coding: utf-8 -*-
"""Common variables and structures, referred in different threads"""
import logging
import os
import queue
@ -21,7 +20,7 @@ protocol_version = 3
services = 3 # NODE_NETWORK, NODE_SSL
stream = 1
nonce = os.urandom(8)
user_agent = b'/MiNode:0.3.3/'
user_agent = b'/MiNode:0.3.0/'
timeout = 600
header_length = 24
i2p_dest_obj_type = 0x493250

View File

@ -1,17 +1,15 @@
# -*- coding: utf-8 -*-
"""Protocol structures"""
import base64
import hashlib
import logging
import socket
import struct
import socket
import time
from . import shared
import shared
class VarInt():
"""varint object"""
class VarInt(object):
def __init__(self, n):
self.n = n
@ -45,34 +43,21 @@ class VarInt():
return cls(n)
class Object():
"""The 'object' message payload"""
def __init__(
self, nonce, expires_time, object_type, version,
stream_number, object_payload
):
class Object(object):
def __init__(self, nonce, expires_time, object_type, version, stream_number, object_payload):
self.nonce = nonce
self.expires_time = expires_time
self.object_type = object_type
self.version = version
self.stream_number = stream_number
self.object_payload = object_payload
self.vector = hashlib.sha512(hashlib.sha512(
self.to_bytes()).digest()).digest()[:32]
self.tag = (
# broadcast from version 5 and pubkey/getpukey from version 4
self.object_payload[:32] if object_type == 3 and version == 5
or (object_type in (0, 1) and version == 4)
else None)
self.vector = hashlib.sha512(hashlib.sha512(self.to_bytes()).digest()).digest()[:32]
def __repr__(self):
return 'object, vector: {}'.format(
base64.b16encode(self.vector).decode())
return 'object, vector: {}'.format(base64.b16encode(self.vector).decode())
@classmethod
def from_message(cls, m):
"""Decode message payload"""
payload = m.payload
nonce, expires_time, object_type = struct.unpack('>8sQL', payload[:20])
payload = payload[20:]
@ -80,87 +65,63 @@ class Object():
version = VarInt.from_bytes(payload[:version_varint_length]).n
payload = payload[version_varint_length:]
stream_number_varint_length = VarInt.length(payload[0])
stream_number = VarInt.from_bytes(
payload[:stream_number_varint_length]).n
stream_number = VarInt.from_bytes(payload[:stream_number_varint_length]).n
payload = payload[stream_number_varint_length:]
return cls(
nonce, expires_time, object_type, version, stream_number, payload)
return cls(nonce, expires_time, object_type, version, stream_number, payload)
def to_bytes(self):
"""Serialize to bytes"""
payload = b''
payload += self.nonce
payload += struct.pack('>QL', self.expires_time, self.object_type)
payload += (
VarInt(self.version).to_bytes()
+ VarInt(self.stream_number).to_bytes())
payload += VarInt(self.version).to_bytes() + VarInt(self.stream_number).to_bytes()
payload += self.object_payload
return payload
def is_expired(self):
"""Check if object's TTL is expired"""
return self.expires_time + 3 * 3600 < time.time()
def is_valid(self):
"""Checks the object validity"""
if self.is_expired():
logging.debug(
'Invalid object %s, reason: expired',
base64.b16encode(self.vector).decode())
logging.debug('Invalid object {}, reason: expired'.format(base64.b16encode(self.vector).decode()))
return False
if self.expires_time > time.time() + 28 * 24 * 3600 + 3 * 3600:
logging.warning(
'Invalid object %s, reason: end of life too far in the future',
base64.b16encode(self.vector).decode())
logging.warning('Invalid object {}, reason: end of life too far in the future'.format(base64.b16encode(self.vector).decode()))
return False
if len(self.object_payload) > 2**18:
logging.warning(
'Invalid object %s, reason: payload is too long',
base64.b16encode(self.vector).decode())
logging.warning('Invalid object {}, reason: payload is too long'.format(base64.b16encode(self.vector).decode()))
return False
if self.stream_number != shared.stream:
logging.warning(
'Invalid object %s, reason: not in stream %i',
base64.b16encode(self.vector).decode(), shared.stream)
if self.stream_number != 1:
logging.warning('Invalid object {}, reason: not in stream 1'.format(base64.b16encode(self.vector).decode()))
return False
pow_value = int.from_bytes(
hashlib.sha512(hashlib.sha512(
self.nonce + self.pow_initial_hash()
).digest()).digest()[:8], 'big')
data = self.to_bytes()[8:]
length = len(data) + 8 + shared.payload_length_extra_bytes
dt = max(self.expires_time - time.time(), 0)
h = hashlib.sha512(data).digest()
pow_value = int.from_bytes(hashlib.sha512(hashlib.sha512(self.nonce + h).digest()).digest()[:8], 'big')
target = self.pow_target()
if target < pow_value:
logging.warning(
'Invalid object %s, reason: insufficient pow',
base64.b16encode(self.vector).decode())
logging.warning('Invalid object {}, reason: insufficient pow'.format(base64.b16encode(self.vector).decode()))
return False
return True
def pow_target(self):
"""Compute PoW target"""
data = self.to_bytes()[8:]
length = len(data) + 8 + shared.payload_length_extra_bytes
dt = max(self.expires_time - time.time(), 0)
return int(
2 ** 64 / (
shared.nonce_trials_per_byte * (
length + (dt * length) / (2 ** 16))))
return int(2 ** 64 / (shared.nonce_trials_per_byte * (length + (dt * length) / (2 ** 16))))
def pow_initial_hash(self):
"""Compute the initial hash for PoW"""
return hashlib.sha512(self.to_bytes()[8:]).digest()
class NetAddrNoPrefix():
"""Network address"""
class NetAddrNoPrefix(object):
def __init__(self, services, host, port):
self.services = services
self.host = host
self.port = port
def __repr__(self):
return 'net_addr_no_prefix, services: {}, host: {}, port {}'.format(
self.services, self.host, self.port)
return 'net_addr_no_prefix, services: {}, host: {}, port {}'.format(self.services, self.host, self.port)
def to_bytes(self):
b = b''
@ -173,34 +134,17 @@ class NetAddrNoPrefix():
b += struct.pack('>H', int(self.port))
return b
@staticmethod
def network_group(host):
"""A simplified network group identifier from pybitmessage protocol"""
try:
host = socket.inet_pton(socket.AF_INET, host)
return host[:2]
except socket.error:
try:
host = socket.inet_pton(socket.AF_INET6, host)
return host[:12]
except OSError:
return host
except TypeError:
return host
@classmethod
def from_bytes(cls, b):
services, host, port = struct.unpack('>Q16sH', b)
if host.startswith(
b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF'):
if host.startswith(b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF'):
host = socket.inet_ntop(socket.AF_INET, host[-4:])
else:
host = socket.inet_ntop(socket.AF_INET6, host)
return cls(services, host, port)
class NetAddr():
"""Network address with time and stream"""
class NetAddr(object):
def __init__(self, services, host, port, stream=shared.stream):
self.stream = stream
self.services = services
@ -208,8 +152,8 @@ class NetAddr():
self.port = port
def __repr__(self):
return 'net_addr, stream: {}, services: {}, host: {}, port {}'.format(
self.stream, self.services, self.host, self.port)
return 'net_addr, stream: {}, services: {}, host: {}, port {}'\
.format(self.stream, self.services, self.host, self.port)
def to_bytes(self):
b = b''
@ -220,6 +164,6 @@ class NetAddr():
@classmethod
def from_bytes(cls, b):
stream, net_addr = struct.unpack('>QI26s', b)[1:]
t, stream, net_addr = struct.unpack('>QI26s', b)
n = NetAddrNoPrefix.from_bytes(net_addr)
return cls(n.services, n.host, n.port, stream)

View File

@ -1,57 +0,0 @@
"""Tests for memory usage"""
import gc
import time
from minode import shared
from .test_network import TestProcessProto, run_listener
class TestListener(TestProcessProto):
"""A separate test case for Listener with a process with --trusted-peer"""
_process_cmd = ['minode', '--trusted-peer', '127.0.0.1']
def setUp(self):
shared.shutting_down = False
@classmethod
def tearDownClass(cls):
super().tearDownClass()
shared.shutting_down = False
def test_listener(self):
"""Start Listener and disconnect a client"""
with run_listener() as listener:
if not listener:
self.fail('Failed to start listener')
shared.connection_limit = 2
connected = False
started = time.time()
while not connected:
time.sleep(0.2)
if time.time() - started > 90:
self.fail('Failed to establish the connection')
for c in shared.connections:
if c.status == 'fully_established':
connected = True
if not self._stop_process(10):
self.fail('Failed to stop the client process')
for c in shared.connections.copy():
if not c.is_alive() or c.status == 'disconnected':
shared.connections.remove(c)
c = None
break
else:
self.fail('The connection is alive')
gc.collect()
for obj in gc.get_objects():
if (
isinstance(obj, shared.connection)
and obj not in shared.connections
):
self.fail('Connection %s remains in memory' % obj)

View File

@ -1,110 +0,0 @@
"""Tests for messages"""
import struct
import time
import unittest
from binascii import unhexlify
from minode import message
from minode.shared import magic_bytes
# 500 identical peers:
# import ipaddress
# from hyperbit import net, packet
# [packet.Address(
# 1626611891, 1, 1, net.ipv6(ipaddress.ip_address('127.0.0.1')).packed,
# 8444
# ) for _ in range(1000)]
sample_addr_data = unhexlify(
'fd01f4' + (
'0000000060f420b30000000'
'1000000000000000100000000000000000000ffff7f00000120fc'
) * 500
)
# protocol.CreatePacket(b'ping', b'test')
sample_ping_msg = unhexlify(
'e9beb4d970696e67000000000000000000000004ee26b0dd74657374')
# from pybitmessage import pathmagic
# pathmagic.setup()
# import protocol
# msg = protocol.assembleVersionMessage('127.0.0.1', 8444, [1, 2, 3])
sample_version_msg = unhexlify(
'e9beb4d976657273696f6e00000000000000006b1b06b182000000030000000000000003'
'0000000064fdd3e1000000000000000100000000000000000000ffff7f00000120fc0000'
'00000000000300000000000000000000ffff7f00000120fc00c0b6c3eefb2adf162f5079'
'4269746d6573736167653a302e362e332e322f03010203'
)
#
sample_error_data = \
b'\x02\x00\x006Too many connections from your IP. Closing connection.'
class TestMessage(unittest.TestCase):
"""Test assembling and disassembling of network mesages"""
def test_packet(self):
"""Check packet creation and parsing by message.Message"""
msg = message.Message(b'ping', b'test').to_bytes()
self.assertEqual(msg[:len(magic_bytes)], magic_bytes)
with self.assertRaises(ValueError):
# wrong magic
message.Message.from_bytes(msg[1:])
with self.assertRaises(ValueError):
# wrong length
message.Message.from_bytes(msg[:-1])
with self.assertRaises(ValueError):
# wrong checksum
message.Message.from_bytes(msg[:-1] + b'\x00')
msg = message.Message.from_bytes(sample_ping_msg)
self.assertEqual(msg.command, b'ping')
self.assertEqual(msg.payload, b'test')
def test_addr(self):
"""Test addr messages"""
msg = message.Message(b'addr', sample_addr_data)
addr_packet = message.Addr.from_message(msg)
self.assertEqual(len(addr_packet.addresses), 500)
address = addr_packet.addresses.pop()
self.assertEqual(address.stream, 1)
self.assertEqual(address.services, 1)
self.assertEqual(address.port, 8444)
self.assertEqual(address.host, '127.0.0.1')
def test_version(self):
"""Test version message"""
msg = message.Message.from_bytes(sample_version_msg)
self.assertEqual(msg.command, b'version')
with self.assertRaises(ValueError):
# large time offset
version_packet = message.Version.from_message(msg)
msg.payload = (
msg.payload[:12] + struct.pack('>Q', int(time.time()))
+ msg.payload[20:])
version_packet = message.Version.from_message(msg)
self.assertEqual(version_packet.host, '127.0.0.1')
self.assertEqual(version_packet.port, 8444)
self.assertEqual(version_packet.protocol_version, 3)
self.assertEqual(version_packet.services, 3)
self.assertEqual(version_packet.user_agent, b'/PyBitmessage:0.6.3.2/')
self.assertEqual(version_packet.streams, [1, 2, 3])
msg = version_packet.to_bytes()
# omit header and timestamp
self.assertEqual(msg[24:36], sample_version_msg[24:36])
self.assertEqual(msg[44:], sample_version_msg[44:])
def test_error(self):
"""Test error message"""
msg = message.Error.from_message(
message.Message(b'error', sample_error_data))
self.assertEqual(msg.fatal, 2)
self.assertEqual(msg.ban_time, 0)
self.assertEqual(msg.vector, b'')
msg = message.Error(
b'Too many connections from your IP. Closing connection.', 2)
self.assertEqual(msg.to_bytes()[24:], sample_error_data)

View File

@ -1,279 +0,0 @@
"""Tests for network connections"""
import ipaddress
import logging
import os
import random
import unittest
import tempfile
import time
from contextlib import contextmanager
from minode import connection, main, shared
from minode.listener import Listener
from minode.manager import Manager
from .test_process import TestProcessProto
logging.basicConfig(
level=logging.INFO,
format='[%(asctime)s] [%(levelname)s] %(message)s')
@contextmanager
def time_offset(offset):
"""
Replace time.time() by a mock returning a constant value
with given offset from current time.
"""
started = time.time()
time_call = time.time
try:
time.time = lambda: started + offset
yield time_call
finally:
time.time = time_call
@contextmanager
def run_listener(host='localhost', port=8444):
"""
Run the Listener with zero connection limit and
reset variables in shared after its stop.
"""
connection_limit = shared.connection_limit
shared.connection_limit = 0
try:
listener = Listener(host, port)
listener.start()
yield listener
except OSError:
yield
finally:
shared.connection_limit = connection_limit
shared.connections.clear()
shared.shutting_down = True
time.sleep(1)
class TestNetwork(unittest.TestCase):
"""Test case starting connections"""
@classmethod
def setUpClass(cls):
shared.data_directory = tempfile.gettempdir()
def setUp(self):
shared.core_nodes.clear()
shared.unchecked_node_pool.clear()
shared.objects = {}
try:
os.remove(os.path.join(shared.data_directory, 'objects.pickle'))
except FileNotFoundError:
pass
def _make_initial_nodes(self):
Manager.load_data()
core_nodes_len = len(shared.core_nodes)
self.assertGreaterEqual(core_nodes_len, 3)
main.bootstrap_from_dns()
self.assertGreaterEqual(len(shared.core_nodes), core_nodes_len)
for host, _ in shared.core_nodes:
try:
ipaddress.IPv4Address(host)
except ipaddress.AddressValueError:
try:
ipaddress.IPv6Address(host)
except ipaddress.AddressValueError:
self.fail('Found not an IP address in the core nodes')
break
else:
self.fail('No IPv6 address found in the core nodes')
def test_bootstrap(self):
"""Start bootstrappers and check node pool"""
if shared.core_nodes:
shared.core_nodes = set()
if shared.unchecked_node_pool:
shared.unchecked_node_pool = set()
self._make_initial_nodes()
self.assertEqual(len(shared.unchecked_node_pool), 0)
for node in shared.core_nodes:
c = connection.Bootstrapper(*node)
c.start()
c.join()
if len(shared.unchecked_node_pool) > 2:
break
else:
self.fail(
'Failed to find at least 3 nodes'
' after running %s bootstrappers' % len(shared.core_nodes))
def test_connection(self):
"""Check a normal connection - should receive objects"""
self._make_initial_nodes()
started = time.time()
nodes = list(shared.core_nodes.union(shared.unchecked_node_pool))
random.shuffle(nodes)
for node in nodes:
# unknown = node not in shared.node_pool
# self.assertTrue(unknown)
unknown = True
shared.node_pool.discard(node)
c = connection.Connection(*node)
c.start()
connection_started = time.time()
while c.status not in ('disconnected', 'failed'):
# The addr of established connection is added to nodes pool
if unknown and c.status == 'fully_established':
unknown = False
self.assertIn(node, shared.node_pool)
if shared.objects or time.time() - connection_started > 90:
c.status = 'disconnecting'
if time.time() - started > 300:
c.status = 'disconnecting'
self.fail('Failed to receive an object in %s sec' % 300)
time.sleep(0.2)
if shared.objects: # got some objects
break
else:
self.fail('Failed to establish a proper connection')
def test_time_offset(self):
"""Assert the network bans for large time offset"""
def try_connect(nodes, timeout, call):
started = call()
for node in nodes:
c = connection.Connection(*node)
c.start()
while call() < started + timeout:
if c.status == 'fully_established':
return 'Established a connection'
if c.status in ('disconnected', 'failed'):
break
time.sleep(0.2)
else:
return 'Spent too much time trying to connect'
def time_offset_connections(nodes, offset):
"""Spoof time.time and open connections with given time offset"""
with time_offset(offset) as time_call:
result = try_connect(nodes, 200, time_call)
if result:
self.fail(result)
self._make_initial_nodes()
nodes = random.sample(
tuple(shared.core_nodes.union(shared.unchecked_node_pool)), 5)
time_offset_connections(nodes, 4000)
time_offset_connections(nodes, -4000)
class TestListener(TestProcessProto):
"""A separate test case for Listener with a process with --trusted-peer"""
_process_cmd = ['minode', '--trusted-peer', '127.0.0.1']
def setUp(self):
shared.shutting_down = False
@classmethod
def tearDownClass(cls):
super().tearDownClass()
shared.shutting_down = False
def test_listener(self):
"""Start Listener and try to connect"""
with run_listener() as listener:
if not listener:
self.fail('Failed to start listener')
c = connection.Connection('127.0.0.1', 8444)
shared.connections.add(c)
for _ in range(30):
if len(shared.connections) > 1:
self.fail('The listener ignored connection limit')
time.sleep(0.5)
shared.connection_limit = 2
c.start()
started = time.time()
while c.status not in ('disconnected', 'failed'):
if c.status == 'fully_established':
self.fail('Connected to itself')
if time.time() - started > 90:
c.status = 'disconnecting'
time.sleep(0.2)
server = None
started = time.time()
while not server:
time.sleep(0.2)
if time.time() - started > 90:
self.fail('Failed to establish the connection')
for c in shared.connections:
if c.status == 'fully_established':
server = c
self.assertTrue(server.server)
while not self.process.connections():
time.sleep(0.2)
if time.time() - started > 90:
self.fail('Failed to connect to listener')
client = self.process.connections()[0]
self.assertEqual(client.raddr[0], '127.0.0.1')
self.assertEqual(client.raddr[1], 8444)
self.assertEqual(server.host, client.laddr[0])
# self.assertEqual(server.port, client.laddr[1])
server.status = 'disconnecting'
self.assertFalse(listener.is_alive())
def test_listener_timeoffset(self):
"""Run listener with a large time offset - shouldn't connect"""
with time_offset(4000):
with run_listener() as listener:
if not listener:
self.fail('Failed to start listener')
shared.connection_limit = 2
for _ in range(30):
for c in shared.connections:
if c.status == 'fully_established':
self.fail('Established a connection')
time.sleep(0.5)
class TestBootstrapProcess(TestProcessProto):
"""A separate test case for bootstrapping with a minode process"""
_listen = True
_connection_limit = 24
def test_bootstrap(self):
"""Start a bootstrapper for the local process and check node pool"""
if shared.unchecked_node_pool:
shared.unchecked_node_pool = set()
started = time.time()
while not self.connections():
if time.time() - started > 60:
self.fail('Failed to establish a connection')
time.sleep(1)
for _ in range(3):
c = connection.Bootstrapper('127.0.0.1', 8444)
c.start()
c.join()
if len(shared.unchecked_node_pool) > 2:
break
else:
self.fail(
'Failed to find at least 3 nodes'
' after 3 tries to bootstrap with the local process')

View File

@ -1,187 +0,0 @@
"""Blind tests, starting the minode process"""
import os
import signal
import socket
import subprocess
import sys
import tempfile
import time
import unittest
import psutil
from minode.i2p import util
from minode.structure import NetAddrNoPrefix
try:
socket.socket().bind(('127.0.0.1', 7656))
i2p_port_free = True
except (OSError, socket.error):
i2p_port_free = False
class TestProcessProto(unittest.TestCase):
"""Test process attributes, common flow"""
_process_cmd = ['minode']
_connection_limit = 4 if sys.platform.startswith('win') else 8
_listen = False
_listening_port = None
home = None
@classmethod
def setUpClass(cls):
if not cls.home:
cls.home = tempfile.gettempdir()
cmd = cls._process_cmd + [
'--data-dir', cls.home,
'--connection-limit', str(cls._connection_limit)
]
if not cls._listen:
cmd += ['--no-incoming']
elif cls._listening_port:
cmd += ['-p', str(cls._listening_port)]
cls.process = psutil.Popen(cmd, stderr=subprocess.STDOUT) # nosec
@classmethod
def _stop_process(cls, timeout=5):
cls.process.send_signal(signal.SIGTERM)
try:
cls.process.wait(timeout)
except psutil.TimeoutExpired:
return False
return True
@classmethod
def tearDownClass(cls):
"""Ensures that process stopped and removes files"""
try:
if not cls._stop_process(10):
try:
cls.process.kill()
except psutil.NoSuchProcess:
pass
except psutil.NoSuchProcess:
pass
def connections(self):
"""All process' established connections"""
return [
c for c in self.process.connections()
if c.status == 'ESTABLISHED']
class TestProcessShutdown(TestProcessProto):
"""Separate test case for SIGTERM"""
_wait_time = 30
# longer wait time because it's not a benchmark
def test_shutdown(self):
"""Send to minode SIGTERM and ensure it stopped"""
self.assertTrue(
self._stop_process(self._wait_time),
'%s has not stopped in %i sec' % (
' '.join(self._process_cmd), self._wait_time))
class TestProcess(TestProcessProto):
"""The test case for minode process"""
_wait_time = 180
_check_limit = False
def test_connections(self):
"""Check minode process connections"""
_started = time.time()
def continue_check_limit(extra_time):
for _ in range(extra_time * 2):
self.assertLessEqual(
len(self.connections()),
# shared.outgoing_connections, one listening
# TODO: find the cause of one extra
(min(self._connection_limit, 8) if not self._listen
else self._connection_limit) + 1,
'Opened more connections than required'
' by --connection-limit')
time.sleep(1)
for _ in range(self._wait_time * 2):
if len(self.connections()) >= self._connection_limit / 2:
_time_to_connect = round(time.time() - _started)
break
if '--i2p' not in self._process_cmd:
groups = []
for c in self.connections():
group = NetAddrNoPrefix.network_group(c.raddr[0])
self.assertNotIn(group, groups)
groups.append(group)
time.sleep(0.5)
else:
self.fail(
'Failed to establish at least %i connections in %s sec'
% (int(self._connection_limit / 2), self._wait_time))
if self._check_limit:
continue_check_limit(_time_to_connect)
for c in self.process.connections():
if c.status == 'LISTEN':
if self._listen is False:
self.fail('Listening while started with --no-incoming')
return
self.assertEqual(c.laddr[1], self._listening_port or 8444)
break
else:
if self._listen:
self.fail('No listening connection found')
@unittest.skipIf(i2p_port_free, 'No running i2pd detected')
class TestProcessI2P(TestProcess):
"""Test minode process with --i2p and no IP"""
_process_cmd = ['minode', '--i2p', '--no-ip']
_listen = True
_listening_port = 8448
@classmethod
def setUpClass(cls):
cls.freezed = False
cls.keyfile = os.path.join(cls.home, 'i2p_dest.pub')
saved = os.path.isfile(cls.keyfile)
super().setUpClass()
for _ in range(cls._wait_time):
if saved:
if cls.process.num_threads() > 3:
break
elif os.path.isfile(cls.keyfile):
break
time.sleep(1)
else:
cls.freezed = True
def setUp(self):
"""Skip any test if I2PController freezed"""
if self.freezed:
raise unittest.SkipTest(
'I2PController has probably failed to start')
def test_saved_keys(self):
"""Check saved i2p keys"""
with open(self.keyfile, 'br') as src:
i2p_dest_pub = src.read()
with open(os.path.join(self.home, 'i2p_dest_priv.key'), 'br') as src:
i2p_dest_priv = src.read()
self.assertEqual(util.pub_from_priv(i2p_dest_priv), i2p_dest_pub)
def test_connections(self):
"""Ensure all connections are I2P"""
super().test_connections()
for c in self.connections():
self.assertEqual(c.raddr[0], '127.0.0.1')
self.assertEqual(c.raddr[1], 7656)
@unittest.skipUnless(i2p_port_free, 'Detected running i2pd')
class TestProcessNoI2P(TestProcessShutdown):
"""Test minode process shutdown with --i2p and no IP"""
_process_cmd = ['minode', '--i2p', '--no-ip']

View File

@ -1,194 +0,0 @@
"""Tests for structures"""
import base64
import logging
import queue
import struct
import time
import unittest
from binascii import unhexlify
from minode import message, proofofwork, shared, structure
# host pregenerated by pybitmessage.protocol.encodeHost()
# for one of bootstrap servers, port 8080,
# everything else is like in test_message: 1626611891, 1, 1
sample_addr_data = unhexlify(
'0000000060f420b3000000010000000000000001'
'260753000201300000000000000057ae1f90')
# data for an object with expires_time 1697063939
# structure.Object(
# b'\x00' * 8, expires_time, 42, 1, 2, b'HELLO').to_bytes()
sample_object_data = unhexlify(
'000000000000000000000000652724030000002a010248454c4c4f')
logging.basicConfig(
level=shared.log_level,
format='[%(asctime)s] [%(levelname)s] %(message)s')
class TestStructure(unittest.TestCase):
"""Testing structures serializing and deserializing"""
@classmethod
def setUpClass(cls):
shared.objects = {}
def test_varint(self):
"""Test varint serializing and deserializing"""
s = structure.VarInt(0)
self.assertEqual(s.to_bytes(), b'\x00')
s = structure.VarInt.from_bytes(b'\x00')
self.assertEqual(s.n, 0)
s = structure.VarInt(42)
self.assertEqual(s.to_bytes(), b'*')
s = structure.VarInt.from_bytes(b'*')
self.assertEqual(s.n, 42)
s = structure.VarInt(252)
self.assertEqual(s.to_bytes(), unhexlify('fc'))
s = structure.VarInt.from_bytes(unhexlify('fc'))
self.assertEqual(s.n, 252)
s = structure.VarInt(253)
self.assertEqual(s.to_bytes(), unhexlify('fd00fd'))
s = structure.VarInt.from_bytes(unhexlify('fd00fd'))
self.assertEqual(s.n, 253)
s = structure.VarInt(100500)
self.assertEqual(s.to_bytes(), unhexlify('fe00018894'))
s = structure.VarInt.from_bytes(unhexlify('fe00018894'))
self.assertEqual(s.n, 100500)
s = structure.VarInt(65535)
self.assertEqual(s.to_bytes(), unhexlify('fdffff'))
s = structure.VarInt.from_bytes(unhexlify('fdffff'))
self.assertEqual(s.n, 65535)
s = structure.VarInt(4294967295)
self.assertEqual(s.to_bytes(), unhexlify('feffffffff'))
s = structure.VarInt.from_bytes(unhexlify('feffffffff'))
self.assertEqual(s.n, 4294967295)
s = structure.VarInt(4294967296)
self.assertEqual(s.to_bytes(), unhexlify('ff0000000100000000'))
s = structure.VarInt.from_bytes(unhexlify('ff0000000100000000'))
self.assertEqual(s.n, 4294967296)
s = structure.VarInt(18446744073709551615)
self.assertEqual(s.to_bytes(), unhexlify('ffffffffffffffffff'))
s = structure.VarInt.from_bytes(unhexlify('ffffffffffffffffff'))
self.assertEqual(s.n, 18446744073709551615)
def test_address(self):
"""Check address encoding in structure.NetAddrNoPrefix()"""
addr = structure.NetAddrNoPrefix(1, '127.0.0.1', 8444)
self.assertEqual(
addr.to_bytes()[8:24],
b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF'
+ struct.pack('>L', 2130706433))
addr = structure.NetAddrNoPrefix(1, '191.168.1.1', 8444)
self.assertEqual(
addr.to_bytes()[8:24],
unhexlify('00000000000000000000ffffbfa80101'))
addr = structure.NetAddrNoPrefix(1, '1.1.1.1', 8444)
self.assertEqual(
addr.to_bytes()[8:24],
unhexlify('00000000000000000000ffff01010101'))
addr = structure.NetAddrNoPrefix(
1, '0102:0304:0506:0708:090A:0B0C:0D0E:0F10', 8444)
self.assertEqual(
addr.to_bytes()[8:24],
unhexlify('0102030405060708090a0b0c0d0e0f10'))
addr = structure.NetAddr.from_bytes(sample_addr_data)
self.assertEqual(addr.host, '2607:5300:201:3000::57ae')
self.assertEqual(addr.port, 8080)
self.assertEqual(addr.stream, 1)
self.assertEqual(addr.services, 1)
addr = structure.NetAddr(1, '2607:5300:201:3000::57ae', 8080, 1)
self.assertEqual(addr.to_bytes()[8:], sample_addr_data[8:])
def test_network_group(self):
"""Test various types of network groups"""
test_ip = '1.2.3.4'
self.assertEqual(
b'\x01\x02', structure.NetAddrNoPrefix.network_group(test_ip))
self.assertEqual(
structure.NetAddrNoPrefix.network_group('8.8.8.8'),
structure.NetAddrNoPrefix.network_group('8.8.4.4'))
self.assertNotEqual(
structure.NetAddrNoPrefix.network_group('1.1.1.1'),
structure.NetAddrNoPrefix.network_group('8.8.8.8'))
test_ip = '0102:0304:0506:0708:090A:0B0C:0D0E:0F10'
self.assertEqual(
b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0A\x0B\x0C',
structure.NetAddrNoPrefix.network_group(test_ip))
for test_ip in (
'bootstrap8444.bitmessage.org', 'quzwelsuziwqgpt2.onion', None
):
self.assertEqual(
test_ip, structure.NetAddrNoPrefix.network_group(test_ip))
def test_object(self):
"""Create and check objects"""
obj = structure.Object.from_message(
message.Message(b'object', sample_object_data))
self.assertEqual(obj.object_type, 42)
self.assertEqual(obj.stream_number, 2)
self.assertEqual(obj.expires_time, 1697063939)
self.assertEqual(obj.object_payload, b'HELLO')
obj = structure.Object(
b'\x00' * 8, int(time.time() + 3000000), 42, 1, 1, b'HELLO')
self.assertFalse(obj.is_valid())
obj.expires_time = int(time.time() - 11000)
self.assertFalse(obj.is_valid())
obj = structure.Object(
b'\x00' * 8, int(time.time() + 300), 42, 1, 2, b'HELLO')
vector = obj.vector
proofofwork._worker(obj) # pylint: disable=protected-access
obj = shared.objects.popitem()[1]
self.assertNotEqual(obj.vector, vector)
self.assertFalse(obj.is_expired())
self.assertFalse(obj.is_valid())
shared.stream = 2
self.assertTrue(obj.is_valid())
obj.object_payload = \
b'TIGER, tiger, burning bright. In the forests of the night'
self.assertFalse(obj.is_valid())
def test_proofofwork(self):
"""Check the main proofofwork call and worker"""
shared.vector_advertise_queue = queue.Queue()
obj = structure.Object(
b'\x00' * 8, int(time.time() + 300), 42, 1,
shared.stream, b'HELLO')
start_time = time.time()
proofofwork.do_pow_and_publish(obj)
try:
vector = shared.vector_advertise_queue.get(timeout=300)
except queue.Empty:
self.fail("Couldn't make work in 300 sec")
else:
time.sleep(1)
try:
result = shared.objects[vector]
except KeyError:
self.fail(
"Couldn't found object with vector %s"
" %s sec after pow start" % (
base64.b16encode(vector), time.time() - start_time))
self.assertTrue(result.is_valid())
self.assertEqual(result.object_type, 42)
self.assertEqual(result.object_payload, b'HELLO')
q = queue.Queue()
# pylint: disable=protected-access
proofofwork._pow_worker(obj.pow_target(), obj.pow_initial_hash(), q)
try:
nonce = q.get(timeout=5)
except queue.Empty:
self.fail("No nonce found in the queue")
obj = structure.Object(
nonce, obj.expires_time, obj.object_type, obj.version,
obj.stream_number, obj.object_payload)
self.assertTrue(obj.is_valid())

View File

@ -1,2 +0,0 @@
coverage
psutil

View File

@ -1,35 +0,0 @@
#!/usr/bin/env python
import os
from setuptools import setup, find_packages
from minode import shared
README = open(os.path.join(
os.path.abspath(os.path.dirname(__file__)), 'README.md')).read()
name, version = shared.user_agent.strip(b'/').split(b':')
setup(
name=name.decode('utf-8'),
version=version.decode('utf-8'),
description='Python 3 implementation of the Bitmessage protocol.'
' Designed only to route objects inside the network.',
long_description=README,
license='MIT',
author='Krzysztof Oziomek',
url='https://git.bitmessage.org/lee.miller/MiNode',
packages=find_packages(exclude=('*tests',)),
package_data={'': ['*.csv', 'tls/*.pem']},
entry_points={'console_scripts': ['minode = minode.main:main']},
classifiers=[
"License :: OSI Approved :: MIT License"
"Operating System :: OS Independent",
"Programming Language :: Python :: 3",
"Topic :: Internet",
"Topic :: Security :: Cryptography",
"Topic :: Software Development :: Libraries :: Python Modules",
],
)

2
start.sh Executable file → Normal file
View File

@ -1,2 +1,2 @@
#!/bin/sh
python3 -m minode.main "$@"
python3 minode/main.py "$@"

View File

@ -1,18 +0,0 @@
#!/usr/bin/env python
"""Custom tests runner script"""
import random # noseq
import sys
import unittest
def unittest_discover():
"""Explicit test suite creation"""
loader = unittest.defaultTestLoader
# randomize the order of tests in test cases
loader.sortTestMethodsUsing = lambda a, b: random.randint(-1, 1)
return loader.discover('minode.tests')
if __name__ == "__main__":
result = unittest.TextTestRunner(verbosity=2).run(unittest_discover())
sys.exit(not result.wasSuccessful())

45
tox.ini
View File

@ -1,45 +0,0 @@
[tox]
envlist = reset,py3{6,7,8,9,10,11},stats
skip_missing_interpreters = true
[testenv]
deps = -rrequirements.txt
commands =
coverage run -a -m tests
[testenv:lint-basic]
deps = flake8
commands =
flake8 minode --count --select=E9,F63,F7,F82 --show-source --statistics
[testenv:reset]
deps =
-rrequirements.txt
bandit
flake8
pylint
commands =
coverage erase
flake8 minode --count --statistics
pylint minode --exit-zero --rcfile=tox.ini
bandit -r --exit-zero -x tests minode
[testenv:stats]
deps = coverage
commands =
coverage report
coverage xml
[coverage:run]
source = minode
omit =
tests.py
*/tests/*
[coverage:report]
ignore_errors = true
[pylint.main]
disable = invalid-name,consider-using-f-string,fixme
max-args = 8
max-attributes = 8