2017-01-14 23:21:00 +01:00
import errno
2013-06-22 00:29:04 +02:00
import time
import threading
2013-06-22 01:49:50 +02:00
import Queue
from struct import unpack , pack
2013-06-23 20:31:47 +02:00
import hashlib
2013-06-23 21:52:39 +02:00
import random
2017-02-07 13:00:24 +01:00
import select
2013-06-23 21:52:39 +02:00
import socket
2017-02-07 13:00:24 +01:00
from ssl import SSLError , SSL_ERROR_WANT_WRITE
import sys
2013-06-22 01:49:50 +02:00
2014-05-02 16:46:36 +02:00
from helper_generic import addDataPadding
2013-09-07 00:55:12 +02:00
from class_objectHashHolder import *
from addresses import *
2015-11-18 16:22:17 +01:00
from debug import logger
2017-01-19 19:48:12 +01:00
from inventory import PendingUpload
2017-01-11 14:27:19 +01:00
import protocol
2017-01-11 17:00:00 +01:00
import state
2017-01-14 23:21:00 +01:00
import throttle
2013-06-22 00:29:04 +02:00
# Every connection to a peer has a sendDataThread (and also a
# receiveDataThread).
class sendDataThread(threading.Thread):
    """Outbound half of a peer connection.

    The thread consumes ``(deststream, command, data)`` tuples from
    ``sendDataThreadQueue`` and either updates its own state or writes
    protocol packets to the peer socket.  ``setup()`` must be called
    before ``start()`` so that the socket and peer are known.
    """

    def __init__(self, sendDataThreadQueue):
        """Register the command queue and start the hash-holder helper.

        sendDataThreadQueue -- queue from which ``run()`` reads commands;
            it is also appended to ``state.sendDataQueues``.
        """
        threading.Thread.__init__(self, name="sendData")
        self.sendDataThreadQueue = sendDataThreadQueue
        state.sendDataQueues.append(self.sendDataThreadQueue)
        self.data = ''
        # Helper thread that is handed the same queue; it receives the
        # 'advertisepeer' / 'advertiseobject' payloads via holdPeer/holdHash.
        self.objectHashHolderInstance = objectHashHolder(
            self.sendDataThreadQueue)
        self.objectHashHolderInstance.daemon = True
        self.objectHashHolderInstance.start()
        self.connectionIsOrWasFullyEstablished = False

    def setup(
            self,
            sock,
            HOST,
            PORT,
            streamNumber,
            someObjectsOfWhichThisRemoteNodeIsAlreadyAware):
        """Bind this thread to a concrete peer connection.

        sock -- connected socket used for plain (non-SSL) sends.
        HOST, PORT -- remote endpoint, stored as ``state.Peer``.
        streamNumber -- ``-1`` marks an incoming connection; any other
            value means we initiated the connection.
        someObjectsOfWhichThisRemoteNodeIsAlreadyAware -- container of
            hashes that must not be re-advertised to this peer.
        """
        self.sock = sock
        self.peer = state.Peer(HOST, PORT)
        # ':' is replaced because it is the log parser field separator.
        self.name = "sendData-" + self.peer.host.replace(":", ".")
        # Filled in later by the 'setStreamNumber' command.
        self.streamNumber = []
        self.services = 0
        self.buffer = ""
        # This must be set using the setRemoteProtocolVersion command which
        # is sent through the self.sendDataThreadQueue queue.
        self.remoteProtocolVersion = -1
        # If this value falls more than about five minutes behind, a pong
        # message is sent to keep the connection alive.
        self.lastTimeISentData = int(time.time())
        self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware = \
            someObjectsOfWhichThisRemoteNodeIsAlreadyAware
        # streamNumber == -1 means this was an incoming connection.
        self.initiatedConnection = streamNumber != -1

    def sendVersionMessage(self):
        """Assemble and send our version message to the peer.

        Send failures are logged, not raised; ``versionSent`` is set to 1
        in either case.
        """
        # The IP and port of the remote host, and my stream numbers.
        datatosend = protocol.assembleVersionMessage(
            self.peer.host,
            self.peer.port,
            state.streamsInWhichIAmParticipating,
            not self.initiatedConnection)

        logger.debug('Sending version packet: ' + repr(datatosend))

        try:
            self.sendBytes(datatosend)
        except Exception as err:
            logger.error('sock.sendall error: %s\n' % err)

        self.versionSent = 1

    @staticmethod
    def _isRetriableSocketError(err):
        """Return True if ``err`` only means "the socket is not writable yet".

        The Windows-specific code is compared with ``==``: the previous
        ``in (errno.WSAEWOULDBLOCK)`` applied ``in`` to a bare int and
        raised TypeError, hiding the real socket error.
        """
        if err.errno in (errno.EAGAIN, errno.EWOULDBLOCK):
            return True
        # errno.WSAEWOULDBLOCK only exists on Windows, hence the guard.
        return (sys.platform.startswith('win') and
                err.errno == errno.WSAEWOULDBLOCK)

    def sendBytes(self, data=""):
        """Append ``data`` to the send buffer and flush it to the peer.

        Nothing is sent when the buffer holds less than one throttle chunk
        and more than one command is still queued, so small packets can be
        coalesced.  Otherwise the buffer is written chunk by chunk until it
        is empty or ``state.shutdown`` becomes non-zero.

        Raises SSLError / socket.error for non-retriable failures.
        """
        self.buffer += data
        if (len(self.buffer) < throttle.SendThrottle().chunkSize and
                self.sendDataThreadQueue.qsize() > 1):
            return

        while self.buffer and state.shutdown == 0:
            # SSL is used only once the connection is fully established,
            # the peer advertises NODE_SSL and we support it ourselves.
            isSSL = (
                (self.services & protocol.NODE_SSL == protocol.NODE_SSL) and
                self.connectionIsOrWasFullyEstablished and
                protocol.haveSSL(not self.initiatedConnection))
            activeSock = self.sslSock if isSSL else self.sock
            try:
                amountSent = activeSock.send(
                    self.buffer[:throttle.SendThrottle().chunkSize])
            except socket.timeout:
                continue
            except SSLError as e:
                if e.errno == SSL_ERROR_WANT_WRITE:
                    # Wait (up to 10 s) until the SSL socket is writable.
                    select.select([], [self.sslSock], [], 10)
                    # NOTE: message says "recv" although this is the send
                    # path; text kept verbatim.
                    logger.debug('sock.recv retriable SSL error')
                    continue
                raise
            except socket.error as e:
                if self._isRetriableSocketError(e):
                    # Wait (up to 10 s) until the socket is writable.
                    select.select([], [activeSock], [], 10)
                    logger.debug('sock.recv retriable error')
                    continue
                raise
            throttle.SendThrottle().wait(amountSent)
            self.lastTimeISentData = int(time.time())
            self.buffer = self.buffer[amountSent:]

    def run(self):
        """Thread entry point: process queued commands until told to stop.

        Teardown (socket close, queue deregistration, helper shutdown) is
        performed even when an unexpected exception ends the loop; the
        exception itself still propagates.
        """
        logger.debug(
            'sendDataThread starting. ID: ' + str(id(self)) +
            '. Number of queues in sendDataQueues: ' +
            str(len(state.sendDataQueues)))
        try:
            self._processQueue()
        finally:
            self._teardown()

    def _processQueue(self):
        """Flush pending bytes, then handle one queued command per turn."""
        while True:
            self.sendBytes()
            deststream, command, data = self.sendDataThreadQueue.get()
            try:
                if deststream == 0 or deststream in self.streamNumber:
                    if not self._handleCommand(command, data):
                        return
                elif self.connectionIsOrWasFullyEstablished:
                    logger.error(
                        'sendDataThread ID: ' + str(id(self)) +
                        ' ignoring command ' + command +
                        ' because the thread is not in stream ' +
                        str(deststream) + ' but in streams ' +
                        ', '.join(str(x) for x in self.streamNumber))
            finally:
                # Exactly one task_done() per get(), whatever the exit path.
                self.sendDataThreadQueue.task_done()

    def _handleCommand(self, command, data):
        """Execute one command; return False when the thread must stop."""
        if command == 'shutdown':
            logger.debug(
                'sendDataThread (associated with ' + str(self.peer) +
                ') ID: ' + str(id(self)) + ' shutting down now.')
            return False

        if command == 'setStreamNumber':
            # When you receive an incoming connection, a sendDataThread is
            # created even though you don't yet know what stream number the
            # remote peer is interested in. They will tell you in a version
            # message and if you too are interested in that stream then you
            # will continue on with the connection and will set the
            # streamNumber of this send data thread here:
            self.streamNumber = data
            logger.debug(
                'setting the stream number to %s',
                ', '.join(str(x) for x in self.streamNumber))
        elif command == 'setRemoteProtocolVersion':
            logger.debug(
                'setting the remote node\'s protocol version in the '
                'sendDataThread (ID: ' + str(id(self)) + ') to ' + str(data))
            self.remoteProtocolVersion = data
        elif command == 'advertisepeer':
            self.objectHashHolderInstance.holdPeer(data)
        elif command == 'sendaddr':
            # Only send addr messages if we have sent and heard a verack
            # from the remote node.
            if self.connectionIsOrWasFullyEstablished:
                return self._sendOrStop(
                    self._buildAddrPacket(data),
                    'sendaddr: self.sock.sendall failed')
        elif command == 'advertiseobject':
            self.objectHashHolderInstance.holdHash(data)
        elif command == 'sendinv':
            # Only send inv messages if we have sent and heard a verack
            # from the remote node.
            if self.connectionIsOrWasFullyEstablished:
                packet = self._buildInvPacket(data)
                if packet is not None:
                    return self._sendOrStop(
                        packet, 'sendinv: self.sock.sendall failed')
        elif command == 'pong':
            # To save memory, clear this data structure from time to time.
            # Its function is to keep us from sending inv messages to peers
            # which sent us the same inv message mere seconds earlier, so
            # clearing it occasionally is harmless.
            self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware.clear()
            if self.lastTimeISentData < (int(time.time()) - 298):
                # Send out a pong message to keep the connection alive.
                logger.debug(
                    'Sending pong to ' + str(self.peer) +
                    ' to keep connection alive.')
                return self._sendOrStop(
                    protocol.CreatePacket('pong'), 'send pong failed')
        elif command == 'sendRawData':
            return self._sendRawData(data)
        elif command == 'connectionIsOrWasFullyEstablished':
            self.connectionIsOrWasFullyEstablished = True
            self.services, self.sslSock = data
        return True

    def _buildAddrPacket(self, data):
        """Serialize an iterable of host-detail tuples into an addr packet."""
        entries = []
        for lastSeen, streamNumber, services, host, port in data:
            entries.append(pack('>Q', lastSeen))  # now uses 64-bit time
            entries.append(pack('>I', streamNumber))
            # service bit flags offered by this node
            entries.append(pack('>q', services))
            entries.append(protocol.encodeHost(host))
            entries.append(pack('>H', port))
        payload = encodeVarint(len(data)) + ''.join(entries)
        return protocol.CreatePacket('addr', payload)

    def _buildInvPacket(self, data):
        """Build an inv packet of hashes the peer is not known to have.

        Returns None when every hash is already known to the peer.
        """
        alreadyKnown = self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware
        payload = ''.join(
            objectHash for objectHash in data
            if objectHash not in alreadyKnown)
        if not payload:
            return None
        # The count is derived from the payload length at 32 bytes per hash;
        # floor division keeps it an integer on every Python version.
        payload = encodeVarint(len(payload) // 32) + payload
        return protocol.CreatePacket('inv', payload)

    def _sendOrStop(self, packet, failureMessage):
        """Send ``packet``; on failure log ``failureMessage``, return False."""
        try:
            self.sendBytes(packet)
        except Exception:
            logger.error(failureMessage)
            return False
        return True

    def _sendRawData(self, data):
        """Send raw bytes, optionally given as a ``(hash, bytes)`` pair.

        After a successful send the hash (or None) is passed to
        ``PendingUpload().delete``.  Returns False when sending failed.
        """
        objectHash = None
        if isinstance(data, (list, tuple)):
            objectHash, data = data
        try:
            self.sendBytes(data)
            PendingUpload().delete(objectHash)
        except Exception:
            logger.error(
                'Sending of data to ' + str(self.peer) +
                ' failed. sendDataThread thread ' + str(self) +
                ' ending now.', exc_info=True)
            return False
        return True

    def _teardown(self):
        """Release the socket and deregister this thread's resources."""
        # Best effort: shutdown() commonly fails when the peer is already
        # gone, and that must not prevent close() from being attempted.
        try:
            self.sock.shutdown(socket.SHUT_RDWR)
        except Exception:
            pass
        try:
            self.sock.close()
        except Exception:
            pass
        state.sendDataQueues.remove(self.sendDataThreadQueue)
        PendingUpload().threadEnd()
        logger.info(
            'sendDataThread ending. ID: ' + str(id(self)) +
            '. Number of queues in sendDataQueues: ' +
            str(len(state.sendDataQueues)))
        self.objectHashHolderInstance.close()