Threads close better

- UPnP and outgoingSynSender threads close slightly better.
- extPort initialisation was missing
This commit is contained in:
mailchuck 2015-11-22 16:18:59 +01:00 committed by Peter Surda
parent 48b9e50397
commit b00c4f24ec
4 changed files with 56 additions and 20 deletions

View File

@ -2,6 +2,7 @@ import threading
import time import time
import random import random
import shared import shared
import select
import socks import socks
import socket import socket
import sys import sys
@ -9,14 +10,16 @@ import tr
from class_sendDataThread import * from class_sendDataThread import *
from class_receiveDataThread import * from class_receiveDataThread import *
from helper_threading import *
# For each stream to which we connect, several outgoingSynSender threads # For each stream to which we connect, several outgoingSynSender threads
# will exist and will collectively create 8 connections with peers. # will exist and will collectively create 8 connections with peers.
class outgoingSynSender(threading.Thread): class outgoingSynSender(threading.Thread, StoppableThread):
def __init__(self): def __init__(self):
threading.Thread.__init__(self, name="outgoingSynSender") threading.Thread.__init__(self, name="outgoingSynSender")
self.initStop()
def setup(self, streamNumber, selfInitiatedConnections): def setup(self, streamNumber, selfInitiatedConnections):
self.streamNumber = streamNumber self.streamNumber = streamNumber
@ -35,15 +38,22 @@ class outgoingSynSender(threading.Thread):
shared.knownNodesLock.release() shared.knownNodesLock.release()
return peer return peer
def stopThread(self):
super(outgoingSynSender, self).stopThread()
try:
self.sock.shutdown(socket.SHUT_RDWR)
except:
pass
def run(self): def run(self):
while shared.safeConfigGetBoolean('bitmessagesettings', 'dontconnect'): while shared.safeConfigGetBoolean('bitmessagesettings', 'dontconnect') and not self._stopped:
time.sleep(2) self.stop.wait(2)
while shared.safeConfigGetBoolean('bitmessagesettings', 'sendoutgoingconnections'): while shared.safeConfigGetBoolean('bitmessagesettings', 'sendoutgoingconnections') and not self._stopped:
self.name = "outgoingSynSender" self.name = "outgoingSynSender"
maximumConnections = 1 if shared.trustedPeer else 8 # maximum number of outgoing connections = 8 maximumConnections = 1 if shared.trustedPeer else 8 # maximum number of outgoing connections = 8
while len(self.selfInitiatedConnections[self.streamNumber]) >= maximumConnections: while len(self.selfInitiatedConnections[self.streamNumber]) >= maximumConnections:
time.sleep(10) self.stop.wait(10)
if shared.shutdown: if shared.shutdown:
break break
random.seed() random.seed()
@ -54,7 +64,9 @@ class outgoingSynSender(threading.Thread):
# print 'choosing new sample' # print 'choosing new sample'
random.seed() random.seed()
peer = self._getPeer() peer = self._getPeer()
time.sleep(1) self.stop.wait(1)
if shared.shutdown:
break
# Clear out the shared.alreadyAttemptedConnectionsList every half # Clear out the shared.alreadyAttemptedConnectionsList every half
# hour so that this program will again attempt a connection # hour so that this program will again attempt a connection
# to any nodes, even ones it has already tried. # to any nodes, even ones it has already tried.
@ -71,7 +83,7 @@ class outgoingSynSender(threading.Thread):
else: else:
address_family = socket.AF_INET6 address_family = socket.AF_INET6
try: try:
sock = socks.socksocket(address_family, socket.SOCK_STREAM) self.sock = socks.socksocket(address_family, socket.SOCK_STREAM)
except: except:
""" """
The line can fail on Windows systems which aren't The line can fail on Windows systems which aren't
@ -92,8 +104,8 @@ class outgoingSynSender(threading.Thread):
continue continue
# This option apparently avoids the TIME_WAIT state so that we # This option apparently avoids the TIME_WAIT state so that we
# can rebind faster # can rebind faster
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.settimeout(20) self.sock.settimeout(20)
if shared.config.get('bitmessagesettings', 'socksproxytype') == 'none' and shared.verbose >= 2: if shared.config.get('bitmessagesettings', 'socksproxytype') == 'none' and shared.verbose >= 2:
logger.debug('Trying an outgoing connection to ' + str(peer)) logger.debug('Trying an outgoing connection to ' + str(peer))
@ -113,10 +125,10 @@ class outgoingSynSender(threading.Thread):
'bitmessagesettings', 'socksusername') 'bitmessagesettings', 'socksusername')
sockspassword = shared.config.get( sockspassword = shared.config.get(
'bitmessagesettings', 'sockspassword') 'bitmessagesettings', 'sockspassword')
sock.setproxy( self.sock.setproxy(
proxytype, sockshostname, socksport, rdns, socksusername, sockspassword) proxytype, sockshostname, socksport, rdns, socksusername, sockspassword)
else: else:
sock.setproxy( self.sock.setproxy(
proxytype, sockshostname, socksport, rdns) proxytype, sockshostname, socksport, rdns)
elif shared.config.get('bitmessagesettings', 'socksproxytype') == 'SOCKS5': elif shared.config.get('bitmessagesettings', 'socksproxytype') == 'SOCKS5':
if shared.verbose >= 2: if shared.verbose >= 2:
@ -133,19 +145,19 @@ class outgoingSynSender(threading.Thread):
'bitmessagesettings', 'socksusername') 'bitmessagesettings', 'socksusername')
sockspassword = shared.config.get( sockspassword = shared.config.get(
'bitmessagesettings', 'sockspassword') 'bitmessagesettings', 'sockspassword')
sock.setproxy( self.sock.setproxy(
proxytype, sockshostname, socksport, rdns, socksusername, sockspassword) proxytype, sockshostname, socksport, rdns, socksusername, sockspassword)
else: else:
sock.setproxy( self.sock.setproxy(
proxytype, sockshostname, socksport, rdns) proxytype, sockshostname, socksport, rdns)
try: try:
sock.connect((peer.host, peer.port)) self.sock.connect((peer.host, peer.port))
rd = receiveDataThread() rd = receiveDataThread()
rd.daemon = True # close the main program even if there are threads left rd.daemon = True # close the main program even if there are threads left
someObjectsOfWhichThisRemoteNodeIsAlreadyAware = {} # This is not necessairly a complete list; we clear it from time to time to save memory. someObjectsOfWhichThisRemoteNodeIsAlreadyAware = {} # This is not necessairly a complete list; we clear it from time to time to save memory.
sendDataThreadQueue = Queue.Queue() # Used to submit information to the send data thread for this connection. sendDataThreadQueue = Queue.Queue() # Used to submit information to the send data thread for this connection.
rd.setup(sock, rd.setup(self.sock,
peer.host, peer.host,
peer.port, peer.port,
self.streamNumber, self.streamNumber,
@ -157,7 +169,7 @@ class outgoingSynSender(threading.Thread):
sd = sendDataThread(sendDataThreadQueue) sd = sendDataThread(sendDataThreadQueue)
sd.setup(sock, peer.host, peer.port, self.streamNumber, sd.setup(self.sock, peer.host, peer.port, self.streamNumber,
someObjectsOfWhichThisRemoteNodeIsAlreadyAware) someObjectsOfWhichThisRemoteNodeIsAlreadyAware)
sd.start() sd.start()
sd.sendVersionMessage() sd.sendVersionMessage()
@ -221,4 +233,4 @@ class outgoingSynSender(threading.Thread):
except Exception as err: except Exception as err:
import traceback import traceback
logger.exception('An exception has occurred in the outgoingSynSender thread that was not caught by other exception types:') logger.exception('An exception has occurred in the outgoingSynSender thread that was not caught by other exception types:')
time.sleep(0.1) self.stop.wait(0.1)

10
src/helper_threading.py Normal file
View File

@ -0,0 +1,10 @@
import threading
class StoppableThread(object):
def initStop(self):
self.stop = threading.Event()
self._stopped = False
def stopThread(self):
self._stopped = True
self.stop.set()

View File

@ -32,6 +32,7 @@ import highlevelcrypto
import shared import shared
#import helper_startup #import helper_startup
from helper_sql import * from helper_sql import *
from helper_threading import *
config = ConfigParser.SafeConfigParser() config = ConfigParser.SafeConfigParser()
@ -116,6 +117,9 @@ frozen = getattr(sys,'frozen', None)
# security. # security.
trustedPeer = None trustedPeer = None
# For UPnP
extPort = None
#Compiled struct for packing/unpacking headers #Compiled struct for packing/unpacking headers
#New code should use CreatePacket instead of Header.pack #New code should use CreatePacket instead of Header.pack
Header = Struct('!L12sL4s') Header = Struct('!L12sL4s')
@ -160,8 +164,10 @@ def assembleVersionMessage(remoteHost, remotePort, myStreamNumber):
payload += pack('>q', 1) # bitflags of the services I offer. payload += pack('>q', 1) # bitflags of the services I offer.
payload += '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF' + pack( payload += '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF' + pack(
'>L', 2130706433) # = 127.0.0.1. This will be ignored by the remote host. The actual remote connected IP will be used. '>L', 2130706433) # = 127.0.0.1. This will be ignored by the remote host. The actual remote connected IP will be used.
payload += pack('>H', shared.config.getint( if safeConfigGetBoolean('bitmessagesettings', 'upnp' and extPort):
'bitmessagesettings', 'port')) payload += pack('>H', extPort)
else:
payload += pack('>H', shared.config.getint('bitmessagesettings', 'port'))
random.seed() random.seed()
payload += eightBytesOfRandomDataUsedToDetectConnectionsToSelf payload += eightBytesOfRandomDataUsedToDetectConnectionsToSelf
@ -399,6 +405,11 @@ def doCleanShutdown():
# shutdown variable and exit. If the main thread closes before they do then they won't stop. # shutdown variable and exit. If the main thread closes before they do then they won't stop.
time.sleep(.25) time.sleep(.25)
from class_outgoingSynSender import outgoingSynSender
for thread in threading.enumerate():
if thread.name == "uPnPThread" or "outgoingSynSender" in thread.name:
if thread.isAlive() and isinstance(thread, StoppableThread):
thread.stopThread()
for thread in threading.enumerate(): for thread in threading.enumerate():
if thread.name == "uPnPThread" or "outgoingSynSender" in thread.name: if thread.name == "uPnPThread" or "outgoingSynSender" in thread.name:
logger.debug("Waiting for thread %s", thread.name) logger.debug("Waiting for thread %s", thread.name)

View File

@ -4,6 +4,7 @@ import socket
from struct import unpack, pack from struct import unpack, pack
import threading import threading
import time import time
from helper_threading import *
import shared import shared
def createRequestXML(service, action, arguments=[]): def createRequestXML(service, action, arguments=[]):
@ -160,7 +161,7 @@ class Router:
raise UPnPError(errinfo[0].childNodes[0].data) raise UPnPError(errinfo[0].childNodes[0].data)
return resp return resp
class uPnPThread(threading.Thread): class uPnPThread(threading.Thread, StoppableThread):
def __init__ (self): def __init__ (self):
threading.Thread.__init__(self, name="uPnPThread") threading.Thread.__init__(self, name="uPnPThread")
self.localPort = shared.config.getint('bitmessagesettings', 'port') self.localPort = shared.config.getint('bitmessagesettings', 'port')
@ -170,6 +171,7 @@ class uPnPThread(threading.Thread):
self.sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 2) self.sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 2)
self.sock.settimeout(2) self.sock.settimeout(2)
self.sendSleep = 60 self.sendSleep = 60
self.initStop()
def run(self): def run(self):
from debug import logger from debug import logger
@ -205,6 +207,7 @@ class uPnPThread(threading.Thread):
for router in self.routers: for router in self.routers:
if router.extPort is not None: if router.extPort is not None:
self.deletePortMapping(router) self.deletePortMapping(router)
shared.extPort = None
logger.debug("UPnP thread done") logger.debug("UPnP thread done")
def sendSearchRouter(self): def sendSearchRouter(self):