Moved Multiqueue, invQueue and addrQueue to network module #2256

Merged
anand-skss merged 3 commits from test3 into v0.6 2024-07-06 16:43:51 +02:00
11 changed files with 27 additions and 36 deletions

View File

@ -100,7 +100,7 @@ try:
except ImportError:
connectionpool = None
from network import stats, StoppableThread
from network import stats, StoppableThread, invQueue
from version import softwareVersion
try: # TODO: write tests for XML vulnerabilities
@ -1346,7 +1346,7 @@ class BMRPCDispatcher(object):
logger.info(
'Broadcasting inv for msg(API disseminatePreEncryptedMsg'
' command): %s', hexlify(inventoryHash))
queues.invQueue.put((toStreamNumber, inventoryHash))
invQueue.put((toStreamNumber, inventoryHash))
return hexlify(inventoryHash).decode()
@command('trashSentMessageByAckData')
@ -1401,7 +1401,7 @@ class BMRPCDispatcher(object):
logger.info(
'broadcasting inv within API command disseminatePubkey with'
' hash: %s', hexlify(inventoryHash))
queues.invQueue.put((pubkeyStreamNumber, inventoryHash))
invQueue.put((pubkeyStreamNumber, inventoryHash))
@command(
'getMessageDataByDestinationHash', 'getMessageDataByDestinationTag')

View File

@ -30,7 +30,7 @@ from addresses import (
from bmconfigparser import config
from helper_sql import (
sql_ready, sql_timeout, SqlBulkExecute, sqlExecute, sqlQuery)
from network import knownnodes
from network import knownnodes, invQueue
from network.node import Peer
from tr import _translate
@ -729,7 +729,7 @@ class objectProcessor(threading.Thread):
inventoryHash = highlevelcrypto.calculateInventoryHash(ackPayload)
state.Inventory[inventoryHash] = (
objectType, toStreamNumber, ackPayload, expiresTime, b'')
queues.invQueue.put((toStreamNumber, inventoryHash))
invQueue.put((toStreamNumber, inventoryHash))
# Display timing data
timeRequiredToAttemptToDecryptMessage = time.time(

View File

@ -28,7 +28,7 @@ import tr
from addresses import decodeAddress, decodeVarint, encodeVarint
from bmconfigparser import config
from helper_sql import sqlExecute, sqlQuery
from network import knownnodes, StoppableThread
from network import knownnodes, StoppableThread, invQueue
from six.moves import configparser, queue
@ -293,7 +293,7 @@ class singleWorker(StoppableThread):
self.logger.info(
'broadcasting inv with hash: %s', hexlify(inventoryHash))
queues.invQueue.put((streamNumber, inventoryHash))
invQueue.put((streamNumber, inventoryHash))
queues.UISignalQueue.put(('updateStatusBar', ''))
try:
config.set(
@ -381,7 +381,7 @@ class singleWorker(StoppableThread):
self.logger.info(
'broadcasting inv with hash: %s', hexlify(inventoryHash))
queues.invQueue.put((streamNumber, inventoryHash))
invQueue.put((streamNumber, inventoryHash))
queues.UISignalQueue.put(('updateStatusBar', ''))
try:
config.set(
@ -474,7 +474,7 @@ class singleWorker(StoppableThread):
self.logger.info(
'broadcasting inv with hash: %s', hexlify(inventoryHash))
queues.invQueue.put((streamNumber, inventoryHash))
invQueue.put((streamNumber, inventoryHash))
queues.UISignalQueue.put(('updateStatusBar', ''))
try:
config.set(
@ -522,7 +522,7 @@ class singleWorker(StoppableThread):
self.logger.info(
'sending inv (within sendOnionPeerObj function) for object: %s',
hexlify(inventoryHash))
queues.invQueue.put((streamNumber, inventoryHash))
invQueue.put((streamNumber, inventoryHash))
def sendBroadcast(self):
"""Send a broadcast-type object (assemble the object, perform PoW and put it to the inv announcement queue)"""
@ -690,7 +690,7 @@ class singleWorker(StoppableThread):
' for object: %s',
hexlify(inventoryHash)
)
queues.invQueue.put((streamNumber, inventoryHash))
invQueue.put((streamNumber, inventoryHash))
queues.UISignalQueue.put((
'updateSentItemStatusByAckdata', (
@ -1326,7 +1326,7 @@ class singleWorker(StoppableThread):
'Broadcasting inv for my msg(within sendmsg function): %s',
hexlify(inventoryHash)
)
queues.invQueue.put((toStreamNumber, inventoryHash))
invQueue.put((toStreamNumber, inventoryHash))
# Update the sent message in the sent table with the
# necessary information.
@ -1459,7 +1459,7 @@ class singleWorker(StoppableThread):
state.Inventory[inventoryHash] = (
objectType, streamNumber, payload, embeddedTime, '')
self.logger.info('sending inv (for the getpubkey message)')
queues.invQueue.put((streamNumber, inventoryHash))
invQueue.put((streamNumber, inventoryHash))
# wait 10% past expiration
sleeptill = int(time.time() + TTL * 1.1)

View File

@ -3,15 +3,19 @@ Network subsystem package
"""
from .dandelion import Dandelion
from .threads import StoppableThread
from .multiqueue import MultiQueue
dandelion_ins = Dandelion()
# network queues
invQueue = MultiQueue()
addrQueue = MultiQueue()
__all__ = ["StoppableThread"]
def start(config, state):
"""Start network threads"""
import state
from .announcethread import AnnounceThread
import connectionpool # pylint: disable=relative-import
from .addrthread import AddrThread

View File

@ -7,7 +7,7 @@ from six.moves import queue
# magic imports!
import connectionpool
from protocol import assembleAddrMessage
from queues import addrQueue # FIXME: init with queue
from network import addrQueue # FIXME: init with queue
anand-skss commented 2024-06-23 07:45:28 +02:00 (Migrated from github.com)
Review

Please check Fixme comment, i'm not sure whether i remove this comment or not.

Please check Fixme comment, i'm not sure whether i remove this comment or not.
PeterSurda commented 2024-07-04 05:24:33 +02:00 (Migrated from github.com)
Review

I'm not sure either, let's leave it as it is.

I'm not sure either, let's leave it as it is.
from threads import StoppableThread

View File

@ -17,7 +17,7 @@ import protocol
import state
import connectionpool
from bmconfigparser import config
from queues import invQueue, objectProcessorQueue, portCheckerQueue
from queues import objectProcessorQueue, portCheckerQueue
from randomtrackingdict import RandomTrackingDict
from network.advanceddispatcher import AdvancedDispatcher
from network.bmobject import (
@ -26,7 +26,7 @@ from network.bmobject import (
BMObjectUnwantedStreamError
)
from network.proxy import ProxyError
from network import dandelion_ins
from network import dandelion_ins, invQueue
from node import Node, Peer
from objectracker import ObjectTracker, missingObjects

View File

@ -9,8 +9,7 @@ import addresses
import protocol
import state
import connectionpool
from network import dandelion_ins
from queues import invQueue
from network import dandelion_ins, invQueue
from threads import StoppableThread

View File

@ -2,16 +2,11 @@
A queue with multiple internal subqueues.
Elements are added into a random subqueue, and retrieval rotates
"""
import random
from collections import deque
from six.moves import queue
try:
import helper_random
except ImportError:
from . import helper_random
class MultiQueue(queue.Queue):
"""A base queue class"""
@ -38,7 +33,7 @@ class MultiQueue(queue.Queue):
# Put a new item in the queue
def _put(self, item):
# self.queue.append(item)
self.queues[helper_random.randomrandrange(self.queueCount)].append(
self.queues[random.randrange(self.queueCount)].append( # nosec B311
(item))
# Get an item from the queue

View File

@ -17,8 +17,8 @@ import state
import connectionpool
from bmconfigparser import config
from highlevelcrypto import randomBytes
from network import dandelion_ins
from queues import invQueue, receiveDataQueue, UISignalQueue
from network import dandelion_ins, invQueue
from queues import receiveDataQueue, UISignalQueue
from tr import _translate
import asyncore_pollchoose as asyncore

View File

@ -5,11 +5,6 @@ import time
from six.moves import queue
try:
from multiqueue import MultiQueue
except ImportError:
from .multiqueue import MultiQueue
class ObjectProcessorQueue(queue.Queue):
"""Special queue class using lock for `.threads.objectProcessor`"""
@ -44,8 +39,6 @@ addressGeneratorQueue = queue.Queue()
#: `.network.ReceiveQueueThread` instances dump objects they hear
#: on the network into this queue to be processed.
objectProcessorQueue = ObjectProcessorQueue()
invQueue = MultiQueue()
addrQueue = MultiQueue()
portCheckerQueue = queue.Queue()
receiveDataQueue = queue.Queue()
#: The address generator thread uses this queue to get information back

View File

@ -1,7 +1,7 @@
"""Test cases for multiqueue"""
import unittest
from pybitmessage.multiqueue import MultiQueue
from pybitmessage.network.multiqueue import MultiQueue
class TestMultiQueue(unittest.TestCase):