fast bootstrap
This commit is contained in:
parent
83678190fe
commit
5ba6c1233b
|
@ -60,47 +60,49 @@ from SimpleXMLRPCServer import *
|
|||
import json
|
||||
from subprocess import call #used when the API must execute an outside program
|
||||
|
||||
#For each stream to which we connect, one outgoingSynSender thread will exist and will create 8 connections with peers.
|
||||
#For each stream to which we connect, several outgoingSynSender threads will exist and will collectively create 8 connections with peers.
|
||||
class outgoingSynSender(QThread):
|
||||
def __init__(self, parent = None):
|
||||
QThread.__init__(self, parent)
|
||||
self.selfInitiatedConnectionList = [] #This is a list of current connections (the thread pointers at least)
|
||||
self.alreadyAttemptedConnectionsList = [] #This is a list of nodes to which we have already attempted a connection
|
||||
|
||||
def setup(self,streamNumber):
|
||||
self.streamNumber = streamNumber
|
||||
|
||||
|
||||
def run(self):
|
||||
time.sleep(1)
|
||||
resetTime = int(time.time()) #used below to clear out the alreadyAttemptedConnectionsList periodically so that we will retry connecting to hosts to which we have already tried to connect.
|
||||
global alreadyAttemptedConnectionsListResetTime
|
||||
while True:
|
||||
#time.sleep(999999)#I sometimes use this to prevent connections for testing.
|
||||
if len(self.selfInitiatedConnectionList) < 8: #maximum number of outgoing connections = 8
|
||||
if len(selfInitiatedConnections[self.streamNumber]) < 8: #maximum number of outgoing connections = 8
|
||||
random.seed()
|
||||
HOST, = random.sample(knownNodes[self.streamNumber], 1)
|
||||
while HOST in self.alreadyAttemptedConnectionsList or HOST in connectedHostsList:
|
||||
alreadyAttemptedConnectionsListLock.acquire()
|
||||
while HOST in alreadyAttemptedConnectionsList or HOST in connectedHostsList:
|
||||
alreadyAttemptedConnectionsListLock.release()
|
||||
#print 'choosing new sample'
|
||||
random.seed()
|
||||
HOST, = random.sample(knownNodes[self.streamNumber], 1)
|
||||
time.sleep(1)
|
||||
#Clear out the alreadyAttemptedConnectionsList every half hour so that this program will again attempt a connection to any nodes, even ones it has already tried.
|
||||
if (int(time.time()) - resetTime) > 1800:
|
||||
self.alreadyAttemptedConnectionsList = []
|
||||
resetTime = int(time.time())
|
||||
self.alreadyAttemptedConnectionsList.append(HOST)
|
||||
if (time.time() - alreadyAttemptedConnectionsListResetTime) > 1800:
|
||||
alreadyAttemptedConnectionsList.clear()
|
||||
alreadyAttemptedConnectionsListResetTime = int(time.time())
|
||||
alreadyAttemptedConnectionsListLock.acquire()
|
||||
alreadyAttemptedConnectionsList[HOST] = 0
|
||||
alreadyAttemptedConnectionsListLock.release()
|
||||
PORT, timeNodeLastSeen = knownNodes[self.streamNumber][HOST]
|
||||
sock = socks.socksocket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
sock.settimeout(20)
|
||||
if config.get('bitmessagesettings', 'socksproxytype') == 'none':
|
||||
if config.get('bitmessagesettings', 'socksproxytype') == 'none' and verbose >= 2:
|
||||
printLock.acquire()
|
||||
print 'Trying an outgoing connection to', HOST, ':', PORT
|
||||
printLock.release()
|
||||
#sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
elif config.get('bitmessagesettings', 'socksproxytype') == 'SOCKS4a':
|
||||
printLock.acquire()
|
||||
print '(Using SOCKS4a) Trying an outgoing connection to', HOST, ':', PORT
|
||||
printLock.release()
|
||||
if verbose >= 2:
|
||||
printLock.acquire()
|
||||
print '(Using SOCKS4a) Trying an outgoing connection to', HOST, ':', PORT
|
||||
printLock.release()
|
||||
proxytype = socks.PROXY_TYPE_SOCKS4
|
||||
sockshostname = config.get('bitmessagesettings', 'sockshostname')
|
||||
socksport = config.getint('bitmessagesettings', 'socksport')
|
||||
|
@ -112,9 +114,10 @@ class outgoingSynSender(QThread):
|
|||
else:
|
||||
sock.setproxy(proxytype, sockshostname, socksport, rdns)
|
||||
elif config.get('bitmessagesettings', 'socksproxytype') == 'SOCKS5':
|
||||
printLock.acquire()
|
||||
print '(Using SOCKS5) Trying an outgoing connection to', HOST, ':', PORT
|
||||
printLock.release()
|
||||
if verbose >= 2:
|
||||
printLock.acquire()
|
||||
print '(Using SOCKS5) Trying an outgoing connection to', HOST, ':', PORT
|
||||
printLock.release()
|
||||
proxytype = socks.PROXY_TYPE_SOCKS5
|
||||
sockshostname = config.get('bitmessagesettings', 'sockshostname')
|
||||
socksport = config.getint('bitmessagesettings', 'socksport')
|
||||
|
@ -131,7 +134,7 @@ class outgoingSynSender(QThread):
|
|||
rd = receiveDataThread()
|
||||
self.emit(SIGNAL("passObjectThrough(PyQt_PyObject)"),rd)
|
||||
objectsOfWhichThisRemoteNodeIsAlreadyAware = {}
|
||||
rd.setup(sock,HOST,PORT,self.streamNumber,self.selfInitiatedConnectionList,objectsOfWhichThisRemoteNodeIsAlreadyAware)
|
||||
rd.setup(sock,HOST,PORT,self.streamNumber,objectsOfWhichThisRemoteNodeIsAlreadyAware)
|
||||
rd.start()
|
||||
printLock.acquire()
|
||||
print self, 'connected to', HOST, 'during an outgoing attempt.'
|
||||
|
@ -143,9 +146,10 @@ class outgoingSynSender(QThread):
|
|||
sd.sendVersionMessage()
|
||||
|
||||
except socks.GeneralProxyError, err:
|
||||
printLock.acquire()
|
||||
print 'Could NOT connect to', HOST, 'during outgoing attempt.', err
|
||||
printLock.release()
|
||||
if verbose >= 2:
|
||||
printLock.acquire()
|
||||
print 'Could NOT connect to', HOST, 'during outgoing attempt.', err
|
||||
printLock.release()
|
||||
PORT, timeLastSeen = knownNodes[self.streamNumber][HOST]
|
||||
if (int(time.time())-timeLastSeen) > 172800 and len(knownNodes[self.streamNumber]) > 1000: # for nodes older than 48 hours old if we have more than 1000 hosts in our list, delete from the knownNodes data-structure.
|
||||
knownNodesLock.acquire()
|
||||
|
@ -166,9 +170,10 @@ class outgoingSynSender(QThread):
|
|||
print 'Bitmessage MIGHT be having trouble connecting to the SOCKS server. '+str(err)
|
||||
#self.emit(SIGNAL("updateStatusBar(PyQt_PyObject)"),"Problem: Bitmessage can not connect to the SOCKS server. "+str(err))
|
||||
else:
|
||||
printLock.acquire()
|
||||
print 'Could NOT connect to', HOST, 'during outgoing attempt.', err
|
||||
printLock.release()
|
||||
if verbose >= 1:
|
||||
printLock.acquire()
|
||||
print 'Could NOT connect to', HOST, 'during outgoing attempt.', err
|
||||
printLock.release()
|
||||
PORT, timeLastSeen = knownNodes[self.streamNumber][HOST]
|
||||
if (int(time.time())-timeLastSeen) > 172800 and len(knownNodes[self.streamNumber]) > 1000: # for nodes older than 48 hours old if we have more than 1000 hosts in our list, delete from the knownNodes data-structure.
|
||||
knownNodesLock.acquire()
|
||||
|
@ -198,7 +203,6 @@ class singleListener(QThread):
|
|||
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
||||
sock.bind((HOST, PORT))
|
||||
sock.listen(2)
|
||||
self.incomingConnectionList = [] #This list isn't used for anything. The reason it exists is because receiveData threads expect that a list be passed to them. They expect this because the outgoingSynSender thread DOES use a similar list to keep track of the number of outgoing connections it has created.
|
||||
|
||||
|
||||
while True:
|
||||
|
@ -214,7 +218,7 @@ class singleListener(QThread):
|
|||
rd = receiveDataThread()
|
||||
self.emit(SIGNAL("passObjectThrough(PyQt_PyObject)"),rd)
|
||||
objectsOfWhichThisRemoteNodeIsAlreadyAware = {}
|
||||
rd.setup(a,HOST,PORT,-1,self.incomingConnectionList,objectsOfWhichThisRemoteNodeIsAlreadyAware)
|
||||
rd.setup(a,HOST,PORT,-1,objectsOfWhichThisRemoteNodeIsAlreadyAware)
|
||||
printLock.acquire()
|
||||
print self, 'connected to', HOST,'during INCOMING request.'
|
||||
printLock.release()
|
||||
|
@ -233,23 +237,22 @@ class receiveDataThread(QThread):
|
|||
self.verackSent = False
|
||||
self.verackReceived = False
|
||||
|
||||
def setup(self,sock,HOST,port,streamNumber,selfInitiatedConnectionList,objectsOfWhichThisRemoteNodeIsAlreadyAware):
|
||||
def setup(self,sock,HOST,port,streamNumber,objectsOfWhichThisRemoteNodeIsAlreadyAware):
|
||||
self.sock = sock
|
||||
self.HOST = HOST
|
||||
self.PORT = port
|
||||
self.sock.settimeout(600) #We'll send out a pong every 5 minutes to make sure the connection stays alive if there has been no other traffic to send lately.
|
||||
self.streamNumber = streamNumber
|
||||
self.selfInitiatedConnectionList = selfInitiatedConnectionList
|
||||
self.selfInitiatedConnectionList.append(self)
|
||||
self.payloadLength = 0 #This is the protocol payload length thus it doesn't include the 24 byte message header
|
||||
self.receivedgetbiginv = False #Gets set to true once we receive a getbiginv message from our peer. An abusive peer might request it too much so we use this variable to check whether they have already asked for a big inv message.
|
||||
self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave = {}
|
||||
connectedHostsList[self.HOST] = 0 #The very fact that this receiveData thread exists shows that we are connected to the remote host. Let's add it to this list so that the outgoingSynSender thread doesn't try to connect to it.
|
||||
connectedHostsList[self.HOST] = 0 #The very fact that this receiveData thread exists shows that we are connected to the remote host. Let's add it to this list so that an outgoingSynSender thread doesn't try to connect to it.
|
||||
self.connectionIsOrWasFullyEstablished = False #set to true after the remote node and I accept each other's version messages. This is needed to allow the user interface to accurately reflect the current number of connections.
|
||||
if self.streamNumber == -1: #This was an incoming connection. Send out a version message if we accept the other node's version message.
|
||||
self.initiatedConnection = False
|
||||
else:
|
||||
self.initiatedConnection = True
|
||||
selfInitiatedConnections[streamNumber][self] = 0
|
||||
self.ackDataThatWeHaveYetToSend = [] #When we receive a message bound for us, we store the acknowledgement that we need to send (the ackdata) here until we are done processing all other data received from this peer.
|
||||
self.objectsOfWhichThisRemoteNodeIsAlreadyAware = objectsOfWhichThisRemoteNodeIsAlreadyAware
|
||||
|
||||
|
@ -284,13 +287,14 @@ class receiveDataThread(QThread):
|
|||
except Exception, err:
|
||||
print 'Within receiveDataThread run(), self.sock.close() failed.', err
|
||||
|
||||
try:
|
||||
self.selfInitiatedConnectionList.remove(self)
|
||||
printLock.acquire()
|
||||
print 'removed self (a receiveDataThread) from ConnectionList'
|
||||
printLock.release()
|
||||
except:
|
||||
pass
|
||||
#try:
|
||||
del selfInitiatedConnections[streamNumber][self]
|
||||
#self.selfInitiatedConnectionList.remove(self)
|
||||
printLock.acquire()
|
||||
print 'removed self (a receiveDataThread) from ConnectionList'
|
||||
printLock.release()
|
||||
#except:
|
||||
# pass
|
||||
broadcastToSendDataQueues((0, 'shutdown', self.HOST))
|
||||
if self.connectionIsOrWasFullyEstablished: #We don't want to decrement the number of connections and show the result if we never incremented it in the first place (which we only do if the connection is fully established- meaning that both nodes accepted each other's version packets.)
|
||||
connectionsCountLock.acquire()
|
||||
|
@ -3681,7 +3685,7 @@ class MyForm(QtGui.QMainWindow):
|
|||
|
||||
self.rerenderComboBoxSendFrom()
|
||||
|
||||
self.listOfOutgoingSynSenderThreads = [] #if we don't maintain this list, the threads will get garbage-collected.
|
||||
|
||||
|
||||
self.connectToStream(1)
|
||||
|
||||
|
@ -4135,7 +4139,9 @@ class MyForm(QtGui.QMainWindow):
|
|||
self.ui.comboBoxSendFrom.setCurrentIndex(0)
|
||||
|
||||
def connectToStream(self,streamNumber):
|
||||
self.listOfOutgoingSynSenderThreads = [] #if we don't maintain this list, the threads will get garbage-collected.
|
||||
connectionsCount[streamNumber] = 0
|
||||
selfInitiatedConnections[streamNumber] = {}
|
||||
|
||||
#Add a line to the Connection Count table on the Network Status tab with a 'zero' connection count. This will be updated as necessary by another function.
|
||||
self.ui.tableWidgetConnectionCount.insertRow(0)
|
||||
|
@ -4146,12 +4152,13 @@ class MyForm(QtGui.QMainWindow):
|
|||
newItem.setFlags( QtCore.Qt.ItemIsSelectable | QtCore.Qt.ItemIsEnabled )
|
||||
self.ui.tableWidgetConnectionCount.setItem(0,1,newItem)
|
||||
|
||||
a = outgoingSynSender()
|
||||
self.listOfOutgoingSynSenderThreads.append(a)
|
||||
QtCore.QObject.connect(a, QtCore.SIGNAL("passObjectThrough(PyQt_PyObject)"), self.connectObjectToSignals)
|
||||
QtCore.QObject.connect(a, QtCore.SIGNAL("updateStatusBar(PyQt_PyObject)"), self.updateStatusBar)
|
||||
a.setup(streamNumber)
|
||||
a.start()
|
||||
for i in range(32):
|
||||
a = outgoingSynSender()
|
||||
self.listOfOutgoingSynSenderThreads.append(a)
|
||||
QtCore.QObject.connect(a, QtCore.SIGNAL("passObjectThrough(PyQt_PyObject)"), self.connectObjectToSignals)
|
||||
QtCore.QObject.connect(a, QtCore.SIGNAL("updateStatusBar(PyQt_PyObject)"), self.updateStatusBar)
|
||||
a.setup(streamNumber)
|
||||
a.start()
|
||||
|
||||
def connectObjectToSignals(self,object):
|
||||
QtCore.QObject.connect(object, QtCore.SIGNAL("updateStatusBar(PyQt_PyObject)"), self.updateStatusBar)
|
||||
|
@ -5045,6 +5052,8 @@ class myTableWidgetItem(QTableWidgetItem):
|
|||
return int(self.data(33).toPyObject()) < int(other.data(33).toPyObject())
|
||||
|
||||
|
||||
selfInitiatedConnections = {} #This is a list of current connections (the thread pointers at least)
|
||||
alreadyAttemptedConnectionsList = {} #This is a list of nodes to which we have already attempted a connection
|
||||
sendDataQueues = [] #each sendData thread puts its queue in this list.
|
||||
myRSAAddressHashes = {}
|
||||
myECAddressHashes = {}
|
||||
|
@ -5061,13 +5070,15 @@ broadcastSendersForWhichImWatching = {}
|
|||
statusIconColor = 'red'
|
||||
connectionsCount = {} #Used for the 'network status' tab.
|
||||
connectionsCountLock = threading.Lock()
|
||||
alreadyAttemptedConnectionsListLock = threading.Lock()
|
||||
inventoryLock = threading.Lock() #Guarantees that two receiveDataThreads don't receive and process the same message concurrently (probably sent by a malicious individual)
|
||||
eightBytesOfRandomDataUsedToDetectConnectionsToSelf = pack('>Q',random.randrange(1, 18446744073709551615))
|
||||
connectedHostsList = {} #List of hosts to which we are connected. Used to guarantee that the outgoingSynSender thread won't connect to the same remote node twice.
|
||||
connectedHostsList = {} #List of hosts to which we are connected. Used to guarantee that the outgoingSynSender threads won't connect to the same remote node twice.
|
||||
neededPubkeys = {}
|
||||
successfullyDecryptMessageTimings = [] #A list of the amounts of time it took to successfully decrypt msg messages
|
||||
apiSignalQueue = Queue.Queue() #The singleAPI thread uses this queue to pass messages to a QT thread which can emit signals to do things like display a message in the UI.
|
||||
apiAddressGeneratorReturnQueue = Queue.Queue() #The address generator thread uses this queue to get information back to the API thread.
|
||||
alreadyAttemptedConnectionsListResetTime = int(time.time()) #used to clear out the alreadyAttemptedConnectionsList periodically so that we will retry connecting to hosts to which we have already tried to connect.
|
||||
|
||||
#These constants are not at the top because if changed they will cause particularly unexpected behavior: You won't be able to either send or receive messages because the proof of work you do (or demand) won't match that done or demanded by others. Don't change them!
|
||||
averageProofOfWorkNonceTrialsPerByte = 320 #The amount of work that should be performed (and demanded) per byte of the payload. Double this number to double the work.
|
||||
|
|
Loading…
Reference in New Issue
Block a user