From 5cd4ecb4371741a94c0f7831dbed39834cfd920b Mon Sep 17 00:00:00 2001
From: anand k <anand.theskss@gmail.com>
Date: Sun, 23 Jun 2024 10:53:48 +0530
Subject: [PATCH] Moved invQueue to network module

---
 src/api.py                   |  6 +++---
 src/class_objectProcessor.py |  4 ++--
 src/class_singleWorker.py    | 16 ++++++++--------
 src/network/__init__.py      |  5 ++++-
 src/network/bmproto.py       |  4 ++--
 src/network/invthread.py     |  3 +--
 src/network/tcp.py           |  4 ++--
 src/queues.py                |  1 -
 8 files changed, 22 insertions(+), 21 deletions(-)

diff --git a/src/api.py b/src/api.py
index a4445569..f9bf55de 100644
--- a/src/api.py
+++ b/src/api.py
@@ -100,7 +100,7 @@ try:
 except ImportError:
     connectionpool = None
 
-from network import stats, StoppableThread
+from network import stats, StoppableThread, invQueue
 from version import softwareVersion
 
 try:  # TODO: write tests for XML vulnerabilities
@@ -1346,7 +1346,7 @@ class BMRPCDispatcher(object):
         logger.info(
             'Broadcasting inv for msg(API disseminatePreEncryptedMsg'
             ' command): %s', hexlify(inventoryHash))
-        queues.invQueue.put((toStreamNumber, inventoryHash))
+        invQueue.put((toStreamNumber, inventoryHash))
         return hexlify(inventoryHash).decode()
 
     @command('trashSentMessageByAckData')
@@ -1401,7 +1401,7 @@ class BMRPCDispatcher(object):
         logger.info(
             'broadcasting inv within API command disseminatePubkey with'
             ' hash: %s', hexlify(inventoryHash))
-        queues.invQueue.put((pubkeyStreamNumber, inventoryHash))
+        invQueue.put((pubkeyStreamNumber, inventoryHash))
 
     @command(
         'getMessageDataByDestinationHash', 'getMessageDataByDestinationTag')
diff --git a/src/class_objectProcessor.py b/src/class_objectProcessor.py
index 469ccbfa..974631cb 100644
--- a/src/class_objectProcessor.py
+++ b/src/class_objectProcessor.py
@@ -30,7 +30,7 @@ from addresses import (
 from bmconfigparser import config
 from helper_sql import (
     sql_ready, sql_timeout, SqlBulkExecute, sqlExecute, sqlQuery)
-from network import knownnodes
+from network import knownnodes, invQueue
 from network.node import Peer
 from tr import _translate
 
@@ -729,7 +729,7 @@ class objectProcessor(threading.Thread):
             inventoryHash = highlevelcrypto.calculateInventoryHash(ackPayload)
             state.Inventory[inventoryHash] = (
                 objectType, toStreamNumber, ackPayload, expiresTime, b'')
-            queues.invQueue.put((toStreamNumber, inventoryHash))
+            invQueue.put((toStreamNumber, inventoryHash))
 
         # Display timing data
         timeRequiredToAttemptToDecryptMessage = time.time(
diff --git a/src/class_singleWorker.py b/src/class_singleWorker.py
index f2821f65..f79d9240 100644
--- a/src/class_singleWorker.py
+++ b/src/class_singleWorker.py
@@ -28,7 +28,7 @@ import tr
 from addresses import decodeAddress, decodeVarint, encodeVarint
 from bmconfigparser import config
 from helper_sql import sqlExecute, sqlQuery
-from network import knownnodes, StoppableThread
+from network import knownnodes, StoppableThread, invQueue
 from six.moves import configparser, queue
 
 
@@ -293,7 +293,7 @@ class singleWorker(StoppableThread):
         self.logger.info(
             'broadcasting inv with hash: %s', hexlify(inventoryHash))
 
-        queues.invQueue.put((streamNumber, inventoryHash))
+        invQueue.put((streamNumber, inventoryHash))
         queues.UISignalQueue.put(('updateStatusBar', ''))
         try:
             config.set(
@@ -381,7 +381,7 @@ class singleWorker(StoppableThread):
         self.logger.info(
             'broadcasting inv with hash: %s', hexlify(inventoryHash))
 
-        queues.invQueue.put((streamNumber, inventoryHash))
+        invQueue.put((streamNumber, inventoryHash))
         queues.UISignalQueue.put(('updateStatusBar', ''))
         try:
             config.set(
@@ -474,7 +474,7 @@ class singleWorker(StoppableThread):
         self.logger.info(
             'broadcasting inv with hash: %s', hexlify(inventoryHash))
 
-        queues.invQueue.put((streamNumber, inventoryHash))
+        invQueue.put((streamNumber, inventoryHash))
         queues.UISignalQueue.put(('updateStatusBar', ''))
         try:
             config.set(
@@ -522,7 +522,7 @@ class singleWorker(StoppableThread):
         self.logger.info(
             'sending inv (within sendOnionPeerObj function) for object: %s',
             hexlify(inventoryHash))
-        queues.invQueue.put((streamNumber, inventoryHash))
+        invQueue.put((streamNumber, inventoryHash))
 
     def sendBroadcast(self):
         """Send a broadcast-type object (assemble the object, perform PoW and put it to the inv announcement queue)"""
@@ -690,7 +690,7 @@ class singleWorker(StoppableThread):
                 ' for object: %s',
                 hexlify(inventoryHash)
             )
-            queues.invQueue.put((streamNumber, inventoryHash))
+            invQueue.put((streamNumber, inventoryHash))
 
             queues.UISignalQueue.put((
                 'updateSentItemStatusByAckdata', (
@@ -1326,7 +1326,7 @@ class singleWorker(StoppableThread):
                 'Broadcasting inv for my msg(within sendmsg function): %s',
                 hexlify(inventoryHash)
             )
-            queues.invQueue.put((toStreamNumber, inventoryHash))
+            invQueue.put((toStreamNumber, inventoryHash))
 
             # Update the sent message in the sent table with the
             # necessary information.
@@ -1459,7 +1459,7 @@ class singleWorker(StoppableThread):
         state.Inventory[inventoryHash] = (
             objectType, streamNumber, payload, embeddedTime, '')
         self.logger.info('sending inv (for the getpubkey message)')
-        queues.invQueue.put((streamNumber, inventoryHash))
+        invQueue.put((streamNumber, inventoryHash))
 
         # wait 10% past expiration
         sleeptill = int(time.time() + TTL * 1.1)
diff --git a/src/network/__init__.py b/src/network/__init__.py
index 42e9d035..642842df 100644
--- a/src/network/__init__.py
+++ b/src/network/__init__.py
@@ -3,15 +3,18 @@ Network subsystem package
 """
 from .dandelion import Dandelion
 from .threads import StoppableThread
+from .multiqueue import MultiQueue
 
 dandelion_ins = Dandelion()
 
+# network queues
+invQueue = MultiQueue()
+
 __all__ = ["StoppableThread"]
 
 
 def start(config, state):
     """Start network threads"""
-    import state
     from .announcethread import AnnounceThread
     import connectionpool  # pylint: disable=relative-import
     from .addrthread import AddrThread
diff --git a/src/network/bmproto.py b/src/network/bmproto.py
index 797dab5e..2b8dff79 100644
--- a/src/network/bmproto.py
+++ b/src/network/bmproto.py
@@ -17,7 +17,7 @@ import protocol
 import state
 import connectionpool
 from bmconfigparser import config
-from queues import invQueue, objectProcessorQueue, portCheckerQueue
+from queues import objectProcessorQueue, portCheckerQueue
 from randomtrackingdict import RandomTrackingDict
 from network.advanceddispatcher import AdvancedDispatcher
 from network.bmobject import (
@@ -26,7 +26,7 @@ from network.bmobject import (
     BMObjectUnwantedStreamError
 )
 from network.proxy import ProxyError
-from network import dandelion_ins
+from network import dandelion_ins, invQueue
 from node import Node, Peer
 from objectracker import ObjectTracker, missingObjects
 
diff --git a/src/network/invthread.py b/src/network/invthread.py
index 0b79710a..503eefa1 100644
--- a/src/network/invthread.py
+++ b/src/network/invthread.py
@@ -9,8 +9,7 @@ import addresses
 import protocol
 import state
 import connectionpool
-from network import dandelion_ins
-from queues import invQueue
+from network import dandelion_ins, invQueue
 from threads import StoppableThread
 
 
diff --git a/src/network/tcp.py b/src/network/tcp.py
index f2dce07d..ec29c9ae 100644
--- a/src/network/tcp.py
+++ b/src/network/tcp.py
@@ -17,8 +17,8 @@ import state
 import connectionpool
 from bmconfigparser import config
 from highlevelcrypto import randomBytes
-from network import dandelion_ins
-from queues import invQueue, receiveDataQueue, UISignalQueue
+from network import dandelion_ins, invQueue
+from queues import receiveDataQueue, UISignalQueue
 from tr import _translate
 
 import asyncore_pollchoose as asyncore
diff --git a/src/queues.py b/src/queues.py
index a95e8e46..8e46ccf0 100644
--- a/src/queues.py
+++ b/src/queues.py
@@ -43,7 +43,6 @@ addressGeneratorQueue = queue.Queue()
 #: `.network.ReceiveQueueThread` instances dump objects they hear
 #: on the network into this queue to be processed.
 objectProcessorQueue = ObjectProcessorQueue()
-invQueue = MultiQueue()
 addrQueue = MultiQueue()
 portCheckerQueue = queue.Queue()
 receiveDataQueue = queue.Queue()