MiNode/minode/tests/test_proofofwork.py
Lee Miller 2cf9587b91
All checks were successful
Testing / default (push) Successful in 7m1s
Merge branch 'shutdown' into testing
2025-02-18 08:50:30 +02:00

77 lines
2.4 KiB
Python

"""Special tests for PoW"""
import base64
import logging
import multiprocessing
import os
import queue
import signal
import threading
import time
import unittest
from minode import proofofwork, shared, structure
logging.basicConfig(
level=logging.DEBUG,
format='[%(asctime)s] [%(levelname)s] %(message)s')
multiprocessing.set_start_method('spawn')
class TestProofofwork(unittest.TestCase):
"""Test components of proof of work"""
def setUp(self):
shared.objects = {}
def test_proofofwork(self):
"""Check the main proofofwork call and worker"""
shared.vector_advertise_queue = queue.Queue()
obj = structure.Object(
int(time.time() + 300), 42, 1,
shared.stream, object_payload=b'HELLO')
start_time = time.time()
proofofwork.do_pow_and_publish(obj)
try:
vector = shared.vector_advertise_queue.get(timeout=300)
except queue.Empty:
self.fail("Couldn't make work in 300 sec")
else:
time.sleep(1)
try:
result = shared.objects[vector]
except KeyError:
self.fail(
"Couldn't found object with vector %s"
" %s sec after pow start" % (
base64.b16encode(vector), time.time() - start_time))
self.assertTrue(result.is_valid())
self.assertEqual(result.object_type, 42)
self.assertEqual(result.object_payload, b'HELLO')
q = queue.Queue()
# pylint: disable=protected-access
proofofwork._pow_worker(obj.pow_target(), obj.pow_initial_hash(), q)
try:
nonce = q.get(timeout=5)
except queue.Empty:
self.fail("No nonce found in the queue")
obj = structure.Object(
obj.expires_time, obj.object_type, obj.version, obj.stream_number,
object_payload=obj.object_payload, nonce=nonce)
self.assertTrue(obj.is_valid())
def test_interrupt(self):
"""Send signals to _pow_worker process"""
obj = structure.Object(
int(time.time() + 300), 42, 1, 2, object_payload=os.urandom(4096))
threading.Thread(target=proofofwork._worker, args=(obj, )).start()
time.sleep(1)
worker = multiprocessing.active_children()[0]
# worker.terminate()
os.kill(worker.pid, signal.SIGINT)
time.sleep(1)
self.assertFalse(worker.is_alive())