From 9683c879bc778135a3bdb233c1258771f652ab7d Mon Sep 17 00:00:00 2001 From: Peter Surda Date: Thu, 25 May 2017 14:59:18 +0200 Subject: [PATCH] Asyncore update - Network status UI works but current speed isn't implemented yet - Track per connection and global transferred bytes - Add locking to write queue so that other threads can put stuff there - send ping on timeout (instead of closing the connection) - implement open port checker (untested, never triggered yet) - error handling on IO --- src/bitmessageqt/networkstatus.py | 15 +++++---- src/network/advanceddispatcher.py | 18 +++++++--- src/network/asyncore_pollchoose.py | 12 +++++-- src/network/bmproto.py | 17 ++++++---- src/network/connectionchooser.py | 7 +++- src/network/connectionpool.py | 5 ++- src/network/stats.py | 43 ++++++++++++++++++++++++ src/network/tls.py | 54 ++++++++++++++++++------------ src/queues.py | 1 + 9 files changed, 130 insertions(+), 42 deletions(-) create mode 100644 src/network/stats.py diff --git a/src/bitmessageqt/networkstatus.py b/src/bitmessageqt/networkstatus.py index b5870a6b..158f02fa 100644 --- a/src/bitmessageqt/networkstatus.py +++ b/src/bitmessageqt/networkstatus.py @@ -5,10 +5,10 @@ import shared from tr import _translate from inventory import Inventory, PendingDownloadQueue, PendingUpload import l10n +import network.stats from retranslateui import RetranslateMixin from uisignaler import UISignaler import widgets -import throttle class NetworkStatus(QtGui.QWidget, RetranslateMixin): @@ -69,14 +69,15 @@ class NetworkStatus(QtGui.QWidget, RetranslateMixin): sent and received by 2. """ self.labelBytesRecvCount.setText(_translate( - "networkstatus", "Down: %1/s Total: %2").arg(self.formatByteRate(throttle.ReceiveThrottle().getSpeed()), self.formatBytes(throttle.ReceiveThrottle().total))) + "networkstatus", "Down: %1/s Total: %2").arg(self.formatByteRate(network.stats.downloadSpeed()), self.formatBytes(network.stats.receivedBytes()))) self.labelBytesSentCount.setText(_translate( - "networkstatus", "Up: %1/s Total: %2").arg(self.formatByteRate(throttle.SendThrottle().getSpeed()), self.formatBytes(throttle.SendThrottle().total))) + "networkstatus", "Up: %1/s Total: %2").arg(self.formatByteRate(network.stats.uploadSpeed()), self.formatBytes(network.stats.sentBytes()))) def updateNetworkStatusTab(self): totalNumberOfConnectionsFromAllStreams = 0 # One would think we could use len(sendDataQueues) for this but the number doesn't always match: just because we have a sendDataThread running doesn't mean that the connection has been fully established (with the exchange of version messages). streamNumberTotals = {} - for host, streamNumber in shared.connectedHostsList.items(): + connectedHosts = network.stats.connectedHostsList() + for host, streamNumber in connectedHosts: if not streamNumber in streamNumberTotals: streamNumberTotals[streamNumber] = 1 else: @@ -114,10 +115,10 @@ class NetworkStatus(QtGui.QWidget, RetranslateMixin): self.tableWidgetConnectionCount.setItem(0,1,newItem) totalNumberOfConnectionsFromAllStreams += connectionCount""" self.labelTotalConnections.setText(_translate( - "networkstatus", "Total Connections: %1").arg(str(len(shared.connectedHostsList)))) - if len(shared.connectedHostsList) > 0 and shared.statusIconColor == 'red': # FYI: The 'singlelistener' thread sets the icon color to green when it receives an incoming connection, meaning that the user's firewall is configured correctly. + "networkstatus", "Total Connections: %1").arg(str(len(connectedHosts)))) + if len(connectedHosts) > 0 and shared.statusIconColor == 'red': # FYI: The 'singlelistener' thread sets the icon color to green when it receives an incoming connection, meaning that the user's firewall is configured correctly. self.window().setStatusIcon('yellow') - elif len(shared.connectedHostsList) == 0: + elif len(connectedHosts) == 0: self.window().setStatusIcon('red') # timer driven diff --git a/src/network/advanceddispatcher.py b/src/network/advanceddispatcher.py index f4ba120e..d1b5f567 100644 --- a/src/network/advanceddispatcher.py +++ b/src/network/advanceddispatcher.py @@ -1,3 +1,4 @@ +from threading import RLock import time import asyncore_pollchoose as asyncore @@ -11,15 +12,20 @@ class AdvancedDispatcher(asyncore.dispatcher): asyncore.dispatcher.__init__(self, sock) self.read_buf = b"" self.write_buf = b"" + self.writeLock = RLock() self.state = "init" self.lastTx = time.time() + self.sentBytes = 0 + self.receivedBytes = 0 def append_write_buf(self, string = None): - self.write_buf += string + with self.writeLock: + self.write_buf += string def slice_write_buf(self, length=0): if length > 0: - self.write_buf = self.write_buf[length:] + with self.writeLock: + self.write_buf = self.write_buf[length:] def slice_read_buf(self, length=0): if length > 0: @@ -58,9 +64,11 @@ class AdvancedDispatcher(asyncore.dispatcher): if asyncore.maxDownloadRate > 0: newData = self.recv(asyncore.downloadChunk) asyncore.downloadBucket -= len(newData) - self.read_buf += newData else: - self.read_buf += self.recv(AdvancedDispatcher._buf_len) + newData = self.recv(AdvancedDispatcher._buf_len) + self.receivedBytes += len(newData) + asyncore.updateReceived(len(newData)) + self.read_buf += newData self.process() def handle_write(self): @@ -70,6 +78,8 @@ class AdvancedDispatcher(asyncore.dispatcher): asyncore.uploadBucket -= written else: written = self.send(self.write_buf) + asyncore.updateSent(written) + self.sentBytes += written self.slice_write_buf(written) def handle_connect(self): diff --git a/src/network/asyncore_pollchoose.py b/src/network/asyncore_pollchoose.py index 72a829ce..08ee42d0 100644 --- a/src/network/asyncore_pollchoose.py +++ b/src/network/asyncore_pollchoose.py @@ -93,10 +93,12 @@ maxDownloadRate = 0 downloadChunk = 0 downloadTimestamp = 0 downloadBucket = 0 +receivedBytes = 0 maxUploadRate = 0 uploadChunk = 0 uploadTimestamp = 0 uploadBucket = 0 +sentBytes = 0 def read(obj): try: @@ -127,6 +129,14 @@ def set_rates(download, upload): downloadTimestamp = time.time() uploadTimestamp = time.time() +def updateReceived(download=0): + global receivedBytes + receivedBytes += download + +def updateSent(upload=0): + global sentBytes + sentBytes += upload + def wait_tx_buckets(): global downloadBucket, uploadBucket, downloadTimestamp, uploadTimestamp if maxDownloadRate > 0 and maxUploadRate > 0: @@ -150,8 +160,6 @@ def wait_tx_buckets(): uploadBucket += (time.time() - uploadTimestamp) * maxUploadRate uploadTimestamp = time.time() - - def _exception(obj): try: obj.handle_expt_event() diff --git a/src/network/bmproto.py b/src/network/bmproto.py index bf7019d3..3a5ea1b4 100644 --- a/src/network/bmproto.py +++ b/src/network/bmproto.py @@ -28,7 +28,7 @@ from network.tls import TLSDispatcher import addresses from bmconfigparser import BMConfigParser -from queues import objectProcessorQueue +from queues import objectProcessorQueue, portCheckerQueue, UISignalQueue import shared import state import protocol @@ -57,6 +57,7 @@ class BMConnection(TLSDispatcher, BMQueues): self.verackReceived = False self.verackSent = False self.lastTx = time.time() + self.streams = [0] self.connectionFullyEstablished = False self.connectedAt = 0 self.skipUntil = 0 @@ -79,6 +80,7 @@ class BMConnection(TLSDispatcher, BMQueues): print "connecting in background to %s:%i" % (self.destination.host, self.destination.port) shared.connectedHostsList[self.destination] = 0 BMQueues.__init__(self) + UISignalQueue.put(('updateNetworkStatusTab', 'no data')) def bm_proto_reset(self): self.magic = None @@ -115,6 +117,7 @@ class BMConnection(TLSDispatcher, BMQueues): self.skipUntil = time.time() + now def set_connection_fully_established(self): + UISignalQueue.put(('updateNetworkStatusTab', 'no data')) self.antiIntersectionDelay(True) self.connectionFullyEstablished = True self.sendAddr() @@ -369,6 +372,10 @@ class BMConnection(TLSDispatcher, BMQueues): addresses = self.decode_payload_content("lQIQ16sH") return True + def bm_command_portcheck(self): + portCheckerQueue.put(state.Peer(self.destination, self.peerNode.port)) + return True + def bm_command_ping(self): self.append_write_buf(protocol.CreatePacket('pong')) return True @@ -399,10 +406,11 @@ class BMConnection(TLSDispatcher, BMQueues): print "my external IP: %s" % (self.sockNode.host) print "remote node incoming port: %i" % (self.peerNode.port) print "user agent: %s" % (self.userAgent) + print "streams: [%s]" % (",".join(map(str,self.streams))) if not self.peerValidityChecks(): # TODO ABORT return True - shared.connectedHostsList[self.destination] = self.streams[0] + #shared.connectedHostsList[self.destination] = self.streams[0] self.append_write_buf(protocol.CreatePacket('verack')) self.verackSent = True if not self.isOutbound: @@ -611,10 +619,7 @@ class BMConnection(TLSDispatcher, BMQueues): else: print "%s:%i: closing, %s" % (self.destination.host, self.destination.port, reason) network.connectionpool.BMConnectionPool().removeConnection(self) - try: - asyncore.dispatcher.close(self) - except AttributeError: - pass + asyncore.dispatcher.close(self) class Socks5BMConnection(Socks5Connection, BMConnection): diff --git a/src/network/connectionchooser.py b/src/network/connectionchooser.py index a1b1d10b..1c8d988d 100644 --- a/src/network/connectionchooser.py +++ b/src/network/connectionchooser.py @@ -1,11 +1,16 @@ +from queues import Queue import random from bmconfigparser import BMConfigParser import knownnodes +from queues import portCheckerQueue import state def chooseConnection(stream): if state.trustedPeer: return state.trustedPeer else: - return random.choice(knownnodes.knownNodes[stream].keys()) + try: + return portCheckerQueue.get(False) + except Queue.Empty: + return random.choice(knownnodes.knownNodes[stream].keys()) diff --git a/src/network/connectionpool.py b/src/network/connectionpool.py index 7036e7cf..6acf4bb6 100644 --- a/src/network/connectionpool.py +++ b/src/network/connectionpool.py @@ -145,4 +145,7 @@ class BMConnectionPool(object): if i.connectionFullyEstablished: minTx -= 300 - 20 if i.lastTx < minTx: - i.close("Timeout (%is)" % (time.time() - i.lastTx)) + if i.connectionFullyEstablished: + i.append_write_buf(protocol.CreatePacket('ping')) + else: + i.close("Timeout (%is)" % (time.time() - i.lastTx)) diff --git a/src/network/stats.py b/src/network/stats.py new file mode 100644 index 00000000..838ef23a --- /dev/null +++ b/src/network/stats.py @@ -0,0 +1,43 @@ +from bmconfigparser import BMConfigParser +from network.connectionpool import BMConnectionPool +import asyncore_pollchoose as asyncore +import shared +import throttle + +def connectedHostsList(): + if BMConfigParser().safeGetBoolean("network", "asyncore"): + retval = [] + for i in BMConnectionPool().inboundConnections.values() + BMConnectionPool().outboundConnections.values(): + if not i.connected: + continue + try: + retval.append((i.destination, i.streams[0])) + except AttributeError: + pass + return retval + else: + return shared.connectedHostsList.items() + +def sentBytes(): + if BMConfigParser().safeGetBoolean("network", "asyncore"): + return asyncore.sentBytes + else: + return throttle.SendThrottle().total + +def uploadSpeed(): + if BMConfigParser().safeGetBoolean("network", "asyncore"): + return 0 + else: + return throttle.sendThrottle().getSpeed() + +def receivedBytes(): + if BMConfigParser().safeGetBoolean("network", "asyncore"): + return asyncore.receivedBytes + else: + return throttle.ReceiveThrottle().total + +def downloadSpeed(): + if BMConfigParser().safeGetBoolean("network", "asyncore"): + return 0 + else: + return throttle.ReceiveThrottle().getSpeed() diff --git a/src/network/tls.py b/src/network/tls.py index e691006d..d2abb6b9 100644 --- a/src/network/tls.py +++ b/src/network/tls.py @@ -59,36 +59,48 @@ class TLSDispatcher(AdvancedDispatcher): # self.socket.context.set_ecdh_curve("secp256k1") def writable(self): - if self.tlsStarted and not self.tlsDone and len(self.write_buf) == 0: - #print "tls writable, %r" % (self.want_write) - return self.want_write - else: + try: + if self.tlsStarted and not self.tlsDone and len(self.write_buf) == 0: + #print "tls writable, %r" % (self.want_write) + return self.want_write + else: + return AdvancedDispatcher.writable(self) + except AttributeError: return AdvancedDispatcher.writable(self) def readable(self): - if self.tlsStarted and not self.tlsDone and len(self.write_buf) == 0: - #print "tls readable, %r" % (self.want_read) - return self.want_read - else: + try: + if self.tlsStarted and not self.tlsDone and len(self.write_buf) == 0: + #print "tls readable, %r" % (self.want_read) + return self.want_read + else: + return AdvancedDispatcher.readable(self) + except AttributeError: return AdvancedDispatcher.readable(self) def handle_read(self): - # wait for write buffer flush - if self.tlsStarted and not self.tlsDone and len(self.write_buf) == 0: - #print "handshaking (read)" - self.state_tls_handshake() - else: - #print "not handshaking (read)" + try: + # wait for write buffer flush + if self.tlsStarted and not self.tlsDone and len(self.write_buf) == 0: + #print "handshaking (read)" + self.state_tls_handshake() + else: + #print "not handshaking (read)" + return AdvancedDispatcher.handle_read(self) + except AttributeError: return AdvancedDispatcher.handle_read(self) def handle_write(self): - # wait for write buffer flush - if self.tlsStarted and not self.tlsDone and len(self.write_buf) == 0: - #print "handshaking (write)" - self.state_tls_handshake() - else: - #print "not handshaking (write)" - return AdvancedDispatcher.handle_write(self) + try: + # wait for write buffer flush + if self.tlsStarted and not self.tlsDone and len(self.write_buf) == 0: + #print "handshaking (write)" + self.state_tls_handshake() + else: + #print "not handshaking (write)" + return AdvancedDispatcher.handle_write(self) + except AttributeError: + return AdvancedDispatcher.handle_read(self) def state_tls_handshake(self): # wait for flush diff --git a/src/queues.py b/src/queues.py index 335863e9..c6b09307 100644 --- a/src/queues.py +++ b/src/queues.py @@ -6,5 +6,6 @@ UISignalQueue = Queue.Queue() addressGeneratorQueue = Queue.Queue() # receiveDataThreads dump objects they hear on the network into this queue to be processed. objectProcessorQueue = ObjectProcessorQueue() +portCheckerQueue = Queue.Queue() apiAddressGeneratorReturnQueue = Queue.Queue( ) # The address generator thread uses this queue to get information back to the API thread.