Transfer speed improvements
- send buffer to send multiple commands in one TCP packet - recv/send operation size now based on bandwith limit - send queue limited to 100 entries - buffer getdata commands to fill send queue, instead of waiting for the data packet to arrive first (i.e. allow getdata to work asynchronously)
This commit is contained in:
parent
c3fef7bc4e
commit
6d2a75bfc9
|
@ -191,7 +191,7 @@ class outgoingSynSender(threading.Thread, StoppableThread):
|
||||||
try:
|
try:
|
||||||
self.sock.connect((peer.host, peer.port))
|
self.sock.connect((peer.host, peer.port))
|
||||||
someObjectsOfWhichThisRemoteNodeIsAlreadyAware = {} # This is not necessairly a complete list; we clear it from time to time to save memory.
|
someObjectsOfWhichThisRemoteNodeIsAlreadyAware = {} # This is not necessairly a complete list; we clear it from time to time to save memory.
|
||||||
sendDataThreadQueue = Queue.Queue() # Used to submit information to the send data thread for this connection.
|
sendDataThreadQueue = Queue.Queue(100) # Used to submit information to the send data thread for this connection.
|
||||||
|
|
||||||
sd = sendDataThread(sendDataThreadQueue)
|
sd = sendDataThread(sendDataThreadQueue)
|
||||||
sd.setup(self.sock, peer.host, peer.port, self.streamNumber,
|
sd.setup(self.sock, peer.host, peer.port, self.streamNumber,
|
||||||
|
|
|
@ -90,9 +90,9 @@ class receiveDataThread(threading.Thread):
|
||||||
self.connectionIsOrWasFullyEstablished and
|
self.connectionIsOrWasFullyEstablished and
|
||||||
protocol.haveSSL(not self.initiatedConnection)):
|
protocol.haveSSL(not self.initiatedConnection)):
|
||||||
ssl = True
|
ssl = True
|
||||||
dataRecv = self.sslSock.recv(4096)
|
dataRecv = self.sslSock.recv(throttle.ReceiveThrottle().chunkSize)
|
||||||
else:
|
else:
|
||||||
dataRecv = self.sock.recv(4096)
|
dataRecv = self.sock.recv(throttle.ReceiveThrottle().chunkSize)
|
||||||
self.data += dataRecv
|
self.data += dataRecv
|
||||||
throttle.ReceiveThrottle().wait(len(dataRecv))
|
throttle.ReceiveThrottle().wait(len(dataRecv))
|
||||||
except socket.timeout:
|
except socket.timeout:
|
||||||
|
@ -222,7 +222,7 @@ class receiveDataThread(threading.Thread):
|
||||||
self.data = self.data[payloadLength + protocol.Header.size:] # take this message out and then process the next message
|
self.data = self.data[payloadLength + protocol.Header.size:] # take this message out and then process the next message
|
||||||
|
|
||||||
if self.data == '': # if there are no more messages
|
if self.data == '': # if there are no more messages
|
||||||
while len(self.objectsThatWeHaveYetToGetFromThisPeer) > 0:
|
while len(self.objectsThatWeHaveYetToGetFromThisPeer) > 0 and not self.sendDataThreadQueue.full():
|
||||||
objectHash, = random.sample(
|
objectHash, = random.sample(
|
||||||
self.objectsThatWeHaveYetToGetFromThisPeer, 1)
|
self.objectsThatWeHaveYetToGetFromThisPeer, 1)
|
||||||
if objectHash in Inventory():
|
if objectHash in Inventory():
|
||||||
|
@ -240,7 +240,6 @@ class receiveDataThread(threading.Thread):
|
||||||
self.peer] # this data structure is maintained so that we can keep track of how many total objects, across all connections, are currently outstanding. If it goes too high it can indicate that we are under attack by multiple nodes working together.
|
self.peer] # this data structure is maintained so that we can keep track of how many total objects, across all connections, are currently outstanding. If it goes too high it can indicate that we are under attack by multiple nodes working together.
|
||||||
except:
|
except:
|
||||||
pass
|
pass
|
||||||
break
|
|
||||||
if len(self.objectsThatWeHaveYetToGetFromThisPeer) == 0:
|
if len(self.objectsThatWeHaveYetToGetFromThisPeer) == 0:
|
||||||
# We had objectsThatWeHaveYetToGetFromThisPeer but the loop ran, they were all in our inventory, and now we don't have any to get anymore.
|
# We had objectsThatWeHaveYetToGetFromThisPeer but the loop ran, they were all in our inventory, and now we don't have any to get anymore.
|
||||||
logger.debug('(concerning' + str(self.peer) + ') number of objectsThatWeHaveYetToGetFromThisPeer is now 0')
|
logger.debug('(concerning' + str(self.peer) + ') number of objectsThatWeHaveYetToGetFromThisPeer is now 0')
|
||||||
|
|
|
@ -42,6 +42,7 @@ class sendDataThread(threading.Thread):
|
||||||
self.name = "sendData-" + self.peer.host.replace(":", ".") # log parser field separator
|
self.name = "sendData-" + self.peer.host.replace(":", ".") # log parser field separator
|
||||||
self.streamNumber = streamNumber
|
self.streamNumber = streamNumber
|
||||||
self.services = 0
|
self.services = 0
|
||||||
|
self.buffer = ""
|
||||||
self.initiatedConnection = False
|
self.initiatedConnection = False
|
||||||
self.remoteProtocolVersion = - \
|
self.remoteProtocolVersion = - \
|
||||||
1 # This must be set using setRemoteProtocolVersion command which is sent through the self.sendDataThreadQueue queue.
|
1 # This must be set using setRemoteProtocolVersion command which is sent through the self.sendDataThreadQueue queue.
|
||||||
|
@ -69,36 +70,32 @@ class sendDataThread(threading.Thread):
|
||||||
|
|
||||||
self.versionSent = 1
|
self.versionSent = 1
|
||||||
|
|
||||||
def sendBytes(self, data):
|
def sendBytes(self, data = ""):
|
||||||
while data:
|
self.buffer += data
|
||||||
if ((self.services & protocol.NODE_SSL == protocol.NODE_SSL) and
|
if len(self.buffer) < throttle.SendThrottle().chunkSize and self.sendDataThreadQueue.qsize() > 1:
|
||||||
self.connectionIsOrWasFullyEstablished and
|
return
|
||||||
protocol.haveSSL(not self.initiatedConnection)):
|
|
||||||
while state.shutdown == 0:
|
while self.buffer and state.shutdown == 0:
|
||||||
try:
|
try:
|
||||||
amountSent = self.sslSock.send(data[:4096])
|
if ((self.services & protocol.NODE_SSL == protocol.NODE_SSL) and
|
||||||
break
|
self.connectionIsOrWasFullyEstablished and
|
||||||
except socket.error as e:
|
protocol.haveSSL(not self.initiatedConnection)):
|
||||||
if e.errno == errno.EAGAIN:
|
amountSent = self.sslSock.send(self.buffer[:throttle.SendThrottle().chunkSize])
|
||||||
continue
|
else:
|
||||||
raise
|
amountSent = self.sock.send(self.buffer[:throttle.SendThrottle().chunkSize])
|
||||||
else:
|
except socket.error as e:
|
||||||
while True:
|
if e.errno == errno.EAGAIN:
|
||||||
try:
|
continue
|
||||||
amountSent = self.sock.send(data[:4096])
|
raise
|
||||||
break
|
|
||||||
except socket.error as e:
|
|
||||||
if e.errno == errno.EAGAIN:
|
|
||||||
continue
|
|
||||||
raise
|
|
||||||
throttle.SendThrottle().wait(amountSent)
|
throttle.SendThrottle().wait(amountSent)
|
||||||
self.lastTimeISentData = int(time.time())
|
self.lastTimeISentData = int(time.time())
|
||||||
data = data[amountSent:]
|
self.buffer = self.buffer[amountSent:]
|
||||||
|
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
logger.debug('sendDataThread starting. ID: ' + str(id(self)) + '. Number of queues in sendDataQueues: ' + str(len(state.sendDataQueues)))
|
logger.debug('sendDataThread starting. ID: ' + str(id(self)) + '. Number of queues in sendDataQueues: ' + str(len(state.sendDataQueues)))
|
||||||
while True:
|
while True:
|
||||||
|
self.sendBytes()
|
||||||
deststream, command, data = self.sendDataThreadQueue.get()
|
deststream, command, data = self.sendDataThreadQueue.get()
|
||||||
|
|
||||||
if deststream == self.streamNumber or deststream == 0:
|
if deststream == self.streamNumber or deststream == 0:
|
||||||
|
|
|
@ -134,7 +134,7 @@ class singleListener(threading.Thread, StoppableThread):
|
||||||
break
|
break
|
||||||
|
|
||||||
someObjectsOfWhichThisRemoteNodeIsAlreadyAware = {} # This is not necessairly a complete list; we clear it from time to time to save memory.
|
someObjectsOfWhichThisRemoteNodeIsAlreadyAware = {} # This is not necessairly a complete list; we clear it from time to time to save memory.
|
||||||
sendDataThreadQueue = Queue.Queue() # Used to submit information to the send data thread for this connection.
|
sendDataThreadQueue = Queue.Queue(100) # Used to submit information to the send data thread for this connection.
|
||||||
socketObject.settimeout(20)
|
socketObject.settimeout(20)
|
||||||
|
|
||||||
sd = sendDataThread(sendDataThreadQueue)
|
sd = sendDataThread(sendDataThreadQueue)
|
||||||
|
|
|
@ -1,3 +1,4 @@
|
||||||
|
import math
|
||||||
import threading
|
import threading
|
||||||
import time
|
import time
|
||||||
|
|
||||||
|
@ -6,14 +7,19 @@ from singleton import Singleton
|
||||||
import state
|
import state
|
||||||
|
|
||||||
class Throttle(object):
|
class Throttle(object):
|
||||||
|
minChunkSize = 4096
|
||||||
|
maxChunkSize = 131072
|
||||||
|
|
||||||
def __init__(self, limit=0):
|
def __init__(self, limit=0):
|
||||||
self.limit = limit
|
self.limit = limit
|
||||||
self.speed = 0
|
self.speed = 0
|
||||||
|
self.chunkSize = Throttle.maxChunkSize
|
||||||
self.txTime = int(time.time())
|
self.txTime = int(time.time())
|
||||||
self.txLen = 0
|
self.txLen = 0
|
||||||
self.total = 0
|
self.total = 0
|
||||||
self.timer = threading.Event()
|
self.timer = threading.Event()
|
||||||
self.lock = threading.RLock()
|
self.lock = threading.RLock()
|
||||||
|
self.resetChunkSize()
|
||||||
|
|
||||||
def recalculate(self):
|
def recalculate(self):
|
||||||
with self.lock:
|
with self.lock:
|
||||||
|
@ -41,6 +47,19 @@ class Throttle(object):
|
||||||
self.recalculate()
|
self.recalculate()
|
||||||
return self.speed
|
return self.speed
|
||||||
|
|
||||||
|
def resetChunkSize(self):
|
||||||
|
with self.lock:
|
||||||
|
# power of two smaller or equal to speed limit
|
||||||
|
try:
|
||||||
|
self.chunkSize = int(math.pow(2, int(math.log(self.limit,2))))
|
||||||
|
except ValueError:
|
||||||
|
self.chunkSize = Throttle.maxChunkSize
|
||||||
|
# range check
|
||||||
|
if self.chunkSize < Throttle.minChunkSize:
|
||||||
|
self.chunkSize = Throttle.minChunkSize
|
||||||
|
elif self.chunkSize > Throttle.maxChunkSize:
|
||||||
|
self.chunkSize = Throttle.maxChunkSize
|
||||||
|
|
||||||
@Singleton
|
@Singleton
|
||||||
class SendThrottle(Throttle):
|
class SendThrottle(Throttle):
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
|
@ -49,6 +68,7 @@ class SendThrottle(Throttle):
|
||||||
def resetLimit(self):
|
def resetLimit(self):
|
||||||
with self.lock:
|
with self.lock:
|
||||||
self.limit = BMConfigParser().safeGetInt('bitmessagesettings', 'maxuploadrate')*1024
|
self.limit = BMConfigParser().safeGetInt('bitmessagesettings', 'maxuploadrate')*1024
|
||||||
|
Throttle.resetChunkSize()
|
||||||
|
|
||||||
@Singleton
|
@Singleton
|
||||||
class ReceiveThrottle(Throttle):
|
class ReceiveThrottle(Throttle):
|
||||||
|
@ -58,3 +78,4 @@ class ReceiveThrottle(Throttle):
|
||||||
def resetLimit(self):
|
def resetLimit(self):
|
||||||
with self.lock:
|
with self.lock:
|
||||||
self.limit = BMConfigParser().safeGetInt('bitmessagesettings', 'maxdownloadrate')*1024
|
self.limit = BMConfigParser().safeGetInt('bitmessagesettings', 'maxdownloadrate')*1024
|
||||||
|
Throttle.resetChunkSize()
|
||||||
|
|
Reference in New Issue
Block a user