@ -147,9 +147,9 @@ class receiveDataThread(threading.Thread):
with shared . printLock :
print ' remoteCommand ' , repr ( remoteCommand . replace ( ' \x00 ' , ' ' ) ) , ' from ' , self . peer
if remoteCommand == ' version \x00 \x00 \x00 \x00 \x00 ' :
if remoteCommand == ' version \x00 \x00 \x00 \x00 \x00 ' and not self . connectionIsOrWasFullyEstablished :
self . recversion ( self . data [ 24 : self . payloadLength + 24 ] )
elif remoteCommand == ' verack \x00 \x00 \x00 \x00 \x00 \x00 ' :
elif remoteCommand == ' verack \x00 \x00 \x00 \x00 \x00 \x00 ' and not self . connectionIsOrWasFullyEstablished :
self . recverack ( )
elif remoteCommand == ' addr \x00 \x00 \x00 \x00 \x00 \x00 \x00 \x00 ' and self . connectionIsOrWasFullyEstablished :
self . recaddr ( self . data [ 24 : self . payloadLength + 24 ] )
@ -237,7 +237,12 @@ class receiveDataThread(threading.Thread):
self . connectionFullyEstablished ( )
def connectionFullyEstablished ( self ) :
if self . connectionIsOrWasFullyEstablished :
# there is no reason to run this function a second time
return
self . connectionIsOrWasFullyEstablished = True
# Command the corresponding sendDataThread to set its own connectionIsOrWasFullyEstablished variable to True also
self . sendDataThreadQueue . put ( ( 0 , ' connectionIsOrWasFullyEstablished ' , ' no data ' ) )
if not self . initiatedConnection :
shared . clientHasReceivedIncomingConnections = True
shared . UISignalQueue . put ( ( ' setStatusIcon ' , ' green ' ) )
@ -301,8 +306,9 @@ class receiveDataThread(threading.Thread):
self . sendinvMessageToJustThisOnePeer (
numberOfObjectsInInvMessage , payload )
# Self explanatory. Notice that there is also a broadcastinv function for
# broadcasting invs to everyone in our stream.
# Used to send a big inv message when the connection with a node is
# first fully established. Notice that there is also a broadcastinv
# function for broadcasting invs to everyone in our stream.
def sendinvMessageToJustThisOnePeer ( self , numberOfObjects , payload ) :
payload = encodeVarint ( numberOfObjects ) + payload
headerData = ' \xe9 \xbe \xb4 \xd9 ' # magic bits, slighly different from Bitcoin's magic bits.
@ -665,7 +671,10 @@ class receiveDataThread(threading.Thread):
print ' knownNodes currently has ' , len ( shared . knownNodes [ self . streamNumber ] ) , ' nodes for this stream. '
# Send a big addr message to our peer
# Send a huge addr message to our peer. This is only used
# when we fully establish a connection with a
# peer (with the full exchange of version and verack
# messages).
def sendaddr ( self ) :
addrsInMyStream = { }
addrsInChildStreamLeft = { }
@ -750,58 +759,68 @@ class receiveDataThread(threading.Thread):
if len ( data ) < 83 :
# This version message is unreasonably short. Forget it.
return
elif not self . verackSent :
self . remoteProtocolVersion , = unpack ( ' >L ' , data [ : 4 ] )
if self . remoteProtocolVersion < = 1 :
shared . broadcastToSendDataQueues ( ( 0 , ' shutdown ' , self . peer ) )
with shared . printLock :
print ' Closing connection to old protocol version 1 node: ' , self . peer
return
# print 'remoteProtocolVersion', self.remoteProtocolVersion
self . myExternalIP = socket . inet_ntoa ( data [ 40 : 44 ] )
# print 'myExternalIP', self.myExternalIP
self . remoteNodeIncomingPort , = unpack ( ' >H ' , data [ 70 : 72 ] )
# print 'remoteNodeIncomingPort', self.remoteNodeIncomingPort
useragentLength , lengthOfUseragentVarint = decodeVarint (
data [ 80 : 84 ] )
readPosition = 80 + lengthOfUseragentVarint
useragent = data [ readPosition : readPosition + useragentLength ]
readPosition + = useragentLength
numberOfStreamsInVersionMessage , lengthOfNumberOfStreamsInVersionMessage = decodeVarint (
data [ readPosition : ] )
readPosition + = lengthOfNumberOfStreamsInVersionMessage
self . streamNumber , lengthOfRemoteStreamNumber = decodeVarint (
data [ readPosition : ] )
if self . verackSent :
"""
We must have already processed the remote node ' s version message.
There might be a time in the future when we Do want to process
a new version message , like if the remote node wants to update
the streams in which they are interested . But for now we ' ll
ignore this version message
"""
return
self . remoteProtocolVersion , = unpack ( ' >L ' , data [ : 4 ] )
if self . remoteProtocolVersion < = 1 :
shared . broadcastToSendDataQueues ( ( 0 , ' shutdown ' , self . peer ) )
with shared . printLock :
print ' Remote node useragent: ' , useragent , ' stream number: ' , self . streamNumber
print ' Closing connection to old protocol version 1 node: ' , self . peer
return
# print 'remoteProtocolVersion', self.remoteProtocolVersion
self . myExternalIP = socket . inet_ntoa ( data [ 40 : 44 ] )
# print 'myExternalIP', self.myExternalIP
self . remoteNodeIncomingPort , = unpack ( ' >H ' , data [ 70 : 72 ] )
# print 'remoteNodeIncomingPort', self.remoteNodeIncomingPort
useragentLength , lengthOfUseragentVarint = decodeVarint (
data [ 80 : 84 ] )
readPosition = 80 + lengthOfUseragentVarint
useragent = data [ readPosition : readPosition + useragentLength ]
readPosition + = useragentLength
numberOfStreamsInVersionMessage , lengthOfNumberOfStreamsInVersionMessage = decodeVarint (
data [ readPosition : ] )
readPosition + = lengthOfNumberOfStreamsInVersionMessage
self . streamNumber , lengthOfRemoteStreamNumber = decodeVarint (
data [ readPosition : ] )
with shared . printLock :
print ' Remote node useragent: ' , useragent , ' stream number: ' , self . streamNumber
if self . streamNumber != 1 :
shared . broadcastToSendDataQueues ( ( 0 , ' shutdown ' , self . peer ) )
with shared . printLock :
print ' Closed connection to ' , self . peer , ' because they are interested in stream ' , self . streamNumber , ' . '
return
shared . connectedHostsList [
self . peer . host ] = 1 # We use this data structure to not only keep track of what hosts we are connected to so that we don't try to connect to them again, but also to list the connections count on the Network Status tab.
# If this was an incoming connection, then the sendData thread
# doesn't know the stream. We have to set it.
if not self . initiatedConnection :
shared . broadcastToSendDataQueues ( (
0 , ' setStreamNumber ' , ( self . peer , self . streamNumber ) ) )
if data [ 72 : 80 ] == shared . eightBytesOfRandomDataUsedToDetectConnectionsToSelf :
shared . broadcastToSendDataQueues ( ( 0 , ' shutdown ' , self . peer ) )
with shared . printLock :
print ' Closing connection to myself: ' , self . peer
return
self . sendDataThreadQueue . put ( ( 0 , ' setRemoteProtocolVersion ' , self . remoteProtocolVersion ) )
if self . streamNumber != 1 :
shared . broadcastToSendDataQueues ( ( 0 , ' shutdown ' , self . peer ) )
with shared . printLock :
print ' Closed connection to ' , self . peer , ' because they are interested in stream ' , self . streamNumber , ' . '
return
shared . connectedHostsList [
self . peer . host ] = 1 # We use this data structure to not only keep track of what hosts we are connected to so that we don't try to connect to them again, but also to list the connections count on the Network Status tab.
# If this was an incoming connection, then the sendData thread
# doesn't know the stream. We have to set it.
if not self . initiatedConnection :
self . sendDataThreadQueue . put ( ( 0 , ' setStreamNumber ' , self . streamNumber ) )
if data [ 72 : 80 ] == shared . eightBytesOfRandomDataUsedToDetectConnectionsToSelf :
shared . broadcastToSendDataQueues ( ( 0 , ' shutdown ' , self . peer ) )
with shared . printLock :
print ' Closing connection to myself: ' , self . peer
return
# The other peer's protocol version is of interest to the sendDataThread but we learn of it
# in this version message. Let us inform the sendDataThread.
self . sendDataThreadQueue . put ( ( 0 , ' setRemoteProtocolVersion ' , self . remoteProtocolVersion ) )
shared . knownNodesLock . acquire ( )
shared . knownNodes [ self . streamNumber ] [ shared . Peer ( self . peer . host , self . remoteNodeIncomingPort ) ] = int ( time . time ( ) )
shared . needToWriteKnownNodesToDisk = True
shared . knownNodesLock . release ( )
shared . knownNodesLock . acquire ( )
shared . knownNodes [ self . streamNumber ] [ shared . Peer ( self . peer . host , self . remoteNodeIncomingPort ) ] = int ( time . time ( ) )
shared . needToWriteKnownNodesToDisk = True
shared . knownNodesLock . release ( )
self . sendverack ( )
if self . initiatedConnection == False :
self . sendversion ( )
self . sendverack ( )
if self . initiatedConnection == False :
self . sendversion ( )
# Sends a version message
def sendversion ( self ) :