Threads close better
- UPnP and outgoingSynSender threads close slightly better. - extPort initialisation was missing
This commit is contained in:
parent
34dd5b3793
commit
9065910a38
|
@ -2,6 +2,7 @@ import threading
|
||||||
import time
|
import time
|
||||||
import random
|
import random
|
||||||
import shared
|
import shared
|
||||||
|
import select
|
||||||
import socks
|
import socks
|
||||||
import socket
|
import socket
|
||||||
import sys
|
import sys
|
||||||
|
@ -9,14 +10,16 @@ import tr
|
||||||
|
|
||||||
from class_sendDataThread import *
|
from class_sendDataThread import *
|
||||||
from class_receiveDataThread import *
|
from class_receiveDataThread import *
|
||||||
|
from helper_threading import *
|
||||||
|
|
||||||
# For each stream to which we connect, several outgoingSynSender threads
|
# For each stream to which we connect, several outgoingSynSender threads
|
||||||
# will exist and will collectively create 8 connections with peers.
|
# will exist and will collectively create 8 connections with peers.
|
||||||
|
|
||||||
class outgoingSynSender(threading.Thread):
|
class outgoingSynSender(threading.Thread, StoppableThread):
|
||||||
|
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
threading.Thread.__init__(self, name="outgoingSynSender")
|
threading.Thread.__init__(self, name="outgoingSynSender")
|
||||||
|
self.initStop()
|
||||||
|
|
||||||
def setup(self, streamNumber, selfInitiatedConnections):
|
def setup(self, streamNumber, selfInitiatedConnections):
|
||||||
self.streamNumber = streamNumber
|
self.streamNumber = streamNumber
|
||||||
|
@ -35,15 +38,22 @@ class outgoingSynSender(threading.Thread):
|
||||||
shared.knownNodesLock.release()
|
shared.knownNodesLock.release()
|
||||||
|
|
||||||
return peer
|
return peer
|
||||||
|
|
||||||
|
def stopThread(self):
|
||||||
|
super(outgoingSynSender, self).stopThread()
|
||||||
|
try:
|
||||||
|
self.sock.shutdown(socket.SHUT_RDWR)
|
||||||
|
except:
|
||||||
|
pass
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
while shared.safeConfigGetBoolean('bitmessagesettings', 'dontconnect'):
|
while shared.safeConfigGetBoolean('bitmessagesettings', 'dontconnect') and not self._stopped:
|
||||||
time.sleep(2)
|
self.stop.wait(2)
|
||||||
while shared.safeConfigGetBoolean('bitmessagesettings', 'sendoutgoingconnections'):
|
while shared.safeConfigGetBoolean('bitmessagesettings', 'sendoutgoingconnections') and not self._stopped:
|
||||||
self.name = "outgoingSynSender"
|
self.name = "outgoingSynSender"
|
||||||
maximumConnections = 1 if shared.trustedPeer else 8 # maximum number of outgoing connections = 8
|
maximumConnections = 1 if shared.trustedPeer else 8 # maximum number of outgoing connections = 8
|
||||||
while len(self.selfInitiatedConnections[self.streamNumber]) >= maximumConnections:
|
while len(self.selfInitiatedConnections[self.streamNumber]) >= maximumConnections:
|
||||||
time.sleep(10)
|
self.stop.wait(10)
|
||||||
if shared.shutdown:
|
if shared.shutdown:
|
||||||
break
|
break
|
||||||
random.seed()
|
random.seed()
|
||||||
|
@ -54,7 +64,9 @@ class outgoingSynSender(threading.Thread):
|
||||||
# print 'choosing new sample'
|
# print 'choosing new sample'
|
||||||
random.seed()
|
random.seed()
|
||||||
peer = self._getPeer()
|
peer = self._getPeer()
|
||||||
time.sleep(1)
|
self.stop.wait(1)
|
||||||
|
if shared.shutdown:
|
||||||
|
break
|
||||||
# Clear out the shared.alreadyAttemptedConnectionsList every half
|
# Clear out the shared.alreadyAttemptedConnectionsList every half
|
||||||
# hour so that this program will again attempt a connection
|
# hour so that this program will again attempt a connection
|
||||||
# to any nodes, even ones it has already tried.
|
# to any nodes, even ones it has already tried.
|
||||||
|
@ -71,7 +83,7 @@ class outgoingSynSender(threading.Thread):
|
||||||
else:
|
else:
|
||||||
address_family = socket.AF_INET6
|
address_family = socket.AF_INET6
|
||||||
try:
|
try:
|
||||||
sock = socks.socksocket(address_family, socket.SOCK_STREAM)
|
self.sock = socks.socksocket(address_family, socket.SOCK_STREAM)
|
||||||
except:
|
except:
|
||||||
"""
|
"""
|
||||||
The line can fail on Windows systems which aren't
|
The line can fail on Windows systems which aren't
|
||||||
|
@ -92,8 +104,8 @@ class outgoingSynSender(threading.Thread):
|
||||||
continue
|
continue
|
||||||
# This option apparently avoids the TIME_WAIT state so that we
|
# This option apparently avoids the TIME_WAIT state so that we
|
||||||
# can rebind faster
|
# can rebind faster
|
||||||
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
||||||
sock.settimeout(20)
|
self.sock.settimeout(20)
|
||||||
if shared.config.get('bitmessagesettings', 'socksproxytype') == 'none' and shared.verbose >= 2:
|
if shared.config.get('bitmessagesettings', 'socksproxytype') == 'none' and shared.verbose >= 2:
|
||||||
logger.debug('Trying an outgoing connection to ' + str(peer))
|
logger.debug('Trying an outgoing connection to ' + str(peer))
|
||||||
|
|
||||||
|
@ -113,10 +125,10 @@ class outgoingSynSender(threading.Thread):
|
||||||
'bitmessagesettings', 'socksusername')
|
'bitmessagesettings', 'socksusername')
|
||||||
sockspassword = shared.config.get(
|
sockspassword = shared.config.get(
|
||||||
'bitmessagesettings', 'sockspassword')
|
'bitmessagesettings', 'sockspassword')
|
||||||
sock.setproxy(
|
self.sock.setproxy(
|
||||||
proxytype, sockshostname, socksport, rdns, socksusername, sockspassword)
|
proxytype, sockshostname, socksport, rdns, socksusername, sockspassword)
|
||||||
else:
|
else:
|
||||||
sock.setproxy(
|
self.sock.setproxy(
|
||||||
proxytype, sockshostname, socksport, rdns)
|
proxytype, sockshostname, socksport, rdns)
|
||||||
elif shared.config.get('bitmessagesettings', 'socksproxytype') == 'SOCKS5':
|
elif shared.config.get('bitmessagesettings', 'socksproxytype') == 'SOCKS5':
|
||||||
if shared.verbose >= 2:
|
if shared.verbose >= 2:
|
||||||
|
@ -133,19 +145,19 @@ class outgoingSynSender(threading.Thread):
|
||||||
'bitmessagesettings', 'socksusername')
|
'bitmessagesettings', 'socksusername')
|
||||||
sockspassword = shared.config.get(
|
sockspassword = shared.config.get(
|
||||||
'bitmessagesettings', 'sockspassword')
|
'bitmessagesettings', 'sockspassword')
|
||||||
sock.setproxy(
|
self.sock.setproxy(
|
||||||
proxytype, sockshostname, socksport, rdns, socksusername, sockspassword)
|
proxytype, sockshostname, socksport, rdns, socksusername, sockspassword)
|
||||||
else:
|
else:
|
||||||
sock.setproxy(
|
self.sock.setproxy(
|
||||||
proxytype, sockshostname, socksport, rdns)
|
proxytype, sockshostname, socksport, rdns)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
sock.connect((peer.host, peer.port))
|
self.sock.connect((peer.host, peer.port))
|
||||||
rd = receiveDataThread()
|
rd = receiveDataThread()
|
||||||
rd.daemon = True # close the main program even if there are threads left
|
rd.daemon = True # close the main program even if there are threads left
|
||||||
someObjectsOfWhichThisRemoteNodeIsAlreadyAware = {} # This is not necessairly a complete list; we clear it from time to time to save memory.
|
someObjectsOfWhichThisRemoteNodeIsAlreadyAware = {} # This is not necessairly a complete list; we clear it from time to time to save memory.
|
||||||
sendDataThreadQueue = Queue.Queue() # Used to submit information to the send data thread for this connection.
|
sendDataThreadQueue = Queue.Queue() # Used to submit information to the send data thread for this connection.
|
||||||
rd.setup(sock,
|
rd.setup(self.sock,
|
||||||
peer.host,
|
peer.host,
|
||||||
peer.port,
|
peer.port,
|
||||||
self.streamNumber,
|
self.streamNumber,
|
||||||
|
@ -157,7 +169,7 @@ class outgoingSynSender(threading.Thread):
|
||||||
|
|
||||||
|
|
||||||
sd = sendDataThread(sendDataThreadQueue)
|
sd = sendDataThread(sendDataThreadQueue)
|
||||||
sd.setup(sock, peer.host, peer.port, self.streamNumber,
|
sd.setup(self.sock, peer.host, peer.port, self.streamNumber,
|
||||||
someObjectsOfWhichThisRemoteNodeIsAlreadyAware)
|
someObjectsOfWhichThisRemoteNodeIsAlreadyAware)
|
||||||
sd.start()
|
sd.start()
|
||||||
sd.sendVersionMessage()
|
sd.sendVersionMessage()
|
||||||
|
@ -221,4 +233,4 @@ class outgoingSynSender(threading.Thread):
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
import traceback
|
import traceback
|
||||||
logger.exception('An exception has occurred in the outgoingSynSender thread that was not caught by other exception types:')
|
logger.exception('An exception has occurred in the outgoingSynSender thread that was not caught by other exception types:')
|
||||||
time.sleep(0.1)
|
self.stop.wait(0.1)
|
||||||
|
|
10
src/helper_threading.py
Normal file
10
src/helper_threading.py
Normal file
|
@ -0,0 +1,10 @@
|
||||||
|
import threading
|
||||||
|
|
||||||
|
class StoppableThread(object):
|
||||||
|
def initStop(self):
|
||||||
|
self.stop = threading.Event()
|
||||||
|
self._stopped = False
|
||||||
|
|
||||||
|
def stopThread(self):
|
||||||
|
self._stopped = True
|
||||||
|
self.stop.set()
|
|
@ -32,6 +32,7 @@ import highlevelcrypto
|
||||||
import shared
|
import shared
|
||||||
#import helper_startup
|
#import helper_startup
|
||||||
from helper_sql import *
|
from helper_sql import *
|
||||||
|
from helper_threading import *
|
||||||
|
|
||||||
|
|
||||||
config = ConfigParser.SafeConfigParser()
|
config = ConfigParser.SafeConfigParser()
|
||||||
|
@ -116,6 +117,9 @@ frozen = getattr(sys,'frozen', None)
|
||||||
# security.
|
# security.
|
||||||
trustedPeer = None
|
trustedPeer = None
|
||||||
|
|
||||||
|
# For UPnP
|
||||||
|
extPort = None
|
||||||
|
|
||||||
#Compiled struct for packing/unpacking headers
|
#Compiled struct for packing/unpacking headers
|
||||||
#New code should use CreatePacket instead of Header.pack
|
#New code should use CreatePacket instead of Header.pack
|
||||||
Header = Struct('!L12sL4s')
|
Header = Struct('!L12sL4s')
|
||||||
|
@ -160,7 +164,7 @@ def assembleVersionMessage(remoteHost, remotePort, myStreamNumber):
|
||||||
payload += pack('>q', 1) # bitflags of the services I offer.
|
payload += pack('>q', 1) # bitflags of the services I offer.
|
||||||
payload += '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF' + pack(
|
payload += '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF' + pack(
|
||||||
'>L', 2130706433) # = 127.0.0.1. This will be ignored by the remote host. The actual remote connected IP will be used.
|
'>L', 2130706433) # = 127.0.0.1. This will be ignored by the remote host. The actual remote connected IP will be used.
|
||||||
if safeConfigGetBoolean('bitmessagesettings', 'upnp'):
|
if safeConfigGetBoolean('bitmessagesettings', 'upnp' and extPort):
|
||||||
payload += pack('>H', extPort)
|
payload += pack('>H', extPort)
|
||||||
else:
|
else:
|
||||||
payload += pack('>H', shared.config.getint('bitmessagesettings', 'port'))
|
payload += pack('>H', shared.config.getint('bitmessagesettings', 'port'))
|
||||||
|
@ -401,6 +405,11 @@ def doCleanShutdown():
|
||||||
# shutdown variable and exit. If the main thread closes before they do then they won't stop.
|
# shutdown variable and exit. If the main thread closes before they do then they won't stop.
|
||||||
time.sleep(.25)
|
time.sleep(.25)
|
||||||
|
|
||||||
|
from class_outgoingSynSender import outgoingSynSender
|
||||||
|
for thread in threading.enumerate():
|
||||||
|
if thread.name == "uPnPThread" or "outgoingSynSender" in thread.name:
|
||||||
|
if thread.isAlive() and isinstance(thread, StoppableThread):
|
||||||
|
thread.stopThread()
|
||||||
for thread in threading.enumerate():
|
for thread in threading.enumerate():
|
||||||
if thread.name == "uPnPThread" or "outgoingSynSender" in thread.name:
|
if thread.name == "uPnPThread" or "outgoingSynSender" in thread.name:
|
||||||
logger.debug("Waiting for thread %s", thread.name)
|
logger.debug("Waiting for thread %s", thread.name)
|
||||||
|
|
|
@ -6,6 +6,7 @@ import socket
|
||||||
from struct import unpack, pack
|
from struct import unpack, pack
|
||||||
import threading
|
import threading
|
||||||
import time
|
import time
|
||||||
|
from helper_threading import *
|
||||||
import shared
|
import shared
|
||||||
|
|
||||||
def createRequestXML(service, action, arguments=[]):
|
def createRequestXML(service, action, arguments=[]):
|
||||||
|
@ -168,7 +169,7 @@ class Router:
|
||||||
raise UPnPError(errinfo[0].childNodes[0].data)
|
raise UPnPError(errinfo[0].childNodes[0].data)
|
||||||
return resp
|
return resp
|
||||||
|
|
||||||
class uPnPThread(threading.Thread):
|
class uPnPThread(threading.Thread, StoppableThread):
|
||||||
def __init__ (self):
|
def __init__ (self):
|
||||||
threading.Thread.__init__(self, name="uPnPThread")
|
threading.Thread.__init__(self, name="uPnPThread")
|
||||||
self.localPort = shared.config.getint('bitmessagesettings', 'port')
|
self.localPort = shared.config.getint('bitmessagesettings', 'port')
|
||||||
|
@ -178,6 +179,7 @@ class uPnPThread(threading.Thread):
|
||||||
self.sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 2)
|
self.sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 2)
|
||||||
self.sock.settimeout(2)
|
self.sock.settimeout(2)
|
||||||
self.sendSleep = 60
|
self.sendSleep = 60
|
||||||
|
self.initStop()
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
from debug import logger
|
from debug import logger
|
||||||
|
@ -213,6 +215,7 @@ class uPnPThread(threading.Thread):
|
||||||
for router in self.routers:
|
for router in self.routers:
|
||||||
if router.extPort is not None:
|
if router.extPort is not None:
|
||||||
self.deletePortMapping(router)
|
self.deletePortMapping(router)
|
||||||
|
shared.extPort = None
|
||||||
logger.debug("UPnP thread done")
|
logger.debug("UPnP thread done")
|
||||||
|
|
||||||
def sendSearchRouter(self):
|
def sendSearchRouter(self):
|
||||||
|
|
Loading…
Reference in New Issue
Block a user