Make UPnP into a thread

- UPnP is now a separate thread that will continue to setup UPnP
- shutdown waits for threads that shutdown correctly (Addresses
Bitmessage#549)
This commit is contained in:
mailchuck 2015-11-21 11:59:44 +01:00
parent 8075e375b5
commit 24d8ffcd54
Signed by untrusted user who does not match committer: PeterSurda
GPG Key ID: 0C5F50C0B5F37D87
3 changed files with 98 additions and 70 deletions

View File

@ -149,10 +149,6 @@ class Main:
# is the application already running? If yes then exit. # is the application already running? If yes then exit.
thisapp = singleton.singleinstance("", daemon) thisapp = singleton.singleinstance("", daemon)
if shared.safeConfigGetBoolean('bitmessagesettings','upnp'):
import upnp
upnp.createPortMapping()
# get curses flag # get curses flag
curses = False curses = False
if '-c' in sys.argv: if '-c' in sys.argv:
@ -211,6 +207,11 @@ class Main:
singleListenerThread.setup(selfInitiatedConnections) singleListenerThread.setup(selfInitiatedConnections)
singleListenerThread.daemon = True # close the main program even if there are threads left singleListenerThread.daemon = True # close the main program even if there are threads left
singleListenerThread.start() singleListenerThread.start()
if shared.safeConfigGetBoolean('bitmessagesettings','upnp'):
import upnp
upnpThread = upnp.uPnPThread()
upnpThread.start()
if daemon == False and shared.safeConfigGetBoolean('bitmessagesettings', 'daemon') == False: if daemon == False and shared.safeConfigGetBoolean('bitmessagesettings', 'daemon') == False:
if curses == False: if curses == False:

View File

@ -386,9 +386,6 @@ def doCleanShutdown():
'Flushing inventory in memory out to disk. This should normally only take a second...')) 'Flushing inventory in memory out to disk. This should normally only take a second...'))
flushInventory() flushInventory()
if safeConfigGetBoolean('bitmessagesettings','upnp'):
import upnp
upnp.deletePortMapping()
# Verify that the objectProcessor has finished exiting. It should have incremented the # Verify that the objectProcessor has finished exiting. It should have incremented the
# shutdown variable from 1 to 2. This must finish before we command the sqlThread to exit. # shutdown variable from 1 to 2. This must finish before we command the sqlThread to exit.
while shutdown == 1: while shutdown == 1:
@ -402,7 +399,12 @@ def doCleanShutdown():
# Wait long enough to guarantee that any running proof of work worker threads will check the # Wait long enough to guarantee that any running proof of work worker threads will check the
# shutdown variable and exit. If the main thread closes before they do then they won't stop. # shutdown variable and exit. If the main thread closes before they do then they won't stop.
time.sleep(.25) time.sleep(.25)
for thread in threading.enumerate():
if thread.name == "uPnPThread" or "outgoingSynSender" in thread.name:
logger.debug("Waiting for thread %s", thread.name)
thread.join()
if safeConfigGetBoolean('bitmessagesettings','daemon'): if safeConfigGetBoolean('bitmessagesettings','daemon'):
logger.info('Clean shutdown complete.') logger.info('Clean shutdown complete.')

View File

@ -4,42 +4,10 @@ import httplib
from random import randint from random import randint
import socket import socket
from struct import unpack, pack from struct import unpack, pack
import threading
import time
import shared import shared
routers = []
def searchRouter():
from debug import logger
SSDP_ADDR = "239.255.255.250"
SSDP_PORT = 1900
SSDP_MX = 2
SSDP_ST = "urn:schemas-upnp-org:device:InternetGatewayDevice:1"
ssdpRequest = "M-SEARCH * HTTP/1.1\r\n" + \
"HOST: %s:%d\r\n" % (SSDP_ADDR, SSDP_PORT) + \
"MAN: \"ssdp:discover\"\r\n" + \
"MX: %d\r\n" % (SSDP_MX, ) + \
"ST: %s\r\n" % (SSDP_ST, ) + "\r\n"
routers = []
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 2)
sock.settimeout(2)
logger.debug("Sending UPnP query")
sock.sendto(ssdpRequest, (SSDP_ADDR, SSDP_PORT))
except:
logger.exception("UPnP sock failed")
try:
while True:
resp,(ip,port) = sock.recvfrom(1000)
if resp is None:
continue
routers.append(Router(resp, ip))
except:
logger.error("Failure running UPnP router search.", exc_info=True)
return routers
def createRequestXML(service, action, arguments=[]): def createRequestXML(service, action, arguments=[]):
from xml.dom.minidom import Document from xml.dom.minidom import Document
@ -97,6 +65,8 @@ class Router:
path = "" path = ""
address = None address = None
routerPath = None routerPath = None
extPort = None
def __init__(self, ssdpResponse, address): def __init__(self, ssdpResponse, address):
import urllib2 import urllib2
from xml.dom.minidom import parseString from xml.dom.minidom import parseString
@ -148,6 +118,7 @@ class Router:
pass pass
def AddPortMapping(self, externalPort, internalPort, internalClient, protocol, description, leaseDuration = 0, enabled = 1): def AddPortMapping(self, externalPort, internalPort, internalClient, protocol, description, leaseDuration = 0, enabled = 1):
from debug import logger
resp = self.soapRequest('WANIPConnection:1', 'AddPortMapping', [ resp = self.soapRequest('WANIPConnection:1', 'AddPortMapping', [
('NewExternalPort', str(externalPort)), ('NewExternalPort', str(externalPort)),
('NewProtocol', protocol), ('NewProtocol', protocol),
@ -158,6 +129,7 @@ class Router:
('NewLeaseDuration', str(leaseDuration)) ('NewLeaseDuration', str(leaseDuration))
]) ])
self.extPort = externalPort self.extPort = externalPort
logger.info("Successfully established UPnP mapping for %s:%i on external port %i", internalClient, internalPort, externalPort)
return resp return resp
def DeletePortMapping(self, externalPort, protocol): def DeletePortMapping(self, externalPort, protocol):
@ -196,37 +168,90 @@ class Router:
raise UPnPError(errinfo[0].childNodes[0].data) raise UPnPError(errinfo[0].childNodes[0].data)
return resp return resp
def createPortMappingInternal(router): class uPnPThread(threading.Thread):
from debug import logger def __init__ (self):
threading.Thread.__init__(self, name="uPnPThread")
self.localPort = shared.config.getint('bitmessagesettings', 'port')
self.extPort = None
self.routers = []
self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 2)
self.sock.settimeout(10)
self.sendSleep = 60
def run(self):
from debug import logger
logger.debug("Starting UPnP thread")
lastSent = 0
while shared.shutdown == 0:
if time.time() - lastSent > self.sendSleep and len(self.routers) == 0:
self.sendSearchRouter()
lastSent = time.time()
try:
while shared.shutdown == 0:
resp,(ip,port) = self.sock.recvfrom(1000)
if resp is None:
continue
newRouter = Router(resp, ip)
for router in self.routers:
if router.location == newRouter.location:
break
else:
logger.debug("Found UPnP router at %s", ip)
self.routers.append(newRouter)
self.createPortMapping(newRouter)
break
except socket.timeout as e:
pass
except:
logger.error("Failure running UPnP router search.", exc_info=True)
for router in self.routers:
if router.extPort is None:
self.createPortMapping(router)
for router in self.routers:
if router.extPort is not None:
self.deletePortMapping(router)
logger.debug("UPnP thread done")
def sendSearchRouter(self):
from debug import logger
SSDP_ADDR = "239.255.255.250"
SSDP_PORT = 1900
SSDP_MX = 2
SSDP_ST = "urn:schemas-upnp-org:device:InternetGatewayDevice:1"
ssdpRequest = "M-SEARCH * HTTP/1.1\r\n" + \
"HOST: %s:%d\r\n" % (SSDP_ADDR, SSDP_PORT) + \
"MAN: \"ssdp:discover\"\r\n" + \
"MX: %d\r\n" % (SSDP_MX, ) + \
"ST: %s\r\n" % (SSDP_ST, ) + "\r\n"
for i in range(0, 50):
try: try:
routerIP, = unpack('>I', socket.inet_aton(router.address)) logger.debug("Sending UPnP query")
localIP = router.localAddress self.sock.sendto(ssdpRequest, (SSDP_ADDR, SSDP_PORT))
localPort = shared.config.getint('bitmessagesettings', 'port') except:
if i == 0: logger.exception("UPnP send query failed")
extPort = localPort # try same port first
else:
extPort = randint(32767, 65535)
logger.debug("Requesting UPnP mapping for %s:%i on external port %i", localIP, localPort, extPort)
router.AddPortMapping(extPort, localPort, localIP, 'TCP', 'BitMessage')
logger.info("Successfully established UPnP mapping for %s:%i on external port %i", localIP, localPort, extPort)
shared.extPort = extPort
break
except UPnPError:
logger.debug("UPnP error: ", exc_info=True)
def createPortMapping(): def createPortMapping(self, router):
from debug import logger from debug import logger
global routers
for i in range(50):
try:
routerIP, = unpack('>I', socket.inet_aton(router.address))
localIP = router.localAddress
if i == 0:
extPort = self.localPort # try same port first
else:
extPort = randint(32767, 65535)
logger.debug("Requesting UPnP mapping for %s:%i on external port %i", localIP, self.localPort, extPort)
router.AddPortMapping(extPort, self.localPort, localIP, 'TCP', 'BitMessage')
shared.extPort = extPort
break
except UPnPError:
logger.debug("UPnP error: ", exc_info=True)
def deletePortMapping(self, router):
router.DeletePortMapping(router.extPort, 'TCP')
routers = searchRouter()
logger.debug("Found %i UPnP routers", len(routers))
for router in routers:
createPortMappingInternal(router)
def deletePortMapping():
for router in routers:
if hasattr(router, "extPort"):
router.DeletePortMapping(router.extPort, 'TCP')