From 24d8ffcd548e25ac0f7ab3baa874bed11ace65c7 Mon Sep 17 00:00:00 2001 From: mailchuck Date: Sat, 21 Nov 2015 11:59:44 +0100 Subject: [PATCH] Make UPnP into a thread - UPnP is now a separate thread that will continue to setup UPnP - shutdown waits for threads that shutdown correctly (Addresses Bitmessage#549) --- src/bitmessagemain.py | 9 +-- src/shared.py | 10 +-- src/upnp.py | 149 ++++++++++++++++++++++++------------------ 3 files changed, 98 insertions(+), 70 deletions(-) diff --git a/src/bitmessagemain.py b/src/bitmessagemain.py index 6078cabd..d29d0928 100755 --- a/src/bitmessagemain.py +++ b/src/bitmessagemain.py @@ -149,10 +149,6 @@ class Main: # is the application already running? If yes then exit. thisapp = singleton.singleinstance("", daemon) - if shared.safeConfigGetBoolean('bitmessagesettings','upnp'): - import upnp - upnp.createPortMapping() - # get curses flag curses = False if '-c' in sys.argv: @@ -211,6 +207,11 @@ class Main: singleListenerThread.setup(selfInitiatedConnections) singleListenerThread.daemon = True # close the main program even if there are threads left singleListenerThread.start() + + if shared.safeConfigGetBoolean('bitmessagesettings','upnp'): + import upnp + upnpThread = upnp.uPnPThread() + upnpThread.start() if daemon == False and shared.safeConfigGetBoolean('bitmessagesettings', 'daemon') == False: if curses == False: diff --git a/src/shared.py b/src/shared.py index 47f0ac14..364d7f9c 100644 --- a/src/shared.py +++ b/src/shared.py @@ -386,9 +386,6 @@ def doCleanShutdown(): 'Flushing inventory in memory out to disk. This should normally only take a second...')) flushInventory() - if safeConfigGetBoolean('bitmessagesettings','upnp'): - import upnp - upnp.deletePortMapping() # Verify that the objectProcessor has finished exiting. It should have incremented the # shutdown variable from 1 to 2. This must finish before we command the sqlThread to exit. while shutdown == 1: @@ -402,7 +399,12 @@ def doCleanShutdown(): # Wait long enough to guarantee that any running proof of work worker threads will check the # shutdown variable and exit. If the main thread closes before they do then they won't stop. - time.sleep(.25) + time.sleep(.25) + + for thread in threading.enumerate(): + if thread.name == "uPnPThread" or "outgoingSynSender" in thread.name: + logger.debug("Waiting for thread %s", thread.name) + thread.join() if safeConfigGetBoolean('bitmessagesettings','daemon'): logger.info('Clean shutdown complete.') diff --git a/src/upnp.py b/src/upnp.py index b4648e02..00d27b68 100644 --- a/src/upnp.py +++ b/src/upnp.py @@ -4,42 +4,10 @@ import httplib from random import randint import socket from struct import unpack, pack +import threading +import time import shared -routers = [] - -def searchRouter(): - from debug import logger - SSDP_ADDR = "239.255.255.250" - SSDP_PORT = 1900 - SSDP_MX = 2 - SSDP_ST = "urn:schemas-upnp-org:device:InternetGatewayDevice:1" - ssdpRequest = "M-SEARCH * HTTP/1.1\r\n" + \ - "HOST: %s:%d\r\n" % (SSDP_ADDR, SSDP_PORT) + \ - "MAN: \"ssdp:discover\"\r\n" + \ - "MX: %d\r\n" % (SSDP_MX, ) + \ - "ST: %s\r\n" % (SSDP_ST, ) + "\r\n" - routers = [] - - try: - sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 2) - sock.settimeout(2) - logger.debug("Sending UPnP query") - sock.sendto(ssdpRequest, (SSDP_ADDR, SSDP_PORT)) - except: - logger.exception("UPnP sock failed") - try: - while True: - resp,(ip,port) = sock.recvfrom(1000) - if resp is None: - continue - routers.append(Router(resp, ip)) - except: - logger.error("Failure running UPnP router search.", exc_info=True) - - return routers - def createRequestXML(service, action, arguments=[]): from xml.dom.minidom import Document @@ -97,6 +65,8 @@ class Router: path = "" address = None routerPath = None + extPort = None + def __init__(self, ssdpResponse, address): import urllib2 from xml.dom.minidom import parseString @@ -148,6 +118,7 @@ class Router: pass def AddPortMapping(self, externalPort, internalPort, internalClient, protocol, description, leaseDuration = 0, enabled = 1): + from debug import logger resp = self.soapRequest('WANIPConnection:1', 'AddPortMapping', [ ('NewExternalPort', str(externalPort)), ('NewProtocol', protocol), @@ -158,6 +129,7 @@ class Router: ('NewLeaseDuration', str(leaseDuration)) ]) self.extPort = externalPort + logger.info("Successfully established UPnP mapping for %s:%i on external port %i", internalClient, internalPort, externalPort) return resp def DeletePortMapping(self, externalPort, protocol): @@ -196,37 +168,90 @@ class Router: raise UPnPError(errinfo[0].childNodes[0].data) return resp -def createPortMappingInternal(router): - from debug import logger +class uPnPThread(threading.Thread): + def __init__ (self): + threading.Thread.__init__(self, name="uPnPThread") + self.localPort = shared.config.getint('bitmessagesettings', 'port') + self.extPort = None + self.routers = [] + self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + self.sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 2) + self.sock.settimeout(10) + self.sendSleep = 60 + + def run(self): + from debug import logger + + logger.debug("Starting UPnP thread") + lastSent = 0 + while shared.shutdown == 0: + if time.time() - lastSent > self.sendSleep and len(self.routers) == 0: + self.sendSearchRouter() + lastSent = time.time() + try: + while shared.shutdown == 0: + resp,(ip,port) = self.sock.recvfrom(1000) + if resp is None: + continue + newRouter = Router(resp, ip) + for router in self.routers: + if router.location == newRouter.location: + break + else: + logger.debug("Found UPnP router at %s", ip) + self.routers.append(newRouter) + self.createPortMapping(newRouter) + break + except socket.timeout as e: + pass + except: + logger.error("Failure running UPnP router search.", exc_info=True) + for router in self.routers: + if router.extPort is None: + self.createPortMapping(router) + for router in self.routers: + if router.extPort is not None: + self.deletePortMapping(router) + logger.debug("UPnP thread done") + + def sendSearchRouter(self): + from debug import logger + SSDP_ADDR = "239.255.255.250" + SSDP_PORT = 1900 + SSDP_MX = 2 + SSDP_ST = "urn:schemas-upnp-org:device:InternetGatewayDevice:1" + ssdpRequest = "M-SEARCH * HTTP/1.1\r\n" + \ + "HOST: %s:%d\r\n" % (SSDP_ADDR, SSDP_PORT) + \ + "MAN: \"ssdp:discover\"\r\n" + \ + "MX: %d\r\n" % (SSDP_MX, ) + \ + "ST: %s\r\n" % (SSDP_ST, ) + "\r\n" - for i in range(0, 50): try: - routerIP, = unpack('>I', socket.inet_aton(router.address)) - localIP = router.localAddress - localPort = shared.config.getint('bitmessagesettings', 'port') - if i == 0: - extPort = localPort # try same port first - else: - extPort = randint(32767, 65535) - logger.debug("Requesting UPnP mapping for %s:%i on external port %i", localIP, localPort, extPort) - router.AddPortMapping(extPort, localPort, localIP, 'TCP', 'BitMessage') - logger.info("Successfully established UPnP mapping for %s:%i on external port %i", localIP, localPort, extPort) - shared.extPort = extPort - break - except UPnPError: - logger.debug("UPnP error: ", exc_info=True) + logger.debug("Sending UPnP query") + self.sock.sendto(ssdpRequest, (SSDP_ADDR, SSDP_PORT)) + except: + logger.exception("UPnP send query failed") -def createPortMapping(): - from debug import logger - global routers + def createPortMapping(self, router): + from debug import logger + + for i in range(50): + try: + routerIP, = unpack('>I', socket.inet_aton(router.address)) + localIP = router.localAddress + if i == 0: + extPort = self.localPort # try same port first + else: + extPort = randint(32767, 65535) + logger.debug("Requesting UPnP mapping for %s:%i on external port %i", localIP, self.localPort, extPort) + router.AddPortMapping(extPort, self.localPort, localIP, 'TCP', 'BitMessage') + shared.extPort = extPort + break + except UPnPError: + logger.debug("UPnP error: ", exc_info=True) + + def deletePortMapping(self, router): + router.DeletePortMapping(router.extPort, 'TCP') - routers = searchRouter() - logger.debug("Found %i UPnP routers", len(routers)) - for router in routers: - createPortMappingInternal(router) -def deletePortMapping(): - for router in routers: - if hasattr(router, "extPort"): - router.DeletePortMapping(router.extPort, 'TCP')