diff --git a/src/network/addrthread.py b/src/network/addrthread.py index 134ef9ee..7e36948c 100644 --- a/src/network/addrthread.py +++ b/src/network/addrthread.py @@ -6,8 +6,8 @@ from six.moves import queue from pybitmessage import state from pybitmessage.helper_random import randomshuffle from pybitmessage.queues import addrQueue -from .assemble import assemble_addr from .connectionpool import BMConnectionPool +from .assemble import assemble_addr from .threads import StoppableThread diff --git a/src/network/bmobject.py b/src/network/bmobject.py index c93dedf3..c5d4b2c6 100644 --- a/src/network/bmobject.py +++ b/src/network/bmobject.py @@ -9,6 +9,7 @@ from pybitmessage.addresses import calculateInventoryHash from pybitmessage.inventory import Inventory from .dandelion import Dandelion + logger = logging.getLogger('default') diff --git a/src/network/bmproto.py b/src/network/bmproto.py index f149683c..18d8e5c3 100644 --- a/src/network/bmproto.py +++ b/src/network/bmproto.py @@ -14,13 +14,15 @@ from pybitmessage import addresses, protocol, state from pybitmessage.bmconfigparser import BMConfigParser from pybitmessage.inventory import Inventory from pybitmessage.queues import invQueue, objectProcessorQueue, portCheckerQueue -from . import connectionpool, knownnodes +from pybitmessage.randomtrackingdict import RandomTrackingDict +from . import knownnodes from .advanceddispatcher import AdvancedDispatcher from .bmobject import ( BMObject, BMObjectAlreadyHaveError, BMObjectExpiredError, BMObjectInsufficientPOWError, BMObjectInvalidDataError, BMObjectInvalidError, BMObjectUnwantedStreamError ) +from .pool import ConnectionPool from .constants import ( ADDRESS_ALIVE, MAX_MESSAGE_SIZE, MAX_OBJECT_COUNT, MAX_OBJECT_PAYLOAD_SIZE, MAX_TIME_OFFSET @@ -29,7 +31,7 @@ from .dandelion import Dandelion from .proxy import ProxyError from .node import Node, Peer from .objectracker import ObjectTracker, missingObjects -from .randomtrackingdict import RandomTrackingDict + logger = logging.getLogger('default') @@ -535,7 +537,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker): if not self.isOutbound: self.append_write_buf(protocol.assembleVersionMessage( self.destination.host, self.destination.port, - connectionpool.BMConnectionPool().streams, True, + ConnectionPool.streams, True, nodeid=self.nodeid)) logger.debug( '%(host)s:%(port)i sending version', @@ -591,7 +593,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker): 'Closed connection to %s because there is no overlapping' ' interest in streams.', self.destination) return False - if connectionpool.BMConnectionPool().inboundConnections.get( + if ConnectionPool.inboundConnections.get( self.destination): try: if not protocol.checkSocksIP(self.destination.host): @@ -609,8 +611,8 @@ class BMProto(AdvancedDispatcher, ObjectTracker): # or server full report the same error to counter deanonymisation if ( Peer(self.destination.host, self.peerNode.port) - in connectionpool.BMConnectionPool().inboundConnections - or len(connectionpool.BMConnectionPool()) + in ConnectionPool.inboundConnections + or len(ConnectionPool) > BMConfigParser().safeGetInt( 'bitmessagesettings', 'maxtotalconnections') + BMConfigParser().safeGetInt( @@ -622,7 +624,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker): 'Closed connection to %s due to server full' ' or duplicate inbound/outbound.', self.destination) return False - if connectionpool.BMConnectionPool().isAlreadyConnected(self.nonce): + if ConnectionPool.isAlreadyConnected(self.nonce): self.append_write_buf(protocol.assembleErrorMessage( errorText="I'm connected to myself. Closing connection.", fatal=2)) @@ -636,7 +638,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker): @staticmethod def stopDownloadingObject(hashId, forwardAnyway=False): """Stop downloading object *hashId*""" - for connection in connectionpool.BMConnectionPool().connections(): + for connection in ConnectionPool.connections(): try: del connection.objectsNewToMe[hashId] except KeyError: diff --git a/src/network/connectionpool.py b/src/network/connectionpool.py index f0890817..6545c3ce 100644 --- a/src/network/connectionpool.py +++ b/src/network/connectionpool.py @@ -8,18 +8,18 @@ import socket import sys import time -import asyncore_pollchoose as asyncore -import knownnodes from pybitmessage import helper_random, protocol, state from pybitmessage.bmconfigparser import BMConfigParser from pybitmessage.singleton import Singleton -from connectionchooser import chooseConnection -from node import Peer -from proxy import Proxy -from tcp import ( +from . import asyncore_pollchoose as asyncore +from . import knownnodes, pool +from .connectionchooser import chooseConnection +from .node import Peer +from .proxy import Proxy +from .tcp import ( bootstrap, Socks4aBMConnection, Socks5BMConnection, TCPConnection, TCPServer) -from udp import UDPSocket +from .udp import UDPSocket logger = logging.getLogger('default') @@ -401,3 +401,6 @@ class BMConnectionPool(object): pass for i in reaper: self.removeConnection(i) + + +pool.ConnectionPool = BMConnectionPool() diff --git a/src/network/dandelion.py b/src/network/dandelion.py index f4e9cec5..51d10af7 100644 --- a/src/network/dandelion.py +++ b/src/network/dandelion.py @@ -10,7 +10,7 @@ from time import time from pybitmessage import state from pybitmessage.queues import invQueue from pybitmessage.singleton import Singleton -from .connectionpool import BMConnectionPool +from .pool import ConnectionPool # randomise routes after 600 seconds REASSIGN_INTERVAL = 600 @@ -185,10 +185,10 @@ class Dandelion: # pylint: disable=old-style-class try: # random two connections self.stem = sample( - BMConnectionPool().outboundConnections.values(), MAX_STEMS) + ConnectionPool.outboundConnections.values(), MAX_STEMS) # not enough stems available except ValueError: - self.stem = BMConnectionPool().outboundConnections.values() + self.stem = ConnectionPool.outboundConnections.values() self.nodeMap = {} # hashMap stays to cater for pending stems self.refresh = time() + REASSIGN_INTERVAL diff --git a/src/network/objectracker.py b/src/network/objectracker.py index 0c856f51..0a907755 100644 --- a/src/network/objectracker.py +++ b/src/network/objectracker.py @@ -4,9 +4,9 @@ Module for tracking objects import time from threading import RLock -from .connectionpool import BMConnectionPool +from pybitmessage.randomtrackingdict import RandomTrackingDict from .dandelion import Dandelion -from .randomtrackingdict import RandomTrackingDict +from .pool import ConnectionPool haveBloom = False @@ -100,7 +100,7 @@ class ObjectTracker(object): def handleReceivedObject(self, streamNumber, hashid): """Handling received object""" - for i in BMConnectionPool().connections(): + for i in ConnectionPool.connections(): if not i.fullyEstablished: continue try: diff --git a/src/network/pool.py b/src/network/pool.py new file mode 100644 index 00000000..0a850e0c --- /dev/null +++ b/src/network/pool.py @@ -0,0 +1 @@ +ConnectionPool = None diff --git a/src/network/tcp.py b/src/network/tcp.py index 4ad8462a..2b554910 100644 --- a/src/network/tcp.py +++ b/src/network/tcp.py @@ -16,10 +16,11 @@ from pybitmessage.inventory import Inventory from pybitmessage.queues import invQueue, receiveDataQueue, UISignalQueue from pybitmessage.tr import _translate from . import asyncore_pollchoose as asyncore -from . import connectionpool, knownnodes +from . import knownnodes from .advanceddispatcher import AdvancedDispatcher from .assemble import assemble_addr from .bmproto import BMProto +from .pool import ConnectionPool from .constants import MAX_OBJECT_COUNT from .dandelion import Dandelion from .objectracker import ObjectTracker @@ -264,7 +265,7 @@ class TCPConnection(BMProto, TLSDispatcher): self.append_write_buf( protocol.assembleVersionMessage( self.destination.host, self.destination.port, - connectionpool.BMConnectionPool().streams, + ConnectionPool.streams, False, nodeid=self.nodeid)) self.connectedAt = time.time() receiveDataQueue.put(self.destination) @@ -315,7 +316,7 @@ class Socks5BMConnection(Socks5Connection, TCPConnection): self.append_write_buf( protocol.assembleVersionMessage( self.destination.host, self.destination.port, - connectionpool.BMConnectionPool().streams, + ConnectionPool.streams, False, nodeid=self.nodeid)) self.set_state("bm_header", expectBytes=protocol.Header.size) return True @@ -339,7 +340,7 @@ class Socks4aBMConnection(Socks4aConnection, TCPConnection): self.append_write_buf( protocol.assembleVersionMessage( self.destination.host, self.destination.port, - connectionpool.BMConnectionPool().streams, + ConnectionPool.streams, False, nodeid=self.nodeid)) self.set_state("bm_header", expectBytes=protocol.Header.size) return True @@ -427,7 +428,7 @@ class TCPServer(AdvancedDispatcher): state.ownAddresses[Peer(*sock.getsockname())] = True if ( - len(connectionpool.BMConnectionPool()) + len(ConnectionPool) > BMConfigParser().safeGetInt( 'bitmessagesettings', 'maxtotalconnections') + BMConfigParser().safeGetInt( @@ -439,7 +440,6 @@ class TCPServer(AdvancedDispatcher): sock.close() return try: - connectionpool.BMConnectionPool().addConnection( - TCPConnection(sock=sock)) + ConnectionPool.addConnection(TCPConnection(sock=sock)) except socket.error: pass diff --git a/src/network/uploadthread.py b/src/network/uploadthread.py index e2d640c5..264d8be0 100644 --- a/src/network/uploadthread.py +++ b/src/network/uploadthread.py @@ -5,9 +5,9 @@ import time from pybitmessage import helper_random, protocol from pybitmessage.inventory import Inventory +from pybitmessage.randomtrackingdict import RandomTrackingDict from .connectionpool import BMConnectionPool from .dandelion import Dandelion -from .randomtrackingdict import RandomTrackingDict from .threads import StoppableThread