2016-01-22 10:17:10 +00:00
# Module-level switch read by receiveDataThread._sleepForTimingAttackMitigation:
# while it is False that method never sleeps.
doTimingAttackMitigation = False
2013-06-24 21:29:04 +00:00
2016-03-18 15:39:29 +00:00
import base64
2016-10-23 08:12:49 +00:00
import datetime
2015-11-13 11:32:10 +00:00
import errno
2016-02-13 11:54:23 +00:00
import math
2013-06-21 22:29:04 +00:00
import time
import threading
import shared
2013-06-21 23:49:50 +00:00
import hashlib
2015-11-13 11:32:10 +00:00
import os
2017-01-16 18:36:58 +00:00
import Queue
2015-11-13 11:32:10 +00:00
import select
2013-06-21 23:49:50 +00:00
import socket
import random
2015-11-13 11:32:10 +00:00
import ssl
2013-06-21 23:49:50 +00:00
from struct import unpack , pack
import sys
2014-08-27 07:14:32 +00:00
import traceback
2016-03-23 22:26:57 +00:00
from binascii import hexlify
2013-11-20 06:29:37 +00:00
#import string
#from subprocess import call # used when the API must execute an outside program
#from pyelliptic.openssl import OpenSSL
2013-06-21 23:49:50 +00:00
2013-11-20 06:29:37 +00:00
#import highlevelcrypto
2013-06-21 23:49:50 +00:00
from addresses import *
2017-02-22 08:34:54 +00:00
from bmconfigparser import BMConfigParser
2016-02-13 11:54:23 +00:00
from class_objectHashHolder import objectHashHolder
2014-05-02 14:46:36 +00:00
from helper_generic import addDataPadding , isHostInPrivateIPRange
2015-03-09 06:35:32 +00:00
from helper_sql import sqlQuery
2017-02-08 12:41:56 +00:00
import knownnodes
2013-09-04 02:45:45 +00:00
from debug import logger
2017-01-11 16:00:00 +00:00
import paths
2017-01-11 13:27:19 +00:00
import protocol
2017-03-19 21:08:00 +00:00
from inventory import Inventory , PendingDownloadQueue , PendingUpload
2017-02-08 12:41:56 +00:00
import queues
2017-01-11 16:00:00 +00:00
import state
2017-01-14 22:21:00 +00:00
import throttle
2016-10-23 08:12:49 +00:00
import tr
2017-01-11 13:27:19 +00:00
from version import softwareVersion
2013-06-21 22:29:04 +00:00
# This thread is created either by the synSenderThread(for outgoing
2013-11-20 06:29:37 +00:00
# connections) or the singleListenerThread(for incoming connections).
2013-06-21 22:29:04 +00:00
class receiveDataThread ( threading . Thread ) :
def __init__(self):
    """Create the thread object with an initial receive buffer.

    Everything that depends on the actual connection is filled in later
    by setup().
    """
    threading.Thread.__init__(self, name=" receiveData ")
    # Neither half of the verack exchange has happened yet.
    self.verackReceived = False
    self.verackSent = False
    # Bytes received from the peer that processData() has not consumed yet.
    self.data = ' '
def setup(
        self,
        sock,
        HOST,
        port,
        streamNumber,
        selfInitiatedConnections,
        sendDataThreadQueue,
        objectHashHolderInstance):
    """Attach this thread to a connection and register it as connected.

    :param sock: socket of the connection this thread reads from.
    :param HOST: remote host, stored in ``self.peer``.
    :param port: remote port, stored in ``self.peer``.
    :param streamNumber: only compared against -1; -1 marks an incoming
        connection. The streams actually used come from
        ``state.streamsInWhichIAmParticipating``.
    :param selfInitiatedConnections: per-stream mapping in which outgoing
        connections register themselves.
    :param sendDataThreadQueue: queue used to send commands and data to the
        corresponding sendDataThread.
    :param objectHashHolderInstance: hash holder consulted by sendBigInv()
        and recgetdata().
    """
    self.sock = sock
    self.peer = state.Peer(HOST, port)
    # ":" is the log parser's field separator, so keep it out of the name.
    sanitizedHost = self.peer.host.replace(" : ", " . ")
    self.name = " receiveData- " + sanitizedHost
    self.streamNumber = state.streamsInWhichIAmParticipating
    self.remoteStreams = []
    self.selfInitiatedConnections = selfInitiatedConnections
    self.sendDataThreadQueue = sendDataThreadQueue

    # Identify the peer by port instead of host when an onion hostname is
    # configured and the peer address passes protocol.checkSocksIP().
    onionHostname = BMConfigParser().get(' bitmessagesettings ', ' onionhostname ')
    if " .onion " in onionHostname and protocol.checkSocksIP(self.peer.host):
        self.hostIdent = self.peer.port
    else:
        self.hostIdent = self.peer.host

    # The very fact that this thread exists shows that we are connected to
    # the remote host; listing it keeps an outgoingSynSender thread from
    # trying to connect to it as well.
    shared.connectedHostsList[self.hostIdent] = 0

    # Set to True once both nodes have accepted each other's version
    # messages; lets the UI reflect the real number of connections.
    self.connectionIsOrWasFullyEstablished = False
    self.services = 0

    # -1 means incoming: we only send our version message after accepting
    # the other node's version message.
    self.initiatedConnection = streamNumber != -1
    if self.initiatedConnection:
        for stream in self.streamNumber:
            self.selfInitiatedConnections[stream][self] = 0

    self.objectHashHolderInstance = objectHashHolderInstance
    self.downloadQueue = PendingDownloadQueue()
    self.startTime = time.time()
2013-06-21 22:29:04 +00:00
def run(self):
    """Receive data from the peer until the connection ends, then clean up.

    Each iteration reads up to ``throttle.ReceiveThrottle().chunkSize``
    bytes, appends them to ``self.data`` and hands the buffer to
    processData(). The TLS socket is used once the peer advertises
    NODE_SSL, the connection is fully established and protocol.haveSSL()
    agrees; otherwise the plain socket is used.

    The loop ends when ``state.shutdown`` becomes non-zero, the peer closes
    the connection (a recv that adds no data), a timeout occurs before the
    connection was fully established, or a non-retriable socket/SSL error
    is raised. Afterwards the thread unregisters itself, tells the
    sendDataThread to shut down and notifies the UI.
    """
    logger.debug(' receiveDataThread starting. ID ' + str(id(self)) + ' . The size of the shared.connectedHostsList is now ' + str(len(shared.connectedHostsList)))

    while state.shutdown == 0:
        dataLen = len(self.data)
        try:
            isSSL = False
            if ((self.services & protocol.NODE_SSL == protocol.NODE_SSL) and
                    self.connectionIsOrWasFullyEstablished and
                    protocol.haveSSL(not self.initiatedConnection)):
                isSSL = True
                dataRecv = self.sslSock.recv(throttle.ReceiveThrottle().chunkSize)
            else:
                dataRecv = self.sock.recv(throttle.ReceiveThrottle().chunkSize)
            self.data += dataRecv
            throttle.ReceiveThrottle().wait(len(dataRecv))
        except socket.timeout:
            # An idle but established connection is kept alive with a ping;
            # a timeout during the handshake phase ends the connection.
            if self.connectionIsOrWasFullyEstablished:
                self.sendping(" Still around! ")
                continue
            logger.error(" Timeout during protocol initialisation ")
            break
        except ssl.SSLError as err:
            if err.errno == ssl.SSL_ERROR_WANT_READ:
                # Wait (at most 10 s) for the TLS socket to become readable.
                select.select([self.sslSock], [], [], 10)
                logger.debug(' sock.recv retriable SSL error ')
                continue
            if err.errno is None and ' timed out ' in str(err):
                if self.connectionIsOrWasFullyEstablished:
                    self.sendping(" Still around! ")
                    continue
            logger.error(' SSL error: %i / %s ', err.errno if err.errno else 0, str(err))
            break
        except socket.error as err:
            if err.errno in (errno.EAGAIN, errno.EWOULDBLOCK) or \
                    (sys.platform.startswith(' win ') and
                     err.errno == errno.WSAEWOULDBLOCK):
                select.select([self.sslSock if isSSL else self.sock], [], [], 10)
                logger.debug(' sock.recv retriable error ')
                continue
            logger.error(' sock.recv error. Closing receiveData thread, %s ', str(err))
            break
        # print 'Received', repr(self.data)
        if len(self.data) == dataLen:  # If recv returned no data:
            logger.debug(' Connection to ' + str(self.peer) + ' closed. Closing receiveData thread ')
            break
        else:
            self.processData()

    try:
        for stream in self.streamNumber:
            try:
                del self.selfInitiatedConnections[stream][self]
            except KeyError:
                pass
        logger.debug(' removed self (a receiveDataThread) from selfInitiatedConnections ')
    except Exception:
        # Best-effort bookkeeping: it must not prevent the shutdown steps
        # below, but a failure should leave a trace instead of vanishing.
        logger.debug('Could not remove receiveDataThread from selfInitiatedConnections', exc_info=True)
    # Command the corresponding sendDataThread to shut itself down.
    self.sendDataThreadQueue.put((0, ' shutdown ', ' no data '))
    try:
        del shared.connectedHostsList[self.hostIdent]
    except Exception as err:
        logger.error(' Could not delete ' + str(self.hostIdent) + ' from shared.connectedHostsList. ' + str(err))

    queues.UISignalQueue.put((' updateNetworkStatusTab ', ' no data '))
    self.checkTimeOffsetNotification()
    logger.debug(' receiveDataThread ending. ID ' + str(id(self)) + ' . The size of the shared.connectedHostsList is now ' + str(len(shared.connectedHostsList)))
2013-06-29 17:29:35 +00:00
2016-02-13 11:54:23 +00:00
def antiIntersectionDelay(self, initial=False):
    """Sleep for the estimated network propagation time of a small object.

    :param initial: when True, only sleep for whatever part of the delay
        has not already elapsed since ``self.startTime``; when False, sleep
        for the full delay.
    """
    # Take the stream with the maximum amount of nodes. ``or [0]`` keeps
    # max() from raising ValueError when knownNodes holds no streams.
    nodeCounts = [len(knownnodes.knownNodes[x]) for x in knownnodes.knownNodes]
    largestStream = max(nodeCounts or [0])
    # Estimated time for a small object to propagate across the whole network:
    # +2 is to avoid problems with log(0) and log(1)
    # 20 is avg connected nodes count
    # 0.2 is avg message transmission time
    delay = math.ceil(math.log(largestStream + 2, 20)) * (0.2 + objectHashHolder.size / 2)
    if initial:
        # Compute the remainder once and only sleep on a positive value, so
        # time.sleep() can never be handed a negative duration.
        remaining = delay - (time.time() - self.startTime)
        if remaining > 0:
            logger.debug(" Initial sleeping for %.2f s ", remaining)
            time.sleep(remaining)
    else:
        logger.debug(" Sleeping due to missing object for %.2f s ", delay)
        time.sleep(delay)
2013-06-21 22:29:04 +00:00
2016-10-23 08:12:49 +00:00
def checkTimeOffsetNotification(self):
    """Warn the user about a possibly wrong system clock.

    A status bar update is queued only when ``shared.timeOffsetWrongCount``
    has reached 4 and this connection never became fully established.
    """
    if shared.timeOffsetWrongCount < 4 or self.connectionIsOrWasFullyEstablished:
        return
    template = tr._translate(" MainWindow ", " The time on your computer, % 1, may be wrong. Please verify your settings. ")
    warning = template.arg(datetime.datetime.now().strftime(" % H: % M: % S "))
    queues.UISignalQueue.put((' updateStatusBar ', warning))
2016-10-23 08:12:49 +00:00
2013-06-21 22:29:04 +00:00
def processData(self):
    """Parse and dispatch every complete message buffered in ``self.data``.

    Messages are handled one after another in a loop (rather than by
    recursing once per message) so that a buffer holding many small
    messages cannot exhaust the interpreter's recursion limit.

    The method returns when the buffer holds less than a full header or
    less than the full announced message (more data is needed), or when
    the magic value does not match, in which case the buffer is reset.
    Messages announcing an oversized payload or failing the checksum are
    dropped and parsing continues with whatever follows.
    """
    headerSize = protocol.Header.size
    while True:
        # If so little data has arrived that we can't even read the
        # checksum, wait for more data.
        if len(self.data) < headerSize:
            return

        magic, command, payloadLength, checksum = protocol.Header.unpack(self.data[:headerSize])
        if magic != 0xE9BEB4D9:
            self.data = " "
            return
        messageEnd = payloadLength + headerSize
        if payloadLength > 1600100:  # ~1.6 MB which is the maximum possible size of an inv message.
            logger.info(' The incoming message, which we have not yet download, is too large. Ignoring it. (unfortunately there is no way to tell the other node to stop sending it except to disconnect.) Message size: %s ' % payloadLength)
            self.data = self.data[messageEnd:]
            continue
        if len(self.data) < messageEnd:  # the whole message has not arrived yet.
            return
        payload = self.data[headerSize:messageEnd]
        # The checksum is the first four bytes of the payload's SHA-512.
        if checksum != hashlib.sha512(payload).digest()[0:4]:
            logger.error(' Checksum incorrect. Clearing this message. ')
            self.data = self.data[messageEnd:]
            continue

        # The time we've last seen this node is obviously right now since
        # we just received valid data from it. So update the knownNodes
        # list so that other peers can be made aware of its existence.
        # The remote port is only something we should share with others if
        # it is the remote node's incoming port (rather than some random
        # operating-system-assigned outgoing port).
        if self.initiatedConnection and self.connectionIsOrWasFullyEstablished:
            with knownnodes.knownNodesLock:
                for stream in self.streamNumber:
                    knownnodes.knownNodes[stream][self.peer] = int(time.time())

        # Strip the nulls
        command = command.rstrip(' \x00 ')
        logger.debug(' remoteCommand ' + repr(command) + ' from ' + str(self.peer))

        self._dispatchCommand(command, payload)

        # Take this message out and then process the next message.
        self.data = self.data[messageEnd:]

        if self.data == ' ':  # if there are no more messages
            self._requestPendingDownloads()


def _dispatchCommand(self, command, payload):
    """Route one verified message to its handler, logging handler failures."""
    try:
        # TODO: Use a dispatcher here
        if command == ' error ':
            self.recerror(payload)
        elif not self.connectionIsOrWasFullyEstablished:
            # Only the handshake commands are accepted at this stage.
            if command == ' version ':
                self.recversion(payload)
            elif command == ' verack ':
                self.recverack()
        else:
            if command == ' addr ':
                self.recaddr(payload)
            elif command == ' inv ':
                self.recinv(payload)
            elif command == ' getdata ':
                self.recgetdata(payload)
            elif command == ' object ':
                self.recobject(payload)
            elif command == ' ping ':
                self.sendpong(payload)
            elif command == ' pong ':
                pass
            else:
                logger.info(" Unknown command %s , ignoring ", command)
    except varintDecodeError as e:
        logger.debug(" There was a problem with a varint while processing a message from the wire. Some details: %s " % e)
    except Exception:
        logger.critical(" Critical error in a receiveDataThread: \n %s " % traceback.format_exc())


def _requestPendingDownloads(self):
    """Request queued objects until up to 100 downloads are pending."""
    toRequest = []
    try:
        for i in range(len(self.downloadQueue.pending), 100):
            while True:
                hashId = self.downloadQueue.get(False)
                if not hashId in Inventory():
                    toRequest.append(hashId)
                    break
                # don't track download for duplicates
                self.downloadQueue.task_done(hashId)
    except Queue.Empty:
        pass
    if len(toRequest) > 0:
        self.sendgetdata(toRequest)
2017-01-11 17:13:00 +00:00
def sendpong(self, payload):
    """Queue a pong packet carrying ``payload`` for the sendDataThread."""
    logger.debug(' Sending pong ')
    packet = protocol.CreatePacket(' pong ', payload)
    self.sendDataThreadQueue.put((0, ' sendRawData ', packet))
2013-06-29 17:29:35 +00:00
2017-02-07 15:06:24 +00:00
def sendping(self, payload):
    """Queue a ping packet carrying ``payload`` for the sendDataThread."""
    logger.debug(' Sending ping ')
    packet = protocol.CreatePacket(' ping ', payload)
    self.sendDataThreadQueue.put((0, ' sendRawData ', packet))
2013-06-21 22:29:04 +00:00
def recverack(self):
    """Record the peer's verack and finish the handshake if ours was sent."""
    logger.debug(' verack received ')
    self.verackReceived = True
    if not self.verackSent:
        return
    # A verack has now been both sent and received.
    self.connectionFullyEstablished()
2017-02-07 18:38:52 +00:00
def sslHandshake(self):
    """Upgrade the connection to TLS when both sides support it.

    ``self.sslSock`` starts out as the plain socket and is replaced by a
    wrapped socket only if the peer advertises NODE_SSL and
    protocol.haveSSL() agrees for our role.

    :returns: True when the handshake succeeded or TLS is not used;
        False when the handshake failed, in which case a shutdown command
        has already been queued for the sendDataThread.
    """
    self.sslSock = self.sock
    if ((self.services & protocol.NODE_SSL == protocol.NODE_SSL) and
            protocol.haveSSL(not self.initiatedConnection)):
        logger.debug(" Initialising TLS ")
        if sys.version_info >= (2, 7, 9):
            # SSLContext is used from Python 2.7.9 on; no certificate
            # checking is configured (CERT_NONE, check_hostname off).
            context = ssl.SSLContext(protocol.sslProtocolVersion)
            context.set_ciphers(protocol.sslProtocolCiphers)
            context.set_ecdh_curve(" secp256k1 ")
            context.check_hostname = False
            context.verify_mode = ssl.CERT_NONE
            # also exclude TLSv1 and TLSv1.1 in the future
            context.options = ssl.OP_ALL | ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3 | ssl.OP_SINGLE_ECDH_USE | ssl.OP_CIPHER_SERVER_PREFERENCE
            self.sslSock = context.wrap_socket(self.sock, server_side=not self.initiatedConnection, do_handshake_on_connect=False)
        else:
            # Older interpreters: wrap directly, using the key and
            # certificate files under the code path's sslkeys directory.
            self.sslSock = ssl.wrap_socket(self.sock, keyfile=os.path.join(paths.codePath(), ' sslkeys ', ' key.pem '), certfile=os.path.join(paths.codePath(), ' sslkeys ', ' cert.pem '), server_side=not self.initiatedConnection, ssl_version=protocol.sslProtocolVersion, do_handshake_on_connect=False, ciphers=protocol.sslProtocolCiphers)
        # Block until every item queued for the sendDataThread so far has
        # been marked done, before starting the handshake.
        self.sendDataThreadQueue.join()
        # Retry the handshake until it completes; "want read"/"want write"
        # conditions wait up to 10 s on the socket and then try again.
        while True:
            try:
                self.sslSock.do_handshake()
                logger.debug(" TLS handshake success ")
                if sys.version_info >= (2, 7, 9):
                    logger.debug(" TLS protocol version: %s ", self.sslSock.version())
                break
            except ssl.SSLError as e:
                if sys.hexversion >= 0x02070900:
                    # From 2.7.9 on the retriable conditions are dedicated
                    # exception subclasses.
                    if isinstance(e, ssl.SSLWantReadError):
                        logger.debug(" Waiting for SSL socket handhake read ")
                        select.select([self.sslSock], [], [], 10)
                        continue
                    elif isinstance(e, ssl.SSLWantWriteError):
                        logger.debug(" Waiting for SSL socket handhake write ")
                        select.select([], [self.sslSock], [], 10)
                        continue
                else:
                    # Older interpreters: inspect the error code instead.
                    if e.args[0] == ssl.SSL_ERROR_WANT_READ:
                        logger.debug(" Waiting for SSL socket handhake read ")
                        select.select([self.sslSock], [], [], 10)
                        continue
                    elif e.args[0] == ssl.SSL_ERROR_WANT_WRITE:
                        logger.debug(" Waiting for SSL socket handhake write ")
                        select.select([], [self.sslSock], [], 10)
                        continue
                # Any other SSL error is fatal for this connection.
                logger.error(" SSL socket handhake failed: shutting down connection, %s ", str(e))
                self.sendDataThreadQueue.put((0, ' shutdown ', ' tls handshake fail %s ' % (str(e))))
                return False
            except socket.error as err:
                logger.debug(' SSL socket handshake failed, shutting down connection, %s ', str(err))
                self.sendDataThreadQueue.put((0, ' shutdown ', ' tls handshake fail '))
                return False
            except Exception:
                logger.error(" SSL socket handhake failed, shutting down connection ", exc_info=True)
                self.sendDataThreadQueue.put((0, ' shutdown ', ' tls handshake fail '))
                return False
        # SSL in the background should be blocking, otherwise the error handling is difficult
        self.sslSock.settimeout(None)
        return True
    # no SSL
    return True
2017-02-07 18:38:52 +00:00
def peerValidityChecks(self):
    """Decide whether the handshake with this peer may be completed.

    The peer is rejected when its protocol version is below 3, when its
    clock differs from ours by more than 3600 seconds in either direction,
    or when ``self.streamNumber`` is empty. Every rejection queues a fatal
    error message for the peer; clock rejections also increment
    ``shared.timeOffsetWrongCount``, which is reset to 0 when the clock
    check passes.

    :returns: True if the peer passed every check, otherwise False.
    """
    def queueFatalError(errorText):
        # Ask the sendDataThread to send a fatal (level 2) error message.
        self.sendDataThreadQueue.put((0, ' sendRawData ', protocol.assembleErrorMessage(
            fatal=2, errorText=errorText)))

    if self.remoteProtocolVersion < 3:
        # The original text read "Your is using ..."; the missing noun is
        # restored here.
        queueFatalError(" Your node is using an old protocol. Closing connection. ")
        logger.debug(' Closing connection to old protocol version ' + str(self.remoteProtocolVersion) + ' node: ' + str(self.peer))
        return False
    if self.timeOffset > 3600:
        queueFatalError(" Your time is too far in the future compared to mine. Closing connection. ")
        logger.info(" %s ' s time is too far in the future ( %s seconds). Closing connection to it. ", self.peer, self.timeOffset)
        shared.timeOffsetWrongCount += 1
        time.sleep(2)
        return False
    elif self.timeOffset < -3600:
        queueFatalError(" Your time is too far in the past compared to mine. Closing connection. ")
        logger.info(" %s ' s time is too far in the past (timeOffset %s seconds). Closing connection to it. ", self.peer, self.timeOffset)
        shared.timeOffsetWrongCount += 1
        return False
    else:
        shared.timeOffsetWrongCount = 0
    if len(self.streamNumber) == 0:
        queueFatalError(" We don ' t have shared stream interests. Closing connection. ")
        logger.debug(' Closed connection to ' + str(self.peer) + ' because there is no overlapping interest in streams. ')
        return False
    return True
def connectionFullyEstablished(self):
    """Finish the handshake and start normal operation with the peer.

    Runs at most once per connection. Performs the TLS handshake and the
    peer validity checks, marks the connection as established, advertises
    the peer, sends our addr message and, unless the connection limit is
    exceeded, our inventory.
    """
    if self.connectionIsOrWasFullyEstablished:
        # there is no reason to run this function a second time
        return
    if not self.sslHandshake():
        return
    if not self.peerValidityChecks():
        time.sleep(2)
        self.sendDataThreadQueue.put((0, ' shutdown ', ' no data '))
        self.checkTimeOffsetNotification()
        return

    self.connectionIsOrWasFullyEstablished = True
    shared.timeOffsetWrongCount = 0

    # Command the corresponding sendDataThread to set its own
    # connectionIsOrWasFullyEstablished variable to True also
    establishedArgs = (self.services, self.sslSock)
    self.sendDataThreadQueue.put((0, ' connectionIsOrWasFullyEstablished ', establishedArgs))

    if not self.initiatedConnection:
        shared.clientHasReceivedIncomingConnections = True
        queues.UISignalQueue.put((' setStatusIcon ', ' green '))
    # 600-second receive timeout: when a recv on an established connection
    # times out, run() sends a ping so the connection stays alive.
    self.sock.settimeout(600)
    queues.UISignalQueue.put((' updateNetworkStatusTab ', ' no data '))
    logger.debug(
        ' Connection fully established with ' + str(self.peer) + " \n " +
        ' The size of the connectedHostsList is now ' + str(len(shared.connectedHostsList)) + " \n " +
        ' The length of sendDataQueues is now: ' + str(len(state.sendDataQueues)) + " \n " +
        ' broadcasting addr from within connectionFullyEstablished function. ')

    if self.initiatedConnection:
        state.networkProtocolAvailability[protocol.networkType(self.peer.host)] = True

    # we need to send our own objects to this node
    PendingUpload().add()

    # Let all of our peers know about this new node.
    for stream in self.remoteStreams:
        dataToSend = (int(time.time()), stream, self.services, self.peer.host, self.remoteNodeIncomingPort)
        protocol.broadcastToSendDataQueues((stream, ' advertisepeer ', dataToSend))

    self.sendaddr()  # This is one large addr message to this one peer.

    connectionLimit = BMConfigParser().safeGetInt(" bitmessagesettings ", " maxtotalconnections ", 200)
    if len(shared.connectedHostsList) > connectionLimit:
        logger.info(' We are connected to too many people. Closing connection. ')
        if self.initiatedConnection:
            errorText = " Thank you for providing a listening node. "
        else:
            errorText = " Server full, please try again later. "
        self.sendDataThreadQueue.put((0, ' sendRawData ', protocol.assembleErrorMessage(fatal=2, errorText=errorText)))
        self.sendDataThreadQueue.put((0, ' shutdown ', ' no data '))
        return
    self.sendBigInv()
def sendBigInv(self):
    """Advertise our unexpired inventory to this one peer.

    Hashes of every stream in ``self.streamNumber`` are collected, skipping
    those the objectHashHolderInstance already holds, and sent in inv
    messages of at most 50000 entries each.
    """
    # Select all hashes for objects in this stream.
    advertised = {}
    for stream in self.streamNumber:
        for invHash in Inventory().unexpired_hashes_by_stream(stream):
            if self.objectHashHolderInstance.hasHash(invHash):
                continue
            advertised[invHash] = 0

    # Append the hashes together and flush a message whenever the
    # per-message limit of 50000 items is reached.
    batchCount = 0
    batchPayload = ' '
    for invHash in advertised:
        batchPayload += invHash
        batchCount += 1
        if batchCount < 50000:
            continue
        self.sendinvMessageToJustThisOnePeer(batchCount, batchPayload)
        batchPayload = ' '
        batchCount = 0
    # Send whatever is left over after the last full batch.
    if batchCount > 0:
        self.sendinvMessageToJustThisOnePeer(batchCount, batchPayload)
2014-04-30 19:39:25 +00:00
# Used to send a big inv message when the connection with a node is
# first fully established. Notice that there is also a broadcastinv
# function for broadcasting invs to everyone in our stream.
2013-06-21 22:29:04 +00:00
def sendinvMessageToJustThisOnePeer(self, numberOfObjects, payload):
    """Queue one inv message for this peer only.

    :param numberOfObjects: number of hashes contained in ``payload``;
        it is varint-encoded and prepended to the payload.
    :param payload: the concatenated inventory hashes.
    """
    invPayload = encodeVarint(numberOfObjects) + payload
    logger.debug(' Sending huge inv message with ' + str(numberOfObjects) + ' objects to just this one peer ')
    packet = protocol.CreatePacket(' inv ', invPayload)
    self.sendDataThreadQueue.put((0, ' sendRawData ', packet))
2013-06-29 17:29:35 +00:00
2014-02-06 13:16:07 +00:00
def _sleepForTimingAttackMitigation(self, sleepTime):
    """Sleep for ``sleepTime`` seconds when timing mitigation applies.

    Nothing happens unless ``sleepTime`` is positive, the module-level
    ``doTimingAttackMitigation`` switch is on and no trusted peer is set.
    """
    # We don't need to do the timing attack mitigation if we are only
    # connected to the trusted peer because we can trust the peer not to
    # attack.
    if not (sleepTime > 0 and doTimingAttackMitigation and state.trustedPeer is None):
        return
    logger.debug(' Timing attack mitigation: Sleeping for ' + str(sleepTime) + ' seconds. ')
    time.sleep(sleepTime)
2014-09-10 20:47:51 +00:00
def recerror(self, data):
    """Log an error message sent by the remote node.

    The payload is read as: varint fatal status, varint ban time,
    varint-prefixed inventory vector, varint-prefixed error text. Messages
    whose inventory vector exceeds 100 bytes or whose text exceeds 1000
    bytes are ignored.

    :param data: payload of the received error message.
    :raises varintDecodeError: presumably raised by decodeVarint on a
        malformed varint; processData() catches it. TODO confirm.
    """
    fatalStatus, readPosition = decodeVarint(data[:10])
    banTime, banTimeLength = decodeVarint(data[readPosition:readPosition + 10])
    readPosition += banTimeLength
    inventoryVectorLength, inventoryVectorLengthLength = decodeVarint(data[readPosition:readPosition + 10])
    if inventoryVectorLength > 100:
        return
    readPosition += inventoryVectorLengthLength
    inventoryVector = data[readPosition:readPosition + inventoryVectorLength]
    readPosition += inventoryVectorLength
    errorTextLength, errorTextLengthLength = decodeVarint(data[readPosition:readPosition + 10])
    if errorTextLength > 1000:
        return
    readPosition += errorTextLengthLength
    errorText = data[readPosition:readPosition + errorTextLength]

    # The status comes straight from the remote node, so values other than
    # 0, 1 and 2 must still produce a label instead of leaving the name
    # unbound (which used to raise UnboundLocalError).
    severityLabels = {0: ' Warning ', 1: ' Error ', 2: ' Fatal '}
    fatalHumanFriendly = severityLabels.get(
        fatalStatus, 'Unknown severity %s' % fatalStatus)

    message = ' %s message received from %s : %s . ' % (fatalHumanFriendly, self.peer, errorText)
    if inventoryVector:
        message += " This concerns object %s " % hexlify(inventoryVector)
    if banTime > 0:
        message += " Remote node says that the ban time is %s " % banTime
    logger.error(message)
2013-06-21 22:29:04 +00:00
2013-11-20 06:29:37 +00:00
2014-08-27 07:14:32 +00:00
def recobject(self, data):
    """Handle a received object and pad its processing time if required.

    :param data: payload of the received object message.
    """
    self.messageProcessingStartTime = time.time()
    timeBudget = shared.checkAndShareObjectWithPeers(data)
    self.downloadQueue.task_done(calculateInventoryHash(data))

    # Sleeping will help guarantee that we can process messages faster than
    # a remote node can send them. If we fall behind, the attacker could
    # observe that we are slowing down the rate at which we request objects
    # from the network which would indicate that we own a particular address
    # (whichever one to which they are sending all of their attack
    # messages). Note that if an attacker connects to a target with many
    # connections, this mitigation mechanism might not be sufficient.
    elapsed = time.time() - self.messageProcessingStartTime
    sleepTime = timeBudget - elapsed
    self._sleepForTimingAttackMitigation(sleepTime)
2014-08-27 07:14:32 +00:00
2013-06-21 22:29:04 +00:00
# We have received an inv message
def recinv(self, data):
    """Handle an inv message: queue the advertised objects we lack.

    :param data: inv payload, a varint item count followed by 32-byte
        hashes. Messages listing more than 50000 items, or shorter than
        the count implies, are ignored.
    """
    numberOfItemsInInv, lengthOfVarint = decodeVarint(data[:10])
    if numberOfItemsInInv > 50000:
        sys.stderr.write(' Too many items in inv message! ')
        return
    hashesEnd = lengthOfVarint + (numberOfItemsInInv * 32)
    if len(data) < hashesEnd:
        logger.info(' inv message doesn \' t contain enough data. Ignoring. ')
        return

    startTime = time.time()
    # Cut the payload into its 32-byte hashes.
    objectsNewToMe = set()
    for offset in range(lengthOfVarint, hashesEnd, 32):
        objectsNewToMe.add(data[offset:offset + 32])
    # Drop everything already present in the inventory of our streams.
    for stream in self.streamNumber:
        objectsNewToMe -= Inventory().hashes_by_stream(stream)
    logger.info(' inv message lists %s objects. Of those %s are new to me. It took %s seconds to figure that out. ', numberOfItemsInInv, len(objectsNewToMe), time.time() - startTime)
    # Queue the missing objects for download in random order.
    for item in random.sample(objectsNewToMe, len(objectsNewToMe)):
        self.downloadQueue.put(item)
2013-06-21 22:29:04 +00:00
# Send a getdata message to our peer to request the object with the given
# hash
2017-01-16 18:36:58 +00:00
def sendgetdata(self, hashes):
    """Queue a getdata message requesting the given objects from the peer.

    :param hashes: sequence of inventory hashes to request; nothing is
        sent when it is empty.
    """
    hashCount = len(hashes)
    if hashCount == 0:
        return
    logger.debug(' sending getdata to retrieve %i objects ', hashCount)
    payload = encodeVarint(hashCount) + ' '.join(hashes)
    packet = protocol.CreatePacket(' getdata ', payload)
    # Non-blocking put, as in the original call (block=False).
    self.sendDataThreadQueue.put((0, ' sendRawData ', packet), False)
2013-06-29 17:29:35 +00:00
2013-06-21 22:29:04 +00:00
# We have received a getdata request from our peer
def recgetdata ( self , data ) :
numberOfRequestedInventoryItems , lengthOfVarint = decodeVarint (
data [ : 10 ] )
if len ( data ) < lengthOfVarint + ( 32 * numberOfRequestedInventoryItems ) :
2015-11-18 15:22:17 +00:00
logger . debug ( ' getdata message does not contain enough data. Ignoring. ' )
2013-06-21 22:29:04 +00:00
return
2016-02-17 23:53:13 +00:00
self . antiIntersectionDelay ( True ) # only handle getdata requests if we have been connected long enough
2013-06-21 22:29:04 +00:00
for i in xrange ( numberOfRequestedInventoryItems ) :
hash = data [ lengthOfVarint + (
i * 32 ) : 32 + lengthOfVarint + ( i * 32 ) ]
2016-03-23 22:26:57 +00:00
logger . debug ( ' received getdata request for item: ' + hexlify ( hash ) )
2013-06-29 17:29:35 +00:00
2016-02-17 23:53:13 +00:00
if self . objectHashHolderInstance . hasHash ( hash ) :
self . antiIntersectionDelay ( )
2013-06-21 22:29:04 +00:00
else :
2017-01-10 20:15:35 +00:00
if hash in Inventory ( ) :