2017-01-14 22:21:00 +00:00
import errno
2013-06-21 22:29:04 +00:00
import time
import threading
2013-06-21 23:49:50 +00:00
import Queue
from struct import unpack , pack
2013-06-23 18:31:47 +00:00
import hashlib
2013-06-23 19:52:39 +00:00
import random
2017-02-07 12:00:24 +00:00
import select
2013-06-23 19:52:39 +00:00
import socket
2017-02-07 12:00:24 +00:00
from ssl import SSLError , SSL_ERROR_WANT_WRITE
import sys
2013-06-21 23:49:50 +00:00
2014-05-02 14:46:36 +00:00
from helper_generic import addDataPadding
2013-09-06 22:55:12 +00:00
from class_objectHashHolder import *
from addresses import *
2015-11-18 15:22:17 +00:00
from debug import logger
2017-01-19 18:48:12 +00:00
from inventory import PendingUpload
2017-01-11 13:27:19 +00:00
import protocol
2017-01-11 16:00:00 +00:00
import state
2017-01-14 22:21:00 +00:00
import throttle
2013-06-21 22:29:04 +00:00
# Every connection to a peer has a sendDataThread (and also a
# receiveDataThread).
class sendDataThread ( threading . Thread ) :
2013-12-30 03:36:23 +00:00
def __init__ ( self , sendDataThreadQueue ) :
2015-11-18 15:22:17 +00:00
threading . Thread . __init__ ( self , name = " sendData " )
2013-12-30 03:36:23 +00:00
self . sendDataThreadQueue = sendDataThreadQueue
2017-01-11 16:00:00 +00:00
state . sendDataQueues . append ( self . sendDataThreadQueue )
2013-06-21 22:29:04 +00:00
self . data = ' '
2013-12-30 03:36:23 +00:00
self . objectHashHolderInstance = objectHashHolder ( self . sendDataThreadQueue )
2017-01-16 22:38:18 +00:00
self . objectHashHolderInstance . daemon = True
2013-09-06 22:55:12 +00:00
self . objectHashHolderInstance . start ( )
2014-04-30 19:39:25 +00:00
self . connectionIsOrWasFullyEstablished = False
2013-09-06 22:55:12 +00:00
2013-06-21 22:29:04 +00:00
def setup (
self ,
sock ,
HOST ,
PORT ,
2017-03-19 21:08:00 +00:00
streamNumber
) :
2013-06-21 22:29:04 +00:00
self . sock = sock
2017-01-12 05:58:35 +00:00
self . peer = state . Peer ( HOST , PORT )
2016-01-26 10:54:11 +00:00
self . name = " sendData- " + self . peer . host . replace ( " : " , " . " ) # log parser field separator
2017-02-06 16:47:05 +00:00
self . streamNumber = [ ]
2015-11-13 11:32:10 +00:00
self . services = 0
2017-01-15 14:08:03 +00:00
self . buffer = " "
2015-11-22 15:43:53 +00:00
self . initiatedConnection = False
2013-06-21 22:29:04 +00:00
self . remoteProtocolVersion = - \
2013-12-30 03:36:23 +00:00
1 # This must be set using setRemoteProtocolVersion command which is sent through the self.sendDataThreadQueue queue.
2013-06-21 22:29:04 +00:00
self . lastTimeISentData = int (
time . time ( ) ) # If this value increases beyond five minutes ago, we'll send a pong message to keep the connection alive.
2017-02-06 16:47:05 +00:00
if streamNumber == - 1 : # This was an incoming connection.
2015-11-22 21:44:58 +00:00
self . initiatedConnection = False
else :
self . initiatedConnection = True
2017-02-06 16:47:05 +00:00
#logger.debug('The streamNumber of this sendDataThread (ID: ' + str(id(self)) + ') at setup() is' + str(self.streamNumber))
2013-06-29 17:29:35 +00:00
2013-06-21 22:29:04 +00:00
def sendVersionMessage ( self ) :
2017-01-11 13:27:19 +00:00
datatosend = protocol . assembleVersionMessage (
2017-02-06 16:47:05 +00:00
self . peer . host , self . peer . port , state . streamsInWhichIAmParticipating , not self . initiatedConnection ) # the IP and port of the remote host, and my streamNumber.
2013-06-21 22:29:04 +00:00
2015-11-18 15:22:17 +00:00
logger . debug ( ' Sending version packet: ' + repr ( datatosend ) )
2013-06-29 17:29:35 +00:00
2013-06-21 22:29:04 +00:00
try :
2014-07-07 20:30:23 +00:00
self . sendBytes ( datatosend )
2013-06-21 22:29:04 +00:00
except Exception as err :
# if not 'Bad file descriptor' in err:
2015-11-18 15:22:17 +00:00
logger . error ( ' sock.sendall error: %s \n ' % err )
2013-06-29 17:29:35 +00:00
2013-06-21 22:29:04 +00:00
self . versionSent = 1
2017-01-15 14:08:03 +00:00
def sendBytes ( self , data = " " ) :
self . buffer + = data
if len ( self . buffer ) < throttle . SendThrottle ( ) . chunkSize and self . sendDataThreadQueue . qsize ( ) > 1 :
2017-02-21 08:58:28 +00:00
return True
2017-01-15 14:08:03 +00:00
while self . buffer and state . shutdown == 0 :
2017-02-07 12:00:24 +00:00
isSSL = False
2017-01-15 14:08:03 +00:00
try :
if ( ( self . services & protocol . NODE_SSL == protocol . NODE_SSL ) and
self . connectionIsOrWasFullyEstablished and
protocol . haveSSL ( not self . initiatedConnection ) ) :
2017-02-07 12:00:24 +00:00
isSSL = True
2017-01-15 14:08:03 +00:00
amountSent = self . sslSock . send ( self . buffer [ : throttle . SendThrottle ( ) . chunkSize ] )
else :
amountSent = self . sock . send ( self . buffer [ : throttle . SendThrottle ( ) . chunkSize ] )
2017-01-16 22:38:18 +00:00
except socket . timeout :
continue
2017-02-07 12:00:24 +00:00
except SSLError as e :
if e . errno == SSL_ERROR_WANT_WRITE :
select . select ( [ ] , [ self . sslSock ] , [ ] , 10 )
logger . debug ( ' sock.recv retriable SSL error ' )
continue
2017-02-26 19:29:07 +00:00
logger . debug ( ' Connection error (SSL) ' )
return False
2017-01-15 14:08:03 +00:00
except socket . error as e :
2017-02-14 00:38:58 +00:00
if e . errno in ( errno . EAGAIN , errno . EWOULDBLOCK ) or \
( sys . platform . startswith ( ' win ' ) and \
2017-02-17 20:14:39 +00:00
e . errno == errno . WSAEWOULDBLOCK ) :
2017-02-07 12:00:24 +00:00
select . select ( [ ] , [ self . sslSock if isSSL else self . sock ] , [ ] , 10 )
logger . debug ( ' sock.recv retriable error ' )
2017-01-15 14:08:03 +00:00
continue
2017-03-09 10:26:44 +00:00
if e . errno in ( errno . EPIPE , errno . ECONNRESET , errno . EHOSTUNREACH , errno . ETIMEDOUT , errno . ECONNREFUSED ) :
logger . debug ( ' Connection error: %s ' , str ( e ) )
2017-02-21 08:58:28 +00:00
return False
2017-01-15 14:08:03 +00:00
raise
2017-01-14 22:21:00 +00:00
throttle . SendThrottle ( ) . wait ( amountSent )
self . lastTimeISentData = int ( time . time ( ) )
2017-01-15 14:08:03 +00:00
self . buffer = self . buffer [ amountSent : ]
2017-02-21 08:58:28 +00:00
return True
2014-07-07 20:30:23 +00:00
2013-06-21 22:29:04 +00:00
def run ( self ) :
2017-01-11 16:00:00 +00:00
logger . debug ( ' sendDataThread starting. ID: ' + str ( id ( self ) ) + ' . Number of queues in sendDataQueues: ' + str ( len ( state . sendDataQueues ) ) )
2017-02-21 08:58:28 +00:00
while self . sendBytes ( ) :
2013-12-30 03:36:23 +00:00
deststream , command , data = self . sendDataThreadQueue . get ( )
2013-06-21 22:29:04 +00:00
2017-02-06 16:47:05 +00:00
if deststream == 0 or deststream in self . streamNumber :
2013-06-21 22:29:04 +00:00
if command == ' shutdown ' :
2015-11-18 15:22:17 +00:00
logger . debug ( ' sendDataThread (associated with ' + str ( self . peer ) + ' ) ID: ' + str ( id ( self ) ) + ' shutting down now. ' )
2014-08-06 19:54:59 +00:00
break
2013-06-21 22:29:04 +00:00
# When you receive an incoming connection, a sendDataThread is
# created even though you don't yet know what stream number the
# remote peer is interested in. They will tell you in a version
# message and if you too are interested in that stream then you
# will continue on with the connection and will set the
# streamNumber of this send data thread here:
elif command == ' setStreamNumber ' :
2014-04-30 19:39:25 +00:00
self . streamNumber = data
2017-02-06 16:47:05 +00:00
logger . debug ( ' setting the stream number to %s ' , ' , ' . join ( str ( x ) for x in self . streamNumber ) )
2013-06-21 22:29:04 +00:00
elif command == ' setRemoteProtocolVersion ' :
2013-12-30 03:36:23 +00:00
specifiedRemoteProtocolVersion = data
2015-11-18 15:22:17 +00:00
logger . debug ( ' setting the remote node \' s protocol version in the sendDataThread (ID: ' + str ( id ( self ) ) + ' ) to ' + str ( specifiedRemoteProtocolVersion ) )
2013-12-30 03:36:23 +00:00
self . remoteProtocolVersion = specifiedRemoteProtocolVersion
2013-09-09 23:26:32 +00:00
elif command == ' advertisepeer ' :
self . objectHashHolderInstance . holdPeer ( data )
2013-06-21 22:29:04 +00:00
elif command == ' sendaddr ' :
2014-09-02 23:25:03 +00:00
if self . connectionIsOrWasFullyEstablished : # only send addr messages if we have sent and heard a verack from the remote node
2014-08-06 19:54:59 +00:00
numberOfAddressesInAddrMessage = len ( data )
payload = ' '
for hostDetails in data :
timeLastReceivedMessageFromThisNode , streamNumber , services , host , port = hostDetails
payload + = pack (
' >Q ' , timeLastReceivedMessageFromThisNode ) # now uses 64-bit time
payload + = pack ( ' >I ' , streamNumber )
payload + = pack (
' >q ' , services ) # service bit flags offered by this node
2017-01-11 13:27:19 +00:00
payload + = protocol . encodeHost ( host )
2014-08-06 19:54:59 +00:00
payload + = pack ( ' >H ' , port )
payload = encodeVarint ( numberOfAddressesInAddrMessage ) + payload
2017-01-11 13:27:19 +00:00
packet = protocol . CreatePacket ( ' addr ' , payload )
2013-06-21 22:29:04 +00:00
try :
2014-08-01 20:35:48 +00:00
self . sendBytes ( packet )
2013-06-21 22:29:04 +00:00
except :
2015-11-18 15:22:17 +00:00
logger . error ( ' sendaddr: self.sock.sendall failed ' )
2013-06-21 22:29:04 +00:00
break
2014-08-06 19:54:59 +00:00
elif command == ' advertiseobject ' :
self . objectHashHolderInstance . holdHash ( data )
elif command == ' sendinv ' :
if self . connectionIsOrWasFullyEstablished : # only send inv messages if we have send and heard a verack from the remote node
payload = ' '
for hash in data :
2017-03-19 21:08:00 +00:00
payload + = hash
2014-08-06 19:54:59 +00:00
if payload != ' ' :
payload = encodeVarint ( len ( payload ) / 32 ) + payload
2017-01-11 13:27:19 +00:00
packet = protocol . CreatePacket ( ' inv ' , payload )
2014-08-06 19:54:59 +00:00
try :
self . sendBytes ( packet )
except :
2015-11-18 15:22:17 +00:00
logger . error ( ' sendinv: self.sock.sendall failed ' )
2014-08-06 19:54:59 +00:00
break
2013-06-21 22:29:04 +00:00
elif command == ' pong ' :
if self . lastTimeISentData < ( int ( time . time ( ) ) - 298 ) :
# Send out a pong message to keep the connection alive.
2015-11-18 15:22:17 +00:00
logger . debug ( ' Sending pong to ' + str ( self . peer ) + ' to keep connection alive. ' )
2017-01-11 13:27:19 +00:00
packet = protocol . CreatePacket ( ' pong ' )
2013-06-21 22:29:04 +00:00
try :
2014-08-01 20:35:48 +00:00
self . sendBytes ( packet )
2013-06-21 22:29:04 +00:00
except :
2015-11-18 15:22:17 +00:00
logger . error ( ' send pong failed ' )
2013-06-21 22:29:04 +00:00
break
2013-12-30 03:36:23 +00:00
elif command == ' sendRawData ' :
2017-03-01 09:05:08 +00:00
objectHash = None
2017-01-19 18:48:12 +00:00
if type ( data ) in [ list , tuple ] :
2017-03-01 09:05:08 +00:00
objectHash , data = data
2013-12-30 03:36:23 +00:00
try :
2014-07-07 20:30:23 +00:00
self . sendBytes ( data )
2017-03-01 09:05:08 +00:00
PendingUpload ( ) . delete ( objectHash )
2013-12-30 03:36:23 +00:00
except :
2017-01-14 22:21:00 +00:00
logger . error ( ' Sending of data to ' + str ( self . peer ) + ' failed. sendDataThread thread ' + str ( self ) + ' ending now. ' , exc_info = True )
2013-12-30 03:36:23 +00:00
break
2014-04-30 19:39:25 +00:00
elif command == ' connectionIsOrWasFullyEstablished ' :
self . connectionIsOrWasFullyEstablished = True
2015-11-22 21:44:58 +00:00
self . services , self . sslSock = data
2017-01-19 18:50:40 +00:00
elif self . connectionIsOrWasFullyEstablished :
2017-02-06 16:47:05 +00:00
logger . error ( ' sendDataThread ID: ' + str ( id ( self ) ) + ' ignoring command ' + command + ' because the thread is not in stream ' + str ( deststream ) + ' but in streams ' + ' , ' . join ( str ( x ) for x in self . streamNumber ) )
2017-01-14 12:22:46 +00:00
self . sendDataThreadQueue . task_done ( )
2017-02-26 11:42:18 +00:00
# Flush if the cycle ended with break
try :
self . sendDataThreadQueue . task_done ( )
2017-02-26 11:52:28 +00:00
except ValueError :
2017-02-26 11:42:18 +00:00
pass
2013-06-29 17:29:35 +00:00
2014-04-30 19:39:25 +00:00
try :
self . sock . shutdown ( socket . SHUT_RDWR )
self . sock . close ( )
except :
pass
2017-01-11 16:00:00 +00:00
state . sendDataQueues . remove ( self . sendDataThreadQueue )
2017-01-19 18:48:12 +00:00
PendingUpload ( ) . threadEnd ( )
2017-01-11 16:00:00 +00:00
logger . info ( ' sendDataThread ending. ID: ' + str ( id ( self ) ) + ' . Number of queues in sendDataQueues: ' + str ( len ( state . sendDataQueues ) ) )
2013-09-06 22:55:12 +00:00
self . objectHashHolderInstance . close ( )