Compare commits

...

4 Commits
v0.3 ... queue

Author SHA1 Message Date
Lee Miller 504f12e5d0
Fix and improve test_queue:
- decrease timeout, remove unneeded sleep;
  - make and check objects from the data received in message queue;
  - add docstrings.
2023-10-15 01:48:47 +03:00
Lee Miller 4ea139c9f6
Started adding a request queue - for sending prepared messages
in the first place.
2023-10-15 01:48:47 +03:00
Dmitri Bogomolov e5187c7887
Minimal test for msg queue: wait for a msg at most for 4 min 2023-10-15 01:48:46 +03:00
Dmitri Bogomolov 3c56afc570
Testing zmq PUB/SUB for objects. Enabled by --msg-queue arg. 2023-10-15 01:48:15 +03:00
6 changed files with 93 additions and 3 deletions

View File

@ -405,6 +405,9 @@ class Connection(threading.Thread):
logging.debug(dest)
shared.i2p_unchecked_node_pool.add((dest, 'i2p'))
shared.vector_advertise_queue.put(obj.vector)
if shared.zmq_socket:
shared.zmq_socket.send(
b'obj\x00%i\x00' % obj.object_type + obj.to_bytes())
elif m.command == b'getdata':
getdata = message.GetData.from_message(m)

View File

@ -54,6 +54,17 @@ def parse_arguments():
'--i2p-transient', action='store_true',
help='Generate new I2P destination on start')
try:
import zmq
zmq_context = zmq.Context()
except ImportError:
zmq_context = None
else:
parser.add_argument(
'--msg-queue', action='store_true', help='Enable messages queue')
parser.add_argument(
'--request-queue', help='Set the URL for a request queue')
args = parser.parse_args()
if args.port:
shared.listening_port = args.port
@ -101,6 +112,17 @@ def parse_arguments():
if args.i2p_transient:
shared.i2p_transient = True
if zmq_context is None:
return
if args.msg_queue:
shared.zmq_socket = zmq_context.socket(zmq.PUB)
shared.zmq_socket.bind("tcp://*:5556")
if args.request_queue:
shared.request_queue = zmq_context.socket(zmq.SUB)
shared.request_queue.connect(args.request_queue)
shared.request_queue.setsockopt(zmq.RCVTIMEO, 1000)
shared.request_queue.setsockopt(zmq.SUBSCRIBE, b'msg')
def load_data():
"""Loads initial nodes and data, stored in files between sessions"""

View File

@ -4,12 +4,13 @@ import base64
import logging
import os
import pickle
import queue
import random
import threading
import time
from . import proofofwork, shared, structure
import zmq
from . import message, proofofwork, shared, structure
from .connection import Connection
from .i2p import I2PDialer
@ -18,7 +19,6 @@ class Manager(threading.Thread):
"""The manager thread"""
def __init__(self):
super().__init__(name='Manager')
self.q = queue.Queue()
self.last_cleaned_objects = time.time()
self.last_cleaned_connections = time.time()
self.last_pickled_objects = time.time()
@ -51,6 +51,21 @@ class Manager(threading.Thread):
self.publish_i2p_destination()
self.last_published_i2p_destination = now
try:
tag, data = shared.request_queue.recv().split(b'\x00', 1)
obj = structure.Object.from_message(
message.Message(b'object', data)
)
except (AttributeError, ValueError, zmq.error.Again):
pass
else:
if obj.is_valid():
with shared.objects_lock:
shared.objects[obj.vector] = obj
else:
logging.error(
'Got invalid object in the request queue: %s', obj)
@staticmethod
def clean_objects():
for vector in set(shared.objects):

View File

@ -64,3 +64,6 @@ connection_limit = 250
objects = {}
objects_lock = threading.Lock()
zmq_socket = None
request_queue = None

View File

@ -0,0 +1,46 @@
"""Test interprocess queues"""
import time
import zmq
from minode import structure, message
from .test_process import TestProcessProto
class TestProcessQueue(TestProcessProto):
"""A test case starting the process with queues"""
_process_cmd = ['minode', '--msg-queue']
@classmethod
def setUpClass(cls):
super().setUpClass()
context = zmq.Context()
cls.socket = context.socket(zmq.SUB)
cls.socket.connect('tcp://localhost:5556')
cls.socket.setsockopt(zmq.RCVTIMEO, 5000)
cls.socket.setsockopt(zmq.SUBSCRIBE, b'obj')
def test_receive_msg(self):
"""wait a couple of messages"""
timeout = 240
start = time.time()
c = 0
while time.time() - start < timeout:
if c > 1:
return
try:
tag, data = self.socket.recv().split(b'\x00', 1)
except zmq.error.Again:
continue
c += 1
self.assertEqual(tag, b'obj')
obj_type, data = data.split(b'\x00', 1)
obj = structure.Object.from_message(
message.Message(b'object', data))
self.assertEqual(int(obj_type), obj.object_type)
self.assertTrue(obj.is_valid())
if c == 0:
self.fail('No messages received in %ss' % timeout)

View File

@ -1,2 +1,3 @@
coverage
psutil
pyzmq