diff --git a/src/class_singleCleaner.py b/src/class_singleCleaner.py index 4717c3cb..9ffc1607 100644 --- a/src/class_singleCleaner.py +++ b/src/class_singleCleaner.py @@ -1,5 +1,5 @@ """ -The singleCleaner class is a timer-driven thread that cleans data structures +The `singleCleaner` class is a timer-driven thread that cleans data structures to free memory, resends messages when a remote node doesn't respond, and sends pong messages to keep connections alive if the network isn't busy. @@ -45,12 +45,12 @@ class singleCleaner(StoppableThread): try: shared.maximumLengthOfTimeToBotherResendingMessages = ( float(BMConfigParser().get( - 'bitmessagesettings', 'stopresendingafterxdays')) * - 24 * 60 * 60 + 'bitmessagesettings', 'stopresendingafterxdays')) + * 24 * 60 * 60 ) + ( float(BMConfigParser().get( - 'bitmessagesettings', 'stopresendingafterxmonths')) * - (60 * 60 * 24 * 365) / 12) + 'bitmessagesettings', 'stopresendingafterxmonths')) + * (60 * 60 * 24 * 365) / 12) except: # Either the user hasn't set stopresendingafterxdays and # stopresendingafterxmonths yet or the options are missing @@ -92,8 +92,8 @@ class singleCleaner(StoppableThread): "SELECT toaddress, ackdata, status FROM sent" " WHERE ((status='awaitingpubkey' OR status='msgsent')" " AND folder='sent' AND sleeptill<? AND senttime>?)", - int(time.time()), int(time.time()) - - shared.maximumLengthOfTimeToBotherResendingMessages + int(time.time()), int(time.time()) + - shared.maximumLengthOfTimeToBotherResendingMessages ) for row in queryreturn: if len(row) < 2: @@ -139,9 +139,7 @@ class singleCleaner(StoppableThread): # thread.downloadQueue.clear() # inv/object tracking - for connection in \ - BMConnectionPool().inboundConnections.values() + \ - BMConnectionPool().outboundConnections.values(): + for connection in BMConnectionPool().connections(): connection.clean() # discovery tracking diff --git a/src/network/bmproto.py b/src/network/bmproto.py index 6375f393..86295b87 100644 --- a/src/network/bmproto.py +++ b/src/network/bmproto.py @@ -645,10 +645,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker): @staticmethod def stopDownloadingObject(hashId, forwardAnyway=False): """Stop downloading an object""" - for connection in ( - connectionpool.BMConnectionPool().inboundConnections.values() + - connectionpool.BMConnectionPool().outboundConnections.values() - ): + for connection in connectionpool.BMConnectionPool().connections(): try: del connection.objectsNewToMe[hashId] except KeyError: diff --git a/src/network/connectionpool.py b/src/network/connectionpool.py index 1267522a..8f959356 100644 --- a/src/network/connectionpool.py +++ b/src/network/connectionpool.py @@ -1,6 +1,5 @@ """ -src/network/connectionpool.py -================================== +`BMConnectionPool` class definition """ import errno import logging @@ -26,9 +25,10 @@ logger = logging.getLogger('default') @Singleton -# pylint: disable=too-many-instance-attributes class BMConnectionPool(object): """Pool of all existing connections""" + # pylint: disable=too-many-instance-attributes + def __init__(self): asyncore.set_rates( BMConfigParser().safeGetInt( @@ -41,9 +41,21 @@ class BMConnectionPool(object): self.listeningSockets = {} self.udpSockets = {} self.streams = [] - self.lastSpawned = 0 - self.spawnWait = 2 - self.bootstrapped = False + self._lastSpawned = 0 + self._spawnWait = 2 + self._bootstrapped = False + + def connections(self): + """ + Shortcut for combined list of connections from + `inboundConnections` and `outboundConnections` dicts + """ + return self.inboundConnections.values() + self.outboundConnections.values() + + def establishedConnections(self): + """Shortcut for list of connections having fullyEstablished == True""" + return [ + x for x in self.connections() if x.fullyEstablished] def connectToStream(self, streamNumber): """Connect to a bitmessage stream""" @@ -74,10 +86,7 @@ class BMConnectionPool(object): def isAlreadyConnected(self, nodeid): """Check if we're already connected to this peer""" - for i in ( - self.inboundConnections.values() + - self.outboundConnections.values() - ): + for i in self.connections(): try: if nodeid == i.nodeid: return True @@ -129,10 +138,11 @@ class BMConnectionPool(object): "bitmessagesettings", "onionbindip") else: host = '127.0.0.1' - if (BMConfigParser().safeGetBoolean( - "bitmessagesettings", "sockslisten") or - BMConfigParser().safeGet( - "bitmessagesettings", "socksproxytype") == "none"): + if ( + BMConfigParser().safeGetBoolean("bitmessagesettings", "sockslisten") + or BMConfigParser().safeGet("bitmessagesettings", "socksproxytype") + == "none" + ): # python doesn't like bind + INADDR_ANY? # host = socket.INADDR_ANY host = BMConfigParser().get("network", "bind") @@ -205,11 +215,13 @@ class BMConnectionPool(object): 'bitmessagesettings', 'socksproxytype', '') onionsocksproxytype = BMConfigParser().safeGet( 'bitmessagesettings', 'onionsocksproxytype', '') - if (socksproxytype[:5] == 'SOCKS' and - not BMConfigParser().safeGetBoolean( - 'bitmessagesettings', 'sockslisten') and - '.onion' not in BMConfigParser().safeGet( - 'bitmessagesettings', 'onionhostname', '')): + if ( + socksproxytype[:5] == 'SOCKS' + and not BMConfigParser().safeGetBoolean( + 'bitmessagesettings', 'sockslisten') + and '.onion' not in BMConfigParser().safeGet( + 'bitmessagesettings', 'onionhostname', '') + ): acceptConnections = False # pylint: disable=too-many-nested-blocks @@ -217,8 +229,8 @@ class BMConnectionPool(object): if not knownnodes.knownNodesActual: self.startBootstrappers() knownnodes.knownNodesActual = True - if not self.bootstrapped: - self.bootstrapped = True + if not self._bootstrapped: + self._bootstrapped = True Proxy.proxy = ( BMConfigParser().safeGet( 'bitmessagesettings', 'sockshostname'), @@ -260,8 +272,7 @@ class BMConnectionPool(object): continue try: - if (chosen.host.endswith(".onion") and - Proxy.onion_proxy is not None): + if chosen.host.endswith(".onion") and Proxy.onion_proxy: if onionsocksproxytype == "SOCKS5": self.addConnection(Socks5BMConnection(chosen)) elif onionsocksproxytype == "SOCKS4a": @@ -276,12 +287,9 @@ class BMConnectionPool(object): if e.errno == errno.ENETUNREACH: continue - self.lastSpawned = time.time() + self._lastSpawned = time.time() else: - for i in ( - self.inboundConnections.values() + - self.outboundConnections.values() - ): + for i in self.connections(): # FIXME: rating will be increased after next connection i.handle_close() @@ -291,8 +299,8 @@ class BMConnectionPool(object): self.startListening() else: for bind in re.sub( - '[^\w.]+', ' ', # pylint: disable=anomalous-backslash-in-string - BMConfigParser().safeGet('network', 'bind') + r'[^\w.]+', ' ', + BMConfigParser().safeGet('network', 'bind') ).split(): self.startListening(bind) logger.info('Listening for incoming connections.') @@ -301,8 +309,8 @@ class BMConnectionPool(object): self.startUDPSocket() else: for bind in re.sub( - '[^\w.]+', ' ', # pylint: disable=anomalous-backslash-in-string - BMConfigParser().safeGet('network', 'bind') + r'[^\w.]+', ' ', + BMConfigParser().safeGet('network', 'bind') ).split(): self.startUDPSocket(bind) self.startUDPSocket(False) @@ -319,16 +327,13 @@ class BMConnectionPool(object): i.accepting = i.connecting = i.connected = False logger.info('Stopped udp sockets.') - loopTime = float(self.spawnWait) - if self.lastSpawned < time.time() - self.spawnWait: + loopTime = float(self._spawnWait) + if self._lastSpawned < time.time() - self._spawnWait: loopTime = 2.0 asyncore.loop(timeout=loopTime, count=1000) reaper = [] - for i in ( - self.inboundConnections.values() + - self.outboundConnections.values() - ): + for i in self.connections(): minTx = time.time() - 20 if i.fullyEstablished: minTx -= 300 - 20 @@ -340,10 +345,8 @@ class BMConnectionPool(object): time.time() - i.lastTx) i.set_state("close") for i in ( - self.inboundConnections.values() + - self.outboundConnections.values() + - self.listeningSockets.values() + - self.udpSockets.values() + self.connections() + + self.listeningSockets.values() + self.udpSockets.values() ): if not (i.accepting or i.connecting or i.connected): reaper.append(i) diff --git a/src/network/downloadthread.py b/src/network/downloadthread.py index 472b32c0..e882f6de 100644 --- a/src/network/downloadthread.py +++ b/src/network/downloadthread.py @@ -1,6 +1,5 @@ """ -src/network/downloadthread.py -============================= +`DownloadThread` class definition """ import time @@ -29,7 +28,7 @@ class DownloadThread(StoppableThread): def cleanPending(self): """Expire pending downloads eventually""" - deadline = time.time() - DownloadThread.requestExpires + deadline = time.time() - self.requestExpires try: toDelete = [k for k, v in missingObjects.iteritems() if v < deadline] except RuntimeError: @@ -43,15 +42,12 @@ class DownloadThread(StoppableThread): while not self._stopped: requested = 0 # Choose downloading peers randomly - connections = [ - x for x in - BMConnectionPool().inboundConnections.values() + BMConnectionPool().outboundConnections.values() - if x.fullyEstablished] + connections = BMConnectionPool().establishedConnections() helper_random.randomshuffle(connections) - try: - requestChunk = max(int(min(DownloadThread.maxRequestChunk, len(missingObjects)) / len(connections)), 1) - except ZeroDivisionError: - requestChunk = 1 + requestChunk = max(int( + min(self.maxRequestChunk, len(missingObjects)) + / len(connections)), 1) if connections else 1 + for i in connections: now = time.time() # avoid unnecessary delay @@ -81,7 +77,7 @@ class DownloadThread(StoppableThread): '%s:%i Requesting %i objects', i.destination.host, i.destination.port, chunkCount) requested += chunkCount - if time.time() >= self.lastCleaned + DownloadThread.cleanInterval: + if time.time() >= self.lastCleaned + self.cleanInterval: self.cleanPending() if not requested: self.stop.wait(1) diff --git a/src/network/invthread.py b/src/network/invthread.py index bffa6ecb..d5690486 100644 --- a/src/network/invthread.py +++ b/src/network/invthread.py @@ -20,9 +20,7 @@ def handleExpiredDandelion(expired): the object""" if not expired: return - for i in \ - BMConnectionPool().inboundConnections.values() + \ - BMConnectionPool().outboundConnections.values(): + for i in BMConnectionPool().connections(): if not i.fullyEstablished: continue for x in expired: @@ -44,9 +42,7 @@ class InvThread(StoppableThread): def handleLocallyGenerated(stream, hashId): """Locally generated inventory items require special handling""" Dandelion().addHash(hashId, stream=stream) - for connection in \ - BMConnectionPool().inboundConnections.values() + \ - BMConnectionPool().outboundConnections.values(): + for connection in BMConnectionPool().connections(): if state.dandelion and connection != Dandelion().objectChildStem(hashId): continue connection.objectsNewToThem[hashId] = time() @@ -67,8 +63,7 @@ class InvThread(StoppableThread): break if chunk: - for connection in BMConnectionPool().inboundConnections.values() + \ - BMConnectionPool().outboundConnections.values(): + for connection in BMConnectionPool().connections(): fluffs = [] stems = [] for inv in chunk: @@ -96,13 +91,13 @@ class InvThread(StoppableThread): if fluffs: random.shuffle(fluffs) connection.append_write_buf(protocol.CreatePacket( - 'inv', addresses.encodeVarint(len(fluffs)) + - "".join(fluffs))) + 'inv', + addresses.encodeVarint(len(fluffs)) + ''.join(fluffs))) if stems: random.shuffle(stems) connection.append_write_buf(protocol.CreatePacket( - 'dinv', addresses.encodeVarint(len(stems)) + - "".join(stems))) + 'dinv', + addresses.encodeVarint(len(stems)) + ''.join(stems))) invQueue.iterate() for i in range(len(chunk)): diff --git a/src/network/objectracker.py b/src/network/objectracker.py index a8e3292a..b97aee46 100644 --- a/src/network/objectracker.py +++ b/src/network/objectracker.py @@ -95,8 +95,7 @@ class ObjectTracker(object): def handleReceivedObject(self, streamNumber, hashid): """Handling received object""" - for i in network.connectionpool.BMConnectionPool().inboundConnections.values( - ) + network.connectionpool.BMConnectionPool().outboundConnections.values(): + for i in network.connectionpool.BMConnectionPool().connections(): if not i.fullyEstablished: continue try: diff --git a/src/network/stats.py b/src/network/stats.py index fedfbbc1..d760ace2 100644 --- a/src/network/stats.py +++ b/src/network/stats.py @@ -19,16 +19,7 @@ currentSentSpeed = 0 def connectedHostsList(): """List of all the connected hosts""" - retval = [] - for i in BMConnectionPool().inboundConnections.values() + \ - BMConnectionPool().outboundConnections.values(): - if not i.fullyEstablished: - continue - try: - retval.append(i) - except AttributeError: - pass - return retval + return BMConnectionPool().establishedConnections() def sentBytes(): @@ -71,12 +62,6 @@ def downloadSpeed(): def pendingDownload(): """Getting pending downloads""" return len(missingObjects) - # tmp = {} - # for connection in BMConnectionPool().inboundConnections.values() + \ - # BMConnectionPool().outboundConnections.values(): - # for k in connection.objectsNewToMe.keys(): - # tmp[k] = True - # return len(tmp) def pendingUpload(): diff --git a/src/network/uploadthread.py b/src/network/uploadthread.py index 1b57bd9a..7d80d789 100644 --- a/src/network/uploadthread.py +++ b/src/network/uploadthread.py @@ -1,5 +1,5 @@ """ -src/network/uploadthread.py +`UploadThread` class definition """ import time @@ -22,19 +22,19 @@ class UploadThread(StoppableThread): def run(self): while not self._stopped: uploaded = 0 - # Choose downloading peers randomly - connections = [x for x in BMConnectionPool().inboundConnections.values() + - BMConnectionPool().outboundConnections.values() if x.fullyEstablished] + # Choose uploading peers randomly + connections = BMConnectionPool().establishedConnections() helper_random.randomshuffle(connections) for i in connections: now = time.time() # avoid unnecessary delay if i.skipUntil >= now: continue - if len(i.write_buf) > UploadThread.maxBufSize: + if len(i.write_buf) > self.maxBufSize: continue try: - request = i.pendingUpload.randomKeys(RandomTrackingDict.maxPending) + request = i.pendingUpload.randomKeys( + RandomTrackingDict.maxPending) except KeyError: continue payload = bytearray()