Used dependency injection in network threads

This commit is contained in:
anand k 2024-06-20 07:55:26 +05:30
parent f2bb4d1fd1
commit bb6fc18ec4
No known key found for this signature in database
GPG Key ID: 515AC24FA525DDE0
10 changed files with 63 additions and 40 deletions

View File

@ -414,7 +414,7 @@ class SettingsDialog(QtGui.QDialog):
'bitmessagesettings', 'udp'): 'bitmessagesettings', 'udp'):
self.config.set('bitmessagesettings', 'udp', str(udp_enabled)) self.config.set('bitmessagesettings', 'udp', str(udp_enabled))
if udp_enabled: if udp_enabled:
announceThread = AnnounceThread() announceThread = AnnounceThread(self.config)
announceThread.daemon = True announceThread.daemon = True
announceThread.start() announceThread.start()
else: else:

View File

@ -38,15 +38,16 @@ def start(config, state):
readKnownNodes() readKnownNodes()
connectionpool.pool.connectToStream(1) connectionpool.pool.connectToStream(1)
for thread in ( for thread in (
BMNetworkThread(), InvThread(), AddrThread(), BMNetworkThread(queues), InvThread(protocol, state, queues, addresses),
DownloadThread(), UploadThread() AddrThread(protocol, queues), DownloadThread(state, protocol, addresses),
UploadThread(protocol, state)
): ):
thread.daemon = True thread.daemon = True
thread.start() thread.start()
# Optional components # Optional components
for i in range(config.getint('threads', 'receive')): for i in range(config.getint('threads', 'receive')):
thread = ReceiveQueueThread(i) thread = ReceiveQueueThread(queues, i)
thread.daemon = True thread.daemon = True
thread.start() thread.start()
if config.safeGetBoolean('bitmessagesettings', 'udp'): if config.safeGetBoolean('bitmessagesettings', 'udp'):

View File

@ -6,8 +6,6 @@ from six.moves import queue
# magic imports! # magic imports!
import connectionpool import connectionpool
from helper_random import randomshuffle from helper_random import randomshuffle
from protocol import assembleAddrMessage
from network import queues # FIXME: init with queue
from threads import StoppableThread from threads import StoppableThread
@ -16,12 +14,17 @@ class AddrThread(StoppableThread):
"""(Node) address broadcasting thread""" """(Node) address broadcasting thread"""
name = "AddrBroadcaster" name = "AddrBroadcaster"
def __init__(self, protocol, queues):
self.protocol = protocol
self.queues = queues
StoppableThread.__init__()
def run(self): def run(self):
while not self._stopped: while not self._stopped:
chunk = [] chunk = []
while True: while True:
try: try:
data = queues.addrQueue.get(False) data = self.queues.addrQueue.get(False)
chunk.append(data) chunk.append(data)
except queue.Empty: except queue.Empty:
break break
@ -41,9 +44,9 @@ class AddrThread(StoppableThread):
continue continue
filtered.append((stream, peer, seen)) filtered.append((stream, peer, seen))
if filtered: if filtered:
i.append_write_buf(assembleAddrMessage(filtered)) i.append_write_buf(self.protocol.assembleAddrMessage(filtered))
queues.addrQueue.iterate() self.queues.addrQueue.iterate()
for i in range(len(chunk)): for i in range(len(chunk)):
queues.addrQueue.task_done() self.queues.addrQueue.task_done()
self.stop.wait(1) self.stop.wait(1)

View File

@ -5,7 +5,6 @@ import time
# magic imports! # magic imports!
import connectionpool import connectionpool
from network import config
from protocol import assembleAddrMessage from protocol import assembleAddrMessage
from node import Peer from node import Peer
@ -17,18 +16,22 @@ class AnnounceThread(StoppableThread):
name = "Announcer" name = "Announcer"
announceInterval = 60 announceInterval = 60
def __init__(self, config):
self.config = config
StoppableThread.__init__()
def run(self): def run(self):
lastSelfAnnounced = 0 lastSelfAnnounced = 0
while not self._stopped: while not self._stopped:
processed = 0 processed = 0
if lastSelfAnnounced < time.time() - self.announceInterval: if lastSelfAnnounced < time.time() - self.announceInterval:
self.announceSelf() self.announceSelf(self.config)
lastSelfAnnounced = time.time() lastSelfAnnounced = time.time()
if processed == 0: if processed == 0:
self.stop.wait(10) self.stop.wait(10)
@staticmethod @staticmethod
def announceSelf(): def announceSelf(config):
"""Announce our presence""" """Announce our presence"""
for connection in connectionpool.pool.udpSockets.values(): for connection in connectionpool.pool.udpSockets.values():
if not connection.announcing: if not connection.announcing:

View File

@ -2,7 +2,7 @@
`DownloadThread` class definition `DownloadThread` class definition
""" """
import time import time
from network import state, protocol, addresses, dandelion_ins from network import dandelion_ins
import helper_random import helper_random
import connectionpool import connectionpool
from objectracker import missingObjects from objectracker import missingObjects
@ -17,8 +17,11 @@ class DownloadThread(StoppableThread):
cleanInterval = 60 cleanInterval = 60
requestExpires = 3600 requestExpires = 3600
def __init__(self): def __init__(self, state, protocol, addresses):
super(DownloadThread, self).__init__(name="Downloader") super(DownloadThread, self).__init__(name="Downloader")
self.state = state
self.protocol = protocol
self.addresses = addresses
self.lastCleaned = time.time() self.lastCleaned = time.time()
def cleanPending(self): def cleanPending(self):
@ -57,7 +60,7 @@ class DownloadThread(StoppableThread):
payload = bytearray() payload = bytearray()
chunkCount = 0 chunkCount = 0
for chunk in request: for chunk in request:
if chunk in state.Inventory and not dandelion_ins.hasHash(chunk): if chunk in self.state.Inventory and not dandelion_ins.hasHash(chunk):
try: try:
del i.objectsNewToMe[chunk] del i.objectsNewToMe[chunk]
except KeyError: except KeyError:
@ -68,8 +71,8 @@ class DownloadThread(StoppableThread):
missingObjects[chunk] = now missingObjects[chunk] = now
if not chunkCount: if not chunkCount:
continue continue
payload[0:0] = addresses.encodeVarint(chunkCount) payload[0:0] = self.addresses.encodeVarint(chunkCount)
i.append_write_buf(protocol.CreatePacket('getdata', payload)) i.append_write_buf(self.protocol.CreatePacket('getdata', payload))
self.logger.debug( self.logger.debug(
'%s:%i Requesting %i objects', '%s:%i Requesting %i objects',
i.destination.host, i.destination.port, chunkCount) i.destination.host, i.destination.port, chunkCount)

View File

@ -5,7 +5,6 @@ import Queue
import random import random
from time import time from time import time
from network import protocol, state, queues, addresses
import connectionpool import connectionpool
from network import dandelion_ins from network import dandelion_ins
from threads import StoppableThread from threads import StoppableThread
@ -34,6 +33,13 @@ class InvThread(StoppableThread):
name = "InvBroadcaster" name = "InvBroadcaster"
def __init__(self, protocol, state, queues, addresses):
self.protocol = protocol
self.state = state
self.queues = queues
self.addresses = addresses
StoppableThread.__init__(self)
@staticmethod @staticmethod
def handleLocallyGenerated(stream, hashId): def handleLocallyGenerated(stream, hashId):
"""Locally generated inventory items require special handling""" """Locally generated inventory items require special handling"""
@ -45,13 +51,13 @@ class InvThread(StoppableThread):
connection.objectsNewToThem[hashId] = time() connection.objectsNewToThem[hashId] = time()
def run(self): # pylint: disable=too-many-branches def run(self): # pylint: disable=too-many-branches
while not state.shutdown: # pylint: disable=too-many-nested-blocks while not self.state.shutdown: # pylint: disable=too-many-nested-blocks
chunk = [] chunk = []
while True: while True:
# Dandelion fluff trigger by expiration # Dandelion fluff trigger by expiration
handleExpiredDandelion(dandelion_ins.expire(queues.invQueue)) handleExpiredDandelion(dandelion_ins.expire(self.queues.invQueue))
try: try:
data = queues.invQueue.get(False) data = self.queues.invQueue.get(False)
chunk.append((data[0], data[1])) chunk.append((data[0], data[1]))
# locally generated # locally generated
if len(data) == 2 or data[2] is None: if len(data) == 2 or data[2] is None:
@ -78,7 +84,7 @@ class InvThread(StoppableThread):
if random.randint(1, 100) >= dandelion_ins.enabled: # nosec B311 if random.randint(1, 100) >= dandelion_ins.enabled: # nosec B311
fluffs.append(inv[1]) fluffs.append(inv[1])
# send a dinv only if the stem node supports dandelion # send a dinv only if the stem node supports dandelion
elif connection.services & protocol.NODE_DANDELION > 0: elif connection.services & self.protocol.NODE_DANDELION > 0:
stems.append(inv[1]) stems.append(inv[1])
else: else:
fluffs.append(inv[1]) fluffs.append(inv[1])
@ -87,20 +93,20 @@ class InvThread(StoppableThread):
if fluffs: if fluffs:
random.shuffle(fluffs) random.shuffle(fluffs)
connection.append_write_buf(protocol.CreatePacket( connection.append_write_buf(self.protocol.CreatePacket(
'inv', 'inv',
addresses.encodeVarint( self.addresses.encodeVarint(
len(fluffs)) + ''.join(fluffs))) len(fluffs)) + ''.join(fluffs)))
if stems: if stems:
random.shuffle(stems) random.shuffle(stems)
connection.append_write_buf(protocol.CreatePacket( connection.append_write_buf(self.protocol.CreatePacket(
'dinv', 'dinv',
addresses.encodeVarint( self.addresses.encodeVarint(
len(stems)) + ''.join(stems))) len(stems)) + ''.join(stems)))
queues.invQueue.iterate() self.queues.invQueue.iterate()
for _ in range(len(chunk)): for _ in range(len(chunk)):
queues.invQueue.task_done() self.queues.invQueue.task_done()
dandelion_ins.reRandomiseStems() dandelion_ins.reRandomiseStems()

View File

@ -3,7 +3,6 @@ A thread to handle network concerns
""" """
import network.asyncore_pollchoose as asyncore import network.asyncore_pollchoose as asyncore
import connectionpool import connectionpool
from network import queues
from threads import StoppableThread from threads import StoppableThread
@ -11,12 +10,16 @@ class BMNetworkThread(StoppableThread):
"""Main network thread""" """Main network thread"""
name = "Asyncore" name = "Asyncore"
def __init__(self, queues):
self.queues = queues
StoppableThread.__init__(self)
def run(self): def run(self):
try: try:
while not self._stopped: while not self._stopped:
connectionpool.pool.loop() connectionpool.pool.loop()
except Exception as e: except Exception as e:
queues.excQueue.put((self.name, e)) self.queues.excQueue.put((self.name, e))
raise raise
def stopThread(self): def stopThread(self):

View File

@ -7,20 +7,20 @@ import socket
import connectionpool import connectionpool
from network.advanceddispatcher import UnknownStateError from network.advanceddispatcher import UnknownStateError
from network import queues
from threads import StoppableThread from threads import StoppableThread
class ReceiveQueueThread(StoppableThread): class ReceiveQueueThread(StoppableThread):
"""This thread processes data received from the network """This thread processes data received from the network
(which is done by the asyncore thread)""" (which is done by the asyncore thread)"""
def __init__(self, num=0): def __init__(self, queues, num=0):
self.queues = queues
super(ReceiveQueueThread, self).__init__(name="ReceiveQueue_%i" % num) super(ReceiveQueueThread, self).__init__(name="ReceiveQueue_%i" % num)
def run(self): def run(self):
while not self._stopped: while not self._stopped:
try: try:
dest = queues.receiveDataQueue.get(block=True, timeout=1) dest = self.queues.receiveDataQueue.get(block=True, timeout=1)
except Queue.Empty: except Queue.Empty:
continue continue
@ -38,7 +38,7 @@ class ReceiveQueueThread(StoppableThread):
connection = connectionpool.pool.getConnectionByAddr(dest) connection = connectionpool.pool.getConnectionByAddr(dest)
# connection object not found # connection object not found
except KeyError: except KeyError:
queues.receiveDataQueue.task_done() self.queues.receiveDataQueue.task_done()
continue continue
try: try:
connection.process() connection.process()
@ -52,4 +52,4 @@ class ReceiveQueueThread(StoppableThread):
self.logger.error('Socket error: %s', err) self.logger.error('Socket error: %s', err)
except: # noqa:E722 except: # noqa:E722
self.logger.error('Error processing', exc_info=True) self.logger.error('Error processing', exc_info=True)
queues.receiveDataQueue.task_done() self.queues.receiveDataQueue.task_done()

View File

@ -4,7 +4,6 @@
import time import time
import helper_random import helper_random
from network import protocol, state
import connectionpool import connectionpool
from randomtrackingdict import RandomTrackingDict from randomtrackingdict import RandomTrackingDict
from network import dandelion_ins from network import dandelion_ins
@ -18,6 +17,11 @@ class UploadThread(StoppableThread):
maxBufSize = 2097152 # 2MB maxBufSize = 2097152 # 2MB
name = "Uploader" name = "Uploader"
def __init__(self, protocol, state):
self.protocol = protocol
self.state = state
StoppableThread.__init__()
def run(self): def run(self):
while not self._stopped: while not self._stopped:
uploaded = 0 uploaded = 0
@ -48,8 +52,8 @@ class UploadThread(StoppableThread):
i.destination) i.destination)
break break
try: try:
payload.extend(protocol.CreatePacket( payload.extend(self.protocol.CreatePacket(
'object', state.Inventory[chunk].payload)) 'object', self.state.Inventory[chunk].payload))
chunk_count += 1 chunk_count += 1
except KeyError: except KeyError:
i.antiIntersectionDelay() i.antiIntersectionDelay()

View File

@ -74,7 +74,7 @@ class TestNetwork(TestPartialRun):
for _ in range(10): for _ in range(10):
try: try:
self.state.announceThread.announceSelf() self.state.announceThread.announceSelf(self.config)
except AttributeError: except AttributeError:
self.fail('state.announceThread is not set properly') self.fail('state.announceThread is not set properly')
time.sleep(1) time.sleep(1)