"""Tests for structures"""

import base64
import logging
import queue
import struct
import time
import unittest
from binascii import unhexlify

from minode import message, proofofwork, shared, structure


# Serialized network address checked in TestStructure.test_address().
# The host was pregenerated by pybitmessage.protocol.encodeHost()
# for one of the bootstrap servers, port 8080;
# everything else is like in test_message: 1626611891, 1, 1
sample_addr_data = unhexlify(
    '0000000060f420b3000000010000000000000001'
    '260753000201300000000000000057ae1f90')

# Data for an object with expires_time 1697063939, i.e. the result of
# structure.Object(
#     b'\x00' * 8, expires_time, 42, 1, 2, b'HELLO').to_bytes()
sample_object_data = unhexlify(
    '000000000000000000000000652724030000002a010248454c4c4f')

# Configure the root logger once at import time, taking the level
# from shared.log_level, so that log output of the tested code is
# timestamped and tagged with its severity.
logging.basicConfig(
    level=shared.log_level,
    format='[%(asctime)s] [%(levelname)s] %(message)s')


class TestStructure(unittest.TestCase):
    """Testing structures serializing and deserializing"""

    @classmethod
    def setUpClass(cls):
        """Replace the shared object storage with an empty dict"""
        shared.objects = {}

    def test_varint(self):
        """Test varint serializing and deserializing"""
        # (value, expected serialization) pairs; they include the values
        # around which the serialized form switches to a longer prefix
        samples = (
            (0, b'\x00'),
            (42, b'*'),
            (252, unhexlify('fc')),
            (253, unhexlify('fd00fd')),
            (100500, unhexlify('fe00018894')),
            (65535, unhexlify('fdffff')),
            (4294967295, unhexlify('feffffffff')),
            (4294967296, unhexlify('ff0000000100000000')),
            (18446744073709551615, unhexlify('ffffffffffffffffff')),
        )
        for n, data in samples:
            # subTest keeps checking the remaining values after a failure
            with self.subTest(n=n):
                self.assertEqual(structure.VarInt(n).to_bytes(), data)
                self.assertEqual(structure.VarInt.from_bytes(data).n, n)

    def test_address(self):
        """Check address encoding in structure.NetAddrNoPrefix()"""
        # IPv4 hosts are expected in bytes 8..24 as ten zero bytes,
        # two 0xFF bytes and the packed address (2130706433 == 127.0.0.1)
        addr = structure.NetAddrNoPrefix(1, '127.0.0.1', 8444)
        self.assertEqual(
            addr.to_bytes()[8:24],
            b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF'
            + struct.pack('>L', 2130706433))
        addr = structure.NetAddrNoPrefix(1, '191.168.1.1', 8444)
        self.assertEqual(
            addr.to_bytes()[8:24],
            unhexlify('00000000000000000000ffffbfa80101'))
        addr = structure.NetAddrNoPrefix(1, '1.1.1.1', 8444)
        self.assertEqual(
            addr.to_bytes()[8:24],
            unhexlify('00000000000000000000ffff01010101'))
        # an IPv6 host is expected to fill the same 16 bytes verbatim
        addr = structure.NetAddrNoPrefix(
            1, '0102:0304:0506:0708:090A:0B0C:0D0E:0F10', 8444)
        self.assertEqual(
            addr.to_bytes()[8:24],
            unhexlify('0102030405060708090a0b0c0d0e0f10'))

        # deserialize the pregenerated sample
        addr = structure.NetAddr.from_bytes(sample_addr_data)
        self.assertEqual(addr.host, '2607:5300:201:3000::57ae')
        self.assertEqual(addr.port, 8080)
        self.assertEqual(addr.stream, 1)
        self.assertEqual(addr.services, 1)

        # and serialize it back; the first 8 bytes are skipped
        # (presumably the timestamp, which differs from the sample)
        addr = structure.NetAddr(1, '2607:5300:201:3000::57ae', 8080, 1)
        self.assertEqual(addr.to_bytes()[8:], sample_addr_data[8:])

    def test_object(self):
        """Create and check objects"""
        obj = structure.Object.from_message(
            message.Message(b'object', sample_object_data))
        self.assertEqual(obj.object_type, 42)
        self.assertEqual(obj.stream_number, 2)
        self.assertEqual(obj.expires_time, 1697063939)
        self.assertEqual(obj.object_payload, b'HELLO')

        # neither a far future nor a past expires_time should be valid
        obj = structure.Object(
            b'\x00' * 8, int(time.time() + 3000000), 42, 1, 1, b'HELLO')
        self.assertFalse(obj.is_valid())
        obj.expires_time = int(time.time() - 11000)
        self.assertFalse(obj.is_valid())

        obj = structure.Object(
            b'\x00' * 8, int(time.time() + 300), 42, 1, 2, b'HELLO')
        vector = obj.vector
        proofofwork._worker(obj)  # pylint: disable=protected-access
        # the worker is expected to store the resulting object
        obj = shared.objects.popitem()[1]
        self.assertNotEqual(obj.vector, vector)
        self.assertFalse(obj.is_expired())
        self.assertFalse(obj.is_valid())
        # shared.stream is a module global also read by other tests,
        # so put the previous value back when this test finishes
        self.addCleanup(setattr, shared, 'stream', shared.stream)
        shared.stream = 2
        self.assertTrue(obj.is_valid())

        # changing the payload should invalidate the object
        obj.object_payload = \
            b'TIGER, tiger, burning bright. In the forests of the night'
        self.assertFalse(obj.is_valid())

    def test_proofofwork(self):
        """Check the main proofofwork call and worker"""
        shared.vector_advertise_queue = queue.Queue()
        obj = structure.Object(
            b'\x00' * 8, int(time.time() + 300), 42, 1,
            shared.stream, b'HELLO')
        start_time = time.time()
        proofofwork.do_pow_and_publish(obj)
        try:
            vector = shared.vector_advertise_queue.get(timeout=300)
        except queue.Empty:
            self.fail("Couldn't make work in 300 sec")
        else:
            # wait a little before looking the object up by its vector
            time.sleep(1)
            try:
                result = shared.objects[vector]
            except KeyError:
                self.fail(
                    "Couldn't find object with vector %s"
                    " %s sec after pow start" % (
                        base64.b16encode(vector).decode(),
                        time.time() - start_time))
            self.assertTrue(result.is_valid())
            self.assertEqual(result.object_type, 42)
            self.assertEqual(result.object_payload, b'HELLO')

        # run the low level worker directly and check the nonce it finds
        q = queue.Queue()
        # pylint: disable=protected-access
        proofofwork._pow_worker(obj.pow_target(), obj.pow_initial_hash(), q)
        try:
            nonce = q.get(timeout=5)
        except queue.Empty:
            self.fail("No nonce found in the queue")

        obj = structure.Object(
            nonce, obj.expires_time, obj.object_type, obj.version,
            obj.stream_number, obj.object_payload)
        self.assertTrue(obj.is_valid())