Compare commits

..

4 Commits
v0.3 ... queue

Author SHA1 Message Date
504f12e5d0
Fix and improve test_queue:
- decrease timeout, remove unneeded sleep;
  - make and check objects from the data received in message queue;
  - add docstrings.
2023-10-15 01:48:47 +03:00
4ea139c9f6
Started adding a request queue - for sending prepared messages
in the first place.
2023-10-15 01:48:47 +03:00
e5187c7887
Minimal test for msg queue: wait for a msg at most for 4 min 2023-10-15 01:48:46 +03:00
3c56afc570
Testing zmq PUB/SUB for objects. Enabled by --msg-queue arg. 2023-10-15 01:48:15 +03:00
8 changed files with 153 additions and 64 deletions

View File

@ -405,6 +405,9 @@ class Connection(threading.Thread):
logging.debug(dest)
shared.i2p_unchecked_node_pool.add((dest, 'i2p'))
shared.vector_advertise_queue.put(obj.vector)
if shared.zmq_socket:
shared.zmq_socket.send(
b'obj\x00%i\x00' % obj.object_type + obj.to_bytes())
elif m.command == b'getdata':
getdata = message.GetData.from_message(m)

View File

@ -37,7 +37,7 @@ class I2PDialer(I2PThread):
self._send(b'HELLO VERSION MIN=3.0 MAX=3.3\n')
self.version_reply = self._receive_line().split()
if b'RESULT=OK' not in self.version_reply:
logging.debug('Error while connecting to %s', self.destination)
logging.warning('Error while connecting to %s', self.destination)
self.success = False
self._send(
@ -45,5 +45,6 @@ class I2PDialer(I2PThread):
+ self.destination + b'\n')
reply = self._receive_line().split(b' ')
if b'RESULT=OK' not in reply:
logging.debug('Error while connecting to %s', self.destination)
logging.warning(
'Error while connecting to %s', self.destination)
self.success = False

View File

@ -2,9 +2,11 @@
"""Functions for starting the program"""
import argparse
import base64
import csv
import logging
import multiprocessing
import os
import pickle
import signal
import socket
@ -20,7 +22,7 @@ def handler(s, f): # pylint: disable=unused-argument
shared.shutting_down = True
def parse_arguments(): # pylint: disable=too-many-branches,too-many-statements
def parse_arguments():
"""Parsing arguments"""
parser = argparse.ArgumentParser()
parser.add_argument('-p', '--port', help='Port to listen on', type=int)
@ -52,6 +54,17 @@ def parse_arguments(): # pylint: disable=too-many-branches,too-many-statements
'--i2p-transient', action='store_true',
help='Generate new I2P destination on start')
try:
import zmq
zmq_context = zmq.Context()
except ImportError:
zmq_context = None
else:
parser.add_argument(
'--msg-queue', action='store_true', help='Enable messages queue')
parser.add_argument(
'--request-queue', help='Set the URL for a request queue')
args = parser.parse_args()
if args.port:
shared.listening_port = args.port
@ -99,6 +112,67 @@ def parse_arguments(): # pylint: disable=too-many-branches,too-many-statements
if args.i2p_transient:
shared.i2p_transient = True
if zmq_context is None:
return
if args.msg_queue:
shared.zmq_socket = zmq_context.socket(zmq.PUB)
shared.zmq_socket.bind("tcp://*:5556")
if args.request_queue:
shared.request_queue = zmq_context.socket(zmq.SUB)
shared.request_queue.connect(args.request_queue)
shared.request_queue.setsockopt(zmq.RCVTIMEO, 1000)
shared.request_queue.setsockopt(zmq.SUBSCRIBE, b'msg')
def load_data():
    """Load initial nodes and objects stored in files between sessions.

    Reads pickled state (objects, IP nodes, I2P nodes) from the data
    directory, then seeds the node pools from the bundled core-node CSVs
    in the source directory.
    """
    def _load_pickle(filename, attr, noun):
        # One place for the repeated "unpickle or shrug" pattern that was
        # previously copy-pasted three times.
        try:
            with open(
                os.path.join(shared.data_directory, filename), 'br'
            ) as src:
                setattr(shared, attr, pickle.load(src))
        except FileNotFoundError:
            pass  # first start, nothing saved yet
        except Exception:
            logging.warning(
                'Error while loading %s from disk.', noun, exc_info=True)

    _load_pickle('objects.pickle', 'objects', 'objects')
    _load_pickle('nodes.pickle', 'node_pool', 'nodes')
    _load_pickle('i2p_nodes.pickle', 'i2p_node_pool', 'nodes')

    # encoding='ascii' matches the Manager.load_data version this function
    # replaces; the files ship with the source and are plain ASCII.
    with open(
        os.path.join(shared.source_directory, 'core_nodes.csv'),
        'r', newline='', encoding='ascii'
    ) as src:
        reader = csv.reader(src)
        shared.core_nodes = {tuple(row) for row in reader}
    shared.node_pool.update(shared.core_nodes)

    with open(
        os.path.join(shared.source_directory, 'i2p_core_nodes.csv'),
        'r', newline='', encoding='ascii'
    ) as src:
        reader = csv.reader(src)
        shared.i2p_core_nodes = {(row[0].encode(), 'i2p') for row in reader}
    shared.i2p_node_pool.update(shared.i2p_core_nodes)
def bootstrap_from_dns():
"""Addes addresses of bootstrap servers to known nodes"""
@ -238,6 +312,8 @@ def main():
'Error while creating data directory in: %s',
shared.data_directory, exc_info=True)
load_data()
if shared.ip_enabled and not shared.trusted_peer:
bootstrap_from_dns()

View File

@ -1,16 +1,16 @@
# -*- coding: utf-8 -*-
"""The main thread, managing connections, nodes and objects"""
import base64
import csv
import logging
import os
import pickle
import queue
import random
import threading
import time
from . import proofofwork, shared, structure
import zmq
from . import message, proofofwork, shared, structure
from .connection import Connection
from .i2p import I2PDialer
@ -19,17 +19,15 @@ class Manager(threading.Thread):
"""The manager thread"""
def __init__(self):
super().__init__(name='Manager')
self.q = queue.Queue()
self.last_cleaned_objects = time.time()
self.last_cleaned_connections = time.time()
self.last_pickled_objects = time.time()
self.last_pickled_nodes = time.time()
# Publish destination 5-15 minutes after start
self.last_published_i2p_destination = \
time.time() - 50 * 60 + random.uniform(-1, 1) * 300 # nosec B311
time.time() - 50 * 60 + random.uniform(-1, 1) * 300 # nosec
def run(self):
self.load_data()
self.clean_objects()
while True:
time.sleep(0.8)
@ -53,6 +51,21 @@ class Manager(threading.Thread):
self.publish_i2p_destination()
self.last_published_i2p_destination = now
try:
tag, data = shared.request_queue.recv().split(b'\x00', 1)
obj = structure.Object.from_message(
message.Message(b'object', data)
)
except (AttributeError, ValueError, zmq.error.Again):
pass
else:
if obj.is_valid():
with shared.objects_lock:
shared.objects[obj.vector] = obj
else:
logging.error(
'Got invalid object in the request queue: %s', obj)
@staticmethod
def clean_objects():
for vector in set(shared.objects):
@ -146,59 +159,6 @@ class Manager(threading.Thread):
shared.connections.add(c)
shared.hosts = hosts
@staticmethod
def load_data():
"""Loads initial nodes and data, stored in files between sessions"""
try:
with open(
os.path.join(shared.data_directory, 'objects.pickle'), 'br'
) as src:
shared.objects = pickle.load(src)
except FileNotFoundError:
pass # first start
except Exception:
logging.warning(
'Error while loading objects from disk.', exc_info=True)
try:
with open(
os.path.join(shared.data_directory, 'nodes.pickle'), 'br'
) as src:
shared.node_pool = pickle.load(src)
except FileNotFoundError:
pass
except Exception:
logging.warning(
'Error while loading nodes from disk.', exc_info=True)
try:
with open(
os.path.join(shared.data_directory, 'i2p_nodes.pickle'), 'br'
) as src:
shared.i2p_node_pool = pickle.load(src)
except FileNotFoundError:
pass
except Exception:
logging.warning(
'Error while loading nodes from disk.', exc_info=True)
with open(
os.path.join(shared.source_directory, 'core_nodes.csv'),
'r', newline='', encoding='ascii'
) as src:
reader = csv.reader(src)
shared.core_nodes = {tuple(row) for row in reader}
shared.node_pool.update(shared.core_nodes)
with open(
os.path.join(shared.source_directory, 'i2p_core_nodes.csv'),
'r', newline='', encoding='ascii'
) as f:
reader = csv.reader(f)
shared.i2p_core_nodes = {
(row[0].encode(), 'i2p') for row in reader}
shared.i2p_node_pool.update(shared.i2p_core_nodes)
@staticmethod
def pickle_objects():
try:

View File

@ -64,3 +64,6 @@ connection_limit = 250
objects = {}
objects_lock = threading.Lock()
zmq_socket = None
request_queue = None

View File

@ -0,0 +1,46 @@
"""Test interprocess queues"""
import time
import zmq
from minode import structure, message
from .test_process import TestProcessProto
class TestProcessQueue(TestProcessProto):
    """A test case starting the process with the message queue enabled."""
    _process_cmd = ['minode', '--msg-queue']

    @classmethod
    def setUpClass(cls):
        super().setUpClass()
        # Subscribe to the PUB socket the process binds with --msg-queue;
        # 5s receive timeout so the poll loop below can check the deadline.
        context = zmq.Context()
        cls.socket = context.socket(zmq.SUB)
        cls.socket.connect('tcp://localhost:5556')
        cls.socket.setsockopt(zmq.RCVTIMEO, 5000)
        cls.socket.setsockopt(zmq.SUBSCRIBE, b'obj')

    def test_receive_msg(self):
        """Wait for at least two messages and validate each one."""
        timeout = 240
        start = time.time()
        c = 0
        while time.time() - start < timeout:
            if c > 1:
                return
            try:
                tag, data = self.socket.recv().split(b'\x00', 1)
            except zmq.error.Again:
                continue
            c += 1
            self.assertEqual(tag, b'obj')
            # Payload layout: b'obj\x00<type>\x00<object bytes>'
            obj_type, data = data.split(b'\x00', 1)
            obj = structure.Object.from_message(
                message.Message(b'object', data))
            self.assertEqual(int(obj_type), obj.object_type)
            self.assertTrue(obj.is_valid())
        # FIX: the original only failed when c == 0, so receiving exactly
        # one message let the test pass although "a couple" is required.
        if c < 2:
            self.fail('Only %s message(s) received in %ss' % (c, timeout))

View File

@ -1,2 +1,3 @@
coverage
psutil
pyzmq

View File

@ -41,5 +41,4 @@ ignore_errors = true
[pylint.main]
disable = invalid-name,consider-using-f-string,fixme
max-args = 8
max-attributes = 8
max-args = 7