diff --git a/src/api.py b/src/api.py index a4445569..f9bf55de 100644 --- a/src/api.py +++ b/src/api.py @@ -100,7 +100,7 @@ try: except ImportError: connectionpool = None -from network import stats, StoppableThread +from network import stats, StoppableThread, invQueue from version import softwareVersion try: # TODO: write tests for XML vulnerabilities @@ -1346,7 +1346,7 @@ class BMRPCDispatcher(object): logger.info( 'Broadcasting inv for msg(API disseminatePreEncryptedMsg' ' command): %s', hexlify(inventoryHash)) - queues.invQueue.put((toStreamNumber, inventoryHash)) + invQueue.put((toStreamNumber, inventoryHash)) return hexlify(inventoryHash).decode() @command('trashSentMessageByAckData') @@ -1401,7 +1401,7 @@ class BMRPCDispatcher(object): logger.info( 'broadcasting inv within API command disseminatePubkey with' ' hash: %s', hexlify(inventoryHash)) - queues.invQueue.put((pubkeyStreamNumber, inventoryHash)) + invQueue.put((pubkeyStreamNumber, inventoryHash)) @command( 'getMessageDataByDestinationHash', 'getMessageDataByDestinationTag') diff --git a/src/class_objectProcessor.py b/src/class_objectProcessor.py index 469ccbfa..974631cb 100644 --- a/src/class_objectProcessor.py +++ b/src/class_objectProcessor.py @@ -30,7 +30,7 @@ from addresses import ( from bmconfigparser import config from helper_sql import ( sql_ready, sql_timeout, SqlBulkExecute, sqlExecute, sqlQuery) -from network import knownnodes +from network import knownnodes, invQueue from network.node import Peer from tr import _translate @@ -729,7 +729,7 @@ class objectProcessor(threading.Thread): inventoryHash = highlevelcrypto.calculateInventoryHash(ackPayload) state.Inventory[inventoryHash] = ( objectType, toStreamNumber, ackPayload, expiresTime, b'') - queues.invQueue.put((toStreamNumber, inventoryHash)) + invQueue.put((toStreamNumber, inventoryHash)) # Display timing data timeRequiredToAttemptToDecryptMessage = time.time( diff --git a/src/class_singleWorker.py b/src/class_singleWorker.py index f2821f65..f79d9240 100644 --- a/src/class_singleWorker.py +++ b/src/class_singleWorker.py @@ -28,7 +28,7 @@ import tr from addresses import decodeAddress, decodeVarint, encodeVarint from bmconfigparser import config from helper_sql import sqlExecute, sqlQuery -from network import knownnodes, StoppableThread +from network import knownnodes, StoppableThread, invQueue from six.moves import configparser, queue @@ -293,7 +293,7 @@ class singleWorker(StoppableThread): self.logger.info( 'broadcasting inv with hash: %s', hexlify(inventoryHash)) - queues.invQueue.put((streamNumber, inventoryHash)) + invQueue.put((streamNumber, inventoryHash)) queues.UISignalQueue.put(('updateStatusBar', '')) try: config.set( @@ -381,7 +381,7 @@ class singleWorker(StoppableThread): self.logger.info( 'broadcasting inv with hash: %s', hexlify(inventoryHash)) - queues.invQueue.put((streamNumber, inventoryHash)) + invQueue.put((streamNumber, inventoryHash)) queues.UISignalQueue.put(('updateStatusBar', '')) try: config.set( @@ -474,7 +474,7 @@ class singleWorker(StoppableThread): self.logger.info( 'broadcasting inv with hash: %s', hexlify(inventoryHash)) - queues.invQueue.put((streamNumber, inventoryHash)) + invQueue.put((streamNumber, inventoryHash)) queues.UISignalQueue.put(('updateStatusBar', '')) try: config.set( @@ -522,7 +522,7 @@ class singleWorker(StoppableThread): self.logger.info( 'sending inv (within sendOnionPeerObj function) for object: %s', hexlify(inventoryHash)) - queues.invQueue.put((streamNumber, inventoryHash)) + invQueue.put((streamNumber, inventoryHash)) def sendBroadcast(self): """Send a broadcast-type object (assemble the object, perform PoW and put it to the inv announcement queue)""" @@ -690,7 +690,7 @@ class singleWorker(StoppableThread): ' for object: %s', hexlify(inventoryHash) ) - queues.invQueue.put((streamNumber, inventoryHash)) + invQueue.put((streamNumber, inventoryHash)) queues.UISignalQueue.put(( 'updateSentItemStatusByAckdata', ( @@ -1326,7 +1326,7 @@ class singleWorker(StoppableThread): 'Broadcasting inv for my msg(within sendmsg function): %s', hexlify(inventoryHash) ) - queues.invQueue.put((toStreamNumber, inventoryHash)) + invQueue.put((toStreamNumber, inventoryHash)) # Update the sent message in the sent table with the # necessary information. @@ -1459,7 +1459,7 @@ class singleWorker(StoppableThread): state.Inventory[inventoryHash] = ( objectType, streamNumber, payload, embeddedTime, '') self.logger.info('sending inv (for the getpubkey message)') - queues.invQueue.put((streamNumber, inventoryHash)) + invQueue.put((streamNumber, inventoryHash)) # wait 10% past expiration sleeptill = int(time.time() + TTL * 1.1) diff --git a/src/network/__init__.py b/src/network/__init__.py index 42e9d035..073c8435 100644 --- a/src/network/__init__.py +++ b/src/network/__init__.py @@ -3,15 +3,19 @@ Network subsystem package """ from .dandelion import Dandelion from .threads import StoppableThread +from .multiqueue import MultiQueue dandelion_ins = Dandelion() +# network queues +invQueue = MultiQueue() +addrQueue = MultiQueue() + __all__ = ["StoppableThread"] def start(config, state): """Start network threads""" - import state from .announcethread import AnnounceThread import connectionpool # pylint: disable=relative-import from .addrthread import AddrThread diff --git a/src/network/addrthread.py b/src/network/addrthread.py index a77e609c..81e44506 100644 --- a/src/network/addrthread.py +++ b/src/network/addrthread.py @@ -7,7 +7,7 @@ from six.moves import queue # magic imports! import connectionpool from protocol import assembleAddrMessage -from queues import addrQueue # FIXME: init with queue +from network import addrQueue # FIXME: init with queue from threads import StoppableThread diff --git a/src/network/bmproto.py b/src/network/bmproto.py index 797dab5e..2b8dff79 100644 --- a/src/network/bmproto.py +++ b/src/network/bmproto.py @@ -17,7 +17,7 @@ import protocol import state import connectionpool from bmconfigparser import config -from queues import invQueue, objectProcessorQueue, portCheckerQueue +from queues import objectProcessorQueue, portCheckerQueue from randomtrackingdict import RandomTrackingDict from network.advanceddispatcher import AdvancedDispatcher from network.bmobject import ( @@ -26,7 +26,7 @@ from network.bmobject import ( BMObjectUnwantedStreamError ) from network.proxy import ProxyError -from network import dandelion_ins +from network import dandelion_ins, invQueue from node import Node, Peer from objectracker import ObjectTracker, missingObjects diff --git a/src/network/invthread.py b/src/network/invthread.py index 0b79710a..503eefa1 100644 --- a/src/network/invthread.py +++ b/src/network/invthread.py @@ -9,8 +9,7 @@ import addresses import protocol import state import connectionpool -from network import dandelion_ins -from queues import invQueue +from network import dandelion_ins, invQueue from threads import StoppableThread diff --git a/src/multiqueue.py b/src/network/multiqueue.py similarity index 89% rename from src/multiqueue.py rename to src/network/multiqueue.py index 88b6a4dd..3fad4e34 100644 --- a/src/multiqueue.py +++ b/src/network/multiqueue.py @@ -2,16 +2,11 @@ A queue with multiple internal subqueues. Elements are added into a random subqueue, and retrieval rotates """ - +import random from collections import deque from six.moves import queue -try: - import helper_random -except ImportError: - from . import helper_random - class MultiQueue(queue.Queue): """A base queue class""" @@ -38,7 +33,7 @@ class MultiQueue(queue.Queue): # Put a new item in the queue def _put(self, item): # self.queue.append(item) - self.queues[helper_random.randomrandrange(self.queueCount)].append( + self.queues[random.randrange(self.queueCount)].append( # nosec B311 (item)) # Get an item from the queue diff --git a/src/network/tcp.py b/src/network/tcp.py index f2dce07d..ec29c9ae 100644 --- a/src/network/tcp.py +++ b/src/network/tcp.py @@ -17,8 +17,8 @@ import state import connectionpool from bmconfigparser import config from highlevelcrypto import randomBytes -from network import dandelion_ins -from queues import invQueue, receiveDataQueue, UISignalQueue +from network import dandelion_ins, invQueue +from queues import receiveDataQueue, UISignalQueue from tr import _translate import asyncore_pollchoose as asyncore diff --git a/src/queues.py b/src/queues.py index 4a9b98d2..18eb6dfa 100644 --- a/src/queues.py +++ b/src/queues.py @@ -5,11 +5,6 @@ import time from six.moves import queue -try: - from multiqueue import MultiQueue -except ImportError: - from .multiqueue import MultiQueue - class ObjectProcessorQueue(queue.Queue): """Special queue class using lock for `.threads.objectProcessor`""" @@ -44,8 +39,6 @@ addressGeneratorQueue = queue.Queue() #: `.network.ReceiveQueueThread` instances dump objects they hear #: on the network into this queue to be processed. objectProcessorQueue = ObjectProcessorQueue() -invQueue = MultiQueue() -addrQueue = MultiQueue() portCheckerQueue = queue.Queue() receiveDataQueue = queue.Queue() #: The address generator thread uses this queue to get information back diff --git a/src/tests/test_multiqueue.py b/src/tests/test_multiqueue.py index 87149d56..4b041f1c 100644 --- a/src/tests/test_multiqueue.py +++ b/src/tests/test_multiqueue.py @@ -1,7 +1,7 @@ """Test cases for multiqueue""" import unittest -from pybitmessage.multiqueue import MultiQueue +from pybitmessage.network.multiqueue import MultiQueue class TestMultiQueue(unittest.TestCase):