Made BMConnectionPool as global runtime variable in connectionpool from singleton

This commit is contained in:
anand k 2024-05-09 08:44:56 +05:30
parent 8fa9da4cb4
commit 28355d70c7
No known key found for this signature in database
GPG Key ID: 515AC24FA525DDE0
20 changed files with 66 additions and 68 deletions

View File

@ -96,9 +96,9 @@ from helper_sql import (
from highlevelcrypto import calculateInventoryHash
try:
from network import BMConnectionPool
from network import connectionpool
except ImportError:
BMConnectionPool = None
connectionpool = None
from network import stats, StoppableThread
from version import softwareVersion
@ -1475,18 +1475,18 @@ class BMRPCDispatcher(object):
Returns bitmessage connection information as dict with keys *inbound*,
*outbound*.
"""
if BMConnectionPool is None:
if connectionpool is None:
raise APIError(21, 'Could not import BMConnectionPool.')
inboundConnections = []
outboundConnections = []
for i in BMConnectionPool().inboundConnections.values():
for i in connectionpool.pool.inboundConnections.values():
inboundConnections.append({
'host': i.destination.host,
'port': i.destination.port,
'fullyEstablished': i.fullyEstablished,
'userAgent': str(i.userAgent)
})
for i in BMConnectionPool().outboundConnections.values():
for i in connectionpool.pool.outboundConnections.values():
outboundConnections.append({
'host': i.destination.host,
'port': i.destination.port,

View File

@ -10,7 +10,7 @@ import l10n
import network.stats
import state
import widgets
from network import BMConnectionPool, knownnodes
from network import connectionpool, knownnodes
from retranslateui import RetranslateMixin
from tr import _translate
from uisignaler import UISignaler
@ -148,16 +148,16 @@ class NetworkStatus(QtGui.QWidget, RetranslateMixin):
# pylint: disable=too-many-branches,undefined-variable
if outbound:
try:
c = BMConnectionPool().outboundConnections[destination]
c = connectionpool.pool.outboundConnections[destination]
except KeyError:
if add:
return
else:
try:
c = BMConnectionPool().inboundConnections[destination]
c = connectionpool.pool.inboundConnections[destination]
except KeyError:
try:
c = BMConnectionPool().inboundConnections[destination.host]
c = connectionpool.pool.inboundConnections[destination.host]
except KeyError:
if add:
return
@ -201,7 +201,7 @@ class NetworkStatus(QtGui.QWidget, RetranslateMixin):
self.tableWidgetConnectionCount.item(0, 0).setData(QtCore.Qt.UserRole, destination)
self.tableWidgetConnectionCount.item(0, 1).setData(QtCore.Qt.UserRole, outbound)
else:
if not BMConnectionPool().inboundConnections:
if not connectionpool.pool.inboundConnections:
self.window().setStatusIcon('yellow')
for i in range(self.tableWidgetConnectionCount.rowCount()):
if self.tableWidgetConnectionCount.item(i, 0).data(QtCore.Qt.UserRole).toPyObject() != destination:

View File

@ -20,7 +20,8 @@ import widgets
from bmconfigparser import config as config_obj
from helper_sql import sqlExecute, sqlStoredProcedure
from helper_startup import start_proxyconfig
from network import knownnodes, AnnounceThread
from network import knownnodes
from network.announcethread import AnnounceThread
from network.asyncore_pollchoose import set_rates
from tr import _translate

View File

@ -27,7 +27,7 @@ import queues
import state
from bmconfigparser import config
from helper_sql import sqlExecute, sqlQuery
from network import BMConnectionPool, knownnodes, StoppableThread
from network import connectionpool, knownnodes, StoppableThread
from tr import _translate
@ -129,7 +129,7 @@ class singleCleaner(StoppableThread):
os._exit(1) # pylint: disable=protected-access
# inv/object tracking
for connection in BMConnectionPool().connections():
for connection in connectionpool.pool.connections():
connection.clean()
# discovery tracking

View File

@ -2,21 +2,17 @@
Network subsystem package
"""
try:
from .announcethread import AnnounceThread
from .connectionpool import BMConnectionPool
except ImportError:
AnnounceThread = None
BMConnectionPool = None
from .threads import StoppableThread
__all__ = ["AnnounceThread", "BMConnectionPool", "StoppableThread"]
__all__ = ["StoppableThread"]
def start(config, state):
"""Start network threads"""
import state
from .announcethread import AnnounceThread
import connectionpool # pylint: disable=relative-import
from .addrthread import AddrThread
from .dandelion import Dandelion
from .downloadthread import DownloadThread
@ -29,7 +25,7 @@ def start(config, state):
readKnownNodes()
# init, needs to be early because other thread may access it early
state.Dandelion = Dandelion()
BMConnectionPool().connectToStream(1)
connectionpool.pool.connectToStream(1)
for thread in (
BMNetworkThread(), InvThread(), AddrThread(),
DownloadThread(), UploadThread()

View File

@ -5,10 +5,10 @@ from six.moves import queue
# magic imports!
import state
import connectionpool
from helper_random import randomshuffle
from protocol import assembleAddrMessage
from queues import addrQueue # FIXME: init with queue
from network.connectionpool import BMConnectionPool
from threads import StoppableThread
@ -29,7 +29,7 @@ class AddrThread(StoppableThread):
if chunk:
# Choose peers randomly
connections = BMConnectionPool().establishedConnections()
connections = connectionpool.pool.establishedConnections()
randomshuffle(connections)
for i in connections:
randomshuffle(chunk)

View File

@ -5,9 +5,9 @@ import time
# magic imports!
import state
import connectionpool
from bmconfigparser import config
from protocol import assembleAddrMessage
from network.connectionpool import BMConnectionPool
from node import Peer
from threads import StoppableThread
@ -31,7 +31,7 @@ class AnnounceThread(StoppableThread):
@staticmethod
def announceSelf():
"""Announce our presence"""
for connection in BMConnectionPool().udpSockets.values():
for connection in connectionpool.pool.udpSockets.values():
if not connection.announcing:
continue
for stream in state.streamsInWhichIAmParticipating:

View File

@ -12,10 +12,10 @@ import time
# magic imports!
import addresses
import connectionpool
import knownnodes
import protocol
import state
import connectionpool
from bmconfigparser import config
from queues import invQueue, objectProcessorQueue, portCheckerQueue
from randomtrackingdict import RandomTrackingDict
@ -540,7 +540,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
if not self.isOutbound:
self.append_write_buf(protocol.assembleVersionMessage(
self.destination.host, self.destination.port,
connectionpool.BMConnectionPool().streams, True,
connectionpool.pool.streams, True,
nodeid=self.nodeid))
logger.debug(
'%(host)s:%(port)i sending version',
@ -596,7 +596,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
'Closed connection to %s because there is no overlapping'
' interest in streams.', self.destination)
return False
if connectionpool.BMConnectionPool().inboundConnections.get(
if connectionpool.pool.inboundConnections.get(
self.destination):
try:
if not protocol.checkSocksIP(self.destination.host):
@ -614,8 +614,8 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
# or server full report the same error to counter deanonymisation
if (
Peer(self.destination.host, self.peerNode.port)
in connectionpool.BMConnectionPool().inboundConnections
or len(connectionpool.BMConnectionPool())
in connectionpool.pool.inboundConnections
or len(connectionpool.pool)
> config.safeGetInt(
'bitmessagesettings', 'maxtotalconnections')
+ config.safeGetInt(
@ -627,7 +627,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
'Closed connection to %s due to server full'
' or duplicate inbound/outbound.', self.destination)
return False
if connectionpool.BMConnectionPool().isAlreadyConnected(self.nonce):
if connectionpool.pool.isAlreadyConnected(self.nonce):
self.append_write_buf(protocol.assembleErrorMessage(
errorText="I'm connected to myself. Closing connection.",
fatal=2))
@ -641,7 +641,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
@staticmethod
def stopDownloadingObject(hashId, forwardAnyway=False):
"""Stop downloading object *hashId*"""
for connection in connectionpool.BMConnectionPool().connections():
for connection in connectionpool.pool.connections():
try:
del connection.objectsNewToMe[hashId]
except KeyError:

View File

@ -17,7 +17,6 @@ from bmconfigparser import config
from connectionchooser import chooseConnection
from node import Peer
from proxy import Proxy
from singleton import Singleton
from tcp import (
bootstrap, Socks4aBMConnection, Socks5BMConnection,
TCPConnection, TCPServer)
@ -26,7 +25,6 @@ from udp import UDPSocket
logger = logging.getLogger('default')
@Singleton
class BMConnectionPool(object):
"""Pool of all existing connections"""
# pylint: disable=too-many-instance-attributes
@ -403,3 +401,6 @@ class BMConnectionPool(object):
pass
for i in reaper:
self.removeConnection(i)
pool = BMConnectionPool()

View File

@ -6,7 +6,7 @@ import state
import addresses
import helper_random
import protocol
from network.connectionpool import BMConnectionPool
import connectionpool
from objectracker import missingObjects
from threads import StoppableThread
@ -41,7 +41,7 @@ class DownloadThread(StoppableThread):
while not self._stopped:
requested = 0
# Choose downloading peers randomly
connections = BMConnectionPool().establishedConnections()
connections = connectionpool.pool.establishedConnections()
helper_random.randomshuffle(connections)
requestChunk = max(int(
min(self.maxRequestChunk, len(missingObjects))

View File

@ -8,7 +8,7 @@ from time import time
import addresses
import protocol
import state
from network.connectionpool import BMConnectionPool
import connectionpool
from queues import invQueue
from threads import StoppableThread
@ -18,7 +18,7 @@ def handleExpiredDandelion(expired):
the object"""
if not expired:
return
for i in BMConnectionPool().connections():
for i in connectionpool.pool.connections():
if not i.fullyEstablished:
continue
for x in expired:
@ -40,7 +40,7 @@ class InvThread(StoppableThread):
def handleLocallyGenerated(stream, hashId):
"""Locally generated inventory items require special handling"""
state.Dandelion.addHash(hashId, stream=stream)
for connection in BMConnectionPool().connections():
for connection in connectionpool.pool.connections():
if state.dandelion_enabled and connection != \
state.Dandelion.objectChildStem(hashId):
continue
@ -62,7 +62,7 @@ class InvThread(StoppableThread):
break
if chunk:
for connection in BMConnectionPool().connections():
for connection in connectionpool.pool.connections():
fluffs = []
stems = []
for inv in chunk:

View File

@ -3,7 +3,7 @@ A thread to handle network concerns
"""
import network.asyncore_pollchoose as asyncore
import state
from network.connectionpool import BMConnectionPool
import connectionpool
from queues import excQueue
from threads import StoppableThread
@ -15,24 +15,24 @@ class BMNetworkThread(StoppableThread):
def run(self):
try:
while not self._stopped and state.shutdown == 0:
BMConnectionPool().loop()
connectionpool.pool.loop()
except Exception as e:
excQueue.put((self.name, e))
raise
def stopThread(self):
super(BMNetworkThread, self).stopThread()
for i in BMConnectionPool().listeningSockets.values():
for i in connectionpool.pool.listeningSockets.values():
try:
i.close()
except: # nosec B110 # pylint:disable=bare-except
pass
for i in BMConnectionPool().outboundConnections.values():
for i in connectionpool.pool.outboundConnections.values():
try:
i.close()
except: # nosec B110 # pylint:disable=bare-except
pass
for i in BMConnectionPool().inboundConnections.values():
for i in connectionpool.pool.inboundConnections.values():
try:
i.close()
except: # nosec B110 # pylint:disable=bare-except

View File

@ -5,7 +5,7 @@ import time
from threading import RLock
import state
import network.connectionpool
import connectionpool
from randomtrackingdict import RandomTrackingDict
haveBloom = False
@ -100,7 +100,7 @@ class ObjectTracker(object):
def handleReceivedObject(self, streamNumber, hashid):
"""Handling received object"""
for i in network.connectionpool.BMConnectionPool().connections():
for i in connectionpool.pool.connections():
if not i.fullyEstablished:
continue
try:

View File

@ -6,8 +6,8 @@ import Queue
import socket
import state
import connectionpool
from network.advanceddispatcher import UnknownStateError
from network.connectionpool import BMConnectionPool
from queues import receiveDataQueue
from threads import StoppableThread
@ -36,7 +36,7 @@ class ReceiveQueueThread(StoppableThread):
# enough data, or the connection is to be aborted
try:
connection = BMConnectionPool().getConnectionByAddr(dest)
connection = connectionpool.pool.getConnectionByAddr(dest)
# connection object not found
except KeyError:
receiveDataQueue.task_done()

View File

@ -4,7 +4,7 @@ Network statistics
import time
import asyncore_pollchoose as asyncore
from network.connectionpool import BMConnectionPool
import connectionpool
from objectracker import missingObjects
@ -18,7 +18,7 @@ currentSentSpeed = 0
def connectedHostsList():
"""List of all the connected hosts"""
return BMConnectionPool().establishedConnections()
return connectionpool.pool.establishedConnections()
def sentBytes():
@ -69,8 +69,8 @@ def pendingDownload():
def pendingUpload():
"""Getting pending uploads"""
# tmp = {}
# for connection in BMConnectionPool().inboundConnections.values() + \
# BMConnectionPool().outboundConnections.values():
# for connection in connectionpool.pool.inboundConnections.values() + \
# connectionpool.pool.outboundConnections.values():
# for k in connection.objectsNewToThem.keys():
# tmp[k] = True
# This probably isn't the correct logic so it's disabled

View File

@ -15,13 +15,13 @@ import helper_random
import l10n
import protocol
import state
import connectionpool
from bmconfigparser import config
from highlevelcrypto import randomBytes
from queues import invQueue, receiveDataQueue, UISignalQueue
from tr import _translate
import asyncore_pollchoose as asyncore
import connectionpool
import knownnodes
from network.advanceddispatcher import AdvancedDispatcher
from network.bmproto import BMProto
@ -267,7 +267,7 @@ class TCPConnection(BMProto, TLSDispatcher):
self.append_write_buf(
protocol.assembleVersionMessage(
self.destination.host, self.destination.port,
connectionpool.BMConnectionPool().streams,
connectionpool.pool.streams,
False, nodeid=self.nodeid))
self.connectedAt = time.time()
receiveDataQueue.put(self.destination)
@ -318,7 +318,7 @@ class Socks5BMConnection(Socks5Connection, TCPConnection):
self.append_write_buf(
protocol.assembleVersionMessage(
self.destination.host, self.destination.port,
connectionpool.BMConnectionPool().streams,
connectionpool.pool.streams,
False, nodeid=self.nodeid))
self.set_state("bm_header", expectBytes=protocol.Header.size)
return True
@ -342,7 +342,7 @@ class Socks4aBMConnection(Socks4aConnection, TCPConnection):
self.append_write_buf(
protocol.assembleVersionMessage(
self.destination.host, self.destination.port,
connectionpool.BMConnectionPool().streams,
connectionpool.pool.streams,
False, nodeid=self.nodeid))
self.set_state("bm_header", expectBytes=protocol.Header.size)
return True
@ -430,7 +430,7 @@ class TCPServer(AdvancedDispatcher):
state.ownAddresses[Peer(*sock.getsockname())] = True
if (
len(connectionpool.BMConnectionPool())
len(connectionpool.pool)
> config.safeGetInt(
'bitmessagesettings', 'maxtotalconnections')
+ config.safeGetInt(
@ -442,7 +442,7 @@ class TCPServer(AdvancedDispatcher):
sock.close()
return
try:
connectionpool.BMConnectionPool().addConnection(
connectionpool.pool.addConnection(
TCPConnection(sock=sock))
except socket.error:
pass

View File

@ -6,7 +6,7 @@ import time
import helper_random
import protocol
import state
from network.connectionpool import BMConnectionPool
import connectionpool
from randomtrackingdict import RandomTrackingDict
from threads import StoppableThread
@ -22,7 +22,7 @@ class UploadThread(StoppableThread):
while not self._stopped:
uploaded = 0
# Choose uploading peers randomly
connections = BMConnectionPool().establishedConnections()
connections = connectionpool.pool.establishedConnections()
helper_random.randomshuffle(connections)
for i in connections:
now = time.time()

View File

@ -26,7 +26,7 @@ from helper_msgcoding import MsgEncode, MsgDecode
from helper_sql import sqlQuery
from network import asyncore_pollchoose as asyncore, knownnodes
from network.bmproto import BMProto
from network.connectionpool import BMConnectionPool
import network.connectionpool as connectionpool
from network.node import Node, Peer
from network.tcp import Socks4aBMConnection, Socks5BMConnection, TCPConnection
from queues import excQueue
@ -202,7 +202,7 @@ class TestCore(unittest.TestCase):
while c > 0:
time.sleep(1)
c -= 2
for peer, con in BMConnectionPool().outboundConnections.iteritems():
for peer, con in connectionpool.pool.outboundConnections.iteritems():
if (
peer.host.startswith('bootstrap')
or peer.host == 'quzwelsuziwqgpt2.onion'
@ -232,7 +232,7 @@ class TestCore(unittest.TestCase):
def test_dontconnect(self):
"""all connections are closed 5 seconds after setting dontconnect"""
self._initiate_bootstrap()
self.assertEqual(len(BMConnectionPool().connections()), 0)
self.assertEqual(len(connectionpool.pool.connections()), 0)
def test_connection(self):
"""test connection to bootstrap servers"""
@ -287,7 +287,7 @@ class TestCore(unittest.TestCase):
tried_hosts = set()
for _ in range(360):
time.sleep(1)
for peer in BMConnectionPool().outboundConnections:
for peer in connectionpool.pool.outboundConnections:
if peer.host.endswith('.onion'):
tried_hosts.add(peer.host)
else:

View File

@ -27,7 +27,7 @@ class TestNetwork(TestPartialRun):
# beware of singleton
connectionpool.config = cls.config
cls.pool = network.BMConnectionPool()
cls.pool = connectionpool.pool
cls.stats = stats
network.start(cls.config, cls.state)

View File

@ -19,7 +19,7 @@ import state
import tr
from bmconfigparser import config
from debug import logger
from network import BMConnectionPool, knownnodes, StoppableThread
from network import connectionpool, knownnodes, StoppableThread
from network.node import Peer
@ -228,7 +228,7 @@ class uPnPThread(StoppableThread):
# wait until asyncore binds so that we know the listening port
bound = False
while state.shutdown == 0 and not self._stopped and not bound:
for s in BMConnectionPool().listeningSockets.values():
for s in connectionpool.pool.listeningSockets.values():
if s.is_bound():
bound = True
if not bound: