From 81872c7f2f8f092190f2bbd3a02f9c8065343e40 Mon Sep 17 00:00:00 2001 From: lakshyacis Date: Thu, 19 Dec 2019 16:54:53 +0530 Subject: [PATCH] network code quality fixes --- src/network/networkthread.py | 5 ++- src/network/objectracker.py | 27 ++++++++----- src/network/proxy.py | 6 +-- src/network/randomtrackingdict.py | 30 +++++++------- src/network/receivequeuethread.py | 11 +++++- src/network/socks4a.py | 3 +- src/network/socks5.py | 7 ++-- src/network/stats.py | 10 +++-- src/network/tcp.py | 25 ++++++------ src/network/tls.py | 65 +++++++++++++++++++------------ src/network/udp.py | 14 +++---- 11 files changed, 117 insertions(+), 86 deletions(-) diff --git a/src/network/networkthread.py b/src/network/networkthread.py index ba560906..61ff6c09 100644 --- a/src/network/networkthread.py +++ b/src/network/networkthread.py @@ -1,3 +1,6 @@ +""" +A thread to handle network concerns +""" import network.asyncore_pollchoose as asyncore import state from network.connectionpool import BMConnectionPool @@ -6,7 +9,7 @@ from threads import StoppableThread class BMNetworkThread(StoppableThread): - """A thread to handle network concerns""" + """Main network thread""" name = "Asyncore" def run(self): diff --git a/src/network/objectracker.py b/src/network/objectracker.py index b97aee46..ca29c023 100644 --- a/src/network/objectracker.py +++ b/src/network/objectracker.py @@ -1,6 +1,5 @@ """ -src/network/objectracker.py -=========================== +Module for tracking objects """ import time from threading import RLock @@ -50,15 +49,18 @@ class ObjectTracker(object): """Init bloom filter for tracking. WIP.""" if haveBloom: # lock? - self.invBloom = BloomFilter(capacity=ObjectTracker.invInitialCapacity, - error_rate=ObjectTracker.invErrorRate) + self.invBloom = BloomFilter( + capacity=ObjectTracker.invInitialCapacity, + error_rate=ObjectTracker.invErrorRate) def initAddrBloom(self): - """Init bloom filter for tracking addrs, WIP. This either needs to be moved to addrthread.py or removed.""" + """Init bloom filter for tracking addrs, WIP. + This either needs to be moved to addrthread.py or removed.""" if haveBloom: # lock? - self.addrBloom = BloomFilter(capacity=ObjectTracker.invInitialCapacity, - error_rate=ObjectTracker.invErrorRate) + self.addrBloom = BloomFilter( + capacity=ObjectTracker.invInitialCapacity, + error_rate=ObjectTracker.invErrorRate) def clean(self): """Clean up tracking to prevent memory bloat""" @@ -71,7 +73,10 @@ class ObjectTracker(object): # release memory deadline = time.time() - ObjectTracker.trackingExpires with self.objectsNewToThemLock: - self.objectsNewToThem = {k: v for k, v in self.objectsNewToThem.iteritems() if v >= deadline} + self.objectsNewToThem = { + k: v + for k, v in self.objectsNewToThem.iteritems() + if v >= deadline} self.lastCleaned = time.time() def hasObj(self, hashid): @@ -102,10 +107,12 @@ class ObjectTracker(object): del i.objectsNewToMe[hashid] except KeyError: if streamNumber in i.streams and ( - not Dandelion().hasHash(hashid) or Dandelion().objectChildStem(hashid) == i): + not Dandelion().hasHash(hashid) or + Dandelion().objectChildStem(hashid) == i): with i.objectsNewToThemLock: i.objectsNewToThem[hashid] = time.time() - # update stream number, which we didn't have when we just received the dinv + # update stream number, + # which we didn't have when we just received the dinv # also resets expiration of the stem mode Dandelion().setHashStream(hashid, streamNumber) diff --git a/src/network/proxy.py b/src/network/proxy.py index e0bb5e78..38676d66 100644 --- a/src/network/proxy.py +++ b/src/network/proxy.py @@ -1,6 +1,5 @@ """ -src/network/proxy.py -==================== +Set proxy if avaiable otherwise exception """ # pylint: disable=protected-access import logging @@ -122,8 +121,7 @@ class Proxy(AdvancedDispatcher): BMConfigParser().safeGet( "bitmessagesettings", "socksusername"), BMConfigParser().safeGet( - "bitmessagesettings", "sockspassword") - ) + "bitmessagesettings", "sockspassword")) else: self.auth = None self.connect( diff --git a/src/network/randomtrackingdict.py b/src/network/randomtrackingdict.py index 6c3300ab..e87bf156 100644 --- a/src/network/randomtrackingdict.py +++ b/src/network/randomtrackingdict.py @@ -1,8 +1,6 @@ """ -src/randomtrackingdict.py -========================= +Track randomize ordered dict """ - import random from threading import RLock from time import time @@ -14,10 +12,12 @@ class RandomTrackingDict(object): """ Dict with randomised order and tracking. - Keeps a track of how many items have been requested from the dict, and timeouts. Resets after all objects have been - retrieved and timed out. The main purpose of this isn't as much putting related code together as performance - optimisation and anonymisation of downloading of objects from other peers. If done using a standard dict or array, - it takes too much CPU (and looks convoluted). Randomisation helps with anonymity. + Keeps a track of how many items have been requested from the dict, + and timeouts. Resets after all objects have been retrieved and timed out. + The main purpose of this isn't as much putting related code together + as performance optimisation and anonymisation of downloading of objects + from other peers. If done using a standard dict or array, it takes + too much CPU (and looks convoluted). Randomisation helps with anonymity. """ # pylint: disable=too-many-instance-attributes maxPending = 10 @@ -85,13 +85,14 @@ class RandomTrackingDict(object): def setMaxPending(self, maxPending): """ - Sets maximum number of objects that can be retrieved from the class simultaneously as long as there is no - timeout + Sets maximum number of objects that can be retrieved from the class + simultaneously as long as there is no timeout """ self.maxPending = maxPending def setPendingTimeout(self, pendingTimeout): - """Sets how long to wait for a timeout if max pending is reached (or all objects have been retrieved)""" + """Sets how long to wait for a timeout if max pending is reached + (or all objects have been retrieved)""" self.pendingTimeout = pendingTimeout def setLastObject(self): @@ -99,7 +100,8 @@ class RandomTrackingDict(object): self.lastObject = time() def randomKeys(self, count=1): - """Retrieve count random keys from the dict that haven't already been retrieved""" + """Retrieve count random keys from the dict + that haven't already been retrieved""" if self.len == 0 or ((self.pendingLen >= self.maxPending or self.pendingLen == self.len) and self.lastPoll + self.pendingTimeout > time()): @@ -109,13 +111,15 @@ class RandomTrackingDict(object): with self.lock: # reset if we've requested all # and if last object received too long time ago - if self.pendingLen == self.len and self.lastObject + self.pendingTimeout < time(): + if self.pendingLen == self.len and self.lastObject + \ + self.pendingTimeout < time(): self.pendingLen = 0 self.setLastObject() available = self.len - self.pendingLen if count > available: count = available - randomIndex = helper_random.randomsample(range(self.len - self.pendingLen), count) + randomIndex = helper_random.randomsample( + range(self.len - self.pendingLen), count) retval = [self.indexDict[i] for i in randomIndex] for i in sorted(randomIndex, reverse=True): diff --git a/src/network/receivequeuethread.py b/src/network/receivequeuethread.py index cd904065..1f5533b3 100644 --- a/src/network/receivequeuethread.py +++ b/src/network/receivequeuethread.py @@ -1,3 +1,6 @@ +""" +Process data incoming from network +""" import errno import Queue import socket @@ -10,6 +13,8 @@ from threads import StoppableThread class ReceiveQueueThread(StoppableThread): + """This thread processes data received from the network + (which is done by the asyncore thread)""" def __init__(self, num=0): super(ReceiveQueueThread, self).__init__(name="ReceiveQueue_%i" % num) @@ -32,12 +37,14 @@ class ReceiveQueueThread(StoppableThread): try: connection = BMConnectionPool().getConnectionByAddr(dest) - except KeyError: # connection object not found + # connection object not found + except KeyError: receiveDataQueue.task_done() continue try: connection.process() - except UnknownStateError: # state isn't implemented + # state isn't implemented + except UnknownStateError: pass except socket.error as err: if err.errno == errno.EBADF: diff --git a/src/network/socks4a.py b/src/network/socks4a.py index f0b234f5..42eab4b7 100644 --- a/src/network/socks4a.py +++ b/src/network/socks4a.py @@ -1,6 +1,5 @@ """ -src/network/socks4a.py -================================= +SOCKS4a proxy module """ # pylint: disable=attribute-defined-outside-init import socket diff --git a/src/network/socks5.py b/src/network/socks5.py index f0241744..fc33f4df 100644 --- a/src/network/socks5.py +++ b/src/network/socks5.py @@ -1,7 +1,5 @@ """ -src/network/socks5.py -===================== - +SOCKS5 proxy module """ # pylint: disable=attribute-defined-outside-init @@ -155,7 +153,8 @@ class Socks5(Proxy): return True def proxy_sock_name(self): - """Handle return value when using SOCKS5 for DNS resolving instead of connecting.""" + """Handle return value when using SOCKS5 + for DNS resolving instead of connecting.""" return socket.inet_ntoa(self.__proxysockname[0]) diff --git a/src/network/stats.py b/src/network/stats.py index d760ace2..82e6c87f 100644 --- a/src/network/stats.py +++ b/src/network/stats.py @@ -1,6 +1,5 @@ """ -src/network/stats.py -==================== +Network statistics """ import time @@ -34,7 +33,9 @@ def uploadSpeed(): currentTimestamp = time.time() if int(lastSentTimestamp) < int(currentTimestamp): currentSentBytes = asyncore.sentBytes - currentSentSpeed = int((currentSentBytes - lastSentBytes) / (currentTimestamp - lastSentTimestamp)) + currentSentSpeed = int( + (currentSentBytes - lastSentBytes) / ( + currentTimestamp - lastSentTimestamp)) lastSentBytes = currentSentBytes lastSentTimestamp = currentTimestamp return currentSentSpeed @@ -53,7 +54,8 @@ def downloadSpeed(): if int(lastReceivedTimestamp) < int(currentTimestamp): currentReceivedBytes = asyncore.receivedBytes currentReceivedSpeed = int( - (currentReceivedBytes - lastReceivedBytes) / (currentTimestamp - lastReceivedTimestamp)) + (currentReceivedBytes - lastReceivedBytes) / ( + currentTimestamp - lastReceivedTimestamp)) lastReceivedBytes = currentReceivedBytes lastReceivedTimestamp = currentTimestamp return currentReceivedSpeed diff --git a/src/network/tcp.py b/src/network/tcp.py index 3097765f..d611b1ca 100644 --- a/src/network/tcp.py +++ b/src/network/tcp.py @@ -1,9 +1,7 @@ +""" +TCP protocol handler +""" # pylint: disable=too-many-ancestors -""" -src/network/tcp.py -================== -""" - import logging import math import random @@ -31,7 +29,7 @@ from network.socks4a import Socks4aConnection from network.socks5 import Socks5Connection from network.tls import TLSDispatcher from node import Peer -from queues import UISignalQueue, invQueue, receiveDataQueue +from queues import invQueue, receiveDataQueue, UISignalQueue logger = logging.getLogger('default') @@ -39,7 +37,6 @@ logger = logging.getLogger('default') class TCPConnection(BMProto, TLSDispatcher): # pylint: disable=too-many-instance-attributes """ - .. todo:: Look to understand and/or fix the non-parent-init-called """ @@ -85,7 +82,8 @@ class TCPConnection(BMProto, TLSDispatcher): not protocol.checkSocksIP(self.destination.host) ) except socket.error: - pass # it's probably a hostname + # it's probably a hostname + pass self.network_group = protocol.network_group(self.destination.host) ObjectTracker.__init__(self) # pylint: disable=non-parent-init-called self.bm_proto_reset() @@ -140,10 +138,9 @@ class TCPConnection(BMProto, TLSDispatcher): if not self.isOutbound and not self.local: shared.clientHasReceivedIncomingConnections = True UISignalQueue.put(('setStatusIcon', 'green')) - UISignalQueue.put(( - 'updateNetworkStatusTab', - (self.isOutbound, True, self.destination) - )) + UISignalQueue.put( + ('updateNetworkStatusTab', ( + self.isOutbound, True, self.destination))) self.antiIntersectionDelay(True) self.fullyEstablished = True if self.isOutbound: @@ -215,8 +212,8 @@ class TCPConnection(BMProto, TLSDispatcher): bigInvList[objHash] = 0 objectCount = 0 payload = b'' - # Now let us start appending all of these hashes together. They will be - # sent out in a big inv message to our new peer. + # Now let us start appending all of these hashes together. + # They will be sent out in a big inv message to our new peer. for obj_hash, _ in bigInvList.items(): payload += obj_hash objectCount += 1 diff --git a/src/network/tls.py b/src/network/tls.py index d5c4e23a..f756591c 100644 --- a/src/network/tls.py +++ b/src/network/tls.py @@ -1,7 +1,6 @@ """ SSL/TLS negotiation. """ - import logging import os import socket @@ -10,6 +9,7 @@ import sys from network.advanceddispatcher import AdvancedDispatcher import network.asyncore_pollchoose as asyncore + from queues import receiveDataQueue import paths @@ -24,7 +24,8 @@ if sys.version_info >= (2, 7, 13): # ssl.PROTOCOL_TLS1.2 sslProtocolVersion = ssl.PROTOCOL_TLS # pylint: disable=no-member elif sys.version_info >= (2, 7, 9): - # this means any SSL/TLS. SSLv2 and 3 are excluded with an option after context is created + # this means any SSL/TLS. + # SSLv2 and 3 are excluded with an option after context is created sslProtocolVersion = ssl.PROTOCOL_SSLv23 else: # this means TLSv1, there is no way to set "TLSv1 or higher" or @@ -33,7 +34,8 @@ else: # ciphers -if ssl.OPENSSL_VERSION_NUMBER >= 0x10100000 and not ssl.OPENSSL_VERSION.startswith("LibreSSL"): +if ssl.OPENSSL_VERSION_NUMBER >= 0x10100000 and not \ + ssl.OPENSSL_VERSION.startswith("LibreSSL"): sslProtocolCiphers = "AECDH-AES256-SHA@SECLEVEL=0" else: sslProtocolCiphers = "AECDH-AES256-SHA" @@ -41,19 +43,19 @@ else: class TLSDispatcher(AdvancedDispatcher): """TLS functionality for classes derived from AdvancedDispatcher""" - # pylint: disable=too-many-instance-attributes - # pylint: disable=too-many-arguments,super-init-not-called,unused-argument - def __init__( - self, address=None, sock=None, certfile=None, keyfile=None, - server_side=False, ciphers=sslProtocolCiphers - ): + # pylint: disable=too-many-instance-attributes, too-many-arguments + # pylint: disable=super-init-not-called + def __init__(self, _=None, sock=None, certfile=None, keyfile=None, + server_side=False, ciphers=sslProtocolCiphers): self.want_read = self.want_write = True if certfile is None: - self.certfile = os.path.join(paths.codePath(), 'sslkeys', 'cert.pem') + self.certfile = os.path.join( + paths.codePath(), 'sslkeys', 'cert.pem') else: self.certfile = certfile if keyfile is None: - self.keyfile = os.path.join(paths.codePath(), 'sslkeys', 'key.pem') + self.keyfile = os.path.join( + paths.codePath(), 'sslkeys', 'key.pem') else: self.keyfile = keyfile self.server_side = server_side @@ -68,20 +70,23 @@ class TLSDispatcher(AdvancedDispatcher): # pylint: disable=attribute-defined-outside-init self.isSSL = True self.tlsStarted = True - # Once the connection has been established, it's safe to wrap the - # socket. + # Once the connection has been established, + # it's safe to wrap the socket. if sys.version_info >= (2, 7, 9): context = ssl.create_default_context( - purpose=ssl.Purpose.SERVER_AUTH if self.server_side else ssl.Purpose.CLIENT_AUTH) + purpose=ssl.Purpose.SERVER_AUTH + if self.server_side else ssl.Purpose.CLIENT_AUTH) context.set_ciphers(self.ciphers) context.set_ecdh_curve("secp256k1") context.check_hostname = False context.verify_mode = ssl.CERT_NONE # also exclude TLSv1 and TLSv1.1 in the future context.options = ssl.OP_ALL | ssl.OP_NO_SSLv2 |\ - ssl.OP_NO_SSLv3 | ssl.OP_SINGLE_ECDH_USE | ssl.OP_CIPHER_SERVER_PREFERENCE + ssl.OP_NO_SSLv3 | ssl.OP_SINGLE_ECDH_USE |\ + ssl.OP_CIPHER_SERVER_PREFERENCE self.sslSocket = context.wrap_socket( - self.socket, server_side=self.server_side, do_handshake_on_connect=False) + self.socket, server_side=self.server_side, + do_handshake_on_connect=False) else: self.sslSocket = ssl.wrap_socket( self.socket, server_side=self.server_side, @@ -115,12 +120,15 @@ class TLSDispatcher(AdvancedDispatcher): def readable(self): """Handle readable check for TLS-enabled sockets""" try: - # during TLS handshake, and after flushing write buffer, return status of last handshake attempt + # during TLS handshake, and after flushing write buffer, + # return status of last handshake attempt if self.tlsStarted and not self.tlsDone and not self.write_buf: # print "tls readable, %r" % (self.want_read) return self.want_read - # prior to TLS handshake, receiveDataThread should emulate synchronous behaviour - elif not self.fullyEstablished and (self.expectBytes == 0 or not self.write_buf_empty()): + # prior to TLS handshake, + # receiveDataThread should emulate synchronous behaviour + elif not self.fullyEstablished and ( + self.expectBytes == 0 or not self.write_buf_empty()): return False return AdvancedDispatcher.readable(self) except AttributeError: @@ -135,10 +143,14 @@ class TLSDispatcher(AdvancedDispatcher): try: # wait for write buffer flush if self.tlsStarted and not self.tlsDone and not self.write_buf: - # logger.debug("%s:%i TLS handshaking (read)", self.destination.host, self.destination.port) + # logger.debug( + # "%s:%i TLS handshaking (read)", self.destination.host, + # self.destination.port) self.tls_handshake() else: - # logger.debug("%s:%i Not TLS handshaking (read)", self.destination.host, self.destination.port) + # logger.debug( + # "%s:%i Not TLS handshaking (read)", self.destination.host, + # self.destination.port) return AdvancedDispatcher.handle_read(self) except AttributeError: return AdvancedDispatcher.handle_read(self) @@ -161,10 +173,14 @@ class TLSDispatcher(AdvancedDispatcher): try: # wait for write buffer flush if self.tlsStarted and not self.tlsDone and not self.write_buf: - # logger.debug("%s:%i TLS handshaking (write)", self.destination.host, self.destination.port) + # logger.debug( + # "%s:%i TLS handshaking (write)", self.destination.host, + # self.destination.port) self.tls_handshake() else: - # logger.debug("%s:%i Not TLS handshaking (write)", self.destination.host, self.destination.port) + # logger.debug( + # "%s:%i Not TLS handshaking (write)", self.destination.host, + # self.destination.port) return AdvancedDispatcher.handle_write(self) except AttributeError: return AdvancedDispatcher.handle_write(self) @@ -188,7 +204,8 @@ class TLSDispatcher(AdvancedDispatcher): # print "handshaking (internal)" self.sslSocket.do_handshake() except ssl.SSLError as err: - # print "%s:%i: handshake fail" % (self.destination.host, self.destination.port) + # print "%s:%i: handshake fail" % ( + # self.destination.host, self.destination.port) self.want_read = self.want_write = False if err.args[0] == ssl.SSL_ERROR_WANT_READ: # print "want read" diff --git a/src/network/udp.py b/src/network/udp.py index d8c5f340..d5f1cccd 100644 --- a/src/network/udp.py +++ b/src/network/udp.py @@ -1,13 +1,12 @@ """ -src/network/udp.py -================== +UDP protocol handler """ import logging -import time import socket +import time -import state import protocol +import state from bmproto import BMProto from node import Peer from objectracker import ObjectTracker @@ -79,7 +78,7 @@ class UDPSocket(BMProto): # pylint: disable=too-many-instance-attributes if not self.local: return True remoteport = False - for seenTime, stream, services, ip, port in addresses: + for seenTime, stream, _, ip, port in addresses: decodedIP = protocol.checkIPAddress(str(ip)) if stream not in state.streamsInWhichIAmParticipating: continue @@ -96,9 +95,8 @@ class UDPSocket(BMProto): # pylint: disable=too-many-instance-attributes "received peer discovery from %s:%i (port %i):", self.destination.host, self.destination.port, remoteport) if self.local: - state.discoveredPeers[ - Peer(self.destination.host, remoteport) - ] = time.time() + state.discoveredPeers[Peer(self.destination.host, remoteport)] = \ + time.time() return True def bm_command_portcheck(self):