Cleaner shutdown

Addresses Bitmessage#549
This commit is contained in:
mailchuck 2015-11-24 01:55:17 +01:00 committed by Peter Surda
parent 6dbe20a25c
commit d69c2e097f
7 changed files with 86 additions and 34 deletions

View File

@ -12,7 +12,7 @@ if __name__ == "__main__":
import sys import sys
sys.exit(0) sys.exit(0)
from SimpleXMLRPCServer import SimpleXMLRPCRequestHandler from SimpleXMLRPCServer import SimpleXMLRPCRequestHandler, SimpleXMLRPCServer
import json import json
import shared import shared
@ -43,6 +43,13 @@ class APIError(Exception):
def __str__(self): def __str__(self):
return "API Error %04i: %s" % (self.error_number, self.error_message) return "API Error %04i: %s" % (self.error_number, self.error_message)
class StoppableXMLRPCServer(SimpleXMLRPCServer):
def serve_forever(self):
while shared.shutdown == 0:
self.handle_request()
# This is one of several classes that constitute the API # This is one of several classes that constitute the API
# This class was written by Vaibhav Bhatia. Modified by Jonathan Warren (Atheros). # This class was written by Vaibhav Bhatia. Modified by Jonathan Warren (Atheros).
# http://code.activestate.com/recipes/501148-xmlrpc-serverclient-which-does-cookie-handling-and/ # http://code.activestate.com/recipes/501148-xmlrpc-serverclient-which-does-cookie-handling-and/

View File

@ -23,8 +23,7 @@ import sys
from subprocess import call from subprocess import call
import time import time
from SimpleXMLRPCServer import SimpleXMLRPCServer from api import MySimpleXMLRPCRequestHandler, StoppableXMLRPCServer
from api import MySimpleXMLRPCRequestHandler
from helper_startup import isOurOperatingSystemLimitedToHavingVeryFewHalfOpenConnections from helper_startup import isOurOperatingSystemLimitedToHavingVeryFewHalfOpenConnections
import shared import shared
@ -44,6 +43,7 @@ from debug import logger
# Helper Functions # Helper Functions
import helper_bootstrap import helper_bootstrap
import helper_generic import helper_generic
from helper_threading import *
# singleton lock instance # singleton lock instance
thisapp = None thisapp = None
@ -119,13 +119,24 @@ def _fixWinsock():
socket.IPV6_V6ONLY = 27 socket.IPV6_V6ONLY = 27
# This thread, of which there is only one, runs the API. # This thread, of which there is only one, runs the API.
class singleAPI(threading.Thread): class singleAPI(threading.Thread, StoppableThread):
def __init__(self): def __init__(self):
threading.Thread.__init__(self) threading.Thread.__init__(self, name="singleAPI")
self.initStop()
def stopThread(self):
super(singleAPI, self).stopThread()
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
s.connect((shared.config.get('bitmessagesettings', 'apiinterface'), shared.config.getint(
'bitmessagesettings', 'apiport')))
s.shutdown(socket.SHUT_RDWR)
s.close()
except:
pass
def run(self): def run(self):
se = SimpleXMLRPCServer((shared.config.get('bitmessagesettings', 'apiinterface'), shared.config.getint( se = StoppableXMLRPCServer((shared.config.get('bitmessagesettings', 'apiinterface'), shared.config.getint(
'bitmessagesettings', 'apiport')), MySimpleXMLRPCRequestHandler, True, True) 'bitmessagesettings', 'apiport')), MySimpleXMLRPCRequestHandler, True, True)
se.register_introspection_functions() se.register_introspection_functions()
se.serve_forever() se.serve_forever()

View File

@ -8,17 +8,26 @@ import hashlib
import highlevelcrypto import highlevelcrypto
from addresses import * from addresses import *
from debug import logger from debug import logger
from helper_threading import *
from pyelliptic import arithmetic from pyelliptic import arithmetic
import tr import tr
class addressGenerator(threading.Thread): class addressGenerator(threading.Thread, StoppableThread):
def __init__(self): def __init__(self):
# QThread.__init__(self, parent) # QThread.__init__(self, parent)
threading.Thread.__init__(self) threading.Thread.__init__(self, name="addressGenerator")
self.initStop()
def stopThread(self):
try:
shared.addressGeneratorQueue.put(("stopThread", "data"))
except:
pass
super(addressGenerator, self).stopThread()
def run(self): def run(self):
while True: while shared.shutdown == 0:
queueValue = shared.addressGeneratorQueue.get() queueValue = shared.addressGeneratorQueue.get()
nonceTrialsPerByte = 0 nonceTrialsPerByte = 0
payloadLengthExtraBytes = 0 payloadLengthExtraBytes = 0
@ -54,6 +63,8 @@ class addressGenerator(threading.Thread):
numberOfNullBytesDemandedOnFrontOfRipeHash = 2 numberOfNullBytesDemandedOnFrontOfRipeHash = 2
else: else:
numberOfNullBytesDemandedOnFrontOfRipeHash = 1 # the default numberOfNullBytesDemandedOnFrontOfRipeHash = 1 # the default
elif queueValue[0] == 'stopThread':
break
else: else:
sys.stderr.write( sys.stderr.write(
'Programming error: A structure with the wrong number of values was passed into the addressGeneratorQueue. Here is the queueValue: %s\n' % repr(queueValue)) 'Programming error: A structure with the wrong number of values was passed into the addressGeneratorQueue. Here is the queueValue: %s\n' % repr(queueValue))
@ -278,4 +289,4 @@ class addressGenerator(threading.Thread):
else: else:
raise Exception( raise Exception(
"Error in the addressGenerator thread. Thread was given a command it could not understand: " + command) "Error in the addressGenerator thread. Thread was given a command it could not understand: " + command)
shared.apiAddressGeneratorQueue.task_done()

View File

@ -7,6 +7,7 @@ import pickle
import tr#anslate import tr#anslate
from helper_sql import * from helper_sql import *
from helper_threading import *
from debug import logger from debug import logger
""" """
@ -28,10 +29,11 @@ resends msg messages in 5 days (then 10 days, then 20 days, etc...)
""" """
class singleCleaner(threading.Thread): class singleCleaner(threading.Thread, StoppableThread):
def __init__(self): def __init__(self):
threading.Thread.__init__(self, name="singleCleaner") threading.Thread.__init__(self, name="singleCleaner")
self.initStop()
def run(self): def run(self):
timeWeLastClearedInventoryAndPubkeysTables = 0 timeWeLastClearedInventoryAndPubkeysTables = 0
@ -41,7 +43,7 @@ class singleCleaner(threading.Thread):
# Either the user hasn't set stopresendingafterxdays and stopresendingafterxmonths yet or the options are missing from the config file. # Either the user hasn't set stopresendingafterxdays and stopresendingafterxmonths yet or the options are missing from the config file.
shared.maximumLengthOfTimeToBotherResendingMessages = float('inf') shared.maximumLengthOfTimeToBotherResendingMessages = float('inf')
while True: while shared.shutdown == 0:
shared.UISignalQueue.put(( shared.UISignalQueue.put((
'updateStatusBar', 'Doing housekeeping (Flushing inventory in memory to disk...)')) 'updateStatusBar', 'Doing housekeeping (Flushing inventory in memory to disk...)'))
with shared.inventoryLock: # If you use both the inventoryLock and the sqlLock, always use the inventoryLock OUTSIDE of the sqlLock. with shared.inventoryLock: # If you use both the inventoryLock and the sqlLock, always use the inventoryLock OUTSIDE of the sqlLock.
@ -84,7 +86,7 @@ class singleCleaner(threading.Thread):
for row in queryreturn: for row in queryreturn:
if len(row) < 2: if len(row) < 2:
logger.error('Something went wrong in the singleCleaner thread: a query did not return the requested fields. ' + repr(row)) logger.error('Something went wrong in the singleCleaner thread: a query did not return the requested fields. ' + repr(row))
time.sleep(3) self.stop.wait(3)
break break
toAddress, ackData, status = row toAddress, ackData, status = row
if status == 'awaitingpubkey': if status == 'awaitingpubkey':
@ -121,7 +123,7 @@ class singleCleaner(threading.Thread):
os._exit(0) os._exit(0)
shared.knownNodesLock.release() shared.knownNodesLock.release()
shared.needToWriteKnownNodesToDisk = False shared.needToWriteKnownNodesToDisk = False
time.sleep(300) self.stop.wait(300)
def resendPubkeyRequest(address): def resendPubkeyRequest(address):

View File

@ -4,6 +4,7 @@ import socket
from class_sendDataThread import * from class_sendDataThread import *
from class_receiveDataThread import * from class_receiveDataThread import *
import helper_bootstrap import helper_bootstrap
from helper_threading import *
import errno import errno
import re import re
@ -15,10 +16,11 @@ import re
# (within the recversion function of the recieveData thread) # (within the recversion function of the recieveData thread)
class singleListener(threading.Thread): class singleListener(threading.Thread, StoppableThread):
def __init__(self): def __init__(self):
threading.Thread.__init__(self, name="singleListener") threading.Thread.__init__(self, name="singleListener")
self.initStop()
def setup(self, selfInitiatedConnections): def setup(self, selfInitiatedConnections):
self.selfInitiatedConnections = selfInitiatedConnections self.selfInitiatedConnections = selfInitiatedConnections
@ -38,21 +40,31 @@ class singleListener(threading.Thread):
sock.listen(2) sock.listen(2)
return sock return sock
def stopThread(self):
super(singleListener, self).stopThread()
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
s.connect(('127.0.0.1', shared.config.getint('bitmessagesettings', 'port')))
s.shutdown(socket.SHUT_RDWR)
s.close()
except:
pass
def run(self): def run(self):
# If there is a trusted peer then we don't want to accept # If there is a trusted peer then we don't want to accept
# incoming connections so we'll just abandon the thread # incoming connections so we'll just abandon the thread
if shared.trustedPeer: if shared.trustedPeer:
return return
while shared.safeConfigGetBoolean('bitmessagesettings', 'dontconnect'): while shared.safeConfigGetBoolean('bitmessagesettings', 'dontconnect') and shared.shutdown == 0:
time.sleep(1) self.stop.wait(1)
helper_bootstrap.dns() helper_bootstrap.dns()
# We typically don't want to accept incoming connections if the user is using a # We typically don't want to accept incoming connections if the user is using a
# SOCKS proxy, unless they have configured otherwise. If they eventually select # SOCKS proxy, unless they have configured otherwise. If they eventually select
# proxy 'none' or configure SOCKS listening then this will start listening for # proxy 'none' or configure SOCKS listening then this will start listening for
# connections. # connections.
while shared.config.get('bitmessagesettings', 'socksproxytype')[0:5] == 'SOCKS' and not shared.config.getboolean('bitmessagesettings', 'sockslisten'): while shared.config.get('bitmessagesettings', 'socksproxytype')[0:5] == 'SOCKS' and not shared.config.getboolean('bitmessagesettings', 'sockslisten') and shared.shutdown == 0:
time.sleep(5) self.stop.wait(5)
logger.info('Listening for incoming connections.') logger.info('Listening for incoming connections.')
@ -73,19 +85,19 @@ class singleListener(threading.Thread):
# regexp to match an IPv4-mapped IPv6 address # regexp to match an IPv4-mapped IPv6 address
mappedAddressRegexp = re.compile(r'^::ffff:([0-9]+\.[0-9]+\.[0-9]+\.[0-9]+)$') mappedAddressRegexp = re.compile(r'^::ffff:([0-9]+\.[0-9]+\.[0-9]+\.[0-9]+)$')
while True: while shared.shutdown == 0:
# We typically don't want to accept incoming connections if the user is using a # We typically don't want to accept incoming connections if the user is using a
# SOCKS proxy, unless they have configured otherwise. If they eventually select # SOCKS proxy, unless they have configured otherwise. If they eventually select
# proxy 'none' or configure SOCKS listening then this will start listening for # proxy 'none' or configure SOCKS listening then this will start listening for
# connections. # connections.
while shared.config.get('bitmessagesettings', 'socksproxytype')[0:5] == 'SOCKS' and not shared.config.getboolean('bitmessagesettings', 'sockslisten'): while shared.config.get('bitmessagesettings', 'socksproxytype')[0:5] == 'SOCKS' and not shared.config.getboolean('bitmessagesettings', 'sockslisten') and shared.shutdown == 0:
time.sleep(10) self.stop.wait(10)
while len(shared.connectedHostsList) > 220: while len(shared.connectedHostsList) > 220 and shared.shutdown == 0:
logger.info('We are connected to too many people. Not accepting further incoming connections for ten seconds.') logger.info('We are connected to too many people. Not accepting further incoming connections for ten seconds.')
time.sleep(10) self.stop.wait(10)
while True: while shared.shutdown == 0:
socketObject, sockaddr = sock.accept() socketObject, sockaddr = sock.accept()
(HOST, PORT) = sockaddr[0:2] (HOST, PORT) = sockaddr[0:2]

View File

@ -15,17 +15,26 @@ from debug import logger
from helper_sql import * from helper_sql import *
import helper_inbox import helper_inbox
from helper_generic import addDataPadding from helper_generic import addDataPadding
from helper_threading import *
import l10n import l10n
# This thread, of which there is only one, does the heavy lifting: # This thread, of which there is only one, does the heavy lifting:
# calculating POWs. # calculating POWs.
class singleWorker(threading.Thread): class singleWorker(threading.Thread, StoppableThread):
def __init__(self): def __init__(self):
# QThread.__init__(self, parent) # QThread.__init__(self, parent)
threading.Thread.__init__(self, name="singleWorker") threading.Thread.__init__(self, name="singleWorker")
self.initStop()
def stopThread(self):
try:
shared.workerQueue.put(("stopThread", "data"))
except:
pass
super(singleWorker, self).stopThread()
def run(self): def run(self):
@ -52,7 +61,7 @@ class singleWorker(threading.Thread):
logger.info('Watching for ackdata ' + ackdata.encode('hex')) logger.info('Watching for ackdata ' + ackdata.encode('hex'))
shared.ackdataForWhichImWatching[ackdata] = 0 shared.ackdataForWhichImWatching[ackdata] = 0
time.sleep( self.stop.wait(
10) # give some time for the GUI to start before we start on existing POW tasks. 10) # give some time for the GUI to start before we start on existing POW tasks.
queryreturn = sqlQuery( queryreturn = sqlQuery(
@ -68,7 +77,7 @@ class singleWorker(threading.Thread):
# just in case there are any tasks for Broadcasts # just in case there are any tasks for Broadcasts
# that have yet to be sent. # that have yet to be sent.
while True: while shared.shutdown == 0:
command, data = shared.workerQueue.get() command, data = shared.workerQueue.get()
if command == 'sendmessage': if command == 'sendmessage':
self.sendMsg() self.sendMsg()
@ -80,6 +89,8 @@ class singleWorker(threading.Thread):
self.sendOutOrStoreMyV3Pubkey(data) self.sendOutOrStoreMyV3Pubkey(data)
elif command == 'sendOutOrStoreMyV4Pubkey': elif command == 'sendOutOrStoreMyV4Pubkey':
self.sendOutOrStoreMyV4Pubkey(data) self.sendOutOrStoreMyV4Pubkey(data)
elif command == 'stopThread':
return
else: else:
logger.error('Probable programming error: The command sent to the workerThread is weird. It is: %s\n' % command) logger.error('Probable programming error: The command sent to the workerThread is weird. It is: %s\n' % command)

View File

@ -414,12 +414,10 @@ def doCleanShutdown():
from class_outgoingSynSender import outgoingSynSender from class_outgoingSynSender import outgoingSynSender
for thread in threading.enumerate(): for thread in threading.enumerate():
if thread.name == "uPnPThread" or "outgoingSynSender" in thread.name: if thread.isAlive() and isinstance(thread, StoppableThread):
if thread.isAlive() and isinstance(thread, StoppableThread): thread.stopThread()
thread.stopThread()
for thread in threading.enumerate(): for thread in threading.enumerate():
if thread.name == "uPnPThread": if thread is not threading.currentThread() and isinstance(thread, StoppableThread) and not isinstance(thread, outgoingSynSender):
#or "outgoingSynSender" in thread.name:
logger.debug("Waiting for thread %s", thread.name) logger.debug("Waiting for thread %s", thread.name)
thread.join() thread.join()