Refactor bandwidth limit and speed calculator

- also fixes potential deadlocks
This commit is contained in:
Peter Šurda 2017-01-14 23:21:00 +01:00
parent ad75552b5c
commit 689d697a40
Signed by untrusted user: PeterSurda
GPG Key ID: 0C5F50C0B5F37D87
6 changed files with 101 additions and 65 deletions

View File

@ -82,6 +82,7 @@ from proofofwork import getPowType
import protocol
import state
from statusbar import BMStatusBar
import throttle
from version import softwareVersion
def _translate(context, text, disambiguation = None, encoding = None, number = None):
@ -2391,6 +2392,8 @@ class MyForm(settingsmixin.SMainWindow):
except:
QMessageBox.about(self, _translate("MainWindow", "Number needed"), _translate(
"MainWindow", "Your maximum download and upload rate must be numbers. Ignoring what you typed."))
throttle.SendThrottle.resetLimit()
throttle.ReceiveThrottle.resetLimit()
BMConfigParser().set('bitmessagesettings', 'namecoinrpctype',
self.settingsDialogInstance.getNamecoinType())

View File

@ -7,6 +7,7 @@ import l10n
from retranslateui import RetranslateMixin
from uisignaler import UISignaler
import widgets
import throttle
class NetworkStatus(QtGui.QWidget, RetranslateMixin):
@ -28,9 +29,6 @@ class NetworkStatus(QtGui.QWidget, RetranslateMixin):
QtCore.QObject.connect(self.UISignalThread, QtCore.SIGNAL(
"updateNetworkStatusTab()"), self.updateNetworkStatusTab)
self.totalNumberOfBytesReceived = 0
self.totalNumberOfBytesSent = 0
self.timer = QtCore.QTimer()
self.timer.start(2000) # milliseconds
QtCore.QObject.connect(self.timer, QtCore.SIGNAL("timeout()"), self.runEveryTwoSeconds)
@ -70,13 +68,9 @@ class NetworkStatus(QtGui.QWidget, RetranslateMixin):
sent and received by 2.
"""
self.labelBytesRecvCount.setText(_translate(
"networkstatus", "Down: %1/s Total: %2").arg(self.formatByteRate(shared.numberOfBytesReceived/2), self.formatBytes(self.totalNumberOfBytesReceived)))
"networkstatus", "Down: %1/s Total: %2").arg(self.formatByteRate(throttle.ReceiveThrottle().getSpeed()), self.formatBytes(throttle.ReceiveThrottle().total)))
self.labelBytesSentCount.setText(_translate(
"networkstatus", "Up: %1/s Total: %2").arg(self.formatByteRate(shared.numberOfBytesSent/2), self.formatBytes(self.totalNumberOfBytesSent)))
self.totalNumberOfBytesReceived += shared.numberOfBytesReceived
self.totalNumberOfBytesSent += shared.numberOfBytesSent
shared.numberOfBytesReceived = 0
shared.numberOfBytesSent = 0
"networkstatus", "Up: %1/s Total: %2").arg(self.formatByteRate(throttle.SendThrottle().getSpeed()), self.formatBytes(throttle.SendThrottle().total)))
def updateNetworkStatusTab(self):
totalNumberOfConnectionsFromAllStreams = 0 # One would think we could use len(sendDataQueues) for this but the number doesn't always match: just because we have a sendDataThread running doesn't mean that the connection has been fully established (with the exchange of version messages).

View File

@ -32,6 +32,7 @@ import paths
import protocol
from inventory import Inventory
import state
import throttle
import tr
from version import softwareVersion
@ -82,19 +83,6 @@ class receiveDataThread(threading.Thread):
logger.debug('receiveDataThread starting. ID ' + str(id(self)) + '. The size of the shared.connectedHostsList is now ' + str(len(shared.connectedHostsList)))
while True:
if BMConfigParser().getint('bitmessagesettings', 'maxdownloadrate') == 0:
downloadRateLimitBytes = float("inf")
else:
downloadRateLimitBytes = BMConfigParser().getint('bitmessagesettings', 'maxdownloadrate') * 1000
with shared.receiveDataLock:
while shared.numberOfBytesReceivedLastSecond >= downloadRateLimitBytes:
if int(time.time()) == shared.lastTimeWeResetBytesReceived:
# If it's still the same second that it was last time then sleep.
time.sleep(0.3)
else:
# It's a new second. Let us clear the shared.numberOfBytesReceivedLastSecond.
shared.lastTimeWeResetBytesReceived = int(time.time())
shared.numberOfBytesReceivedLastSecond = 0
dataLen = len(self.data)
try:
ssl = False
@ -102,12 +90,11 @@ class receiveDataThread(threading.Thread):
self.connectionIsOrWasFullyEstablished and
protocol.haveSSL(not self.initiatedConnection)):
ssl = True
dataRecv = self.sslSock.recv(1024)
dataRecv = self.sslSock.recv(4096)
else:
dataRecv = self.sock.recv(1024)
dataRecv = self.sock.recv(4096)
self.data += dataRecv
shared.numberOfBytesReceived += len(dataRecv) # for the 'network status' UI tab. The UI clears this value whenever it updates.
shared.numberOfBytesReceivedLastSecond += len(dataRecv) # for the download rate limit
throttle.ReceiveThrottle().wait(len(dataRecv))
except socket.timeout:
logger.error ('Timeout occurred waiting for data from ' + str(self.peer) + '. Closing receiveData thread. (ID: ' + str(id(self)) + ')')
break

View File

@ -1,3 +1,4 @@
import errno
import time
import threading
import shared
@ -15,6 +16,7 @@ from addresses import *
from debug import logger
import protocol
import state
import throttle
# Every connection to a peer has a sendDataThread (and also a
# receiveDataThread).
@ -70,36 +72,30 @@ class sendDataThread(threading.Thread):
self.versionSent = 1
def sendBytes(self, data):
if BMConfigParser().getint('bitmessagesettings', 'maxuploadrate') == 0:
uploadRateLimitBytes = 999999999 # float("inf") doesn't work
else:
uploadRateLimitBytes = BMConfigParser().getint('bitmessagesettings', 'maxuploadrate') * 1000
with shared.sendDataLock:
while data:
while shared.numberOfBytesSentLastSecond >= uploadRateLimitBytes:
if int(time.time()) == shared.lastTimeWeResetBytesSent:
time.sleep(0.3)
else:
# It's a new second. Let us clear the shared.numberOfBytesSentLastSecond
shared.lastTimeWeResetBytesSent = int(time.time())
shared.numberOfBytesSentLastSecond = 0
# If the user raises or lowers the uploadRateLimit then we should make use of
# the new setting. If we are hitting the limit then we'll check here about
# once per second.
if BMConfigParser().getint('bitmessagesettings', 'maxuploadrate') == 0:
uploadRateLimitBytes = 999999999 # float("inf") doesn't work
else:
uploadRateLimitBytes = BMConfigParser().getint('bitmessagesettings', 'maxuploadrate') * 1000
if ((self.services & protocol.NODE_SSL == protocol.NODE_SSL) and
self.connectionIsOrWasFullyEstablished and
protocol.haveSSL(not self.initiatedConnection)):
amountSent = self.sslSock.send(data[:1000])
else:
amountSent = self.sock.send(data[:1000])
shared.numberOfBytesSent += amountSent # used for the 'network status' tab in the UI
shared.numberOfBytesSentLastSecond += amountSent
self.lastTimeISentData = int(time.time())
data = data[amountSent:]
while data:
if ((self.services & protocol.NODE_SSL == protocol.NODE_SSL) and
self.connectionIsOrWasFullyEstablished and
protocol.haveSSL(not self.initiatedConnection)):
while state.shutdown == 0:
try:
amountSent = self.sslSock.send(data[:4096])
break
except socket.error as e:
if e.errno == errno.EAGAIN:
continue
raise
else:
while True:
try:
amountSent = self.sock.send(data[:4096])
break
except socket.error as e:
if e.errno == errno.EAGAIN:
continue
raise
throttle.SendThrottle().wait(amountSent)
self.lastTimeISentData = int(time.time())
data = data[amountSent:]
def run(self):
@ -178,7 +174,7 @@ class sendDataThread(threading.Thread):
try:
self.sendBytes(data)
except:
logger.error('Sending of data to ' + str(self.peer) + ' failed. sendDataThread thread ' + str(self) + ' ending now.')
logger.error('Sending of data to ' + str(self.peer) + ' failed. sendDataThread thread ' + str(self) + ' ending now.', exc_info=True)
break
elif command == 'connectionIsOrWasFullyEstablished':
self.connectionIsOrWasFullyEstablished = True

View File

@ -67,14 +67,6 @@ clientHasReceivedIncomingConnections = False #used by API command clientStatus
numberOfMessagesProcessed = 0
numberOfBroadcastsProcessed = 0
numberOfPubkeysProcessed = 0
numberOfBytesReceived = 0 # Used for the 'network status' page
numberOfBytesSent = 0 # Used for the 'network status' page
numberOfBytesReceivedLastSecond = 0 # used for the bandwidth rate limit
numberOfBytesSentLastSecond = 0 # used for the bandwidth rate limit
lastTimeWeResetBytesReceived = 0 # used for the bandwidth rate limit
lastTimeWeResetBytesSent = 0 # used for the bandwidth rate limit
sendDataLock = threading.Lock() # used for the bandwidth rate limit
receiveDataLock = threading.Lock() # used for the bandwidth rate limit
daemon = False
needToWriteKnownNodesToDisk = False # If True, the singleCleaner will write it to disk eventually.
maximumLengthOfTimeToBotherResendingMessages = 0

64
src/throttle.py Normal file
View File

@ -0,0 +1,64 @@
import threading
import time
from configparser import BMConfigParser
from singleton import Singleton
import state
class Throttle(object):
def __init__(self, limit=0):
self.limit = limit
self.speed = 0
self.txTime = int(time.time())
self.txLen = 0
self.total = 0
self.timer = threading.Event()
self.lock = threading.RLock()
def recalculate(self):
with self.lock:
now = int(time.time())
if now > self.txTime:
self.speed = self.txLen / (now - self.txTime)
self.txLen -= self.limit * (now - self.txTime)
self.txTime = now
if self.txLen < 0 or self.limit == 0:
self.txLen = 0
def wait(self, dataLen):
with self.lock:
self.waiting = True
with self.lock:
self.txLen += dataLen
self.total += dataLen
while state.shutdown == 0:
self.recalculate()
if self.limit == 0:
break
if self.txLen < self.limit:
break
self.timer.wait(0.2)
with self.lock:
self.waiting = False
def getSpeed(self):
self.recalculate()
return self.speed
@Singleton
class SendThrottle(Throttle):
def __init__(self):
Throttle.__init__(self, BMConfigParser().safeGetInt('bitmessagesettings', 'maxuploadrate')*1024)
def resetLimit(self):
with self.lock:
self.limit = BMConfigParser().safeGetInt('bitmessagesettings', 'maxuploadrate')*1024
@Singleton
class ReceiveThrottle(Throttle):
def __init__(self):
Throttle.__init__(self, BMConfigParser().safeGetInt('bitmessagesettings', 'maxdownloadrate')*1024)
def resetLimit(self):
with self.lock:
self.limit = BMConfigParser().safeGetInt('bitmessagesettings', 'maxdownloadrate')*1024