Compare commits

...

25 Commits

Author SHA1 Message Date
Lee Miller edbfc12a20
Merge branch 'proofofwork' into testing 2023-10-17 04:36:37 +03:00
Lee Miller 9ba35226e7
Merge branch 'network' into testing 2023-10-17 04:32:04 +03:00
Lee Miller 8d04f28ecd
Merge branch 'doc' into testing 2023-10-17 04:31:30 +03:00
Lee Miller dba2880568
Merge branch 'ssl' into testing 2023-10-17 04:30:53 +03:00
Lee Miller 2e19e12933
Implement permanently running worker thread 2023-10-17 00:14:54 +03:00
Lee Miller ff63139d78
Document proofofwork public calls 2023-10-17 00:14:48 +03:00
Lee Miller 1139bdb8b7
Use pip install -e directly to help tox install the extension.
use_develop doesn't work for some reason. Other envs will lack the
bitmsghash*.so and more code in the proofofwork will be covered.
2023-10-16 01:29:46 +03:00
Lee Miller 794cf2657c
Initial efforts to make bitmsghash a valid python extension 2023-10-16 01:29:46 +03:00
Lee Miller 450839079c
Add a glob fallback searching bitmsghash*.so
(e.g. bitmsghash.cpython-39-x86_64-linux-gnu.so)
2023-10-16 01:29:46 +03:00
Lee Miller 224feb28d0
Update buildbot_multibuild Dockerfile with debs needed for compiling C ext 2023-10-16 01:29:45 +03:00
Lee Miller 904a554631
Update pow check approach in test_object using proofofwork.Worker 2023-10-16 01:29:45 +03:00
Lee Miller df34857d6a
Integrate bitmsghash.bmpow - PyBitmessage's CPoW extension,
define a threading.Thread subclass Worker() for that.
2023-10-16 01:29:13 +03:00
Lee Miller 4bd86a725a
Document the methods of manager 2023-10-15 23:46:02 +03:00
Lee Miller 6369ea75d8
Define abstract bases in message and structure to reduce docstrings 2023-10-15 23:38:04 +03:00
Lee Miller bec948cfba
Copy the relevant part of test_network_group() from PyBitmessage 2023-10-15 20:17:38 +03:00
Lee Miller 1ce65fca8a
Invalidate the version message with a large time offset 2023-10-15 20:17:38 +03:00
Lee Miller ba95d046f9
Run listener with a large time offset and ensure it's not connected 2023-10-15 20:17:38 +03:00
Lee Miller 110dfc3324
Correct position of the except clause in listener loop 2023-10-15 20:17:38 +03:00
Lee Miller acee18f0c4
Add a test case for listener with a process running with --trusted-peer 2023-10-15 20:17:35 +03:00
Lee Miller 131512a5e6
Add a test for connections with large time offset 2023-10-15 20:16:34 +03:00
Lee Miller cfd054fcf4
Check network group of connections in process test if it isn't for i2p 2023-10-15 20:16:34 +03:00
Lee Miller 78f170451b
Define a static method network_group() in NetAddrNoPrefix
and use it in manager.
2023-10-15 20:16:34 +03:00
Lee Miller ba897c8d40
A short test for normal connection (with timeout in 5 min) 2023-10-15 20:16:31 +03:00
Lee Miller edb641024d
Define a base class for connection to subclass for special purposes 2023-10-15 19:00:51 +03:00
Lee Miller bae064d32b
Resolve an SSL issue when connecting to PyBitmessage 0.6.1, log version 2023-10-12 20:05:36 +03:00
15 changed files with 764 additions and 141 deletions

View File

@ -1,5 +1,7 @@
FROM ubuntu:focal
ENV DEBIAN_FRONTEND=noninteractive
RUN apt-get update
RUN apt-get install -yq software-properties-common
@ -8,6 +10,7 @@ RUN apt-add-repository ppa:purplei2p/i2pd
RUN apt-get update
RUN apt-get install -yq --no-install-suggests --no-install-recommends \
build-essential libcap-dev libffi-dev libssl-dev \
python3-dev python3-pip python3.9 python3.9-dev python3.9-venv sudo i2pd
RUN echo 'builder ALL=(ALL) NOPASSWD:ALL' >> /etc/sudoers

View File

@ -0,0 +1,203 @@
// bitmessage cracker, build with g++ or MSVS to a shared library,
// use via minode.proofofwork module
#ifdef _WIN32
#include "winsock.h"
#include "windows.h"
#define uint64_t unsigned __int64
#else
#include <arpa/inet.h>
#include <pthread.h>
#include <stdint.h>
#endif
#include <string.h>
#include <stdio.h>
#include <stdlib.h>
#if defined(__APPLE__) || defined(__FreeBSD__) || defined (__DragonFly__) || defined (__OpenBSD__) || defined (__NetBSD__)
#include <sys/types.h>
#include <sys/sysctl.h>
#endif
#include "openssl/sha.h"
#define PY_SSIZE_T_CLEAN
#include <Python.h>
#define HASH_SIZE 64
#define BUFLEN 16384
#if defined(__GNUC__)
#define EXPORT __attribute__ ((__visibility__("default")))
#elif defined(_WIN32)
#define EXPORT __declspec(dllexport)
#endif
#ifndef __APPLE__
#define ntohll(x) ( ( (uint64_t)(ntohl( (unsigned int)((x << 32) >> 32) )) << 32) | ntohl( ((unsigned int)(x >> 32)) ) )
#endif
unsigned long long max_val;
unsigned char *initialHash;
unsigned long long successval = 0;
unsigned int numthreads = 0;
#ifdef _WIN32
DWORD WINAPI threadfunc(LPVOID param) {
#else
void * threadfunc(void* param) {
#endif
unsigned int incamt = *((unsigned int*)param);
SHA512_CTX sha;
unsigned char buf[HASH_SIZE + sizeof(uint64_t)] = { 0 };
unsigned char output[HASH_SIZE] = { 0 };
memcpy(buf + sizeof(uint64_t), initialHash, HASH_SIZE);
unsigned long long tmpnonce = incamt;
unsigned long long * nonce = (unsigned long long *)buf;
unsigned long long * hash = (unsigned long long *)output;
while (successval == 0) {
tmpnonce += numthreads;
(*nonce) = ntohll(tmpnonce); /* increment nonce */
SHA512_Init(&sha);
SHA512_Update(&sha, buf, HASH_SIZE + sizeof(uint64_t));
SHA512_Final(output, &sha);
SHA512_Init(&sha);
SHA512_Update(&sha, output, HASH_SIZE);
SHA512_Final(output, &sha);
if (ntohll(*hash) < max_val) {
successval = tmpnonce;
}
}
#ifdef _WIN32
return 0;
#else
return NULL;
#endif
}
void getnumthreads()
{
#ifdef _WIN32
DWORD_PTR dwProcessAffinity, dwSystemAffinity;
#elif __linux__
cpu_set_t dwProcessAffinity;
#elif __OpenBSD__
int mib[2], core_count = 0;
int dwProcessAffinity = 0;
size_t len2;
#else
int dwProcessAffinity = 0;
int32_t core_count = 0;
#endif
size_t len = sizeof(dwProcessAffinity);
if (numthreads > 0)
return;
#ifdef _WIN32
GetProcessAffinityMask(GetCurrentProcess(), &dwProcessAffinity, &dwSystemAffinity);
#elif __linux__
sched_getaffinity(0, len, &dwProcessAffinity);
#elif __OpenBSD__
len2 = sizeof(core_count);
mib[0] = CTL_HW;
mib[1] = HW_NCPU;
if (sysctl(mib, 2, &core_count, &len2, 0, 0) == 0)
numthreads = core_count;
#else
if (sysctlbyname("hw.logicalcpu", &core_count, &len, 0, 0) == 0)
numthreads = core_count;
else if (sysctlbyname("hw.ncpu", &core_count, &len, 0, 0) == 0)
numthreads = core_count;
#endif
for (unsigned int i = 0; i < len * 8; i++)
#if defined(_WIN32)
#if defined(_MSC_VER)
if (dwProcessAffinity & (1i64 << i))
#else // CYGWIN/MINGW
if (dwProcessAffinity & (1ULL << i))
#endif
#elif defined __linux__
if (CPU_ISSET(i, &dwProcessAffinity))
#else
if (dwProcessAffinity & (1 << i))
#endif
numthreads++;
if (numthreads == 0) // something failed
numthreads = 1;
printf("Number of threads: %i\n", (int)numthreads);
}
extern "C" EXPORT unsigned long long BitmessagePOW(unsigned char * starthash, unsigned long long target)
{
successval = 0;
max_val = target;
getnumthreads();
initialHash = (unsigned char *)starthash;
# ifdef _WIN32
HANDLE* threads = (HANDLE*)calloc(sizeof(HANDLE), numthreads);
# else
pthread_t* threads = (pthread_t*)calloc(sizeof(pthread_t), numthreads);
struct sched_param schparam;
schparam.sched_priority = 0;
# endif
unsigned int *threaddata = (unsigned int *)calloc(sizeof(unsigned int), numthreads);
for (unsigned int i = 0; i < numthreads; i++) {
threaddata[i] = i;
# ifdef _WIN32
threads[i] = CreateThread(NULL, 0, threadfunc, (LPVOID)&threaddata[i], 0, NULL);
SetThreadPriority(threads[i], THREAD_PRIORITY_IDLE);
# else
pthread_create(&threads[i], NULL, threadfunc, (void*)&threaddata[i]);
# ifdef __linux__
pthread_setschedparam(threads[i], SCHED_IDLE, &schparam);
# else
pthread_setschedparam(threads[i], SCHED_RR, &schparam);
# endif
# endif
}
# ifdef _WIN32
WaitForMultipleObjects(numthreads, threads, TRUE, INFINITE);
# else
for (unsigned int i = 0; i < numthreads; i++) {
pthread_join(threads[i], NULL);
}
# endif
free(threads);
free(threaddata);
return successval;
}
// python module definitions
static PyObject *
bitmessage_pow(PyObject *self, PyObject *args)
{
const char *initial_hash;
unsigned long long target;
unsigned long long nonce;
if (!PyArg_ParseTuple(args, "yK", &initial_hash, &target))
return NULL;
nonce = BitmessagePOW((unsigned char *)initial_hash, target);
return PyLong_FromUnsignedLongLong(nonce);
}
static PyMethodDef BitmsghashMethods[] = {
{"bitmessage_pow", bitmessage_pow, METH_VARARGS,
"Do the Bitmessage PoW and return nonce."},
{NULL, NULL, 0, NULL}
};
static struct PyModuleDef bitmsghashmodule = {
PyModuleDef_HEAD_INIT,
"bitmsghash",
"A C extension for PoW borrowed from PyBitmessage",
-1,
BitmsghashMethods
};
PyMODINIT_FUNC
PyInit_bitmsghash(void)
{
return PyModule_Create(&bitmsghashmodule);
}

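For orientation, here is a minimal sketch of driving the compiled extension directly through ctypes, mirroring what minode.proofofwork does below; the glob path and the target value are illustrative assumptions, not part of the change.

import ctypes
import glob
import hashlib
import os
import struct

# Locate the built shared object, e.g. bitmsghash.cpython-39-x86_64-linux-gnu.so;
# the directory is an assumption, adjust to the actual build location.
bitmsglib = glob.glob(os.path.join('minode', 'bitmsghash*.so'))[0]
bso = ctypes.CDLL(bitmsglib)
bmpow = bso.BitmessagePOW
bmpow.restype = ctypes.c_ulonglong

initial_hash = hashlib.sha512(b'object header without nonce').digest()
target = 2 ** 50  # unrealistically easy target, for illustration only
nonce = bmpow(
    ctypes.pointer(ctypes.create_string_buffer(initial_hash, 64)),
    ctypes.c_ulonglong(target))
print(struct.pack('>Q', nonce))  # the nonce serialized as in the object header
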
View File

@ -15,8 +15,11 @@ import time
from . import message, shared, structure
class Connection(threading.Thread):
"""The connection object"""
class ConnectionBase(threading.Thread):
"""
Common code for the connection thread,
with the minimum command handlers needed for reuse
"""
def __init__(
self, host, port, s=None, network='ip', server=False,
i2p_remote_dest=b''
@ -207,6 +210,11 @@ class Connection(threading.Thread):
context.options = (
ssl.OP_ALL | ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3
| ssl.OP_SINGLE_ECDH_USE | ssl.OP_CIPHER_SERVER_PREFERENCE)
# OP_NO_SSL* is deprecated since 3.6
try:
context.minimum_version = ssl.TLSVersion.TLSv1
except AttributeError:
pass
self.s = context.wrap_socket(
self.s, server_side=self.server, do_handshake_on_connect=False)
@ -227,8 +235,8 @@ class Connection(threading.Thread):
break
self.tls = True
logging.debug(
'Established TLS connection with %s:%s',
self.host_print, self.port)
'Established TLS connection with %s:%s (%s)',
self.host_print, self.port, self.s.version())
def _send_message(self, m):
if isinstance(m, message.Message) and m.command == b'object':
@ -334,89 +342,13 @@ class Connection(threading.Thread):
break
def _process_message(self, m):
if m.command == b'version':
version = message.Version.from_message(m)
if shared.stream not in version.streams:
raise ValueError('message not for stream %i' % shared.stream)
logging.debug('%s:%s -> %s', self.host_print, self.port, version)
if (
version.protocol_version != shared.protocol_version
or version.nonce == shared.nonce
):
self.status = 'disconnecting'
self.send_queue.put(None)
else:
logging.info(
'%s:%s claims to be %s',
self.host_print, self.port, version.user_agent)
self.send_queue.put(message.Message(b'verack', b''))
self.verack_sent = True
self.remote_version = version
if not self.server:
self.send_queue.put('fully_established')
if self.network == 'ip':
shared.address_advertise_queue.put(structure.NetAddr(
version.services, self.host, self.port))
shared.node_pool.add((self.host, self.port))
elif self.network == 'i2p':
shared.i2p_node_pool.add((self.host, 'i2p'))
if self.network == 'ip':
shared.address_advertise_queue.put(structure.NetAddr(
shared.services, version.host, shared.listening_port))
if self.server:
if self.network == 'ip':
self.send_queue.put(
message.Version(self.host, self.port))
else:
self.send_queue.put(message.Version('127.0.0.1', 7656))
elif m.command == b'verack':
if m.command == b'verack':
self.verack_received = True
logging.debug(
'%s:%s -> %s', self.host_print, self.port, 'verack')
if self.server:
self.send_queue.put('fully_established')
elif m.command == b'inv':
inv = message.Inv.from_message(m)
logging.debug('%s:%s -> %s', self.host_print, self.port, inv)
to_get = inv.vectors.copy()
to_get.difference_update(shared.objects.keys())
self.vectors_to_get.update(to_get)
# Do not send objects they already have.
self.vectors_to_send.difference_update(inv.vectors)
elif m.command == b'object':
obj = structure.Object.from_message(m)
logging.debug('%s:%s -> %s', self.host_print, self.port, obj)
self.vectors_requested.pop(obj.vector, None)
self.vectors_to_get.discard(obj.vector)
if obj.is_valid() and obj.vector not in shared.objects:
with shared.objects_lock:
shared.objects[obj.vector] = obj
if (
obj.object_type == shared.i2p_dest_obj_type
and obj.version == shared.i2p_dest_obj_version
):
dest = base64.b64encode(obj.object_payload, altchars=b'-~')
logging.debug(
'Received I2P destination object,'
' adding to i2p_unchecked_node_pool')
logging.debug(dest)
shared.i2p_unchecked_node_pool.add((dest, 'i2p'))
shared.vector_advertise_queue.put(obj.vector)
elif m.command == b'getdata':
getdata = message.GetData.from_message(m)
logging.debug('%s:%s -> %s', self.host_print, self.port, getdata)
self.vectors_to_send.update(getdata.vectors)
elif m.command == b'addr':
addr = message.Addr.from_message(m)
logging.debug('%s:%s -> %s', self.host_print, self.port, addr)
for a in addr.addresses:
shared.unchecked_node_pool.add((a.host, a.port))
elif m.command == b'ping':
logging.debug('%s:%s -> ping', self.host_print, self.port)
self.send_queue.put(message.Message(b'pong', b''))
@ -430,7 +362,51 @@ class Connection(threading.Thread):
shared.unchecked_node_pool.discard((self.host, self.port))
else:
logging.debug('%s:%s -> %s', self.host_print, self.port, m)
try:
getattr(self, '_process_msg_{}'.format(m.command.decode()))(m)
except (AttributeError, UnicodeDecodeError):
logging.debug('%s:%s -> %s', self.host_print, self.port, m)
def _process_msg_version(self, m):
version = message.Version.from_message(m)
if shared.stream not in version.streams:
raise ValueError('message not for stream %i' % shared.stream)
logging.debug('%s:%s -> %s', self.host_print, self.port, version)
if (
version.protocol_version != shared.protocol_version
or version.nonce == shared.nonce
):
self.status = 'disconnecting'
self.send_queue.put(None)
else:
logging.info(
'%s:%s claims to be %s',
self.host_print, self.port, version.user_agent)
self.send_queue.put(message.Message(b'verack', b''))
self.verack_sent = True
self.remote_version = version
if not self.server:
self.send_queue.put('fully_established')
if self.network == 'ip':
shared.address_advertise_queue.put(structure.NetAddr(
version.services, self.host, self.port))
shared.node_pool.add((self.host, self.port))
elif self.network == 'i2p':
shared.i2p_node_pool.add((self.host, 'i2p'))
if self.network == 'ip':
shared.address_advertise_queue.put(structure.NetAddr(
shared.services, version.host, shared.listening_port))
if self.server:
if self.network == 'ip':
self.send_queue.put(message.Version(self.host, self.port))
else:
self.send_queue.put(message.Version('127.0.0.1', 7656))
def _process_msg_addr(self, m):
addr = message.Addr.from_message(m)
logging.debug('%s:%s -> %s', self.host_print, self.port, addr)
for a in addr.addresses:
shared.unchecked_node_pool.add((a.host, a.port))
def _request_objects(self):
if self.vectors_to_get and len(self.vectors_requested) < 100:
@ -489,4 +465,41 @@ class Connection(threading.Thread):
message.Message(b'object', obj.to_bytes()))
class Connection(ConnectionBase):
"""The connection with all commands implementation"""
def _process_msg_inv(self, m):
inv = message.Inv.from_message(m)
logging.debug('%s:%s -> %s', self.host_print, self.port, inv)
to_get = inv.vectors.copy()
to_get.difference_update(shared.objects.keys())
self.vectors_to_get.update(to_get)
# Do not send objects they already have.
self.vectors_to_send.difference_update(inv.vectors)
def _process_msg_object(self, m):
obj = structure.Object.from_message(m)
logging.debug('%s:%s -> %s', self.host_print, self.port, obj)
self.vectors_requested.pop(obj.vector, None)
self.vectors_to_get.discard(obj.vector)
if obj.is_valid() and obj.vector not in shared.objects:
with shared.objects_lock:
shared.objects[obj.vector] = obj
if (
obj.object_type == shared.i2p_dest_obj_type
and obj.version == shared.i2p_dest_obj_version
):
dest = base64.b64encode(obj.object_payload, altchars=b'-~')
logging.debug(
'Received I2P destination object,'
' adding to i2p_unchecked_node_pool')
logging.debug(dest)
shared.i2p_unchecked_node_pool.add((dest, 'i2p'))
shared.vector_advertise_queue.put(obj.vector)
def _process_msg_getdata(self, m):
getdata = message.GetData.from_message(m)
logging.debug('%s:%s -> %s', self.host_print, self.port, getdata)
self.vectors_to_send.update(getdata.vectors)
shared.connection = Connection

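With this split, a special-purpose connection only needs to subclass ConnectionBase and define a _process_msg_<command> method for each command it wants to handle; unhandled commands fall through to the debug log. A hypothetical sketch (QuietConnection is not part of the change):

import logging

from minode import connection, message

class QuietConnection(connection.ConnectionBase):
    """A hypothetical connection that handshakes but ignores inventory"""

    def _process_msg_inv(self, m):
        # resolved by name from ConnectionBase._process_message()
        logging.debug(
            '%s:%s -> ignoring %s', self.host_print, self.port,
            message.Inv.from_message(m))
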
View File

@ -28,13 +28,14 @@ class Listener(threading.Thread):
break
try:
conn, addr = self.s.accept()
logging.info('Incoming connection from: %s:%i', *addr[:2])
with shared.connections_lock:
if len(shared.connections) > shared.connection_limit:
conn.close()
else:
c = Connection(*addr[:2], conn, server=True)
c.start()
shared.connections.add(c)
except socket.timeout:
pass
continue
logging.info('Incoming connection from: %s:%i', *addr[:2])
with shared.connections_lock:
if len(shared.connections) > shared.connection_limit:
conn.close()
else:
c = Connection(*addr[:2], conn, server=True)
c.start()
shared.connections.add(c)

View File

@ -55,6 +55,7 @@ class Manager(threading.Thread):
@staticmethod
def clean_objects():
"""Delete expired objects"""
for vector in set(shared.objects):
if not shared.objects[vector].is_valid():
if shared.objects[vector].is_expired():
@ -70,6 +71,7 @@ class Manager(threading.Thread):
@staticmethod
def manage_connections():
"""Keep number of open connections according to the app settings"""
hosts = set()
outgoing_connections = 0
for c in shared.connections.copy():
@ -77,7 +79,7 @@ class Manager(threading.Thread):
with shared.connections_lock:
shared.connections.remove(c)
else:
hosts.add(c.host)
hosts.add(structure.NetAddrNoPrefix.network_group(c.host))
if not c.server:
outgoing_connections += 1
@ -119,15 +121,16 @@ class Manager(threading.Thread):
else:
to_connect.update(shared.i2p_node_pool)
for addr in to_connect:
if addr[0] in hosts:
for host, port in to_connect:
group = structure.NetAddrNoPrefix.network_group(host)
if group in hosts:
continue
if addr[1] == 'i2p' and shared.i2p_enabled:
if shared.i2p_session_nick and addr[0] != shared.i2p_dest_pub:
if port == 'i2p' and shared.i2p_enabled:
if shared.i2p_session_nick and host != shared.i2p_dest_pub:
try:
d = I2PDialer(
shared,
addr[0], shared.i2p_session_nick,
host, shared.i2p_session_nick,
shared.i2p_sam_host, shared.i2p_sam_port)
d.start()
hosts.add(d.destination)
@ -139,9 +142,9 @@ class Manager(threading.Thread):
else:
continue
else:
c = Connection(addr[0], addr[1])
c = Connection(host, port)
c.start()
hosts.add(c.host)
hosts.add(group)
with shared.connections_lock:
shared.connections.add(c)
shared.hosts = hosts
@ -201,6 +204,7 @@ class Manager(threading.Thread):
@staticmethod
def pickle_objects():
"""Save objects into a pickle file"""
try:
with open(
os.path.join(shared.data_directory, 'objects.pickle'), 'bw'
@ -213,6 +217,7 @@ class Manager(threading.Thread):
@staticmethod
def pickle_nodes():
"""Save nodes into pickle files"""
if len(shared.node_pool) > 10000:
shared.node_pool = set(random.sample(shared.node_pool, 10000))
if len(shared.unchecked_node_pool) > 1000:
@ -241,6 +246,7 @@ class Manager(threading.Thread):
@staticmethod
def publish_i2p_destination():
"""Send I2P destination object"""
if shared.i2p_session_nick and not shared.i2p_transient:
logging.info('Publishing our I2P destination')
dest_pub_raw = base64.b64decode(

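The effect of the change above is that the manager now keys the hosts set by network group rather than by exact host, so at most one outgoing connection per group is opened. A toy run of that check, with made-up addresses:

from minode.structure import NetAddrNoPrefix

hosts = {NetAddrNoPrefix.network_group('8.8.8.8')}  # an already connected peer
for host, port in (('8.8.4.4', 8444), ('1.1.1.1', 8444)):
    group = NetAddrNoPrefix.network_group(host)
    if group in hosts:
        continue  # 8.8.4.4 falls into the same b'\x08\x08' group and is skipped
    hosts.add(group)
    print('would connect to', host, port)  # only 1.1.1.1 gets here
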
View File

@ -4,11 +4,28 @@ import base64
import hashlib
import struct
import time
from abc import ABC, abstractmethod
from . import shared, structure
class Header():
class IMessage(ABC):
"""A base for typical message"""
@abstractmethod
def __repr__(self):
"""Make a printable form"""
@abstractmethod
def to_bytes(self):
"""Serialize to bytes the full message"""
@classmethod
@abstractmethod
def from_message(cls, m):
"""Parse from message"""
class Header(structure.IStructure):
"""Message header structure"""
def __init__(self, command, payload_length, payload_checksum):
self.command = command
@ -24,7 +41,6 @@ class Header():
base64.b16encode(self.payload_checksum).decode())
def to_bytes(self):
"""Serialize to bytes"""
b = b''
b += shared.magic_bytes
b += self.command.ljust(12, b'\x00')
@ -34,7 +50,6 @@ class Header():
@classmethod
def from_bytes(cls, b):
"""Parse from bytes"""
magic_bytes, command, payload_length, payload_checksum = struct.unpack(
'>4s12sL4s', b)
@ -46,7 +61,7 @@ class Header():
return cls(command, payload_length, payload_checksum)
class Message():
class Message(structure.IStructure):
"""Common message structure"""
def __init__(self, command, payload):
self.command = command
@ -61,7 +76,6 @@ class Message():
base64.b16encode(self.payload_checksum).decode())
def to_bytes(self):
"""Serialize to bytes"""
b = Header(
self.command, self.payload_length, self.payload_checksum
).to_bytes()
@ -70,7 +84,6 @@ class Message():
@classmethod
def from_bytes(cls, b):
"""Parse from bytes"""
h = Header.from_bytes(b[:24])
payload = b[24:]
@ -98,7 +111,7 @@ def _payload_read_int(data):
data[varint_length:])
class Version():
class Version(IMessage):
"""The version message payload"""
def __init__(
self, host, port, protocol_version=shared.protocol_version,
@ -124,6 +137,7 @@ class Version():
base64.b16encode(self.nonce).decode(), self.user_agent)
def to_bytes(self):
"""Serialize to bytes"""
payload = b''
payload += struct.pack('>I', self.protocol_version)
payload += struct.pack('>Q', self.services)
@ -145,10 +159,13 @@ class Version():
def from_message(cls, m):
payload = m.payload
( # unused: timestamp, net_addr_local
protocol_version, services, _, net_addr_remote, _, nonce
( # unused: net_addr_local
protocol_version, services, timestamp, net_addr_remote, _, nonce
) = struct.unpack('>IQQ26s26s8s', payload[:80])
if abs(time.time() - timestamp) > 3600:
raise ValueError('remote time offset is too large')
net_addr_remote = structure.NetAddrNoPrefix.from_bytes(net_addr_remote)
host = net_addr_remote.host
@ -176,7 +193,7 @@ class Version():
host, port, protocol_version, services, nonce, user_agent, streams)
class Inv():
class Inv(IMessage):
"""The inv message payload"""
def __init__(self, vectors):
self.vectors = set(vectors)
@ -208,7 +225,7 @@ class Inv():
return cls(vectors)
class GetData():
class GetData(IMessage):
"""The getdata message payload"""
def __init__(self, vectors):
self.vectors = set(vectors)
@ -240,7 +257,7 @@ class GetData():
return cls(vectors)
class Addr():
class Addr(IMessage):
"""The addr message payload"""
def __init__(self, addresses):
self.addresses = addresses
@ -270,7 +287,7 @@ class Addr():
return cls(addresses)
class Error():
class Error(IMessage):
"""The error message payload"""
def __init__(self, error_text=b'', fatal=0, ban_time=0, vector=b''):
self.error_text = error_text

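To illustrate what the new IMessage base enforces: a payload class has to provide __repr__(), to_bytes() and from_message() before it can be instantiated. A hypothetical minimal subclass (Pong is not part of the change):

from minode import message

class Pong(message.IMessage):
    """A hypothetical payload with no fields"""

    def __repr__(self):
        return 'pong'

    def to_bytes(self):
        # the full message, header included, as the base class requires
        return message.Message(b'pong', b'').to_bytes()

    @classmethod
    def from_message(cls, m):
        return cls()
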
View File

@ -1,13 +1,17 @@
"""Doing proof of work"""
import base64
import ctypes
import glob
import hashlib
import importlib
import logging
import multiprocessing
import os
import struct
import threading
import time
from . import shared, structure
from . import message, shared, structure
def _pow_worker(target, initial_hash, q):
@ -25,30 +29,103 @@ def _pow_worker(target, initial_hash, q):
q.put(struct.pack('>Q', nonce))
def _worker(obj):
q = multiprocessing.Queue()
p = multiprocessing.Process(
target=_pow_worker, args=(obj.pow_target(), obj.pow_initial_hash(), q))
class Worker(threading.Thread):
"""The thread doind PoW for objects and publishing them"""
def __init__(self, obj=None):
self.obj = obj
super().__init__(
name='Worker' if not self.obj
else 'Worker-%s' % (base64.b16encode(obj.vector).decode())
)
self.bmpow = bso = None
try:
bitmsglib = importlib.util.find_spec('minode.bitmsghash').origin
if not bitmsglib:
bitmsglib = glob.glob(os.path.join(
os.path.dirname(os.path.abspath(__file__)),
'bitmsghash*.so'
))[0]
except (AttributeError, IndexError):
return
logging.debug('Starting POW process')
t = time.time()
p.start()
nonce = q.get()
p.join()
try:
bso = ctypes.CDLL(bitmsglib)
except Exception: # pylint: disable=broad-exception-caught
logging.warning('Error loading bitmsghash', exc_info=True)
try:
# MSVS on Windows
bso = ctypes.WinDLL(bitmsglib)
except (AttributeError, TypeError, ValueError):
return
logging.debug(
'Finished doing POW, nonce: %s, time: %ss', nonce, time.time() - t)
obj = structure.Object(
nonce, obj.expires_time, obj.object_type, obj.version,
obj.stream_number, obj.object_payload)
logging.debug(
'Object vector is %s', base64.b16encode(obj.vector).decode())
self.bmpow = bso.BitmessagePOW
self.bmpow.restype = ctypes.c_ulonglong
with shared.objects_lock:
shared.objects[obj.vector] = obj
shared.vector_advertise_queue.put(obj.vector)
@staticmethod
def _worker_result(obj):
q = multiprocessing.Queue()
p = multiprocessing.Process(target=_pow_worker, args=(
obj.pow_target(), obj.pow_initial_hash(), q))
p.start()
nonce = q.get()
p.join()
return nonce
def add_pow(self, obj):
"""
Do PoW for the given object and return a new `.structure.Object`
with the proper nonce and vector.
"""
t = time.time()
if not self.bmpow:
logging.debug('Starting PoW process')
nonce = self._worker_result(obj)
else:
logging.debug('Calling PoW extension')
nonce = struct.pack('>Q', self.bmpow(
ctypes.pointer(
ctypes.create_string_buffer(obj.pow_initial_hash(), 64)),
ctypes.c_ulonglong(obj.pow_target())
))
logging.debug(
'Finished doing PoW, nonce: %s, time: %ss', nonce, time.time() - t)
obj = structure.Object(
nonce, obj.expires_time, obj.object_type, obj.version,
obj.stream_number, obj.object_payload)
logging.debug(
'Object vector is %s', base64.b16encode(obj.vector).decode())
return obj
@staticmethod
def _publish(obj):
with shared.objects_lock:
shared.objects[obj.vector] = obj
shared.vector_advertise_queue.put(obj.vector)
def run(self):
if self.obj:
self._publish(self.add_pow(self.obj))
return
while not shared.shutting_down:
data = shared.objects_queue.get()
obj = structure.Object.from_message(
message.Message.from_bytes(data))
if int.from_bytes(obj.nonce, 'big') == 0:
obj = self.add_pow(obj)
self._publish(obj)
def do_pow_and_publish(obj):
t = threading.Thread(target=_worker, args=(obj, ))
t.start()
"""
Start a worker thread doing PoW for the given object
and putting the new object and its vector into the appropriate places
in `shared` to advertise it to the network.
"""
Worker(obj).start()

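Taken together, objects reach the worker in two ways: do_pow_and_publish() spawns a one-shot thread, while the permanently running worker consumes raw message bytes from shared.objects_queue. A usage sketch, assuming the long-running Worker is started by the application elsewhere:

import time

from minode import message, proofofwork, shared, structure

obj = structure.Object(
    b'\x00' * 8, int(time.time() + 300), 42, 1, 2, b'HELLO')

# One-shot: a dedicated thread does the PoW and publishes the object.
proofofwork.do_pow_and_publish(obj)

# Queue-based: hand the serialized message to the long-running worker,
# which redoes the PoW because the nonce is zero, then publishes.
shared.objects_queue.put(message.Message(b'object', obj.to_bytes()).to_bytes())
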
View File

@ -64,3 +64,5 @@ connection_limit = 250
objects = {}
objects_lock = threading.Lock()
objects_queue = queue.Queue()

View File

@ -6,11 +6,24 @@ import logging
import socket
import struct
import time
from abc import ABC, abstractmethod
from . import shared
class VarInt():
class IStructure(ABC):
"""A base for typical structure"""
@abstractmethod
def to_bytes(self):
"""Serialize to bytes"""
@classmethod
@abstractmethod
def from_bytes(cls, b):
"""Parse from bytes"""
class VarInt(IStructure):
"""varint object"""
def __init__(self, n):
self.n = n
@ -29,6 +42,7 @@ class VarInt():
@staticmethod
def length(b):
"""Determine the length of varint in the given bytes"""
if b == 0xfd:
return 3
if b == 0xfe:
@ -87,7 +101,7 @@ class Object():
nonce, expires_time, object_type, version, stream_number, payload)
def to_bytes(self):
"""Serialize to bytes"""
"""Serialize to bytes object payload"""
payload = b''
payload += self.nonce
payload += struct.pack('>QL', self.expires_time, self.object_type)
@ -151,7 +165,7 @@ class Object():
return hashlib.sha512(self.to_bytes()[8:]).digest()
class NetAddrNoPrefix():
class NetAddrNoPrefix(IStructure):
"""Network address"""
def __init__(self, services, host, port):
self.services = services
@ -173,6 +187,21 @@ class NetAddrNoPrefix():
b += struct.pack('>H', int(self.port))
return b
@staticmethod
def network_group(host):
"""A simplified network group identifier from pybitmessage protocol"""
try:
host = socket.inet_pton(socket.AF_INET, host)
return host[:2]
except socket.error:
try:
host = socket.inet_pton(socket.AF_INET6, host)
return host[:12]
except OSError:
return host
except TypeError:
return host
@classmethod
def from_bytes(cls, b):
services, host, port = struct.unpack('>Q16sH', b)
@ -184,7 +213,7 @@ class NetAddrNoPrefix():
return cls(services, host, port)
class NetAddr():
class NetAddr(IStructure):
"""Network address with time and stream"""
def __init__(self, services, host, port, stream=shared.stream):
self.stream = stream

View File

@ -1,4 +1,6 @@
"""Tests for messages"""
import struct
import time
import unittest
from binascii import unhexlify
@ -75,6 +77,13 @@ class TestMessage(unittest.TestCase):
"""Test version message"""
msg = message.Message.from_bytes(sample_version_msg)
self.assertEqual(msg.command, b'version')
with self.assertRaises(ValueError):
# large time offset
version_packet = message.Version.from_message(msg)
msg.payload = (
msg.payload[:12] + struct.pack('>Q', int(time.time()))
+ msg.payload[20:])
version_packet = message.Version.from_message(msg)
self.assertEqual(version_packet.host, '127.0.0.1')
self.assertEqual(version_packet.port, 8444)

View File

@ -0,0 +1,217 @@
"""Tests for network connections"""
import logging
import os
import random
import unittest
import tempfile
import time
from contextlib import contextmanager
from minode import connection, main, shared
from minode.listener import Listener
from minode.manager import Manager
from .test_process import TestProcessProto
logging.basicConfig(
level=logging.INFO,
format='[%(asctime)s] [%(levelname)s] %(message)s')
@contextmanager
def time_offset(offset):
"""
Replace time.time() with a mock returning a constant value
offset from the current time by the given amount.
"""
started = time.time()
time_call = time.time
try:
time.time = lambda: started + offset
yield time_call
finally:
time.time = time_call
@contextmanager
def run_listener(host='localhost', port=8444):
"""
Run the Listener with a zero connection limit and
reset the variables in shared after it stops.
"""
connection_limit = shared.connection_limit
shared.connection_limit = 0
try:
listener = Listener(host, port)
listener.start()
yield listener
except OSError:
yield
finally:
shared.connection_limit = connection_limit
shared.connections.clear()
shared.shutting_down = True
time.sleep(1)
class TestNetwork(unittest.TestCase):
"""Test case starting connections"""
@classmethod
def setUpClass(cls):
shared.data_directory = tempfile.gettempdir()
def setUp(self):
shared.core_nodes.clear()
shared.unchecked_node_pool.clear()
shared.objects = {}
try:
os.remove(os.path.join(shared.data_directory, 'objects.pickle'))
except FileNotFoundError:
pass
def _make_initial_nodes(self):
Manager.load_data()
self.assertGreaterEqual(len(shared.core_nodes), 3)
main.bootstrap_from_dns()
self.assertGreaterEqual(len(shared.unchecked_node_pool), 3)
def test_connection(self):
"""Check a normal connection - should receive objects"""
self._make_initial_nodes()
started = time.time()
nodes = list(shared.core_nodes.union(shared.unchecked_node_pool))
random.shuffle(nodes)
for node in nodes:
# unknown = node not in shared.node_pool
# self.assertTrue(unknown)
unknown = True
shared.node_pool.discard(node)
c = connection.Connection(*node)
c.start()
connection_started = time.time()
while c.status not in ('disconnected', 'failed'):
# The addr of established connection is added to nodes pool
if unknown and c.status == 'fully_established':
unknown = False
self.assertIn(node, shared.node_pool)
if shared.objects or time.time() - connection_started > 90:
c.status = 'disconnecting'
if time.time() - started > 300:
c.status = 'disconnecting'
self.fail('Failed to receive an object in %s sec' % 300)
time.sleep(0.2)
if shared.objects: # got some objects
break
else:
self.fail('Failed to establish a proper connection')
def test_time_offset(self):
"""Assert the network bans for large time offset"""
def try_connect(nodes, timeout, call):
started = call()
for node in nodes:
c = connection.Connection(*node)
c.start()
while call() < started + timeout:
if c.status == 'fully_established':
return 'Established a connection'
if c.status in ('disconnected', 'failed'):
break
time.sleep(0.2)
else:
return 'Spent too much time trying to connect'
def time_offset_connections(nodes, offset):
"""Spoof time.time and open connections with given time offset"""
with time_offset(offset) as time_call:
result = try_connect(nodes, 200, time_call)
if result:
self.fail(result)
self._make_initial_nodes()
nodes = random.sample(
shared.core_nodes.union(shared.unchecked_node_pool), 5)
time_offset_connections(nodes, 4000)
time_offset_connections(nodes, -4000)
class TestListener(TestProcessProto):
"""A separate test case for Listener with a process with --trusted-peer"""
_process_cmd = ['minode', '--trusted-peer', '127.0.0.1']
def setUp(self):
shared.shutting_down = False
def test_listener(self):
"""Start Listener and try to connect"""
with run_listener() as listener:
if not listener:
self.fail('Failed to start listener')
c = connection.Connection('127.0.0.1', 8444)
shared.connections.add(c)
for _ in range(30):
if len(shared.connections) > 1:
self.fail('The listener ignored connection limit')
time.sleep(0.5)
shared.connection_limit = 2
c.start()
started = time.time()
while c.status not in ('disconnected', 'failed'):
if c.status == 'fully_established':
self.fail('Connected to itself')
if time.time() - started > 90:
c.status = 'disconnecting'
time.sleep(0.2)
server = None
started = time.time()
while not server:
time.sleep(0.2)
if time.time() - started > 90:
self.fail('Failed to establish the connection')
for c in shared.connections:
if c.status == 'fully_established':
server = c
self.assertTrue(server.server)
while not self.process.connections():
time.sleep(0.2)
if time.time() - started > 90:
self.fail('Failed to connect to listener')
client = self.process.connections()[0]
self.assertEqual(client.raddr[0], '127.0.0.1')
self.assertEqual(client.raddr[1], 8444)
self.assertEqual(server.host, client.laddr[0])
# self.assertEqual(server.port, client.laddr[1])
server.status = 'disconnecting'
self.assertFalse(listener.is_alive())
def test_listener_timeoffset(self):
"""Run listener with a large time offset - shouldn't connect"""
with time_offset(4000):
with run_listener() as listener:
if not listener:
self.fail('Failed to start listener')
shared.connection_limit = 2
for _ in range(30):
for c in shared.connections:
if c.status == 'fully_established':
self.fail('Established a connection')
time.sleep(0.5)
@classmethod
def tearDownClass(cls):
super().tearDownClass()
shared.shutting_down = False

View File

@ -9,6 +9,8 @@ import time
import psutil
from minode.structure import NetAddrNoPrefix
try:
socket.socket().bind(('127.0.0.1', 7656))
i2p_port_free = True
@ -102,6 +104,12 @@ class TestProcess(TestProcessProto):
if len(self.connections()) > self._connection_limit / 2:
_time_to_connect = round(time.time() - _started)
break
if '--i2p' not in self._process_cmd:
groups = []
for c in self.connections():
group = NetAddrNoPrefix.network_group(c.raddr[0])
self.assertNotIn(group, groups)
groups.append(group)
time.sleep(0.5)
else:
self.fail(

View File

@ -104,6 +104,27 @@ class TestStructure(unittest.TestCase):
addr = structure.NetAddr(1, '2607:5300:201:3000::57ae', 8080, 1)
self.assertEqual(addr.to_bytes()[8:], sample_addr_data[8:])
def test_network_group(self):
"""Test various types of network groups"""
test_ip = '1.2.3.4'
self.assertEqual(
b'\x01\x02', structure.NetAddrNoPrefix.network_group(test_ip))
self.assertEqual(
structure.NetAddrNoPrefix.network_group('8.8.8.8'),
structure.NetAddrNoPrefix.network_group('8.8.4.4'))
self.assertNotEqual(
structure.NetAddrNoPrefix.network_group('1.1.1.1'),
structure.NetAddrNoPrefix.network_group('8.8.8.8'))
test_ip = '0102:0304:0506:0708:090A:0B0C:0D0E:0F10'
self.assertEqual(
b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0A\x0B\x0C',
structure.NetAddrNoPrefix.network_group(test_ip))
for test_ip in (
'bootstrap8444.bitmessage.org', 'quzwelsuziwqgpt2.onion', None
):
self.assertEqual(
test_ip, structure.NetAddrNoPrefix.network_group(test_ip))
def test_object(self):
"""Create and check objects"""
obj = structure.Object.from_message(
@ -122,8 +143,7 @@ class TestStructure(unittest.TestCase):
obj = structure.Object(
b'\x00' * 8, int(time.time() + 300), 42, 1, 2, b'HELLO')
vector = obj.vector
proofofwork._worker(obj) # pylint: disable=protected-access
obj = shared.objects.popitem()[1]
obj = proofofwork.Worker().add_pow(obj)
self.assertNotEqual(obj.vector, vector)
self.assertFalse(obj.is_expired())
self.assertFalse(obj.is_valid())

View File

@ -1,8 +1,9 @@
#!/usr/bin/env python
import os
import sys
from setuptools import setup, find_packages
from setuptools import Extension, find_packages, setup
from minode import shared
@ -12,6 +13,19 @@ README = open(os.path.join(
name, version = shared.user_agent.strip(b'/').split(b':')
bitmsghash = Extension(
'minode.bitmsghash',
sources=['minode/bitmsghash/bitmsghash.cpp'],
libraries=['pthread', 'crypto'],
)
if sys.platform[:3] == 'win':
bitmsghash.libraries = ['libeay32', 'ws2_32']
openssl_dir = os.getenv('OPENSSL_DIR')
if openssl_dir:
bitmsghash.library_dirs = [os.path.join(openssl_dir, 'lib')]
bitmsghash.include_dirs = [os.path.join(openssl_dir, 'include')]
setup(
name=name.decode('utf-8'),
version=version.decode('utf-8'),
@ -23,6 +37,7 @@ setup(
url='https://git.bitmessage.org/lee.miller/MiNode',
packages=find_packages(exclude=('*tests',)),
package_data={'': ['*.csv', 'tls/*.pem']},
ext_modules=[bitmsghash],
entry_points={'console_scripts': ['minode = minode.main:main']},
classifiers=[
"License :: OSI Approved :: MIT License"

View File

@ -12,6 +12,9 @@ deps = flake8
commands =
flake8 minode --count --select=E9,F63,F7,F82 --show-source --statistics
[testenv:py39]
commands_pre = pip install -e .
[testenv:reset]
deps =
-rrequirements.txt