Asyncore update

- Network status UI works but current speed isn't implemented yet
- Track per connection and global transferred bytes
- Add locking to write queue so that other threads can put stuff
there
- send ping on timeout (instead of closing the connection)
- implement open port checker (untested, never triggered yet)
- error handling on IO
This commit is contained in:
Peter Šurda 2017-05-25 14:59:18 +02:00
parent edcba9982b
commit 9683c879bc
Signed by: PeterSurda
GPG Key ID: 0C5F50C0B5F37D87
9 changed files with 130 additions and 42 deletions

View File

@ -5,10 +5,10 @@ import shared
from tr import _translate from tr import _translate
from inventory import Inventory, PendingDownloadQueue, PendingUpload from inventory import Inventory, PendingDownloadQueue, PendingUpload
import l10n import l10n
import network.stats
from retranslateui import RetranslateMixin from retranslateui import RetranslateMixin
from uisignaler import UISignaler from uisignaler import UISignaler
import widgets import widgets
import throttle
class NetworkStatus(QtGui.QWidget, RetranslateMixin): class NetworkStatus(QtGui.QWidget, RetranslateMixin):
@ -69,14 +69,15 @@ class NetworkStatus(QtGui.QWidget, RetranslateMixin):
sent and received by 2. sent and received by 2.
""" """
self.labelBytesRecvCount.setText(_translate( self.labelBytesRecvCount.setText(_translate(
"networkstatus", "Down: %1/s Total: %2").arg(self.formatByteRate(throttle.ReceiveThrottle().getSpeed()), self.formatBytes(throttle.ReceiveThrottle().total))) "networkstatus", "Down: %1/s Total: %2").arg(self.formatByteRate(network.stats.downloadSpeed()), self.formatBytes(network.stats.receivedBytes())))
self.labelBytesSentCount.setText(_translate( self.labelBytesSentCount.setText(_translate(
"networkstatus", "Up: %1/s Total: %2").arg(self.formatByteRate(throttle.SendThrottle().getSpeed()), self.formatBytes(throttle.SendThrottle().total))) "networkstatus", "Up: %1/s Total: %2").arg(self.formatByteRate(network.stats.uploadSpeed()), self.formatBytes(network.stats.sentBytes())))
def updateNetworkStatusTab(self): def updateNetworkStatusTab(self):
totalNumberOfConnectionsFromAllStreams = 0 # One would think we could use len(sendDataQueues) for this but the number doesn't always match: just because we have a sendDataThread running doesn't mean that the connection has been fully established (with the exchange of version messages). totalNumberOfConnectionsFromAllStreams = 0 # One would think we could use len(sendDataQueues) for this but the number doesn't always match: just because we have a sendDataThread running doesn't mean that the connection has been fully established (with the exchange of version messages).
streamNumberTotals = {} streamNumberTotals = {}
for host, streamNumber in shared.connectedHostsList.items(): connectedHosts = network.stats.connectedHostsList()
for host, streamNumber in connectedHosts:
if not streamNumber in streamNumberTotals: if not streamNumber in streamNumberTotals:
streamNumberTotals[streamNumber] = 1 streamNumberTotals[streamNumber] = 1
else: else:
@ -114,10 +115,10 @@ class NetworkStatus(QtGui.QWidget, RetranslateMixin):
self.tableWidgetConnectionCount.setItem(0,1,newItem) self.tableWidgetConnectionCount.setItem(0,1,newItem)
totalNumberOfConnectionsFromAllStreams += connectionCount""" totalNumberOfConnectionsFromAllStreams += connectionCount"""
self.labelTotalConnections.setText(_translate( self.labelTotalConnections.setText(_translate(
"networkstatus", "Total Connections: %1").arg(str(len(shared.connectedHostsList)))) "networkstatus", "Total Connections: %1").arg(str(len(connectedHosts))))
if len(shared.connectedHostsList) > 0 and shared.statusIconColor == 'red': # FYI: The 'singlelistener' thread sets the icon color to green when it receives an incoming connection, meaning that the user's firewall is configured correctly. if len(connectedHosts) > 0 and shared.statusIconColor == 'red': # FYI: The 'singlelistener' thread sets the icon color to green when it receives an incoming connection, meaning that the user's firewall is configured correctly.
self.window().setStatusIcon('yellow') self.window().setStatusIcon('yellow')
elif len(shared.connectedHostsList) == 0: elif len(connectedHosts) == 0:
self.window().setStatusIcon('red') self.window().setStatusIcon('red')
# timer driven # timer driven

View File

@ -1,3 +1,4 @@
from threading import RLock
import time import time
import asyncore_pollchoose as asyncore import asyncore_pollchoose as asyncore
@ -11,14 +12,19 @@ class AdvancedDispatcher(asyncore.dispatcher):
asyncore.dispatcher.__init__(self, sock) asyncore.dispatcher.__init__(self, sock)
self.read_buf = b"" self.read_buf = b""
self.write_buf = b"" self.write_buf = b""
self.writeLock = RLock()
self.state = "init" self.state = "init"
self.lastTx = time.time() self.lastTx = time.time()
self.sentBytes = 0
self.receivedBytes = 0
def append_write_buf(self, string = None): def append_write_buf(self, string = None):
with self.writeLock:
self.write_buf += string self.write_buf += string
def slice_write_buf(self, length=0): def slice_write_buf(self, length=0):
if length > 0: if length > 0:
with self.writeLock:
self.write_buf = self.write_buf[length:] self.write_buf = self.write_buf[length:]
def slice_read_buf(self, length=0): def slice_read_buf(self, length=0):
@ -58,9 +64,11 @@ class AdvancedDispatcher(asyncore.dispatcher):
if asyncore.maxDownloadRate > 0: if asyncore.maxDownloadRate > 0:
newData = self.recv(asyncore.downloadChunk) newData = self.recv(asyncore.downloadChunk)
asyncore.downloadBucket -= len(newData) asyncore.downloadBucket -= len(newData)
self.read_buf += newData
else: else:
self.read_buf += self.recv(AdvancedDispatcher._buf_len) newData = self.recv(AdvancedDispatcher._buf_len)
self.receivedBytes += len(newData)
asyncore.updateReceived(len(newData))
self.read_buf += newData
self.process() self.process()
def handle_write(self): def handle_write(self):
@ -70,6 +78,8 @@ class AdvancedDispatcher(asyncore.dispatcher):
asyncore.uploadBucket -= written asyncore.uploadBucket -= written
else: else:
written = self.send(self.write_buf) written = self.send(self.write_buf)
asyncore.updateSent(written)
self.sentBytes += written
self.slice_write_buf(written) self.slice_write_buf(written)
def handle_connect(self): def handle_connect(self):

View File

@ -93,10 +93,12 @@ maxDownloadRate = 0
downloadChunk = 0 downloadChunk = 0
downloadTimestamp = 0 downloadTimestamp = 0
downloadBucket = 0 downloadBucket = 0
receivedBytes = 0
maxUploadRate = 0 maxUploadRate = 0
uploadChunk = 0 uploadChunk = 0
uploadTimestamp = 0 uploadTimestamp = 0
uploadBucket = 0 uploadBucket = 0
sentBytes = 0
def read(obj): def read(obj):
try: try:
@ -127,6 +129,14 @@ def set_rates(download, upload):
downloadTimestamp = time.time() downloadTimestamp = time.time()
uploadTimestamp = time.time() uploadTimestamp = time.time()
def updateReceived(download=0):
global receivedBytes
receivedBytes += download
def updateSent(upload=0):
global sentBytes
sentBytes += upload
def wait_tx_buckets(): def wait_tx_buckets():
global downloadBucket, uploadBucket, downloadTimestamp, uploadTimestamp global downloadBucket, uploadBucket, downloadTimestamp, uploadTimestamp
if maxDownloadRate > 0 and maxUploadRate > 0: if maxDownloadRate > 0 and maxUploadRate > 0:
@ -150,8 +160,6 @@ def wait_tx_buckets():
uploadBucket += (time.time() - uploadTimestamp) * maxUploadRate uploadBucket += (time.time() - uploadTimestamp) * maxUploadRate
uploadTimestamp = time.time() uploadTimestamp = time.time()
def _exception(obj): def _exception(obj):
try: try:
obj.handle_expt_event() obj.handle_expt_event()

View File

@ -28,7 +28,7 @@ from network.tls import TLSDispatcher
import addresses import addresses
from bmconfigparser import BMConfigParser from bmconfigparser import BMConfigParser
from queues import objectProcessorQueue from queues import objectProcessorQueue, portCheckerQueue, UISignalQueue
import shared import shared
import state import state
import protocol import protocol
@ -57,6 +57,7 @@ class BMConnection(TLSDispatcher, BMQueues):
self.verackReceived = False self.verackReceived = False
self.verackSent = False self.verackSent = False
self.lastTx = time.time() self.lastTx = time.time()
self.streams = [0]
self.connectionFullyEstablished = False self.connectionFullyEstablished = False
self.connectedAt = 0 self.connectedAt = 0
self.skipUntil = 0 self.skipUntil = 0
@ -79,6 +80,7 @@ class BMConnection(TLSDispatcher, BMQueues):
print "connecting in background to %s:%i" % (self.destination.host, self.destination.port) print "connecting in background to %s:%i" % (self.destination.host, self.destination.port)
shared.connectedHostsList[self.destination] = 0 shared.connectedHostsList[self.destination] = 0
BMQueues.__init__(self) BMQueues.__init__(self)
UISignalQueue.put(('updateNetworkStatusTab', 'no data'))
def bm_proto_reset(self): def bm_proto_reset(self):
self.magic = None self.magic = None
@ -115,6 +117,7 @@ class BMConnection(TLSDispatcher, BMQueues):
self.skipUntil = time.time() + now self.skipUntil = time.time() + now
def set_connection_fully_established(self): def set_connection_fully_established(self):
UISignalQueue.put(('updateNetworkStatusTab', 'no data'))
self.antiIntersectionDelay(True) self.antiIntersectionDelay(True)
self.connectionFullyEstablished = True self.connectionFullyEstablished = True
self.sendAddr() self.sendAddr()
@ -369,6 +372,10 @@ class BMConnection(TLSDispatcher, BMQueues):
addresses = self.decode_payload_content("lQIQ16sH") addresses = self.decode_payload_content("lQIQ16sH")
return True return True
def bm_command_portcheck(self):
portCheckerQueue.put(state.Peer(self.destination, self.peerNode.port))
return True
def bm_command_ping(self): def bm_command_ping(self):
self.append_write_buf(protocol.CreatePacket('pong')) self.append_write_buf(protocol.CreatePacket('pong'))
return True return True
@ -399,10 +406,11 @@ class BMConnection(TLSDispatcher, BMQueues):
print "my external IP: %s" % (self.sockNode.host) print "my external IP: %s" % (self.sockNode.host)
print "remote node incoming port: %i" % (self.peerNode.port) print "remote node incoming port: %i" % (self.peerNode.port)
print "user agent: %s" % (self.userAgent) print "user agent: %s" % (self.userAgent)
print "streams: [%s]" % (",".join(map(str,self.streams)))
if not self.peerValidityChecks(): if not self.peerValidityChecks():
# TODO ABORT # TODO ABORT
return True return True
shared.connectedHostsList[self.destination] = self.streams[0] #shared.connectedHostsList[self.destination] = self.streams[0]
self.append_write_buf(protocol.CreatePacket('verack')) self.append_write_buf(protocol.CreatePacket('verack'))
self.verackSent = True self.verackSent = True
if not self.isOutbound: if not self.isOutbound:
@ -611,10 +619,7 @@ class BMConnection(TLSDispatcher, BMQueues):
else: else:
print "%s:%i: closing, %s" % (self.destination.host, self.destination.port, reason) print "%s:%i: closing, %s" % (self.destination.host, self.destination.port, reason)
network.connectionpool.BMConnectionPool().removeConnection(self) network.connectionpool.BMConnectionPool().removeConnection(self)
try:
asyncore.dispatcher.close(self) asyncore.dispatcher.close(self)
except AttributeError:
pass
class Socks5BMConnection(Socks5Connection, BMConnection): class Socks5BMConnection(Socks5Connection, BMConnection):

View File

@ -1,11 +1,16 @@
from queues import Queue
import random import random
from bmconfigparser import BMConfigParser from bmconfigparser import BMConfigParser
import knownnodes import knownnodes
from queues import portCheckerQueue
import state import state
def chooseConnection(stream): def chooseConnection(stream):
if state.trustedPeer: if state.trustedPeer:
return state.trustedPeer return state.trustedPeer
else: else:
try:
return portCheckerQueue.get(False)
except Queue.Empty:
return random.choice(knownnodes.knownNodes[stream].keys()) return random.choice(knownnodes.knownNodes[stream].keys())

View File

@ -145,4 +145,7 @@ class BMConnectionPool(object):
if i.connectionFullyEstablished: if i.connectionFullyEstablished:
minTx -= 300 - 20 minTx -= 300 - 20
if i.lastTx < minTx: if i.lastTx < minTx:
if i.connectionFullyEstablished:
i.append_write_buf(protocol.CreatePacket('ping'))
else:
i.close("Timeout (%is)" % (time.time() - i.lastTx)) i.close("Timeout (%is)" % (time.time() - i.lastTx))

43
src/network/stats.py Normal file
View File

@ -0,0 +1,43 @@
from bmconfigparser import BMConfigParser
from network.connectionpool import BMConnectionPool
import asyncore_pollchoose as asyncore
import shared
import throttle
def connectedHostsList():
if BMConfigParser().safeGetBoolean("network", "asyncore"):
retval = []
for i in BMConnectionPool().inboundConnections.values() + BMConnectionPool().outboundConnections.values():
if not i.connected:
continue
try:
retval.append((i.destination, i.streams[0]))
except AttributeError:
pass
return retval
else:
return shared.connectedHostsList.items()
def sentBytes():
if BMConfigParser().safeGetBoolean("network", "asyncore"):
return asyncore.sentBytes
else:
return throttle.SendThrottle().total
def uploadSpeed():
if BMConfigParser().safeGetBoolean("network", "asyncore"):
return 0
else:
return throttle.sendThrottle().getSpeed()
def receivedBytes():
if BMConfigParser().safeGetBoolean("network", "asyncore"):
return asyncore.receivedBytes
else:
return throttle.ReceiveThrottle().total
def downloadSpeed():
if BMConfigParser().safeGetBoolean("network", "asyncore"):
return 0
else:
return throttle.ReceiveThrottle().getSpeed()

View File

@ -59,20 +59,27 @@ class TLSDispatcher(AdvancedDispatcher):
# self.socket.context.set_ecdh_curve("secp256k1") # self.socket.context.set_ecdh_curve("secp256k1")
def writable(self): def writable(self):
try:
if self.tlsStarted and not self.tlsDone and len(self.write_buf) == 0: if self.tlsStarted and not self.tlsDone and len(self.write_buf) == 0:
#print "tls writable, %r" % (self.want_write) #print "tls writable, %r" % (self.want_write)
return self.want_write return self.want_write
else: else:
return AdvancedDispatcher.writable(self) return AdvancedDispatcher.writable(self)
except AttributeError:
return AdvancedDispatcher.writable(self)
def readable(self): def readable(self):
try:
if self.tlsStarted and not self.tlsDone and len(self.write_buf) == 0: if self.tlsStarted and not self.tlsDone and len(self.write_buf) == 0:
#print "tls readable, %r" % (self.want_read) #print "tls readable, %r" % (self.want_read)
return self.want_read return self.want_read
else: else:
return AdvancedDispatcher.readable(self) return AdvancedDispatcher.readable(self)
except AttributeError:
return AdvancedDispatcher.readable(self)
def handle_read(self): def handle_read(self):
try:
# wait for write buffer flush # wait for write buffer flush
if self.tlsStarted and not self.tlsDone and len(self.write_buf) == 0: if self.tlsStarted and not self.tlsDone and len(self.write_buf) == 0:
#print "handshaking (read)" #print "handshaking (read)"
@ -80,8 +87,11 @@ class TLSDispatcher(AdvancedDispatcher):
else: else:
#print "not handshaking (read)" #print "not handshaking (read)"
return AdvancedDispatcher.handle_read(self) return AdvancedDispatcher.handle_read(self)
except AttributeError:
return AdvancedDispatcher.handle_read(self)
def handle_write(self): def handle_write(self):
try:
# wait for write buffer flush # wait for write buffer flush
if self.tlsStarted and not self.tlsDone and len(self.write_buf) == 0: if self.tlsStarted and not self.tlsDone and len(self.write_buf) == 0:
#print "handshaking (write)" #print "handshaking (write)"
@ -89,6 +99,8 @@ class TLSDispatcher(AdvancedDispatcher):
else: else:
#print "not handshaking (write)" #print "not handshaking (write)"
return AdvancedDispatcher.handle_write(self) return AdvancedDispatcher.handle_write(self)
except AttributeError:
return AdvancedDispatcher.handle_read(self)
def state_tls_handshake(self): def state_tls_handshake(self):
# wait for flush # wait for flush

View File

@ -6,5 +6,6 @@ UISignalQueue = Queue.Queue()
addressGeneratorQueue = Queue.Queue() addressGeneratorQueue = Queue.Queue()
# receiveDataThreads dump objects they hear on the network into this queue to be processed. # receiveDataThreads dump objects they hear on the network into this queue to be processed.
objectProcessorQueue = ObjectProcessorQueue() objectProcessorQueue = ObjectProcessorQueue()
portCheckerQueue = Queue.Queue()
apiAddressGeneratorReturnQueue = Queue.Queue( apiAddressGeneratorReturnQueue = Queue.Queue(
) # The address generator thread uses this queue to get information back to the API thread. ) # The address generator thread uses this queue to get information back to the API thread.