Started adding a request queue - for sending prepared messages

in the first place.
This commit is contained in:
Lee Miller 2023-07-21 02:36:40 +03:00
parent e5187c7887
commit 4ea139c9f6
Signed by: lee.miller
GPG Key ID: 4F97A5EA88F4AB63
3 changed files with 41 additions and 10 deletions

View File

@ -42,8 +42,6 @@ def parse_arguments():
'--trusted-peer', help='Specify a trusted peer we should connect to') '--trusted-peer', help='Specify a trusted peer we should connect to')
parser.add_argument( parser.add_argument(
'--connection-limit', type=int, help='Maximum number of connections') '--connection-limit', type=int, help='Maximum number of connections')
parser.add_argument(
'--msg-queue', action='store_true', help='Enable messages queue')
parser.add_argument( parser.add_argument(
'--i2p', action='store_true', help='Enable I2P support (uses SAMv3)') '--i2p', action='store_true', help='Enable I2P support (uses SAMv3)')
parser.add_argument( parser.add_argument(
@ -56,6 +54,17 @@ def parse_arguments():
'--i2p-transient', action='store_true', '--i2p-transient', action='store_true',
help='Generate new I2P destination on start') help='Generate new I2P destination on start')
try:
import zmq
zmq_context = zmq.Context()
except ImportError:
zmq_context = None
else:
parser.add_argument(
'--msg-queue', action='store_true', help='Enable messages queue')
parser.add_argument(
'--request-queue', help='Set the URL for a request queue')
args = parser.parse_args() args = parser.parse_args()
if args.port: if args.port:
shared.listening_port = args.port shared.listening_port = args.port
@ -92,11 +101,6 @@ def parse_arguments():
shared.trusted_peer = (addr[0], int(addr[1])) shared.trusted_peer = (addr[0], int(addr[1]))
if args.connection_limit: if args.connection_limit:
shared.connection_limit = args.connection_limit shared.connection_limit = args.connection_limit
if args.msg_queue:
import zmq
zmq_context = zmq.Context()
shared.zmq_socket = zmq_context.socket(zmq.PUB)
shared.zmq_socket.bind("tcp://*:5556")
if args.i2p: if args.i2p:
shared.i2p_enabled = True shared.i2p_enabled = True
if args.i2p_tunnel_length: if args.i2p_tunnel_length:
@ -108,6 +112,17 @@ def parse_arguments():
if args.i2p_transient: if args.i2p_transient:
shared.i2p_transient = True shared.i2p_transient = True
if zmq_context is None:
return
if args.msg_queue:
shared.zmq_socket = zmq_context.socket(zmq.PUB)
shared.zmq_socket.bind("tcp://*:5556")
if args.request_queue:
shared.request_queue = zmq_context.socket(zmq.SUB)
shared.request_queue.connect(args.request_queue)
shared.request_queue.setsockopt(zmq.RCVTIMEO, 1000)
shared.request_queue.setsockopt(zmq.SUBSCRIBE, b'msg')
def load_data(): def load_data():
"""Loads initial nodes and data, stored in files between sessions""" """Loads initial nodes and data, stored in files between sessions"""

View File

@ -4,12 +4,13 @@ import base64
import logging import logging
import os import os
import pickle import pickle
import queue
import random import random
import threading import threading
import time import time
from . import proofofwork, shared, structure import zmq
from . import message, proofofwork, shared, structure
from .connection import Connection from .connection import Connection
from .i2p import I2PDialer from .i2p import I2PDialer
@ -18,7 +19,6 @@ class Manager(threading.Thread):
"""The manager thread""" """The manager thread"""
def __init__(self): def __init__(self):
super().__init__(name='Manager') super().__init__(name='Manager')
self.q = queue.Queue()
self.last_cleaned_objects = time.time() self.last_cleaned_objects = time.time()
self.last_cleaned_connections = time.time() self.last_cleaned_connections = time.time()
self.last_pickled_objects = time.time() self.last_pickled_objects = time.time()
@ -51,6 +51,21 @@ class Manager(threading.Thread):
self.publish_i2p_destination() self.publish_i2p_destination()
self.last_published_i2p_destination = now self.last_published_i2p_destination = now
try:
tag, data = shared.request_queue.recv().split(b'\x00', 1)
obj = structure.Object.from_message(
message.Message(b'object', data)
)
except (AttributeError, ValueError, zmq.error.Again):
pass
else:
if obj.is_valid():
with shared.objects_lock:
shared.objects[obj.vector] = obj
else:
logging.error(
'Got invalid object in the request queue: %s', obj)
@staticmethod @staticmethod
def clean_objects(): def clean_objects():
for vector in set(shared.objects): for vector in set(shared.objects):

View File

@ -66,3 +66,4 @@ objects = {}
objects_lock = threading.Lock() objects_lock = threading.Lock()
zmq_socket = None zmq_socket = None
request_queue = None