Compare commits

...

10 Commits

Author SHA1 Message Date
Lee Miller cdece735ce
Make an object from sample data in test_object() 2023-10-12 03:30:49 +03:00
Lee Miller d4fbc35d7d
Set object tag for object types supporting it 2023-10-07 17:55:50 +03:00
Lee Miller e64ca24266
Cover the main proofofwork call and worker procedure 2023-10-07 17:55:50 +03:00
Lee Miller e41391450b
Add Error message class, handle fatal 2023-10-07 17:55:50 +03:00
Lee Miller 855f83330a
Define a helper function to read a varint and trim payload 2023-10-07 17:55:50 +03:00
Lee Miller 493bc5a411
Use shared.stream when assembling i2p_dest object instead of hardcoded 1 2023-10-07 17:55:49 +03:00
Lee Miller f0752c9e54
Unify and improve message.Version:
- from_message() decoding method as in other messages;
  - support multiple streams and move stream check to connection;
  - use shared.stream instead of hardcoded 1;
  - replace values from shared with the instance attributes in to_bytes(),
    put conventional 1 as services of a remote host.
2023-10-07 17:55:49 +03:00
Lee Miller a363590c18
Add a test for version message 2023-10-07 17:55:49 +03:00
Lee Miller 7cd0268212
Improve structure.Object:
- use shared.stream instead of hardcoded 1;
  - reuse pow_initial_hash() in is_valid().
2023-10-07 17:55:49 +03:00
Lee Miller f35a3504a2
Add a test for object covering also proofofwork 2023-10-07 17:55:49 +03:00
6 changed files with 226 additions and 49 deletions

View File

@ -335,7 +335,9 @@ class Connection(threading.Thread):
def _process_message(self, m):
if m.command == b'version':
version = message.Version.from_bytes(m.to_bytes())
version = message.Version.from_message(m)
if shared.stream not in version.streams:
raise ValueError('message not for stream %i' % shared.stream)
logging.debug('%s:%s -> %s', self.host_print, self.port, version)
if (
version.protocol_version != shared.protocol_version
@ -420,8 +422,12 @@ class Connection(threading.Thread):
self.send_queue.put(message.Message(b'pong', b''))
elif m.command == b'error':
error = message.Error.from_message(m)
logging.warning(
'%s:%s -> error: %s', self.host_print, self.port, m.payload)
'%s:%s -> %s', self.host_print, self.port, error)
if error.fatal == 2:
# reduce probability to connect soon
shared.unchecked_node_pool.discard((self.host, self.port))
else:
logging.debug('%s:%s -> %s', self.host_print, self.port, m)

View File

@ -193,5 +193,5 @@ class Manager(threading.Thread):
obj = structure.Object(
b'\x00' * 8, int(time.time() + 2 * 3600),
shared.i2p_dest_obj_type, shared.i2p_dest_obj_version,
1, dest_pub_raw)
shared.stream, dest_pub_raw)
proofofwork.do_pow_and_publish(obj)

View File

@ -91,12 +91,19 @@ class Message():
return cls(h.command, payload)
def _payload_read_int(data):
varint_length = structure.VarInt.length(data[0])
return (
structure.VarInt.from_bytes(data[:varint_length]).n,
data[varint_length:])
class Version():
"""The version message payload"""
def __init__(
self, host, port, protocol_version=shared.protocol_version,
services=shared.services, nonce=shared.nonce,
user_agent=shared.user_agent
user_agent=shared.user_agent, streams=None
):
self.host = host
self.port = port
@ -105,6 +112,9 @@ class Version():
self.services = services
self.nonce = nonce
self.user_agent = user_agent
self.streams = streams or [shared.stream]
if len(self.streams) > 160000:
self.streams = self.streams[:160000]
def __repr__(self):
return (
@ -119,20 +129,20 @@ class Version():
payload += struct.pack('>Q', self.services)
payload += struct.pack('>Q', int(time.time()))
payload += structure.NetAddrNoPrefix(
shared.services, self.host, self.port).to_bytes()
1, self.host, self.port).to_bytes()
payload += structure.NetAddrNoPrefix(
shared.services, '127.0.0.1', 8444).to_bytes()
self.services, '127.0.0.1', 8444).to_bytes()
payload += self.nonce
payload += structure.VarInt(len(shared.user_agent)).to_bytes()
payload += shared.user_agent
payload += 2 * structure.VarInt(1).to_bytes()
payload += structure.VarInt(len(self.user_agent)).to_bytes()
payload += self.user_agent
payload += structure.VarInt(len(self.streams)).to_bytes()
for stream in self.streams:
payload += structure.VarInt(stream).to_bytes()
return Message(b'version', payload).to_bytes()
@classmethod
def from_bytes(cls, b):
m = Message.from_bytes(b)
def from_message(cls, m):
payload = m.payload
( # unused: timestamp, net_addr_local
@ -146,20 +156,24 @@ class Version():
payload = payload[80:]
user_agent_varint_length = structure.VarInt.length(payload[0])
user_agent_length = structure.VarInt.from_bytes(
payload[:user_agent_varint_length]).n
payload = payload[user_agent_varint_length:]
user_agent_length, payload = _payload_read_int(payload)
user_agent = payload[:user_agent_length]
payload = payload[user_agent_length:]
if payload != b'\x01\x01':
raise ValueError('message not for stream 1')
streams_count, payload = _payload_read_int(payload)
if streams_count > 160000:
raise ValueError('malformed Version message, to many streams')
streams = []
return cls(host, port, protocol_version, services, nonce, user_agent)
while payload:
stream, payload = _payload_read_int(payload)
streams.append(stream)
if streams_count != len(streams):
raise ValueError('malformed Version message, wrong streams_count')
return cls(
host, port, protocol_version, services, nonce, user_agent, streams)
class Inv():
@ -180,11 +194,7 @@ class Inv():
def from_message(cls, m):
payload = m.payload
vector_count_varint_length = structure.VarInt.length(payload[0])
vector_count = structure.VarInt.from_bytes(
payload[:vector_count_varint_length]).n
payload = payload[vector_count_varint_length:]
vector_count, payload = _payload_read_int(payload)
vectors = set()
@ -216,11 +226,7 @@ class GetData():
def from_message(cls, m):
payload = m.payload
vector_count_varint_length = structure.VarInt.length(payload[0])
vector_count = structure.VarInt.from_bytes(
payload[:vector_count_varint_length]).n
payload = payload[vector_count_varint_length:]
vector_count, payload = _payload_read_int(payload)
vectors = set()
@ -252,11 +258,8 @@ class Addr():
def from_message(cls, m):
payload = m.payload
addr_count_varint_length = structure.VarInt.length(payload[0])
# addr_count = structure.VarInt.from_bytes(
# payload[:addr_count_varint_length]).n
payload = payload[addr_count_varint_length:]
# not validating addr_count
_, payload = _payload_read_int(payload)
addresses = set()
@ -265,3 +268,37 @@ class Addr():
payload = payload[38:]
return cls(addresses)
class Error():
"""The error message payload"""
def __init__(self, error_text=b'', fatal=0, ban_time=0, vector=b''):
self.error_text = error_text
self.fatal = fatal
self.ban_time = ban_time
self.vector = vector
def __repr__(self):
return 'error, text: {}'.format(self.error_text)
def to_bytes(self):
return Message(
b'error', structure.VarInt(self.fatal).to_bytes()
+ structure.VarInt(self.ban_time).to_bytes()
+ structure.VarInt(len(self.vector)).to_bytes() + self.vector
+ structure.VarInt(len(self.error_text)).to_bytes()
+ self.error_text
).to_bytes()
@classmethod
def from_message(cls, m):
payload = m.payload
fatal, payload = _payload_read_int(payload)
ban_time, payload = _payload_read_int(payload)
vector_length, payload = _payload_read_int(payload)
vector = payload[:vector_length]
payload = payload[vector_length:]
error_text_length, payload = _payload_read_int(payload)
error_text = payload[:error_text_length]
return cls(error_text, fatal, ban_time, vector)

View File

@ -60,6 +60,12 @@ class Object():
self.vector = hashlib.sha512(hashlib.sha512(
self.to_bytes()).digest()).digest()[:32]
self.tag = (
# broadcast from version 5 and pubkey/getpukey from version 4
self.object_payload[:32] if object_type == 3 and version == 5
or (object_type in (0, 1) and version == 4)
else None)
def __repr__(self):
return 'object, vector: {}'.format(
base64.b16encode(self.vector).decode())
@ -112,18 +118,16 @@ class Object():
'Invalid object %s, reason: payload is too long',
base64.b16encode(self.vector).decode())
return False
if self.stream_number != 1:
if self.stream_number != shared.stream:
logging.warning(
'Invalid object %s, reason: not in stream 1',
base64.b16encode(self.vector).decode())
'Invalid object %s, reason: not in stream %i',
base64.b16encode(self.vector).decode(), shared.stream)
return False
data = self.to_bytes()[8:]
# length = len(data) + 8 + shared.payload_length_extra_bytes
# dt = max(self.expires_time - time.time(), 0)
h = hashlib.sha512(data).digest()
pow_value = int.from_bytes(
hashlib.sha512(hashlib.sha512(
self.nonce + h).digest()).digest()[:8], 'big')
self.nonce + self.pow_initial_hash()
).digest()).digest()[:8], 'big')
target = self.pow_target()
if target < pow_value:
logging.warning(

View File

@ -13,7 +13,7 @@ from minode.shared import magic_bytes
# 1626611891, 1, 1, net.ipv6(ipaddress.ip_address('127.0.0.1')).packed,
# 8444
# ) for _ in range(1000)]
sample_data = unhexlify(
sample_addr_data = unhexlify(
'fd01f4' + (
'0000000060f420b30000000'
'1000000000000000100000000000000000000ffff7f00000120fc'
@ -24,6 +24,21 @@ sample_data = unhexlify(
sample_ping_msg = unhexlify(
'e9beb4d970696e67000000000000000000000004ee26b0dd74657374')
# from pybitmessage import pathmagic
# pathmagic.setup()
# import protocol
# msg = protocol.assembleVersionMessage('127.0.0.1', 8444, [1, 2, 3])
sample_version_msg = unhexlify(
'e9beb4d976657273696f6e00000000000000006b1b06b182000000030000000000000003'
'0000000064fdd3e1000000000000000100000000000000000000ffff7f00000120fc0000'
'00000000000300000000000000000000ffff7f00000120fc00c0b6c3eefb2adf162f5079'
'4269746d6573736167653a302e362e332e322f03010203'
)
#
sample_error_data = \
b'\x02\x00\x006Too many connections from your IP. Closing connection.'
class TestMessage(unittest.TestCase):
"""Test assembling and disassembling of network mesages"""
@ -47,7 +62,7 @@ class TestMessage(unittest.TestCase):
def test_addr(self):
"""Test addr messages"""
msg = message.Message(b'addr', sample_data)
msg = message.Message(b'addr', sample_addr_data)
addr_packet = message.Addr.from_message(msg)
self.assertEqual(len(addr_packet.addresses), 500)
address = addr_packet.addresses.pop()
@ -55,3 +70,32 @@ class TestMessage(unittest.TestCase):
self.assertEqual(address.services, 1)
self.assertEqual(address.port, 8444)
self.assertEqual(address.host, '127.0.0.1')
def test_version(self):
"""Test version message"""
msg = message.Message.from_bytes(sample_version_msg)
self.assertEqual(msg.command, b'version')
version_packet = message.Version.from_message(msg)
self.assertEqual(version_packet.host, '127.0.0.1')
self.assertEqual(version_packet.port, 8444)
self.assertEqual(version_packet.protocol_version, 3)
self.assertEqual(version_packet.services, 3)
self.assertEqual(version_packet.user_agent, b'/PyBitmessage:0.6.3.2/')
self.assertEqual(version_packet.streams, [1, 2, 3])
msg = version_packet.to_bytes()
# omit header and timestamp
self.assertEqual(msg[24:36], sample_version_msg[24:36])
self.assertEqual(msg[44:], sample_version_msg[44:])
def test_error(self):
"""Test error message"""
msg = message.Error.from_message(
message.Message(b'error', sample_error_data))
self.assertEqual(msg.fatal, 2)
self.assertEqual(msg.ban_time, 0)
self.assertEqual(msg.vector, b'')
msg = message.Error(
b'Too many connections from your IP. Closing connection.', 2)
self.assertEqual(msg.to_bytes()[24:], sample_error_data)

View File

@ -1,9 +1,13 @@
"""Tests for structures"""
import unittest
import base64
import logging
import queue
import struct
import time
import unittest
from binascii import unhexlify
from minode import structure
from minode import message, proofofwork, shared, structure
# host pregenerated by pybitmessage.protocol.encodeHost()
@ -13,10 +17,24 @@ sample_addr_data = unhexlify(
'0000000060f420b3000000010000000000000001'
'260753000201300000000000000057ae1f90')
# data for object with expires_time 1697063939
# structure.Object(
# b'\x00' * 8, expires_time, 42, 1, 2, b'HELLO').to_bytes()
sample_object_data = unhexlify(
'000000000000000000000000652724030000002a010248454c4c4f')
logging.basicConfig(
level=shared.log_level,
format='[%(asctime)s] [%(levelname)s] %(message)s')
class TestStructure(unittest.TestCase):
"""Testing structures serializing and deserializing"""
@classmethod
def setUpClass(cls):
shared.objects = {}
def test_varint(self):
"""Test varint serializing and deserializing"""
s = structure.VarInt(0)
@ -85,3 +103,71 @@ class TestStructure(unittest.TestCase):
addr = structure.NetAddr(1, '2607:5300:201:3000::57ae', 8080, 1)
self.assertEqual(addr.to_bytes()[8:], sample_addr_data[8:])
def test_object(self):
"""Create and check objects"""
obj = structure.Object.from_message(
message.Message(b'object', sample_object_data))
self.assertEqual(obj.object_type, 42)
self.assertEqual(obj.stream_number, 2)
self.assertEqual(obj.expires_time, 1697063939)
self.assertEqual(obj.object_payload, b'HELLO')
obj = structure.Object(
b'\x00' * 8, int(time.time() + 3000000), 42, 1, 1, b'HELLO')
self.assertFalse(obj.is_valid())
obj.expires_time = int(time.time() - 11000)
self.assertFalse(obj.is_valid())
obj = structure.Object(
b'\x00' * 8, int(time.time() + 300), 42, 1, 2, b'HELLO')
vector = obj.vector
proofofwork._worker(obj) # pylint: disable=protected-access
obj = shared.objects.popitem()[1]
self.assertNotEqual(obj.vector, vector)
self.assertFalse(obj.is_expired())
self.assertFalse(obj.is_valid())
shared.stream = 2
self.assertTrue(obj.is_valid())
obj.object_payload = \
b'TIGER, tiger, burning bright. In the forests of the night'
self.assertFalse(obj.is_valid())
def test_proofofwork(self):
"""Check the main proofofwork call and worker"""
shared.vector_advertise_queue = queue.Queue()
obj = structure.Object(
b'\x00' * 8, int(time.time() + 300), 42, 1,
shared.stream, b'HELLO')
start_time = time.time()
proofofwork.do_pow_and_publish(obj)
try:
vector = shared.vector_advertise_queue.get(timeout=300)
except queue.Empty:
self.fail("Couldn't make work in 300 sec")
else:
time.sleep(1)
try:
result = shared.objects[vector]
except KeyError:
self.fail(
"Couldn't found object with vector %s"
" %s sec after pow start" % (
base64.b16encode(vector), time.time() - start_time))
self.assertTrue(result.is_valid())
self.assertEqual(result.object_type, 42)
self.assertEqual(result.object_payload, b'HELLO')
q = queue.Queue()
# pylint: disable=protected-access
proofofwork._pow_worker(obj.pow_target(), obj.pow_initial_hash(), q)
try:
nonce = q.get(timeout=5)
except queue.Empty:
self.fail("No nonce found in the queue")
obj = structure.Object(
nonce, obj.expires_time, obj.object_type, obj.version,
obj.stream_number, obj.object_payload)
self.assertTrue(obj.is_valid())