From 6578b5b9250224872612aa338c20622056dcba68 Mon Sep 17 00:00:00 2001 From: anand k Date: Sun, 23 Jun 2024 08:43:51 +0530 Subject: [PATCH 1/3] Moved multiqueue to network module --- src/{ => network}/multiqueue.py | 9 ++------- src/queues.py | 7 +++---- src/tests/test_multiqueue.py | 2 +- 3 files changed, 6 insertions(+), 12 deletions(-) rename src/{ => network}/multiqueue.py (89%) diff --git a/src/multiqueue.py b/src/network/multiqueue.py similarity index 89% rename from src/multiqueue.py rename to src/network/multiqueue.py index 88b6a4dd..3fad4e34 100644 --- a/src/multiqueue.py +++ b/src/network/multiqueue.py @@ -2,16 +2,11 @@ A queue with multiple internal subqueues. Elements are added into a random subqueue, and retrieval rotates """ - +import random from collections import deque from six.moves import queue -try: - import helper_random -except ImportError: - from . import helper_random - class MultiQueue(queue.Queue): """A base queue class""" @@ -38,7 +33,7 @@ class MultiQueue(queue.Queue): # Put a new item in the queue def _put(self, item): # self.queue.append(item) - self.queues[helper_random.randomrandrange(self.queueCount)].append( + self.queues[random.randrange(self.queueCount)].append( # nosec B311 (item)) # Get an item from the queue diff --git a/src/queues.py b/src/queues.py index 4a9b98d2..a95e8e46 100644 --- a/src/queues.py +++ b/src/queues.py @@ -5,10 +5,9 @@ import time from six.moves import queue -try: - from multiqueue import MultiQueue -except ImportError: - from .multiqueue import MultiQueue + +from network.multiqueue import MultiQueue + class ObjectProcessorQueue(queue.Queue): diff --git a/src/tests/test_multiqueue.py b/src/tests/test_multiqueue.py index 87149d56..4b041f1c 100644 --- a/src/tests/test_multiqueue.py +++ b/src/tests/test_multiqueue.py @@ -1,7 +1,7 @@ """Test cases for multiqueue""" import unittest -from pybitmessage.multiqueue import MultiQueue +from pybitmessage.network.multiqueue import MultiQueue class TestMultiQueue(unittest.TestCase): -- 2.45.1 From 5cd4ecb4371741a94c0f7831dbed39834cfd920b Mon Sep 17 00:00:00 2001 From: anand k Date: Sun, 23 Jun 2024 10:53:48 +0530 Subject: [PATCH 2/3] Moved invQueue to network module --- src/api.py | 6 +++--- src/class_objectProcessor.py | 4 ++-- src/class_singleWorker.py | 16 ++++++++-------- src/network/__init__.py | 5 ++++- src/network/bmproto.py | 4 ++-- src/network/invthread.py | 3 +-- src/network/tcp.py | 4 ++-- src/queues.py | 1 - 8 files changed, 22 insertions(+), 21 deletions(-) diff --git a/src/api.py b/src/api.py index a4445569..f9bf55de 100644 --- a/src/api.py +++ b/src/api.py @@ -100,7 +100,7 @@ try: except ImportError: connectionpool = None -from network import stats, StoppableThread +from network import stats, StoppableThread, invQueue from version import softwareVersion try: # TODO: write tests for XML vulnerabilities @@ -1346,7 +1346,7 @@ class BMRPCDispatcher(object): logger.info( 'Broadcasting inv for msg(API disseminatePreEncryptedMsg' ' command): %s', hexlify(inventoryHash)) - queues.invQueue.put((toStreamNumber, inventoryHash)) + invQueue.put((toStreamNumber, inventoryHash)) return hexlify(inventoryHash).decode() @command('trashSentMessageByAckData') @@ -1401,7 +1401,7 @@ class BMRPCDispatcher(object): logger.info( 'broadcasting inv within API command disseminatePubkey with' ' hash: %s', hexlify(inventoryHash)) - queues.invQueue.put((pubkeyStreamNumber, inventoryHash)) + invQueue.put((pubkeyStreamNumber, inventoryHash)) @command( 'getMessageDataByDestinationHash', 'getMessageDataByDestinationTag') diff --git a/src/class_objectProcessor.py b/src/class_objectProcessor.py index 469ccbfa..974631cb 100644 --- a/src/class_objectProcessor.py +++ b/src/class_objectProcessor.py @@ -30,7 +30,7 @@ from addresses import ( from bmconfigparser import config from helper_sql import ( sql_ready, sql_timeout, SqlBulkExecute, sqlExecute, sqlQuery) -from network import knownnodes +from network import knownnodes, invQueue from network.node import Peer from tr import _translate @@ -729,7 +729,7 @@ class objectProcessor(threading.Thread): inventoryHash = highlevelcrypto.calculateInventoryHash(ackPayload) state.Inventory[inventoryHash] = ( objectType, toStreamNumber, ackPayload, expiresTime, b'') - queues.invQueue.put((toStreamNumber, inventoryHash)) + invQueue.put((toStreamNumber, inventoryHash)) # Display timing data timeRequiredToAttemptToDecryptMessage = time.time( diff --git a/src/class_singleWorker.py b/src/class_singleWorker.py index f2821f65..f79d9240 100644 --- a/src/class_singleWorker.py +++ b/src/class_singleWorker.py @@ -28,7 +28,7 @@ import tr from addresses import decodeAddress, decodeVarint, encodeVarint from bmconfigparser import config from helper_sql import sqlExecute, sqlQuery -from network import knownnodes, StoppableThread +from network import knownnodes, StoppableThread, invQueue from six.moves import configparser, queue @@ -293,7 +293,7 @@ class singleWorker(StoppableThread): self.logger.info( 'broadcasting inv with hash: %s', hexlify(inventoryHash)) - queues.invQueue.put((streamNumber, inventoryHash)) + invQueue.put((streamNumber, inventoryHash)) queues.UISignalQueue.put(('updateStatusBar', '')) try: config.set( @@ -381,7 +381,7 @@ class singleWorker(StoppableThread): self.logger.info( 'broadcasting inv with hash: %s', hexlify(inventoryHash)) - queues.invQueue.put((streamNumber, inventoryHash)) + invQueue.put((streamNumber, inventoryHash)) queues.UISignalQueue.put(('updateStatusBar', '')) try: config.set( @@ -474,7 +474,7 @@ class singleWorker(StoppableThread): self.logger.info( 'broadcasting inv with hash: %s', hexlify(inventoryHash)) - queues.invQueue.put((streamNumber, inventoryHash)) + invQueue.put((streamNumber, inventoryHash)) queues.UISignalQueue.put(('updateStatusBar', '')) try: config.set( @@ -522,7 +522,7 @@ class singleWorker(StoppableThread): self.logger.info( 'sending inv (within sendOnionPeerObj function) for object: %s', hexlify(inventoryHash)) - queues.invQueue.put((streamNumber, inventoryHash)) + invQueue.put((streamNumber, inventoryHash)) def sendBroadcast(self): """Send a broadcast-type object (assemble the object, perform PoW and put it to the inv announcement queue)""" @@ -690,7 +690,7 @@ class singleWorker(StoppableThread): ' for object: %s', hexlify(inventoryHash) ) - queues.invQueue.put((streamNumber, inventoryHash)) + invQueue.put((streamNumber, inventoryHash)) queues.UISignalQueue.put(( 'updateSentItemStatusByAckdata', ( @@ -1326,7 +1326,7 @@ class singleWorker(StoppableThread): 'Broadcasting inv for my msg(within sendmsg function): %s', hexlify(inventoryHash) ) - queues.invQueue.put((toStreamNumber, inventoryHash)) + invQueue.put((toStreamNumber, inventoryHash)) # Update the sent message in the sent table with the # necessary information. @@ -1459,7 +1459,7 @@ class singleWorker(StoppableThread): state.Inventory[inventoryHash] = ( objectType, streamNumber, payload, embeddedTime, '') self.logger.info('sending inv (for the getpubkey message)') - queues.invQueue.put((streamNumber, inventoryHash)) + invQueue.put((streamNumber, inventoryHash)) # wait 10% past expiration sleeptill = int(time.time() + TTL * 1.1) diff --git a/src/network/__init__.py b/src/network/__init__.py index 42e9d035..642842df 100644 --- a/src/network/__init__.py +++ b/src/network/__init__.py @@ -3,15 +3,18 @@ Network subsystem package """ from .dandelion import Dandelion from .threads import StoppableThread +from .multiqueue import MultiQueue dandelion_ins = Dandelion() +# network queues +invQueue = MultiQueue() + __all__ = ["StoppableThread"] def start(config, state): """Start network threads""" - import state from .announcethread import AnnounceThread import connectionpool # pylint: disable=relative-import from .addrthread import AddrThread diff --git a/src/network/bmproto.py b/src/network/bmproto.py index 797dab5e..2b8dff79 100644 --- a/src/network/bmproto.py +++ b/src/network/bmproto.py @@ -17,7 +17,7 @@ import protocol import state import connectionpool from bmconfigparser import config -from queues import invQueue, objectProcessorQueue, portCheckerQueue +from queues import objectProcessorQueue, portCheckerQueue from randomtrackingdict import RandomTrackingDict from network.advanceddispatcher import AdvancedDispatcher from network.bmobject import ( @@ -26,7 +26,7 @@ from network.bmobject import ( BMObjectUnwantedStreamError ) from network.proxy import ProxyError -from network import dandelion_ins +from network import dandelion_ins, invQueue from node import Node, Peer from objectracker import ObjectTracker, missingObjects diff --git a/src/network/invthread.py b/src/network/invthread.py index 0b79710a..503eefa1 100644 --- a/src/network/invthread.py +++ b/src/network/invthread.py @@ -9,8 +9,7 @@ import addresses import protocol import state import connectionpool -from network import dandelion_ins -from queues import invQueue +from network import dandelion_ins, invQueue from threads import StoppableThread diff --git a/src/network/tcp.py b/src/network/tcp.py index f2dce07d..ec29c9ae 100644 --- a/src/network/tcp.py +++ b/src/network/tcp.py @@ -17,8 +17,8 @@ import state import connectionpool from bmconfigparser import config from highlevelcrypto import randomBytes -from network import dandelion_ins -from queues import invQueue, receiveDataQueue, UISignalQueue +from network import dandelion_ins, invQueue +from queues import receiveDataQueue, UISignalQueue from tr import _translate import asyncore_pollchoose as asyncore diff --git a/src/queues.py b/src/queues.py index a95e8e46..8e46ccf0 100644 --- a/src/queues.py +++ b/src/queues.py @@ -43,7 +43,6 @@ addressGeneratorQueue = queue.Queue() #: `.network.ReceiveQueueThread` instances dump objects they hear #: on the network into this queue to be processed. objectProcessorQueue = ObjectProcessorQueue() -invQueue = MultiQueue() addrQueue = MultiQueue() portCheckerQueue = queue.Queue() receiveDataQueue = queue.Queue() -- 2.45.1 From 49e89ecdf20586c15349644d271e9fa6fdab4deb Mon Sep 17 00:00:00 2001 From: anand k Date: Sun, 23 Jun 2024 10:57:57 +0530 Subject: [PATCH 3/3] Moved addrQueue to network module --- src/network/__init__.py | 1 + src/network/addrthread.py | 2 +- src/queues.py | 5 ----- 3 files changed, 2 insertions(+), 6 deletions(-) diff --git a/src/network/__init__.py b/src/network/__init__.py index 642842df..073c8435 100644 --- a/src/network/__init__.py +++ b/src/network/__init__.py @@ -9,6 +9,7 @@ dandelion_ins = Dandelion() # network queues invQueue = MultiQueue() +addrQueue = MultiQueue() __all__ = ["StoppableThread"] diff --git a/src/network/addrthread.py b/src/network/addrthread.py index a77e609c..81e44506 100644 --- a/src/network/addrthread.py +++ b/src/network/addrthread.py @@ -7,7 +7,7 @@ from six.moves import queue # magic imports! import connectionpool from protocol import assembleAddrMessage -from queues import addrQueue # FIXME: init with queue +from network import addrQueue # FIXME: init with queue from threads import StoppableThread diff --git a/src/queues.py b/src/queues.py index 8e46ccf0..18eb6dfa 100644 --- a/src/queues.py +++ b/src/queues.py @@ -6,10 +6,6 @@ import time from six.moves import queue -from network.multiqueue import MultiQueue - - - class ObjectProcessorQueue(queue.Queue): """Special queue class using lock for `.threads.objectProcessor`""" @@ -43,7 +39,6 @@ addressGeneratorQueue = queue.Queue() #: `.network.ReceiveQueueThread` instances dump objects they hear #: on the network into this queue to be processed. objectProcessorQueue = ObjectProcessorQueue() -addrQueue = MultiQueue() portCheckerQueue = queue.Queue() receiveDataQueue = queue.Queue() #: The address generator thread uses this queue to get information back -- 2.45.1