Compare commits
4 Commits
Author | SHA1 | Date |
---|---|---|
Lee Miller | 504f12e5d0 | |
Lee Miller | 4ea139c9f6 | |
Dmitri Bogomolov | e5187c7887 | |
Dmitri Bogomolov | 3c56afc570 |
|
@ -405,6 +405,9 @@ class Connection(threading.Thread):
|
||||||
logging.debug(dest)
|
logging.debug(dest)
|
||||||
shared.i2p_unchecked_node_pool.add((dest, 'i2p'))
|
shared.i2p_unchecked_node_pool.add((dest, 'i2p'))
|
||||||
shared.vector_advertise_queue.put(obj.vector)
|
shared.vector_advertise_queue.put(obj.vector)
|
||||||
|
if shared.zmq_socket:
|
||||||
|
shared.zmq_socket.send(
|
||||||
|
b'obj\x00%i\x00' % obj.object_type + obj.to_bytes())
|
||||||
|
|
||||||
elif m.command == b'getdata':
|
elif m.command == b'getdata':
|
||||||
getdata = message.GetData.from_message(m)
|
getdata = message.GetData.from_message(m)
|
||||||
|
|
|
@ -54,6 +54,17 @@ def parse_arguments():
|
||||||
'--i2p-transient', action='store_true',
|
'--i2p-transient', action='store_true',
|
||||||
help='Generate new I2P destination on start')
|
help='Generate new I2P destination on start')
|
||||||
|
|
||||||
|
try:
|
||||||
|
import zmq
|
||||||
|
zmq_context = zmq.Context()
|
||||||
|
except ImportError:
|
||||||
|
zmq_context = None
|
||||||
|
else:
|
||||||
|
parser.add_argument(
|
||||||
|
'--msg-queue', action='store_true', help='Enable messages queue')
|
||||||
|
parser.add_argument(
|
||||||
|
'--request-queue', help='Set the URL for a request queue')
|
||||||
|
|
||||||
args = parser.parse_args()
|
args = parser.parse_args()
|
||||||
if args.port:
|
if args.port:
|
||||||
shared.listening_port = args.port
|
shared.listening_port = args.port
|
||||||
|
@ -101,6 +112,17 @@ def parse_arguments():
|
||||||
if args.i2p_transient:
|
if args.i2p_transient:
|
||||||
shared.i2p_transient = True
|
shared.i2p_transient = True
|
||||||
|
|
||||||
|
if zmq_context is None:
|
||||||
|
return
|
||||||
|
if args.msg_queue:
|
||||||
|
shared.zmq_socket = zmq_context.socket(zmq.PUB)
|
||||||
|
shared.zmq_socket.bind("tcp://*:5556")
|
||||||
|
if args.request_queue:
|
||||||
|
shared.request_queue = zmq_context.socket(zmq.SUB)
|
||||||
|
shared.request_queue.connect(args.request_queue)
|
||||||
|
shared.request_queue.setsockopt(zmq.RCVTIMEO, 1000)
|
||||||
|
shared.request_queue.setsockopt(zmq.SUBSCRIBE, b'msg')
|
||||||
|
|
||||||
|
|
||||||
def load_data():
|
def load_data():
|
||||||
"""Loads initial nodes and data, stored in files between sessions"""
|
"""Loads initial nodes and data, stored in files between sessions"""
|
||||||
|
|
|
@ -4,12 +4,13 @@ import base64
|
||||||
import logging
|
import logging
|
||||||
import os
|
import os
|
||||||
import pickle
|
import pickle
|
||||||
import queue
|
|
||||||
import random
|
import random
|
||||||
import threading
|
import threading
|
||||||
import time
|
import time
|
||||||
|
|
||||||
from . import proofofwork, shared, structure
|
import zmq
|
||||||
|
|
||||||
|
from . import message, proofofwork, shared, structure
|
||||||
from .connection import Connection
|
from .connection import Connection
|
||||||
from .i2p import I2PDialer
|
from .i2p import I2PDialer
|
||||||
|
|
||||||
|
@ -18,7 +19,6 @@ class Manager(threading.Thread):
|
||||||
"""The manager thread"""
|
"""The manager thread"""
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
super().__init__(name='Manager')
|
super().__init__(name='Manager')
|
||||||
self.q = queue.Queue()
|
|
||||||
self.last_cleaned_objects = time.time()
|
self.last_cleaned_objects = time.time()
|
||||||
self.last_cleaned_connections = time.time()
|
self.last_cleaned_connections = time.time()
|
||||||
self.last_pickled_objects = time.time()
|
self.last_pickled_objects = time.time()
|
||||||
|
@ -51,6 +51,21 @@ class Manager(threading.Thread):
|
||||||
self.publish_i2p_destination()
|
self.publish_i2p_destination()
|
||||||
self.last_published_i2p_destination = now
|
self.last_published_i2p_destination = now
|
||||||
|
|
||||||
|
try:
|
||||||
|
tag, data = shared.request_queue.recv().split(b'\x00', 1)
|
||||||
|
obj = structure.Object.from_message(
|
||||||
|
message.Message(b'object', data)
|
||||||
|
)
|
||||||
|
except (AttributeError, ValueError, zmq.error.Again):
|
||||||
|
pass
|
||||||
|
else:
|
||||||
|
if obj.is_valid():
|
||||||
|
with shared.objects_lock:
|
||||||
|
shared.objects[obj.vector] = obj
|
||||||
|
else:
|
||||||
|
logging.error(
|
||||||
|
'Got invalid object in the request queue: %s', obj)
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def clean_objects():
|
def clean_objects():
|
||||||
for vector in set(shared.objects):
|
for vector in set(shared.objects):
|
||||||
|
|
|
@ -64,3 +64,6 @@ connection_limit = 250
|
||||||
|
|
||||||
objects = {}
|
objects = {}
|
||||||
objects_lock = threading.Lock()
|
objects_lock = threading.Lock()
|
||||||
|
|
||||||
|
zmq_socket = None
|
||||||
|
request_queue = None
|
||||||
|
|
|
@ -0,0 +1,46 @@
|
||||||
|
"""Test interprocess queues"""
|
||||||
|
import time
|
||||||
|
|
||||||
|
import zmq
|
||||||
|
|
||||||
|
from minode import structure, message
|
||||||
|
|
||||||
|
from .test_process import TestProcessProto
|
||||||
|
|
||||||
|
|
||||||
|
class TestProcessQueue(TestProcessProto):
|
||||||
|
"""A test case starting the process with queues"""
|
||||||
|
_process_cmd = ['minode', '--msg-queue']
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def setUpClass(cls):
|
||||||
|
super().setUpClass()
|
||||||
|
context = zmq.Context()
|
||||||
|
cls.socket = context.socket(zmq.SUB)
|
||||||
|
cls.socket.connect('tcp://localhost:5556')
|
||||||
|
cls.socket.setsockopt(zmq.RCVTIMEO, 5000)
|
||||||
|
cls.socket.setsockopt(zmq.SUBSCRIBE, b'obj')
|
||||||
|
|
||||||
|
def test_receive_msg(self):
|
||||||
|
"""wait a couple of messages"""
|
||||||
|
timeout = 240
|
||||||
|
start = time.time()
|
||||||
|
c = 0
|
||||||
|
while time.time() - start < timeout:
|
||||||
|
if c > 1:
|
||||||
|
return
|
||||||
|
try:
|
||||||
|
tag, data = self.socket.recv().split(b'\x00', 1)
|
||||||
|
except zmq.error.Again:
|
||||||
|
continue
|
||||||
|
|
||||||
|
c += 1
|
||||||
|
self.assertEqual(tag, b'obj')
|
||||||
|
obj_type, data = data.split(b'\x00', 1)
|
||||||
|
obj = structure.Object.from_message(
|
||||||
|
message.Message(b'object', data))
|
||||||
|
self.assertEqual(int(obj_type), obj.object_type)
|
||||||
|
self.assertTrue(obj.is_valid())
|
||||||
|
|
||||||
|
if c == 0:
|
||||||
|
self.fail('No messages received in %ss' % timeout)
|
|
@ -1,2 +1,3 @@
|
||||||
coverage
|
coverage
|
||||||
psutil
|
psutil
|
||||||
|
pyzmq
|
||||||
|
|
Loading…
Reference in New Issue