From ef5575aa63436edc2de6fcd84aaba412489c5c52 Mon Sep 17 00:00:00 2001 From: Dmitri Bogomolov <4glitch@gmail.com> Date: Wed, 3 Mar 2021 12:32:38 +0200 Subject: [PATCH] Init StoppableThread with state --- src/bitmessagemain.py | 12 ++++----- src/class_addressGenerator.py | 5 +--- src/class_singleWorker.py | 43 +++++++++++++++---------------- src/network/__init__.py | 12 ++++----- src/network/addrthread.py | 3 +-- src/network/announcethread.py | 8 +++--- src/network/downloadthread.py | 2 +- src/network/invthread.py | 10 +++---- src/network/networkthread.py | 3 +-- src/network/receivequeuethread.py | 10 +++---- src/network/threads.py | 3 ++- 11 files changed, 51 insertions(+), 60 deletions(-) diff --git a/src/bitmessagemain.py b/src/bitmessagemain.py index 438c04f3..e39679a3 100755 --- a/src/bitmessagemain.py +++ b/src/bitmessagemain.py @@ -186,13 +186,13 @@ class Main(object): shared.reloadBroadcastSendersForWhichImWatching() # Start the address generation thread - addressGeneratorThread = addressGenerator() + addressGeneratorThread = addressGenerator(state) # close the main program even if there are threads left addressGeneratorThread.daemon = True addressGeneratorThread.start() # Start the thread that calculates POWs - singleWorkerThread = singleWorker() + singleWorkerThread = singleWorker(state) # close the main program even if there are threads left singleWorkerThread.daemon = True singleWorkerThread.start() @@ -209,26 +209,26 @@ class Main(object): if daemon and config.safeGet( 'bitmessagesettings', 'smtpdeliver', '') != '': from class_smtpDeliver import smtpDeliver - smtpDeliveryThread = smtpDeliver() + smtpDeliveryThread = smtpDeliver(state) smtpDeliveryThread.start() # SMTP daemon thread if daemon and config.safeGetBoolean( 'bitmessagesettings', 'smtpd'): from class_smtpServer import smtpServer - smtpServerThread = smtpServer() + smtpServerThread = smtpServer(state) smtpServerThread.start() # API is also objproc dependent if config.safeGetBoolean('bitmessagesettings', 'apienabled'): import api # pylint: disable=relative-import - singleAPIThread = api.singleAPI() + singleAPIThread = api.singleAPI(state) # close the main program even if there are threads left singleAPIThread.daemon = True singleAPIThread.start() # Start the cleanerThread - singleCleanerThread = singleCleaner() + singleCleanerThread = singleCleaner(state) # close the main program even if there are threads left singleCleanerThread.daemon = True singleCleanerThread.start() diff --git a/src/class_addressGenerator.py b/src/class_addressGenerator.py index 25b0c5df..2356a1b9 100644 --- a/src/class_addressGenerator.py +++ b/src/class_addressGenerator.py @@ -9,7 +9,6 @@ import defaults import highlevelcrypto import queues import shared -import state import tr from addresses import decodeAddress, encodeAddress, encodeVarint from bmconfigparser import BMConfigParser @@ -45,9 +44,7 @@ class addressGenerator(StoppableThread): """ # pylint: disable=too-many-locals, too-many-branches # pylint: disable=protected-access, too-many-statements - # pylint: disable=too-many-nested-blocks - - while state.shutdown == 0: + while self.state.shutdown == 0: queueValue = queues.addressGeneratorQueue.get() nonceTrialsPerByte = 0 payloadLengthExtraBytes = 0 diff --git a/src/class_singleWorker.py b/src/class_singleWorker.py index fea842ea..68659a4e 100644 --- a/src/class_singleWorker.py +++ b/src/class_singleWorker.py @@ -23,7 +23,6 @@ import proofofwork import protocol import queues import shared -import state import tr from addresses import ( calculateInventoryHash, decodeAddress, decodeVarint, encodeVarint @@ -48,8 +47,8 @@ def sizeof_fmt(num, suffix='h/s'): class singleWorker(StoppableThread): """Thread for performing PoW""" - def __init__(self): - super(singleWorker, self).__init__(name="singleWorker") + def __init__(self, state): + super(singleWorker, self).__init__(state, name="singleWorker") proofofwork.init() def stopThread(self): @@ -64,9 +63,9 @@ class singleWorker(StoppableThread): def run(self): # pylint: disable=attribute-defined-outside-init - while not helper_sql.sql_ready.wait(1.0) and state.shutdown == 0: + while not helper_sql.sql_ready.wait(1.0) and self.state.shutdown == 0: self.stop.wait(1.0) - if state.shutdown > 0: + if self.state.shutdown > 0: return # Initialize the neededPubkeys dictionary. @@ -79,7 +78,7 @@ class singleWorker(StoppableThread): _, toAddressVersionNumber, toStreamNumber, toRipe = \ decodeAddress(toAddress) if toAddressVersionNumber <= 3: - state.neededPubkeys[toAddress] = 0 + self.state.neededPubkeys[toAddress] = 0 elif toAddressVersionNumber >= 4: doubleHashOfAddressData = hashlib.sha512(hashlib.sha512( encodeVarint(toAddressVersionNumber) @@ -90,7 +89,7 @@ class singleWorker(StoppableThread): tag = doubleHashOfAddressData[32:] # We'll need this for when we receive a pubkey reply: # it will be encrypted and we'll need to decrypt it. - state.neededPubkeys[tag] = ( + self.state.neededPubkeys[tag] = ( toAddress, highlevelcrypto.makeCryptor( hexlify(privEncryptionKey)) @@ -102,19 +101,19 @@ class singleWorker(StoppableThread): for row in queryreturn: ackdata, = row self.logger.info('Watching for ackdata %s', hexlify(ackdata)) - state.ackdataForWhichImWatching[ackdata] = 0 + self.state.ackdataForWhichImWatching[ackdata] = 0 # Fix legacy (headerless) watched ackdata to include header - for oldack in state.ackdataForWhichImWatching: + for oldack in self.state.ackdataForWhichImWatching: if len(oldack) == 32: # attach legacy header, always constant (msg/1/1) newack = '\x00\x00\x00\x02\x01\x01' + oldack - state.ackdataForWhichImWatching[newack] = 0 + self.state.ackdataForWhichImWatching[newack] = 0 sqlExecute( '''UPDATE sent SET ackdata=? WHERE ackdata=? AND folder = 'sent' ''', newack, oldack ) - del state.ackdataForWhichImWatching[oldack] + del self.state.ackdataForWhichImWatching[oldack] # For the case if user deleted knownnodes # but is still having onionpeer objects in inventory @@ -129,7 +128,7 @@ class singleWorker(StoppableThread): # before we start on existing POW tasks. self.stop.wait(10) - if state.shutdown: + if self.state.shutdown: return # just in case there are any pending tasks for msg @@ -142,7 +141,7 @@ class singleWorker(StoppableThread): # send onionpeer object queues.workerQueue.put(('sendOnionPeerObj', '')) - while state.shutdown == 0: + while self.state.shutdown == 0: self.busy = 0 command, data = queues.workerQueue.get() self.busy = 1 @@ -494,7 +493,7 @@ class singleWorker(StoppableThread): def sendOnionPeerObj(self, peer=None): """Send onionpeer object representing peer""" if not peer: # find own onionhostname - for peer in state.ownAddresses: + for peer in self.state.ownAddresses: if peer.host.endswith('.onion'): break else: @@ -799,8 +798,8 @@ class singleWorker(StoppableThread): encodeVarint(toAddressVersionNumber) + encodeVarint(toStreamNumber) + toRipe ).digest()).digest()[32:] - if toaddress in state.neededPubkeys or \ - toTag in state.neededPubkeys: + if toaddress in self.state.neededPubkeys or \ + toTag in self.state.neededPubkeys: # We already sent a request for the pubkey sqlExecute( '''UPDATE sent SET status='awaitingpubkey', ''' @@ -841,7 +840,7 @@ class singleWorker(StoppableThread): privEncryptionKey = doubleHashOfToAddressData[:32] # The second half of the sha512 hash. tag = doubleHashOfToAddressData[32:] - state.neededPubkeys[tag] = ( + self.state.neededPubkeys[tag] = ( toaddress, highlevelcrypto.makeCryptor( hexlify(privEncryptionKey)) @@ -864,7 +863,7 @@ class singleWorker(StoppableThread): ''' status='doingpubkeypow') AND ''' ''' folder='sent' ''', toaddress) - del state.neededPubkeys[tag] + del self.state.neededPubkeys[tag] break # else: # There was something wrong with this @@ -906,7 +905,7 @@ class singleWorker(StoppableThread): # if we aren't sending this to ourselves or a chan if not BMConfigParser().has_section(toaddress): - state.ackdataForWhichImWatching[ackdata] = 0 + self.state.ackdataForWhichImWatching[ackdata] = 0 queues.UISignalQueue.put(( 'updateSentItemStatusByAckdata', ( ackdata, @@ -1400,7 +1399,7 @@ class singleWorker(StoppableThread): retryNumber = queryReturn[0][0] if addressVersionNumber <= 3: - state.neededPubkeys[toAddress] = 0 + self.state.neededPubkeys[toAddress] = 0 elif addressVersionNumber >= 4: # If the user just clicked 'send' then the tag # (and other information) will already be in the @@ -1417,10 +1416,10 @@ class singleWorker(StoppableThread): encodeVarint(addressVersionNumber) + encodeVarint(streamNumber) + ripe ).digest()).digest()[32:] - if tag not in state.neededPubkeys: + if tag not in self.state.neededPubkeys: # We'll need this for when we receive a pubkey reply: # it will be encrypted and we'll need to decrypt it. - state.neededPubkeys[tag] = ( + self.state.neededPubkeys[tag] = ( toAddress, highlevelcrypto.makeCryptor(hexlify(privEncryptionKey)) ) diff --git a/src/network/__init__.py b/src/network/__init__.py index 9236c757..cd517094 100644 --- a/src/network/__init__.py +++ b/src/network/__init__.py @@ -29,28 +29,28 @@ def start(config, state): # init, needs to be early because other thread may access it early Dandelion() BMConnectionPool().connectToStream(1) - asyncoreThread = BMNetworkThread() + asyncoreThread = BMNetworkThread(state) asyncoreThread.daemon = True asyncoreThread.start() - invThread = InvThread() + invThread = InvThread(state) invThread.daemon = True invThread.start() - addrThread = AddrThread() + addrThread = AddrThread(state) addrThread.daemon = True addrThread.start() downloadThread = DownloadThread() downloadThread.daemon = True downloadThread.start() - uploadThread = UploadThread() + uploadThread = UploadThread(state) # state is not used uploadThread.daemon = True uploadThread.start() # Optional components for i in range(config.getint('threads', 'receive')): - receiveQueueThread = ReceiveQueueThread(i) + receiveQueueThread = ReceiveQueueThread(state, i) receiveQueueThread.daemon = True receiveQueueThread.start() if config.safeGetBoolean('bitmessagesettings', 'udp'): - state.announceThread = AnnounceThread() + state.announceThread = AnnounceThread(state) state.announceThread.daemon = True state.announceThread.start() diff --git a/src/network/addrthread.py b/src/network/addrthread.py index 8b46750f..53f665b5 100644 --- a/src/network/addrthread.py +++ b/src/network/addrthread.py @@ -4,7 +4,6 @@ Announce addresses as they are received from other hosts from six.moves import queue -import state from helper_random import randomshuffle from network.assemble import assemble_addr from network.connectionpool import BMConnectionPool @@ -17,7 +16,7 @@ class AddrThread(StoppableThread): name = "AddrBroadcaster" def run(self): - while not state.shutdown: + while not self.state.shutdown: chunk = [] while True: try: diff --git a/src/network/announcethread.py b/src/network/announcethread.py index e34ed963..c64951a0 100644 --- a/src/network/announcethread.py +++ b/src/network/announcethread.py @@ -3,7 +3,6 @@ Announce myself (node address) """ import time -import state from bmconfigparser import BMConfigParser from network.assemble import assemble_addr from network.connectionpool import BMConnectionPool @@ -18,7 +17,7 @@ class AnnounceThread(StoppableThread): def run(self): lastSelfAnnounced = 0 - while not self._stopped and state.shutdown == 0: + while not self._stopped and self.state.shutdown == 0: processed = 0 if lastSelfAnnounced < time.time() - self.announceInterval: self.announceSelf() @@ -26,13 +25,12 @@ class AnnounceThread(StoppableThread): if processed == 0: self.stop.wait(10) - @staticmethod - def announceSelf(): + def announceSelf(self): """Announce our presence""" for connection in BMConnectionPool().udpSockets.values(): if not connection.announcing: continue - for stream in state.streamsInWhichIAmParticipating: + for stream in self.state.streamsInWhichIAmParticipating: addr = ( stream, Peer( diff --git a/src/network/downloadthread.py b/src/network/downloadthread.py index 0ae83b5b..22b589f6 100644 --- a/src/network/downloadthread.py +++ b/src/network/downloadthread.py @@ -22,7 +22,7 @@ class DownloadThread(StoppableThread): requestExpires = 3600 def __init__(self): - super(DownloadThread, self).__init__(name="Downloader") + super(DownloadThread, self).__init__(None, name="Downloader") self.lastCleaned = time.time() def cleanPending(self): diff --git a/src/network/invthread.py b/src/network/invthread.py index e68b7692..2e513d59 100644 --- a/src/network/invthread.py +++ b/src/network/invthread.py @@ -7,7 +7,6 @@ from time import time import addresses import protocol -import state from network.connectionpool import BMConnectionPool from network.dandelion import Dandelion from queues import invQueue @@ -37,18 +36,17 @@ class InvThread(StoppableThread): name = "InvBroadcaster" - @staticmethod - def handleLocallyGenerated(stream, hashId): + def handleLocallyGenerated(self, stream, hashId): """Locally generated inventory items require special handling""" Dandelion().addHash(hashId, stream=stream) for connection in BMConnectionPool().connections(): - if state.dandelion and connection != \ + if self.state.dandelion and connection != \ Dandelion().objectChildStem(hashId): continue connection.objectsNewToThem[hashId] = time() def run(self): # pylint: disable=too-many-branches - while not state.shutdown: # pylint: disable=too-many-nested-blocks + while not self.state.shutdown: # pylint: disable=too-many-nested-blocks chunk = [] while True: # Dandelion fluff trigger by expiration @@ -78,7 +76,7 @@ class InvThread(StoppableThread): if connection == Dandelion().objectChildStem(inv[1]): # Fluff trigger by RNG # auto-ignore if config set to 0, i.e. dandelion is off - if random.randint(1, 100) >= state.dandelion: + if random.randint(1, 100) >= self.state.dandelion: fluffs.append(inv[1]) # send a dinv only if the stem node supports dandelion elif connection.services & protocol.NODE_DANDELION > 0: diff --git a/src/network/networkthread.py b/src/network/networkthread.py index 61ff6c09..d88bc2e9 100644 --- a/src/network/networkthread.py +++ b/src/network/networkthread.py @@ -2,7 +2,6 @@ A thread to handle network concerns """ import network.asyncore_pollchoose as asyncore -import state from network.connectionpool import BMConnectionPool from queues import excQueue from threads import StoppableThread @@ -14,7 +13,7 @@ class BMNetworkThread(StoppableThread): def run(self): try: - while not self._stopped and state.shutdown == 0: + while not self._stopped and self.state.shutdown == 0: BMConnectionPool().loop() except Exception as e: excQueue.put((self.name, e)) diff --git a/src/network/receivequeuethread.py b/src/network/receivequeuethread.py index 56c01b77..fa8bf99c 100644 --- a/src/network/receivequeuethread.py +++ b/src/network/receivequeuethread.py @@ -5,7 +5,6 @@ import errno import Queue import socket -import state from network.advanceddispatcher import UnknownStateError from network.connectionpool import BMConnectionPool from queues import receiveDataQueue @@ -15,17 +14,18 @@ from threads import StoppableThread class ReceiveQueueThread(StoppableThread): """This thread processes data received from the network (which is done by the asyncore thread)""" - def __init__(self, num=0): - super(ReceiveQueueThread, self).__init__(name="ReceiveQueue_%i" % num) + def __init__(self, state, num=0): + super(ReceiveQueueThread, self).__init__( + state, name="ReceiveQueue_%i" % num) def run(self): - while not self._stopped and state.shutdown == 0: + while not self._stopped and self.state.shutdown == 0: try: dest = receiveDataQueue.get(block=True, timeout=1) except Queue.Empty: continue - if self._stopped or state.shutdown: + if self._stopped or self.state.shutdown: break # cycle as long as there is data diff --git a/src/network/threads.py b/src/network/threads.py index 9bdaa85d..19c58e2f 100644 --- a/src/network/threads.py +++ b/src/network/threads.py @@ -11,7 +11,8 @@ class StoppableThread(threading.Thread): name = None logger = logging.getLogger('default') - def __init__(self, name=None): + def __init__(self, state, name=None): + self.state = state if name: self.name = name super(StoppableThread, self).__init__(name=self.name)