From f2bb4d1fd12b0f8faab05be580aab3bc81a34d5e Mon Sep 17 00:00:00 2001 From: anand k Date: Mon, 17 Jun 2024 23:15:11 +0530 Subject: [PATCH 1/2] Refactor parent package imports in network module --- src/network/__init__.py | 10 +++++++++- src/network/addrthread.py | 8 ++++---- src/network/advanceddispatcher.py | 2 +- src/network/announcethread.py | 2 +- src/network/bmobject.py | 3 +-- src/network/bmproto.py | 13 ++++--------- src/network/connectionchooser.py | 11 ++++------- src/network/connectionpool.py | 4 +--- src/network/downloadthread.py | 5 +---- src/network/invthread.py | 13 +++++-------- src/network/knownnodes.py | 3 +-- src/network/networkthread.py | 4 ++-- src/network/proxy.py | 2 +- src/network/receivequeuethread.py | 8 ++++---- src/network/tcp.py | 23 +++++++++-------------- src/network/tls.py | 4 ++-- src/network/udp.py | 6 ++---- src/network/uploadthread.py | 3 +-- src/networkdepsinterface.py | 15 +++++++++++++++ 19 files changed, 68 insertions(+), 71 deletions(-) create mode 100644 src/networkdepsinterface.py diff --git a/src/network/__init__.py b/src/network/__init__.py index 42e9d035..c3d7a21d 100644 --- a/src/network/__init__.py +++ b/src/network/__init__.py @@ -1,9 +1,18 @@ """ Network subsystem package """ +try: + import networkdepsinterface +except ImportError: + from pybitmessage import networkdepsinterface + from .dandelion import Dandelion from .threads import StoppableThread +( + state, queues, config, protocol, + randomtrackingdict, addresses, paths) = networkdepsinterface.importParentPackageDepsToNetwork() + dandelion_ins = Dandelion() __all__ = ["StoppableThread"] @@ -11,7 +20,6 @@ __all__ = ["StoppableThread"] def start(config, state): """Start network threads""" - import state from .announcethread import AnnounceThread import connectionpool # pylint: disable=relative-import from .addrthread import AddrThread diff --git a/src/network/addrthread.py b/src/network/addrthread.py index a0e869e3..489401cc 100644 --- a/src/network/addrthread.py +++ b/src/network/addrthread.py @@ -7,7 +7,7 @@ from six.moves import queue import connectionpool from helper_random import randomshuffle from protocol import assembleAddrMessage -from queues import addrQueue # FIXME: init with queue +from network import queues # FIXME: init with queue from threads import StoppableThread @@ -21,7 +21,7 @@ class AddrThread(StoppableThread): chunk = [] while True: try: - data = addrQueue.get(False) + data = queues.addrQueue.get(False) chunk.append(data) except queue.Empty: break @@ -43,7 +43,7 @@ class AddrThread(StoppableThread): if filtered: i.append_write_buf(assembleAddrMessage(filtered)) - addrQueue.iterate() + queues.addrQueue.iterate() for i in range(len(chunk)): - addrQueue.task_done() + queues.addrQueue.task_done() self.stop.wait(1) diff --git a/src/network/advanceddispatcher.py b/src/network/advanceddispatcher.py index 49f0d19d..ddd7dd0c 100644 --- a/src/network/advanceddispatcher.py +++ b/src/network/advanceddispatcher.py @@ -6,7 +6,7 @@ import threading import time import network.asyncore_pollchoose as asyncore -import state +from network import state from threads import BusyError, nonBlocking diff --git a/src/network/announcethread.py b/src/network/announcethread.py index 7cb35e77..1ef1c87f 100644 --- a/src/network/announcethread.py +++ b/src/network/announcethread.py @@ -5,7 +5,7 @@ import time # magic imports! import connectionpool -from bmconfigparser import config +from network import config from protocol import assembleAddrMessage from node import Peer diff --git a/src/network/bmobject.py b/src/network/bmobject.py index 83311b9b..82c87d83 100644 --- a/src/network/bmobject.py +++ b/src/network/bmobject.py @@ -4,8 +4,7 @@ BMObject and it's exceptions. import logging import time -import protocol -import state +from network import state, protocol import connectionpool from network import dandelion_ins from highlevelcrypto import calculateInventoryHash diff --git a/src/network/bmproto.py b/src/network/bmproto.py index 797dab5e..27d4346a 100644 --- a/src/network/bmproto.py +++ b/src/network/bmproto.py @@ -11,13 +11,9 @@ import struct import time # magic imports! -import addresses import knownnodes -import protocol -import state +from network import protocol, state, config, queues, addresses, dandelion_ins import connectionpool -from bmconfigparser import config -from queues import invQueue, objectProcessorQueue, portCheckerQueue from randomtrackingdict import RandomTrackingDict from network.advanceddispatcher import AdvancedDispatcher from network.bmobject import ( @@ -26,7 +22,6 @@ from network.bmobject import ( BMObjectUnwantedStreamError ) from network.proxy import ProxyError -from network import dandelion_ins from node import Node, Peer from objectracker import ObjectTracker, missingObjects @@ -409,7 +404,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker): try: self.object.checkObjectByType() - objectProcessorQueue.put(( + queues.objectProcessorQueue.put(( self.object.objectType, buffer(self.object.data))) # noqa: F821 except BMObjectInvalidError: BMProto.stopDownloadingObject(self.object.inventoryHash, True) @@ -431,7 +426,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker): ) self.handleReceivedObject( self.object.streamNumber, self.object.inventoryHash) - invQueue.put(( + queues.invQueue.put(( self.object.streamNumber, self.object.inventoryHash, self.destination)) return True @@ -472,7 +467,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker): def bm_command_portcheck(self): """Incoming port check request, queue it.""" - portCheckerQueue.put(Peer(self.destination, self.peerNode.port)) + queues.portCheckerQueue.put(Peer(self.destination, self.peerNode.port)) return True def bm_command_ping(self): diff --git a/src/network/connectionchooser.py b/src/network/connectionchooser.py index d7062d24..186831ab 100644 --- a/src/network/connectionchooser.py +++ b/src/network/connectionchooser.py @@ -6,10 +6,7 @@ import logging import random import knownnodes -import protocol -import state -from bmconfigparser import config -from queues import queue, portCheckerQueue +from network import protocol, state, config, queues logger = logging.getLogger('default') @@ -34,10 +31,10 @@ def chooseConnection(stream): onionOnly = config.safeGetBoolean( "bitmessagesettings", "onionservicesonly") try: - retval = portCheckerQueue.get(False) - portCheckerQueue.task_done() + retval = queues.portCheckerQueue.get(False) + queues.portCheckerQueue.task_done() return retval - except queue.Empty: + except queues.queue.Empty: pass # with a probability of 0.5, connect to a discovered peer if random.choice((False, True)) and not haveOnion: # nosec B311 diff --git a/src/network/connectionpool.py b/src/network/connectionpool.py index 36c91c18..c8cf04a2 100644 --- a/src/network/connectionpool.py +++ b/src/network/connectionpool.py @@ -11,9 +11,7 @@ import time import asyncore_pollchoose as asyncore import helper_random import knownnodes -import protocol -import state -from bmconfigparser import config +from network import protocol, state, config from connectionchooser import chooseConnection from node import Peer from proxy import Proxy diff --git a/src/network/downloadthread.py b/src/network/downloadthread.py index 30a3f2fe..d8cc303e 100644 --- a/src/network/downloadthread.py +++ b/src/network/downloadthread.py @@ -2,12 +2,9 @@ `DownloadThread` class definition """ import time -import state -import addresses +from network import state, protocol, addresses, dandelion_ins import helper_random -import protocol import connectionpool -from network import dandelion_ins from objectracker import missingObjects from threads import StoppableThread diff --git a/src/network/invthread.py b/src/network/invthread.py index 0b79710a..02a5e48f 100644 --- a/src/network/invthread.py +++ b/src/network/invthread.py @@ -5,12 +5,9 @@ import Queue import random from time import time -import addresses -import protocol -import state +from network import protocol, state, queues, addresses import connectionpool from network import dandelion_ins -from queues import invQueue from threads import StoppableThread @@ -52,9 +49,9 @@ class InvThread(StoppableThread): chunk = [] while True: # Dandelion fluff trigger by expiration - handleExpiredDandelion(dandelion_ins.expire(invQueue)) + handleExpiredDandelion(dandelion_ins.expire(queues.invQueue)) try: - data = invQueue.get(False) + data = queues.invQueue.get(False) chunk.append((data[0], data[1])) # locally generated if len(data) == 2 or data[2] is None: @@ -101,9 +98,9 @@ class InvThread(StoppableThread): addresses.encodeVarint( len(stems)) + ''.join(stems))) - invQueue.iterate() + queues.invQueue.iterate() for _ in range(len(chunk)): - invQueue.task_done() + queues.invQueue.task_done() dandelion_ins.reRandomiseStems() diff --git a/src/network/knownnodes.py b/src/network/knownnodes.py index c53be2cd..85a0f49c 100644 --- a/src/network/knownnodes.py +++ b/src/network/knownnodes.py @@ -15,8 +15,7 @@ try: except ImportError: from collections import Iterable -import state -from bmconfigparser import config +from network import state, config from network.node import Peer state.Peer = Peer diff --git a/src/network/networkthread.py b/src/network/networkthread.py index 640d47a1..1fe9ed77 100644 --- a/src/network/networkthread.py +++ b/src/network/networkthread.py @@ -3,7 +3,7 @@ A thread to handle network concerns """ import network.asyncore_pollchoose as asyncore import connectionpool -from queues import excQueue +from network import queues from threads import StoppableThread @@ -16,7 +16,7 @@ class BMNetworkThread(StoppableThread): while not self._stopped: connectionpool.pool.loop() except Exception as e: - excQueue.put((self.name, e)) + queues.excQueue.put((self.name, e)) raise def stopThread(self): diff --git a/src/network/proxy.py b/src/network/proxy.py index ed1af127..61ed6091 100644 --- a/src/network/proxy.py +++ b/src/network/proxy.py @@ -8,7 +8,7 @@ import time import asyncore_pollchoose as asyncore from advanceddispatcher import AdvancedDispatcher -from bmconfigparser import config +from network import config from node import Peer logger = logging.getLogger('default') diff --git a/src/network/receivequeuethread.py b/src/network/receivequeuethread.py index 10f2acea..a6b38bd5 100644 --- a/src/network/receivequeuethread.py +++ b/src/network/receivequeuethread.py @@ -7,7 +7,7 @@ import socket import connectionpool from network.advanceddispatcher import UnknownStateError -from queues import receiveDataQueue +from network import queues from threads import StoppableThread @@ -20,7 +20,7 @@ class ReceiveQueueThread(StoppableThread): def run(self): while not self._stopped: try: - dest = receiveDataQueue.get(block=True, timeout=1) + dest = queues.receiveDataQueue.get(block=True, timeout=1) except Queue.Empty: continue @@ -38,7 +38,7 @@ class ReceiveQueueThread(StoppableThread): connection = connectionpool.pool.getConnectionByAddr(dest) # connection object not found except KeyError: - receiveDataQueue.task_done() + queues.receiveDataQueue.task_done() continue try: connection.process() @@ -52,4 +52,4 @@ class ReceiveQueueThread(StoppableThread): self.logger.error('Socket error: %s', err) except: # noqa:E722 self.logger.error('Error processing', exc_info=True) - receiveDataQueue.task_done() + queues.receiveDataQueue.task_done() diff --git a/src/network/tcp.py b/src/network/tcp.py index a739e256..50da4c9e 100644 --- a/src/network/tcp.py +++ b/src/network/tcp.py @@ -10,16 +10,11 @@ import socket import time # magic imports! -import addresses import helper_random import l10n -import protocol -import state +from network import protocol, state, config, queues, addresses, dandelion_ins import connectionpool -from bmconfigparser import config from highlevelcrypto import randomBytes -from network import dandelion_ins -from queues import invQueue, receiveDataQueue, UISignalQueue from tr import _translate import asyncore_pollchoose as asyncore @@ -109,7 +104,7 @@ class TCPConnection(BMProto, TLSDispatcher): max_known_nodes = max( len(knownnodes.knownNodes[x]) for x in knownnodes.knownNodes) delay = math.ceil(math.log(max_known_nodes + 2, 20)) * ( - 0.2 + invQueue.queueCount / 2.0) + 0.2 + queues.invQueue.queueCount / 2.0) # take the stream with maximum amount of nodes # +2 is to avoid problems with log(0) and log(1) # 20 is avg connected nodes count @@ -135,7 +130,7 @@ class TCPConnection(BMProto, TLSDispatcher): if BMProto.timeOffsetWrongCount > \ maximumTimeOffsetWrongCount and \ not self.fullyEstablished: - UISignalQueue.put(( + queues.UISignalQueue.put(( 'updateStatusBar', _translate( "MainWindow", @@ -158,8 +153,8 @@ class TCPConnection(BMProto, TLSDispatcher): """Initiate inventory synchronisation.""" if not self.isOutbound and not self.local: state.clientHasReceivedIncomingConnections = True - UISignalQueue.put(('setStatusIcon', 'green')) - UISignalQueue.put(( + queues.UISignalQueue.put(('setStatusIcon', 'green')) + queues.UISignalQueue.put(( 'updateNetworkStatusTab', (self.isOutbound, True, self.destination) )) self.antiIntersectionDelay(True) @@ -169,7 +164,7 @@ class TCPConnection(BMProto, TLSDispatcher): knownnodes.increaseRating(self.destination) knownnodes.addKnownNode( self.streams, self.destination, time.time()) - dandelion_ins.maybeAddStem(self, invQueue) + dandelion_ins.maybeAddStem(self, queues.invQueue) self.sendAddr() self.sendBigInv() @@ -271,12 +266,12 @@ class TCPConnection(BMProto, TLSDispatcher): connectionpool.pool.streams, dandelion_ins.enabled, False, nodeid=self.nodeid)) self.connectedAt = time.time() - receiveDataQueue.put(self.destination) + queues.receiveDataQueue.put(self.destination) def handle_read(self): """Callback for reading from a socket""" TLSDispatcher.handle_read(self) - receiveDataQueue.put(self.destination) + queues.receiveDataQueue.put(self.destination) def handle_write(self): """Callback for writing to a socket""" @@ -286,7 +281,7 @@ class TCPConnection(BMProto, TLSDispatcher): """Callback for connection being closed.""" host_is_global = self.isOutbound or not self.local and not state.socksIP if self.fullyEstablished: - UISignalQueue.put(( + queues.UISignalQueue.put(( 'updateNetworkStatusTab', (self.isOutbound, False, self.destination) )) diff --git a/src/network/tls.py b/src/network/tls.py index a3774b44..6a030c53 100644 --- a/src/network/tls.py +++ b/src/network/tls.py @@ -10,7 +10,7 @@ import sys import network.asyncore_pollchoose as asyncore import paths from network.advanceddispatcher import AdvancedDispatcher -from queues import receiveDataQueue +from network import queues logger = logging.getLogger('default') @@ -216,5 +216,5 @@ class TLSDispatcher(AdvancedDispatcher): self.bm_proto_reset() self.set_state("connection_fully_established") - receiveDataQueue.put(self.destination) + queues.receiveDataQueue.put(self.destination) return False diff --git a/src/network/udp.py b/src/network/udp.py index b16146f9..60c401b7 100644 --- a/src/network/udp.py +++ b/src/network/udp.py @@ -6,10 +6,8 @@ import socket import time # magic imports! -import protocol -import state +from network import protocol, state, queues import connectionpool -from queues import receiveDataQueue from bmproto import BMProto from node import Peer @@ -138,7 +136,7 @@ class UDPSocket(BMProto): # pylint: disable=too-many-instance-attributes # self.local works correctly self.read_buf[0:] = recdata self.bm_proto_reset() - receiveDataQueue.put(self.listening) + queues.receiveDataQueue.put(self.listening) def handle_write(self): try: diff --git a/src/network/uploadthread.py b/src/network/uploadthread.py index e91f08fa..6f67cc64 100644 --- a/src/network/uploadthread.py +++ b/src/network/uploadthread.py @@ -4,8 +4,7 @@ import time import helper_random -import protocol -import state +from network import protocol, state import connectionpool from randomtrackingdict import RandomTrackingDict from network import dandelion_ins diff --git a/src/networkdepsinterface.py b/src/networkdepsinterface.py new file mode 100644 index 00000000..d26dd341 --- /dev/null +++ b/src/networkdepsinterface.py @@ -0,0 +1,15 @@ + +import state +import queues +import protocol +import paths +import randomtrackingdict +import addresses +from bmconfigparser import config + + +def importParentPackageDepsToNetwork(): + """ + Exports parent package dependencies to the network. + """ + return (state, queues, config, protocol, randomtrackingdict, addresses, paths) -- 2.45.1 From 2b93736a33e694d9670864adf1832ac48d167d02 Mon Sep 17 00:00:00 2001 From: anand k Date: Thu, 20 Jun 2024 07:55:26 +0530 Subject: [PATCH 2/2] Used dependency injection in network threads --- src/bitmessageqt/settings.py | 2 +- src/network/__init__.py | 9 +++++---- src/network/addrthread.py | 15 +++++++++------ src/network/announcethread.py | 9 ++++++--- src/network/downloadthread.py | 13 ++++++++----- src/network/invthread.py | 28 +++++++++++++++++----------- src/network/networkthread.py | 7 +++++-- src/network/receivequeuethread.py | 10 +++++----- src/network/uploadthread.py | 10 +++++++--- src/tests/test_network.py | 2 +- 10 files changed, 64 insertions(+), 41 deletions(-) diff --git a/src/bitmessageqt/settings.py b/src/bitmessageqt/settings.py index eeb507c7..c9fb21cd 100644 --- a/src/bitmessageqt/settings.py +++ b/src/bitmessageqt/settings.py @@ -414,7 +414,7 @@ class SettingsDialog(QtGui.QDialog): 'bitmessagesettings', 'udp'): self.config.set('bitmessagesettings', 'udp', str(udp_enabled)) if udp_enabled: - announceThread = AnnounceThread() + announceThread = AnnounceThread(self.config) announceThread.daemon = True announceThread.start() else: diff --git a/src/network/__init__.py b/src/network/__init__.py index c3d7a21d..730a9579 100644 --- a/src/network/__init__.py +++ b/src/network/__init__.py @@ -38,18 +38,19 @@ def start(config, state): readKnownNodes() connectionpool.pool.connectToStream(1) for thread in ( - BMNetworkThread(), InvThread(), AddrThread(), - DownloadThread(), UploadThread() + BMNetworkThread(queues), InvThread(protocol, state, queues, addresses), + AddrThread(protocol, queues), DownloadThread(state, protocol, addresses), + UploadThread(protocol, state) ): thread.daemon = True thread.start() # Optional components for i in range(config.getint('threads', 'receive')): - thread = ReceiveQueueThread(i) + thread = ReceiveQueueThread(queues, i) thread.daemon = True thread.start() if config.safeGetBoolean('bitmessagesettings', 'udp'): - state.announceThread = AnnounceThread() + state.announceThread = AnnounceThread(config) state.announceThread.daemon = True state.announceThread.start() diff --git a/src/network/addrthread.py b/src/network/addrthread.py index 489401cc..132223c0 100644 --- a/src/network/addrthread.py +++ b/src/network/addrthread.py @@ -6,8 +6,6 @@ from six.moves import queue # magic imports! import connectionpool from helper_random import randomshuffle -from protocol import assembleAddrMessage -from network import queues # FIXME: init with queue from threads import StoppableThread @@ -16,12 +14,17 @@ class AddrThread(StoppableThread): """(Node) address broadcasting thread""" name = "AddrBroadcaster" + def __init__(self, protocol, queues): + self.protocol = protocol + self.queues = queues + StoppableThread.__init__(self) + def run(self): while not self._stopped: chunk = [] while True: try: - data = queues.addrQueue.get(False) + data = self.queues.addrQueue.get(False) chunk.append(data) except queue.Empty: break @@ -41,9 +44,9 @@ class AddrThread(StoppableThread): continue filtered.append((stream, peer, seen)) if filtered: - i.append_write_buf(assembleAddrMessage(filtered)) + i.append_write_buf(self.protocol.assembleAddrMessage(filtered)) - queues.addrQueue.iterate() + self.queues.addrQueue.iterate() for i in range(len(chunk)): - queues.addrQueue.task_done() + self.queues.addrQueue.task_done() self.stop.wait(1) diff --git a/src/network/announcethread.py b/src/network/announcethread.py index 1ef1c87f..cb26165e 100644 --- a/src/network/announcethread.py +++ b/src/network/announcethread.py @@ -5,7 +5,6 @@ import time # magic imports! import connectionpool -from network import config from protocol import assembleAddrMessage from node import Peer @@ -17,18 +16,22 @@ class AnnounceThread(StoppableThread): name = "Announcer" announceInterval = 60 + def __init__(self, config): + self.config = config + StoppableThread.__init__(self) + def run(self): lastSelfAnnounced = 0 while not self._stopped: processed = 0 if lastSelfAnnounced < time.time() - self.announceInterval: - self.announceSelf() + self.announceSelf(self.config) lastSelfAnnounced = time.time() if processed == 0: self.stop.wait(10) @staticmethod - def announceSelf(): + def announceSelf(config): """Announce our presence""" for connection in connectionpool.pool.udpSockets.values(): if not connection.announcing: diff --git a/src/network/downloadthread.py b/src/network/downloadthread.py index d8cc303e..54b238fe 100644 --- a/src/network/downloadthread.py +++ b/src/network/downloadthread.py @@ -2,7 +2,7 @@ `DownloadThread` class definition """ import time -from network import state, protocol, addresses, dandelion_ins +from network import dandelion_ins import helper_random import connectionpool from objectracker import missingObjects @@ -17,8 +17,11 @@ class DownloadThread(StoppableThread): cleanInterval = 60 requestExpires = 3600 - def __init__(self): + def __init__(self, state, protocol, addresses): super(DownloadThread, self).__init__(name="Downloader") + self.state = state + self.protocol = protocol + self.addresses = addresses self.lastCleaned = time.time() def cleanPending(self): @@ -57,7 +60,7 @@ class DownloadThread(StoppableThread): payload = bytearray() chunkCount = 0 for chunk in request: - if chunk in state.Inventory and not dandelion_ins.hasHash(chunk): + if chunk in self.state.Inventory and not dandelion_ins.hasHash(chunk): try: del i.objectsNewToMe[chunk] except KeyError: @@ -68,8 +71,8 @@ class DownloadThread(StoppableThread): missingObjects[chunk] = now if not chunkCount: continue - payload[0:0] = addresses.encodeVarint(chunkCount) - i.append_write_buf(protocol.CreatePacket('getdata', payload)) + payload[0:0] = self.addresses.encodeVarint(chunkCount) + i.append_write_buf(self.protocol.CreatePacket('getdata', payload)) self.logger.debug( '%s:%i Requesting %i objects', i.destination.host, i.destination.port, chunkCount) diff --git a/src/network/invthread.py b/src/network/invthread.py index 02a5e48f..7c9fdcf4 100644 --- a/src/network/invthread.py +++ b/src/network/invthread.py @@ -5,7 +5,6 @@ import Queue import random from time import time -from network import protocol, state, queues, addresses import connectionpool from network import dandelion_ins from threads import StoppableThread @@ -34,6 +33,13 @@ class InvThread(StoppableThread): name = "InvBroadcaster" + def __init__(self, protocol, state, queues, addresses): + self.protocol = protocol + self.state = state + self.queues = queues + self.addresses = addresses + StoppableThread.__init__(self) + @staticmethod def handleLocallyGenerated(stream, hashId): """Locally generated inventory items require special handling""" @@ -45,13 +51,13 @@ class InvThread(StoppableThread): connection.objectsNewToThem[hashId] = time() def run(self): # pylint: disable=too-many-branches - while not state.shutdown: # pylint: disable=too-many-nested-blocks + while not self.state.shutdown: # pylint: disable=too-many-nested-blocks chunk = [] while True: # Dandelion fluff trigger by expiration - handleExpiredDandelion(dandelion_ins.expire(queues.invQueue)) + handleExpiredDandelion(dandelion_ins.expire(self.queues.invQueue)) try: - data = queues.invQueue.get(False) + data = self.queues.invQueue.get(False) chunk.append((data[0], data[1])) # locally generated if len(data) == 2 or data[2] is None: @@ -78,7 +84,7 @@ class InvThread(StoppableThread): if random.randint(1, 100) >= dandelion_ins.enabled: # nosec B311 fluffs.append(inv[1]) # send a dinv only if the stem node supports dandelion - elif connection.services & protocol.NODE_DANDELION > 0: + elif connection.services & self.protocol.NODE_DANDELION > 0: stems.append(inv[1]) else: fluffs.append(inv[1]) @@ -87,20 +93,20 @@ class InvThread(StoppableThread): if fluffs: random.shuffle(fluffs) - connection.append_write_buf(protocol.CreatePacket( + connection.append_write_buf(self.protocol.CreatePacket( 'inv', - addresses.encodeVarint( + self.addresses.encodeVarint( len(fluffs)) + ''.join(fluffs))) if stems: random.shuffle(stems) - connection.append_write_buf(protocol.CreatePacket( + connection.append_write_buf(self.protocol.CreatePacket( 'dinv', - addresses.encodeVarint( + self.addresses.encodeVarint( len(stems)) + ''.join(stems))) - queues.invQueue.iterate() + self.queues.invQueue.iterate() for _ in range(len(chunk)): - queues.invQueue.task_done() + self.queues.invQueue.task_done() dandelion_ins.reRandomiseStems() diff --git a/src/network/networkthread.py b/src/network/networkthread.py index 1fe9ed77..ea1ca264 100644 --- a/src/network/networkthread.py +++ b/src/network/networkthread.py @@ -3,7 +3,6 @@ A thread to handle network concerns """ import network.asyncore_pollchoose as asyncore import connectionpool -from network import queues from threads import StoppableThread @@ -11,12 +10,16 @@ class BMNetworkThread(StoppableThread): """Main network thread""" name = "Asyncore" + def __init__(self, queues): + self.queues = queues + StoppableThread.__init__(self) + def run(self): try: while not self._stopped: connectionpool.pool.loop() except Exception as e: - queues.excQueue.put((self.name, e)) + self.queues.excQueue.put((self.name, e)) raise def stopThread(self): diff --git a/src/network/receivequeuethread.py b/src/network/receivequeuethread.py index a6b38bd5..49a9dccd 100644 --- a/src/network/receivequeuethread.py +++ b/src/network/receivequeuethread.py @@ -7,20 +7,20 @@ import socket import connectionpool from network.advanceddispatcher import UnknownStateError -from network import queues from threads import StoppableThread class ReceiveQueueThread(StoppableThread): """This thread processes data received from the network (which is done by the asyncore thread)""" - def __init__(self, num=0): + def __init__(self, queues, num=0): + self.queues = queues super(ReceiveQueueThread, self).__init__(name="ReceiveQueue_%i" % num) def run(self): while not self._stopped: try: - dest = queues.receiveDataQueue.get(block=True, timeout=1) + dest = self.queues.receiveDataQueue.get(block=True, timeout=1) except Queue.Empty: continue @@ -38,7 +38,7 @@ class ReceiveQueueThread(StoppableThread): connection = connectionpool.pool.getConnectionByAddr(dest) # connection object not found except KeyError: - queues.receiveDataQueue.task_done() + self.queues.receiveDataQueue.task_done() continue try: connection.process() @@ -52,4 +52,4 @@ class ReceiveQueueThread(StoppableThread): self.logger.error('Socket error: %s', err) except: # noqa:E722 self.logger.error('Error processing', exc_info=True) - queues.receiveDataQueue.task_done() + self.queues.receiveDataQueue.task_done() diff --git a/src/network/uploadthread.py b/src/network/uploadthread.py index 6f67cc64..c309ec7d 100644 --- a/src/network/uploadthread.py +++ b/src/network/uploadthread.py @@ -4,7 +4,6 @@ import time import helper_random -from network import protocol, state import connectionpool from randomtrackingdict import RandomTrackingDict from network import dandelion_ins @@ -18,6 +17,11 @@ class UploadThread(StoppableThread): maxBufSize = 2097152 # 2MB name = "Uploader" + def __init__(self, protocol, state): + self.protocol = protocol + self.state = state + StoppableThread.__init__(self) + def run(self): while not self._stopped: uploaded = 0 @@ -48,8 +52,8 @@ class UploadThread(StoppableThread): i.destination) break try: - payload.extend(protocol.CreatePacket( - 'object', state.Inventory[chunk].payload)) + payload.extend(self.protocol.CreatePacket( + 'object', self.state.Inventory[chunk].payload)) chunk_count += 1 except KeyError: i.antiIntersectionDelay() diff --git a/src/tests/test_network.py b/src/tests/test_network.py index 206117e0..f9a4cfe0 100644 --- a/src/tests/test_network.py +++ b/src/tests/test_network.py @@ -74,7 +74,7 @@ class TestNetwork(TestPartialRun): for _ in range(10): try: - self.state.announceThread.announceSelf() + self.state.announceThread.announceSelf(self.config) except AttributeError: self.fail('state.announceThread is not set properly') time.sleep(1) -- 2.45.1