2013-06-22 01:49:50 +02:00
import threading
import time
import random
import shared
import socks
import socket
import sys
2013-06-24 21:51:01 +02:00
import tr
2013-06-22 01:49:50 +02:00
from class_sendDataThread import *
from class_receiveDataThread import *
# For each stream to which we connect, several outgoingSynSender threads
# will exist and will collectively create 8 connections with peers.
class outgoingSynSender ( threading . Thread ) :
def __init__ ( self ) :
threading . Thread . __init__ ( self )
def setup ( self , streamNumber , selfInitiatedConnections ) :
self . streamNumber = streamNumber
self . selfInitiatedConnections = selfInitiatedConnections
2014-02-06 14:16:07 +01:00
def _getPeer ( self ) :
# If the user has specified a trusted peer then we'll only
# ever connect to that. Otherwise we'll pick a random one from
# the known nodes
shared . knownNodesLock . acquire ( )
if shared . trustedPeer :
peer = shared . trustedPeer
shared . knownNodes [ self . streamNumber ] [ peer ] = time . time ( )
else :
peer , = random . sample ( shared . knownNodes [ self . streamNumber ] , 1 )
shared . knownNodesLock . release ( )
return peer
2013-06-22 01:49:50 +02:00
def run ( self ) :
2013-07-24 17:46:28 +02:00
while shared . safeConfigGetBoolean ( ' bitmessagesettings ' , ' dontconnect ' ) :
time . sleep ( 2 )
2013-08-28 04:29:39 +02:00
while shared . safeConfigGetBoolean ( ' bitmessagesettings ' , ' sendoutgoingconnections ' ) :
2014-02-06 14:16:07 +01:00
maximumConnections = 1 if shared . trustedPeer else 8 # maximum number of outgoing connections = 8
while len ( self . selfInitiatedConnections [ self . streamNumber ] ) > = maximumConnections :
2013-06-22 01:49:50 +02:00
time . sleep ( 10 )
if shared . shutdown :
break
random . seed ( )
2014-02-06 14:16:07 +01:00
peer = self . _getPeer ( )
2013-06-24 21:51:01 +02:00
shared . alreadyAttemptedConnectionsListLock . acquire ( )
2013-08-01 18:16:31 +02:00
while peer in shared . alreadyAttemptedConnectionsList or peer . host in shared . connectedHostsList :
2013-06-24 21:51:01 +02:00
shared . alreadyAttemptedConnectionsListLock . release ( )
2013-06-22 01:49:50 +02:00
# print 'choosing new sample'
random . seed ( )
2014-02-06 14:16:07 +01:00
peer = self . _getPeer ( )
2013-06-22 01:49:50 +02:00
time . sleep ( 1 )
2013-06-24 21:51:01 +02:00
# Clear out the shared.alreadyAttemptedConnectionsList every half
2013-06-22 01:49:50 +02:00
# hour so that this program will again attempt a connection
# to any nodes, even ones it has already tried.
2013-06-24 21:51:01 +02:00
if ( time . time ( ) - shared . alreadyAttemptedConnectionsListResetTime ) > 1800 :
shared . alreadyAttemptedConnectionsList . clear ( )
shared . alreadyAttemptedConnectionsListResetTime = int (
2013-06-22 01:49:50 +02:00
time . time ( ) )
2013-06-24 21:51:01 +02:00
shared . alreadyAttemptedConnectionsListLock . acquire ( )
2013-07-30 22:23:18 +02:00
shared . alreadyAttemptedConnectionsList [ peer ] = 0
2013-06-24 21:51:01 +02:00
shared . alreadyAttemptedConnectionsListLock . release ( )
2013-07-30 22:23:18 +02:00
timeNodeLastSeen = shared . knownNodes [
self . streamNumber ] [ peer ]
2013-06-22 01:49:50 +02:00
sock = socks . socksocket ( socket . AF_INET , socket . SOCK_STREAM )
# This option apparently avoids the TIME_WAIT state so that we
# can rebind faster
sock . setsockopt ( socket . SOL_SOCKET , socket . SO_REUSEADDR , 1 )
sock . settimeout ( 20 )
2013-06-24 21:51:01 +02:00
if shared . config . get ( ' bitmessagesettings ' , ' socksproxytype ' ) == ' none ' and shared . verbose > = 2 :
2013-06-29 19:29:35 +02:00
with shared . printLock :
2013-07-30 22:23:18 +02:00
print ' Trying an outgoing connection to ' , peer
2013-06-29 19:29:35 +02:00
2013-06-22 01:49:50 +02:00
# sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
elif shared . config . get ( ' bitmessagesettings ' , ' socksproxytype ' ) == ' SOCKS4a ' :
2013-06-24 21:51:01 +02:00
if shared . verbose > = 2 :
2013-06-29 19:29:35 +02:00
with shared . printLock :
2013-07-30 22:23:18 +02:00
print ' (Using SOCKS4a) Trying an outgoing connection to ' , peer
2013-06-29 19:29:35 +02:00
2013-06-22 01:49:50 +02:00
proxytype = socks . PROXY_TYPE_SOCKS4
sockshostname = shared . config . get (
' bitmessagesettings ' , ' sockshostname ' )
socksport = shared . config . getint (
' bitmessagesettings ' , ' socksport ' )
rdns = True # Do domain name lookups through the proxy; though this setting doesn't really matter since we won't be doing any domain name lookups anyway.
if shared . config . getboolean ( ' bitmessagesettings ' , ' socksauthentication ' ) :
socksusername = shared . config . get (
' bitmessagesettings ' , ' socksusername ' )
sockspassword = shared . config . get (
' bitmessagesettings ' , ' sockspassword ' )
sock . setproxy (
proxytype , sockshostname , socksport , rdns , socksusername , sockspassword )
else :
sock . setproxy (
proxytype , sockshostname , socksport , rdns )
elif shared . config . get ( ' bitmessagesettings ' , ' socksproxytype ' ) == ' SOCKS5 ' :
2013-06-24 21:51:01 +02:00
if shared . verbose > = 2 :
2013-06-29 19:29:35 +02:00
with shared . printLock :
2013-07-30 22:23:18 +02:00
print ' (Using SOCKS5) Trying an outgoing connection to ' , peer
2013-06-29 19:29:35 +02:00
2013-06-22 01:49:50 +02:00
proxytype = socks . PROXY_TYPE_SOCKS5
sockshostname = shared . config . get (
' bitmessagesettings ' , ' sockshostname ' )
socksport = shared . config . getint (
' bitmessagesettings ' , ' socksport ' )
rdns = True # Do domain name lookups through the proxy; though this setting doesn't really matter since we won't be doing any domain name lookups anyway.
if shared . config . getboolean ( ' bitmessagesettings ' , ' socksauthentication ' ) :
socksusername = shared . config . get (
' bitmessagesettings ' , ' socksusername ' )
sockspassword = shared . config . get (
' bitmessagesettings ' , ' sockspassword ' )
sock . setproxy (
proxytype , sockshostname , socksport , rdns , socksusername , sockspassword )
else :
sock . setproxy (
proxytype , sockshostname , socksport , rdns )
try :
2013-07-30 22:23:18 +02:00
sock . connect ( ( peer . host , peer . port ) )
2013-06-22 01:49:50 +02:00
rd = receiveDataThread ( )
rd . daemon = True # close the main program even if there are threads left
2013-06-24 21:51:01 +02:00
someObjectsOfWhichThisRemoteNodeIsAlreadyAware = { } # This is not necessairly a complete list; we clear it from time to time to save memory.
2013-12-30 04:36:23 +01:00
sendDataThreadQueue = Queue . Queue ( ) # Used to submit information to the send data thread for this connection.
rd . setup ( sock ,
peer . host ,
peer . port ,
self . streamNumber ,
someObjectsOfWhichThisRemoteNodeIsAlreadyAware ,
self . selfInitiatedConnections ,
sendDataThreadQueue )
2013-06-22 01:49:50 +02:00
rd . start ( )
2013-06-29 19:29:35 +02:00
with shared . printLock :
2013-07-30 22:23:18 +02:00
print self , ' connected to ' , peer , ' during an outgoing attempt. '
2013-06-29 19:29:35 +02:00
2013-06-22 01:49:50 +02:00
2013-12-30 04:36:23 +01:00
sd = sendDataThread ( sendDataThreadQueue )
2013-07-30 22:23:18 +02:00
sd . setup ( sock , peer . host , peer . port , self . streamNumber ,
2013-06-24 21:51:01 +02:00
someObjectsOfWhichThisRemoteNodeIsAlreadyAware )
2013-06-22 01:49:50 +02:00
sd . start ( )
sd . sendVersionMessage ( )
except socks . GeneralProxyError as err :
2013-06-24 21:51:01 +02:00
if shared . verbose > = 2 :
2013-06-29 19:29:35 +02:00
with shared . printLock :
2013-07-30 22:23:18 +02:00
print ' Could NOT connect to ' , peer , ' during outgoing attempt. ' , err
2013-06-29 19:29:35 +02:00
2013-07-30 22:23:18 +02:00
timeLastSeen = shared . knownNodes [
self . streamNumber ] [ peer ]
2013-06-22 01:49:50 +02:00
if ( int ( time . time ( ) ) - timeLastSeen ) > 172800 and len ( shared . knownNodes [ self . streamNumber ] ) > 1000 : # for nodes older than 48 hours old if we have more than 1000 hosts in our list, delete from the shared.knownNodes data-structure.
shared . knownNodesLock . acquire ( )
2013-07-30 22:23:18 +02:00
del shared . knownNodes [ self . streamNumber ] [ peer ]
2013-06-22 01:49:50 +02:00
shared . knownNodesLock . release ( )
2013-06-29 19:29:35 +02:00
with shared . printLock :
2013-07-30 22:23:18 +02:00
print ' deleting ' , peer , ' from shared.knownNodes because it is more than 48 hours old and we could not connect to it. '
2013-06-29 19:29:35 +02:00
2013-06-22 01:49:50 +02:00
except socks . Socks5AuthError as err :
shared . UISignalQueue . put ( (
2013-06-24 21:51:01 +02:00
' updateStatusBar ' , tr . translateText (
2013-06-22 01:49:50 +02:00
" MainWindow " , " SOCKS5 Authentication problem: % 1 " ) . arg ( str ( err ) ) ) )
except socks . Socks5Error as err :
pass
print ' SOCKS5 error. (It is possible that the server wants authentication).) ' , str ( err )
except socks . Socks4Error as err :
print ' Socks4Error: ' , err
except socket . error as err :
if shared . config . get ( ' bitmessagesettings ' , ' socksproxytype ' ) [ 0 : 5 ] == ' SOCKS ' :
print ' Bitmessage MIGHT be having trouble connecting to the SOCKS server. ' + str ( err )
else :
2013-06-24 21:51:01 +02:00
if shared . verbose > = 1 :
2013-06-29 19:29:35 +02:00
with shared . printLock :
2013-07-30 22:23:18 +02:00
print ' Could NOT connect to ' , peer , ' during outgoing attempt. ' , err
2013-06-29 19:29:35 +02:00
2013-07-30 22:23:18 +02:00
timeLastSeen = shared . knownNodes [
self . streamNumber ] [ peer ]
2013-06-22 01:49:50 +02:00
if ( int ( time . time ( ) ) - timeLastSeen ) > 172800 and len ( shared . knownNodes [ self . streamNumber ] ) > 1000 : # for nodes older than 48 hours old if we have more than 1000 hosts in our list, delete from the knownNodes data-structure.
shared . knownNodesLock . acquire ( )
2013-07-30 22:23:18 +02:00
del shared . knownNodes [ self . streamNumber ] [ peer ]
2013-06-22 01:49:50 +02:00
shared . knownNodesLock . release ( )
2013-06-29 19:29:35 +02:00
with shared . printLock :
2013-07-30 22:23:18 +02:00
print ' deleting ' , peer , ' from knownNodes because it is more than 48 hours old and we could not connect to it. '
2013-06-29 19:29:35 +02:00
2013-06-22 01:49:50 +02:00
except Exception as err :
sys . stderr . write (
' An exception has occurred in the outgoingSynSender thread that was not caught by other exception types: ' )
import traceback
traceback . print_exc ( )
time . sleep ( 0.1 )