From 9065910a38341c6e006a3c3046e9699c8593ca26 Mon Sep 17 00:00:00 2001 From: mailchuck Date: Sun, 22 Nov 2015 16:18:59 +0100 Subject: [PATCH] Threads close better - UPnP and outgoingSynSender threads close slightly better. - extPort initialisation was missing --- src/class_outgoingSynSender.py | 46 +++++++++++++++++++++------------- src/helper_threading.py | 10 ++++++++ src/shared.py | 11 +++++++- src/upnp.py | 5 +++- 4 files changed, 53 insertions(+), 19 deletions(-) create mode 100644 src/helper_threading.py diff --git a/src/class_outgoingSynSender.py b/src/class_outgoingSynSender.py index 732f6c9f..d011ae94 100644 --- a/src/class_outgoingSynSender.py +++ b/src/class_outgoingSynSender.py @@ -2,6 +2,7 @@ import threading import time import random import shared +import select import socks import socket import sys @@ -9,14 +10,16 @@ import tr from class_sendDataThread import * from class_receiveDataThread import * +from helper_threading import * # For each stream to which we connect, several outgoingSynSender threads # will exist and will collectively create 8 connections with peers. -class outgoingSynSender(threading.Thread): +class outgoingSynSender(threading.Thread, StoppableThread): def __init__(self): threading.Thread.__init__(self, name="outgoingSynSender") + self.initStop() def setup(self, streamNumber, selfInitiatedConnections): self.streamNumber = streamNumber @@ -35,15 +38,22 @@ class outgoingSynSender(threading.Thread): shared.knownNodesLock.release() return peer + + def stopThread(self): + super(outgoingSynSender, self).stopThread() + try: + self.sock.shutdown(socket.SHUT_RDWR) + except: + pass def run(self): - while shared.safeConfigGetBoolean('bitmessagesettings', 'dontconnect'): - time.sleep(2) - while shared.safeConfigGetBoolean('bitmessagesettings', 'sendoutgoingconnections'): + while shared.safeConfigGetBoolean('bitmessagesettings', 'dontconnect') and not self._stopped: + self.stop.wait(2) + while shared.safeConfigGetBoolean('bitmessagesettings', 'sendoutgoingconnections') and not self._stopped: self.name = "outgoingSynSender" maximumConnections = 1 if shared.trustedPeer else 8 # maximum number of outgoing connections = 8 while len(self.selfInitiatedConnections[self.streamNumber]) >= maximumConnections: - time.sleep(10) + self.stop.wait(10) if shared.shutdown: break random.seed() @@ -54,7 +64,9 @@ class outgoingSynSender(threading.Thread): # print 'choosing new sample' random.seed() peer = self._getPeer() - time.sleep(1) + self.stop.wait(1) + if shared.shutdown: + break # Clear out the shared.alreadyAttemptedConnectionsList every half # hour so that this program will again attempt a connection # to any nodes, even ones it has already tried. @@ -71,7 +83,7 @@ class outgoingSynSender(threading.Thread): else: address_family = socket.AF_INET6 try: - sock = socks.socksocket(address_family, socket.SOCK_STREAM) + self.sock = socks.socksocket(address_family, socket.SOCK_STREAM) except: """ The line can fail on Windows systems which aren't @@ -92,8 +104,8 @@ class outgoingSynSender(threading.Thread): continue # This option apparently avoids the TIME_WAIT state so that we # can rebind faster - sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - sock.settimeout(20) + self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + self.sock.settimeout(20) if shared.config.get('bitmessagesettings', 'socksproxytype') == 'none' and shared.verbose >= 2: logger.debug('Trying an outgoing connection to ' + str(peer)) @@ -113,10 +125,10 @@ class outgoingSynSender(threading.Thread): 'bitmessagesettings', 'socksusername') sockspassword = shared.config.get( 'bitmessagesettings', 'sockspassword') - sock.setproxy( + self.sock.setproxy( proxytype, sockshostname, socksport, rdns, socksusername, sockspassword) else: - sock.setproxy( + self.sock.setproxy( proxytype, sockshostname, socksport, rdns) elif shared.config.get('bitmessagesettings', 'socksproxytype') == 'SOCKS5': if shared.verbose >= 2: @@ -133,19 +145,19 @@ class outgoingSynSender(threading.Thread): 'bitmessagesettings', 'socksusername') sockspassword = shared.config.get( 'bitmessagesettings', 'sockspassword') - sock.setproxy( + self.sock.setproxy( proxytype, sockshostname, socksport, rdns, socksusername, sockspassword) else: - sock.setproxy( + self.sock.setproxy( proxytype, sockshostname, socksport, rdns) try: - sock.connect((peer.host, peer.port)) + self.sock.connect((peer.host, peer.port)) rd = receiveDataThread() rd.daemon = True # close the main program even if there are threads left someObjectsOfWhichThisRemoteNodeIsAlreadyAware = {} # This is not necessairly a complete list; we clear it from time to time to save memory. sendDataThreadQueue = Queue.Queue() # Used to submit information to the send data thread for this connection. - rd.setup(sock, + rd.setup(self.sock, peer.host, peer.port, self.streamNumber, @@ -157,7 +169,7 @@ class outgoingSynSender(threading.Thread): sd = sendDataThread(sendDataThreadQueue) - sd.setup(sock, peer.host, peer.port, self.streamNumber, + sd.setup(self.sock, peer.host, peer.port, self.streamNumber, someObjectsOfWhichThisRemoteNodeIsAlreadyAware) sd.start() sd.sendVersionMessage() @@ -221,4 +233,4 @@ class outgoingSynSender(threading.Thread): except Exception as err: import traceback logger.exception('An exception has occurred in the outgoingSynSender thread that was not caught by other exception types:') - time.sleep(0.1) + self.stop.wait(0.1) diff --git a/src/helper_threading.py b/src/helper_threading.py new file mode 100644 index 00000000..599d297d --- /dev/null +++ b/src/helper_threading.py @@ -0,0 +1,10 @@ +import threading + +class StoppableThread(object): + def initStop(self): + self.stop = threading.Event() + self._stopped = False + + def stopThread(self): + self._stopped = True + self.stop.set() \ No newline at end of file diff --git a/src/shared.py b/src/shared.py index 364d7f9c..75d7eb6c 100644 --- a/src/shared.py +++ b/src/shared.py @@ -32,6 +32,7 @@ import highlevelcrypto import shared #import helper_startup from helper_sql import * +from helper_threading import * config = ConfigParser.SafeConfigParser() @@ -116,6 +117,9 @@ frozen = getattr(sys,'frozen', None) # security. trustedPeer = None +# For UPnP +extPort = None + #Compiled struct for packing/unpacking headers #New code should use CreatePacket instead of Header.pack Header = Struct('!L12sL4s') @@ -160,7 +164,7 @@ def assembleVersionMessage(remoteHost, remotePort, myStreamNumber): payload += pack('>q', 1) # bitflags of the services I offer. payload += '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF' + pack( '>L', 2130706433) # = 127.0.0.1. This will be ignored by the remote host. The actual remote connected IP will be used. - if safeConfigGetBoolean('bitmessagesettings', 'upnp'): + if safeConfigGetBoolean('bitmessagesettings', 'upnp' and extPort): payload += pack('>H', extPort) else: payload += pack('>H', shared.config.getint('bitmessagesettings', 'port')) @@ -401,6 +405,11 @@ def doCleanShutdown(): # shutdown variable and exit. If the main thread closes before they do then they won't stop. time.sleep(.25) + from class_outgoingSynSender import outgoingSynSender + for thread in threading.enumerate(): + if thread.name == "uPnPThread" or "outgoingSynSender" in thread.name: + if thread.isAlive() and isinstance(thread, StoppableThread): + thread.stopThread() for thread in threading.enumerate(): if thread.name == "uPnPThread" or "outgoingSynSender" in thread.name: logger.debug("Waiting for thread %s", thread.name) diff --git a/src/upnp.py b/src/upnp.py index 3d046973..6aef00a6 100644 --- a/src/upnp.py +++ b/src/upnp.py @@ -6,6 +6,7 @@ import socket from struct import unpack, pack import threading import time +from helper_threading import * import shared def createRequestXML(service, action, arguments=[]): @@ -168,7 +169,7 @@ class Router: raise UPnPError(errinfo[0].childNodes[0].data) return resp -class uPnPThread(threading.Thread): +class uPnPThread(threading.Thread, StoppableThread): def __init__ (self): threading.Thread.__init__(self, name="uPnPThread") self.localPort = shared.config.getint('bitmessagesettings', 'port') @@ -178,6 +179,7 @@ class uPnPThread(threading.Thread): self.sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 2) self.sock.settimeout(2) self.sendSleep = 60 + self.initStop() def run(self): from debug import logger @@ -213,6 +215,7 @@ class uPnPThread(threading.Thread): for router in self.routers: if router.extPort is not None: self.deletePortMapping(router) + shared.extPort = None logger.debug("UPnP thread done") def sendSearchRouter(self):