From 689d697a407599214e06fa312ee9e389b6848ce8 Mon Sep 17 00:00:00 2001 From: Peter Surda Date: Sat, 14 Jan 2017 23:21:00 +0100 Subject: [PATCH] Refactor bandwidth limit and speed calculator - also fixes potential deadlocks --- src/bitmessageqt/__init__.py | 3 ++ src/bitmessageqt/networkstatus.py | 12 ++---- src/class_receiveDataThread.py | 21 ++-------- src/class_sendDataThread.py | 58 +++++++++++++--------------- src/shared.py | 8 ---- src/throttle.py | 64 +++++++++++++++++++++++++++++++ 6 files changed, 101 insertions(+), 65 deletions(-) create mode 100644 src/throttle.py diff --git a/src/bitmessageqt/__init__.py b/src/bitmessageqt/__init__.py index 91d62564..ad89be30 100644 --- a/src/bitmessageqt/__init__.py +++ b/src/bitmessageqt/__init__.py @@ -82,6 +82,7 @@ from proofofwork import getPowType import protocol import state from statusbar import BMStatusBar +import throttle from version import softwareVersion def _translate(context, text, disambiguation = None, encoding = None, number = None): @@ -2391,6 +2392,8 @@ class MyForm(settingsmixin.SMainWindow): except: QMessageBox.about(self, _translate("MainWindow", "Number needed"), _translate( "MainWindow", "Your maximum download and upload rate must be numbers. Ignoring what you typed.")) + throttle.SendThrottle.resetLimit() + throttle.ReceiveThrottle.resetLimit() BMConfigParser().set('bitmessagesettings', 'namecoinrpctype', self.settingsDialogInstance.getNamecoinType()) diff --git a/src/bitmessageqt/networkstatus.py b/src/bitmessageqt/networkstatus.py index c792f81c..2fe493ac 100644 --- a/src/bitmessageqt/networkstatus.py +++ b/src/bitmessageqt/networkstatus.py @@ -7,6 +7,7 @@ import l10n from retranslateui import RetranslateMixin from uisignaler import UISignaler import widgets +import throttle class NetworkStatus(QtGui.QWidget, RetranslateMixin): @@ -28,9 +29,6 @@ class NetworkStatus(QtGui.QWidget, RetranslateMixin): QtCore.QObject.connect(self.UISignalThread, QtCore.SIGNAL( "updateNetworkStatusTab()"), self.updateNetworkStatusTab) - self.totalNumberOfBytesReceived = 0 - self.totalNumberOfBytesSent = 0 - self.timer = QtCore.QTimer() self.timer.start(2000) # milliseconds QtCore.QObject.connect(self.timer, QtCore.SIGNAL("timeout()"), self.runEveryTwoSeconds) @@ -70,13 +68,9 @@ class NetworkStatus(QtGui.QWidget, RetranslateMixin): sent and received by 2. """ self.labelBytesRecvCount.setText(_translate( - "networkstatus", "Down: %1/s Total: %2").arg(self.formatByteRate(shared.numberOfBytesReceived/2), self.formatBytes(self.totalNumberOfBytesReceived))) + "networkstatus", "Down: %1/s Total: %2").arg(self.formatByteRate(throttle.ReceiveThrottle().getSpeed()), self.formatBytes(throttle.ReceiveThrottle().total))) self.labelBytesSentCount.setText(_translate( - "networkstatus", "Up: %1/s Total: %2").arg(self.formatByteRate(shared.numberOfBytesSent/2), self.formatBytes(self.totalNumberOfBytesSent))) - self.totalNumberOfBytesReceived += shared.numberOfBytesReceived - self.totalNumberOfBytesSent += shared.numberOfBytesSent - shared.numberOfBytesReceived = 0 - shared.numberOfBytesSent = 0 + "networkstatus", "Up: %1/s Total: %2").arg(self.formatByteRate(throttle.SendThrottle().getSpeed()), self.formatBytes(throttle.SendThrottle().total))) def updateNetworkStatusTab(self): totalNumberOfConnectionsFromAllStreams = 0 # One would think we could use len(sendDataQueues) for this but the number doesn't always match: just because we have a sendDataThread running doesn't mean that the connection has been fully established (with the exchange of version messages). diff --git a/src/class_receiveDataThread.py b/src/class_receiveDataThread.py index ca4430ab..deb1e05b 100644 --- a/src/class_receiveDataThread.py +++ b/src/class_receiveDataThread.py @@ -32,6 +32,7 @@ import paths import protocol from inventory import Inventory import state +import throttle import tr from version import softwareVersion @@ -82,19 +83,6 @@ class receiveDataThread(threading.Thread): logger.debug('receiveDataThread starting. ID ' + str(id(self)) + '. The size of the shared.connectedHostsList is now ' + str(len(shared.connectedHostsList))) while True: - if BMConfigParser().getint('bitmessagesettings', 'maxdownloadrate') == 0: - downloadRateLimitBytes = float("inf") - else: - downloadRateLimitBytes = BMConfigParser().getint('bitmessagesettings', 'maxdownloadrate') * 1000 - with shared.receiveDataLock: - while shared.numberOfBytesReceivedLastSecond >= downloadRateLimitBytes: - if int(time.time()) == shared.lastTimeWeResetBytesReceived: - # If it's still the same second that it was last time then sleep. - time.sleep(0.3) - else: - # It's a new second. Let us clear the shared.numberOfBytesReceivedLastSecond. - shared.lastTimeWeResetBytesReceived = int(time.time()) - shared.numberOfBytesReceivedLastSecond = 0 dataLen = len(self.data) try: ssl = False @@ -102,12 +90,11 @@ class receiveDataThread(threading.Thread): self.connectionIsOrWasFullyEstablished and protocol.haveSSL(not self.initiatedConnection)): ssl = True - dataRecv = self.sslSock.recv(1024) + dataRecv = self.sslSock.recv(4096) else: - dataRecv = self.sock.recv(1024) + dataRecv = self.sock.recv(4096) self.data += dataRecv - shared.numberOfBytesReceived += len(dataRecv) # for the 'network status' UI tab. The UI clears this value whenever it updates. - shared.numberOfBytesReceivedLastSecond += len(dataRecv) # for the download rate limit + throttle.ReceiveThrottle().wait(len(dataRecv)) except socket.timeout: logger.error ('Timeout occurred waiting for data from ' + str(self.peer) + '. Closing receiveData thread. (ID: ' + str(id(self)) + ')') break diff --git a/src/class_sendDataThread.py b/src/class_sendDataThread.py index e1c03c98..d619d451 100644 --- a/src/class_sendDataThread.py +++ b/src/class_sendDataThread.py @@ -1,3 +1,4 @@ +import errno import time import threading import shared @@ -15,6 +16,7 @@ from addresses import * from debug import logger import protocol import state +import throttle # Every connection to a peer has a sendDataThread (and also a # receiveDataThread). @@ -70,36 +72,30 @@ class sendDataThread(threading.Thread): self.versionSent = 1 def sendBytes(self, data): - if BMConfigParser().getint('bitmessagesettings', 'maxuploadrate') == 0: - uploadRateLimitBytes = 999999999 # float("inf") doesn't work - else: - uploadRateLimitBytes = BMConfigParser().getint('bitmessagesettings', 'maxuploadrate') * 1000 - with shared.sendDataLock: - while data: - while shared.numberOfBytesSentLastSecond >= uploadRateLimitBytes: - if int(time.time()) == shared.lastTimeWeResetBytesSent: - time.sleep(0.3) - else: - # It's a new second. Let us clear the shared.numberOfBytesSentLastSecond - shared.lastTimeWeResetBytesSent = int(time.time()) - shared.numberOfBytesSentLastSecond = 0 - # If the user raises or lowers the uploadRateLimit then we should make use of - # the new setting. If we are hitting the limit then we'll check here about - # once per second. - if BMConfigParser().getint('bitmessagesettings', 'maxuploadrate') == 0: - uploadRateLimitBytes = 999999999 # float("inf") doesn't work - else: - uploadRateLimitBytes = BMConfigParser().getint('bitmessagesettings', 'maxuploadrate') * 1000 - if ((self.services & protocol.NODE_SSL == protocol.NODE_SSL) and - self.connectionIsOrWasFullyEstablished and - protocol.haveSSL(not self.initiatedConnection)): - amountSent = self.sslSock.send(data[:1000]) - else: - amountSent = self.sock.send(data[:1000]) - shared.numberOfBytesSent += amountSent # used for the 'network status' tab in the UI - shared.numberOfBytesSentLastSecond += amountSent - self.lastTimeISentData = int(time.time()) - data = data[amountSent:] + while data: + if ((self.services & protocol.NODE_SSL == protocol.NODE_SSL) and + self.connectionIsOrWasFullyEstablished and + protocol.haveSSL(not self.initiatedConnection)): + while state.shutdown == 0: + try: + amountSent = self.sslSock.send(data[:4096]) + break + except socket.error as e: + if e.errno == errno.EAGAIN: + continue + raise + else: + while True: + try: + amountSent = self.sock.send(data[:4096]) + break + except socket.error as e: + if e.errno == errno.EAGAIN: + continue + raise + throttle.SendThrottle().wait(amountSent) + self.lastTimeISentData = int(time.time()) + data = data[amountSent:] def run(self): @@ -178,7 +174,7 @@ class sendDataThread(threading.Thread): try: self.sendBytes(data) except: - logger.error('Sending of data to ' + str(self.peer) + ' failed. sendDataThread thread ' + str(self) + ' ending now.') + logger.error('Sending of data to ' + str(self.peer) + ' failed. sendDataThread thread ' + str(self) + ' ending now.', exc_info=True) break elif command == 'connectionIsOrWasFullyEstablished': self.connectionIsOrWasFullyEstablished = True diff --git a/src/shared.py b/src/shared.py index 82d08d35..be291a75 100644 --- a/src/shared.py +++ b/src/shared.py @@ -67,14 +67,6 @@ clientHasReceivedIncomingConnections = False #used by API command clientStatus numberOfMessagesProcessed = 0 numberOfBroadcastsProcessed = 0 numberOfPubkeysProcessed = 0 -numberOfBytesReceived = 0 # Used for the 'network status' page -numberOfBytesSent = 0 # Used for the 'network status' page -numberOfBytesReceivedLastSecond = 0 # used for the bandwidth rate limit -numberOfBytesSentLastSecond = 0 # used for the bandwidth rate limit -lastTimeWeResetBytesReceived = 0 # used for the bandwidth rate limit -lastTimeWeResetBytesSent = 0 # used for the bandwidth rate limit -sendDataLock = threading.Lock() # used for the bandwidth rate limit -receiveDataLock = threading.Lock() # used for the bandwidth rate limit daemon = False needToWriteKnownNodesToDisk = False # If True, the singleCleaner will write it to disk eventually. maximumLengthOfTimeToBotherResendingMessages = 0 diff --git a/src/throttle.py b/src/throttle.py new file mode 100644 index 00000000..5b503e1e --- /dev/null +++ b/src/throttle.py @@ -0,0 +1,64 @@ +import threading +import time + +from configparser import BMConfigParser +from singleton import Singleton +import state + +class Throttle(object): + def __init__(self, limit=0): + self.limit = limit + self.speed = 0 + self.txTime = int(time.time()) + self.txLen = 0 + self.total = 0 + self.timer = threading.Event() + self.lock = threading.RLock() + + def recalculate(self): + with self.lock: + now = int(time.time()) + if now > self.txTime: + self.speed = self.txLen / (now - self.txTime) + self.txLen -= self.limit * (now - self.txTime) + self.txTime = now + if self.txLen < 0 or self.limit == 0: + self.txLen = 0 + + def wait(self, dataLen): + with self.lock: + self.waiting = True + with self.lock: + self.txLen += dataLen + self.total += dataLen + while state.shutdown == 0: + self.recalculate() + if self.limit == 0: + break + if self.txLen < self.limit: + break + self.timer.wait(0.2) + with self.lock: + self.waiting = False + + def getSpeed(self): + self.recalculate() + return self.speed + +@Singleton +class SendThrottle(Throttle): + def __init__(self): + Throttle.__init__(self, BMConfigParser().safeGetInt('bitmessagesettings', 'maxuploadrate')*1024) + + def resetLimit(self): + with self.lock: + self.limit = BMConfigParser().safeGetInt('bitmessagesettings', 'maxuploadrate')*1024 + +@Singleton +class ReceiveThrottle(Throttle): + def __init__(self): + Throttle.__init__(self, BMConfigParser().safeGetInt('bitmessagesettings', 'maxdownloadrate')*1024) + + def resetLimit(self): + with self.lock: + self.limit = BMConfigParser().safeGetInt('bitmessagesettings', 'maxdownloadrate')*1024