V0.6 #852
|
@ -1,11 +1,15 @@
|
||||||
doTimingAttackMitigation = True
|
doTimingAttackMitigation = True
|
||||||
|
|
||||||
|
import errno
|
||||||
import time
|
import time
|
||||||
import threading
|
import threading
|
||||||
import shared
|
import shared
|
||||||
import hashlib
|
import hashlib
|
||||||
|
import os
|
||||||
|
import select
|
||||||
import socket
|
import socket
|
||||||
import random
|
import random
|
||||||
|
import ssl
|
||||||
from struct import unpack, pack
|
from struct import unpack, pack
|
||||||
import sys
|
import sys
|
||||||
import traceback
|
import traceback
|
||||||
|
@ -49,6 +53,7 @@ class receiveDataThread(threading.Thread):
|
||||||
shared.connectedHostsList[
|
shared.connectedHostsList[
|
||||||
self.peer.host] = 0 # The very fact that this receiveData thread exists shows that we are connected to the remote host. Let's add it to this list so that an outgoingSynSender thread doesn't try to connect to it.
|
self.peer.host] = 0 # The very fact that this receiveData thread exists shows that we are connected to the remote host. Let's add it to this list so that an outgoingSynSender thread doesn't try to connect to it.
|
||||||
self.connectionIsOrWasFullyEstablished = False # set to true after the remote node and I accept each other's version messages. This is needed to allow the user interface to accurately reflect the current number of connections.
|
self.connectionIsOrWasFullyEstablished = False # set to true after the remote node and I accept each other's version messages. This is needed to allow the user interface to accurately reflect the current number of connections.
|
||||||
|
self.services = 0
|
||||||
if self.streamNumber == -1: # This was an incoming connection. Send out a version message if we accept the other node's version message.
|
if self.streamNumber == -1: # This was an incoming connection. Send out a version message if we accept the other node's version message.
|
||||||
self.initiatedConnection = False
|
self.initiatedConnection = False
|
||||||
else:
|
else:
|
||||||
|
@ -76,7 +81,10 @@ class receiveDataThread(threading.Thread):
|
||||||
shared.numberOfBytesReceivedLastSecond = 0
|
shared.numberOfBytesReceivedLastSecond = 0
|
||||||
dataLen = len(self.data)
|
dataLen = len(self.data)
|
||||||
try:
|
try:
|
||||||
dataRecv = self.sock.recv(1024)
|
if (self.services & 2 == 2) and self.connectionIsOrWasFullyEstablished:
|
||||||
|
dataRecv = self.sslSock.recv(1024)
|
||||||
|
else:
|
||||||
|
dataRecv = self.sock.recv(1024)
|
||||||
self.data += dataRecv
|
self.data += dataRecv
|
||||||
shared.numberOfBytesReceived += len(dataRecv) # for the 'network status' UI tab. The UI clears this value whenever it updates.
|
shared.numberOfBytesReceived += len(dataRecv) # for the 'network status' UI tab. The UI clears this value whenever it updates.
|
||||||
shared.numberOfBytesReceivedLastSecond += len(dataRecv) # for the download rate limit
|
shared.numberOfBytesReceivedLastSecond += len(dataRecv) # for the download rate limit
|
||||||
|
@ -85,6 +93,9 @@ class receiveDataThread(threading.Thread):
|
||||||
print 'Timeout occurred waiting for data from', self.peer, '. Closing receiveData thread. (ID:', str(id(self)) + ')'
|
print 'Timeout occurred waiting for data from', self.peer, '. Closing receiveData thread. (ID:', str(id(self)) + ')'
|
||||||
break
|
break
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
|
if (sys.platform == 'win32' and err.errno == 10035) or (sys.platform != 'win32' and err.errno == errno.EWOULDBLOCK):
|
||||||
|
select.select([self.sslSock], [], [])
|
||||||
|
continue
|
||||||
with shared.printLock:
|
with shared.printLock:
|
||||||
print 'sock.recv error. Closing receiveData thread (' + str(self.peer) + ', Thread ID:', str(id(self)) + ').', err
|
print 'sock.recv error. Closing receiveData thread (' + str(self.peer) + ', Thread ID:', str(id(self)) + ').', err
|
||||||
break
|
break
|
||||||
|
@ -252,8 +263,24 @@ class receiveDataThread(threading.Thread):
|
||||||
# there is no reason to run this function a second time
|
# there is no reason to run this function a second time
|
||||||
return
|
return
|
||||||
self.connectionIsOrWasFullyEstablished = True
|
self.connectionIsOrWasFullyEstablished = True
|
||||||
|
|
||||||
|
self.sslSock = self.sock
|
||||||
|
if (self.services & 2 == 2):
|
||||||
|
self.sslSock = ssl.wrap_socket(self.sock, keyfile = os.path.join(shared.codePath(), 'sslkeys', 'key.pem'), certfile = os.path.join(shared.codePath(), 'sslkeys', 'cert.pem'), server_side = not self.initiatedConnection, ssl_version=ssl.PROTOCOL_SSLv23, do_handshake_on_connect=False, ciphers='AECDH-AES256-SHA')
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
self.sslSock.do_handshake()
|
||||||
|
break
|
||||||
|
except ssl.SSLError as e:
|
||||||
|
if e.errno == 2:
|
||||||
|
select.select([self.sslSock], [self.sslSock], [])
|
||||||
|
else:
|
||||||
|
break
|
||||||
|
except:
|
||||||
|
break
|
||||||
# Command the corresponding sendDataThread to set its own connectionIsOrWasFullyEstablished variable to True also
|
# Command the corresponding sendDataThread to set its own connectionIsOrWasFullyEstablished variable to True also
|
||||||
self.sendDataThreadQueue.put((0, 'connectionIsOrWasFullyEstablished', 'no data'))
|
self.sendDataThreadQueue.put((0, 'connectionIsOrWasFullyEstablished', (self.services, self.sslSock)))
|
||||||
|
|
||||||
if not self.initiatedConnection:
|
if not self.initiatedConnection:
|
||||||
shared.clientHasReceivedIncomingConnections = True
|
shared.clientHasReceivedIncomingConnections = True
|
||||||
shared.UISignalQueue.put(('setStatusIcon', 'green'))
|
shared.UISignalQueue.put(('setStatusIcon', 'green'))
|
||||||
|
@ -674,6 +701,7 @@ class receiveDataThread(threading.Thread):
|
||||||
"""
|
"""
|
||||||
return
|
return
|
||||||
self.remoteProtocolVersion, = unpack('>L', data[:4])
|
self.remoteProtocolVersion, = unpack('>L', data[:4])
|
||||||
|
self.services, = unpack('>q', data[4:12])
|
||||||
if self.remoteProtocolVersion < 3:
|
if self.remoteProtocolVersion < 3:
|
||||||
self.sendDataThreadQueue.put((0, 'shutdown','no data'))
|
self.sendDataThreadQueue.put((0, 'shutdown','no data'))
|
||||||
with shared.printLock:
|
with shared.printLock:
|
||||||
|
|
|
@ -36,6 +36,7 @@ class sendDataThread(threading.Thread):
|
||||||
self.sock = sock
|
self.sock = sock
|
||||||
self.peer = shared.Peer(HOST, PORT)
|
self.peer = shared.Peer(HOST, PORT)
|
||||||
self.streamNumber = streamNumber
|
self.streamNumber = streamNumber
|
||||||
|
self.services = 0
|
||||||
self.remoteProtocolVersion = - \
|
self.remoteProtocolVersion = - \
|
||||||
1 # This must be set using setRemoteProtocolVersion command which is sent through the self.sendDataThreadQueue queue.
|
1 # This must be set using setRemoteProtocolVersion command which is sent through the self.sendDataThreadQueue queue.
|
||||||
self.lastTimeISentData = int(
|
self.lastTimeISentData = int(
|
||||||
|
@ -82,7 +83,10 @@ class sendDataThread(threading.Thread):
|
||||||
uploadRateLimitBytes = 999999999 # float("inf") doesn't work
|
uploadRateLimitBytes = 999999999 # float("inf") doesn't work
|
||||||
else:
|
else:
|
||||||
uploadRateLimitBytes = shared.config.getint('bitmessagesettings', 'maxuploadrate') * 1000
|
uploadRateLimitBytes = shared.config.getint('bitmessagesettings', 'maxuploadrate') * 1000
|
||||||
amountSent = self.sock.send(data[:1000])
|
if self.services & 2 == 2 and self.connectionIsOrWasFullyEstablished:
|
||||||
|
amountSent = self.sslSock.send(data[:1000])
|
||||||
|
else:
|
||||||
|
amountSent = self.sock.send(data[:1000])
|
||||||
shared.numberOfBytesSent += amountSent # used for the 'network status' tab in the UI
|
shared.numberOfBytesSent += amountSent # used for the 'network status' tab in the UI
|
||||||
shared.numberOfBytesSentLastSecond += amountSent
|
shared.numberOfBytesSentLastSecond += amountSent
|
||||||
self.lastTimeISentData = int(time.time())
|
self.lastTimeISentData = int(time.time())
|
||||||
|
@ -178,6 +182,7 @@ class sendDataThread(threading.Thread):
|
||||||
break
|
break
|
||||||
elif command == 'connectionIsOrWasFullyEstablished':
|
elif command == 'connectionIsOrWasFullyEstablished':
|
||||||
self.connectionIsOrWasFullyEstablished = True
|
self.connectionIsOrWasFullyEstablished = True
|
||||||
|
self.services, self.sslSock = data
|
||||||
else:
|
else:
|
||||||
with shared.printLock:
|
with shared.printLock:
|
||||||
print 'sendDataThread ID:', id(self), 'ignoring command', command, 'because the thread is not in stream', deststream
|
print 'sendDataThread ID:', id(self), 'ignoring command', command, 'because the thread is not in stream', deststream
|
||||||
|
|
|
@ -144,7 +144,7 @@ def encodeHost(host):
|
||||||
def assembleVersionMessage(remoteHost, remotePort, myStreamNumber):
|
def assembleVersionMessage(remoteHost, remotePort, myStreamNumber):
|
||||||
payload = ''
|
payload = ''
|
||||||
payload += pack('>L', 3) # protocol version.
|
payload += pack('>L', 3) # protocol version.
|
||||||
payload += pack('>q', 1) # bitflags of the services I offer.
|
payload += pack('>q', 3) # bitflags of the services I offer.
|
||||||
payload += pack('>q', int(time.time()))
|
payload += pack('>q', int(time.time()))
|
||||||
|
|
||||||
payload += pack(
|
payload += pack(
|
||||||
|
|
Reference in New Issue
Block a user