From 28355d70c734f57b8638fd8ace44a0bf725a1561 Mon Sep 17 00:00:00 2001 From: anand k Date: Thu, 9 May 2024 08:44:56 +0530 Subject: [PATCH] Made BMConnectionPool as global runtime variable in connectionpool from singleton --- src/api.py | 10 +++++----- src/bitmessageqt/networkstatus.py | 10 +++++----- src/bitmessageqt/settings.py | 3 ++- src/class_singleCleaner.py | 4 ++-- src/network/__init__.py | 12 ++++-------- src/network/addrthread.py | 4 ++-- src/network/announcethread.py | 4 ++-- src/network/bmproto.py | 14 +++++++------- src/network/connectionpool.py | 5 +++-- src/network/downloadthread.py | 4 ++-- src/network/invthread.py | 8 ++++---- src/network/networkthread.py | 10 +++++----- src/network/objectracker.py | 4 ++-- src/network/receivequeuethread.py | 4 ++-- src/network/stats.py | 8 ++++---- src/network/tcp.py | 12 ++++++------ src/network/uploadthread.py | 4 ++-- src/tests/core.py | 8 ++++---- src/tests/test_network.py | 2 +- src/upnp.py | 4 ++-- 20 files changed, 66 insertions(+), 68 deletions(-) diff --git a/src/api.py b/src/api.py index e93b250a..a4445569 100644 --- a/src/api.py +++ b/src/api.py @@ -96,9 +96,9 @@ from helper_sql import ( from highlevelcrypto import calculateInventoryHash try: - from network import BMConnectionPool + from network import connectionpool except ImportError: - BMConnectionPool = None + connectionpool = None from network import stats, StoppableThread from version import softwareVersion @@ -1475,18 +1475,18 @@ class BMRPCDispatcher(object): Returns bitmessage connection information as dict with keys *inbound*, *outbound*. """ - if BMConnectionPool is None: + if connectionpool is None: raise APIError(21, 'Could not import BMConnectionPool.') inboundConnections = [] outboundConnections = [] - for i in BMConnectionPool().inboundConnections.values(): + for i in connectionpool.pool.inboundConnections.values(): inboundConnections.append({ 'host': i.destination.host, 'port': i.destination.port, 'fullyEstablished': i.fullyEstablished, 'userAgent': str(i.userAgent) }) - for i in BMConnectionPool().outboundConnections.values(): + for i in connectionpool.pool.outboundConnections.values(): outboundConnections.append({ 'host': i.destination.host, 'port': i.destination.port, diff --git a/src/bitmessageqt/networkstatus.py b/src/bitmessageqt/networkstatus.py index 3aa213c8..5d669f39 100644 --- a/src/bitmessageqt/networkstatus.py +++ b/src/bitmessageqt/networkstatus.py @@ -10,7 +10,7 @@ import l10n import network.stats import state import widgets -from network import BMConnectionPool, knownnodes +from network import connectionpool, knownnodes from retranslateui import RetranslateMixin from tr import _translate from uisignaler import UISignaler @@ -148,16 +148,16 @@ class NetworkStatus(QtGui.QWidget, RetranslateMixin): # pylint: disable=too-many-branches,undefined-variable if outbound: try: - c = BMConnectionPool().outboundConnections[destination] + c = connectionpool.pool.outboundConnections[destination] except KeyError: if add: return else: try: - c = BMConnectionPool().inboundConnections[destination] + c = connectionpool.pool.inboundConnections[destination] except KeyError: try: - c = BMConnectionPool().inboundConnections[destination.host] + c = connectionpool.pool.inboundConnections[destination.host] except KeyError: if add: return @@ -201,7 +201,7 @@ class NetworkStatus(QtGui.QWidget, RetranslateMixin): self.tableWidgetConnectionCount.item(0, 0).setData(QtCore.Qt.UserRole, destination) self.tableWidgetConnectionCount.item(0, 1).setData(QtCore.Qt.UserRole, outbound) else: - if not BMConnectionPool().inboundConnections: + if not connectionpool.pool.inboundConnections: self.window().setStatusIcon('yellow') for i in range(self.tableWidgetConnectionCount.rowCount()): if self.tableWidgetConnectionCount.item(i, 0).data(QtCore.Qt.UserRole).toPyObject() != destination: diff --git a/src/bitmessageqt/settings.py b/src/bitmessageqt/settings.py index 6e0d4792..2d56c47f 100644 --- a/src/bitmessageqt/settings.py +++ b/src/bitmessageqt/settings.py @@ -20,7 +20,8 @@ import widgets from bmconfigparser import config as config_obj from helper_sql import sqlExecute, sqlStoredProcedure from helper_startup import start_proxyconfig -from network import knownnodes, AnnounceThread +from network import knownnodes +from network.announcethread import AnnounceThread from network.asyncore_pollchoose import set_rates from tr import _translate diff --git a/src/class_singleCleaner.py b/src/class_singleCleaner.py index 1314f938..acbf36ab 100644 --- a/src/class_singleCleaner.py +++ b/src/class_singleCleaner.py @@ -27,7 +27,7 @@ import queues import state from bmconfigparser import config from helper_sql import sqlExecute, sqlQuery -from network import BMConnectionPool, knownnodes, StoppableThread +from network import connectionpool, knownnodes, StoppableThread from tr import _translate @@ -129,7 +129,7 @@ class singleCleaner(StoppableThread): os._exit(1) # pylint: disable=protected-access # inv/object tracking - for connection in BMConnectionPool().connections(): + for connection in connectionpool.pool.connections(): connection.clean() # discovery tracking diff --git a/src/network/__init__.py b/src/network/__init__.py index 49f77c43..d89670a7 100644 --- a/src/network/__init__.py +++ b/src/network/__init__.py @@ -2,21 +2,17 @@ Network subsystem package """ -try: - from .announcethread import AnnounceThread - from .connectionpool import BMConnectionPool -except ImportError: - AnnounceThread = None - BMConnectionPool = None from .threads import StoppableThread -__all__ = ["AnnounceThread", "BMConnectionPool", "StoppableThread"] +__all__ = ["StoppableThread"] def start(config, state): """Start network threads""" import state + from .announcethread import AnnounceThread + import connectionpool # pylint: disable=relative-import from .addrthread import AddrThread from .dandelion import Dandelion from .downloadthread import DownloadThread @@ -29,7 +25,7 @@ def start(config, state): readKnownNodes() # init, needs to be early because other thread may access it early state.Dandelion = Dandelion() - BMConnectionPool().connectToStream(1) + connectionpool.pool.connectToStream(1) for thread in ( BMNetworkThread(), InvThread(), AddrThread(), DownloadThread(), UploadThread() diff --git a/src/network/addrthread.py b/src/network/addrthread.py index fea0910e..74a5d744 100644 --- a/src/network/addrthread.py +++ b/src/network/addrthread.py @@ -5,10 +5,10 @@ from six.moves import queue # magic imports! import state +import connectionpool from helper_random import randomshuffle from protocol import assembleAddrMessage from queues import addrQueue # FIXME: init with queue -from network.connectionpool import BMConnectionPool from threads import StoppableThread @@ -29,7 +29,7 @@ class AddrThread(StoppableThread): if chunk: # Choose peers randomly - connections = BMConnectionPool().establishedConnections() + connections = connectionpool.pool.establishedConnections() randomshuffle(connections) for i in connections: randomshuffle(chunk) diff --git a/src/network/announcethread.py b/src/network/announcethread.py index 84807757..003eb092 100644 --- a/src/network/announcethread.py +++ b/src/network/announcethread.py @@ -5,9 +5,9 @@ import time # magic imports! import state +import connectionpool from bmconfigparser import config from protocol import assembleAddrMessage -from network.connectionpool import BMConnectionPool from node import Peer from threads import StoppableThread @@ -31,7 +31,7 @@ class AnnounceThread(StoppableThread): @staticmethod def announceSelf(): """Announce our presence""" - for connection in BMConnectionPool().udpSockets.values(): + for connection in connectionpool.pool.udpSockets.values(): if not connection.announcing: continue for stream in state.streamsInWhichIAmParticipating: diff --git a/src/network/bmproto.py b/src/network/bmproto.py index a0674077..41e163df 100644 --- a/src/network/bmproto.py +++ b/src/network/bmproto.py @@ -12,10 +12,10 @@ import time # magic imports! import addresses -import connectionpool import knownnodes import protocol import state +import connectionpool from bmconfigparser import config from queues import invQueue, objectProcessorQueue, portCheckerQueue from randomtrackingdict import RandomTrackingDict @@ -540,7 +540,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker): if not self.isOutbound: self.append_write_buf(protocol.assembleVersionMessage( self.destination.host, self.destination.port, - connectionpool.BMConnectionPool().streams, True, + connectionpool.pool.streams, True, nodeid=self.nodeid)) logger.debug( '%(host)s:%(port)i sending version', @@ -596,7 +596,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker): 'Closed connection to %s because there is no overlapping' ' interest in streams.', self.destination) return False - if connectionpool.BMConnectionPool().inboundConnections.get( + if connectionpool.pool.inboundConnections.get( self.destination): try: if not protocol.checkSocksIP(self.destination.host): @@ -614,8 +614,8 @@ class BMProto(AdvancedDispatcher, ObjectTracker): # or server full report the same error to counter deanonymisation if ( Peer(self.destination.host, self.peerNode.port) - in connectionpool.BMConnectionPool().inboundConnections - or len(connectionpool.BMConnectionPool()) + in connectionpool.pool.inboundConnections + or len(connectionpool.pool) > config.safeGetInt( 'bitmessagesettings', 'maxtotalconnections') + config.safeGetInt( @@ -627,7 +627,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker): 'Closed connection to %s due to server full' ' or duplicate inbound/outbound.', self.destination) return False - if connectionpool.BMConnectionPool().isAlreadyConnected(self.nonce): + if connectionpool.pool.isAlreadyConnected(self.nonce): self.append_write_buf(protocol.assembleErrorMessage( errorText="I'm connected to myself. Closing connection.", fatal=2)) @@ -641,7 +641,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker): @staticmethod def stopDownloadingObject(hashId, forwardAnyway=False): """Stop downloading object *hashId*""" - for connection in connectionpool.BMConnectionPool().connections(): + for connection in connectionpool.pool.connections(): try: del connection.objectsNewToMe[hashId] except KeyError: diff --git a/src/network/connectionpool.py b/src/network/connectionpool.py index 4823b3c8..b756f8a4 100644 --- a/src/network/connectionpool.py +++ b/src/network/connectionpool.py @@ -17,7 +17,6 @@ from bmconfigparser import config from connectionchooser import chooseConnection from node import Peer from proxy import Proxy -from singleton import Singleton from tcp import ( bootstrap, Socks4aBMConnection, Socks5BMConnection, TCPConnection, TCPServer) @@ -26,7 +25,6 @@ from udp import UDPSocket logger = logging.getLogger('default') -@Singleton class BMConnectionPool(object): """Pool of all existing connections""" # pylint: disable=too-many-instance-attributes @@ -403,3 +401,6 @@ class BMConnectionPool(object): pass for i in reaper: self.removeConnection(i) + + +pool = BMConnectionPool() diff --git a/src/network/downloadthread.py b/src/network/downloadthread.py index 777f190d..4f108c72 100644 --- a/src/network/downloadthread.py +++ b/src/network/downloadthread.py @@ -6,7 +6,7 @@ import state import addresses import helper_random import protocol -from network.connectionpool import BMConnectionPool +import connectionpool from objectracker import missingObjects from threads import StoppableThread @@ -41,7 +41,7 @@ class DownloadThread(StoppableThread): while not self._stopped: requested = 0 # Choose downloading peers randomly - connections = BMConnectionPool().establishedConnections() + connections = connectionpool.pool.establishedConnections() helper_random.randomshuffle(connections) requestChunk = max(int( min(self.maxRequestChunk, len(missingObjects)) diff --git a/src/network/invthread.py b/src/network/invthread.py index 346baa93..b55408d4 100644 --- a/src/network/invthread.py +++ b/src/network/invthread.py @@ -8,7 +8,7 @@ from time import time import addresses import protocol import state -from network.connectionpool import BMConnectionPool +import connectionpool from queues import invQueue from threads import StoppableThread @@ -18,7 +18,7 @@ def handleExpiredDandelion(expired): the object""" if not expired: return - for i in BMConnectionPool().connections(): + for i in connectionpool.pool.connections(): if not i.fullyEstablished: continue for x in expired: @@ -40,7 +40,7 @@ class InvThread(StoppableThread): def handleLocallyGenerated(stream, hashId): """Locally generated inventory items require special handling""" state.Dandelion.addHash(hashId, stream=stream) - for connection in BMConnectionPool().connections(): + for connection in connectionpool.pool.connections(): if state.dandelion_enabled and connection != \ state.Dandelion.objectChildStem(hashId): continue @@ -62,7 +62,7 @@ class InvThread(StoppableThread): break if chunk: - for connection in BMConnectionPool().connections(): + for connection in connectionpool.pool.connections(): fluffs = [] stems = [] for inv in chunk: diff --git a/src/network/networkthread.py b/src/network/networkthread.py index ef4f92ba..dc5f616f 100644 --- a/src/network/networkthread.py +++ b/src/network/networkthread.py @@ -3,7 +3,7 @@ A thread to handle network concerns """ import network.asyncore_pollchoose as asyncore import state -from network.connectionpool import BMConnectionPool +import connectionpool from queues import excQueue from threads import StoppableThread @@ -15,24 +15,24 @@ class BMNetworkThread(StoppableThread): def run(self): try: while not self._stopped and state.shutdown == 0: - BMConnectionPool().loop() + connectionpool.pool.loop() except Exception as e: excQueue.put((self.name, e)) raise def stopThread(self): super(BMNetworkThread, self).stopThread() - for i in BMConnectionPool().listeningSockets.values(): + for i in connectionpool.pool.listeningSockets.values(): try: i.close() except: # nosec B110 # pylint:disable=bare-except pass - for i in BMConnectionPool().outboundConnections.values(): + for i in connectionpool.pool.outboundConnections.values(): try: i.close() except: # nosec B110 # pylint:disable=bare-except pass - for i in BMConnectionPool().inboundConnections.values(): + for i in connectionpool.pool.inboundConnections.values(): try: i.close() except: # nosec B110 # pylint:disable=bare-except diff --git a/src/network/objectracker.py b/src/network/objectracker.py index 8de098ce..a458e5d2 100644 --- a/src/network/objectracker.py +++ b/src/network/objectracker.py @@ -5,7 +5,7 @@ import time from threading import RLock import state -import network.connectionpool +import connectionpool from randomtrackingdict import RandomTrackingDict haveBloom = False @@ -100,7 +100,7 @@ class ObjectTracker(object): def handleReceivedObject(self, streamNumber, hashid): """Handling received object""" - for i in network.connectionpool.BMConnectionPool().connections(): + for i in connectionpool.pool.connections(): if not i.fullyEstablished: continue try: diff --git a/src/network/receivequeuethread.py b/src/network/receivequeuethread.py index 56c01b77..cad1376c 100644 --- a/src/network/receivequeuethread.py +++ b/src/network/receivequeuethread.py @@ -6,8 +6,8 @@ import Queue import socket import state +import connectionpool from network.advanceddispatcher import UnknownStateError -from network.connectionpool import BMConnectionPool from queues import receiveDataQueue from threads import StoppableThread @@ -36,7 +36,7 @@ class ReceiveQueueThread(StoppableThread): # enough data, or the connection is to be aborted try: - connection = BMConnectionPool().getConnectionByAddr(dest) + connection = connectionpool.pool.getConnectionByAddr(dest) # connection object not found except KeyError: receiveDataQueue.task_done() diff --git a/src/network/stats.py b/src/network/stats.py index 82e6c87f..0ab1ae0f 100644 --- a/src/network/stats.py +++ b/src/network/stats.py @@ -4,7 +4,7 @@ Network statistics import time import asyncore_pollchoose as asyncore -from network.connectionpool import BMConnectionPool +import connectionpool from objectracker import missingObjects @@ -18,7 +18,7 @@ currentSentSpeed = 0 def connectedHostsList(): """List of all the connected hosts""" - return BMConnectionPool().establishedConnections() + return connectionpool.pool.establishedConnections() def sentBytes(): @@ -69,8 +69,8 @@ def pendingDownload(): def pendingUpload(): """Getting pending uploads""" # tmp = {} - # for connection in BMConnectionPool().inboundConnections.values() + \ - # BMConnectionPool().outboundConnections.values(): + # for connection in connectionpool.pool.inboundConnections.values() + \ + # connectionpool.pool.outboundConnections.values(): # for k in connection.objectsNewToThem.keys(): # tmp[k] = True # This probably isn't the correct logic so it's disabled diff --git a/src/network/tcp.py b/src/network/tcp.py index 0964d8b8..139715a6 100644 --- a/src/network/tcp.py +++ b/src/network/tcp.py @@ -15,13 +15,13 @@ import helper_random import l10n import protocol import state +import connectionpool from bmconfigparser import config from highlevelcrypto import randomBytes from queues import invQueue, receiveDataQueue, UISignalQueue from tr import _translate import asyncore_pollchoose as asyncore -import connectionpool import knownnodes from network.advanceddispatcher import AdvancedDispatcher from network.bmproto import BMProto @@ -267,7 +267,7 @@ class TCPConnection(BMProto, TLSDispatcher): self.append_write_buf( protocol.assembleVersionMessage( self.destination.host, self.destination.port, - connectionpool.BMConnectionPool().streams, + connectionpool.pool.streams, False, nodeid=self.nodeid)) self.connectedAt = time.time() receiveDataQueue.put(self.destination) @@ -318,7 +318,7 @@ class Socks5BMConnection(Socks5Connection, TCPConnection): self.append_write_buf( protocol.assembleVersionMessage( self.destination.host, self.destination.port, - connectionpool.BMConnectionPool().streams, + connectionpool.pool.streams, False, nodeid=self.nodeid)) self.set_state("bm_header", expectBytes=protocol.Header.size) return True @@ -342,7 +342,7 @@ class Socks4aBMConnection(Socks4aConnection, TCPConnection): self.append_write_buf( protocol.assembleVersionMessage( self.destination.host, self.destination.port, - connectionpool.BMConnectionPool().streams, + connectionpool.pool.streams, False, nodeid=self.nodeid)) self.set_state("bm_header", expectBytes=protocol.Header.size) return True @@ -430,7 +430,7 @@ class TCPServer(AdvancedDispatcher): state.ownAddresses[Peer(*sock.getsockname())] = True if ( - len(connectionpool.BMConnectionPool()) + len(connectionpool.pool) > config.safeGetInt( 'bitmessagesettings', 'maxtotalconnections') + config.safeGetInt( @@ -442,7 +442,7 @@ class TCPServer(AdvancedDispatcher): sock.close() return try: - connectionpool.BMConnectionPool().addConnection( + connectionpool.pool.addConnection( TCPConnection(sock=sock)) except socket.error: pass diff --git a/src/network/uploadthread.py b/src/network/uploadthread.py index 8b3e5938..90048c0a 100644 --- a/src/network/uploadthread.py +++ b/src/network/uploadthread.py @@ -6,7 +6,7 @@ import time import helper_random import protocol import state -from network.connectionpool import BMConnectionPool +import connectionpool from randomtrackingdict import RandomTrackingDict from threads import StoppableThread @@ -22,7 +22,7 @@ class UploadThread(StoppableThread): while not self._stopped: uploaded = 0 # Choose uploading peers randomly - connections = BMConnectionPool().establishedConnections() + connections = connectionpool.pool.establishedConnections() helper_random.randomshuffle(connections) for i in connections: now = time.time() diff --git a/src/tests/core.py b/src/tests/core.py index 21966dcc..806c288e 100644 --- a/src/tests/core.py +++ b/src/tests/core.py @@ -26,7 +26,7 @@ from helper_msgcoding import MsgEncode, MsgDecode from helper_sql import sqlQuery from network import asyncore_pollchoose as asyncore, knownnodes from network.bmproto import BMProto -from network.connectionpool import BMConnectionPool +import network.connectionpool as connectionpool from network.node import Node, Peer from network.tcp import Socks4aBMConnection, Socks5BMConnection, TCPConnection from queues import excQueue @@ -202,7 +202,7 @@ class TestCore(unittest.TestCase): while c > 0: time.sleep(1) c -= 2 - for peer, con in BMConnectionPool().outboundConnections.iteritems(): + for peer, con in connectionpool.pool.outboundConnections.iteritems(): if ( peer.host.startswith('bootstrap') or peer.host == 'quzwelsuziwqgpt2.onion' @@ -232,7 +232,7 @@ class TestCore(unittest.TestCase): def test_dontconnect(self): """all connections are closed 5 seconds after setting dontconnect""" self._initiate_bootstrap() - self.assertEqual(len(BMConnectionPool().connections()), 0) + self.assertEqual(len(connectionpool.pool.connections()), 0) def test_connection(self): """test connection to bootstrap servers""" @@ -287,7 +287,7 @@ class TestCore(unittest.TestCase): tried_hosts = set() for _ in range(360): time.sleep(1) - for peer in BMConnectionPool().outboundConnections: + for peer in connectionpool.pool.outboundConnections: if peer.host.endswith('.onion'): tried_hosts.add(peer.host) else: diff --git a/src/tests/test_network.py b/src/tests/test_network.py index 08cd95ce..206117e0 100644 --- a/src/tests/test_network.py +++ b/src/tests/test_network.py @@ -27,7 +27,7 @@ class TestNetwork(TestPartialRun): # beware of singleton connectionpool.config = cls.config - cls.pool = network.BMConnectionPool() + cls.pool = connectionpool.pool cls.stats = stats network.start(cls.config, cls.state) diff --git a/src/upnp.py b/src/upnp.py index aa04a556..42ff0c6d 100644 --- a/src/upnp.py +++ b/src/upnp.py @@ -19,7 +19,7 @@ import state import tr from bmconfigparser import config from debug import logger -from network import BMConnectionPool, knownnodes, StoppableThread +from network import connectionpool, knownnodes, StoppableThread from network.node import Peer @@ -228,7 +228,7 @@ class uPnPThread(StoppableThread): # wait until asyncore binds so that we know the listening port bound = False while state.shutdown == 0 and not self._stopped and not bound: - for s in BMConnectionPool().listeningSockets.values(): + for s in connectionpool.pool.listeningSockets.values(): if s.is_bound(): bound = True if not bound: