Moved invQueue to network module

This commit is contained in:
anand k 2024-06-23 10:53:48 +05:30
parent 6578b5b925
commit 5cd4ecb437
No known key found for this signature in database
GPG Key ID: 515AC24FA525DDE0
8 changed files with 22 additions and 21 deletions

View File

@ -100,7 +100,7 @@ try:
except ImportError: except ImportError:
connectionpool = None connectionpool = None
from network import stats, StoppableThread from network import stats, StoppableThread, invQueue
from version import softwareVersion from version import softwareVersion
try: # TODO: write tests for XML vulnerabilities try: # TODO: write tests for XML vulnerabilities
@ -1346,7 +1346,7 @@ class BMRPCDispatcher(object):
logger.info( logger.info(
'Broadcasting inv for msg(API disseminatePreEncryptedMsg' 'Broadcasting inv for msg(API disseminatePreEncryptedMsg'
' command): %s', hexlify(inventoryHash)) ' command): %s', hexlify(inventoryHash))
queues.invQueue.put((toStreamNumber, inventoryHash)) invQueue.put((toStreamNumber, inventoryHash))
return hexlify(inventoryHash).decode() return hexlify(inventoryHash).decode()
@command('trashSentMessageByAckData') @command('trashSentMessageByAckData')
@ -1401,7 +1401,7 @@ class BMRPCDispatcher(object):
logger.info( logger.info(
'broadcasting inv within API command disseminatePubkey with' 'broadcasting inv within API command disseminatePubkey with'
' hash: %s', hexlify(inventoryHash)) ' hash: %s', hexlify(inventoryHash))
queues.invQueue.put((pubkeyStreamNumber, inventoryHash)) invQueue.put((pubkeyStreamNumber, inventoryHash))
@command( @command(
'getMessageDataByDestinationHash', 'getMessageDataByDestinationTag') 'getMessageDataByDestinationHash', 'getMessageDataByDestinationTag')

View File

@ -30,7 +30,7 @@ from addresses import (
from bmconfigparser import config from bmconfigparser import config
from helper_sql import ( from helper_sql import (
sql_ready, sql_timeout, SqlBulkExecute, sqlExecute, sqlQuery) sql_ready, sql_timeout, SqlBulkExecute, sqlExecute, sqlQuery)
from network import knownnodes from network import knownnodes, invQueue
from network.node import Peer from network.node import Peer
from tr import _translate from tr import _translate
@ -729,7 +729,7 @@ class objectProcessor(threading.Thread):
inventoryHash = highlevelcrypto.calculateInventoryHash(ackPayload) inventoryHash = highlevelcrypto.calculateInventoryHash(ackPayload)
state.Inventory[inventoryHash] = ( state.Inventory[inventoryHash] = (
objectType, toStreamNumber, ackPayload, expiresTime, b'') objectType, toStreamNumber, ackPayload, expiresTime, b'')
queues.invQueue.put((toStreamNumber, inventoryHash)) invQueue.put((toStreamNumber, inventoryHash))
# Display timing data # Display timing data
timeRequiredToAttemptToDecryptMessage = time.time( timeRequiredToAttemptToDecryptMessage = time.time(

View File

@ -28,7 +28,7 @@ import tr
from addresses import decodeAddress, decodeVarint, encodeVarint from addresses import decodeAddress, decodeVarint, encodeVarint
from bmconfigparser import config from bmconfigparser import config
from helper_sql import sqlExecute, sqlQuery from helper_sql import sqlExecute, sqlQuery
from network import knownnodes, StoppableThread from network import knownnodes, StoppableThread, invQueue
from six.moves import configparser, queue from six.moves import configparser, queue
@ -293,7 +293,7 @@ class singleWorker(StoppableThread):
self.logger.info( self.logger.info(
'broadcasting inv with hash: %s', hexlify(inventoryHash)) 'broadcasting inv with hash: %s', hexlify(inventoryHash))
queues.invQueue.put((streamNumber, inventoryHash)) invQueue.put((streamNumber, inventoryHash))
queues.UISignalQueue.put(('updateStatusBar', '')) queues.UISignalQueue.put(('updateStatusBar', ''))
try: try:
config.set( config.set(
@ -381,7 +381,7 @@ class singleWorker(StoppableThread):
self.logger.info( self.logger.info(
'broadcasting inv with hash: %s', hexlify(inventoryHash)) 'broadcasting inv with hash: %s', hexlify(inventoryHash))
queues.invQueue.put((streamNumber, inventoryHash)) invQueue.put((streamNumber, inventoryHash))
queues.UISignalQueue.put(('updateStatusBar', '')) queues.UISignalQueue.put(('updateStatusBar', ''))
try: try:
config.set( config.set(
@ -474,7 +474,7 @@ class singleWorker(StoppableThread):
self.logger.info( self.logger.info(
'broadcasting inv with hash: %s', hexlify(inventoryHash)) 'broadcasting inv with hash: %s', hexlify(inventoryHash))
queues.invQueue.put((streamNumber, inventoryHash)) invQueue.put((streamNumber, inventoryHash))
queues.UISignalQueue.put(('updateStatusBar', '')) queues.UISignalQueue.put(('updateStatusBar', ''))
try: try:
config.set( config.set(
@ -522,7 +522,7 @@ class singleWorker(StoppableThread):
self.logger.info( self.logger.info(
'sending inv (within sendOnionPeerObj function) for object: %s', 'sending inv (within sendOnionPeerObj function) for object: %s',
hexlify(inventoryHash)) hexlify(inventoryHash))
queues.invQueue.put((streamNumber, inventoryHash)) invQueue.put((streamNumber, inventoryHash))
def sendBroadcast(self): def sendBroadcast(self):
"""Send a broadcast-type object (assemble the object, perform PoW and put it to the inv announcement queue)""" """Send a broadcast-type object (assemble the object, perform PoW and put it to the inv announcement queue)"""
@ -690,7 +690,7 @@ class singleWorker(StoppableThread):
' for object: %s', ' for object: %s',
hexlify(inventoryHash) hexlify(inventoryHash)
) )
queues.invQueue.put((streamNumber, inventoryHash)) invQueue.put((streamNumber, inventoryHash))
queues.UISignalQueue.put(( queues.UISignalQueue.put((
'updateSentItemStatusByAckdata', ( 'updateSentItemStatusByAckdata', (
@ -1326,7 +1326,7 @@ class singleWorker(StoppableThread):
'Broadcasting inv for my msg(within sendmsg function): %s', 'Broadcasting inv for my msg(within sendmsg function): %s',
hexlify(inventoryHash) hexlify(inventoryHash)
) )
queues.invQueue.put((toStreamNumber, inventoryHash)) invQueue.put((toStreamNumber, inventoryHash))
# Update the sent message in the sent table with the # Update the sent message in the sent table with the
# necessary information. # necessary information.
@ -1459,7 +1459,7 @@ class singleWorker(StoppableThread):
state.Inventory[inventoryHash] = ( state.Inventory[inventoryHash] = (
objectType, streamNumber, payload, embeddedTime, '') objectType, streamNumber, payload, embeddedTime, '')
self.logger.info('sending inv (for the getpubkey message)') self.logger.info('sending inv (for the getpubkey message)')
queues.invQueue.put((streamNumber, inventoryHash)) invQueue.put((streamNumber, inventoryHash))
# wait 10% past expiration # wait 10% past expiration
sleeptill = int(time.time() + TTL * 1.1) sleeptill = int(time.time() + TTL * 1.1)

View File

@ -3,15 +3,18 @@ Network subsystem package
""" """
from .dandelion import Dandelion from .dandelion import Dandelion
from .threads import StoppableThread from .threads import StoppableThread
from .multiqueue import MultiQueue
dandelion_ins = Dandelion() dandelion_ins = Dandelion()
# network queues
invQueue = MultiQueue()
__all__ = ["StoppableThread"] __all__ = ["StoppableThread"]
def start(config, state): def start(config, state):
"""Start network threads""" """Start network threads"""
import state
from .announcethread import AnnounceThread from .announcethread import AnnounceThread
import connectionpool # pylint: disable=relative-import import connectionpool # pylint: disable=relative-import
from .addrthread import AddrThread from .addrthread import AddrThread

View File

@ -17,7 +17,7 @@ import protocol
import state import state
import connectionpool import connectionpool
from bmconfigparser import config from bmconfigparser import config
from queues import invQueue, objectProcessorQueue, portCheckerQueue from queues import objectProcessorQueue, portCheckerQueue
from randomtrackingdict import RandomTrackingDict from randomtrackingdict import RandomTrackingDict
from network.advanceddispatcher import AdvancedDispatcher from network.advanceddispatcher import AdvancedDispatcher
from network.bmobject import ( from network.bmobject import (
@ -26,7 +26,7 @@ from network.bmobject import (
BMObjectUnwantedStreamError BMObjectUnwantedStreamError
) )
from network.proxy import ProxyError from network.proxy import ProxyError
from network import dandelion_ins from network import dandelion_ins, invQueue
from node import Node, Peer from node import Node, Peer
from objectracker import ObjectTracker, missingObjects from objectracker import ObjectTracker, missingObjects

View File

@ -9,8 +9,7 @@ import addresses
import protocol import protocol
import state import state
import connectionpool import connectionpool
from network import dandelion_ins from network import dandelion_ins, invQueue
from queues import invQueue
from threads import StoppableThread from threads import StoppableThread

View File

@ -17,8 +17,8 @@ import state
import connectionpool import connectionpool
from bmconfigparser import config from bmconfigparser import config
from highlevelcrypto import randomBytes from highlevelcrypto import randomBytes
from network import dandelion_ins from network import dandelion_ins, invQueue
from queues import invQueue, receiveDataQueue, UISignalQueue from queues import receiveDataQueue, UISignalQueue
from tr import _translate from tr import _translate
import asyncore_pollchoose as asyncore import asyncore_pollchoose as asyncore

View File

@ -43,7 +43,6 @@ addressGeneratorQueue = queue.Queue()
#: `.network.ReceiveQueueThread` instances dump objects they hear #: `.network.ReceiveQueueThread` instances dump objects they hear
#: on the network into this queue to be processed. #: on the network into this queue to be processed.
objectProcessorQueue = ObjectProcessorQueue() objectProcessorQueue = ObjectProcessorQueue()
invQueue = MultiQueue()
addrQueue = MultiQueue() addrQueue = MultiQueue()
portCheckerQueue = queue.Queue() portCheckerQueue = queue.Queue()
receiveDataQueue = queue.Queue() receiveDataQueue = queue.Queue()