From ad75552b5c0ea173798e62fd756e79c46c59c2fa Mon Sep 17 00:00:00 2001
From: Peter Surda
Date: Sat, 14 Jan 2017 23:20:15 +0100
Subject: [PATCH] Move shutdown from shared.py to state.py

---
 src/api.py                     |  3 ++-
 src/bitmessageqt/support.py    |  4 ++--
 src/class_addressGenerator.py  |  3 ++-
 src/class_objectProcessor.py   | 18 +++++++++---------
 src/class_outgoingSynSender.py |  8 ++++----
 src/class_singleCleaner.py     |  2 +-
 src/class_singleListener.py    | 12 ++++++------
 src/class_singleWorker.py      | 22 +++++++++++-----------
 src/class_smtpDeliver.py       |  3 ++-
 src/openclpow.py               |  2 +-
 src/proofofwork.py             | 14 ++++++++------
 src/shared.py                  |  6 ++----
 src/state.py                   |  2 ++
 src/upnp.py                    |  5 +++--
 14 files changed, 55 insertions(+), 49 deletions(-)

diff --git a/src/api.py b/src/api.py
index 18b2dd20..cf02f69b 100644
--- a/src/api.py
+++ b/src/api.py
@@ -33,6 +33,7 @@ from struct import pack
 from helper_sql import sqlQuery,sqlExecute,SqlBulkExecute,sqlStoredProcedure
 from debug import logger
 from inventory import Inventory
+import state
 from version import softwareVersion
 
 # Helper Functions
@@ -52,7 +53,7 @@ class APIError(Exception):
 
 class StoppableXMLRPCServer(SimpleXMLRPCServer):
     def serve_forever(self):
-        while shared.shutdown == 0:
+        while state.shutdown == 0:
             self.handle_request()
 
 
diff --git a/src/bitmessageqt/support.py b/src/bitmessageqt/support.py
index 86068542..fc132453 100644
--- a/src/bitmessageqt/support.py
+++ b/src/bitmessageqt/support.py
@@ -68,7 +68,7 @@ def checkHasNormalAddress():
 def createAddressIfNeeded(myapp):
     if not checkHasNormalAddress():
         shared.addressGeneratorQueue.put(('createRandomAddress', 4, 1, str(QtGui.QApplication.translate("Support", SUPPORT_MY_LABEL)), 1, "", False, protocol.networkDefaultProofOfWorkNonceTrialsPerByte, protocol.networkDefaultPayloadLengthExtraBytes))
-    while shared.shutdown == 0 and not checkHasNormalAddress():
+    while state.shutdown == 0 and not checkHasNormalAddress():
         time.sleep(.2)
     myapp.rerenderComboBoxSendFrom()
     return checkHasNormalAddress()
@@ -76,7 +76,7 @@ def createAddressIfNeeded(myapp):
 def createSupportMessage(myapp):
     checkAddressBook(myapp)
     address = createAddressIfNeeded(myapp)
-    if shared.shutdown:
+    if state.shutdown:
         return
 
     myapp.ui.lineEditSubject.setText(str(QtGui.QApplication.translate("Support", SUPPORT_SUBJECT)))
diff --git a/src/class_addressGenerator.py b/src/class_addressGenerator.py
index ebedf2f9..00957594 100644
--- a/src/class_addressGenerator.py
+++ b/src/class_addressGenerator.py
@@ -14,6 +14,7 @@ import protocol
 from pyelliptic import arithmetic
 import tr
 from binascii import hexlify
+import state
 
 
 class addressGenerator(threading.Thread, StoppableThread):
@@ -30,7 +31,7 @@ class addressGenerator(threading.Thread, StoppableThread):
         super(addressGenerator, self).stopThread()
 
     def run(self):
-        while shared.shutdown == 0:
+        while state.shutdown == 0:
             queueValue = shared.addressGeneratorQueue.get()
             nonceTrialsPerByte = 0
             payloadLengthExtraBytes = 0
diff --git a/src/class_objectProcessor.py b/src/class_objectProcessor.py
index 4d0b7be0..00e5fbd4 100644
--- a/src/class_objectProcessor.py
+++ b/src/class_objectProcessor.py
@@ -22,7 +22,7 @@ import helper_msgcoding
 import helper_sent
 from helper_sql import *
 import protocol
-from state import neededPubkeys
+import state
 import tr
 from debug import logger
 import l10n
@@ -73,7 +73,7 @@ class objectProcessor(threading.Thread):
             except Exception as e:
                 logger.critical("Critical error within objectProcessorThread: \n%s" % traceback.format_exc())
 
-            if shared.shutdown:
+            if state.shutdown:
                 time.sleep(.5) # Wait just a moment for most of the connections to close
                 numberOfObjectsThatWereInTheObjectProcessorQueue = 0
                 with SqlBulkExecute() as sql:
@@ -83,7 +83,7 @@ class objectProcessor(threading.Thread):
                                    objectType,data)
                         numberOfObjectsThatWereInTheObjectProcessorQueue += 1
                 logger.debug('Saved %s objects from the objectProcessorQueue to disk. objectProcessorThread exiting.' % str(numberOfObjectsThatWereInTheObjectProcessorQueue))
-                shared.shutdown = 2
+                state.shutdown = 2
                 break
 
     def processgetpubkey(self, data):
@@ -286,12 +286,12 @@ class objectProcessor(threading.Thread):
                 return
 
             tag = data[readPosition:readPosition + 32]
-            if tag not in neededPubkeys:
+            if tag not in state.neededPubkeys:
                 logger.info('We don\'t need this v4 pubkey. We didn\'t ask for it.')
                 return
 
             # Let us try to decrypt the pubkey
-            toAddress, cryptorObject = neededPubkeys[tag]
+            toAddress, cryptorObject = state.neededPubkeys[tag]
             if shared.decryptAndCheckPubkeyPayload(data, toAddress) == 'successful':
                 # At this point we know that we have been waiting on this pubkey.
                 # This function will command the workerThread to start work on
@@ -783,8 +783,8 @@ class objectProcessor(threading.Thread):
         # stream number, and RIPE hash.
         status, addressVersion, streamNumber, ripe = decodeAddress(address)
         if addressVersion <=3:
-            if address in neededPubkeys:
-                del neededPubkeys[address]
+            if address in state.neededPubkeys:
+                del state.neededPubkeys[address]
                 self.sendMessages(address)
             else:
                 logger.debug('We don\'t need this pub key. We didn\'t ask for it. For address: %s' % address)
@@ -794,8 +794,8 @@ class objectProcessor(threading.Thread):
         elif addressVersion >= 4:
             tag = hashlib.sha512(hashlib.sha512(encodeVarint(
                 addressVersion) + encodeVarint(streamNumber) + ripe).digest()).digest()[32:]
-            if tag in neededPubkeys:
-                del neededPubkeys[tag]
+            if tag in state.neededPubkeys:
+                del state.neededPubkeys[tag]
                 self.sendMessages(address)
 
     def sendMessages(self, address):
diff --git a/src/class_outgoingSynSender.py b/src/class_outgoingSynSender.py
index ef63d770..9ea361cc 100644
--- a/src/class_outgoingSynSender.py
+++ b/src/class_outgoingSynSender.py
@@ -38,7 +38,7 @@ class outgoingSynSender(threading.Thread, StoppableThread):
             shared.knownNodes[self.streamNumber][peer] = time.time()
             shared.knownNodesLock.release()
         else:
-            while not shared.shutdown:
+            while not state.shutdown:
                 shared.knownNodesLock.acquire()
                 try:
                     peer, = random.sample(shared.knownNodes[self.streamNumber], 1)
@@ -82,7 +82,7 @@ class outgoingSynSender(threading.Thread, StoppableThread):
             maximumConnections = 1 if state.trustedPeer else 8 # maximum number of outgoing connections = 8
             while len(self.selfInitiatedConnections[self.streamNumber]) >= maximumConnections:
                 self.stop.wait(10)
-                if shared.shutdown:
+                if state.shutdown:
                     break
             random.seed()
             peer = self._getPeer()
@@ -93,7 +93,7 @@ class outgoingSynSender(threading.Thread, StoppableThread):
                 random.seed()
                 peer = self._getPeer()
                 self.stop.wait(1)
-                if shared.shutdown:
+                if state.shutdown:
                     break
                 # Clear out the shared.alreadyAttemptedConnectionsList every half
                 # hour so that this program will again attempt a connection
@@ -108,7 +108,7 @@ class outgoingSynSender(threading.Thread, StoppableThread):
                     shared.alreadyAttemptedConnectionsListLock.release()
                 except threading.ThreadError as e:
                     pass
-            if shared.shutdown:
+            if state.shutdown:
                 break
             self.name = "outgoingSynSender-" + peer.host.replace(":", ".") # log parser field separator
             address_family = socket.AF_INET
diff --git a/src/class_singleCleaner.py b/src/class_singleCleaner.py
index 03ebc633..a4177986 100644
--- a/src/class_singleCleaner.py
+++ b/src/class_singleCleaner.py
@@ -48,7 +48,7 @@ class singleCleaner(threading.Thread, StoppableThread):
             # Either the user hasn't set stopresendingafterxdays and stopresendingafterxmonths yet or the options are missing from the config file.
             shared.maximumLengthOfTimeToBotherResendingMessages = float('inf')
 
-        while shared.shutdown == 0:
+        while state.shutdown == 0:
             shared.UISignalQueue.put((
                 'updateStatusBar', 'Doing housekeeping (Flushing inventory in memory to disk...)'))
             Inventory().flush()
diff --git a/src/class_singleListener.py b/src/class_singleListener.py
index 0ad82333..49acc71c 100644
--- a/src/class_singleListener.py
+++ b/src/class_singleListener.py
@@ -65,7 +65,7 @@ class singleListener(threading.Thread, StoppableThread):
         if state.trustedPeer:
             return
 
-        while BMConfigParser().safeGetBoolean('bitmessagesettings', 'dontconnect') and shared.shutdown == 0:
+        while BMConfigParser().safeGetBoolean('bitmessagesettings', 'dontconnect') and state.shutdown == 0:
             self.stop.wait(1)
         helper_bootstrap.dns()
         # We typically don't want to accept incoming connections if the user is using a
@@ -76,7 +76,7 @@ class singleListener(threading.Thread, StoppableThread):
         while BMConfigParser().get('bitmessagesettings', 'socksproxytype')[0:5] == 'SOCKS' and \
             (not BMConfigParser().getboolean('bitmessagesettings', 'sockslisten') and \
             ".onion" not in BMConfigParser().get('bitmessagesettings', 'onionhostname')) and \
-            shared.shutdown == 0:
+            state.shutdown == 0:
             self.stop.wait(5)
 
         logger.info('Listening for incoming connections.')
@@ -99,19 +99,19 @@
         # regexp to match an IPv4-mapped IPv6 address
         mappedAddressRegexp = re.compile(r'^::ffff:([0-9]+\.[0-9]+\.[0-9]+\.[0-9]+)$')
 
-        while shared.shutdown == 0:
+        while state.shutdown == 0:
             # We typically don't want to accept incoming connections if the user is using a
             # SOCKS proxy, unless they have configured otherwise. If they eventually select
             # proxy 'none' or configure SOCKS listening then this will start listening for
             # connections.
-            while BMConfigParser().get('bitmessagesettings', 'socksproxytype')[0:5] == 'SOCKS' and not BMConfigParser().getboolean('bitmessagesettings', 'sockslisten') and ".onion" not in BMConfigParser().get('bitmessagesettings', 'onionhostname') and shared.shutdown == 0:
+            while BMConfigParser().get('bitmessagesettings', 'socksproxytype')[0:5] == 'SOCKS' and not BMConfigParser().getboolean('bitmessagesettings', 'sockslisten') and ".onion" not in BMConfigParser().get('bitmessagesettings', 'onionhostname') and state.shutdown == 0:
                 self.stop.wait(10)
 
-            while len(shared.connectedHostsList) > 220 and shared.shutdown == 0:
+            while len(shared.connectedHostsList) > 220 and state.shutdown == 0:
                 logger.info('We are connected to too many people. Not accepting further incoming connections for ten seconds.')
                 self.stop.wait(10)
 
-            while shared.shutdown == 0:
+            while state.shutdown == 0:
                 socketObject, sockaddr = sock.accept()
                 (HOST, PORT) = sockaddr[0:2]
diff --git a/src/class_singleWorker.py b/src/class_singleWorker.py
index fc4d324d..b09bb6fd 100644
--- a/src/class_singleWorker.py
+++ b/src/class_singleWorker.py
@@ -21,7 +21,7 @@ from helper_threading import *
 from inventory import Inventory
 import l10n
 import protocol
-from state import neededPubkeys
+import state
 from binascii import hexlify, unhexlify
 
 # This thread, of which there is only one, does the heavy lifting:
@@ -57,13 +57,13 @@ class singleWorker(threading.Thread, StoppableThread):
             toAddress, = row
             toStatus, toAddressVersionNumber, toStreamNumber, toRipe = decodeAddress(toAddress)
             if toAddressVersionNumber <= 3 :
-                neededPubkeys[toAddress] = 0
+                state.neededPubkeys[toAddress] = 0
             elif toAddressVersionNumber >= 4:
                 doubleHashOfAddressData = hashlib.sha512(hashlib.sha512(encodeVarint(
                     toAddressVersionNumber) + encodeVarint(toStreamNumber) + toRipe).digest()).digest()
                 privEncryptionKey = doubleHashOfAddressData[:32] # Note that this is the first half of the sha512 hash.
                 tag = doubleHashOfAddressData[32:]
-                neededPubkeys[tag] = (toAddress, highlevelcrypto.makeCryptor(hexlify(privEncryptionKey))) # We'll need this for when we receive a pubkey reply: it will be encrypted and we'll need to decrypt it.
+                state.neededPubkeys[tag] = (toAddress, highlevelcrypto.makeCryptor(hexlify(privEncryptionKey))) # We'll need this for when we receive a pubkey reply: it will be encrypted and we'll need to decrypt it.
 
         # Initialize the shared.ackdataForWhichImWatching data structure
         queryreturn = sqlQuery(
@@ -76,7 +76,7 @@ class singleWorker(threading.Thread, StoppableThread):
         self.stop.wait(
             10) # give some time for the GUI to start before we start on existing POW tasks.
 
-        if shared.shutdown == 0:
+        if state.shutdown == 0:
             # just in case there are any pending tasks for msg
             # messages that have yet to be sent.
             shared.workerQueue.put(('sendmessage', ''))
@@ -84,7 +84,7 @@ class singleWorker(threading.Thread, StoppableThread):
             # that have yet to be sent.
             shared.workerQueue.put(('sendbroadcast', ''))
 
-        while shared.shutdown == 0:
+        while state.shutdown == 0:
             self.busy = 0
             command, data = shared.workerQueue.get()
             self.busy = 1
@@ -553,7 +553,7 @@ class singleWorker(threading.Thread, StoppableThread):
                 toTag = ''
             else:
                 toTag = hashlib.sha512(hashlib.sha512(encodeVarint(toAddressVersionNumber)+encodeVarint(toStreamNumber)+toRipe).digest()).digest()[32:]
-            if toaddress in neededPubkeys or toTag in neededPubkeys:
+            if toaddress in state.neededPubkeys or toTag in state.neededPubkeys:
                 # We already sent a request for the pubkey
                 sqlExecute(
                     '''UPDATE sent SET status='awaitingpubkey', sleeptill=? WHERE toaddress=? AND status='msgqueued' ''',
@@ -577,7 +577,7 @@ class singleWorker(threading.Thread, StoppableThread):
                     toAddressVersionNumber) + encodeVarint(toStreamNumber) + toRipe).digest()).digest()
                 privEncryptionKey = doubleHashOfToAddressData[:32] # The first half of the sha512 hash.
                 tag = doubleHashOfToAddressData[32:] # The second half of the sha512 hash.
-                neededPubkeys[tag] = (toaddress, highlevelcrypto.makeCryptor(hexlify(privEncryptionKey)))
+                state.neededPubkeys[tag] = (toaddress, highlevelcrypto.makeCryptor(hexlify(privEncryptionKey)))
                 for value in Inventory().by_type_and_tag(1, toTag):
                     if shared.decryptAndCheckPubkeyPayload(value.payload, toaddress) == 'successful': #if valid, this function also puts it in the pubkeys table.
@@ -585,7 +585,7 @@ class singleWorker(threading.Thread, StoppableThread):
                         sqlExecute(
                             '''UPDATE sent SET status='doingmsgpow', retrynumber=0 WHERE toaddress=? AND (status='msgqueued' or status='awaitingpubkey' or status='doingpubkeypow')''',
                             toaddress)
-                        del neededPubkeys[tag]
+                        del state.neededPubkeys[tag]
                         break
                 #else:
                     # There was something wrong with this pubkey object even
                     # though it had the correct tag- almost certainly because
@@ -879,15 +879,15 @@ class singleWorker(threading.Thread, StoppableThread):
             retryNumber = queryReturn[0][0]
 
         if addressVersionNumber <= 3:
-            neededPubkeys[toAddress] = 0
+            state.neededPubkeys[toAddress] = 0
         elif addressVersionNumber >= 4:
             # If the user just clicked 'send' then the tag (and other information) will already
             # be in the neededPubkeys dictionary. But if we are recovering from a restart
             # of the client then we have to put it in now.
             privEncryptionKey = hashlib.sha512(hashlib.sha512(encodeVarint(addressVersionNumber)+encodeVarint(streamNumber)+ripe).digest()).digest()[:32] # Note that this is the first half of the sha512 hash.
             tag = hashlib.sha512(hashlib.sha512(encodeVarint(addressVersionNumber)+encodeVarint(streamNumber)+ripe).digest()).digest()[32:] # Note that this is the second half of the sha512 hash.
-            if tag not in neededPubkeys:
-                neededPubkeys[tag] = (toAddress, highlevelcrypto.makeCryptor(hexlify(privEncryptionKey))) # We'll need this for when we receive a pubkey reply: it will be encrypted and we'll need to decrypt it.
+            if tag not in state.neededPubkeys:
+                state.neededPubkeys[tag] = (toAddress, highlevelcrypto.makeCryptor(hexlify(privEncryptionKey))) # We'll need this for when we receive a pubkey reply: it will be encrypted and we'll need to decrypt it.
 
         if retryNumber == 0:
             TTL = 2.5*24*60*60 # 2.5 days. This was chosen fairly arbitrarily.
diff --git a/src/class_smtpDeliver.py b/src/class_smtpDeliver.py
index 18206065..2314238f 100644
--- a/src/class_smtpDeliver.py
+++ b/src/class_smtpDeliver.py
@@ -10,6 +10,7 @@ from debug import logger
 from helper_threading import *
 from bitmessageqt.uisignaler import UISignaler
 import shared
+import state
 
 SMTPDOMAIN = "bmaddr.lan"
 
@@ -34,7 +35,7 @@ class smtpDeliver(threading.Thread, StoppableThread):
         return cls._instance
 
     def run(self):
-        while shared.shutdown == 0:
+        while state.shutdown == 0:
             command, data = shared.UISignalQueue.get()
             if command == 'writeNewAddressToTable':
                 label, address, streamNumber = data
diff --git a/src/openclpow.py b/src/openclpow.py
index 001333e5..168f963f 100644
--- a/src/openclpow.py
+++ b/src/openclpow.py
@@ -7,7 +7,7 @@ import os
 from configparser import BMConfigParser
 import paths
-from shared import shutdown
+from state import shutdown
 from debug import logger
 
 libAvailable = True
diff --git a/src/proofofwork.py b/src/proofofwork.py
index 78188777..da9282f2 100644
--- a/src/proofofwork.py
+++ b/src/proofofwork.py
@@ -15,6 +15,8 @@ import tr
 import os
 import ctypes
 
+import state
+
 bitmsglib = 'bitmsghash.so'
 
 def _set_idle():
@@ -43,10 +45,10 @@ def _doSafePoW(target, initialHash):
     logger.debug("Safe PoW start")
     nonce = 0
     trialValue = float('inf')
-    while trialValue > target and shared.shutdown == 0:
+    while trialValue > target and state.shutdown == 0:
         nonce += 1
         trialValue, = unpack('>Q',hashlib.sha512(hashlib.sha512(pack('>Q',nonce) + initialHash).digest()).digest()[0:8])
-    if shared.shutdown != 0:
+    if state.shutdown != 0:
         raise StopIteration("Interrupted")
     logger.debug("Safe PoW done")
     return [trialValue, nonce]
@@ -71,7 +73,7 @@ def _doFastPoW(target, initialHash):
         result.append(pool.apply_async(_pool_worker, args=(i, initialHash, target, pool_size)))
 
     while True:
-        if shared.shutdown > 0:
+        if state.shutdown > 0:
             try:
                 pool.terminate()
                 pool.join()
@@ -101,7 +103,7 @@ def _doCPoW(target, initialHash):
     logger.debug("C PoW start")
     nonce = bmpow(out_h, out_m)
     trialValue, = unpack('>Q',hashlib.sha512(hashlib.sha512(pack('>Q',nonce) + initialHash).digest()).digest()[0:8])
-    if shared.shutdown != 0:
+    if state.shutdown != 0:
         raise StopIteration("Interrupted")
     logger.debug("C PoW done")
     return [trialValue, nonce]
@@ -117,7 +119,7 @@ def _doGPUPoW(target, initialHash):
         logger.error("Your GPUs (%s) did not calculate correctly, disabling OpenCL. Please report to the developers.", deviceNames)
         openclpow.enabledGpus = []
         raise Exception("GPU did not calculate correctly.")
-    if shared.shutdown != 0:
+    if state.shutdown != 0:
         raise StopIteration("Interrupted")
     logger.debug("GPU PoW done")
     return [trialValue, nonce]
@@ -186,7 +188,7 @@ def buildCPoW():
     notifyBuild(True)
 
 def run(target, initialHash):
-    if shared.shutdown != 0:
+    if state.shutdown != 0:
         raise
     target = int(target)
     if openclpow.openclEnabled():
diff --git a/src/shared.py b/src/shared.py
index b185e7e9..82d08d35 100644
--- a/src/shared.py
+++ b/src/shared.py
@@ -51,7 +51,6 @@ knownNodes = {}
 printLock = threading.Lock()
 statusIconColor = 'red'
 connectedHostsList = {} #List of hosts to which we are connected. Used to guarantee that the outgoingSynSender threads won't connect to the same remote node twice.
-shutdown = 0 #Set to 1 by the doCleanShutdown function. Used to tell the proof of work worker threads to exit.
 thisapp = None # singleton lock instance
 alreadyAttemptedConnectionsList = {
 } # This is a list of nodes to which we have already attempted a connection
@@ -197,8 +196,7 @@ def reloadBroadcastSendersForWhichImWatching():
         MyECSubscriptionCryptorObjects[tag] = highlevelcrypto.makeCryptor(hexlify(privEncryptionKey))
 
 def doCleanShutdown():
-    global shutdown
-    shutdown = 1 #Used to tell proof of work worker threads and the objectProcessorThread to exit.
+    state.shutdown = 1 #Used to tell proof of work worker threads and the objectProcessorThread to exit.
     try:
         parserInputQueue.put(None, False)
     except Queue.Full:
@@ -228,7 +226,7 @@
 
     # Verify that the objectProcessor has finished exiting. It should have incremented the
     # shutdown variable from 1 to 2. This must finish before we command the sqlThread to exit.
-    while shutdown == 1:
+    while state.shutdown == 1:
         time.sleep(.1)
 
     # This one last useless query will guarantee that the previous flush committed and that the
diff --git a/src/state.py b/src/state.py
index f8af54c7..6ae563c4 100644
--- a/src/state.py
+++ b/src/state.py
@@ -15,6 +15,8 @@ networkProtocolAvailability = None
 
 appdata = '' #holds the location of the application data storage directory
 
+shutdown = 0 #Set to 1 by the doCleanShutdown function. Used to tell the proof of work worker threads to exit.
+
 # If the trustedpeer option is specified in keys.dat then this will
 # contain a Peer which will be connected to instead of using the
 # addresses advertised by other peers. The client will only connect to
diff --git a/src/upnp.py b/src/upnp.py
index 69f89233..b8d41f4b 100644
--- a/src/upnp.py
+++ b/src/upnp.py
@@ -9,6 +9,7 @@ import time
 from configparser import BMConfigParser
 from helper_threading import *
 import shared
+import state
 import tr
 
 def createRequestXML(service, action, arguments=None):
@@ -197,7 +198,7 @@ class uPnPThread(threading.Thread, StoppableThread):
         logger.debug("Starting UPnP thread")
         logger.debug("Local IP: %s", self.localIP)
         lastSent = 0
-        while shared.shutdown == 0 and BMConfigParser().safeGetBoolean('bitmessagesettings', 'upnp'):
+        while state.shutdown == 0 and BMConfigParser().safeGetBoolean('bitmessagesettings', 'upnp'):
             if time.time() - lastSent > self.sendSleep and len(self.routers) == 0:
                 try:
                     self.sendSearchRouter()
@@ -205,7 +206,7 @@ class uPnPThread(threading.Thread, StoppableThread):
                     pass
                 lastSent = time.time()
             try:
-                while shared.shutdown == 0 and BMConfigParser().safeGetBoolean('bitmessagesettings', 'upnp'):
+                while state.shutdown == 0 and BMConfigParser().safeGetBoolean('bitmessagesettings', 'upnp'):
                     resp,(ip,port) = self.sock.recvfrom(1000)
                     if resp is None:
                         continue
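
Note (outside the patch proper): after this change, consumers import the state module and poll state.shutdown instead of shared.shutdown; doCleanShutdown() in shared.py sets the flag to 1, and the objectProcessor thread raises it to 2 once it has flushed its queue to disk. A minimal sketch of the polling pattern the worker threads are converted to; the function name and sleep interval below are illustrative only, not taken from the codebase:

    import time

    import state  # module-level flag: 0 = running, 1 = shutdown requested, 2 = objectProcessor finished

    def exampleWorkerLoop():
        # Same loop shape as the converted threads: keep working until
        # doCleanShutdown() flips state.shutdown from 0 to 1.
        while state.shutdown == 0:
            # ... perform one unit of work here ...
            time.sleep(1)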