diff --git a/src/api.py b/src/api.py index a4445569..f9bf55de 100644 --- a/src/api.py +++ b/src/api.py @@ -100,7 +100,7 @@ try: except ImportError: connectionpool = None -from network import stats, StoppableThread +from network import stats, StoppableThread, invQueue from version import softwareVersion try: # TODO: write tests for XML vulnerabilities @@ -1346,7 +1346,7 @@ class BMRPCDispatcher(object): logger.info( 'Broadcasting inv for msg(API disseminatePreEncryptedMsg' ' command): %s', hexlify(inventoryHash)) - queues.invQueue.put((toStreamNumber, inventoryHash)) + invQueue.put((toStreamNumber, inventoryHash)) return hexlify(inventoryHash).decode() @command('trashSentMessageByAckData') @@ -1401,7 +1401,7 @@ class BMRPCDispatcher(object): logger.info( 'broadcasting inv within API command disseminatePubkey with' ' hash: %s', hexlify(inventoryHash)) - queues.invQueue.put((pubkeyStreamNumber, inventoryHash)) + invQueue.put((pubkeyStreamNumber, inventoryHash)) @command( 'getMessageDataByDestinationHash', 'getMessageDataByDestinationTag') diff --git a/src/class_objectProcessor.py b/src/class_objectProcessor.py index 469ccbfa..974631cb 100644 --- a/src/class_objectProcessor.py +++ b/src/class_objectProcessor.py @@ -30,7 +30,7 @@ from addresses import ( from bmconfigparser import config from helper_sql import ( sql_ready, sql_timeout, SqlBulkExecute, sqlExecute, sqlQuery) -from network import knownnodes +from network import knownnodes, invQueue from network.node import Peer from tr import _translate @@ -729,7 +729,7 @@ class objectProcessor(threading.Thread): inventoryHash = highlevelcrypto.calculateInventoryHash(ackPayload) state.Inventory[inventoryHash] = ( objectType, toStreamNumber, ackPayload, expiresTime, b'') - queues.invQueue.put((toStreamNumber, inventoryHash)) + invQueue.put((toStreamNumber, inventoryHash)) # Display timing data timeRequiredToAttemptToDecryptMessage = time.time( diff --git a/src/class_singleWorker.py b/src/class_singleWorker.py index f2821f65..f79d9240 100644 --- a/src/class_singleWorker.py +++ b/src/class_singleWorker.py @@ -28,7 +28,7 @@ import tr from addresses import decodeAddress, decodeVarint, encodeVarint from bmconfigparser import config from helper_sql import sqlExecute, sqlQuery -from network import knownnodes, StoppableThread +from network import knownnodes, StoppableThread, invQueue from six.moves import configparser, queue @@ -293,7 +293,7 @@ class singleWorker(StoppableThread): self.logger.info( 'broadcasting inv with hash: %s', hexlify(inventoryHash)) - queues.invQueue.put((streamNumber, inventoryHash)) + invQueue.put((streamNumber, inventoryHash)) queues.UISignalQueue.put(('updateStatusBar', '')) try: config.set( @@ -381,7 +381,7 @@ class singleWorker(StoppableThread): self.logger.info( 'broadcasting inv with hash: %s', hexlify(inventoryHash)) - queues.invQueue.put((streamNumber, inventoryHash)) + invQueue.put((streamNumber, inventoryHash)) queues.UISignalQueue.put(('updateStatusBar', '')) try: config.set( @@ -474,7 +474,7 @@ class singleWorker(StoppableThread): self.logger.info( 'broadcasting inv with hash: %s', hexlify(inventoryHash)) - queues.invQueue.put((streamNumber, inventoryHash)) + invQueue.put((streamNumber, inventoryHash)) queues.UISignalQueue.put(('updateStatusBar', '')) try: config.set( @@ -522,7 +522,7 @@ class singleWorker(StoppableThread): self.logger.info( 'sending inv (within sendOnionPeerObj function) for object: %s', hexlify(inventoryHash)) - queues.invQueue.put((streamNumber, inventoryHash)) + invQueue.put((streamNumber, inventoryHash)) def sendBroadcast(self): """Send a broadcast-type object (assemble the object, perform PoW and put it to the inv announcement queue)""" @@ -690,7 +690,7 @@ class singleWorker(StoppableThread): ' for object: %s', hexlify(inventoryHash) ) - queues.invQueue.put((streamNumber, inventoryHash)) + invQueue.put((streamNumber, inventoryHash)) queues.UISignalQueue.put(( 'updateSentItemStatusByAckdata', ( @@ -1326,7 +1326,7 @@ class singleWorker(StoppableThread): 'Broadcasting inv for my msg(within sendmsg function): %s', hexlify(inventoryHash) ) - queues.invQueue.put((toStreamNumber, inventoryHash)) + invQueue.put((toStreamNumber, inventoryHash)) # Update the sent message in the sent table with the # necessary information. @@ -1459,7 +1459,7 @@ class singleWorker(StoppableThread): state.Inventory[inventoryHash] = ( objectType, streamNumber, payload, embeddedTime, '') self.logger.info('sending inv (for the getpubkey message)') - queues.invQueue.put((streamNumber, inventoryHash)) + invQueue.put((streamNumber, inventoryHash)) # wait 10% past expiration sleeptill = int(time.time() + TTL * 1.1) diff --git a/src/network/__init__.py b/src/network/__init__.py index 42e9d035..642842df 100644 --- a/src/network/__init__.py +++ b/src/network/__init__.py @@ -3,15 +3,18 @@ Network subsystem package """ from .dandelion import Dandelion from .threads import StoppableThread +from .multiqueue import MultiQueue dandelion_ins = Dandelion() +# network queues +invQueue = MultiQueue() + __all__ = ["StoppableThread"] def start(config, state): """Start network threads""" - import state from .announcethread import AnnounceThread import connectionpool # pylint: disable=relative-import from .addrthread import AddrThread diff --git a/src/network/bmproto.py b/src/network/bmproto.py index 797dab5e..2b8dff79 100644 --- a/src/network/bmproto.py +++ b/src/network/bmproto.py @@ -17,7 +17,7 @@ import protocol import state import connectionpool from bmconfigparser import config -from queues import invQueue, objectProcessorQueue, portCheckerQueue +from queues import objectProcessorQueue, portCheckerQueue from randomtrackingdict import RandomTrackingDict from network.advanceddispatcher import AdvancedDispatcher from network.bmobject import ( @@ -26,7 +26,7 @@ from network.bmobject import ( BMObjectUnwantedStreamError ) from network.proxy import ProxyError -from network import dandelion_ins +from network import dandelion_ins, invQueue from node import Node, Peer from objectracker import ObjectTracker, missingObjects diff --git a/src/network/invthread.py b/src/network/invthread.py index 0b79710a..503eefa1 100644 --- a/src/network/invthread.py +++ b/src/network/invthread.py @@ -9,8 +9,7 @@ import addresses import protocol import state import connectionpool -from network import dandelion_ins -from queues import invQueue +from network import dandelion_ins, invQueue from threads import StoppableThread diff --git a/src/network/tcp.py b/src/network/tcp.py index f2dce07d..ec29c9ae 100644 --- a/src/network/tcp.py +++ b/src/network/tcp.py @@ -17,8 +17,8 @@ import state import connectionpool from bmconfigparser import config from highlevelcrypto import randomBytes -from network import dandelion_ins -from queues import invQueue, receiveDataQueue, UISignalQueue +from network import dandelion_ins, invQueue +from queues import receiveDataQueue, UISignalQueue from tr import _translate import asyncore_pollchoose as asyncore diff --git a/src/queues.py b/src/queues.py index a95e8e46..8e46ccf0 100644 --- a/src/queues.py +++ b/src/queues.py @@ -43,7 +43,6 @@ addressGeneratorQueue = queue.Queue() #: `.network.ReceiveQueueThread` instances dump objects they hear #: on the network into this queue to be processed. objectProcessorQueue = ObjectProcessorQueue() -invQueue = MultiQueue() addrQueue = MultiQueue() portCheckerQueue = queue.Queue() receiveDataQueue = queue.Queue()