From 6578b5b9250224872612aa338c20622056dcba68 Mon Sep 17 00:00:00 2001
From: anand k <anand.theskss@gmail.com>
Date: Sun, 23 Jun 2024 08:43:51 +0530
Subject: [PATCH 1/3] Moved multiqueue to network module

---
 src/{ => network}/multiqueue.py | 9 ++-------
 src/queues.py                   | 7 +++----
 src/tests/test_multiqueue.py    | 2 +-
 3 files changed, 6 insertions(+), 12 deletions(-)
 rename src/{ => network}/multiqueue.py (89%)

diff --git a/src/multiqueue.py b/src/network/multiqueue.py
similarity index 89%
rename from src/multiqueue.py
rename to src/network/multiqueue.py
index 88b6a4dd..3fad4e34 100644
--- a/src/multiqueue.py
+++ b/src/network/multiqueue.py
@@ -2,16 +2,11 @@
 A queue with multiple internal subqueues.
 Elements are added into a random subqueue, and retrieval rotates
 """
-
+import random
 from collections import deque
 
 from six.moves import queue
 
-try:
-    import helper_random
-except ImportError:
-    from . import helper_random
-
 
 class MultiQueue(queue.Queue):
     """A base queue class"""
@@ -38,7 +33,7 @@ class MultiQueue(queue.Queue):
     # Put a new item in the queue
     def _put(self, item):
         # self.queue.append(item)
-        self.queues[helper_random.randomrandrange(self.queueCount)].append(
+        self.queues[random.randrange(self.queueCount)].append(  # nosec B311
             (item))
 
     # Get an item from the queue
diff --git a/src/queues.py b/src/queues.py
index 4a9b98d2..a95e8e46 100644
--- a/src/queues.py
+++ b/src/queues.py
@@ -5,10 +5,9 @@ import time
 
 from six.moves import queue
 
-try:
-    from multiqueue import MultiQueue
-except ImportError:
-    from .multiqueue import MultiQueue
+
+from network.multiqueue import MultiQueue
+
 
 
 class ObjectProcessorQueue(queue.Queue):
diff --git a/src/tests/test_multiqueue.py b/src/tests/test_multiqueue.py
index 87149d56..4b041f1c 100644
--- a/src/tests/test_multiqueue.py
+++ b/src/tests/test_multiqueue.py
@@ -1,7 +1,7 @@
 """Test cases for multiqueue"""
 
 import unittest
-from pybitmessage.multiqueue import MultiQueue
+from pybitmessage.network.multiqueue import MultiQueue
 
 
 class TestMultiQueue(unittest.TestCase):
-- 
2.45.1


From 5cd4ecb4371741a94c0f7831dbed39834cfd920b Mon Sep 17 00:00:00 2001
From: anand k <anand.theskss@gmail.com>
Date: Sun, 23 Jun 2024 10:53:48 +0530
Subject: [PATCH 2/3] Moved invQueue to network module

---
 src/api.py                   |  6 +++---
 src/class_objectProcessor.py |  4 ++--
 src/class_singleWorker.py    | 16 ++++++++--------
 src/network/__init__.py      |  5 ++++-
 src/network/bmproto.py       |  4 ++--
 src/network/invthread.py     |  3 +--
 src/network/tcp.py           |  4 ++--
 src/queues.py                |  1 -
 8 files changed, 22 insertions(+), 21 deletions(-)

diff --git a/src/api.py b/src/api.py
index a4445569..f9bf55de 100644
--- a/src/api.py
+++ b/src/api.py
@@ -100,7 +100,7 @@ try:
 except ImportError:
     connectionpool = None
 
-from network import stats, StoppableThread
+from network import stats, StoppableThread, invQueue
 from version import softwareVersion
 
 try:  # TODO: write tests for XML vulnerabilities
@@ -1346,7 +1346,7 @@ class BMRPCDispatcher(object):
         logger.info(
             'Broadcasting inv for msg(API disseminatePreEncryptedMsg'
             ' command): %s', hexlify(inventoryHash))
-        queues.invQueue.put((toStreamNumber, inventoryHash))
+        invQueue.put((toStreamNumber, inventoryHash))
         return hexlify(inventoryHash).decode()
 
     @command('trashSentMessageByAckData')
@@ -1401,7 +1401,7 @@ class BMRPCDispatcher(object):
         logger.info(
             'broadcasting inv within API command disseminatePubkey with'
             ' hash: %s', hexlify(inventoryHash))
-        queues.invQueue.put((pubkeyStreamNumber, inventoryHash))
+        invQueue.put((pubkeyStreamNumber, inventoryHash))
 
     @command(
         'getMessageDataByDestinationHash', 'getMessageDataByDestinationTag')
diff --git a/src/class_objectProcessor.py b/src/class_objectProcessor.py
index 469ccbfa..974631cb 100644
--- a/src/class_objectProcessor.py
+++ b/src/class_objectProcessor.py
@@ -30,7 +30,7 @@ from addresses import (
 from bmconfigparser import config
 from helper_sql import (
     sql_ready, sql_timeout, SqlBulkExecute, sqlExecute, sqlQuery)
-from network import knownnodes
+from network import knownnodes, invQueue
 from network.node import Peer
 from tr import _translate
 
@@ -729,7 +729,7 @@ class objectProcessor(threading.Thread):
             inventoryHash = highlevelcrypto.calculateInventoryHash(ackPayload)
             state.Inventory[inventoryHash] = (
                 objectType, toStreamNumber, ackPayload, expiresTime, b'')
-            queues.invQueue.put((toStreamNumber, inventoryHash))
+            invQueue.put((toStreamNumber, inventoryHash))
 
         # Display timing data
         timeRequiredToAttemptToDecryptMessage = time.time(
diff --git a/src/class_singleWorker.py b/src/class_singleWorker.py
index f2821f65..f79d9240 100644
--- a/src/class_singleWorker.py
+++ b/src/class_singleWorker.py
@@ -28,7 +28,7 @@ import tr
 from addresses import decodeAddress, decodeVarint, encodeVarint
 from bmconfigparser import config
 from helper_sql import sqlExecute, sqlQuery
-from network import knownnodes, StoppableThread
+from network import knownnodes, StoppableThread, invQueue
 from six.moves import configparser, queue
 
 
@@ -293,7 +293,7 @@ class singleWorker(StoppableThread):
         self.logger.info(
             'broadcasting inv with hash: %s', hexlify(inventoryHash))
 
-        queues.invQueue.put((streamNumber, inventoryHash))
+        invQueue.put((streamNumber, inventoryHash))
         queues.UISignalQueue.put(('updateStatusBar', ''))
         try:
             config.set(
@@ -381,7 +381,7 @@ class singleWorker(StoppableThread):
         self.logger.info(
             'broadcasting inv with hash: %s', hexlify(inventoryHash))
 
-        queues.invQueue.put((streamNumber, inventoryHash))
+        invQueue.put((streamNumber, inventoryHash))
         queues.UISignalQueue.put(('updateStatusBar', ''))
         try:
             config.set(
@@ -474,7 +474,7 @@ class singleWorker(StoppableThread):
         self.logger.info(
             'broadcasting inv with hash: %s', hexlify(inventoryHash))
 
-        queues.invQueue.put((streamNumber, inventoryHash))
+        invQueue.put((streamNumber, inventoryHash))
         queues.UISignalQueue.put(('updateStatusBar', ''))
         try:
             config.set(
@@ -522,7 +522,7 @@ class singleWorker(StoppableThread):
         self.logger.info(
             'sending inv (within sendOnionPeerObj function) for object: %s',
             hexlify(inventoryHash))
-        queues.invQueue.put((streamNumber, inventoryHash))
+        invQueue.put((streamNumber, inventoryHash))
 
     def sendBroadcast(self):
         """Send a broadcast-type object (assemble the object, perform PoW and put it to the inv announcement queue)"""
@@ -690,7 +690,7 @@ class singleWorker(StoppableThread):
                 ' for object: %s',
                 hexlify(inventoryHash)
             )
-            queues.invQueue.put((streamNumber, inventoryHash))
+            invQueue.put((streamNumber, inventoryHash))
 
             queues.UISignalQueue.put((
                 'updateSentItemStatusByAckdata', (
@@ -1326,7 +1326,7 @@ class singleWorker(StoppableThread):
                 'Broadcasting inv for my msg(within sendmsg function): %s',
                 hexlify(inventoryHash)
             )
-            queues.invQueue.put((toStreamNumber, inventoryHash))
+            invQueue.put((toStreamNumber, inventoryHash))
 
             # Update the sent message in the sent table with the
             # necessary information.
@@ -1459,7 +1459,7 @@ class singleWorker(StoppableThread):
         state.Inventory[inventoryHash] = (
             objectType, streamNumber, payload, embeddedTime, '')
         self.logger.info('sending inv (for the getpubkey message)')
-        queues.invQueue.put((streamNumber, inventoryHash))
+        invQueue.put((streamNumber, inventoryHash))
 
         # wait 10% past expiration
         sleeptill = int(time.time() + TTL * 1.1)
diff --git a/src/network/__init__.py b/src/network/__init__.py
index 42e9d035..642842df 100644
--- a/src/network/__init__.py
+++ b/src/network/__init__.py
@@ -3,15 +3,18 @@ Network subsystem package
 """
 from .dandelion import Dandelion
 from .threads import StoppableThread
+from .multiqueue import MultiQueue
 
 dandelion_ins = Dandelion()
 
+# network queues
+invQueue = MultiQueue()
+
 __all__ = ["StoppableThread"]
 
 
 def start(config, state):
     """Start network threads"""
-    import state
     from .announcethread import AnnounceThread
     import connectionpool  # pylint: disable=relative-import
     from .addrthread import AddrThread
diff --git a/src/network/bmproto.py b/src/network/bmproto.py
index 797dab5e..2b8dff79 100644
--- a/src/network/bmproto.py
+++ b/src/network/bmproto.py
@@ -17,7 +17,7 @@ import protocol
 import state
 import connectionpool
 from bmconfigparser import config
-from queues import invQueue, objectProcessorQueue, portCheckerQueue
+from queues import objectProcessorQueue, portCheckerQueue
 from randomtrackingdict import RandomTrackingDict
 from network.advanceddispatcher import AdvancedDispatcher
 from network.bmobject import (
@@ -26,7 +26,7 @@ from network.bmobject import (
     BMObjectUnwantedStreamError
 )
 from network.proxy import ProxyError
-from network import dandelion_ins
+from network import dandelion_ins, invQueue
 from node import Node, Peer
 from objectracker import ObjectTracker, missingObjects
 
diff --git a/src/network/invthread.py b/src/network/invthread.py
index 0b79710a..503eefa1 100644
--- a/src/network/invthread.py
+++ b/src/network/invthread.py
@@ -9,8 +9,7 @@ import addresses
 import protocol
 import state
 import connectionpool
-from network import dandelion_ins
-from queues import invQueue
+from network import dandelion_ins, invQueue
 from threads import StoppableThread
 
 
diff --git a/src/network/tcp.py b/src/network/tcp.py
index f2dce07d..ec29c9ae 100644
--- a/src/network/tcp.py
+++ b/src/network/tcp.py
@@ -17,8 +17,8 @@ import state
 import connectionpool
 from bmconfigparser import config
 from highlevelcrypto import randomBytes
-from network import dandelion_ins
-from queues import invQueue, receiveDataQueue, UISignalQueue
+from network import dandelion_ins, invQueue
+from queues import receiveDataQueue, UISignalQueue
 from tr import _translate
 
 import asyncore_pollchoose as asyncore
diff --git a/src/queues.py b/src/queues.py
index a95e8e46..8e46ccf0 100644
--- a/src/queues.py
+++ b/src/queues.py
@@ -43,7 +43,6 @@ addressGeneratorQueue = queue.Queue()
 #: `.network.ReceiveQueueThread` instances dump objects they hear
 #: on the network into this queue to be processed.
 objectProcessorQueue = ObjectProcessorQueue()
-invQueue = MultiQueue()
 addrQueue = MultiQueue()
 portCheckerQueue = queue.Queue()
 receiveDataQueue = queue.Queue()
-- 
2.45.1


From 49e89ecdf20586c15349644d271e9fa6fdab4deb Mon Sep 17 00:00:00 2001
From: anand k <anand.theskss@gmail.com>
Date: Sun, 23 Jun 2024 10:57:57 +0530
Subject: [PATCH 3/3] Moved addrQueue to network module

---
 src/network/__init__.py   | 1 +
 src/network/addrthread.py | 2 +-
 src/queues.py             | 5 -----
 3 files changed, 2 insertions(+), 6 deletions(-)

diff --git a/src/network/__init__.py b/src/network/__init__.py
index 642842df..073c8435 100644
--- a/src/network/__init__.py
+++ b/src/network/__init__.py
@@ -9,6 +9,7 @@ dandelion_ins = Dandelion()
 
 # network queues
 invQueue = MultiQueue()
+addrQueue = MultiQueue()
 
 __all__ = ["StoppableThread"]
 
diff --git a/src/network/addrthread.py b/src/network/addrthread.py
index a77e609c..81e44506 100644
--- a/src/network/addrthread.py
+++ b/src/network/addrthread.py
@@ -7,7 +7,7 @@ from six.moves import queue
 # magic imports!
 import connectionpool
 from protocol import assembleAddrMessage
-from queues import addrQueue  # FIXME: init with queue
+from network import addrQueue  # FIXME: init with queue
 
 from threads import StoppableThread
 
diff --git a/src/queues.py b/src/queues.py
index 8e46ccf0..18eb6dfa 100644
--- a/src/queues.py
+++ b/src/queues.py
@@ -6,10 +6,6 @@ import time
 from six.moves import queue
 
 
-from network.multiqueue import MultiQueue
-
-
-
 class ObjectProcessorQueue(queue.Queue):
     """Special queue class using lock for `.threads.objectProcessor`"""
 
@@ -43,7 +39,6 @@ addressGeneratorQueue = queue.Queue()
 #: `.network.ReceiveQueueThread` instances dump objects they hear
 #: on the network into this queue to be processed.
 objectProcessorQueue = ObjectProcessorQueue()
-addrQueue = MultiQueue()
 portCheckerQueue = queue.Queue()
 receiveDataQueue = queue.Queue()
 #: The address generator thread uses this queue to get information back
-- 
2.45.1