2016-01-22 11:17:10 +01:00
doTimingAttackMitigation = False
2013-06-24 23:29:04 +02:00
2016-03-18 16:39:29 +01:00
import base64
2015-11-13 12:32:10 +01:00
import errno
2016-02-13 12:54:23 +01:00
import math
2013-06-22 00:29:04 +02:00
import time
import threading
import shared
2013-06-22 01:49:50 +02:00
import hashlib
2015-11-13 12:32:10 +01:00
import os
import select
2013-06-22 01:49:50 +02:00
import socket
import random
2015-11-13 12:32:10 +01:00
import ssl
2013-06-22 01:49:50 +02:00
from struct import unpack , pack
import sys
2014-08-27 09:14:32 +02:00
import traceback
2016-03-23 23:26:57 +01:00
from binascii import hexlify
2013-11-20 07:29:37 +01:00
#import string
#from subprocess import call # used when the API must execute an outside program
#from pyelliptic.openssl import OpenSSL
2013-06-22 01:49:50 +02:00
2013-11-20 07:29:37 +01:00
#import highlevelcrypto
2013-06-22 01:49:50 +02:00
from addresses import *
2016-02-13 12:54:23 +01:00
from class_objectHashHolder import objectHashHolder
2014-05-02 16:46:36 +02:00
from helper_generic import addDataPadding , isHostInPrivateIPRange
2015-03-09 07:35:32 +01:00
from helper_sql import sqlQuery
2013-09-04 04:45:45 +02:00
from debug import logger
2013-06-22 00:29:04 +02:00
# This thread is created either by the synSenderThread(for outgoing
2013-11-20 07:29:37 +01:00
# connections) or the singleListenerThread(for incoming connections).
2013-06-22 00:29:04 +02:00
class receiveDataThread ( threading . Thread ) :
def __init__ ( self ) :
2015-11-18 16:22:17 +01:00
threading . Thread . __init__ ( self , name = " receiveData " )
2013-06-22 00:29:04 +02:00
self . data = ' '
self . verackSent = False
self . verackReceived = False
def setup (
self ,
sock ,
HOST ,
port ,
streamNumber ,
2013-12-30 04:36:23 +01:00
someObjectsOfWhichThisRemoteNodeIsAlreadyAware ,
selfInitiatedConnections ,
2016-02-18 00:53:13 +01:00
sendDataThreadQueue ,
objectHashHolderInstance ) :
2013-12-30 04:36:23 +01:00
2013-06-22 00:29:04 +02:00
self . sock = sock
2013-08-01 12:32:07 +02:00
self . peer = shared . Peer ( HOST , port )
2016-01-26 11:54:11 +01:00
self . name = " receiveData- " + self . peer . host . replace ( " : " , " . " ) # ":" log parser field separator
2013-06-22 00:29:04 +02:00
self . streamNumber = streamNumber
2013-09-04 04:45:45 +02:00
self . objectsThatWeHaveYetToGetFromThisPeer = { }
2013-06-22 01:49:50 +02:00
self . selfInitiatedConnections = selfInitiatedConnections
2013-12-30 04:36:23 +01:00
self . sendDataThreadQueue = sendDataThreadQueue # used to send commands and data to the sendDataThread
2013-06-22 00:29:04 +02:00
shared . connectedHostsList [
2013-08-01 18:16:31 +02:00
self . peer . host ] = 0 # The very fact that this receiveData thread exists shows that we are connected to the remote host. Let's add it to this list so that an outgoingSynSender thread doesn't try to connect to it.
2013-06-22 00:29:04 +02:00
self . connectionIsOrWasFullyEstablished = False # set to true after the remote node and I accept each other's version messages. This is needed to allow the user interface to accurately reflect the current number of connections.
2015-11-13 12:32:10 +01:00
self . services = 0
2013-06-22 00:29:04 +02:00
if self . streamNumber == - 1 : # This was an incoming connection. Send out a version message if we accept the other node's version message.
self . initiatedConnection = False
else :
self . initiatedConnection = True
2013-06-22 01:49:50 +02:00
self . selfInitiatedConnections [ streamNumber ] [ self ] = 0
2013-06-24 21:51:01 +02:00
self . someObjectsOfWhichThisRemoteNodeIsAlreadyAware = someObjectsOfWhichThisRemoteNodeIsAlreadyAware
2016-02-18 00:53:13 +01:00
self . objectHashHolderInstance = objectHashHolderInstance
2016-02-13 12:54:23 +01:00
self . startTime = time . time ( )
2013-06-22 00:29:04 +02:00
def run ( self ) :
2015-11-18 16:22:17 +01:00
logger . debug ( ' receiveDataThread starting. ID ' + str ( id ( self ) ) + ' . The size of the shared.connectedHostsList is now ' + str ( len ( shared . connectedHostsList ) ) )
2013-06-29 19:29:35 +02:00
2013-06-22 00:29:04 +02:00
while True :
2014-09-10 22:47:51 +02:00
if shared . config . getint ( ' bitmessagesettings ' , ' maxdownloadrate ' ) == 0 :
downloadRateLimitBytes = float ( " inf " )
else :
downloadRateLimitBytes = shared . config . getint ( ' bitmessagesettings ' , ' maxdownloadrate ' ) * 1000
with shared . receiveDataLock :
while shared . numberOfBytesReceivedLastSecond > = downloadRateLimitBytes :
if int ( time . time ( ) ) == shared . lastTimeWeResetBytesReceived :
# If it's still the same second that it was last time then sleep.
time . sleep ( 0.3 )
else :
# It's a new second. Let us clear the shared.numberOfBytesReceivedLastSecond.
shared . lastTimeWeResetBytesReceived = int ( time . time ( ) )
shared . numberOfBytesReceivedLastSecond = 0
2013-07-02 17:43:54 +02:00
dataLen = len ( self . data )
2013-06-22 00:29:04 +02:00
try :
2015-11-22 16:43:53 +01:00
if ( ( self . services & shared . NODE_SSL == shared . NODE_SSL ) and
self . connectionIsOrWasFullyEstablished and
shared . haveSSL ( not self . initiatedConnection ) ) :
2015-11-13 12:32:10 +01:00
dataRecv = self . sslSock . recv ( 1024 )
else :
dataRecv = self . sock . recv ( 1024 )
2014-07-07 22:30:23 +02:00
self . data + = dataRecv
2014-09-10 22:47:51 +02:00
shared . numberOfBytesReceived + = len ( dataRecv ) # for the 'network status' UI tab. The UI clears this value whenever it updates.
shared . numberOfBytesReceivedLastSecond + = len ( dataRecv ) # for the download rate limit
2013-06-22 00:29:04 +02:00
except socket . timeout :
2015-11-18 16:22:17 +01:00
logger . error ( ' Timeout occurred waiting for data from ' + str ( self . peer ) + ' . Closing receiveData thread. (ID: ' + str ( id ( self ) ) + ' ) ' )
2013-06-22 00:29:04 +02:00
break
except Exception as err :
2015-11-13 17:01:09 +01:00
if ( sys . platform == ' win32 ' and err . errno in ( [ 2 , 10035 ] ) ) or ( sys . platform != ' win32 ' and err . errno == errno . EWOULDBLOCK ) :
2015-11-13 12:32:10 +01:00
select . select ( [ self . sslSock ] , [ ] , [ ] )
continue
2015-11-18 16:22:17 +01:00
logger . error ( ' sock.recv error. Closing receiveData thread ( ' + str ( self . peer ) + ' , Thread ID: ' + str ( id ( self ) ) + ' ). ' + str ( err . errno ) + " / " + str ( err ) )
2013-06-22 00:29:04 +02:00
break
# print 'Received', repr(self.data)
2013-07-05 22:56:49 +02:00
if len ( self . data ) == dataLen : # If self.sock.recv returned no data:
2015-11-18 16:22:17 +01:00
logger . debug ( ' Connection to ' + str ( self . peer ) + ' closed. Closing receiveData thread. (ID: ' + str ( id ( self ) ) + ' ) ' )
2013-06-22 00:29:04 +02:00
break
else :
self . processData ( )
try :
2013-06-22 01:49:50 +02:00
del self . selfInitiatedConnections [ self . streamNumber ] [ self ]
2015-11-18 16:22:17 +01:00
logger . debug ( ' removed self (a receiveDataThread) from selfInitiatedConnections ' )
2013-06-22 00:29:04 +02:00
except :
pass
2014-08-06 21:54:59 +02:00
self . sendDataThreadQueue . put ( ( 0 , ' shutdown ' , ' no data ' ) ) # commands the corresponding sendDataThread to shut itself down.
2013-06-22 00:29:04 +02:00
try :
2013-08-01 18:16:31 +02:00
del shared . connectedHostsList [ self . peer . host ]
2013-06-22 00:29:04 +02:00
except Exception as err :
2015-11-18 16:22:17 +01:00
logger . error ( ' Could not delete ' + str ( self . peer . host ) + ' from shared.connectedHostsList. ' + str ( err ) )
2013-06-29 19:29:35 +02:00
2013-06-22 00:29:04 +02:00
try :
2013-09-04 04:45:45 +02:00
del shared . numberOfObjectsThatWeHaveYetToGetPerPeer [
2013-07-30 22:23:18 +02:00
self . peer ]
2013-06-22 00:29:04 +02:00
except :
pass
shared . UISignalQueue . put ( ( ' updateNetworkStatusTab ' , ' no data ' ) )
2015-11-18 16:22:17 +01:00
logger . debug ( ' receiveDataThread ending. ID ' + str ( id ( self ) ) + ' . The size of the shared.connectedHostsList is now ' + str ( len ( shared . connectedHostsList ) ) )
2013-06-29 19:29:35 +02:00
2016-02-13 12:54:23 +01:00
def antiIntersectionDelay ( self , initial = False ) :
# estimated time for a small object to propagate across the whole network
delay = math . ceil ( math . log ( len ( shared . knownNodes [ self . streamNumber ] ) + 2 , 20 ) ) * ( 0.2 + objectHashHolder . size / 2 )
# +2 is to avoid problems with log(0) and log(1)
# 20 is avg connected nodes count
# 0.2 is avg message transmission time
now = time . time ( )
if initial and now - delay < self . startTime :
2016-02-18 00:53:13 +01:00
logger . debug ( " Initial sleeping for %.2f s " , delay - ( now - self . startTime ) )
2016-02-13 12:54:23 +01:00
time . sleep ( delay - ( now - self . startTime ) )
elif not initial :
2016-02-18 00:53:13 +01:00
logger . debug ( " Sleeping due to missing object for %.2f s " , delay )
2016-02-13 12:54:23 +01:00
time . sleep ( delay )
2013-06-22 00:29:04 +02:00
def processData ( self ) :
2014-05-22 17:57:48 +02:00
if len ( self . data ) < shared . Header . size : # if so little of the data has arrived that we can't even read the checksum then wait for more data.
2013-06-22 00:29:04 +02:00
return
2014-08-06 21:54:59 +02:00
magic , command , payloadLength , checksum = shared . Header . unpack ( self . data [ : shared . Header . size ] )
2014-05-22 17:57:48 +02:00
if magic != 0xE9BEB4D9 :
2013-06-22 00:29:04 +02:00
self . data = " "
return
2014-09-10 22:47:51 +02:00
if payloadLength > 1600100 : # ~1.6 MB which is the maximum possible size of an inv message.
2014-05-22 17:57:48 +02:00
logger . info ( ' The incoming message, which we have not yet download, is too large. Ignoring it. (unfortunately there is no way to tell the other node to stop sending it except to disconnect.) Message size: %s ' % payloadLength )
2014-08-06 21:54:59 +02:00
self . data = self . data [ payloadLength + shared . Header . size : ]
del magic , command , payloadLength , checksum # we don't need these anymore and better to clean them now before the recursive call rather than after
2014-02-05 08:45:10 +01:00
self . processData ( )
return
2014-08-06 21:54:59 +02:00
if len ( self . data ) < payloadLength + shared . Header . size : # check if the whole message has arrived yet.
2013-06-22 00:29:04 +02:00
return
2014-08-06 21:54:59 +02:00
payload = self . data [ shared . Header . size : payloadLength + shared . Header . size ]
2014-07-15 00:01:56 +02:00
if checksum != hashlib . sha512 ( payload ) . digest ( ) [ 0 : 4 ] : # test the checksum in the message.
2015-11-18 16:22:17 +01:00
logger . error ( ' Checksum incorrect. Clearing this message. ' )
2014-08-06 21:54:59 +02:00
self . data = self . data [ payloadLength + shared . Header . size : ]
del magic , command , payloadLength , checksum , payload # better to clean up before the recursive call
2013-06-22 00:29:04 +02:00
self . processData ( )
return
2014-08-06 21:54:59 +02:00
2013-06-22 00:29:04 +02:00
# The time we've last seen this node is obviously right now since we
# just received valid data from it. So update the knownNodes list so
# that other peers can be made aware of its existance.
if self . initiatedConnection and self . connectionIsOrWasFullyEstablished : # The remote port is only something we should share with others if it is the remote node's incoming port (rather than some random operating-system-assigned outgoing port).
2015-02-20 23:33:17 +01:00
with shared . knownNodesLock :
shared . knownNodes [ self . streamNumber ] [ self . peer ] = int ( time . time ( ) )
2014-02-05 08:45:10 +01:00
2014-05-22 17:57:48 +02:00
#Strip the nulls
command = command . rstrip ( ' \x00 ' )
2015-11-18 16:22:17 +01:00
logger . debug ( ' remoteCommand ' + repr ( command ) + ' from ' + str ( self . peer ) )
2014-05-22 17:57:48 +02:00
2014-08-27 09:14:32 +02:00
try :
#TODO: Use a dispatcher here
2014-09-10 22:47:51 +02:00
if command == ' error ' :
self . recerror ( payload )
elif not self . connectionIsOrWasFullyEstablished :
2014-08-27 09:14:32 +02:00
if command == ' version ' :
self . recversion ( payload )
elif command == ' verack ' :
self . recverack ( )
else :
if command == ' addr ' :
self . recaddr ( payload )
elif command == ' inv ' :
self . recinv ( payload )
elif command == ' getdata ' :
self . recgetdata ( payload )
elif command == ' object ' :
self . recobject ( payload )
elif command == ' ping ' :
self . sendpong ( payload )
#elif command == 'pong':
# pass
except varintDecodeError as e :
logger . debug ( " There was a problem with a varint while processing a message from the wire. Some details: %s " % e )
except Exception as e :
logger . critical ( " Critical error in a receiveDataThread: \n %s " % traceback . format_exc ( ) )
2014-08-06 21:54:59 +02:00
del payload
self . data = self . data [ payloadLength + shared . Header . size : ] # take this message out and then process the next message
2013-06-22 00:29:04 +02:00
2014-08-06 21:54:59 +02:00
if self . data == ' ' : # if there are no more messages
2013-09-04 04:45:45 +02:00
while len ( self . objectsThatWeHaveYetToGetFromThisPeer ) > 0 :
2013-09-04 00:08:29 +02:00
shared . numberOfInventoryLookupsPerformed + = 1
2013-06-22 00:29:04 +02:00
objectHash , = random . sample (
2013-09-04 04:45:45 +02:00
self . objectsThatWeHaveYetToGetFromThisPeer , 1 )
2013-06-22 00:29:04 +02:00
if objectHash in shared . inventory :
2016-03-18 02:01:59 +01:00
logger . debug ( ' Inventory already has object listed in inv message. ' )
del self . objectsThatWeHaveYetToGetFromThisPeer [ objectHash ]
2013-06-22 00:29:04 +02:00
else :
2014-08-06 21:54:59 +02:00
# We don't have the object in our inventory. Let's request it.
2013-06-22 00:29:04 +02:00
self . sendgetdata ( objectHash )
2013-09-04 04:45:45 +02:00
del self . objectsThatWeHaveYetToGetFromThisPeer [
2013-11-20 07:29:37 +01:00
objectHash ] # It is possible that the remote node might not respond with the object. In that case, we'll very likely get it from someone else anyway.
2013-09-04 04:45:45 +02:00
if len ( self . objectsThatWeHaveYetToGetFromThisPeer ) == 0 :
2015-11-18 16:22:17 +01:00
logger . debug ( ' (concerning ' + str ( self . peer ) + ' ) number of objectsThatWeHaveYetToGetFromThisPeer is now 0 ' )
2013-06-22 00:29:04 +02:00
try :
2013-09-04 04:45:45 +02:00
del shared . numberOfObjectsThatWeHaveYetToGetPerPeer [
2013-07-30 22:23:18 +02:00
self . peer ] # this data structure is maintained so that we can keep track of how many total objects, across all connections, are currently outstanding. If it goes too high it can indicate that we are under attack by multiple nodes working together.
2013-06-22 00:29:04 +02:00
except :
pass
break
2013-09-04 04:45:45 +02:00
if len ( self . objectsThatWeHaveYetToGetFromThisPeer ) == 0 :
2014-08-06 21:54:59 +02:00
# We had objectsThatWeHaveYetToGetFromThisPeer but the loop ran, they were all in our inventory, and now we don't have any to get anymore.
2015-11-18 16:22:17 +01:00
logger . debug ( ' (concerning ' + str ( self . peer ) + ' ) number of objectsThatWeHaveYetToGetFromThisPeer is now 0 ' )
2013-06-22 00:29:04 +02:00
try :
2013-09-04 04:45:45 +02:00
del shared . numberOfObjectsThatWeHaveYetToGetPerPeer [
2013-07-30 22:23:18 +02:00
self . peer ] # this data structure is maintained so that we can keep track of how many total objects, across all connections, are currently outstanding. If it goes too high it can indicate that we are under attack by multiple nodes working together.
2013-06-22 00:29:04 +02:00
except :
pass
2013-09-04 04:45:45 +02:00
if len ( self . objectsThatWeHaveYetToGetFromThisPeer ) > 0 :
2015-11-18 16:22:17 +01:00
logger . debug ( ' (concerning ' + str ( self . peer ) + ' ) number of objectsThatWeHaveYetToGetFromThisPeer is now ' + str ( len ( self . objectsThatWeHaveYetToGetFromThisPeer ) ) )
2013-06-29 19:29:35 +02:00
2013-09-04 04:45:45 +02:00
shared . numberOfObjectsThatWeHaveYetToGetPerPeer [ self . peer ] = len (
self . objectsThatWeHaveYetToGetFromThisPeer ) # this data structure is maintained so that we can keep track of how many total objects, across all connections, are currently outstanding. If it goes too high it can indicate that we are under attack by multiple nodes working together.
2013-06-22 00:29:04 +02:00
self . processData ( )
def sendpong ( self ) :
2015-11-18 16:22:17 +01:00
logger . debug ( ' Sending pong ' )
2014-05-22 17:57:48 +02:00
self . sendDataThreadQueue . put ( ( 0 , ' sendRawData ' , shared . CreatePacket ( ' pong ' ) ) )
2013-06-29 19:29:35 +02:00
2013-06-22 00:29:04 +02:00
def recverack ( self ) :
2015-11-18 16:22:17 +01:00
logger . debug ( ' verack received ' )
2013-06-22 00:29:04 +02:00
self . verackReceived = True
if self . verackSent :
# We have thus both sent and received a verack.
self . connectionFullyEstablished ( )
def connectionFullyEstablished ( self ) :
2014-04-30 21:39:25 +02:00
if self . connectionIsOrWasFullyEstablished :
# there is no reason to run this function a second time
return
2013-06-22 00:29:04 +02:00
self . connectionIsOrWasFullyEstablished = True
2015-11-13 12:32:10 +01:00
self . sslSock = self . sock
2015-11-22 22:44:58 +01:00
if ( ( self . services & shared . NODE_SSL == shared . NODE_SSL ) and
2015-11-22 16:43:53 +01:00
shared . haveSSL ( not self . initiatedConnection ) ) :
2015-11-22 22:44:58 +01:00
logger . debug ( " Initialising TLS " )
2015-11-13 17:01:09 +01:00
self . sslSock = ssl . wrap_socket ( self . sock , keyfile = os . path . join ( shared . codePath ( ) , ' sslkeys ' , ' key.pem ' ) , certfile = os . path . join ( shared . codePath ( ) , ' sslkeys ' , ' cert.pem ' ) , server_side = not self . initiatedConnection , ssl_version = ssl . PROTOCOL_TLSv1 , do_handshake_on_connect = False , ciphers = ' AECDH-AES256-SHA ' )
2015-11-15 02:20:08 +01:00
if hasattr ( self . sslSock , " context " ) :
self . sslSock . context . set_ecdh_curve ( " secp256k1 " )
2015-11-13 12:32:10 +01:00
while True :
try :
self . sslSock . do_handshake ( )
break
except ssl . SSLError as e :
if e . errno == 2 :
select . select ( [ self . sslSock ] , [ self . sslSock ] , [ ] )
else :
break
except :
break
2014-04-30 21:39:25 +02:00
# Command the corresponding sendDataThread to set its own connectionIsOrWasFullyEstablished variable to True also
2015-11-22 22:44:58 +01:00
self . sendDataThreadQueue . put ( ( 0 , ' connectionIsOrWasFullyEstablished ' , ( self . services , self . sslSock ) ) )
2015-11-13 12:32:10 +01:00
2013-06-22 00:29:04 +02:00
if not self . initiatedConnection :
2013-08-25 01:40:48 +02:00
shared . clientHasReceivedIncomingConnections = True
2013-06-22 00:29:04 +02:00
shared . UISignalQueue . put ( ( ' setStatusIcon ' , ' green ' ) )
self . sock . settimeout (
600 ) # We'll send out a pong every 5 minutes to make sure the connection stays alive if there has been no other traffic to send lately.
shared . UISignalQueue . put ( ( ' updateNetworkStatusTab ' , ' no data ' ) )
2015-11-18 16:22:17 +01:00
logger . debug ( ' Connection fully established with ' + str ( self . peer ) + " \n " + \
' The size of the connectedHostsList is now ' + str ( len ( shared . connectedHostsList ) ) + " \n " + \
' The length of sendDataQueues is now: ' + str ( len ( shared . sendDataQueues ) ) + " \n " + \
' broadcasting addr from within connectionFullyEstablished function. ' )
2013-06-29 19:29:35 +02:00
2013-11-20 07:29:37 +01:00
# Let all of our peers know about this new node.
2013-09-10 01:26:32 +02:00
dataToSend = ( int ( time . time ( ) ) , self . streamNumber , 1 , self . peer . host , self . remoteNodeIncomingPort )
shared . broadcastToSendDataQueues ( (
self . streamNumber , ' advertisepeer ' , dataToSend ) )
2013-06-22 00:29:04 +02:00
self . sendaddr ( ) # This is one large addr message to this one peer.
if not self . initiatedConnection and len ( shared . connectedHostsList ) > 200 :
2015-11-18 16:22:17 +01:00
logger . info ( ' We are connected to too many people. Closing connection. ' )
2013-06-29 19:29:35 +02:00
2014-08-06 21:54:59 +02:00
self . sendDataThreadQueue . put ( ( 0 , ' shutdown ' , ' no data ' ) )
2013-06-22 00:29:04 +02:00
return
self . sendBigInv ( )
def sendBigInv ( self ) :
2014-08-27 09:14:32 +02:00
# Select all hashes for objects in this stream.
2013-06-22 00:29:04 +02:00
bigInvList = { }
2016-03-18 02:01:59 +01:00
for hash in shared . inventory . unexpired_hashes_by_stream ( self . streamNumber ) :
2016-02-18 00:53:13 +01:00
if hash not in self . someObjectsOfWhichThisRemoteNodeIsAlreadyAware and not self . objectHashHolderInstance . hasHash ( hash ) :
2013-06-22 00:29:04 +02:00
bigInvList [ hash ] = 0
numberOfObjectsInInvMessage = 0
payload = ' '
# Now let us start appending all of these hashes together. They will be
# sent out in a big inv message to our new peer.
for hash , storedValue in bigInvList . items ( ) :
payload + = hash
numberOfObjectsInInvMessage + = 1
2014-05-22 17:57:48 +02:00
if numberOfObjectsInInvMessage == 50000 : # We can only send a max of 50000 items per inv message but we may have more objects to advertise. They must be split up into multiple inv messages.
2013-06-22 00:29:04 +02:00
self . sendinvMessageToJustThisOnePeer (
numberOfObjectsInInvMessage , payload )
payload = ' '
numberOfObjectsInInvMessage = 0
if numberOfObjectsInInvMessage > 0 :
self . sendinvMessageToJustThisOnePeer (
numberOfObjectsInInvMessage , payload )
2014-04-30 21:39:25 +02:00
# Used to send a big inv message when the connection with a node is
# first fully established. Notice that there is also a broadcastinv
# function for broadcasting invs to everyone in our stream.
2013-06-22 00:29:04 +02:00
def sendinvMessageToJustThisOnePeer ( self , numberOfObjects , payload ) :
payload = encodeVarint ( numberOfObjects ) + payload
2015-11-18 16:22:17 +01:00
logger . debug ( ' Sending huge inv message with ' + str ( numberOfObjects ) + ' objects to just this one peer ' )
2014-05-22 17:57:48 +02:00
self . sendDataThreadQueue . put ( ( 0 , ' sendRawData ' , shared . CreatePacket ( ' inv ' , payload ) ) )
2013-06-29 19:29:35 +02:00
2014-02-06 14:16:07 +01:00
def _sleepForTimingAttackMitigation ( self , sleepTime ) :
# We don't need to do the timing attack mitigation if we are
# only connected to the trusted peer because we can trust the
# peer not to attack
if sleepTime > 0 and doTimingAttackMitigation and shared . trustedPeer == None :
2015-11-18 16:22:17 +01:00
logger . debug ( ' Timing attack mitigation: Sleeping for ' + str ( sleepTime ) + ' seconds. ' )
2014-02-06 14:16:07 +01:00
time . sleep ( sleepTime )
2014-09-10 22:47:51 +02:00
def recerror ( self , data ) :
"""
The remote node has been polite enough to send you an error message .
"""
fatalStatus , readPosition = decodeVarint ( data [ : 10 ] )
banTime , banTimeLength = decodeVarint ( data [ readPosition : readPosition + 10 ] )
readPosition + = banTimeLength
inventoryVectorLength , inventoryVectorLengthLength = decodeVarint ( data [ readPosition : readPosition + 10 ] )
if inventoryVectorLength > 100 :
return
readPosition + = inventoryVectorLengthLength
inventoryVector = data [ readPosition : readPosition + inventoryVectorLength ]
readPosition + = inventoryVectorLength
errorTextLength , errorTextLengthLength = decodeVarint ( data [ readPosition : readPosition + 10 ] )
if errorTextLength > 1000 :
return
readPosition + = errorTextLengthLength
errorText = data [ readPosition : readPosition + errorTextLength ]
if fatalStatus == 0 :
fatalHumanFriendly = ' Warning '
elif fatalStatus == 1 :
fatalHumanFriendly = ' Error '
elif fatalStatus == 2 :
fatalHumanFriendly = ' Fatal '
message = ' %s message received from %s : %s . ' % ( fatalHumanFriendly , self . peer , errorText )
if inventoryVector :
2016-03-23 23:26:57 +01:00
message + = " This concerns object %s " % hexlify ( inventoryVector )
2014-09-10 22:47:51 +02:00
if banTime > 0 :
message + = " Remote node says that the ban time is %s " % banTime
logger . error ( message )
2013-06-22 00:29:04 +02:00
2013-11-20 07:29:37 +01:00
2014-08-27 09:14:32 +02:00
def recobject ( self , data ) :
2013-06-22 00:29:04 +02:00
self . messageProcessingStartTime = time . time ( )
2014-08-27 09:14:32 +02:00
lengthOfTimeWeShouldUseToProcessThisMessage = shared . checkAndShareObjectWithPeers ( data )
2013-11-20 07:29:37 +01:00
"""
2014-08-27 09:14:32 +02:00
Sleeping will help guarantee that we can process messages faster than a
remote node can send them . If we fall behind , the attacker could observe
that we are are slowing down the rate at which we request objects from the
2013-11-20 07:29:37 +01:00
network which would indicate that we own a particular address ( whichever
one to which they are sending all of their attack messages ) . Note
that if an attacker connects to a target with many connections , this
mitigation mechanism might not be sufficient .
"""
2014-08-27 09:14:32 +02:00
sleepTime = lengthOfTimeWeShouldUseToProcessThisMessage - ( time . time ( ) - self . messageProcessingStartTime )
2014-02-06 14:16:07 +01:00
self . _sleepForTimingAttackMitigation ( sleepTime )
2014-08-27 09:14:32 +02:00
2013-06-22 00:29:04 +02:00
# We have received an inv message
def recinv ( self , data ) :
2014-08-27 09:14:32 +02:00
totalNumberOfobjectsThatWeHaveYetToGetFromAllPeers = 0 # this counts duplicates separately because they take up memory
2013-09-04 04:45:45 +02:00
if len ( shared . numberOfObjectsThatWeHaveYetToGetPerPeer ) > 0 :
for key , value in shared . numberOfObjectsThatWeHaveYetToGetPerPeer . items ( ) :
totalNumberOfobjectsThatWeHaveYetToGetFromAllPeers + = value
2015-11-18 16:22:17 +01:00
logger . debug ( ' number of keys(hosts) in shared.numberOfObjectsThatWeHaveYetToGetPerPeer: ' + str ( len ( shared . numberOfObjectsThatWeHaveYetToGetPerPeer ) ) + " \n " + \
' totalNumberOfobjectsThatWeHaveYetToGetFromAllPeers = ' + str ( totalNumberOfobjectsThatWeHaveYetToGetFromAllPeers ) )
2013-06-29 19:29:35 +02:00
2013-06-22 00:29:04 +02:00
numberOfItemsInInv , lengthOfVarint = decodeVarint ( data [ : 10 ] )
if numberOfItemsInInv > 50000 :
sys . stderr . write ( ' Too many items in inv message! ' )
return
if len ( data ) < lengthOfVarint + ( numberOfItemsInInv * 32 ) :
2015-11-18 16:22:17 +01:00
logger . info ( ' inv message doesn \' t contain enough data. Ignoring. ' )
2013-06-22 00:29:04 +02:00
return
if numberOfItemsInInv == 1 : # we'll just request this data from the person who advertised the object.
2015-11-19 19:48:18 +01:00
if totalNumberOfobjectsThatWeHaveYetToGetFromAllPeers > 200000 and len ( self . objectsThatWeHaveYetToGetFromThisPeer ) > 1000 and shared . trustedPeer == None : # inv flooding attack mitigation
2015-11-18 16:22:17 +01:00
logger . debug ( ' We already have ' + str ( totalNumberOfobjectsThatWeHaveYetToGetFromAllPeers ) + ' items yet to retrieve from peers and over 1000 from this node in particular. Ignoring this inv message. ' )
2013-06-22 00:29:04 +02:00
return
2013-06-24 21:51:01 +02:00
self . someObjectsOfWhichThisRemoteNodeIsAlreadyAware [
2013-06-22 00:29:04 +02:00
data [ lengthOfVarint : 32 + lengthOfVarint ] ] = 0
2013-09-04 00:08:29 +02:00
shared . numberOfInventoryLookupsPerformed + = 1
2013-06-22 00:29:04 +02:00
if data [ lengthOfVarint : 32 + lengthOfVarint ] in shared . inventory :
2016-03-18 02:01:59 +01:00
logger . debug ( ' Inventory has inventory item already. ' )
2013-06-22 00:29:04 +02:00
else :
self . sendgetdata ( data [ lengthOfVarint : 32 + lengthOfVarint ] )
else :
2013-09-04 04:45:45 +02:00
# There are many items listed in this inv message. Let us create a
# 'set' of objects we are aware of and a set of objects in this inv
# message so that we can diff one from the other cheaply.
startTime = time . time ( )
advertisedSet = set ( )
for i in range ( numberOfItemsInInv ) :
advertisedSet . add ( data [ lengthOfVarint + ( 32 * i ) : 32 + lengthOfVarint + ( 32 * i ) ] )
2016-03-18 02:01:59 +01:00
objectsNewToMe = advertisedSet - shared . inventory . hashes_by_stream ( self . streamNumber )
2013-09-04 04:45:45 +02:00
logger . info ( ' inv message lists %s objects. Of those %s are new to me. It took %s seconds to figure that out. ' , numberOfItemsInInv , len ( objectsNewToMe ) , time . time ( ) - startTime )
for item in objectsNewToMe :
2015-11-19 19:48:18 +01:00
if totalNumberOfobjectsThatWeHaveYetToGetFromAllPeers > 200000 and len ( self . objectsThatWeHaveYetToGetFromThisPeer ) > 1000 and shared . trustedPeer == None : # inv flooding attack mitigation
2015-11-18 16:22:17 +01:00
logger . debug ( ' We already have ' + str ( totalNumberOfobjectsThatWeHaveYetToGetFromAllPeers ) + ' items yet to retrieve from peers and over ' + str ( len ( self . objectsThatWeHaveYetToGetFromThisPeer ) ) , ' from this node in particular. Ignoring the rest of this inv message. ' )
2013-09-04 04:45:45 +02:00
break
self . someObjectsOfWhichThisRemoteNodeIsAlreadyAware [ item ] = 0 # helps us keep from sending inv messages to peers that already know about the objects listed therein
self . objectsThatWeHaveYetToGetFromThisPeer [ item ] = 0 # upon finishing dealing with an incoming message, the receiveDataThread will request a random object of from peer out of this data structure. This way if we get multiple inv messages from multiple peers which list mostly the same objects, we will make getdata requests for different random objects from the various peers.
if len ( self . objectsThatWeHaveYetToGetFromThisPeer ) > 0 :
shared . numberOfObjectsThatWeHaveYetToGetPerPeer [
self . peer ] = len ( self . objectsThatWeHaveYetToGetFromThisPeer )
2013-06-22 00:29:04 +02:00
# Send a getdata message to our peer to request the object with the given
# hash
def sendgetdata ( self , hash ) :
2016-03-23 23:26:57 +01:00
logger . debug ( ' sending getdata to retrieve object with hash: ' + hexlify ( hash ) )
2013-06-22 00:29:04 +02:00
payload = ' \x01 ' + hash
2014-05-22 17:57:48 +02:00
self . sendDataThreadQueue . put ( ( 0 , ' sendRawData ' , shared . CreatePacket ( ' getdata ' , payload ) ) )
2013-06-29 19:29:35 +02:00
2013-06-22 00:29:04 +02:00
# We have received a getdata request from our peer
def recgetdata ( self , data ) :
numberOfRequestedInventoryItems , lengthOfVarint = decodeVarint (
data [ : 10 ] )
if len ( data ) < lengthOfVarint + ( 32 * numberOfRequestedInventoryItems ) :
2015-11-18 16:22:17 +01:00
logger . debug ( ' getdata message does not contain enough data. Ignoring. ' )
2013-06-22 00:29:04 +02:00
return
2016-02-18 00:53:13 +01:00
self . antiIntersectionDelay ( True ) # only handle getdata requests if we have been connected long enough
2013-06-22 00:29:04 +02:00
for i in xrange ( numberOfRequestedInventoryItems ) :
hash = data [ lengthOfVarint + (
i * 32 ) : 32 + lengthOfVarint + ( i * 32 ) ]
2016-03-23 23:26:57 +01:00
logger . debug ( ' received getdata request for item: ' + hexlify ( hash ) )
2013-06-29 19:29:35 +02:00
2013-09-04 00:08:29 +02:00
shared . numberOfInventoryLookupsPerformed + = 1
2013-09-08 00:23:20 +02:00
shared . inventoryLock . acquire ( )
2016-02-18 00:53:13 +01:00
if self . objectHashHolderInstance . hasHash ( hash ) :
shared . inventoryLock . release ( )
self . antiIntersectionDelay ( )
2013-06-22 00:29:04 +02:00
else :
2013-09-08 00:23:20 +02:00
shared . inventoryLock . release ( )
2016-03-18 02:01:59 +01:00
if hash in shared . inventory :
self . sendObject ( shared . inventory [ hash ] . payload )
2013-06-22 00:29:04 +02:00
else :
2016-02-13 12:54:23 +01:00
self . antiIntersectionDelay ( )
2015-01-28 20:45:29 +01:00
logger . warning ( ' %s asked for an object with a getdata which is not in either our memory inventory or our SQL inventory. We probably cleaned it out after advertising it but before they got around to asking for it. ' % ( self . peer , ) )
2013-06-22 00:29:04 +02:00
# Our peer has requested (in a getdata message) that we send an object.
2014-08-27 09:14:32 +02:00
def sendObject ( self , payload ) :
2015-11-18 16:22:17 +01:00
logger . debug ( ' sending an object. ' )
2014-08-27 09:14:32 +02:00
self . sendDataThreadQueue . put ( ( 0 , ' sendRawData ' , shared . CreatePacket ( ' object ' , payload ) ) )
2013-06-29 19:29:35 +02:00
2016-01-26 12:04:06 +01:00
def _checkIPAddress ( self , host ) :
if host [ 0 : 12 ] == ' \x00 \x00 \x00 \x00 \x00 \x00 \x00 \x00 \x00 \x00 \xFF \xFF ' :
hostStandardFormat = socket . inet_ntop ( socket . AF_INET , host [ 12 : ] )
return self . _checkIPv4Address ( host [ 12 : ] , hostStandardFormat )
2016-03-18 16:39:29 +01:00
elif host [ 0 : 6 ] == ' \xfd \x87 \xd8 \x7e \xeb \x43 ' :
# Onion, based on BMD/bitcoind
hostStandardFormat = base64 . b32encode ( host [ 6 : ] ) . lower ( ) + " .onion "
return hostStandardFormat
2016-01-26 12:04:06 +01:00
else :
hostStandardFormat = socket . inet_ntop ( socket . AF_INET6 , host )
if hostStandardFormat == " " :
# This can happen on Windows systems which are not 64-bit compatible
# so let us drop the IPv6 address.
return False
return self . _checkIPv6Address ( host , hostStandardFormat )
2014-02-16 17:21:20 +01:00
2015-02-20 23:33:17 +01:00
def _checkIPv4Address ( self , host , hostStandardFormat ) :
2015-11-12 17:36:12 +01:00
if host [ 0 ] == ' \x7F ' : # 127/8
2015-11-18 16:22:17 +01:00
logger . debug ( ' Ignoring IP address in loopback range: ' + hostStandardFormat )
2014-02-16 17:21:20 +01:00
return False
2015-11-12 17:36:12 +01:00
if host [ 0 ] == ' \x0A ' : # 10/8
2015-11-18 16:22:17 +01:00
logger . debug ( ' Ignoring IP address in private range: ' + hostStandardFormat )
2014-02-16 17:21:20 +01:00
return False
2015-11-12 17:36:12 +01:00
if host [ 0 : 2 ] == ' \xC0 \xA8 ' : # 192.168/16
2015-11-18 16:22:17 +01:00
logger . debug ( ' Ignoring IP address in private range: ' + hostStandardFormat )
2015-11-12 17:36:12 +01:00
return False
if host [ 0 : 2 ] > = ' \xAC \x10 ' and host [ 0 : 2 ] < ' \xAC \x20 ' : # 172.16/12
2015-11-18 16:22:17 +01:00
logger . debug ( ' Ignoring IP address in private range: ' + hostStandardFormat )
2014-02-16 17:21:20 +01:00
return False
2016-01-26 12:04:06 +01:00
return hostStandardFormat
2014-02-16 17:21:20 +01:00
2015-02-20 23:33:17 +01:00
def _checkIPv6Address ( self , host , hostStandardFormat ) :
2014-02-16 17:21:20 +01:00
if host == ( ' \x00 ' * 15 ) + ' \x01 ' :
2015-11-18 16:22:17 +01:00
logger . debug ( ' Ignoring loopback address: ' + hostStandardFormat )
2014-02-16 17:21:20 +01:00
return False
if host [ 0 ] == ' \xFE ' and ( ord ( host [ 1 ] ) & 0xc0 ) == 0x80 :
2015-11-18 16:22:17 +01:00
logger . debug ( ' Ignoring local address: ' + hostStandardFormat )
2014-02-16 17:21:20 +01:00
return False
if ( ord ( host [ 0 ] ) & 0xfe ) == 0xfc :
2015-11-18 16:22:17 +01:00
logger . debug ( ' Ignoring unique local address: ' + hostStandardFormat )
2014-02-16 17:21:20 +01:00
return False
2016-01-26 12:04:06 +01:00
return hostStandardFormat
2014-02-16 17:21:20 +01:00
2013-06-22 00:29:04 +02:00
# We have received an addr message.
def recaddr ( self , data ) :
numberOfAddressesIncluded , lengthOfNumberOfAddresses = decodeVarint (
data [ : 10 ] )
2013-06-24 21:51:01 +02:00
if shared . verbose > = 1 :
2015-11-18 16:22:17 +01:00
logger . debug ( ' addr message contains ' + str ( numberOfAddressesIncluded ) + ' IP addresses. ' )
2013-06-29 19:29:35 +02:00
2013-09-10 01:26:32 +02:00
if numberOfAddressesIncluded > 1000 or numberOfAddressesIncluded == 0 :
return
if len ( data ) != lengthOfNumberOfAddresses + ( 38 * numberOfAddressesIncluded ) :
2015-11-18 16:22:17 +01:00
logger . debug ( ' addr message does not contain the correct amount of data. Ignoring. ' )
2013-09-10 01:26:32 +02:00
return
2013-06-22 00:29:04 +02:00
2013-09-10 01:26:32 +02:00
for i in range ( 0 , numberOfAddressesIncluded ) :
2015-02-20 23:33:17 +01:00
fullHost = data [ 20 + lengthOfNumberOfAddresses + ( 38 * i ) : 36 + lengthOfNumberOfAddresses + ( 38 * i ) ]
recaddrStream , = unpack ( ' >I ' , data [ 8 + lengthOfNumberOfAddresses + (
38 * i ) : 12 + lengthOfNumberOfAddresses + ( 38 * i ) ] )
2013-09-10 01:26:32 +02:00
if recaddrStream == 0 :
continue
if recaddrStream != self . streamNumber and recaddrStream != ( self . streamNumber * 2 ) and recaddrStream != ( ( self . streamNumber * 2 ) + 1 ) : # if the embedded stream number is not in my stream or either of my child streams then ignore it. Someone might be trying funny business.
continue
2015-02-20 23:33:17 +01:00
recaddrServices , = unpack ( ' >Q ' , data [ 12 + lengthOfNumberOfAddresses + (
38 * i ) : 20 + lengthOfNumberOfAddresses + ( 38 * i ) ] )
recaddrPort , = unpack ( ' >H ' , data [ 36 + lengthOfNumberOfAddresses + (
38 * i ) : 38 + lengthOfNumberOfAddresses + ( 38 * i ) ] )
2016-01-26 12:04:06 +01:00
hostStandardFormat = self . _checkIPAddress ( fullHost )
if hostStandardFormat is False :
continue
2016-02-20 11:14:42 +01:00
if recaddrPort == 0 :
continue
2013-09-10 01:26:32 +02:00
timeSomeoneElseReceivedMessageFromThisNode , = unpack ( ' >Q ' , data [ lengthOfNumberOfAddresses + (
38 * i ) : 8 + lengthOfNumberOfAddresses + ( 38 * i ) ] ) # This is the 'time' value in the received addr message. 64-bit.
if recaddrStream not in shared . knownNodes : # knownNodes is a dictionary of dictionaries with one outer dictionary for each stream. If the outer stream dictionary doesn't exist yet then we must make it.
2015-02-20 23:33:17 +01:00
with shared . knownNodesLock :
shared . knownNodes [ recaddrStream ] = { }
peerFromAddrMessage = shared . Peer ( hostStandardFormat , recaddrPort )
2013-09-10 01:26:32 +02:00
if peerFromAddrMessage not in shared . knownNodes [ recaddrStream ] :
if len ( shared . knownNodes [ recaddrStream ] ) < 20000 and timeSomeoneElseReceivedMessageFromThisNode > ( int ( time . time ( ) ) - 10800 ) and timeSomeoneElseReceivedMessageFromThisNode < ( int ( time . time ( ) ) + 10800 ) : # If we have more than 20000 nodes in our list already then just forget about adding more. Also, make sure that the time that someone else received a message from this node is within three hours from now.
2015-02-20 23:33:17 +01:00
with shared . knownNodesLock :
shared . knownNodes [ recaddrStream ] [ peerFromAddrMessage ] = timeSomeoneElseReceivedMessageFromThisNode
2015-11-18 16:22:17 +01:00
logger . debug ( ' added new node ' + str ( peerFromAddrMessage ) + ' to knownNodes in stream ' + str ( recaddrStream ) )
2013-09-10 01:26:32 +02:00
shared . needToWriteKnownNodesToDisk = True
hostDetails = (
timeSomeoneElseReceivedMessageFromThisNode ,
2015-02-20 23:33:17 +01:00
recaddrStream , recaddrServices , hostStandardFormat , recaddrPort )
2013-09-10 01:26:32 +02:00
shared . broadcastToSendDataQueues ( (
self . streamNumber , ' advertisepeer ' , hostDetails ) )
else :
timeLastReceivedMessageFromThisNode = shared . knownNodes [ recaddrStream ] [
2015-02-20 23:33:17 +01:00
peerFromAddrMessage ]
if ( timeLastReceivedMessageFromThisNode < timeSomeoneElseReceivedMessageFromThisNode ) and ( timeSomeoneElseReceivedMessageFromThisNode < int ( time . time ( ) ) + 900 ) : # 900 seconds for wiggle-room in case other nodes' clocks aren't quite right.
with shared . knownNodesLock :
shared . knownNodes [ recaddrStream ] [ peerFromAddrMessage ] = timeSomeoneElseReceivedMessageFromThisNode
2015-11-18 16:22:17 +01:00
logger . debug ( ' knownNodes currently has ' + str ( len ( shared . knownNodes [ self . streamNumber ] ) ) + ' nodes for this stream. ' )
2013-06-29 19:29:35 +02:00
2013-06-22 00:29:04 +02:00
2014-04-30 21:39:25 +02:00
# Send a huge addr message to our peer. This is only used
# when we fully establish a connection with a
# peer (with the full exchange of version and verack
# messages).
2013-06-22 00:29:04 +02:00
def sendaddr ( self ) :
addrsInMyStream = { }
addrsInChildStreamLeft = { }
addrsInChildStreamRight = { }
# print 'knownNodes', shared.knownNodes
# We are going to share a maximum number of 1000 addrs with our peer.
# 500 from this stream, 250 from the left child stream, and 250 from
# the right child stream.
2015-02-20 23:33:17 +01:00
with shared . knownNodesLock :
if len ( shared . knownNodes [ self . streamNumber ] ) > 0 :
for i in range ( 500 ) :
peer , = random . sample ( shared . knownNodes [ self . streamNumber ] , 1 )
if isHostInPrivateIPRange ( peer . host ) :
continue
addrsInMyStream [ peer ] = shared . knownNodes [
self . streamNumber ] [ peer ]
if len ( shared . knownNodes [ self . streamNumber * 2 ] ) > 0 :
for i in range ( 250 ) :
peer , = random . sample ( shared . knownNodes [
self . streamNumber * 2 ] , 1 )
if isHostInPrivateIPRange ( peer . host ) :
continue
addrsInChildStreamLeft [ peer ] = shared . knownNodes [
self . streamNumber * 2 ] [ peer ]
if len ( shared . knownNodes [ ( self . streamNumber * 2 ) + 1 ] ) > 0 :
for i in range ( 250 ) :
peer , = random . sample ( shared . knownNodes [
( self . streamNumber * 2 ) + 1 ] , 1 )
if isHostInPrivateIPRange ( peer . host ) :
continue
addrsInChildStreamRight [ peer ] = shared . knownNodes [
( self . streamNumber * 2 ) + 1 ] [ peer ]
2013-06-22 00:29:04 +02:00
numberOfAddressesInAddrMessage = 0
payload = ' '
# print 'addrsInMyStream.items()', addrsInMyStream.items()
2013-07-30 22:23:18 +02:00
for ( HOST , PORT ) , value in addrsInMyStream . items ( ) :
timeLastReceivedMessageFromThisNode = value
2013-06-24 21:51:01 +02:00
if timeLastReceivedMessageFromThisNode > ( int ( time . time ( ) ) - shared . maximumAgeOfNodesThatIAdvertiseToOthers ) : # If it is younger than 3 hours old..
2013-06-22 00:29:04 +02:00
numberOfAddressesInAddrMessage + = 1
payload + = pack (
' >Q ' , timeLastReceivedMessageFromThisNode ) # 64-bit time
payload + = pack ( ' >I ' , self . streamNumber )
payload + = pack (
' >q ' , 1 ) # service bit flags offered by this node
2014-02-16 17:21:20 +01:00
payload + = shared . encodeHost ( HOST )
2013-06-22 00:29:04 +02:00
payload + = pack ( ' >H ' , PORT ) # remote port
2013-08-01 12:32:07 +02:00
for ( HOST , PORT ) , value in addrsInChildStreamLeft . items ( ) :
timeLastReceivedMessageFromThisNode = value
2013-06-24 21:51:01 +02:00
if timeLastReceivedMessageFromThisNode > ( int ( time . time ( ) ) - shared . maximumAgeOfNodesThatIAdvertiseToOthers ) : # If it is younger than 3 hours old..
2013-06-22 00:29:04 +02:00
numberOfAddressesInAddrMessage + = 1
payload + = pack (
' >Q ' , timeLastReceivedMessageFromThisNode ) # 64-bit time
payload + = pack ( ' >I ' , self . streamNumber * 2 )
payload + = pack (
' >q ' , 1 ) # service bit flags offered by this node
2014-02-16 17:21:20 +01:00
payload + = shared . encodeHost ( HOST )
2013-06-22 00:29:04 +02:00
payload + = pack ( ' >H ' , PORT ) # remote port
2013-08-01 12:32:07 +02:00
for ( HOST , PORT ) , value in addrsInChildStreamRight . items ( ) :
timeLastReceivedMessageFromThisNode = value
2013-06-24 21:51:01 +02:00
if timeLastReceivedMessageFromThisNode > ( int ( time . time ( ) ) - shared . maximumAgeOfNodesThatIAdvertiseToOthers ) : # If it is younger than 3 hours old..
2013-06-22 00:29:04 +02:00
numberOfAddressesInAddrMessage + = 1
payload + = pack (
' >Q ' , timeLastReceivedMessageFromThisNode ) # 64-bit time
payload + = pack ( ' >I ' , ( self . streamNumber * 2 ) + 1 )
payload + = pack (
' >q ' , 1 ) # service bit flags offered by this node
2014-02-16 17:21:20 +01:00
payload + = shared . encodeHost ( HOST )
2013-06-22 00:29:04 +02:00
payload + = pack ( ' >H ' , PORT ) # remote port
payload = encodeVarint ( numberOfAddressesInAddrMessage ) + payload
2014-05-22 17:57:48 +02:00
self . sendDataThreadQueue . put ( ( 0 , ' sendRawData ' , shared . CreatePacket ( ' addr ' , payload ) ) )
2013-06-29 19:29:35 +02:00
2013-06-22 00:29:04 +02:00
# We have received a version message
def recversion ( self , data ) :
if len ( data ) < 83 :
# This version message is unreasonably short. Forget it.
return
2014-04-30 21:39:25 +02:00
if self . verackSent :
"""
We must have already processed the remote node ' s version message.
There might be a time in the future when we Do want to process
a new version message , like if the remote node wants to update
the streams in which they are interested . But for now we ' ll
ignore this version message
"""
return
self . remoteProtocolVersion , = unpack ( ' >L ' , data [ : 4 ] )
2015-11-13 12:32:10 +01:00
self . services , = unpack ( ' >q ' , data [ 4 : 12 ] )
2014-08-27 09:14:32 +02:00
if self . remoteProtocolVersion < 3 :
2014-08-06 21:54:59 +02:00
self . sendDataThreadQueue . put ( ( 0 , ' shutdown ' , ' no data ' ) )
2015-11-18 16:22:17 +01:00
logger . debug ( ' Closing connection to old protocol version ' + str ( self . remoteProtocolVersion ) + ' node: ' + str ( self . peer ) )
2014-04-30 21:39:25 +02:00
return
2014-09-10 22:47:51 +02:00
timestamp , = unpack ( ' >Q ' , data [ 12 : 20 ] )
timeOffset = timestamp - int ( time . time ( ) )
if timeOffset > 3600 :
self . sendDataThreadQueue . put ( ( 0 , ' sendRawData ' , shared . assembleErrorMessage ( fatal = 2 , errorText = " Your time is too far in the future compared to mine. Closing connection. " ) ) )
logger . info ( " %s ' s time is too far in the future ( %s seconds). Closing connection to it. " % ( self . peer , timeOffset ) )
time . sleep ( 2 )
self . sendDataThreadQueue . put ( ( 0 , ' shutdown ' , ' no data ' ) )
return
if timeOffset < - 3600 :
self . sendDataThreadQueue . put ( ( 0 , ' sendRawData ' , shared . assembleErrorMessage ( fatal = 2 , errorText = " Your time is too far in the past compared to mine. Closing connection. " ) ) )
logger . info ( " %s ' s time is too far in the past (timeOffset %s seconds). Closing connection to it. " % ( self . peer , timeOffset ) )
time . sleep ( 2 )
self . sendDataThreadQueue . put ( ( 0 , ' shutdown ' , ' no data ' ) )
return
2014-04-30 21:39:25 +02:00
self . myExternalIP = socket . inet_ntoa ( data [ 40 : 44 ] )
# print 'myExternalIP', self.myExternalIP
self . remoteNodeIncomingPort , = unpack ( ' >H ' , data [ 70 : 72 ] )
# print 'remoteNodeIncomingPort', self.remoteNodeIncomingPort
useragentLength , lengthOfUseragentVarint = decodeVarint (
data [ 80 : 84 ] )
readPosition = 80 + lengthOfUseragentVarint
useragent = data [ readPosition : readPosition + useragentLength ]
2015-10-19 22:33:18 +02:00
# version check
2016-03-26 18:42:22 +01:00
try :
userAgentName , userAgentVersion = useragent [ 1 : - 1 ] . split ( " : " , 2 )
except :
userAgentName = useragent
userAgentVersion = " 0.0.0 "
2015-10-19 22:33:18 +02:00
if userAgentName == " PyBitmessage " :
myVersion = [ int ( n ) for n in shared . softwareVersion . split ( " . " ) ]
2016-03-26 18:42:22 +01:00
try :
remoteVersion = [ int ( n ) for n in userAgentVersion . split ( " . " ) ]
except :
remoteVersion = 0
2015-10-19 22:33:18 +02:00
# remote is newer, but do not cross between stable and unstable
2016-03-26 18:42:22 +01:00
try :
if cmp ( remoteVersion , myVersion ) > 0 and \
( myVersion [ 1 ] % 2 == remoteVersion [ 1 ] % 2 ) :
shared . UISignalQueue . put ( ( ' newVersionAvailable ' , remoteVersion ) )
except :
pass
2015-10-19 22:33:18 +02:00
2014-04-30 21:39:25 +02:00
readPosition + = useragentLength
numberOfStreamsInVersionMessage , lengthOfNumberOfStreamsInVersionMessage = decodeVarint (
data [ readPosition : ] )
readPosition + = lengthOfNumberOfStreamsInVersionMessage
self . streamNumber , lengthOfRemoteStreamNumber = decodeVarint (
data [ readPosition : ] )
2015-11-18 16:22:17 +01:00
logger . debug ( ' Remote node useragent: ' + useragent + ' stream number: ' + str ( self . streamNumber ) + ' time offset: ' + str ( timeOffset ) + ' seconds. ' )
2013-06-29 19:29:35 +02:00
2014-04-30 21:39:25 +02:00
if self . streamNumber != 1 :
2014-08-06 21:54:59 +02:00
self . sendDataThreadQueue . put ( ( 0 , ' shutdown ' , ' no data ' ) )
2015-11-18 16:22:17 +01:00
logger . debug ( ' Closed connection to ' + str ( self . peer ) + ' because they are interested in stream ' + str ( self . streamNumber ) + ' . ' )
2014-04-30 21:39:25 +02:00
return
shared . connectedHostsList [
self . peer . host ] = 1 # We use this data structure to not only keep track of what hosts we are connected to so that we don't try to connect to them again, but also to list the connections count on the Network Status tab.
2014-08-27 09:14:32 +02:00
# If this was an incoming connection, then the sendDataThread
2014-04-30 21:39:25 +02:00
# doesn't know the stream. We have to set it.
if not self . initiatedConnection :
self . sendDataThreadQueue . put ( ( 0 , ' setStreamNumber ' , self . streamNumber ) )
if data [ 72 : 80 ] == shared . eightBytesOfRandomDataUsedToDetectConnectionsToSelf :
2014-08-06 21:54:59 +02:00
self . sendDataThreadQueue . put ( ( 0 , ' shutdown ' , ' no data ' ) )
2015-11-18 16:22:17 +01:00
logger . debug ( ' Closing connection to myself: ' + str ( self . peer ) )
2014-04-30 21:39:25 +02:00
return
# The other peer's protocol version is of interest to the sendDataThread but we learn of it
# in this version message. Let us inform the sendDataThread.
self . sendDataThreadQueue . put ( ( 0 , ' setRemoteProtocolVersion ' , self . remoteProtocolVersion ) )
2013-06-22 00:29:04 +02:00
2016-01-26 12:04:06 +01:00
if not isHostInPrivateIPRange ( self . peer . host ) :
with shared . knownNodesLock :
shared . knownNodes [ self . streamNumber ] [ shared . Peer ( self . peer . host , self . remoteNodeIncomingPort ) ] = int ( time . time ( ) )
shared . needToWriteKnownNodesToDisk = True
2013-06-22 00:29:04 +02:00
2014-04-30 21:39:25 +02:00
self . sendverack ( )
if self . initiatedConnection == False :
self . sendversion ( )
2013-06-22 00:29:04 +02:00
# Sends a version message
def sendversion ( self ) :
2015-11-18 16:22:17 +01:00
logger . debug ( ' Sending version message ' )
2013-12-30 04:36:23 +01:00
self . sendDataThreadQueue . put ( ( 0 , ' sendRawData ' , shared . assembleVersionMessage (
2015-11-22 17:04:51 +01:00
self . peer . host , self . peer . port , self . streamNumber , not self . initiatedConnection ) ) )
2013-06-29 19:29:35 +02:00
2013-06-22 00:29:04 +02:00
# Sends a verack message
def sendverack ( self ) :
2015-11-18 16:22:17 +01:00
logger . debug ( ' Sending verack ' )
2014-05-22 17:57:48 +02:00
self . sendDataThreadQueue . put ( ( 0 , ' sendRawData ' , shared . CreatePacket ( ' verack ' ) ) )
2013-06-22 00:29:04 +02:00
self . verackSent = True
if self . verackReceived :
self . connectionFullyEstablished ( )