Moved Multiqueue, invQueue and addrQueue to network module #2256

Merged
anand-skss merged 3 commits from test3 into v0.6 2024-07-06 16:43:51 +02:00
11 changed files with 27 additions and 36 deletions

View File

@ -100,7 +100,7 @@ try:
except ImportError: except ImportError:
connectionpool = None connectionpool = None
from network import stats, StoppableThread from network import stats, StoppableThread, invQueue
from version import softwareVersion from version import softwareVersion
try: # TODO: write tests for XML vulnerabilities try: # TODO: write tests for XML vulnerabilities
@ -1346,7 +1346,7 @@ class BMRPCDispatcher(object):
logger.info( logger.info(
'Broadcasting inv for msg(API disseminatePreEncryptedMsg' 'Broadcasting inv for msg(API disseminatePreEncryptedMsg'
' command): %s', hexlify(inventoryHash)) ' command): %s', hexlify(inventoryHash))
queues.invQueue.put((toStreamNumber, inventoryHash)) invQueue.put((toStreamNumber, inventoryHash))
return hexlify(inventoryHash).decode() return hexlify(inventoryHash).decode()
@command('trashSentMessageByAckData') @command('trashSentMessageByAckData')
@ -1401,7 +1401,7 @@ class BMRPCDispatcher(object):
logger.info( logger.info(
'broadcasting inv within API command disseminatePubkey with' 'broadcasting inv within API command disseminatePubkey with'
' hash: %s', hexlify(inventoryHash)) ' hash: %s', hexlify(inventoryHash))
queues.invQueue.put((pubkeyStreamNumber, inventoryHash)) invQueue.put((pubkeyStreamNumber, inventoryHash))
@command( @command(
'getMessageDataByDestinationHash', 'getMessageDataByDestinationTag') 'getMessageDataByDestinationHash', 'getMessageDataByDestinationTag')

View File

@ -30,7 +30,7 @@ from addresses import (
from bmconfigparser import config from bmconfigparser import config
from helper_sql import ( from helper_sql import (
sql_ready, sql_timeout, SqlBulkExecute, sqlExecute, sqlQuery) sql_ready, sql_timeout, SqlBulkExecute, sqlExecute, sqlQuery)
from network import knownnodes from network import knownnodes, invQueue
from network.node import Peer from network.node import Peer
from tr import _translate from tr import _translate
@ -729,7 +729,7 @@ class objectProcessor(threading.Thread):
inventoryHash = highlevelcrypto.calculateInventoryHash(ackPayload) inventoryHash = highlevelcrypto.calculateInventoryHash(ackPayload)
state.Inventory[inventoryHash] = ( state.Inventory[inventoryHash] = (
objectType, toStreamNumber, ackPayload, expiresTime, b'') objectType, toStreamNumber, ackPayload, expiresTime, b'')
queues.invQueue.put((toStreamNumber, inventoryHash)) invQueue.put((toStreamNumber, inventoryHash))
# Display timing data # Display timing data
timeRequiredToAttemptToDecryptMessage = time.time( timeRequiredToAttemptToDecryptMessage = time.time(

View File

@ -28,7 +28,7 @@ import tr
from addresses import decodeAddress, decodeVarint, encodeVarint from addresses import decodeAddress, decodeVarint, encodeVarint
from bmconfigparser import config from bmconfigparser import config
from helper_sql import sqlExecute, sqlQuery from helper_sql import sqlExecute, sqlQuery
from network import knownnodes, StoppableThread from network import knownnodes, StoppableThread, invQueue
from six.moves import configparser, queue from six.moves import configparser, queue
@ -293,7 +293,7 @@ class singleWorker(StoppableThread):
self.logger.info( self.logger.info(
'broadcasting inv with hash: %s', hexlify(inventoryHash)) 'broadcasting inv with hash: %s', hexlify(inventoryHash))
queues.invQueue.put((streamNumber, inventoryHash)) invQueue.put((streamNumber, inventoryHash))
queues.UISignalQueue.put(('updateStatusBar', '')) queues.UISignalQueue.put(('updateStatusBar', ''))
try: try:
config.set( config.set(
@ -381,7 +381,7 @@ class singleWorker(StoppableThread):
self.logger.info( self.logger.info(
'broadcasting inv with hash: %s', hexlify(inventoryHash)) 'broadcasting inv with hash: %s', hexlify(inventoryHash))
queues.invQueue.put((streamNumber, inventoryHash)) invQueue.put((streamNumber, inventoryHash))
queues.UISignalQueue.put(('updateStatusBar', '')) queues.UISignalQueue.put(('updateStatusBar', ''))
try: try:
config.set( config.set(
@ -474,7 +474,7 @@ class singleWorker(StoppableThread):
self.logger.info( self.logger.info(
'broadcasting inv with hash: %s', hexlify(inventoryHash)) 'broadcasting inv with hash: %s', hexlify(inventoryHash))
queues.invQueue.put((streamNumber, inventoryHash)) invQueue.put((streamNumber, inventoryHash))
queues.UISignalQueue.put(('updateStatusBar', '')) queues.UISignalQueue.put(('updateStatusBar', ''))
try: try:
config.set( config.set(
@ -522,7 +522,7 @@ class singleWorker(StoppableThread):
self.logger.info( self.logger.info(
'sending inv (within sendOnionPeerObj function) for object: %s', 'sending inv (within sendOnionPeerObj function) for object: %s',
hexlify(inventoryHash)) hexlify(inventoryHash))
queues.invQueue.put((streamNumber, inventoryHash)) invQueue.put((streamNumber, inventoryHash))
def sendBroadcast(self): def sendBroadcast(self):
"""Send a broadcast-type object (assemble the object, perform PoW and put it to the inv announcement queue)""" """Send a broadcast-type object (assemble the object, perform PoW and put it to the inv announcement queue)"""
@ -690,7 +690,7 @@ class singleWorker(StoppableThread):
' for object: %s', ' for object: %s',
hexlify(inventoryHash) hexlify(inventoryHash)
) )
queues.invQueue.put((streamNumber, inventoryHash)) invQueue.put((streamNumber, inventoryHash))
queues.UISignalQueue.put(( queues.UISignalQueue.put((
'updateSentItemStatusByAckdata', ( 'updateSentItemStatusByAckdata', (
@ -1326,7 +1326,7 @@ class singleWorker(StoppableThread):
'Broadcasting inv for my msg(within sendmsg function): %s', 'Broadcasting inv for my msg(within sendmsg function): %s',
hexlify(inventoryHash) hexlify(inventoryHash)
) )
queues.invQueue.put((toStreamNumber, inventoryHash)) invQueue.put((toStreamNumber, inventoryHash))
# Update the sent message in the sent table with the # Update the sent message in the sent table with the
# necessary information. # necessary information.
@ -1459,7 +1459,7 @@ class singleWorker(StoppableThread):
state.Inventory[inventoryHash] = ( state.Inventory[inventoryHash] = (
objectType, streamNumber, payload, embeddedTime, '') objectType, streamNumber, payload, embeddedTime, '')
self.logger.info('sending inv (for the getpubkey message)') self.logger.info('sending inv (for the getpubkey message)')
queues.invQueue.put((streamNumber, inventoryHash)) invQueue.put((streamNumber, inventoryHash))
# wait 10% past expiration # wait 10% past expiration
sleeptill = int(time.time() + TTL * 1.1) sleeptill = int(time.time() + TTL * 1.1)

View File

@ -3,15 +3,19 @@ Network subsystem package
""" """
from .dandelion import Dandelion from .dandelion import Dandelion
from .threads import StoppableThread from .threads import StoppableThread
from .multiqueue import MultiQueue
dandelion_ins = Dandelion() dandelion_ins = Dandelion()
# network queues
invQueue = MultiQueue()
addrQueue = MultiQueue()
__all__ = ["StoppableThread"] __all__ = ["StoppableThread"]
def start(config, state): def start(config, state):
"""Start network threads""" """Start network threads"""
import state
from .announcethread import AnnounceThread from .announcethread import AnnounceThread
import connectionpool # pylint: disable=relative-import import connectionpool # pylint: disable=relative-import
from .addrthread import AddrThread from .addrthread import AddrThread

View File

@ -7,7 +7,7 @@ from six.moves import queue
# magic imports! # magic imports!
import connectionpool import connectionpool
from protocol import assembleAddrMessage from protocol import assembleAddrMessage
from queues import addrQueue # FIXME: init with queue from network import addrQueue # FIXME: init with queue
anand-skss commented 2024-06-23 07:45:28 +02:00 (Migrated from github.com)
Review

Please check Fixme comment, i'm not sure whether i remove this comment or not.

Please check Fixme comment, i'm not sure whether i remove this comment or not.
PeterSurda commented 2024-07-04 05:24:33 +02:00 (Migrated from github.com)
Review

I'm not sure either, let's leave it as it is.

I'm not sure either, let's leave it as it is.
from threads import StoppableThread from threads import StoppableThread

View File

@ -17,7 +17,7 @@ import protocol
import state import state
import connectionpool import connectionpool
from bmconfigparser import config from bmconfigparser import config
from queues import invQueue, objectProcessorQueue, portCheckerQueue from queues import objectProcessorQueue, portCheckerQueue
from randomtrackingdict import RandomTrackingDict from randomtrackingdict import RandomTrackingDict
from network.advanceddispatcher import AdvancedDispatcher from network.advanceddispatcher import AdvancedDispatcher
from network.bmobject import ( from network.bmobject import (
@ -26,7 +26,7 @@ from network.bmobject import (
BMObjectUnwantedStreamError BMObjectUnwantedStreamError
) )
from network.proxy import ProxyError from network.proxy import ProxyError
from network import dandelion_ins from network import dandelion_ins, invQueue
from node import Node, Peer from node import Node, Peer
from objectracker import ObjectTracker, missingObjects from objectracker import ObjectTracker, missingObjects

View File

@ -9,8 +9,7 @@ import addresses
import protocol import protocol
import state import state
import connectionpool import connectionpool
from network import dandelion_ins from network import dandelion_ins, invQueue
from queues import invQueue
from threads import StoppableThread from threads import StoppableThread

View File

@ -2,16 +2,11 @@
A queue with multiple internal subqueues. A queue with multiple internal subqueues.
Elements are added into a random subqueue, and retrieval rotates Elements are added into a random subqueue, and retrieval rotates
""" """
import random
from collections import deque from collections import deque
from six.moves import queue from six.moves import queue
try:
import helper_random
except ImportError:
from . import helper_random
class MultiQueue(queue.Queue): class MultiQueue(queue.Queue):
"""A base queue class""" """A base queue class"""
@ -38,7 +33,7 @@ class MultiQueue(queue.Queue):
# Put a new item in the queue # Put a new item in the queue
def _put(self, item): def _put(self, item):
# self.queue.append(item) # self.queue.append(item)
self.queues[helper_random.randomrandrange(self.queueCount)].append( self.queues[random.randrange(self.queueCount)].append( # nosec B311
(item)) (item))
# Get an item from the queue # Get an item from the queue

View File

@ -17,8 +17,8 @@ import state
import connectionpool import connectionpool
from bmconfigparser import config from bmconfigparser import config
from highlevelcrypto import randomBytes from highlevelcrypto import randomBytes
from network import dandelion_ins from network import dandelion_ins, invQueue
from queues import invQueue, receiveDataQueue, UISignalQueue from queues import receiveDataQueue, UISignalQueue
from tr import _translate from tr import _translate
import asyncore_pollchoose as asyncore import asyncore_pollchoose as asyncore

View File

@ -5,11 +5,6 @@ import time
from six.moves import queue from six.moves import queue
try:
from multiqueue import MultiQueue
except ImportError:
from .multiqueue import MultiQueue
class ObjectProcessorQueue(queue.Queue): class ObjectProcessorQueue(queue.Queue):
"""Special queue class using lock for `.threads.objectProcessor`""" """Special queue class using lock for `.threads.objectProcessor`"""
@ -44,8 +39,6 @@ addressGeneratorQueue = queue.Queue()
#: `.network.ReceiveQueueThread` instances dump objects they hear #: `.network.ReceiveQueueThread` instances dump objects they hear
#: on the network into this queue to be processed. #: on the network into this queue to be processed.
objectProcessorQueue = ObjectProcessorQueue() objectProcessorQueue = ObjectProcessorQueue()
invQueue = MultiQueue()
addrQueue = MultiQueue()
portCheckerQueue = queue.Queue() portCheckerQueue = queue.Queue()
receiveDataQueue = queue.Queue() receiveDataQueue = queue.Queue()
#: The address generator thread uses this queue to get information back #: The address generator thread uses this queue to get information back

View File

@ -1,7 +1,7 @@
"""Test cases for multiqueue""" """Test cases for multiqueue"""
import unittest import unittest
from pybitmessage.multiqueue import MultiQueue from pybitmessage.network.multiqueue import MultiQueue
class TestMultiQueue(unittest.TestCase): class TestMultiQueue(unittest.TestCase):