Compare commits

..

54 Commits

Author SHA1 Message Date
70f059d9b1
Removed unneeded objects validation in Manager.clean_objects()
All checks were successful
Testing / default (push) Successful in 8m1s
2024-10-23 01:37:32 +03:00
9bb3038547
Unref open connections in the listener loops to let gc collect when closed
All checks were successful
Testing / default (push) Successful in 10m11s
2024-10-22 04:48:45 +03:00
dcaee3febf
Add a test for the closed connection remaining in memory 2024-10-22 04:48:38 +03:00
d3708c6392
Update git path in the README
All checks were successful
Testing / default (push) Successful in 7m34s
2024-10-22 04:35:33 +03:00
f09509893f
Test bootstrapping with a minode process
All checks were successful
Testing / default (push) Successful in 8m45s
2024-09-23 04:35:49 +03:00
144c3240db
Ensure main.bootstrap_from_dns() adds IPv6 addresses to the core nodes 2024-09-23 04:35:49 +03:00
80ca750da2
Added a test for bootstrapping 2024-09-23 04:35:48 +03:00
ce8bef45b8
Reduce number of simultaneous bootstrappers, refill the bootstrap pool 2024-09-23 04:33:22 +03:00
7053ac84f7
Try not to add core nodes to pool 2024-09-23 04:33:21 +03:00
05fcbdb45c
A rough implementation of proper bootstrapping:
added a Bootstrapper connection class, connect() and bootstrap() closures
in Manager.manage_connections(). The later is called while
shared.unchecked_node_pool is empty.
2024-09-23 04:28:33 +03:00
d106078dac
Skip tests instead of failing if I2PController freezes
All checks were successful
Testing / default (push) Successful in 6m0s
2024-07-30 01:32:02 +03:00
a01e2d3469
Add a test for the saved I2P keys 2024-07-29 15:44:36 +03:00
0c898f687b
Expect I2PController to start in TestProcess._wait_time before checks,
thus increasing the maximum wait time, but increase also _connection_limit,
because 2 connections it is only the controller and the listener.
2024-07-29 15:43:09 +03:00
97576f6750
Simplify local testing: test in a container using a docker-test.sh script
All checks were successful
Testing / default (push) Successful in 7m3s
2024-07-23 02:31:43 +03:00
a451a255af
Bump version to 0.3.3
All checks were successful
Testing / default (push) Successful in 4m14s
2024-07-09 05:41:35 +03:00
16031874c7
Relax the condition in TestProcess.test_connections() 2024-07-09 05:41:23 +03:00
aa6e8a57fb
Copy the relevant part of test_network_group() from PyBitmessage 2024-06-25 20:19:31 +03:00
e11aece1a8
Invalidate the version message with a large time offset 2024-06-25 20:19:30 +03:00
908ed1f582
Run listener with a large time offset and ensure it's not connected 2024-06-25 20:19:30 +03:00
1b9648f3de
Correct position of the except clause in listener loop 2024-06-25 20:19:29 +03:00
c4d22c4c21
Add a test case for listener with a process running with --trusted-peer 2024-06-25 20:19:29 +03:00
5ca6e8a3e3
Add a test for connections with large time offset 2024-06-25 20:19:24 +03:00
abf062ac86
Check network group of connections in process test if it isn't for i2p 2024-06-25 19:55:29 +03:00
7719de5338
Define a static method network_group() in NetAddrNoPrefix
and use it in manager.
2024-06-25 19:55:28 +03:00
b0fa199838
A short test for normal connection (with timeout in 5 min) 2024-06-25 19:55:27 +03:00
f9272cbac9
Define a base class for connection to subclass for special purposes 2024-06-25 19:55:27 +03:00
efeabcb4cf
Cleanup the wait time in test_process, correct format in TestProcess.fail()
Some checks failed
Testing / default (push) Failing after 3m49s
2024-06-18 19:09:31 +03:00
12f6e34afe
Add a gitea workflow badge in readme 2024-06-08 04:44:43 +03:00
fd68c6ebe2
Rewrite the github workflow to use by gitea 2024-05-07 19:15:36 +03:00
740654b563
Make tuples from sets before taking random samples 2024-05-07 19:15:36 +03:00
4e77342d4d
.dockerignore for local run 2024-05-07 19:15:36 +03:00
ddba85384d
Update the buildbot_multibuild dir to jammy and enable py311 2024-05-07 19:15:36 +03:00
5a65978678
Fix a mistake in Connection._do_tls_handshake(): return on exception,
log ssl.SSLError reason and discard the node.
2024-05-07 19:15:36 +03:00
d06beded72
Resolve an SSL issue connecting to PyBitmessage 0.6.1 or using openssl 3.0,
log version
2024-05-07 19:15:00 +03:00
c9a3877b92
Lower logging level for connection error messages in I2PDialer 2023-12-24 01:44:08 +02:00
d7ee73843e
Adjust pylint design checker parameters:
raise max-args to 8, add max-attributes with the same value.
2023-10-14 03:43:22 +03:00
9bcaea12cf
Specifically skip B311 in manager by bandit 2023-10-14 01:06:31 +03:00
e4c2c1be16
Make load_data a static method in manager,
use ascii while loading nodes csv.
2023-10-14 01:06:31 +03:00
a7187d8dfd
Suppress some too-many-* pylint design warnings in parse_arguments() 2023-10-14 01:06:31 +03:00
ddf07fd506
Set object tag for object types supporting it 2023-10-12 19:50:18 +03:00
2145f5839e
Cover the main proofofwork call and worker procedure 2023-10-12 19:50:15 +03:00
b806906af4
Add Error message class, handle fatal 2023-10-12 19:49:32 +03:00
3f61bd694b
Define a helper function to read a varint and trim payload 2023-10-12 19:49:32 +03:00
7812e4bbc2
Use shared.stream when assembling i2p_dest object instead of hardcoded 1 2023-10-12 19:49:32 +03:00
fda6ecfe01
Unify and improve message.Version:
- from_message() decoding method as in other messages;
  - support multiple streams and move stream check to connection;
  - use shared.stream instead of hardcoded 1;
  - replace values from shared with the instance attributes in to_bytes(),
    put conventional 1 as services of a remote host.
2023-10-12 19:49:32 +03:00
428580a980
Add a test for version message 2023-10-12 19:49:32 +03:00
399fc6f21f
Improve structure.Object:
- use shared.stream instead of hardcoded 1;
  - reuse pow_initial_hash() in is_valid().
2023-10-12 19:49:31 +03:00
218905739c
Add a test for object covering also proofofwork 2023-10-12 19:48:52 +03:00
e4887734a0
Send ping into inactive connection, not pong 2023-10-07 17:55:49 +03:00
ae40a3d0b8
Update copyright notes 2023-10-07 17:53:11 +03:00
fe508c176b
Update the Specification link 2023-08-23 02:47:03 +03:00
58a80bb4a4
Remove unreachable except clause for ConnectionResetError
- handled in socket.error branch. TODO: follow PEP 3151
2023-08-20 01:19:11 +03:00
8755e56167
Replace Manager.clean_objects() by the extended version from main
and call it upon the Manager start.
2023-08-20 01:14:17 +03:00
45a4a8fd31
Manifest disconnecting 2023-08-16 03:42:16 +03:00
21 changed files with 978 additions and 356 deletions

View File

@ -1,16 +1,21 @@
FROM ubuntu:focal FROM ubuntu:jammy
RUN apt-get update RUN apt-get update
RUN apt-get install -yq software-properties-common RUN apt-get install -yq software-properties-common
RUN apt-add-repository ppa:purplei2p/i2pd RUN apt-add-repository ppa:purplei2p/i2pd && apt-get update -qq
RUN apt-get update
RUN apt-get install -yq --no-install-suggests --no-install-recommends \ RUN apt-get install -yq --no-install-suggests --no-install-recommends \
python3-dev python3-pip python3.9 python3.9-dev python3.9-venv sudo i2pd python3-dev python3-pip python-is-python3 python3.11-dev python3.11-venv
RUN apt-get install -yq --no-install-suggests --no-install-recommends sudo i2pd
RUN echo 'builder ALL=(ALL) NOPASSWD:ALL' >> /etc/sudoers RUN echo 'builder ALL=(ALL) NOPASSWD:ALL' >> /etc/sudoers
RUN python3.9 -m pip install setuptools wheel RUN pip install setuptools wheel
RUN python3.9 -m pip install --upgrade pip tox virtualenv RUN pip install --upgrade pip tox virtualenv
ADD . .
CMD .buildbot/ubuntu/build.sh && .buildbot/ubuntu/test.sh

3
.dockerignore Normal file
View File

@ -0,0 +1,3 @@
.git
.tox
dist

View File

@ -1,44 +1,20 @@
name: Blind Test name: Testing
on: [push]
on: push
jobs: jobs:
default: default:
runs-on: ubuntu-20.04
runs-on: ${{ matrix.os }}
strategy:
fail-fast: false
matrix:
os: [ubuntu-latest, macos-latest]
python-version: [3.8]
include:
- os: ubuntu-latest
python-version: 3.9
steps: steps:
- uses: actions/checkout@v2 - name: Install dependencies
- name: Set up Python ${{ matrix.python-version }} run: |
uses: actions/setup-python@v2 apt-get update
with: apt-get install -yq --no-install-suggests --no-install-recommends \
python-version: ${{ matrix.python-version }} python3-dev python3-pip python3-venv python-is-python3
- name: Install python dependencies pip install setuptools wheel
run: | pip install --upgrade pip tox virtualenv
python -m pip install --upgrade pip - name: Check out repository code
pip install wheel uses: actions/checkout@v3
pip install bandit flake8 pylint - name: Quick lint
pip install -r requirements.txt run: tox -e lint-basic
python setup.py install - name: Run tests
- name: Lint run: tox
if: ${{ matrix.os == 'ubuntu-latest' && matrix.python-version == '3.8' }}
run: |
# stop the build if there are Python syntax errors or undefined names
flake8 minode --count --select=E9,F63,F7,F82 --show-source --statistics
flake8 minode --count --statistics
pylint minode --exit-zero --rcfile=tox.ini
bandit -r --exit-zero -x tests minode
- name: Test
run: |
export PYTHONWARNINGS=all
coverage run -a -m tests
- name: Summary
run: coverage report

View File

@ -1,6 +1,7 @@
The MIT License (MIT) The MIT License (MIT)
Copyright (c) 2016-2017 Krzysztof Oziomek Copyright (c) 2016-2017 Krzysztof Oziomek
Copyright (c) 2020-2023 The Bitmessage Developers
Permission is hereby granted, free of charge, to any person obtaining a copy Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal of this software and associated documentation files (the "Software"), to deal

View File

@ -1,6 +1,6 @@
# MiNode # MiNode
[![Quick Test](https://github.com/g1itch/MiNode/actions/workflows/test.yml/badge.svg)](https://github.com/g1itch/MiNode/actions/workflows/test.yml) [![Testing](https://git.bitmessage.org/Bitmessage/MiNode/actions/workflows/test.yml/badge.svg)](https://git.bitmessage.org/Bitmessage/MiNode/actions?workflow=test.yml)
Python 3 implementation of the Bitmessage protocol. Designed only to route Python 3 implementation of the Bitmessage protocol. Designed only to route
objects inside the network. objects inside the network.
@ -11,7 +11,7 @@ objects inside the network.
## Running ## Running
``` ```
git clone https://git.bitmessage.org/lee.miller/MiNode.git git clone https://git.bitmessage.org/Bitmessage/MiNode.git
``` ```
``` ```
cd MiNode cd MiNode
@ -86,4 +86,4 @@ will allow you to use it anonymously over I2P with MiNode acting as a bridge.
## Links ## Links
- [Bitmessage project website](https://bitmessage.org) - [Bitmessage project website](https://bitmessage.org)
- [Protocol specification](https://bitmessage.org/wiki/Protocol_specification) - [Protocol specification](https://pybitmessage.rtfd.io/en/v0.6/protocol.html)

11
docker-test.sh Executable file
View File

@ -0,0 +1,11 @@
#!/bin/sh
DOCKERFILE=.buildbot/ubuntu/Dockerfile
docker build -t minode/tox -f $DOCKERFILE .
if [ $? -gt 0 ]; then
docker build --no-cache -t minode/tox -f $DOCKERFILE .
fi
docker run --rm -it minode/tox

View File

@ -15,8 +15,11 @@ import time
from . import message, shared, structure from . import message, shared, structure
class Connection(threading.Thread): class ConnectionBase(threading.Thread):
"""The connection object""" """
Common code for the connection thread
with minimum command handlers to reuse
"""
def __init__( def __init__(
self, host, port, s=None, network='ip', server=False, self, host, port, s=None, network='ip', server=False,
i2p_remote_dest=b'' i2p_remote_dest=b''
@ -110,11 +113,7 @@ class Connection(threading.Thread):
'Disconnecting from %s:%s. Reason: %s', 'Disconnecting from %s:%s. Reason: %s',
self.host_print, self.port, e) self.host_print, self.port, e)
data = None data = None
except ConnectionResetError:
logging.debug(
'Disconnecting from %s:%s. Reason: ConnectionResetError',
self.host_print, self.port)
self.status = 'disconnecting'
self._process_buffer_receive() self._process_buffer_receive()
self._process_queue() self._process_queue()
self._send_data() self._send_data()
@ -139,13 +138,13 @@ class Connection(threading.Thread):
time.time() - self.last_message_sent > 300 time.time() - self.last_message_sent > 300
and self.status == 'fully_established' and self.status == 'fully_established'
): ):
self.send_queue.put(message.Message(b'pong', b'')) self.send_queue.put(message.Message(b'ping', b''))
if self.status == 'disconnecting' or shared.shutting_down: if self.status == 'disconnecting' or shared.shutting_down:
data = None data = None
if not data: if not data:
self.status = 'disconnected' self.status = 'disconnected'
self.s.close() self.s.close()
logging.debug( logging.info(
'Disconnected from %s:%s', self.host_print, self.port) 'Disconnected from %s:%s', self.host_print, self.port)
break break
time.sleep(0.2) time.sleep(0.2)
@ -211,6 +210,13 @@ class Connection(threading.Thread):
context.options = ( context.options = (
ssl.OP_ALL | ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3 ssl.OP_ALL | ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3
| ssl.OP_SINGLE_ECDH_USE | ssl.OP_CIPHER_SERVER_PREFERENCE) | ssl.OP_SINGLE_ECDH_USE | ssl.OP_CIPHER_SERVER_PREFERENCE)
# OP_NO_SSL* is deprecated since 3.6
try:
# TODO: ssl.TLSVersion.TLSv1 is deprecated
context.minimum_version = ssl.TLSVersion.TLSv1
context.maximum_version = ssl.TLSVersion.TLSv1_2
except AttributeError:
pass
self.s = context.wrap_socket( self.s = context.wrap_socket(
self.s, server_side=self.server, do_handshake_on_connect=False) self.s, server_side=self.server, do_handshake_on_connect=False)
@ -228,11 +234,14 @@ class Connection(threading.Thread):
'Disconnecting from %s:%s. Reason: %s', 'Disconnecting from %s:%s. Reason: %s',
self.host_print, self.port, e) self.host_print, self.port, e)
self.status = 'disconnecting' self.status = 'disconnecting'
break if isinstance(e, ssl.SSLError): # pylint: disable=no-member
logging.debug('ssl.SSLError reason: %s', e.reason)
shared.node_pool.discard((self.host, self.port))
return
self.tls = True self.tls = True
logging.debug( logging.debug(
'Established TLS connection with %s:%s', 'Established TLS connection with %s:%s (%s)',
self.host_print, self.port) self.host_print, self.port, self.s.version())
def _send_message(self, m): def _send_message(self, m):
if isinstance(m, message.Message) and m.command == b'object': if isinstance(m, message.Message) and m.command == b'object':
@ -260,11 +269,11 @@ class Connection(threading.Thread):
if len(shared.node_pool) > 10: if len(shared.node_pool) > 10:
addr.update({ addr.update({
structure.NetAddr(1, a[0], a[1]) structure.NetAddr(1, a[0], a[1])
for a in random.sample(shared.node_pool, 10)}) for a in random.sample(tuple(shared.node_pool), 10)})
if len(shared.unchecked_node_pool) > 10: if len(shared.unchecked_node_pool) > 10:
addr.update({ addr.update({
structure.NetAddr(1, a[0], a[1]) structure.NetAddr(1, a[0], a[1])
for a in random.sample(shared.unchecked_node_pool, 10)}) for a in random.sample(tuple(shared.unchecked_node_pool), 10)})
if len(addr) != 0: if len(addr) != 0:
self.send_queue.put(message.Addr(addr)) self.send_queue.put(message.Addr(addr))
@ -278,7 +287,7 @@ class Connection(threading.Thread):
# We limit size of inv messaged to 10000 entries # We limit size of inv messaged to 10000 entries
# because they might time out # because they might time out
# in very slow networks (I2P) # in very slow networks (I2P)
pack = random.sample(to_send, 10000) pack = random.sample(tuple(to_send), 10000)
self.send_queue.put(message.Inv(pack)) self.send_queue.put(message.Inv(pack))
to_send.difference_update(pack) to_send.difference_update(pack)
else: else:
@ -338,100 +347,72 @@ class Connection(threading.Thread):
break break
def _process_message(self, m): def _process_message(self, m):
if m.command == b'version': if m.command == b'verack':
version = message.Version.from_bytes(m.to_bytes())
logging.debug('%s:%s -> %s', self.host_print, self.port, version)
if (
version.protocol_version != shared.protocol_version
or version.nonce == shared.nonce
):
self.status = 'disconnecting'
self.send_queue.put(None)
else:
logging.info(
'%s:%s claims to be %s',
self.host_print, self.port, version.user_agent)
self.send_queue.put(message.Message(b'verack', b''))
self.verack_sent = True
self.remote_version = version
if not self.server:
self.send_queue.put('fully_established')
if self.network == 'ip':
shared.address_advertise_queue.put(structure.NetAddr(
version.services, self.host, self.port))
shared.node_pool.add((self.host, self.port))
elif self.network == 'i2p':
shared.i2p_node_pool.add((self.host, 'i2p'))
if self.network == 'ip':
shared.address_advertise_queue.put(structure.NetAddr(
shared.services, version.host, shared.listening_port))
if self.server:
if self.network == 'ip':
self.send_queue.put(
message.Version(self.host, self.port))
else:
self.send_queue.put(message.Version('127.0.0.1', 7656))
elif m.command == b'verack':
self.verack_received = True self.verack_received = True
logging.debug( logging.debug(
'%s:%s -> %s', self.host_print, self.port, 'verack') '%s:%s -> %s', self.host_print, self.port, 'verack')
if self.server: if self.server:
self.send_queue.put('fully_established') self.send_queue.put('fully_established')
elif m.command == b'inv':
inv = message.Inv.from_message(m)
logging.debug('%s:%s -> %s', self.host_print, self.port, inv)
to_get = inv.vectors.copy()
to_get.difference_update(shared.objects.keys())
to_get.difference_update(shared.junk_vectors)
self.vectors_to_get.update(to_get)
# Do not send objects they already have.
self.vectors_to_send.difference_update(inv.vectors)
elif m.command == b'object':
obj = structure.Object.from_message(m)
logging.debug('%s:%s -> %s', self.host_print, self.port, obj)
self.vectors_requested.pop(obj.vector, None)
self.vectors_to_get.discard(obj.vector)
if obj.is_valid() and obj.vector not in shared.objects:
with shared.objects_lock:
if obj.is_junk():
return shared.junk_vectors.add(obj.vector)
shared.objects[obj.vector] = obj
if (
obj.object_type == shared.i2p_dest_obj_type
and obj.version == shared.i2p_dest_obj_version
):
dest = base64.b64encode(obj.object_payload, altchars=b'-~')
logging.debug(
'Received I2P destination object,'
' adding to i2p_unchecked_node_pool')
logging.debug(dest)
shared.i2p_unchecked_node_pool.add((dest, 'i2p'))
shared.vector_advertise_queue.put(obj.vector)
elif m.command == b'getdata':
getdata = message.GetData.from_message(m)
logging.debug('%s:%s -> %s', self.host_print, self.port, getdata)
self.vectors_to_send.update(getdata.vectors)
elif m.command == b'addr':
addr = message.Addr.from_message(m)
logging.debug('%s:%s -> %s', self.host_print, self.port, addr)
for a in addr.addresses:
shared.unchecked_node_pool.add((a.host, a.port))
elif m.command == b'ping': elif m.command == b'ping':
logging.debug('%s:%s -> ping', self.host_print, self.port) logging.debug('%s:%s -> ping', self.host_print, self.port)
self.send_queue.put(message.Message(b'pong', b'')) self.send_queue.put(message.Message(b'pong', b''))
elif m.command == b'error': elif m.command == b'error':
error = message.Error.from_message(m)
logging.warning( logging.warning(
'%s:%s -> error: %s', self.host_print, self.port, m.payload) '%s:%s -> %s', self.host_print, self.port, error)
if error.fatal == 2:
# reduce probability to connect soon
shared.unchecked_node_pool.discard((self.host, self.port))
else: else:
logging.debug('%s:%s -> %s', self.host_print, self.port, m) try:
getattr(self, '_process_msg_{}'.format(m.command.decode()))(m)
except (AttributeError, UnicodeDecodeError):
logging.debug('%s:%s -> %s', self.host_print, self.port, m)
def _process_msg_version(self, m):
version = message.Version.from_message(m)
if shared.stream not in version.streams:
raise ValueError('message not for stream %i' % shared.stream)
logging.debug('%s:%s -> %s', self.host_print, self.port, version)
if (
version.protocol_version != shared.protocol_version
or version.nonce == shared.nonce
):
self.status = 'disconnecting'
self.send_queue.put(None)
else:
logging.info(
'%s:%s claims to be %s',
self.host_print, self.port, version.user_agent)
self.send_queue.put(message.Message(b'verack', b''))
self.verack_sent = True
self.remote_version = version
if not self.server:
self.send_queue.put('fully_established')
if self.network == 'ip':
shared.address_advertise_queue.put(structure.NetAddr(
version.services, self.host, self.port))
shared.node_pool.add((self.host, self.port))
elif self.network == 'i2p':
shared.i2p_node_pool.add((self.host, 'i2p'))
if self.network == 'ip':
shared.address_advertise_queue.put(structure.NetAddr(
shared.services, version.host, shared.listening_port))
if self.server:
if self.network == 'ip':
self.send_queue.put(message.Version(self.host, self.port))
else:
self.send_queue.put(message.Version('127.0.0.1', 7656))
def _process_msg_addr(self, m):
addr = message.Addr.from_message(m)
logging.debug('%s:%s -> %s', self.host_print, self.port, addr)
for a in addr.addresses:
if (a.host, a.port) not in shared.core_nodes:
shared.unchecked_node_pool.add((a.host, a.port))
def _request_objects(self): def _request_objects(self):
if self.vectors_to_get and len(self.vectors_requested) < 100: if self.vectors_to_get and len(self.vectors_requested) < 100:
@ -447,7 +428,7 @@ class Connection(threading.Thread):
logging.info( logging.info(
'Queued %s vectors to get', len(self.vectors_to_get)) 'Queued %s vectors to get', len(self.vectors_to_get))
if len(self.vectors_to_get) > 64: if len(self.vectors_to_get) > 64:
pack = random.sample(self.vectors_to_get, 64) pack = random.sample(tuple(self.vectors_to_get), 64)
self.send_queue.put(message.GetData(pack)) self.send_queue.put(message.GetData(pack))
self.vectors_requested.update({ self.vectors_requested.update({
vector: time.time() for vector in pack vector: time.time() for vector in pack
@ -477,7 +458,7 @@ class Connection(threading.Thread):
logging.info( logging.info(
'Preparing to send %s objects', len(self.vectors_to_send)) 'Preparing to send %s objects', len(self.vectors_to_send))
if len(self.vectors_to_send) > 16: if len(self.vectors_to_send) > 16:
to_send = random.sample(self.vectors_to_send, 16) to_send = random.sample(tuple(self.vectors_to_send), 16)
self.vectors_to_send.difference_update(to_send) self.vectors_to_send.difference_update(to_send)
else: else:
to_send = self.vectors_to_send.copy() to_send = self.vectors_to_send.copy()
@ -490,4 +471,50 @@ class Connection(threading.Thread):
message.Message(b'object', obj.to_bytes())) message.Message(b'object', obj.to_bytes()))
class Connection(ConnectionBase):
"""The connection with all commands implementation"""
def _process_msg_inv(self, m):
inv = message.Inv.from_message(m)
logging.debug('%s:%s -> %s', self.host_print, self.port, inv)
to_get = inv.vectors.copy()
to_get.difference_update(shared.objects.keys())
self.vectors_to_get.update(to_get)
# Do not send objects they already have.
self.vectors_to_send.difference_update(inv.vectors)
def _process_msg_object(self, m):
obj = structure.Object.from_message(m)
logging.debug('%s:%s -> %s', self.host_print, self.port, obj)
self.vectors_requested.pop(obj.vector, None)
self.vectors_to_get.discard(obj.vector)
if obj.is_valid() and obj.vector not in shared.objects:
with shared.objects_lock:
shared.objects[obj.vector] = obj
if (
obj.object_type == shared.i2p_dest_obj_type
and obj.version == shared.i2p_dest_obj_version
):
dest = base64.b64encode(obj.object_payload, altchars=b'-~')
logging.debug(
'Received I2P destination object,'
' adding to i2p_unchecked_node_pool')
logging.debug(dest)
shared.i2p_unchecked_node_pool.add((dest, 'i2p'))
shared.vector_advertise_queue.put(obj.vector)
def _process_msg_getdata(self, m):
getdata = message.GetData.from_message(m)
logging.debug('%s:%s -> %s', self.host_print, self.port, getdata)
self.vectors_to_send.update(getdata.vectors)
class Bootstrapper(ConnectionBase):
"""A special type of connection to find IP nodes"""
def _process_msg_addr(self, m):
super()._process_msg_addr(m)
shared.node_pool.discard((self.host, self.port))
self.status = 'disconnecting'
self.send_queue.put(None)
shared.connection = Connection shared.connection = Connection

View File

@ -37,7 +37,7 @@ class I2PDialer(I2PThread):
self._send(b'HELLO VERSION MIN=3.0 MAX=3.3\n') self._send(b'HELLO VERSION MIN=3.0 MAX=3.3\n')
self.version_reply = self._receive_line().split() self.version_reply = self._receive_line().split()
if b'RESULT=OK' not in self.version_reply: if b'RESULT=OK' not in self.version_reply:
logging.warning('Error while connecting to %s', self.destination) logging.debug('Error while connecting to %s', self.destination)
self.success = False self.success = False
self._send( self._send(
@ -45,6 +45,5 @@ class I2PDialer(I2PThread):
+ self.destination + b'\n') + self.destination + b'\n')
reply = self._receive_line().split(b' ') reply = self._receive_line().split(b' ')
if b'RESULT=OK' not in reply: if b'RESULT=OK' not in reply:
logging.warning( logging.debug('Error while connecting to %s', self.destination)
'Error while connecting to %s', self.destination)
self.success = False self.success = False

View File

@ -49,6 +49,7 @@ class I2PListener(I2PThread):
destination, 'i2p', self.s, 'i2p', True, destination) destination, 'i2p', self.s, 'i2p', True, destination)
c.start() c.start()
self.state.connections.add(c) self.state.connections.add(c)
c = None
self.new_socket() self.new_socket()
except socket.timeout: except socket.timeout:
pass pass

View File

@ -28,13 +28,15 @@ class Listener(threading.Thread):
break break
try: try:
conn, addr = self.s.accept() conn, addr = self.s.accept()
logging.info('Incoming connection from: %s:%i', *addr[:2])
with shared.connections_lock:
if len(shared.connections) > shared.connection_limit:
conn.close()
else:
c = Connection(*addr[:2], conn, server=True)
c.start()
shared.connections.add(c)
except socket.timeout: except socket.timeout:
pass continue
logging.info('Incoming connection from: %s:%i', *addr[:2])
with shared.connections_lock:
if len(shared.connections) > shared.connection_limit:
conn.close()
else:
c = Connection(*addr[:2], conn, server=True)
c.start()
shared.connections.add(c)
c = None

View File

@ -2,11 +2,9 @@
"""Functions for starting the program""" """Functions for starting the program"""
import argparse import argparse
import base64 import base64
import csv
import logging import logging
import multiprocessing import multiprocessing
import os import os
import pickle
import signal import signal
import socket import socket
@ -22,7 +20,7 @@ def handler(s, f): # pylint: disable=unused-argument
shared.shutting_down = True shared.shutting_down = True
def parse_arguments(): def parse_arguments(): # pylint: disable=too-many-branches,too-many-statements
"""Parsing arguments""" """Parsing arguments"""
parser = argparse.ArgumentParser() parser = argparse.ArgumentParser()
parser.add_argument('-p', '--port', help='Port to listen on', type=int) parser.add_argument('-p', '--port', help='Port to listen on', type=int)
@ -102,69 +100,23 @@ def parse_arguments():
shared.i2p_transient = True shared.i2p_transient = True
def load_data():
"""Loads initial nodes and data, stored in files between sessions"""
try:
with open(
os.path.join(shared.data_directory, 'objects.pickle'), 'br'
) as src:
shared.objects = pickle.load(src)
except FileNotFoundError:
pass # first start
except Exception:
logging.warning(
'Error while loading objects from disk.', exc_info=True)
try:
with open(
os.path.join(shared.data_directory, 'nodes.pickle'), 'br'
) as src:
shared.node_pool = pickle.load(src)
except FileNotFoundError:
pass
except Exception:
logging.warning('Error while loading nodes from disk.', exc_info=True)
try:
with open(
os.path.join(shared.data_directory, 'i2p_nodes.pickle'), 'br'
) as src:
shared.i2p_node_pool = pickle.load(src)
except FileNotFoundError:
pass
except Exception:
logging.warning('Error while loading nodes from disk.', exc_info=True)
with open(
os.path.join(shared.source_directory, 'core_nodes.csv'),
'r', newline=''
) as src:
reader = csv.reader(src)
shared.core_nodes = {tuple(row) for row in reader}
shared.node_pool.update(shared.core_nodes)
with open(
os.path.join(shared.source_directory, 'i2p_core_nodes.csv'),
'r', newline=''
) as f:
reader = csv.reader(f)
shared.i2p_core_nodes = {(row[0].encode(), 'i2p') for row in reader}
shared.i2p_node_pool.update(shared.i2p_core_nodes)
def bootstrap_from_dns(): def bootstrap_from_dns():
"""Addes addresses of bootstrap servers to known nodes""" """Addes addresses of bootstrap servers to core nodes"""
try: try:
for item in socket.getaddrinfo('bootstrap8080.bitmessage.org', 80): for port in (8080, 8444):
shared.unchecked_node_pool.add((item[4][0], 8080)) for item in socket.getaddrinfo(
logging.debug( 'bootstrap{}.bitmessage.org'.format(port), 80,
'Adding %s to unchecked_node_pool' proto=socket.IPPROTO_TCP
' based on DNS bootstrap method', item[4][0]) ):
for item in socket.getaddrinfo('bootstrap8444.bitmessage.org', 80): try:
shared.unchecked_node_pool.add((item[4][0], 8444)) addr = item[4][0]
logging.debug( socket.inet_pton(item[0], addr)
'Adding %s to unchecked_node_pool' except (TypeError, socket.error):
' based on DNS bootstrap method', item[4][0]) continue
else:
shared.core_nodes.add((addr, port))
except socket.gaierror:
logging.info('Failed to do a DNS query')
except Exception: except Exception:
logging.info('Error during DNS bootstrap', exc_info=True) logging.info('Error during DNS bootstrap', exc_info=True)
@ -290,8 +242,6 @@ def main():
'Error while creating data directory in: %s', 'Error while creating data directory in: %s',
shared.data_directory, exc_info=True) shared.data_directory, exc_info=True)
load_data()
if shared.ip_enabled and not shared.trusted_peer: if shared.ip_enabled and not shared.trusted_peer:
bootstrap_from_dns() bootstrap_from_dns()
@ -300,18 +250,6 @@ def main():
# so we can collect I2P destination objects # so we can collect I2P destination objects
start_i2p_listener() start_i2p_listener()
for vector in set(shared.objects):
if not shared.objects[vector].is_valid():
if shared.objects[vector].is_expired():
logging.debug(
'Deleted expired object: %s',
base64.b16encode(vector).decode())
else:
logging.warning(
'Deleted invalid object: %s',
base64.b16encode(vector).decode())
del shared.objects[vector]
manager = Manager() manager = Manager()
manager.start() manager.start()

View File

@ -1,6 +1,7 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
"""The main thread, managing connections, nodes and objects""" """The main thread, managing connections, nodes and objects"""
import base64 import base64
import csv
import logging import logging
import os import os
import pickle import pickle
@ -10,7 +11,7 @@ import threading
import time import time
from . import proofofwork, shared, structure from . import proofofwork, shared, structure
from .connection import Connection from .connection import Bootstrapper, Connection
from .i2p import I2PDialer from .i2p import I2PDialer
@ -19,15 +20,24 @@ class Manager(threading.Thread):
def __init__(self): def __init__(self):
super().__init__(name='Manager') super().__init__(name='Manager')
self.q = queue.Queue() self.q = queue.Queue()
self.bootstrap_pool = []
self.last_cleaned_objects = time.time() self.last_cleaned_objects = time.time()
self.last_cleaned_connections = time.time() self.last_cleaned_connections = time.time()
self.last_pickled_objects = time.time() self.last_pickled_objects = time.time()
self.last_pickled_nodes = time.time() self.last_pickled_nodes = time.time()
# Publish destination 5-15 minutes after start # Publish destination 5-15 minutes after start
self.last_published_i2p_destination = \ self.last_published_i2p_destination = \
time.time() - 50 * 60 + random.uniform(-1, 1) * 300 # nosec time.time() - 50 * 60 + random.uniform(-1, 1) * 300 # nosec B311
def fill_bootstrap_pool(self):
"""Populate the bootstrap pool by core nodes and checked ones"""
self.bootstrap_pool = list(shared.core_nodes.union(shared.node_pool))
random.shuffle(self.bootstrap_pool)
def run(self): def run(self):
self.load_data()
self.clean_objects()
self.fill_bootstrap_pool()
while True: while True:
time.sleep(0.8) time.sleep(0.8)
now = time.time() now = time.time()
@ -53,23 +63,47 @@ class Manager(threading.Thread):
@staticmethod @staticmethod
def clean_objects(): def clean_objects():
for vector in set(shared.objects): for vector in set(shared.objects):
# FIXME: no need to check is_valid() here
if shared.objects[vector].is_expired(): if shared.objects[vector].is_expired():
with shared.objects_lock:
del shared.objects[vector]
logging.debug( logging.debug(
'Deleted expired object: %s', 'Deleted expired object: %s',
base64.b16encode(vector).decode()) base64.b16encode(vector).decode())
with shared.objects_lock:
del shared.objects[vector]
@staticmethod def manage_connections(self):
def manage_connections(): """Open new connections if needed, remove closed ones"""
hosts = set() hosts = set()
def connect(target, connection_class=Connection):
"""
Open a connection of *connection_class*
to the *target* (host, port)
"""
c = connection_class(*target)
c.start()
with shared.connections_lock:
shared.connections.add(c)
def bootstrap():
"""Bootstrap from DNS seed-nodes and known nodes"""
try:
target = self.bootstrap_pool.pop()
except IndexError:
logging.warning(
'Ran out of bootstrap nodes, refilling')
self.fill_bootstrap_pool()
return
logging.info('Starting a bootstrapper for %s:%s', *target)
connect(target, Bootstrapper)
outgoing_connections = 0 outgoing_connections = 0
for c in shared.connections.copy(): for c in shared.connections.copy():
if not c.is_alive() or c.status == 'disconnected': if not c.is_alive() or c.status == 'disconnected':
with shared.connections_lock: with shared.connections_lock:
shared.connections.remove(c) shared.connections.remove(c)
else: else:
hosts.add(c.host) hosts.add(structure.NetAddrNoPrefix.network_group(c.host))
if not c.server: if not c.server:
outgoing_connections += 1 outgoing_connections += 1
@ -90,36 +124,41 @@ class Manager(threading.Thread):
if shared.ip_enabled: if shared.ip_enabled:
if len(shared.unchecked_node_pool) > 16: if len(shared.unchecked_node_pool) > 16:
to_connect.update(random.sample( to_connect.update(random.sample(
shared.unchecked_node_pool, 16)) tuple(shared.unchecked_node_pool), 16))
else: else:
to_connect.update(shared.unchecked_node_pool) to_connect.update(shared.unchecked_node_pool)
if outgoing_connections < shared.outgoing_connections / 2:
bootstrap()
shared.unchecked_node_pool.difference_update(to_connect) shared.unchecked_node_pool.difference_update(to_connect)
if len(shared.node_pool) > 8: if len(shared.node_pool) > 8:
to_connect.update(random.sample(shared.node_pool, 8)) to_connect.update(random.sample(
tuple(shared.node_pool), 8))
else: else:
to_connect.update(shared.node_pool) to_connect.update(shared.node_pool)
if shared.i2p_enabled: if shared.i2p_enabled:
if len(shared.i2p_unchecked_node_pool) > 16: if len(shared.i2p_unchecked_node_pool) > 16:
to_connect.update( to_connect.update(random.sample(
random.sample(shared.i2p_unchecked_node_pool, 16)) tuple(shared.i2p_unchecked_node_pool), 16))
else: else:
to_connect.update(shared.i2p_unchecked_node_pool) to_connect.update(shared.i2p_unchecked_node_pool)
shared.i2p_unchecked_node_pool.difference_update(to_connect) shared.i2p_unchecked_node_pool.difference_update(to_connect)
if len(shared.i2p_node_pool) > 8: if len(shared.i2p_node_pool) > 8:
to_connect.update(random.sample(shared.i2p_node_pool, 8)) to_connect.update(random.sample(
tuple(shared.i2p_node_pool), 8))
else: else:
to_connect.update(shared.i2p_node_pool) to_connect.update(shared.i2p_node_pool)
for addr in to_connect: for host, port in to_connect:
if addr[0] in hosts: group = structure.NetAddrNoPrefix.network_group(host)
if group in hosts:
continue continue
if addr[1] == 'i2p' and shared.i2p_enabled: if port == 'i2p' and shared.i2p_enabled:
if shared.i2p_session_nick and addr[0] != shared.i2p_dest_pub: if shared.i2p_session_nick and host != shared.i2p_dest_pub:
try: try:
d = I2PDialer( d = I2PDialer(
shared, shared,
addr[0], shared.i2p_session_nick, host, shared.i2p_session_nick,
shared.i2p_sam_host, shared.i2p_sam_port) shared.i2p_sam_host, shared.i2p_sam_port)
d.start() d.start()
hosts.add(d.destination) hosts.add(d.destination)
@ -131,13 +170,63 @@ class Manager(threading.Thread):
else: else:
continue continue
else: else:
c = Connection(addr[0], addr[1]) connect((host, port))
c.start() hosts.add(group)
hosts.add(c.host)
with shared.connections_lock:
shared.connections.add(c)
shared.hosts = hosts shared.hosts = hosts
@staticmethod
def load_data():
"""Loads initial nodes and data, stored in files between sessions"""
try:
with open(
os.path.join(shared.data_directory, 'objects.pickle'), 'br'
) as src:
shared.objects = pickle.load(src)
except FileNotFoundError:
pass # first start
except Exception:
logging.warning(
'Error while loading objects from disk.', exc_info=True)
try:
with open(
os.path.join(shared.data_directory, 'nodes.pickle'), 'br'
) as src:
shared.node_pool = pickle.load(src)
except FileNotFoundError:
pass
except Exception:
logging.warning(
'Error while loading nodes from disk.', exc_info=True)
try:
with open(
os.path.join(shared.data_directory, 'i2p_nodes.pickle'), 'br'
) as src:
shared.i2p_node_pool = pickle.load(src)
except FileNotFoundError:
pass
except Exception:
logging.warning(
'Error while loading nodes from disk.', exc_info=True)
with open(
os.path.join(shared.source_directory, 'core_nodes.csv'),
'r', newline='', encoding='ascii'
) as src:
reader = csv.reader(src)
shared.core_nodes = {(row[0], int(row[1])) for row in reader}
shared.node_pool.update(shared.core_nodes)
with open(
os.path.join(shared.source_directory, 'i2p_core_nodes.csv'),
'r', newline='', encoding='ascii'
) as f:
reader = csv.reader(f)
shared.i2p_core_nodes = {
(row[0].encode(), 'i2p') for row in reader}
shared.i2p_node_pool.update(shared.i2p_core_nodes)
@staticmethod @staticmethod
def pickle_objects(): def pickle_objects():
try: try:
@ -153,17 +242,18 @@ class Manager(threading.Thread):
@staticmethod @staticmethod
def pickle_nodes(): def pickle_nodes():
if len(shared.node_pool) > 10000: if len(shared.node_pool) > 10000:
shared.node_pool = set(random.sample(shared.node_pool, 10000)) shared.node_pool = set(random.sample(
tuple(shared.node_pool), 10000))
if len(shared.unchecked_node_pool) > 1000: if len(shared.unchecked_node_pool) > 1000:
shared.unchecked_node_pool = set( shared.unchecked_node_pool = set(random.sample(
random.sample(shared.unchecked_node_pool, 1000)) tuple(shared.unchecked_node_pool), 1000))
if len(shared.i2p_node_pool) > 1000: if len(shared.i2p_node_pool) > 1000:
shared.i2p_node_pool = set( shared.i2p_node_pool = set(random.sample(
random.sample(shared.i2p_node_pool, 1000)) tuple(shared.i2p_node_pool), 1000))
if len(shared.i2p_unchecked_node_pool) > 100: if len(shared.i2p_unchecked_node_pool) > 100:
shared.i2p_unchecked_node_pool = set( shared.i2p_unchecked_node_pool = set(random.sample(
random.sample(shared.i2p_unchecked_node_pool, 100)) tuple(shared.i2p_unchecked_node_pool), 100))
try: try:
with open( with open(
@ -187,5 +277,5 @@ class Manager(threading.Thread):
obj = structure.Object( obj = structure.Object(
b'\x00' * 8, int(time.time() + 2 * 3600), b'\x00' * 8, int(time.time() + 2 * 3600),
shared.i2p_dest_obj_type, shared.i2p_dest_obj_version, shared.i2p_dest_obj_type, shared.i2p_dest_obj_version,
1, dest_pub_raw) shared.stream, dest_pub_raw)
proofofwork.do_pow_and_publish(obj) proofofwork.do_pow_and_publish(obj)

View File

@ -91,12 +91,19 @@ class Message():
return cls(h.command, payload) return cls(h.command, payload)
def _payload_read_int(data):
varint_length = structure.VarInt.length(data[0])
return (
structure.VarInt.from_bytes(data[:varint_length]).n,
data[varint_length:])
class Version(): class Version():
"""The version message payload""" """The version message payload"""
def __init__( def __init__(
self, host, port, protocol_version=shared.protocol_version, self, host, port, protocol_version=shared.protocol_version,
services=shared.services, nonce=shared.nonce, services=shared.services, nonce=shared.nonce,
user_agent=shared.user_agent user_agent=shared.user_agent, streams=None
): ):
self.host = host self.host = host
self.port = port self.port = port
@ -105,6 +112,9 @@ class Version():
self.services = services self.services = services
self.nonce = nonce self.nonce = nonce
self.user_agent = user_agent self.user_agent = user_agent
self.streams = streams or [shared.stream]
if len(self.streams) > 160000:
self.streams = self.streams[:160000]
def __repr__(self): def __repr__(self):
return ( return (
@ -119,26 +129,29 @@ class Version():
payload += struct.pack('>Q', self.services) payload += struct.pack('>Q', self.services)
payload += struct.pack('>Q', int(time.time())) payload += struct.pack('>Q', int(time.time()))
payload += structure.NetAddrNoPrefix( payload += structure.NetAddrNoPrefix(
shared.services, self.host, self.port).to_bytes() 1, self.host, self.port).to_bytes()
payload += structure.NetAddrNoPrefix( payload += structure.NetAddrNoPrefix(
shared.services, '127.0.0.1', 8444).to_bytes() self.services, '127.0.0.1', 8444).to_bytes()
payload += self.nonce payload += self.nonce
payload += structure.VarInt(len(shared.user_agent)).to_bytes() payload += structure.VarInt(len(self.user_agent)).to_bytes()
payload += shared.user_agent payload += self.user_agent
payload += 2 * structure.VarInt(1).to_bytes() payload += structure.VarInt(len(self.streams)).to_bytes()
for stream in self.streams:
payload += structure.VarInt(stream).to_bytes()
return Message(b'version', payload).to_bytes() return Message(b'version', payload).to_bytes()
@classmethod @classmethod
def from_bytes(cls, b): def from_message(cls, m):
m = Message.from_bytes(b)
payload = m.payload payload = m.payload
( # unused: timestamp, net_addr_local ( # unused: net_addr_local
protocol_version, services, _, net_addr_remote, _, nonce protocol_version, services, timestamp, net_addr_remote, _, nonce
) = struct.unpack('>IQQ26s26s8s', payload[:80]) ) = struct.unpack('>IQQ26s26s8s', payload[:80])
if abs(time.time() - timestamp) > 3600:
raise ValueError('remote time offset is too large')
net_addr_remote = structure.NetAddrNoPrefix.from_bytes(net_addr_remote) net_addr_remote = structure.NetAddrNoPrefix.from_bytes(net_addr_remote)
host = net_addr_remote.host host = net_addr_remote.host
@ -146,20 +159,24 @@ class Version():
payload = payload[80:] payload = payload[80:]
user_agent_varint_length = structure.VarInt.length(payload[0]) user_agent_length, payload = _payload_read_int(payload)
user_agent_length = structure.VarInt.from_bytes(
payload[:user_agent_varint_length]).n
payload = payload[user_agent_varint_length:]
user_agent = payload[:user_agent_length] user_agent = payload[:user_agent_length]
payload = payload[user_agent_length:] payload = payload[user_agent_length:]
if payload != b'\x01\x01': streams_count, payload = _payload_read_int(payload)
raise ValueError('message not for stream 1') if streams_count > 160000:
raise ValueError('malformed Version message, to many streams')
streams = []
return cls(host, port, protocol_version, services, nonce, user_agent) while payload:
stream, payload = _payload_read_int(payload)
streams.append(stream)
if streams_count != len(streams):
raise ValueError('malformed Version message, wrong streams_count')
return cls(
host, port, protocol_version, services, nonce, user_agent, streams)
class Inv(): class Inv():
@ -180,11 +197,7 @@ class Inv():
def from_message(cls, m): def from_message(cls, m):
payload = m.payload payload = m.payload
vector_count_varint_length = structure.VarInt.length(payload[0]) vector_count, payload = _payload_read_int(payload)
vector_count = structure.VarInt.from_bytes(
payload[:vector_count_varint_length]).n
payload = payload[vector_count_varint_length:]
vectors = set() vectors = set()
@ -216,11 +229,7 @@ class GetData():
def from_message(cls, m): def from_message(cls, m):
payload = m.payload payload = m.payload
vector_count_varint_length = structure.VarInt.length(payload[0]) vector_count, payload = _payload_read_int(payload)
vector_count = structure.VarInt.from_bytes(
payload[:vector_count_varint_length]).n
payload = payload[vector_count_varint_length:]
vectors = set() vectors = set()
@ -252,11 +261,8 @@ class Addr():
def from_message(cls, m): def from_message(cls, m):
payload = m.payload payload = m.payload
addr_count_varint_length = structure.VarInt.length(payload[0]) # not validating addr_count
# addr_count = structure.VarInt.from_bytes( _, payload = _payload_read_int(payload)
# payload[:addr_count_varint_length]).n
payload = payload[addr_count_varint_length:]
addresses = set() addresses = set()
@ -265,3 +271,37 @@ class Addr():
payload = payload[38:] payload = payload[38:]
return cls(addresses) return cls(addresses)
class Error():
"""The error message payload"""
def __init__(self, error_text=b'', fatal=0, ban_time=0, vector=b''):
self.error_text = error_text
self.fatal = fatal
self.ban_time = ban_time
self.vector = vector
def __repr__(self):
return 'error, text: {}'.format(self.error_text)
def to_bytes(self):
return Message(
b'error', structure.VarInt(self.fatal).to_bytes()
+ structure.VarInt(self.ban_time).to_bytes()
+ structure.VarInt(len(self.vector)).to_bytes() + self.vector
+ structure.VarInt(len(self.error_text)).to_bytes()
+ self.error_text
).to_bytes()
@classmethod
def from_message(cls, m):
payload = m.payload
fatal, payload = _payload_read_int(payload)
ban_time, payload = _payload_read_int(payload)
vector_length, payload = _payload_read_int(payload)
vector = payload[:vector_length]
payload = payload[vector_length:]
error_text_length, payload = _payload_read_int(payload)
error_text = payload[:error_text_length]
return cls(error_text, fatal, ban_time, vector)

View File

@ -16,14 +16,12 @@ ip_enabled = True
log_level = logging.INFO log_level = logging.INFO
curve = 714 # secp256k1
key_length = 32
magic_bytes = b'\xe9\xbe\xb4\xd9' magic_bytes = b'\xe9\xbe\xb4\xd9'
protocol_version = 3 protocol_version = 3
services = 3 # NODE_NETWORK, NODE_SSL services = 3 # NODE_NETWORK, NODE_SSL
stream = 1 stream = 1
nonce = os.urandom(8) nonce = os.urandom(8)
user_agent = b'/MiNode:0.3.2/' user_agent = b'/MiNode:0.3.3/'
timeout = 600 timeout = 600
header_length = 24 header_length = 24
i2p_dest_obj_type = 0x493250 i2p_dest_obj_type = 0x493250
@ -65,5 +63,4 @@ outgoing_connections = 8
connection_limit = 250 connection_limit = 250
objects = {} objects = {}
junk_vectors = set()
objects_lock = threading.Lock() objects_lock = threading.Lock()

View File

@ -60,6 +60,12 @@ class Object():
self.vector = hashlib.sha512(hashlib.sha512( self.vector = hashlib.sha512(hashlib.sha512(
self.to_bytes()).digest()).digest()[:32] self.to_bytes()).digest()).digest()[:32]
self.tag = (
# broadcast from version 5 and pubkey/getpukey from version 4
self.object_payload[:32] if object_type == 3 and version == 5
or (object_type in (0, 1) and version == 4)
else None)
def __repr__(self): def __repr__(self):
return 'object, vector: {}'.format( return 'object, vector: {}'.format(
base64.b16encode(self.vector).decode()) base64.b16encode(self.vector).decode())
@ -112,18 +118,16 @@ class Object():
'Invalid object %s, reason: payload is too long', 'Invalid object %s, reason: payload is too long',
base64.b16encode(self.vector).decode()) base64.b16encode(self.vector).decode())
return False return False
if self.stream_number != 1: if self.stream_number != shared.stream:
logging.warning( logging.warning(
'Invalid object %s, reason: not in stream 1', 'Invalid object %s, reason: not in stream %i',
base64.b16encode(self.vector).decode()) base64.b16encode(self.vector).decode(), shared.stream)
return False return False
data = self.to_bytes()[8:]
# length = len(data) + 8 + shared.payload_length_extra_bytes
# dt = max(self.expires_time - time.time(), 0)
h = hashlib.sha512(data).digest()
pow_value = int.from_bytes( pow_value = int.from_bytes(
hashlib.sha512(hashlib.sha512( hashlib.sha512(hashlib.sha512(
self.nonce + h).digest()).digest()[:8], 'big') self.nonce + self.pow_initial_hash()
).digest()).digest()[:8], 'big')
target = self.pow_target() target = self.pow_target()
if target < pow_value: if target < pow_value:
logging.warning( logging.warning(
@ -132,31 +136,6 @@ class Object():
return False return False
return True return True
def is_junk(self):
"""
Returns True if an object with encrypted payload has
curve number or key length different from those defined in shared.
"""
if self.object_type not in (1, 2, 3):
return False
if self.object_type == 2:
sp = 0
elif self.object_type == 1 and self.version != 4:
return False
else:
sp = 32
sp += 16
curve = struct.unpack('!H', self.object_payload[sp:sp + 2])[0]
if curve != shared.curve:
return True
length = struct.unpack('!H', self.object_payload[sp + 2:sp + 4])[0]
if length > shared.key_length:
return True
length = struct.unpack('!H', self.object_payload[sp + 36:sp + 38])[0]
if length > shared.key_length:
return True
return False
def pow_target(self): def pow_target(self):
"""Compute PoW target""" """Compute PoW target"""
data = self.to_bytes()[8:] data = self.to_bytes()[8:]
@ -194,6 +173,21 @@ class NetAddrNoPrefix():
b += struct.pack('>H', int(self.port)) b += struct.pack('>H', int(self.port))
return b return b
@staticmethod
def network_group(host):
"""A simplified network group identifier from pybitmessage protocol"""
try:
host = socket.inet_pton(socket.AF_INET, host)
return host[:2]
except socket.error:
try:
host = socket.inet_pton(socket.AF_INET6, host)
return host[:12]
except OSError:
return host
except TypeError:
return host
@classmethod @classmethod
def from_bytes(cls, b): def from_bytes(cls, b):
services, host, port = struct.unpack('>Q16sH', b) services, host, port = struct.unpack('>Q16sH', b)

View File

@ -0,0 +1,57 @@
"""Tests for memory usage"""
import gc
import time
from minode import shared
from .test_network import TestProcessProto, run_listener
class TestListener(TestProcessProto):
"""A separate test case for Listener with a process with --trusted-peer"""
_process_cmd = ['minode', '--trusted-peer', '127.0.0.1']
def setUp(self):
shared.shutting_down = False
@classmethod
def tearDownClass(cls):
super().tearDownClass()
shared.shutting_down = False
def test_listener(self):
"""Start Listener and disconnect a client"""
with run_listener() as listener:
if not listener:
self.fail('Failed to start listener')
shared.connection_limit = 2
connected = False
started = time.time()
while not connected:
time.sleep(0.2)
if time.time() - started > 90:
self.fail('Failed to establish the connection')
for c in shared.connections:
if c.status == 'fully_established':
connected = True
if not self._stop_process(10):
self.fail('Failed to stop the client process')
for c in shared.connections.copy():
if not c.is_alive() or c.status == 'disconnected':
shared.connections.remove(c)
c = None
break
else:
self.fail('The connection is alive')
gc.collect()
for obj in gc.get_objects():
if (
isinstance(obj, shared.connection)
and obj not in shared.connections
):
self.fail('Connection %s remains in memory' % obj)

View File

@ -1,4 +1,6 @@
"""Tests for messages""" """Tests for messages"""
import struct
import time
import unittest import unittest
from binascii import unhexlify from binascii import unhexlify
@ -13,7 +15,7 @@ from minode.shared import magic_bytes
# 1626611891, 1, 1, net.ipv6(ipaddress.ip_address('127.0.0.1')).packed, # 1626611891, 1, 1, net.ipv6(ipaddress.ip_address('127.0.0.1')).packed,
# 8444 # 8444
# ) for _ in range(1000)] # ) for _ in range(1000)]
sample_data = unhexlify( sample_addr_data = unhexlify(
'fd01f4' + ( 'fd01f4' + (
'0000000060f420b30000000' '0000000060f420b30000000'
'1000000000000000100000000000000000000ffff7f00000120fc' '1000000000000000100000000000000000000ffff7f00000120fc'
@ -24,6 +26,21 @@ sample_data = unhexlify(
sample_ping_msg = unhexlify( sample_ping_msg = unhexlify(
'e9beb4d970696e67000000000000000000000004ee26b0dd74657374') 'e9beb4d970696e67000000000000000000000004ee26b0dd74657374')
# from pybitmessage import pathmagic
# pathmagic.setup()
# import protocol
# msg = protocol.assembleVersionMessage('127.0.0.1', 8444, [1, 2, 3])
sample_version_msg = unhexlify(
'e9beb4d976657273696f6e00000000000000006b1b06b182000000030000000000000003'
'0000000064fdd3e1000000000000000100000000000000000000ffff7f00000120fc0000'
'00000000000300000000000000000000ffff7f00000120fc00c0b6c3eefb2adf162f5079'
'4269746d6573736167653a302e362e332e322f03010203'
)
#
sample_error_data = \
b'\x02\x00\x006Too many connections from your IP. Closing connection.'
class TestMessage(unittest.TestCase): class TestMessage(unittest.TestCase):
"""Test assembling and disassembling of network mesages""" """Test assembling and disassembling of network mesages"""
@ -47,7 +64,7 @@ class TestMessage(unittest.TestCase):
def test_addr(self): def test_addr(self):
"""Test addr messages""" """Test addr messages"""
msg = message.Message(b'addr', sample_data) msg = message.Message(b'addr', sample_addr_data)
addr_packet = message.Addr.from_message(msg) addr_packet = message.Addr.from_message(msg)
self.assertEqual(len(addr_packet.addresses), 500) self.assertEqual(len(addr_packet.addresses), 500)
address = addr_packet.addresses.pop() address = addr_packet.addresses.pop()
@ -55,3 +72,39 @@ class TestMessage(unittest.TestCase):
self.assertEqual(address.services, 1) self.assertEqual(address.services, 1)
self.assertEqual(address.port, 8444) self.assertEqual(address.port, 8444)
self.assertEqual(address.host, '127.0.0.1') self.assertEqual(address.host, '127.0.0.1')
def test_version(self):
"""Test version message"""
msg = message.Message.from_bytes(sample_version_msg)
self.assertEqual(msg.command, b'version')
with self.assertRaises(ValueError):
# large time offset
version_packet = message.Version.from_message(msg)
msg.payload = (
msg.payload[:12] + struct.pack('>Q', int(time.time()))
+ msg.payload[20:])
version_packet = message.Version.from_message(msg)
self.assertEqual(version_packet.host, '127.0.0.1')
self.assertEqual(version_packet.port, 8444)
self.assertEqual(version_packet.protocol_version, 3)
self.assertEqual(version_packet.services, 3)
self.assertEqual(version_packet.user_agent, b'/PyBitmessage:0.6.3.2/')
self.assertEqual(version_packet.streams, [1, 2, 3])
msg = version_packet.to_bytes()
# omit header and timestamp
self.assertEqual(msg[24:36], sample_version_msg[24:36])
self.assertEqual(msg[44:], sample_version_msg[44:])
def test_error(self):
"""Test error message"""
msg = message.Error.from_message(
message.Message(b'error', sample_error_data))
self.assertEqual(msg.fatal, 2)
self.assertEqual(msg.ban_time, 0)
self.assertEqual(msg.vector, b'')
msg = message.Error(
b'Too many connections from your IP. Closing connection.', 2)
self.assertEqual(msg.to_bytes()[24:], sample_error_data)

View File

@ -0,0 +1,279 @@
"""Tests for network connections"""
import ipaddress
import logging
import os
import random
import unittest
import tempfile
import time
from contextlib import contextmanager
from minode import connection, main, shared
from minode.listener import Listener
from minode.manager import Manager
from .test_process import TestProcessProto
logging.basicConfig(
level=logging.INFO,
format='[%(asctime)s] [%(levelname)s] %(message)s')
@contextmanager
def time_offset(offset):
"""
Replace time.time() by a mock returning a constant value
with given offset from current time.
"""
started = time.time()
time_call = time.time
try:
time.time = lambda: started + offset
yield time_call
finally:
time.time = time_call
@contextmanager
def run_listener(host='localhost', port=8444):
"""
Run the Listener with zero connection limit and
reset variables in shared after its stop.
"""
connection_limit = shared.connection_limit
shared.connection_limit = 0
try:
listener = Listener(host, port)
listener.start()
yield listener
except OSError:
yield
finally:
shared.connection_limit = connection_limit
shared.connections.clear()
shared.shutting_down = True
time.sleep(1)
class TestNetwork(unittest.TestCase):
"""Test case starting connections"""
@classmethod
def setUpClass(cls):
shared.data_directory = tempfile.gettempdir()
def setUp(self):
shared.core_nodes.clear()
shared.unchecked_node_pool.clear()
shared.objects = {}
try:
os.remove(os.path.join(shared.data_directory, 'objects.pickle'))
except FileNotFoundError:
pass
def _make_initial_nodes(self):
Manager.load_data()
core_nodes_len = len(shared.core_nodes)
self.assertGreaterEqual(core_nodes_len, 3)
main.bootstrap_from_dns()
self.assertGreaterEqual(len(shared.core_nodes), core_nodes_len)
for host, _ in shared.core_nodes:
try:
ipaddress.IPv4Address(host)
except ipaddress.AddressValueError:
try:
ipaddress.IPv6Address(host)
except ipaddress.AddressValueError:
self.fail('Found not an IP address in the core nodes')
break
else:
self.fail('No IPv6 address found in the core nodes')
def test_bootstrap(self):
"""Start bootstrappers and check node pool"""
if shared.core_nodes:
shared.core_nodes = set()
if shared.unchecked_node_pool:
shared.unchecked_node_pool = set()
self._make_initial_nodes()
self.assertEqual(len(shared.unchecked_node_pool), 0)
for node in shared.core_nodes:
c = connection.Bootstrapper(*node)
c.start()
c.join()
if len(shared.unchecked_node_pool) > 2:
break
else:
self.fail(
'Failed to find at least 3 nodes'
' after running %s bootstrappers' % len(shared.core_nodes))
def test_connection(self):
"""Check a normal connection - should receive objects"""
self._make_initial_nodes()
started = time.time()
nodes = list(shared.core_nodes.union(shared.unchecked_node_pool))
random.shuffle(nodes)
for node in nodes:
# unknown = node not in shared.node_pool
# self.assertTrue(unknown)
unknown = True
shared.node_pool.discard(node)
c = connection.Connection(*node)
c.start()
connection_started = time.time()
while c.status not in ('disconnected', 'failed'):
# The addr of established connection is added to nodes pool
if unknown and c.status == 'fully_established':
unknown = False
self.assertIn(node, shared.node_pool)
if shared.objects or time.time() - connection_started > 90:
c.status = 'disconnecting'
if time.time() - started > 300:
c.status = 'disconnecting'
self.fail('Failed to receive an object in %s sec' % 300)
time.sleep(0.2)
if shared.objects: # got some objects
break
else:
self.fail('Failed to establish a proper connection')
def test_time_offset(self):
"""Assert the network bans for large time offset"""
def try_connect(nodes, timeout, call):
started = call()
for node in nodes:
c = connection.Connection(*node)
c.start()
while call() < started + timeout:
if c.status == 'fully_established':
return 'Established a connection'
if c.status in ('disconnected', 'failed'):
break
time.sleep(0.2)
else:
return 'Spent too much time trying to connect'
def time_offset_connections(nodes, offset):
"""Spoof time.time and open connections with given time offset"""
with time_offset(offset) as time_call:
result = try_connect(nodes, 200, time_call)
if result:
self.fail(result)
self._make_initial_nodes()
nodes = random.sample(
tuple(shared.core_nodes.union(shared.unchecked_node_pool)), 5)
time_offset_connections(nodes, 4000)
time_offset_connections(nodes, -4000)
class TestListener(TestProcessProto):
"""A separate test case for Listener with a process with --trusted-peer"""
_process_cmd = ['minode', '--trusted-peer', '127.0.0.1']
def setUp(self):
shared.shutting_down = False
@classmethod
def tearDownClass(cls):
super().tearDownClass()
shared.shutting_down = False
def test_listener(self):
"""Start Listener and try to connect"""
with run_listener() as listener:
if not listener:
self.fail('Failed to start listener')
c = connection.Connection('127.0.0.1', 8444)
shared.connections.add(c)
for _ in range(30):
if len(shared.connections) > 1:
self.fail('The listener ignored connection limit')
time.sleep(0.5)
shared.connection_limit = 2
c.start()
started = time.time()
while c.status not in ('disconnected', 'failed'):
if c.status == 'fully_established':
self.fail('Connected to itself')
if time.time() - started > 90:
c.status = 'disconnecting'
time.sleep(0.2)
server = None
started = time.time()
while not server:
time.sleep(0.2)
if time.time() - started > 90:
self.fail('Failed to establish the connection')
for c in shared.connections:
if c.status == 'fully_established':
server = c
self.assertTrue(server.server)
while not self.process.connections():
time.sleep(0.2)
if time.time() - started > 90:
self.fail('Failed to connect to listener')
client = self.process.connections()[0]
self.assertEqual(client.raddr[0], '127.0.0.1')
self.assertEqual(client.raddr[1], 8444)
self.assertEqual(server.host, client.laddr[0])
# self.assertEqual(server.port, client.laddr[1])
server.status = 'disconnecting'
self.assertFalse(listener.is_alive())
def test_listener_timeoffset(self):
"""Run listener with a large time offset - shouldn't connect"""
with time_offset(4000):
with run_listener() as listener:
if not listener:
self.fail('Failed to start listener')
shared.connection_limit = 2
for _ in range(30):
for c in shared.connections:
if c.status == 'fully_established':
self.fail('Established a connection')
time.sleep(0.5)
class TestBootstrapProcess(TestProcessProto):
"""A separate test case for bootstrapping with a minode process"""
_listen = True
_connection_limit = 24
def test_bootstrap(self):
"""Start a bootstrapper for the local process and check node pool"""
if shared.unchecked_node_pool:
shared.unchecked_node_pool = set()
started = time.time()
while not self.connections():
if time.time() - started > 60:
self.fail('Failed to establish a connection')
time.sleep(1)
for _ in range(3):
c = connection.Bootstrapper('127.0.0.1', 8444)
c.start()
c.join()
if len(shared.unchecked_node_pool) > 2:
break
else:
self.fail(
'Failed to find at least 3 nodes'
' after 3 tries to bootstrap with the local process')

View File

@ -1,14 +1,18 @@
"""Blind tests, starting the minode process""" """Blind tests, starting the minode process"""
import unittest import os
import signal import signal
import socket import socket
import subprocess import subprocess
import sys import sys
import tempfile import tempfile
import time import time
import unittest
import psutil import psutil
from minode.i2p import util
from minode.structure import NetAddrNoPrefix
try: try:
socket.socket().bind(('127.0.0.1', 7656)) socket.socket().bind(('127.0.0.1', 7656))
i2p_port_free = True i2p_port_free = True
@ -19,7 +23,7 @@ except (OSError, socket.error):
class TestProcessProto(unittest.TestCase): class TestProcessProto(unittest.TestCase):
"""Test process attributes, common flow""" """Test process attributes, common flow"""
_process_cmd = ['minode'] _process_cmd = ['minode']
_connection_limit = 4 if sys.platform.startswith('win') else 10 _connection_limit = 4 if sys.platform.startswith('win') else 8
_listen = False _listen = False
_listening_port = None _listening_port = None
@ -69,17 +73,20 @@ class TestProcessProto(unittest.TestCase):
class TestProcessShutdown(TestProcessProto): class TestProcessShutdown(TestProcessProto):
"""Separate test case for SIGTERM""" """Separate test case for SIGTERM"""
_wait_time = 30
# longer wait time because it's not a benchmark
def test_shutdown(self): def test_shutdown(self):
"""Send to minode SIGTERM and ensure it stopped""" """Send to minode SIGTERM and ensure it stopped"""
# longer wait time because it's not a benchmark
self.assertTrue( self.assertTrue(
self._stop_process(20), self._stop_process(self._wait_time),
'%s has not stopped in 20 sec' % ' '.join(self._process_cmd)) '%s has not stopped in %i sec' % (
' '.join(self._process_cmd), self._wait_time))
class TestProcess(TestProcessProto): class TestProcess(TestProcessProto):
"""The test case for minode process""" """The test case for minode process"""
_wait_time = 120 _wait_time = 180
_check_limit = False _check_limit = False
def test_connections(self): def test_connections(self):
@ -99,14 +106,20 @@ class TestProcess(TestProcessProto):
time.sleep(1) time.sleep(1)
for _ in range(self._wait_time * 2): for _ in range(self._wait_time * 2):
if len(self.connections()) > self._connection_limit / 2: if len(self.connections()) >= self._connection_limit / 2:
_time_to_connect = round(time.time() - _started) _time_to_connect = round(time.time() - _started)
break break
if '--i2p' not in self._process_cmd:
groups = []
for c in self.connections():
group = NetAddrNoPrefix.network_group(c.raddr[0])
self.assertNotIn(group, groups)
groups.append(group)
time.sleep(0.5) time.sleep(0.5)
else: else:
self.fail( self.fail(
'Failed establish at least %s connections in %s sec' 'Failed to establish at least %i connections in %s sec'
% (self._connection_limit / 2, self._wait_time)) % (int(self._connection_limit / 2), self._wait_time))
if self._check_limit: if self._check_limit:
continue_check_limit(_time_to_connect) continue_check_limit(_time_to_connect)
@ -127,11 +140,39 @@ class TestProcess(TestProcessProto):
class TestProcessI2P(TestProcess): class TestProcessI2P(TestProcess):
"""Test minode process with --i2p and no IP""" """Test minode process with --i2p and no IP"""
_process_cmd = ['minode', '--i2p', '--no-ip'] _process_cmd = ['minode', '--i2p', '--no-ip']
_connection_limit = 4
_wait_time = 120
_listen = True _listen = True
_listening_port = 8448 _listening_port = 8448
@classmethod
def setUpClass(cls):
cls.freezed = False
cls.keyfile = os.path.join(cls.home, 'i2p_dest.pub')
saved = os.path.isfile(cls.keyfile)
super().setUpClass()
for _ in range(cls._wait_time):
if saved:
if cls.process.num_threads() > 3:
break
elif os.path.isfile(cls.keyfile):
break
time.sleep(1)
else:
cls.freezed = True
def setUp(self):
"""Skip any test if I2PController freezed"""
if self.freezed:
raise unittest.SkipTest(
'I2PController has probably failed to start')
def test_saved_keys(self):
"""Check saved i2p keys"""
with open(self.keyfile, 'br') as src:
i2p_dest_pub = src.read()
with open(os.path.join(self.home, 'i2p_dest_priv.key'), 'br') as src:
i2p_dest_priv = src.read()
self.assertEqual(util.pub_from_priv(i2p_dest_priv), i2p_dest_pub)
def test_connections(self): def test_connections(self):
"""Ensure all connections are I2P""" """Ensure all connections are I2P"""
super().test_connections() super().test_connections()

View File

@ -1,9 +1,13 @@
"""Tests for structures""" """Tests for structures"""
import unittest import base64
import logging
import queue
import struct import struct
import time
import unittest
from binascii import unhexlify from binascii import unhexlify
from minode import structure from minode import message, proofofwork, shared, structure
# host pregenerated by pybitmessage.protocol.encodeHost() # host pregenerated by pybitmessage.protocol.encodeHost()
@ -13,10 +17,24 @@ sample_addr_data = unhexlify(
'0000000060f420b3000000010000000000000001' '0000000060f420b3000000010000000000000001'
'260753000201300000000000000057ae1f90') '260753000201300000000000000057ae1f90')
# data for an object with expires_time 1697063939
# structure.Object(
# b'\x00' * 8, expires_time, 42, 1, 2, b'HELLO').to_bytes()
sample_object_data = unhexlify(
'000000000000000000000000652724030000002a010248454c4c4f')
logging.basicConfig(
level=shared.log_level,
format='[%(asctime)s] [%(levelname)s] %(message)s')
class TestStructure(unittest.TestCase): class TestStructure(unittest.TestCase):
"""Testing structures serializing and deserializing""" """Testing structures serializing and deserializing"""
@classmethod
def setUpClass(cls):
shared.objects = {}
def test_varint(self): def test_varint(self):
"""Test varint serializing and deserializing""" """Test varint serializing and deserializing"""
s = structure.VarInt(0) s = structure.VarInt(0)
@ -85,3 +103,92 @@ class TestStructure(unittest.TestCase):
addr = structure.NetAddr(1, '2607:5300:201:3000::57ae', 8080, 1) addr = structure.NetAddr(1, '2607:5300:201:3000::57ae', 8080, 1)
self.assertEqual(addr.to_bytes()[8:], sample_addr_data[8:]) self.assertEqual(addr.to_bytes()[8:], sample_addr_data[8:])
def test_network_group(self):
"""Test various types of network groups"""
test_ip = '1.2.3.4'
self.assertEqual(
b'\x01\x02', structure.NetAddrNoPrefix.network_group(test_ip))
self.assertEqual(
structure.NetAddrNoPrefix.network_group('8.8.8.8'),
structure.NetAddrNoPrefix.network_group('8.8.4.4'))
self.assertNotEqual(
structure.NetAddrNoPrefix.network_group('1.1.1.1'),
structure.NetAddrNoPrefix.network_group('8.8.8.8'))
test_ip = '0102:0304:0506:0708:090A:0B0C:0D0E:0F10'
self.assertEqual(
b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0A\x0B\x0C',
structure.NetAddrNoPrefix.network_group(test_ip))
for test_ip in (
'bootstrap8444.bitmessage.org', 'quzwelsuziwqgpt2.onion', None
):
self.assertEqual(
test_ip, structure.NetAddrNoPrefix.network_group(test_ip))
def test_object(self):
"""Create and check objects"""
obj = structure.Object.from_message(
message.Message(b'object', sample_object_data))
self.assertEqual(obj.object_type, 42)
self.assertEqual(obj.stream_number, 2)
self.assertEqual(obj.expires_time, 1697063939)
self.assertEqual(obj.object_payload, b'HELLO')
obj = structure.Object(
b'\x00' * 8, int(time.time() + 3000000), 42, 1, 1, b'HELLO')
self.assertFalse(obj.is_valid())
obj.expires_time = int(time.time() - 11000)
self.assertFalse(obj.is_valid())
obj = structure.Object(
b'\x00' * 8, int(time.time() + 300), 42, 1, 2, b'HELLO')
vector = obj.vector
proofofwork._worker(obj) # pylint: disable=protected-access
obj = shared.objects.popitem()[1]
self.assertNotEqual(obj.vector, vector)
self.assertFalse(obj.is_expired())
self.assertFalse(obj.is_valid())
shared.stream = 2
self.assertTrue(obj.is_valid())
obj.object_payload = \
b'TIGER, tiger, burning bright. In the forests of the night'
self.assertFalse(obj.is_valid())
def test_proofofwork(self):
"""Check the main proofofwork call and worker"""
shared.vector_advertise_queue = queue.Queue()
obj = structure.Object(
b'\x00' * 8, int(time.time() + 300), 42, 1,
shared.stream, b'HELLO')
start_time = time.time()
proofofwork.do_pow_and_publish(obj)
try:
vector = shared.vector_advertise_queue.get(timeout=300)
except queue.Empty:
self.fail("Couldn't make work in 300 sec")
else:
time.sleep(1)
try:
result = shared.objects[vector]
except KeyError:
self.fail(
"Couldn't found object with vector %s"
" %s sec after pow start" % (
base64.b16encode(vector), time.time() - start_time))
self.assertTrue(result.is_valid())
self.assertEqual(result.object_type, 42)
self.assertEqual(result.object_payload, b'HELLO')
q = queue.Queue()
# pylint: disable=protected-access
proofofwork._pow_worker(obj.pow_target(), obj.pow_initial_hash(), q)
try:
nonce = q.get(timeout=5)
except queue.Empty:
self.fail("No nonce found in the queue")
obj = structure.Object(
nonce, obj.expires_time, obj.object_type, obj.version,
obj.stream_number, obj.object_payload)
self.assertTrue(obj.is_valid())

View File

@ -1,5 +1,5 @@
[tox] [tox]
envlist = reset,py{36,37,38,39,310},stats envlist = reset,py3{6,7,8,9,10,11},stats
skip_missing_interpreters = true skip_missing_interpreters = true
[testenv] [testenv]
@ -41,4 +41,5 @@ ignore_errors = true
[pylint.main] [pylint.main]
disable = invalid-name,consider-using-f-string,fixme disable = invalid-name,consider-using-f-string,fixme
max-args = 7 max-args = 8
max-attributes = 8