diff --git a/minode/main.py b/minode/main.py index aae3c46..85d2ec6 100644 --- a/minode/main.py +++ b/minode/main.py @@ -42,8 +42,6 @@ def parse_arguments(): '--trusted-peer', help='Specify a trusted peer we should connect to') parser.add_argument( '--connection-limit', type=int, help='Maximum number of connections') - parser.add_argument( - '--msg-queue', action='store_true', help='Enable messages queue') parser.add_argument( '--i2p', action='store_true', help='Enable I2P support (uses SAMv3)') parser.add_argument( @@ -56,6 +54,17 @@ def parse_arguments(): '--i2p-transient', action='store_true', help='Generate new I2P destination on start') + try: + import zmq + zmq_context = zmq.Context() + except ImportError: + zmq_context = None + else: + parser.add_argument( + '--msg-queue', action='store_true', help='Enable messages queue') + parser.add_argument( + '--request-queue', help='Set the URL for a request queue') + args = parser.parse_args() if args.port: shared.listening_port = args.port @@ -92,11 +101,6 @@ def parse_arguments(): shared.trusted_peer = (addr[0], int(addr[1])) if args.connection_limit: shared.connection_limit = args.connection_limit - if args.msg_queue: - import zmq - zmq_context = zmq.Context() - shared.zmq_socket = zmq_context.socket(zmq.PUB) - shared.zmq_socket.bind("tcp://*:5556") if args.i2p: shared.i2p_enabled = True if args.i2p_tunnel_length: @@ -108,6 +112,17 @@ def parse_arguments(): if args.i2p_transient: shared.i2p_transient = True + if zmq_context is None: + return + if args.msg_queue: + shared.zmq_socket = zmq_context.socket(zmq.PUB) + shared.zmq_socket.bind("tcp://*:5556") + if args.request_queue: + shared.request_queue = zmq_context.socket(zmq.SUB) + shared.request_queue.connect(args.request_queue) + shared.request_queue.setsockopt(zmq.RCVTIMEO, 1000) + shared.request_queue.setsockopt(zmq.SUBSCRIBE, b'msg') + def load_data(): """Loads initial nodes and data, stored in files between sessions""" diff --git a/minode/manager.py b/minode/manager.py index 0afce57..01c5a77 100644 --- a/minode/manager.py +++ b/minode/manager.py @@ -4,12 +4,13 @@ import base64 import logging import os import pickle -import queue import random import threading import time -from . import proofofwork, shared, structure +import zmq + +from . import message, proofofwork, shared, structure from .connection import Connection from .i2p import I2PDialer @@ -18,7 +19,6 @@ class Manager(threading.Thread): """The manager thread""" def __init__(self): super().__init__(name='Manager') - self.q = queue.Queue() self.last_cleaned_objects = time.time() self.last_cleaned_connections = time.time() self.last_pickled_objects = time.time() @@ -51,6 +51,21 @@ class Manager(threading.Thread): self.publish_i2p_destination() self.last_published_i2p_destination = now + try: + tag, data = shared.request_queue.recv().split(b'\x00', 1) + obj = structure.Object.from_message( + message.Message(b'object', data) + ) + except (AttributeError, ValueError, zmq.error.Again): + pass + else: + if obj.is_valid(): + with shared.objects_lock: + shared.objects[obj.vector] = obj + else: + logging.error( + 'Got invalid object in the request queue: %s', obj) + @staticmethod def clean_objects(): for vector in set(shared.objects): diff --git a/minode/shared.py b/minode/shared.py index ce0c613..61b1c25 100644 --- a/minode/shared.py +++ b/minode/shared.py @@ -66,3 +66,4 @@ objects = {} objects_lock = threading.Lock() zmq_socket = None +request_queue = None