diff --git a/src/class_outgoingSynSender.py b/src/class_outgoingSynSender.py index 9ea361cc..b8e3193f 100644 --- a/src/class_outgoingSynSender.py +++ b/src/class_outgoingSynSender.py @@ -191,7 +191,7 @@ class outgoingSynSender(threading.Thread, StoppableThread): try: self.sock.connect((peer.host, peer.port)) someObjectsOfWhichThisRemoteNodeIsAlreadyAware = {} # This is not necessairly a complete list; we clear it from time to time to save memory. - sendDataThreadQueue = Queue.Queue() # Used to submit information to the send data thread for this connection. + sendDataThreadQueue = Queue.Queue(100) # Used to submit information to the send data thread for this connection. sd = sendDataThread(sendDataThreadQueue) sd.setup(self.sock, peer.host, peer.port, self.streamNumber, diff --git a/src/class_receiveDataThread.py b/src/class_receiveDataThread.py index deb1e05b..1abb8e74 100644 --- a/src/class_receiveDataThread.py +++ b/src/class_receiveDataThread.py @@ -90,9 +90,9 @@ class receiveDataThread(threading.Thread): self.connectionIsOrWasFullyEstablished and protocol.haveSSL(not self.initiatedConnection)): ssl = True - dataRecv = self.sslSock.recv(4096) + dataRecv = self.sslSock.recv(throttle.ReceiveThrottle().chunkSize) else: - dataRecv = self.sock.recv(4096) + dataRecv = self.sock.recv(throttle.ReceiveThrottle().chunkSize) self.data += dataRecv throttle.ReceiveThrottle().wait(len(dataRecv)) except socket.timeout: @@ -222,7 +222,7 @@ class receiveDataThread(threading.Thread): self.data = self.data[payloadLength + protocol.Header.size:] # take this message out and then process the next message if self.data == '': # if there are no more messages - while len(self.objectsThatWeHaveYetToGetFromThisPeer) > 0: + while len(self.objectsThatWeHaveYetToGetFromThisPeer) > 0 and not self.sendDataThreadQueue.full(): objectHash, = random.sample( self.objectsThatWeHaveYetToGetFromThisPeer, 1) if objectHash in Inventory(): @@ -240,7 +240,6 @@ class receiveDataThread(threading.Thread): self.peer] # this data structure is maintained so that we can keep track of how many total objects, across all connections, are currently outstanding. If it goes too high it can indicate that we are under attack by multiple nodes working together. except: pass - break if len(self.objectsThatWeHaveYetToGetFromThisPeer) == 0: # We had objectsThatWeHaveYetToGetFromThisPeer but the loop ran, they were all in our inventory, and now we don't have any to get anymore. logger.debug('(concerning' + str(self.peer) + ') number of objectsThatWeHaveYetToGetFromThisPeer is now 0') diff --git a/src/class_sendDataThread.py b/src/class_sendDataThread.py index 76409050..55e674cb 100644 --- a/src/class_sendDataThread.py +++ b/src/class_sendDataThread.py @@ -42,6 +42,7 @@ class sendDataThread(threading.Thread): self.name = "sendData-" + self.peer.host.replace(":", ".") # log parser field separator self.streamNumber = streamNumber self.services = 0 + self.buffer = "" self.initiatedConnection = False self.remoteProtocolVersion = - \ 1 # This must be set using setRemoteProtocolVersion command which is sent through the self.sendDataThreadQueue queue. @@ -69,36 +70,32 @@ class sendDataThread(threading.Thread): self.versionSent = 1 - def sendBytes(self, data): - while data: - if ((self.services & protocol.NODE_SSL == protocol.NODE_SSL) and - self.connectionIsOrWasFullyEstablished and - protocol.haveSSL(not self.initiatedConnection)): - while state.shutdown == 0: - try: - amountSent = self.sslSock.send(data[:4096]) - break - except socket.error as e: - if e.errno == errno.EAGAIN: - continue - raise - else: - while True: - try: - amountSent = self.sock.send(data[:4096]) - break - except socket.error as e: - if e.errno == errno.EAGAIN: - continue - raise + def sendBytes(self, data = ""): + self.buffer += data + if len(self.buffer) < throttle.SendThrottle().chunkSize and self.sendDataThreadQueue.qsize() > 1: + return + + while self.buffer and state.shutdown == 0: + try: + if ((self.services & protocol.NODE_SSL == protocol.NODE_SSL) and + self.connectionIsOrWasFullyEstablished and + protocol.haveSSL(not self.initiatedConnection)): + amountSent = self.sslSock.send(self.buffer[:throttle.SendThrottle().chunkSize]) + else: + amountSent = self.sock.send(self.buffer[:throttle.SendThrottle().chunkSize]) + except socket.error as e: + if e.errno == errno.EAGAIN: + continue + raise throttle.SendThrottle().wait(amountSent) self.lastTimeISentData = int(time.time()) - data = data[amountSent:] + self.buffer = self.buffer[amountSent:] def run(self): logger.debug('sendDataThread starting. ID: ' + str(id(self)) + '. Number of queues in sendDataQueues: ' + str(len(state.sendDataQueues))) while True: + self.sendBytes() deststream, command, data = self.sendDataThreadQueue.get() if deststream == self.streamNumber or deststream == 0: diff --git a/src/class_singleListener.py b/src/class_singleListener.py index 49acc71c..65c0a8a8 100644 --- a/src/class_singleListener.py +++ b/src/class_singleListener.py @@ -134,7 +134,7 @@ class singleListener(threading.Thread, StoppableThread): break someObjectsOfWhichThisRemoteNodeIsAlreadyAware = {} # This is not necessairly a complete list; we clear it from time to time to save memory. - sendDataThreadQueue = Queue.Queue() # Used to submit information to the send data thread for this connection. + sendDataThreadQueue = Queue.Queue(100) # Used to submit information to the send data thread for this connection. socketObject.settimeout(20) sd = sendDataThread(sendDataThreadQueue) diff --git a/src/throttle.py b/src/throttle.py index e58ca11a..9578adc5 100644 --- a/src/throttle.py +++ b/src/throttle.py @@ -1,3 +1,4 @@ +import math import threading import time @@ -6,14 +7,19 @@ from singleton import Singleton import state class Throttle(object): + minChunkSize = 4096 + maxChunkSize = 131072 + def __init__(self, limit=0): self.limit = limit self.speed = 0 + self.chunkSize = Throttle.maxChunkSize self.txTime = int(time.time()) self.txLen = 0 self.total = 0 self.timer = threading.Event() self.lock = threading.RLock() + self.resetChunkSize() def recalculate(self): with self.lock: @@ -41,6 +47,19 @@ class Throttle(object): self.recalculate() return self.speed + def resetChunkSize(self): + with self.lock: + # power of two smaller or equal to speed limit + try: + self.chunkSize = int(math.pow(2, int(math.log(self.limit,2)))) + except ValueError: + self.chunkSize = Throttle.maxChunkSize + # range check + if self.chunkSize < Throttle.minChunkSize: + self.chunkSize = Throttle.minChunkSize + elif self.chunkSize > Throttle.maxChunkSize: + self.chunkSize = Throttle.maxChunkSize + @Singleton class SendThrottle(Throttle): def __init__(self): @@ -49,6 +68,7 @@ class SendThrottle(Throttle): def resetLimit(self): with self.lock: self.limit = BMConfigParser().safeGetInt('bitmessagesettings', 'maxuploadrate')*1024 + Throttle.resetChunkSize() @Singleton class ReceiveThrottle(Throttle): @@ -58,3 +78,4 @@ class ReceiveThrottle(Throttle): def resetLimit(self): with self.lock: self.limit = BMConfigParser().safeGetInt('bitmessagesettings', 'maxdownloadrate')*1024 + Throttle.resetChunkSize()