Init StoppableThread with state

This commit is contained in:
Dmitri Bogomolov 2021-03-03 12:32:38 +02:00
parent 37344930f6
commit ef5575aa63
Signed by untrusted user: g1itch
GPG Key ID: 720A756F18DEED13
11 changed files with 51 additions and 60 deletions

View File

@ -186,13 +186,13 @@ class Main(object):
shared.reloadBroadcastSendersForWhichImWatching() shared.reloadBroadcastSendersForWhichImWatching()
# Start the address generation thread # Start the address generation thread
addressGeneratorThread = addressGenerator() addressGeneratorThread = addressGenerator(state)
# close the main program even if there are threads left # close the main program even if there are threads left
addressGeneratorThread.daemon = True addressGeneratorThread.daemon = True
addressGeneratorThread.start() addressGeneratorThread.start()
# Start the thread that calculates POWs # Start the thread that calculates POWs
singleWorkerThread = singleWorker() singleWorkerThread = singleWorker(state)
# close the main program even if there are threads left # close the main program even if there are threads left
singleWorkerThread.daemon = True singleWorkerThread.daemon = True
singleWorkerThread.start() singleWorkerThread.start()
@ -209,26 +209,26 @@ class Main(object):
if daemon and config.safeGet( if daemon and config.safeGet(
'bitmessagesettings', 'smtpdeliver', '') != '': 'bitmessagesettings', 'smtpdeliver', '') != '':
from class_smtpDeliver import smtpDeliver from class_smtpDeliver import smtpDeliver
smtpDeliveryThread = smtpDeliver() smtpDeliveryThread = smtpDeliver(state)
smtpDeliveryThread.start() smtpDeliveryThread.start()
# SMTP daemon thread # SMTP daemon thread
if daemon and config.safeGetBoolean( if daemon and config.safeGetBoolean(
'bitmessagesettings', 'smtpd'): 'bitmessagesettings', 'smtpd'):
from class_smtpServer import smtpServer from class_smtpServer import smtpServer
smtpServerThread = smtpServer() smtpServerThread = smtpServer(state)
smtpServerThread.start() smtpServerThread.start()
# API is also objproc dependent # API is also objproc dependent
if config.safeGetBoolean('bitmessagesettings', 'apienabled'): if config.safeGetBoolean('bitmessagesettings', 'apienabled'):
import api # pylint: disable=relative-import import api # pylint: disable=relative-import
singleAPIThread = api.singleAPI() singleAPIThread = api.singleAPI(state)
# close the main program even if there are threads left # close the main program even if there are threads left
singleAPIThread.daemon = True singleAPIThread.daemon = True
singleAPIThread.start() singleAPIThread.start()
# Start the cleanerThread # Start the cleanerThread
singleCleanerThread = singleCleaner() singleCleanerThread = singleCleaner(state)
# close the main program even if there are threads left # close the main program even if there are threads left
singleCleanerThread.daemon = True singleCleanerThread.daemon = True
singleCleanerThread.start() singleCleanerThread.start()

View File

@ -9,7 +9,6 @@ import defaults
import highlevelcrypto import highlevelcrypto
import queues import queues
import shared import shared
import state
import tr import tr
from addresses import decodeAddress, encodeAddress, encodeVarint from addresses import decodeAddress, encodeAddress, encodeVarint
from bmconfigparser import BMConfigParser from bmconfigparser import BMConfigParser
@ -45,9 +44,7 @@ class addressGenerator(StoppableThread):
""" """
# pylint: disable=too-many-locals, too-many-branches # pylint: disable=too-many-locals, too-many-branches
# pylint: disable=protected-access, too-many-statements # pylint: disable=protected-access, too-many-statements
# pylint: disable=too-many-nested-blocks while self.state.shutdown == 0:
while state.shutdown == 0:
queueValue = queues.addressGeneratorQueue.get() queueValue = queues.addressGeneratorQueue.get()
nonceTrialsPerByte = 0 nonceTrialsPerByte = 0
payloadLengthExtraBytes = 0 payloadLengthExtraBytes = 0

View File

@ -23,7 +23,6 @@ import proofofwork
import protocol import protocol
import queues import queues
import shared import shared
import state
import tr import tr
from addresses import ( from addresses import (
calculateInventoryHash, decodeAddress, decodeVarint, encodeVarint calculateInventoryHash, decodeAddress, decodeVarint, encodeVarint
@ -48,8 +47,8 @@ def sizeof_fmt(num, suffix='h/s'):
class singleWorker(StoppableThread): class singleWorker(StoppableThread):
"""Thread for performing PoW""" """Thread for performing PoW"""
def __init__(self): def __init__(self, state):
super(singleWorker, self).__init__(name="singleWorker") super(singleWorker, self).__init__(state, name="singleWorker")
proofofwork.init() proofofwork.init()
def stopThread(self): def stopThread(self):
@ -64,9 +63,9 @@ class singleWorker(StoppableThread):
def run(self): def run(self):
# pylint: disable=attribute-defined-outside-init # pylint: disable=attribute-defined-outside-init
while not helper_sql.sql_ready.wait(1.0) and state.shutdown == 0: while not helper_sql.sql_ready.wait(1.0) and self.state.shutdown == 0:
self.stop.wait(1.0) self.stop.wait(1.0)
if state.shutdown > 0: if self.state.shutdown > 0:
return return
# Initialize the neededPubkeys dictionary. # Initialize the neededPubkeys dictionary.
@ -79,7 +78,7 @@ class singleWorker(StoppableThread):
_, toAddressVersionNumber, toStreamNumber, toRipe = \ _, toAddressVersionNumber, toStreamNumber, toRipe = \
decodeAddress(toAddress) decodeAddress(toAddress)
if toAddressVersionNumber <= 3: if toAddressVersionNumber <= 3:
state.neededPubkeys[toAddress] = 0 self.state.neededPubkeys[toAddress] = 0
elif toAddressVersionNumber >= 4: elif toAddressVersionNumber >= 4:
doubleHashOfAddressData = hashlib.sha512(hashlib.sha512( doubleHashOfAddressData = hashlib.sha512(hashlib.sha512(
encodeVarint(toAddressVersionNumber) encodeVarint(toAddressVersionNumber)
@ -90,7 +89,7 @@ class singleWorker(StoppableThread):
tag = doubleHashOfAddressData[32:] tag = doubleHashOfAddressData[32:]
# We'll need this for when we receive a pubkey reply: # We'll need this for when we receive a pubkey reply:
# it will be encrypted and we'll need to decrypt it. # it will be encrypted and we'll need to decrypt it.
state.neededPubkeys[tag] = ( self.state.neededPubkeys[tag] = (
toAddress, toAddress,
highlevelcrypto.makeCryptor( highlevelcrypto.makeCryptor(
hexlify(privEncryptionKey)) hexlify(privEncryptionKey))
@ -102,19 +101,19 @@ class singleWorker(StoppableThread):
for row in queryreturn: for row in queryreturn:
ackdata, = row ackdata, = row
self.logger.info('Watching for ackdata %s', hexlify(ackdata)) self.logger.info('Watching for ackdata %s', hexlify(ackdata))
state.ackdataForWhichImWatching[ackdata] = 0 self.state.ackdataForWhichImWatching[ackdata] = 0
# Fix legacy (headerless) watched ackdata to include header # Fix legacy (headerless) watched ackdata to include header
for oldack in state.ackdataForWhichImWatching: for oldack in self.state.ackdataForWhichImWatching:
if len(oldack) == 32: if len(oldack) == 32:
# attach legacy header, always constant (msg/1/1) # attach legacy header, always constant (msg/1/1)
newack = '\x00\x00\x00\x02\x01\x01' + oldack newack = '\x00\x00\x00\x02\x01\x01' + oldack
state.ackdataForWhichImWatching[newack] = 0 self.state.ackdataForWhichImWatching[newack] = 0
sqlExecute( sqlExecute(
'''UPDATE sent SET ackdata=? WHERE ackdata=? AND folder = 'sent' ''', '''UPDATE sent SET ackdata=? WHERE ackdata=? AND folder = 'sent' ''',
newack, oldack newack, oldack
) )
del state.ackdataForWhichImWatching[oldack] del self.state.ackdataForWhichImWatching[oldack]
# For the case if user deleted knownnodes # For the case if user deleted knownnodes
# but is still having onionpeer objects in inventory # but is still having onionpeer objects in inventory
@ -129,7 +128,7 @@ class singleWorker(StoppableThread):
# before we start on existing POW tasks. # before we start on existing POW tasks.
self.stop.wait(10) self.stop.wait(10)
if state.shutdown: if self.state.shutdown:
return return
# just in case there are any pending tasks for msg # just in case there are any pending tasks for msg
@ -142,7 +141,7 @@ class singleWorker(StoppableThread):
# send onionpeer object # send onionpeer object
queues.workerQueue.put(('sendOnionPeerObj', '')) queues.workerQueue.put(('sendOnionPeerObj', ''))
while state.shutdown == 0: while self.state.shutdown == 0:
self.busy = 0 self.busy = 0
command, data = queues.workerQueue.get() command, data = queues.workerQueue.get()
self.busy = 1 self.busy = 1
@ -494,7 +493,7 @@ class singleWorker(StoppableThread):
def sendOnionPeerObj(self, peer=None): def sendOnionPeerObj(self, peer=None):
"""Send onionpeer object representing peer""" """Send onionpeer object representing peer"""
if not peer: # find own onionhostname if not peer: # find own onionhostname
for peer in state.ownAddresses: for peer in self.state.ownAddresses:
if peer.host.endswith('.onion'): if peer.host.endswith('.onion'):
break break
else: else:
@ -799,8 +798,8 @@ class singleWorker(StoppableThread):
encodeVarint(toAddressVersionNumber) encodeVarint(toAddressVersionNumber)
+ encodeVarint(toStreamNumber) + toRipe + encodeVarint(toStreamNumber) + toRipe
).digest()).digest()[32:] ).digest()).digest()[32:]
if toaddress in state.neededPubkeys or \ if toaddress in self.state.neededPubkeys or \
toTag in state.neededPubkeys: toTag in self.state.neededPubkeys:
# We already sent a request for the pubkey # We already sent a request for the pubkey
sqlExecute( sqlExecute(
'''UPDATE sent SET status='awaitingpubkey', ''' '''UPDATE sent SET status='awaitingpubkey', '''
@ -841,7 +840,7 @@ class singleWorker(StoppableThread):
privEncryptionKey = doubleHashOfToAddressData[:32] privEncryptionKey = doubleHashOfToAddressData[:32]
# The second half of the sha512 hash. # The second half of the sha512 hash.
tag = doubleHashOfToAddressData[32:] tag = doubleHashOfToAddressData[32:]
state.neededPubkeys[tag] = ( self.state.neededPubkeys[tag] = (
toaddress, toaddress,
highlevelcrypto.makeCryptor( highlevelcrypto.makeCryptor(
hexlify(privEncryptionKey)) hexlify(privEncryptionKey))
@ -864,7 +863,7 @@ class singleWorker(StoppableThread):
''' status='doingpubkeypow') AND ''' ''' status='doingpubkeypow') AND '''
''' folder='sent' ''', ''' folder='sent' ''',
toaddress) toaddress)
del state.neededPubkeys[tag] del self.state.neededPubkeys[tag]
break break
# else: # else:
# There was something wrong with this # There was something wrong with this
@ -906,7 +905,7 @@ class singleWorker(StoppableThread):
# if we aren't sending this to ourselves or a chan # if we aren't sending this to ourselves or a chan
if not BMConfigParser().has_section(toaddress): if not BMConfigParser().has_section(toaddress):
state.ackdataForWhichImWatching[ackdata] = 0 self.state.ackdataForWhichImWatching[ackdata] = 0
queues.UISignalQueue.put(( queues.UISignalQueue.put((
'updateSentItemStatusByAckdata', ( 'updateSentItemStatusByAckdata', (
ackdata, ackdata,
@ -1400,7 +1399,7 @@ class singleWorker(StoppableThread):
retryNumber = queryReturn[0][0] retryNumber = queryReturn[0][0]
if addressVersionNumber <= 3: if addressVersionNumber <= 3:
state.neededPubkeys[toAddress] = 0 self.state.neededPubkeys[toAddress] = 0
elif addressVersionNumber >= 4: elif addressVersionNumber >= 4:
# If the user just clicked 'send' then the tag # If the user just clicked 'send' then the tag
# (and other information) will already be in the # (and other information) will already be in the
@ -1417,10 +1416,10 @@ class singleWorker(StoppableThread):
encodeVarint(addressVersionNumber) encodeVarint(addressVersionNumber)
+ encodeVarint(streamNumber) + ripe + encodeVarint(streamNumber) + ripe
).digest()).digest()[32:] ).digest()).digest()[32:]
if tag not in state.neededPubkeys: if tag not in self.state.neededPubkeys:
# We'll need this for when we receive a pubkey reply: # We'll need this for when we receive a pubkey reply:
# it will be encrypted and we'll need to decrypt it. # it will be encrypted and we'll need to decrypt it.
state.neededPubkeys[tag] = ( self.state.neededPubkeys[tag] = (
toAddress, toAddress,
highlevelcrypto.makeCryptor(hexlify(privEncryptionKey)) highlevelcrypto.makeCryptor(hexlify(privEncryptionKey))
) )

View File

@ -29,28 +29,28 @@ def start(config, state):
# init, needs to be early because other thread may access it early # init, needs to be early because other thread may access it early
Dandelion() Dandelion()
BMConnectionPool().connectToStream(1) BMConnectionPool().connectToStream(1)
asyncoreThread = BMNetworkThread() asyncoreThread = BMNetworkThread(state)
asyncoreThread.daemon = True asyncoreThread.daemon = True
asyncoreThread.start() asyncoreThread.start()
invThread = InvThread() invThread = InvThread(state)
invThread.daemon = True invThread.daemon = True
invThread.start() invThread.start()
addrThread = AddrThread() addrThread = AddrThread(state)
addrThread.daemon = True addrThread.daemon = True
addrThread.start() addrThread.start()
downloadThread = DownloadThread() downloadThread = DownloadThread()
downloadThread.daemon = True downloadThread.daemon = True
downloadThread.start() downloadThread.start()
uploadThread = UploadThread() uploadThread = UploadThread(state) # state is not used
uploadThread.daemon = True uploadThread.daemon = True
uploadThread.start() uploadThread.start()
# Optional components # Optional components
for i in range(config.getint('threads', 'receive')): for i in range(config.getint('threads', 'receive')):
receiveQueueThread = ReceiveQueueThread(i) receiveQueueThread = ReceiveQueueThread(state, i)
receiveQueueThread.daemon = True receiveQueueThread.daemon = True
receiveQueueThread.start() receiveQueueThread.start()
if config.safeGetBoolean('bitmessagesettings', 'udp'): if config.safeGetBoolean('bitmessagesettings', 'udp'):
state.announceThread = AnnounceThread() state.announceThread = AnnounceThread(state)
state.announceThread.daemon = True state.announceThread.daemon = True
state.announceThread.start() state.announceThread.start()

View File

@ -4,7 +4,6 @@ Announce addresses as they are received from other hosts
from six.moves import queue from six.moves import queue
import state
from helper_random import randomshuffle from helper_random import randomshuffle
from network.assemble import assemble_addr from network.assemble import assemble_addr
from network.connectionpool import BMConnectionPool from network.connectionpool import BMConnectionPool
@ -17,7 +16,7 @@ class AddrThread(StoppableThread):
name = "AddrBroadcaster" name = "AddrBroadcaster"
def run(self): def run(self):
while not state.shutdown: while not self.state.shutdown:
chunk = [] chunk = []
while True: while True:
try: try:

View File

@ -3,7 +3,6 @@ Announce myself (node address)
""" """
import time import time
import state
from bmconfigparser import BMConfigParser from bmconfigparser import BMConfigParser
from network.assemble import assemble_addr from network.assemble import assemble_addr
from network.connectionpool import BMConnectionPool from network.connectionpool import BMConnectionPool
@ -18,7 +17,7 @@ class AnnounceThread(StoppableThread):
def run(self): def run(self):
lastSelfAnnounced = 0 lastSelfAnnounced = 0
while not self._stopped and state.shutdown == 0: while not self._stopped and self.state.shutdown == 0:
processed = 0 processed = 0
if lastSelfAnnounced < time.time() - self.announceInterval: if lastSelfAnnounced < time.time() - self.announceInterval:
self.announceSelf() self.announceSelf()
@ -26,13 +25,12 @@ class AnnounceThread(StoppableThread):
if processed == 0: if processed == 0:
self.stop.wait(10) self.stop.wait(10)
@staticmethod def announceSelf(self):
def announceSelf():
"""Announce our presence""" """Announce our presence"""
for connection in BMConnectionPool().udpSockets.values(): for connection in BMConnectionPool().udpSockets.values():
if not connection.announcing: if not connection.announcing:
continue continue
for stream in state.streamsInWhichIAmParticipating: for stream in self.state.streamsInWhichIAmParticipating:
addr = ( addr = (
stream, stream,
Peer( Peer(

View File

@ -22,7 +22,7 @@ class DownloadThread(StoppableThread):
requestExpires = 3600 requestExpires = 3600
def __init__(self): def __init__(self):
super(DownloadThread, self).__init__(name="Downloader") super(DownloadThread, self).__init__(None, name="Downloader")
self.lastCleaned = time.time() self.lastCleaned = time.time()
def cleanPending(self): def cleanPending(self):

View File

@ -7,7 +7,6 @@ from time import time
import addresses import addresses
import protocol import protocol
import state
from network.connectionpool import BMConnectionPool from network.connectionpool import BMConnectionPool
from network.dandelion import Dandelion from network.dandelion import Dandelion
from queues import invQueue from queues import invQueue
@ -37,18 +36,17 @@ class InvThread(StoppableThread):
name = "InvBroadcaster" name = "InvBroadcaster"
@staticmethod def handleLocallyGenerated(self, stream, hashId):
def handleLocallyGenerated(stream, hashId):
"""Locally generated inventory items require special handling""" """Locally generated inventory items require special handling"""
Dandelion().addHash(hashId, stream=stream) Dandelion().addHash(hashId, stream=stream)
for connection in BMConnectionPool().connections(): for connection in BMConnectionPool().connections():
if state.dandelion and connection != \ if self.state.dandelion and connection != \
Dandelion().objectChildStem(hashId): Dandelion().objectChildStem(hashId):
continue continue
connection.objectsNewToThem[hashId] = time() connection.objectsNewToThem[hashId] = time()
def run(self): # pylint: disable=too-many-branches def run(self): # pylint: disable=too-many-branches
while not state.shutdown: # pylint: disable=too-many-nested-blocks while not self.state.shutdown: # pylint: disable=too-many-nested-blocks
chunk = [] chunk = []
while True: while True:
# Dandelion fluff trigger by expiration # Dandelion fluff trigger by expiration
@ -78,7 +76,7 @@ class InvThread(StoppableThread):
if connection == Dandelion().objectChildStem(inv[1]): if connection == Dandelion().objectChildStem(inv[1]):
# Fluff trigger by RNG # Fluff trigger by RNG
# auto-ignore if config set to 0, i.e. dandelion is off # auto-ignore if config set to 0, i.e. dandelion is off
if random.randint(1, 100) >= state.dandelion: if random.randint(1, 100) >= self.state.dandelion:
fluffs.append(inv[1]) fluffs.append(inv[1])
# send a dinv only if the stem node supports dandelion # send a dinv only if the stem node supports dandelion
elif connection.services & protocol.NODE_DANDELION > 0: elif connection.services & protocol.NODE_DANDELION > 0:

View File

@ -2,7 +2,6 @@
A thread to handle network concerns A thread to handle network concerns
""" """
import network.asyncore_pollchoose as asyncore import network.asyncore_pollchoose as asyncore
import state
from network.connectionpool import BMConnectionPool from network.connectionpool import BMConnectionPool
from queues import excQueue from queues import excQueue
from threads import StoppableThread from threads import StoppableThread
@ -14,7 +13,7 @@ class BMNetworkThread(StoppableThread):
def run(self): def run(self):
try: try:
while not self._stopped and state.shutdown == 0: while not self._stopped and self.state.shutdown == 0:
BMConnectionPool().loop() BMConnectionPool().loop()
except Exception as e: except Exception as e:
excQueue.put((self.name, e)) excQueue.put((self.name, e))

View File

@ -5,7 +5,6 @@ import errno
import Queue import Queue
import socket import socket
import state
from network.advanceddispatcher import UnknownStateError from network.advanceddispatcher import UnknownStateError
from network.connectionpool import BMConnectionPool from network.connectionpool import BMConnectionPool
from queues import receiveDataQueue from queues import receiveDataQueue
@ -15,17 +14,18 @@ from threads import StoppableThread
class ReceiveQueueThread(StoppableThread): class ReceiveQueueThread(StoppableThread):
"""This thread processes data received from the network """This thread processes data received from the network
(which is done by the asyncore thread)""" (which is done by the asyncore thread)"""
def __init__(self, num=0): def __init__(self, state, num=0):
super(ReceiveQueueThread, self).__init__(name="ReceiveQueue_%i" % num) super(ReceiveQueueThread, self).__init__(
state, name="ReceiveQueue_%i" % num)
def run(self): def run(self):
while not self._stopped and state.shutdown == 0: while not self._stopped and self.state.shutdown == 0:
try: try:
dest = receiveDataQueue.get(block=True, timeout=1) dest = receiveDataQueue.get(block=True, timeout=1)
except Queue.Empty: except Queue.Empty:
continue continue
if self._stopped or state.shutdown: if self._stopped or self.state.shutdown:
break break
# cycle as long as there is data # cycle as long as there is data

View File

@ -11,7 +11,8 @@ class StoppableThread(threading.Thread):
name = None name = None
logger = logging.getLogger('default') logger = logging.getLogger('default')
def __init__(self, name=None): def __init__(self, state, name=None):
self.state = state
if name: if name:
self.name = name self.name = name
super(StoppableThread, self).__init__(name=self.name) super(StoppableThread, self).__init__(name=self.name)