From 9f4cdaf82bcf291cc12ffeb1fa5225ec51cd598d Mon Sep 17 00:00:00 2001 From: mailchuck Date: Tue, 24 Nov 2015 01:55:17 +0100 Subject: [PATCH] Cleaner shutdown Addresses Bitmessage#549 --- src/api.py | 9 ++++++++- src/bitmessagemain.py | 23 +++++++++++++++++------ src/class_addressGenerator.py | 19 +++++++++++++++---- src/class_singleCleaner.py | 10 ++++++---- src/class_singleListener.py | 34 +++++++++++++++++++++++----------- src/class_singleWorker.py | 17 ++++++++++++++--- src/shared.py | 8 +++----- 7 files changed, 86 insertions(+), 34 deletions(-) diff --git a/src/api.py b/src/api.py index 20568ffc..7c355941 100644 --- a/src/api.py +++ b/src/api.py @@ -12,7 +12,7 @@ if __name__ == "__main__": import sys sys.exit(0) -from SimpleXMLRPCServer import SimpleXMLRPCRequestHandler +from SimpleXMLRPCServer import SimpleXMLRPCRequestHandler, SimpleXMLRPCServer import json import shared @@ -43,6 +43,13 @@ class APIError(Exception): def __str__(self): return "API Error %04i: %s" % (self.error_number, self.error_message) + +class StoppableXMLRPCServer(SimpleXMLRPCServer): + def serve_forever(self): + while shared.shutdown == 0: + self.handle_request() + + # This is one of several classes that constitute the API # This class was written by Vaibhav Bhatia. Modified by Jonathan Warren (Atheros). # http://code.activestate.com/recipes/501148-xmlrpc-serverclient-which-does-cookie-handling-and/ diff --git a/src/bitmessagemain.py b/src/bitmessagemain.py index d29d0928..2767e465 100755 --- a/src/bitmessagemain.py +++ b/src/bitmessagemain.py @@ -23,8 +23,7 @@ import sys from subprocess import call import time -from SimpleXMLRPCServer import SimpleXMLRPCServer -from api import MySimpleXMLRPCRequestHandler +from api import MySimpleXMLRPCRequestHandler, StoppableXMLRPCServer from helper_startup import isOurOperatingSystemLimitedToHavingVeryFewHalfOpenConnections import shared @@ -44,6 +43,7 @@ from debug import logger # Helper Functions import helper_bootstrap import helper_generic +from helper_threading import * # singleton lock instance thisapp = None @@ -119,13 +119,24 @@ def _fixWinsock(): socket.IPV6_V6ONLY = 27 # This thread, of which there is only one, runs the API. -class singleAPI(threading.Thread): - +class singleAPI(threading.Thread, StoppableThread): def __init__(self): - threading.Thread.__init__(self) + threading.Thread.__init__(self, name="singleAPI") + self.initStop() + + def stopThread(self): + super(singleAPI, self).stopThread() + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + try: + s.connect((shared.config.get('bitmessagesettings', 'apiinterface'), shared.config.getint( + 'bitmessagesettings', 'apiport'))) + s.shutdown(socket.SHUT_RDWR) + s.close() + except: + pass def run(self): - se = SimpleXMLRPCServer((shared.config.get('bitmessagesettings', 'apiinterface'), shared.config.getint( + se = StoppableXMLRPCServer((shared.config.get('bitmessagesettings', 'apiinterface'), shared.config.getint( 'bitmessagesettings', 'apiport')), MySimpleXMLRPCRequestHandler, True, True) se.register_introspection_functions() se.serve_forever() diff --git a/src/class_addressGenerator.py b/src/class_addressGenerator.py index af25b210..5d1598c4 100644 --- a/src/class_addressGenerator.py +++ b/src/class_addressGenerator.py @@ -8,17 +8,26 @@ import hashlib import highlevelcrypto from addresses import * from debug import logger +from helper_threading import * from pyelliptic import arithmetic import tr -class addressGenerator(threading.Thread): +class addressGenerator(threading.Thread, StoppableThread): def __init__(self): # QThread.__init__(self, parent) - threading.Thread.__init__(self) + threading.Thread.__init__(self, name="addressGenerator") + self.initStop() + + def stopThread(self): + try: + shared.addressGeneratorQueue.put(("stopThread", "data")) + except: + pass + super(addressGenerator, self).stopThread() def run(self): - while True: + while shared.shutdown == 0: queueValue = shared.addressGeneratorQueue.get() nonceTrialsPerByte = 0 payloadLengthExtraBytes = 0 @@ -54,6 +63,8 @@ class addressGenerator(threading.Thread): numberOfNullBytesDemandedOnFrontOfRipeHash = 2 else: numberOfNullBytesDemandedOnFrontOfRipeHash = 1 # the default + elif queueValue[0] == 'stopThread': + break else: sys.stderr.write( 'Programming error: A structure with the wrong number of values was passed into the addressGeneratorQueue. Here is the queueValue: %s\n' % repr(queueValue)) @@ -278,4 +289,4 @@ class addressGenerator(threading.Thread): else: raise Exception( "Error in the addressGenerator thread. Thread was given a command it could not understand: " + command) - + shared.apiAddressGeneratorQueue.task_done() diff --git a/src/class_singleCleaner.py b/src/class_singleCleaner.py index bccf4a62..f9e87b1a 100644 --- a/src/class_singleCleaner.py +++ b/src/class_singleCleaner.py @@ -7,6 +7,7 @@ import pickle import tr#anslate from helper_sql import * +from helper_threading import * from debug import logger """ @@ -28,10 +29,11 @@ resends msg messages in 5 days (then 10 days, then 20 days, etc...) """ -class singleCleaner(threading.Thread): +class singleCleaner(threading.Thread, StoppableThread): def __init__(self): threading.Thread.__init__(self, name="singleCleaner") + self.initStop() def run(self): timeWeLastClearedInventoryAndPubkeysTables = 0 @@ -41,7 +43,7 @@ class singleCleaner(threading.Thread): # Either the user hasn't set stopresendingafterxdays and stopresendingafterxmonths yet or the options are missing from the config file. shared.maximumLengthOfTimeToBotherResendingMessages = float('inf') - while True: + while shared.shutdown == 0: shared.UISignalQueue.put(( 'updateStatusBar', 'Doing housekeeping (Flushing inventory in memory to disk...)')) with shared.inventoryLock: # If you use both the inventoryLock and the sqlLock, always use the inventoryLock OUTSIDE of the sqlLock. @@ -84,7 +86,7 @@ class singleCleaner(threading.Thread): for row in queryreturn: if len(row) < 2: logger.error('Something went wrong in the singleCleaner thread: a query did not return the requested fields. ' + repr(row)) - time.sleep(3) + self.stop.wait(3) break toAddress, ackData, status = row if status == 'awaitingpubkey': @@ -121,7 +123,7 @@ class singleCleaner(threading.Thread): os._exit(0) shared.knownNodesLock.release() shared.needToWriteKnownNodesToDisk = False - time.sleep(300) + self.stop.wait(300) def resendPubkeyRequest(address): diff --git a/src/class_singleListener.py b/src/class_singleListener.py index 77e6eee6..a571867a 100644 --- a/src/class_singleListener.py +++ b/src/class_singleListener.py @@ -4,6 +4,7 @@ import socket from class_sendDataThread import * from class_receiveDataThread import * import helper_bootstrap +from helper_threading import * import errno import re @@ -15,10 +16,11 @@ import re # (within the recversion function of the recieveData thread) -class singleListener(threading.Thread): +class singleListener(threading.Thread, StoppableThread): def __init__(self): threading.Thread.__init__(self, name="singleListener") + self.initStop() def setup(self, selfInitiatedConnections): self.selfInitiatedConnections = selfInitiatedConnections @@ -37,6 +39,16 @@ class singleListener(threading.Thread): sock.bind((HOST, PORT)) sock.listen(2) return sock + + def stopThread(self): + super(singleListener, self).stopThread() + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + try: + s.connect(('127.0.0.1', shared.config.getint('bitmessagesettings', 'port'))) + s.shutdown(socket.SHUT_RDWR) + s.close() + except: + pass def run(self): # If there is a trusted peer then we don't want to accept @@ -44,15 +56,15 @@ class singleListener(threading.Thread): if shared.trustedPeer: return - while shared.safeConfigGetBoolean('bitmessagesettings', 'dontconnect'): - time.sleep(1) + while shared.safeConfigGetBoolean('bitmessagesettings', 'dontconnect') and shared.shutdown == 0: + self.stop.wait(1) helper_bootstrap.dns() # We typically don't want to accept incoming connections if the user is using a # SOCKS proxy, unless they have configured otherwise. If they eventually select # proxy 'none' or configure SOCKS listening then this will start listening for # connections. - while shared.config.get('bitmessagesettings', 'socksproxytype')[0:5] == 'SOCKS' and not shared.config.getboolean('bitmessagesettings', 'sockslisten'): - time.sleep(5) + while shared.config.get('bitmessagesettings', 'socksproxytype')[0:5] == 'SOCKS' and not shared.config.getboolean('bitmessagesettings', 'sockslisten') and shared.shutdown == 0: + self.stop.wait(5) logger.info('Listening for incoming connections.') @@ -73,19 +85,19 @@ class singleListener(threading.Thread): # regexp to match an IPv4-mapped IPv6 address mappedAddressRegexp = re.compile(r'^::ffff:([0-9]+\.[0-9]+\.[0-9]+\.[0-9]+)$') - while True: + while shared.shutdown == 0: # We typically don't want to accept incoming connections if the user is using a # SOCKS proxy, unless they have configured otherwise. If they eventually select # proxy 'none' or configure SOCKS listening then this will start listening for # connections. - while shared.config.get('bitmessagesettings', 'socksproxytype')[0:5] == 'SOCKS' and not shared.config.getboolean('bitmessagesettings', 'sockslisten'): - time.sleep(10) - while len(shared.connectedHostsList) > 220: + while shared.config.get('bitmessagesettings', 'socksproxytype')[0:5] == 'SOCKS' and not shared.config.getboolean('bitmessagesettings', 'sockslisten') and shared.shutdown == 0: + self.stop.wait(10) + while len(shared.connectedHostsList) > 220 and shared.shutdown == 0: logger.info('We are connected to too many people. Not accepting further incoming connections for ten seconds.') - time.sleep(10) + self.stop.wait(10) - while True: + while shared.shutdown == 0: socketObject, sockaddr = sock.accept() (HOST, PORT) = sockaddr[0:2] diff --git a/src/class_singleWorker.py b/src/class_singleWorker.py index de8848b2..6a827ce2 100644 --- a/src/class_singleWorker.py +++ b/src/class_singleWorker.py @@ -15,17 +15,26 @@ from debug import logger from helper_sql import * import helper_inbox from helper_generic import addDataPadding +from helper_threading import * import l10n # This thread, of which there is only one, does the heavy lifting: # calculating POWs. -class singleWorker(threading.Thread): +class singleWorker(threading.Thread, StoppableThread): def __init__(self): # QThread.__init__(self, parent) threading.Thread.__init__(self, name="singleWorker") + self.initStop() + + def stopThread(self): + try: + shared.workerQueue.put(("stopThread", "data")) + except: + pass + super(singleWorker, self).stopThread() def run(self): @@ -52,7 +61,7 @@ class singleWorker(threading.Thread): logger.info('Watching for ackdata ' + ackdata.encode('hex')) shared.ackdataForWhichImWatching[ackdata] = 0 - time.sleep( + self.stop.wait( 10) # give some time for the GUI to start before we start on existing POW tasks. queryreturn = sqlQuery( @@ -68,7 +77,7 @@ class singleWorker(threading.Thread): # just in case there are any tasks for Broadcasts # that have yet to be sent. - while True: + while shared.shutdown == 0: command, data = shared.workerQueue.get() if command == 'sendmessage': self.sendMsg() @@ -80,6 +89,8 @@ class singleWorker(threading.Thread): self.sendOutOrStoreMyV3Pubkey(data) elif command == 'sendOutOrStoreMyV4Pubkey': self.sendOutOrStoreMyV4Pubkey(data) + elif command == 'stopThread': + return else: logger.error('Probable programming error: The command sent to the workerThread is weird. It is: %s\n' % command) diff --git a/src/shared.py b/src/shared.py index 96eefa29..249bd562 100644 --- a/src/shared.py +++ b/src/shared.py @@ -414,12 +414,10 @@ def doCleanShutdown(): from class_outgoingSynSender import outgoingSynSender for thread in threading.enumerate(): - if thread.name == "uPnPThread" or "outgoingSynSender" in thread.name: - if thread.isAlive() and isinstance(thread, StoppableThread): - thread.stopThread() + if thread.isAlive() and isinstance(thread, StoppableThread): + thread.stopThread() for thread in threading.enumerate(): - if thread.name == "uPnPThread": - #or "outgoingSynSender" in thread.name: + if thread is not threading.currentThread() and isinstance(thread, StoppableThread) and not isinstance(thread, outgoingSynSender): logger.debug("Waiting for thread %s", thread.name) thread.join()