Moved Multiqueue, invQueue and addrQueue to network module #2256
|
@ -100,7 +100,7 @@ try:
|
||||||
except ImportError:
|
except ImportError:
|
||||||
connectionpool = None
|
connectionpool = None
|
||||||
|
|
||||||
from network import stats, StoppableThread
|
from network import stats, StoppableThread, invQueue
|
||||||
from version import softwareVersion
|
from version import softwareVersion
|
||||||
|
|
||||||
try: # TODO: write tests for XML vulnerabilities
|
try: # TODO: write tests for XML vulnerabilities
|
||||||
|
@ -1346,7 +1346,7 @@ class BMRPCDispatcher(object):
|
||||||
logger.info(
|
logger.info(
|
||||||
'Broadcasting inv for msg(API disseminatePreEncryptedMsg'
|
'Broadcasting inv for msg(API disseminatePreEncryptedMsg'
|
||||||
' command): %s', hexlify(inventoryHash))
|
' command): %s', hexlify(inventoryHash))
|
||||||
queues.invQueue.put((toStreamNumber, inventoryHash))
|
invQueue.put((toStreamNumber, inventoryHash))
|
||||||
return hexlify(inventoryHash).decode()
|
return hexlify(inventoryHash).decode()
|
||||||
|
|
||||||
@command('trashSentMessageByAckData')
|
@command('trashSentMessageByAckData')
|
||||||
|
@ -1401,7 +1401,7 @@ class BMRPCDispatcher(object):
|
||||||
logger.info(
|
logger.info(
|
||||||
'broadcasting inv within API command disseminatePubkey with'
|
'broadcasting inv within API command disseminatePubkey with'
|
||||||
' hash: %s', hexlify(inventoryHash))
|
' hash: %s', hexlify(inventoryHash))
|
||||||
queues.invQueue.put((pubkeyStreamNumber, inventoryHash))
|
invQueue.put((pubkeyStreamNumber, inventoryHash))
|
||||||
|
|
||||||
@command(
|
@command(
|
||||||
'getMessageDataByDestinationHash', 'getMessageDataByDestinationTag')
|
'getMessageDataByDestinationHash', 'getMessageDataByDestinationTag')
|
||||||
|
|
|
@ -30,7 +30,7 @@ from addresses import (
|
||||||
from bmconfigparser import config
|
from bmconfigparser import config
|
||||||
from helper_sql import (
|
from helper_sql import (
|
||||||
sql_ready, sql_timeout, SqlBulkExecute, sqlExecute, sqlQuery)
|
sql_ready, sql_timeout, SqlBulkExecute, sqlExecute, sqlQuery)
|
||||||
from network import knownnodes
|
from network import knownnodes, invQueue
|
||||||
from network.node import Peer
|
from network.node import Peer
|
||||||
from tr import _translate
|
from tr import _translate
|
||||||
|
|
||||||
|
@ -729,7 +729,7 @@ class objectProcessor(threading.Thread):
|
||||||
inventoryHash = highlevelcrypto.calculateInventoryHash(ackPayload)
|
inventoryHash = highlevelcrypto.calculateInventoryHash(ackPayload)
|
||||||
state.Inventory[inventoryHash] = (
|
state.Inventory[inventoryHash] = (
|
||||||
objectType, toStreamNumber, ackPayload, expiresTime, b'')
|
objectType, toStreamNumber, ackPayload, expiresTime, b'')
|
||||||
queues.invQueue.put((toStreamNumber, inventoryHash))
|
invQueue.put((toStreamNumber, inventoryHash))
|
||||||
|
|
||||||
# Display timing data
|
# Display timing data
|
||||||
timeRequiredToAttemptToDecryptMessage = time.time(
|
timeRequiredToAttemptToDecryptMessage = time.time(
|
||||||
|
|
|
@ -28,7 +28,7 @@ import tr
|
||||||
from addresses import decodeAddress, decodeVarint, encodeVarint
|
from addresses import decodeAddress, decodeVarint, encodeVarint
|
||||||
from bmconfigparser import config
|
from bmconfigparser import config
|
||||||
from helper_sql import sqlExecute, sqlQuery
|
from helper_sql import sqlExecute, sqlQuery
|
||||||
from network import knownnodes, StoppableThread
|
from network import knownnodes, StoppableThread, invQueue
|
||||||
from six.moves import configparser, queue
|
from six.moves import configparser, queue
|
||||||
|
|
||||||
|
|
||||||
|
@ -293,7 +293,7 @@ class singleWorker(StoppableThread):
|
||||||
self.logger.info(
|
self.logger.info(
|
||||||
'broadcasting inv with hash: %s', hexlify(inventoryHash))
|
'broadcasting inv with hash: %s', hexlify(inventoryHash))
|
||||||
|
|
||||||
queues.invQueue.put((streamNumber, inventoryHash))
|
invQueue.put((streamNumber, inventoryHash))
|
||||||
queues.UISignalQueue.put(('updateStatusBar', ''))
|
queues.UISignalQueue.put(('updateStatusBar', ''))
|
||||||
try:
|
try:
|
||||||
config.set(
|
config.set(
|
||||||
|
@ -381,7 +381,7 @@ class singleWorker(StoppableThread):
|
||||||
self.logger.info(
|
self.logger.info(
|
||||||
'broadcasting inv with hash: %s', hexlify(inventoryHash))
|
'broadcasting inv with hash: %s', hexlify(inventoryHash))
|
||||||
|
|
||||||
queues.invQueue.put((streamNumber, inventoryHash))
|
invQueue.put((streamNumber, inventoryHash))
|
||||||
queues.UISignalQueue.put(('updateStatusBar', ''))
|
queues.UISignalQueue.put(('updateStatusBar', ''))
|
||||||
try:
|
try:
|
||||||
config.set(
|
config.set(
|
||||||
|
@ -474,7 +474,7 @@ class singleWorker(StoppableThread):
|
||||||
self.logger.info(
|
self.logger.info(
|
||||||
'broadcasting inv with hash: %s', hexlify(inventoryHash))
|
'broadcasting inv with hash: %s', hexlify(inventoryHash))
|
||||||
|
|
||||||
queues.invQueue.put((streamNumber, inventoryHash))
|
invQueue.put((streamNumber, inventoryHash))
|
||||||
queues.UISignalQueue.put(('updateStatusBar', ''))
|
queues.UISignalQueue.put(('updateStatusBar', ''))
|
||||||
try:
|
try:
|
||||||
config.set(
|
config.set(
|
||||||
|
@ -522,7 +522,7 @@ class singleWorker(StoppableThread):
|
||||||
self.logger.info(
|
self.logger.info(
|
||||||
'sending inv (within sendOnionPeerObj function) for object: %s',
|
'sending inv (within sendOnionPeerObj function) for object: %s',
|
||||||
hexlify(inventoryHash))
|
hexlify(inventoryHash))
|
||||||
queues.invQueue.put((streamNumber, inventoryHash))
|
invQueue.put((streamNumber, inventoryHash))
|
||||||
|
|
||||||
def sendBroadcast(self):
|
def sendBroadcast(self):
|
||||||
"""Send a broadcast-type object (assemble the object, perform PoW and put it to the inv announcement queue)"""
|
"""Send a broadcast-type object (assemble the object, perform PoW and put it to the inv announcement queue)"""
|
||||||
|
@ -690,7 +690,7 @@ class singleWorker(StoppableThread):
|
||||||
' for object: %s',
|
' for object: %s',
|
||||||
hexlify(inventoryHash)
|
hexlify(inventoryHash)
|
||||||
)
|
)
|
||||||
queues.invQueue.put((streamNumber, inventoryHash))
|
invQueue.put((streamNumber, inventoryHash))
|
||||||
|
|
||||||
queues.UISignalQueue.put((
|
queues.UISignalQueue.put((
|
||||||
'updateSentItemStatusByAckdata', (
|
'updateSentItemStatusByAckdata', (
|
||||||
|
@ -1326,7 +1326,7 @@ class singleWorker(StoppableThread):
|
||||||
'Broadcasting inv for my msg(within sendmsg function): %s',
|
'Broadcasting inv for my msg(within sendmsg function): %s',
|
||||||
hexlify(inventoryHash)
|
hexlify(inventoryHash)
|
||||||
)
|
)
|
||||||
queues.invQueue.put((toStreamNumber, inventoryHash))
|
invQueue.put((toStreamNumber, inventoryHash))
|
||||||
|
|
||||||
# Update the sent message in the sent table with the
|
# Update the sent message in the sent table with the
|
||||||
# necessary information.
|
# necessary information.
|
||||||
|
@ -1459,7 +1459,7 @@ class singleWorker(StoppableThread):
|
||||||
state.Inventory[inventoryHash] = (
|
state.Inventory[inventoryHash] = (
|
||||||
objectType, streamNumber, payload, embeddedTime, '')
|
objectType, streamNumber, payload, embeddedTime, '')
|
||||||
self.logger.info('sending inv (for the getpubkey message)')
|
self.logger.info('sending inv (for the getpubkey message)')
|
||||||
queues.invQueue.put((streamNumber, inventoryHash))
|
invQueue.put((streamNumber, inventoryHash))
|
||||||
|
|
||||||
# wait 10% past expiration
|
# wait 10% past expiration
|
||||||
sleeptill = int(time.time() + TTL * 1.1)
|
sleeptill = int(time.time() + TTL * 1.1)
|
||||||
|
|
|
@ -3,15 +3,19 @@ Network subsystem package
|
||||||
"""
|
"""
|
||||||
from .dandelion import Dandelion
|
from .dandelion import Dandelion
|
||||||
from .threads import StoppableThread
|
from .threads import StoppableThread
|
||||||
|
from .multiqueue import MultiQueue
|
||||||
|
|
||||||
dandelion_ins = Dandelion()
|
dandelion_ins = Dandelion()
|
||||||
|
|
||||||
|
# network queues
|
||||||
|
invQueue = MultiQueue()
|
||||||
|
addrQueue = MultiQueue()
|
||||||
|
|
||||||
__all__ = ["StoppableThread"]
|
__all__ = ["StoppableThread"]
|
||||||
|
|
||||||
|
|
||||||
def start(config, state):
|
def start(config, state):
|
||||||
"""Start network threads"""
|
"""Start network threads"""
|
||||||
import state
|
|
||||||
from .announcethread import AnnounceThread
|
from .announcethread import AnnounceThread
|
||||||
import connectionpool # pylint: disable=relative-import
|
import connectionpool # pylint: disable=relative-import
|
||||||
from .addrthread import AddrThread
|
from .addrthread import AddrThread
|
||||||
|
|
|
@ -7,7 +7,7 @@ from six.moves import queue
|
||||||
# magic imports!
|
# magic imports!
|
||||||
import connectionpool
|
import connectionpool
|
||||||
from protocol import assembleAddrMessage
|
from protocol import assembleAddrMessage
|
||||||
from queues import addrQueue # FIXME: init with queue
|
from network import addrQueue # FIXME: init with queue
|
||||||
|
|||||||
|
|
||||||
from threads import StoppableThread
|
from threads import StoppableThread
|
||||||
|
|
||||||
|
|
|
@ -17,7 +17,7 @@ import protocol
|
||||||
import state
|
import state
|
||||||
import connectionpool
|
import connectionpool
|
||||||
from bmconfigparser import config
|
from bmconfigparser import config
|
||||||
from queues import invQueue, objectProcessorQueue, portCheckerQueue
|
from queues import objectProcessorQueue, portCheckerQueue
|
||||||
from randomtrackingdict import RandomTrackingDict
|
from randomtrackingdict import RandomTrackingDict
|
||||||
from network.advanceddispatcher import AdvancedDispatcher
|
from network.advanceddispatcher import AdvancedDispatcher
|
||||||
from network.bmobject import (
|
from network.bmobject import (
|
||||||
|
@ -26,7 +26,7 @@ from network.bmobject import (
|
||||||
BMObjectUnwantedStreamError
|
BMObjectUnwantedStreamError
|
||||||
)
|
)
|
||||||
from network.proxy import ProxyError
|
from network.proxy import ProxyError
|
||||||
from network import dandelion_ins
|
from network import dandelion_ins, invQueue
|
||||||
from node import Node, Peer
|
from node import Node, Peer
|
||||||
from objectracker import ObjectTracker, missingObjects
|
from objectracker import ObjectTracker, missingObjects
|
||||||
|
|
||||||
|
|
|
@ -9,8 +9,7 @@ import addresses
|
||||||
import protocol
|
import protocol
|
||||||
import state
|
import state
|
||||||
import connectionpool
|
import connectionpool
|
||||||
from network import dandelion_ins
|
from network import dandelion_ins, invQueue
|
||||||
from queues import invQueue
|
|
||||||
from threads import StoppableThread
|
from threads import StoppableThread
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -2,16 +2,11 @@
|
||||||
A queue with multiple internal subqueues.
|
A queue with multiple internal subqueues.
|
||||||
Elements are added into a random subqueue, and retrieval rotates
|
Elements are added into a random subqueue, and retrieval rotates
|
||||||
"""
|
"""
|
||||||
|
import random
|
||||||
from collections import deque
|
from collections import deque
|
||||||
|
|
||||||
from six.moves import queue
|
from six.moves import queue
|
||||||
|
|
||||||
try:
|
|
||||||
import helper_random
|
|
||||||
except ImportError:
|
|
||||||
from . import helper_random
|
|
||||||
|
|
||||||
|
|
||||||
class MultiQueue(queue.Queue):
|
class MultiQueue(queue.Queue):
|
||||||
"""A base queue class"""
|
"""A base queue class"""
|
||||||
|
@ -38,7 +33,7 @@ class MultiQueue(queue.Queue):
|
||||||
# Put a new item in the queue
|
# Put a new item in the queue
|
||||||
def _put(self, item):
|
def _put(self, item):
|
||||||
# self.queue.append(item)
|
# self.queue.append(item)
|
||||||
self.queues[helper_random.randomrandrange(self.queueCount)].append(
|
self.queues[random.randrange(self.queueCount)].append( # nosec B311
|
||||||
(item))
|
(item))
|
||||||
|
|
||||||
# Get an item from the queue
|
# Get an item from the queue
|
|
@ -17,8 +17,8 @@ import state
|
||||||
import connectionpool
|
import connectionpool
|
||||||
from bmconfigparser import config
|
from bmconfigparser import config
|
||||||
from highlevelcrypto import randomBytes
|
from highlevelcrypto import randomBytes
|
||||||
from network import dandelion_ins
|
from network import dandelion_ins, invQueue
|
||||||
from queues import invQueue, receiveDataQueue, UISignalQueue
|
from queues import receiveDataQueue, UISignalQueue
|
||||||
from tr import _translate
|
from tr import _translate
|
||||||
|
|
||||||
import asyncore_pollchoose as asyncore
|
import asyncore_pollchoose as asyncore
|
||||||
|
|
|
@ -5,11 +5,6 @@ import time
|
||||||
|
|
||||||
from six.moves import queue
|
from six.moves import queue
|
||||||
|
|
||||||
try:
|
|
||||||
from multiqueue import MultiQueue
|
|
||||||
except ImportError:
|
|
||||||
from .multiqueue import MultiQueue
|
|
||||||
|
|
||||||
|
|
||||||
class ObjectProcessorQueue(queue.Queue):
|
class ObjectProcessorQueue(queue.Queue):
|
||||||
"""Special queue class using lock for `.threads.objectProcessor`"""
|
"""Special queue class using lock for `.threads.objectProcessor`"""
|
||||||
|
@ -44,8 +39,6 @@ addressGeneratorQueue = queue.Queue()
|
||||||
#: `.network.ReceiveQueueThread` instances dump objects they hear
|
#: `.network.ReceiveQueueThread` instances dump objects they hear
|
||||||
#: on the network into this queue to be processed.
|
#: on the network into this queue to be processed.
|
||||||
objectProcessorQueue = ObjectProcessorQueue()
|
objectProcessorQueue = ObjectProcessorQueue()
|
||||||
invQueue = MultiQueue()
|
|
||||||
addrQueue = MultiQueue()
|
|
||||||
portCheckerQueue = queue.Queue()
|
portCheckerQueue = queue.Queue()
|
||||||
receiveDataQueue = queue.Queue()
|
receiveDataQueue = queue.Queue()
|
||||||
#: The address generator thread uses this queue to get information back
|
#: The address generator thread uses this queue to get information back
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
"""Test cases for multiqueue"""
|
"""Test cases for multiqueue"""
|
||||||
|
|
||||||
import unittest
|
import unittest
|
||||||
from pybitmessage.multiqueue import MultiQueue
|
from pybitmessage.network.multiqueue import MultiQueue
|
||||||
|
|
||||||
|
|
||||||
class TestMultiQueue(unittest.TestCase):
|
class TestMultiQueue(unittest.TestCase):
|
||||||
|
|
Reference in New Issue
Block a user
Please check Fixme comment, i'm not sure whether i remove this comment or not.
I'm not sure either, let's leave it as it is.