Made BMConnectionPool as global runtime variable in connectionpool from singleton #2220
10
src/api.py
10
src/api.py
|
@ -96,9 +96,9 @@ from helper_sql import (
|
||||||
from highlevelcrypto import calculateInventoryHash
|
from highlevelcrypto import calculateInventoryHash
|
||||||
|
|
||||||
try:
|
try:
|
||||||
from network import BMConnectionPool
|
from network import connectionpool
|
||||||
except ImportError:
|
except ImportError:
|
||||||
BMConnectionPool = None
|
connectionpool = None
|
||||||
|
|
||||||
from network import stats, StoppableThread
|
from network import stats, StoppableThread
|
||||||
from version import softwareVersion
|
from version import softwareVersion
|
||||||
|
@ -1475,18 +1475,18 @@ class BMRPCDispatcher(object):
|
||||||
Returns bitmessage connection information as dict with keys *inbound*,
|
Returns bitmessage connection information as dict with keys *inbound*,
|
||||||
*outbound*.
|
*outbound*.
|
||||||
"""
|
"""
|
||||||
if BMConnectionPool is None:
|
if connectionpool is None:
|
||||||
raise APIError(21, 'Could not import BMConnectionPool.')
|
raise APIError(21, 'Could not import BMConnectionPool.')
|
||||||
inboundConnections = []
|
inboundConnections = []
|
||||||
outboundConnections = []
|
outboundConnections = []
|
||||||
for i in BMConnectionPool().inboundConnections.values():
|
for i in connectionpool.pool.inboundConnections.values():
|
||||||
inboundConnections.append({
|
inboundConnections.append({
|
||||||
'host': i.destination.host,
|
'host': i.destination.host,
|
||||||
'port': i.destination.port,
|
'port': i.destination.port,
|
||||||
'fullyEstablished': i.fullyEstablished,
|
'fullyEstablished': i.fullyEstablished,
|
||||||
'userAgent': str(i.userAgent)
|
'userAgent': str(i.userAgent)
|
||||||
})
|
})
|
||||||
for i in BMConnectionPool().outboundConnections.values():
|
for i in connectionpool.pool.outboundConnections.values():
|
||||||
outboundConnections.append({
|
outboundConnections.append({
|
||||||
'host': i.destination.host,
|
'host': i.destination.host,
|
||||||
'port': i.destination.port,
|
'port': i.destination.port,
|
||||||
|
|
|
@ -10,7 +10,7 @@ import l10n
|
||||||
import network.stats
|
import network.stats
|
||||||
import state
|
import state
|
||||||
import widgets
|
import widgets
|
||||||
from network import BMConnectionPool, knownnodes
|
from network import connectionpool, knownnodes
|
||||||
from retranslateui import RetranslateMixin
|
from retranslateui import RetranslateMixin
|
||||||
from tr import _translate
|
from tr import _translate
|
||||||
from uisignaler import UISignaler
|
from uisignaler import UISignaler
|
||||||
|
@ -148,16 +148,16 @@ class NetworkStatus(QtGui.QWidget, RetranslateMixin):
|
||||||
# pylint: disable=too-many-branches,undefined-variable
|
# pylint: disable=too-many-branches,undefined-variable
|
||||||
if outbound:
|
if outbound:
|
||||||
try:
|
try:
|
||||||
c = BMConnectionPool().outboundConnections[destination]
|
c = connectionpool.pool.outboundConnections[destination]
|
||||||
except KeyError:
|
except KeyError:
|
||||||
if add:
|
if add:
|
||||||
return
|
return
|
||||||
else:
|
else:
|
||||||
try:
|
try:
|
||||||
c = BMConnectionPool().inboundConnections[destination]
|
c = connectionpool.pool.inboundConnections[destination]
|
||||||
except KeyError:
|
except KeyError:
|
||||||
try:
|
try:
|
||||||
c = BMConnectionPool().inboundConnections[destination.host]
|
c = connectionpool.pool.inboundConnections[destination.host]
|
||||||
except KeyError:
|
except KeyError:
|
||||||
if add:
|
if add:
|
||||||
return
|
return
|
||||||
|
@ -201,7 +201,7 @@ class NetworkStatus(QtGui.QWidget, RetranslateMixin):
|
||||||
self.tableWidgetConnectionCount.item(0, 0).setData(QtCore.Qt.UserRole, destination)
|
self.tableWidgetConnectionCount.item(0, 0).setData(QtCore.Qt.UserRole, destination)
|
||||||
self.tableWidgetConnectionCount.item(0, 1).setData(QtCore.Qt.UserRole, outbound)
|
self.tableWidgetConnectionCount.item(0, 1).setData(QtCore.Qt.UserRole, outbound)
|
||||||
else:
|
else:
|
||||||
if not BMConnectionPool().inboundConnections:
|
if not connectionpool.pool.inboundConnections:
|
||||||
self.window().setStatusIcon('yellow')
|
self.window().setStatusIcon('yellow')
|
||||||
for i in range(self.tableWidgetConnectionCount.rowCount()):
|
for i in range(self.tableWidgetConnectionCount.rowCount()):
|
||||||
if self.tableWidgetConnectionCount.item(i, 0).data(QtCore.Qt.UserRole).toPyObject() != destination:
|
if self.tableWidgetConnectionCount.item(i, 0).data(QtCore.Qt.UserRole).toPyObject() != destination:
|
||||||
|
|
|
@ -20,7 +20,8 @@ import widgets
|
||||||
from bmconfigparser import config as config_obj
|
from bmconfigparser import config as config_obj
|
||||||
from helper_sql import sqlExecute, sqlStoredProcedure
|
from helper_sql import sqlExecute, sqlStoredProcedure
|
||||||
from helper_startup import start_proxyconfig
|
from helper_startup import start_proxyconfig
|
||||||
from network import knownnodes, AnnounceThread
|
from network import knownnodes
|
||||||
|
from network.announcethread import AnnounceThread
|
||||||
from network.asyncore_pollchoose import set_rates
|
from network.asyncore_pollchoose import set_rates
|
||||||
from tr import _translate
|
from tr import _translate
|
||||||
|
|
||||||
|
|
|
@ -27,7 +27,7 @@ import queues
|
||||||
import state
|
import state
|
||||||
from bmconfigparser import config
|
from bmconfigparser import config
|
||||||
from helper_sql import sqlExecute, sqlQuery
|
from helper_sql import sqlExecute, sqlQuery
|
||||||
from network import BMConnectionPool, knownnodes, StoppableThread
|
from network import connectionpool, knownnodes, StoppableThread
|
||||||
from tr import _translate
|
from tr import _translate
|
||||||
|
|
||||||
|
|
||||||
|
@ -129,7 +129,7 @@ class singleCleaner(StoppableThread):
|
||||||
os._exit(1) # pylint: disable=protected-access
|
os._exit(1) # pylint: disable=protected-access
|
||||||
|
|
||||||
# inv/object tracking
|
# inv/object tracking
|
||||||
for connection in BMConnectionPool().connections():
|
for connection in connectionpool.pool.connections():
|
||||||
connection.clean()
|
connection.clean()
|
||||||
|
|
||||||
# discovery tracking
|
# discovery tracking
|
||||||
|
|
|
@ -2,21 +2,17 @@
|
||||||
Network subsystem package
|
Network subsystem package
|
||||||
"""
|
"""
|
||||||
|
|||||||
|
|
||||||
try:
|
|
||||||
from .announcethread import AnnounceThread
|
|
||||||
from .connectionpool import BMConnectionPool
|
|
||||||
except ImportError:
|
|
||||||
AnnounceThread = None
|
|
||||||
BMConnectionPool = None
|
|
||||||
from .threads import StoppableThread
|
from .threads import StoppableThread
|
||||||
|
|
||||||
|
|
||||||
__all__ = ["AnnounceThread", "BMConnectionPool", "StoppableThread"]
|
__all__ = ["StoppableThread"]
|
||||||
|
|
||||||
|
|
||||||
def start(config, state):
|
def start(config, state):
|
||||||
"""Start network threads"""
|
"""Start network threads"""
|
||||||
import state
|
import state
|
||||||
|
from .announcethread import AnnounceThread
|
||||||
|
import connectionpool # pylint: disable=relative-import
|
||||||
from .addrthread import AddrThread
|
from .addrthread import AddrThread
|
||||||
I think we can use the object name here, i.e. I think we can use the object name here, i.e. `connectionpool` as I recommended above.
|
|||||||
from .dandelion import Dandelion
|
from .dandelion import Dandelion
|
||||||
from .downloadthread import DownloadThread
|
from .downloadthread import DownloadThread
|
||||||
|
@ -29,7 +25,7 @@ def start(config, state):
|
||||||
readKnownNodes()
|
readKnownNodes()
|
||||||
# init, needs to be early because other thread may access it early
|
# init, needs to be early because other thread may access it early
|
||||||
state.Dandelion = Dandelion()
|
state.Dandelion = Dandelion()
|
||||||
BMConnectionPool().connectToStream(1)
|
connectionpool.pool.connectToStream(1)
|
||||||
for thread in (
|
for thread in (
|
||||||
BMNetworkThread(), InvThread(), AddrThread(),
|
BMNetworkThread(), InvThread(), AddrThread(),
|
||||||
DownloadThread(), UploadThread()
|
DownloadThread(), UploadThread()
|
||||||
|
|
|
@ -5,10 +5,10 @@ from six.moves import queue
|
||||||
|
|
||||||
# magic imports!
|
# magic imports!
|
||||||
import state
|
import state
|
||||||
|
import connectionpool
|
||||||
from helper_random import randomshuffle
|
from helper_random import randomshuffle
|
||||||
from protocol import assembleAddrMessage
|
from protocol import assembleAddrMessage
|
||||||
from queues import addrQueue # FIXME: init with queue
|
from queues import addrQueue # FIXME: init with queue
|
||||||
from network.connectionpool import BMConnectionPool
|
|
||||||
|
|
||||||
from threads import StoppableThread
|
from threads import StoppableThread
|
||||||
|
|
||||||
|
@ -29,7 +29,7 @@ class AddrThread(StoppableThread):
|
||||||
|
|
||||||
if chunk:
|
if chunk:
|
||||||
# Choose peers randomly
|
# Choose peers randomly
|
||||||
connections = BMConnectionPool().establishedConnections()
|
connections = connectionpool.pool.establishedConnections()
|
||||||
randomshuffle(connections)
|
randomshuffle(connections)
|
||||||
for i in connections:
|
for i in connections:
|
||||||
randomshuffle(chunk)
|
randomshuffle(chunk)
|
||||||
|
|
|
@ -5,9 +5,9 @@ import time
|
||||||
|
|
||||||
# magic imports!
|
# magic imports!
|
||||||
import state
|
import state
|
||||||
|
import connectionpool
|
||||||
from bmconfigparser import config
|
from bmconfigparser import config
|
||||||
from protocol import assembleAddrMessage
|
from protocol import assembleAddrMessage
|
||||||
from network.connectionpool import BMConnectionPool
|
|
||||||
|
|
||||||
from node import Peer
|
from node import Peer
|
||||||
from threads import StoppableThread
|
from threads import StoppableThread
|
||||||
|
@ -31,7 +31,7 @@ class AnnounceThread(StoppableThread):
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def announceSelf():
|
def announceSelf():
|
||||||
"""Announce our presence"""
|
"""Announce our presence"""
|
||||||
for connection in BMConnectionPool().udpSockets.values():
|
for connection in connectionpool.pool.udpSockets.values():
|
||||||
if not connection.announcing:
|
if not connection.announcing:
|
||||||
continue
|
continue
|
||||||
for stream in state.streamsInWhichIAmParticipating:
|
for stream in state.streamsInWhichIAmParticipating:
|
||||||
|
|
|
@ -12,10 +12,10 @@ import time
|
||||||
|
|
||||||
# magic imports!
|
# magic imports!
|
||||||
import addresses
|
import addresses
|
||||||
import connectionpool
|
|
||||||
import knownnodes
|
import knownnodes
|
||||||
import protocol
|
import protocol
|
||||||
import state
|
import state
|
||||||
|
import connectionpool
|
||||||
from bmconfigparser import config
|
from bmconfigparser import config
|
||||||
from queues import invQueue, objectProcessorQueue, portCheckerQueue
|
from queues import invQueue, objectProcessorQueue, portCheckerQueue
|
||||||
from randomtrackingdict import RandomTrackingDict
|
from randomtrackingdict import RandomTrackingDict
|
||||||
|
@ -540,7 +540,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
|
||||||
if not self.isOutbound:
|
if not self.isOutbound:
|
||||||
self.append_write_buf(protocol.assembleVersionMessage(
|
self.append_write_buf(protocol.assembleVersionMessage(
|
||||||
self.destination.host, self.destination.port,
|
self.destination.host, self.destination.port,
|
||||||
connectionpool.BMConnectionPool().streams, True,
|
connectionpool.pool.streams, True,
|
||||||
nodeid=self.nodeid))
|
nodeid=self.nodeid))
|
||||||
logger.debug(
|
logger.debug(
|
||||||
'%(host)s:%(port)i sending version',
|
'%(host)s:%(port)i sending version',
|
||||||
|
@ -596,7 +596,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
|
||||||
'Closed connection to %s because there is no overlapping'
|
'Closed connection to %s because there is no overlapping'
|
||||||
' interest in streams.', self.destination)
|
' interest in streams.', self.destination)
|
||||||
return False
|
return False
|
||||||
if connectionpool.BMConnectionPool().inboundConnections.get(
|
if connectionpool.pool.inboundConnections.get(
|
||||||
self.destination):
|
self.destination):
|
||||||
try:
|
try:
|
||||||
if not protocol.checkSocksIP(self.destination.host):
|
if not protocol.checkSocksIP(self.destination.host):
|
||||||
|
@ -614,8 +614,8 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
|
||||||
# or server full report the same error to counter deanonymisation
|
# or server full report the same error to counter deanonymisation
|
||||||
if (
|
if (
|
||||||
Peer(self.destination.host, self.peerNode.port)
|
Peer(self.destination.host, self.peerNode.port)
|
||||||
in connectionpool.BMConnectionPool().inboundConnections
|
in connectionpool.pool.inboundConnections
|
||||||
or len(connectionpool.BMConnectionPool())
|
or len(connectionpool.pool)
|
||||||
> config.safeGetInt(
|
> config.safeGetInt(
|
||||||
'bitmessagesettings', 'maxtotalconnections')
|
'bitmessagesettings', 'maxtotalconnections')
|
||||||
+ config.safeGetInt(
|
+ config.safeGetInt(
|
||||||
|
@ -627,7 +627,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
|
||||||
'Closed connection to %s due to server full'
|
'Closed connection to %s due to server full'
|
||||||
' or duplicate inbound/outbound.', self.destination)
|
' or duplicate inbound/outbound.', self.destination)
|
||||||
return False
|
return False
|
||||||
if connectionpool.BMConnectionPool().isAlreadyConnected(self.nonce):
|
if connectionpool.pool.isAlreadyConnected(self.nonce):
|
||||||
self.append_write_buf(protocol.assembleErrorMessage(
|
self.append_write_buf(protocol.assembleErrorMessage(
|
||||||
errorText="I'm connected to myself. Closing connection.",
|
errorText="I'm connected to myself. Closing connection.",
|
||||||
fatal=2))
|
fatal=2))
|
||||||
|
@ -641,7 +641,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def stopDownloadingObject(hashId, forwardAnyway=False):
|
def stopDownloadingObject(hashId, forwardAnyway=False):
|
||||||
"""Stop downloading object *hashId*"""
|
"""Stop downloading object *hashId*"""
|
||||||
for connection in connectionpool.BMConnectionPool().connections():
|
for connection in connectionpool.pool.connections():
|
||||||
try:
|
try:
|
||||||
del connection.objectsNewToMe[hashId]
|
del connection.objectsNewToMe[hashId]
|
||||||
except KeyError:
|
except KeyError:
|
||||||
|
|
|
@ -17,7 +17,6 @@ from bmconfigparser import config
|
||||||
from connectionchooser import chooseConnection
|
from connectionchooser import chooseConnection
|
||||||
from node import Peer
|
from node import Peer
|
||||||
from proxy import Proxy
|
from proxy import Proxy
|
||||||
from singleton import Singleton
|
|
||||||
from tcp import (
|
from tcp import (
|
||||||
bootstrap, Socks4aBMConnection, Socks5BMConnection,
|
bootstrap, Socks4aBMConnection, Socks5BMConnection,
|
||||||
TCPConnection, TCPServer)
|
TCPConnection, TCPServer)
|
||||||
|
@ -26,7 +25,6 @@ from udp import UDPSocket
|
||||||
logger = logging.getLogger('default')
|
logger = logging.getLogger('default')
|
||||||
|
|
||||||
|
|
||||||
@Singleton
|
|
||||||
class BMConnectionPool(object):
|
class BMConnectionPool(object):
|
||||||
"""Pool of all existing connections"""
|
"""Pool of all existing connections"""
|
||||||
# pylint: disable=too-many-instance-attributes
|
# pylint: disable=too-many-instance-attributes
|
||||||
|
@ -403,3 +401,6 @@ class BMConnectionPool(object):
|
||||||
pass
|
pass
|
||||||
for i in reaper:
|
for i in reaper:
|
||||||
self.removeConnection(i)
|
self.removeConnection(i)
|
||||||
|
|
||||||
|
|
||||||
|
pool = BMConnectionPool()
|
||||||
|
|
|
@ -6,7 +6,7 @@ import state
|
||||||
import addresses
|
import addresses
|
||||||
import helper_random
|
import helper_random
|
||||||
import protocol
|
import protocol
|
||||||
from network.connectionpool import BMConnectionPool
|
import connectionpool
|
||||||
from objectracker import missingObjects
|
from objectracker import missingObjects
|
||||||
from threads import StoppableThread
|
from threads import StoppableThread
|
||||||
|
|
||||||
|
@ -41,7 +41,7 @@ class DownloadThread(StoppableThread):
|
||||||
while not self._stopped:
|
while not self._stopped:
|
||||||
requested = 0
|
requested = 0
|
||||||
# Choose downloading peers randomly
|
# Choose downloading peers randomly
|
||||||
connections = BMConnectionPool().establishedConnections()
|
connections = connectionpool.pool.establishedConnections()
|
||||||
helper_random.randomshuffle(connections)
|
helper_random.randomshuffle(connections)
|
||||||
requestChunk = max(int(
|
requestChunk = max(int(
|
||||||
min(self.maxRequestChunk, len(missingObjects))
|
min(self.maxRequestChunk, len(missingObjects))
|
||||||
|
|
|
@ -8,7 +8,7 @@ from time import time
|
||||||
import addresses
|
import addresses
|
||||||
import protocol
|
import protocol
|
||||||
import state
|
import state
|
||||||
from network.connectionpool import BMConnectionPool
|
import connectionpool
|
||||||
from queues import invQueue
|
from queues import invQueue
|
||||||
from threads import StoppableThread
|
from threads import StoppableThread
|
||||||
|
|
||||||
|
@ -18,7 +18,7 @@ def handleExpiredDandelion(expired):
|
||||||
the object"""
|
the object"""
|
||||||
if not expired:
|
if not expired:
|
||||||
return
|
return
|
||||||
for i in BMConnectionPool().connections():
|
for i in connectionpool.pool.connections():
|
||||||
if not i.fullyEstablished:
|
if not i.fullyEstablished:
|
||||||
continue
|
continue
|
||||||
for x in expired:
|
for x in expired:
|
||||||
|
@ -40,7 +40,7 @@ class InvThread(StoppableThread):
|
||||||
def handleLocallyGenerated(stream, hashId):
|
def handleLocallyGenerated(stream, hashId):
|
||||||
"""Locally generated inventory items require special handling"""
|
"""Locally generated inventory items require special handling"""
|
||||||
state.Dandelion.addHash(hashId, stream=stream)
|
state.Dandelion.addHash(hashId, stream=stream)
|
||||||
for connection in BMConnectionPool().connections():
|
for connection in connectionpool.pool.connections():
|
||||||
if state.dandelion_enabled and connection != \
|
if state.dandelion_enabled and connection != \
|
||||||
state.Dandelion.objectChildStem(hashId):
|
state.Dandelion.objectChildStem(hashId):
|
||||||
continue
|
continue
|
||||||
|
@ -62,7 +62,7 @@ class InvThread(StoppableThread):
|
||||||
break
|
break
|
||||||
|
|
||||||
if chunk:
|
if chunk:
|
||||||
for connection in BMConnectionPool().connections():
|
for connection in connectionpool.pool.connections():
|
||||||
fluffs = []
|
fluffs = []
|
||||||
stems = []
|
stems = []
|
||||||
for inv in chunk:
|
for inv in chunk:
|
||||||
|
|
|
@ -3,7 +3,7 @@ A thread to handle network concerns
|
||||||
"""
|
"""
|
||||||
import network.asyncore_pollchoose as asyncore
|
import network.asyncore_pollchoose as asyncore
|
||||||
import state
|
import state
|
||||||
from network.connectionpool import BMConnectionPool
|
import connectionpool
|
||||||
from queues import excQueue
|
from queues import excQueue
|
||||||
from threads import StoppableThread
|
from threads import StoppableThread
|
||||||
|
|
||||||
|
@ -15,24 +15,24 @@ class BMNetworkThread(StoppableThread):
|
||||||
def run(self):
|
def run(self):
|
||||||
try:
|
try:
|
||||||
while not self._stopped and state.shutdown == 0:
|
while not self._stopped and state.shutdown == 0:
|
||||||
BMConnectionPool().loop()
|
connectionpool.pool.loop()
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
excQueue.put((self.name, e))
|
excQueue.put((self.name, e))
|
||||||
raise
|
raise
|
||||||
|
|
||||||
def stopThread(self):
|
def stopThread(self):
|
||||||
super(BMNetworkThread, self).stopThread()
|
super(BMNetworkThread, self).stopThread()
|
||||||
for i in BMConnectionPool().listeningSockets.values():
|
for i in connectionpool.pool.listeningSockets.values():
|
||||||
try:
|
try:
|
||||||
i.close()
|
i.close()
|
||||||
except: # nosec B110 # pylint:disable=bare-except
|
except: # nosec B110 # pylint:disable=bare-except
|
||||||
pass
|
pass
|
||||||
for i in BMConnectionPool().outboundConnections.values():
|
for i in connectionpool.pool.outboundConnections.values():
|
||||||
try:
|
try:
|
||||||
i.close()
|
i.close()
|
||||||
except: # nosec B110 # pylint:disable=bare-except
|
except: # nosec B110 # pylint:disable=bare-except
|
||||||
pass
|
pass
|
||||||
for i in BMConnectionPool().inboundConnections.values():
|
for i in connectionpool.pool.inboundConnections.values():
|
||||||
try:
|
try:
|
||||||
i.close()
|
i.close()
|
||||||
except: # nosec B110 # pylint:disable=bare-except
|
except: # nosec B110 # pylint:disable=bare-except
|
||||||
|
|
|
@ -5,7 +5,7 @@ import time
|
||||||
from threading import RLock
|
from threading import RLock
|
||||||
|
|
||||||
import state
|
import state
|
||||||
import network.connectionpool
|
import connectionpool
|
||||||
from randomtrackingdict import RandomTrackingDict
|
from randomtrackingdict import RandomTrackingDict
|
||||||
|
|
||||||
haveBloom = False
|
haveBloom = False
|
||||||
|
@ -100,7 +100,7 @@ class ObjectTracker(object):
|
||||||
|
|
||||||
def handleReceivedObject(self, streamNumber, hashid):
|
def handleReceivedObject(self, streamNumber, hashid):
|
||||||
"""Handling received object"""
|
"""Handling received object"""
|
||||||
for i in network.connectionpool.BMConnectionPool().connections():
|
for i in connectionpool.pool.connections():
|
||||||
if not i.fullyEstablished:
|
if not i.fullyEstablished:
|
||||||
continue
|
continue
|
||||||
try:
|
try:
|
||||||
|
|
|
@ -6,8 +6,8 @@ import Queue
|
||||||
import socket
|
import socket
|
||||||
|
|
||||||
import state
|
import state
|
||||||
|
import connectionpool
|
||||||
from network.advanceddispatcher import UnknownStateError
|
from network.advanceddispatcher import UnknownStateError
|
||||||
from network.connectionpool import BMConnectionPool
|
|
||||||
from queues import receiveDataQueue
|
from queues import receiveDataQueue
|
||||||
from threads import StoppableThread
|
from threads import StoppableThread
|
||||||
|
|
||||||
|
@ -36,7 +36,7 @@ class ReceiveQueueThread(StoppableThread):
|
||||||
# enough data, or the connection is to be aborted
|
# enough data, or the connection is to be aborted
|
||||||
|
|
||||||
try:
|
try:
|
||||||
connection = BMConnectionPool().getConnectionByAddr(dest)
|
connection = connectionpool.pool.getConnectionByAddr(dest)
|
||||||
# connection object not found
|
# connection object not found
|
||||||
except KeyError:
|
except KeyError:
|
||||||
receiveDataQueue.task_done()
|
receiveDataQueue.task_done()
|
||||||
|
|
|
@ -4,7 +4,7 @@ Network statistics
|
||||||
import time
|
import time
|
||||||
|
|
||||||
import asyncore_pollchoose as asyncore
|
import asyncore_pollchoose as asyncore
|
||||||
from network.connectionpool import BMConnectionPool
|
import connectionpool
|
||||||
from objectracker import missingObjects
|
from objectracker import missingObjects
|
||||||
|
|
||||||
|
|
||||||
|
@ -18,7 +18,7 @@ currentSentSpeed = 0
|
||||||
|
|
||||||
def connectedHostsList():
|
def connectedHostsList():
|
||||||
"""List of all the connected hosts"""
|
"""List of all the connected hosts"""
|
||||||
return BMConnectionPool().establishedConnections()
|
return connectionpool.pool.establishedConnections()
|
||||||
|
|
||||||
|
|
||||||
def sentBytes():
|
def sentBytes():
|
||||||
|
@ -69,8 +69,8 @@ def pendingDownload():
|
||||||
def pendingUpload():
|
def pendingUpload():
|
||||||
"""Getting pending uploads"""
|
"""Getting pending uploads"""
|
||||||
# tmp = {}
|
# tmp = {}
|
||||||
# for connection in BMConnectionPool().inboundConnections.values() + \
|
# for connection in connectionpool.pool.inboundConnections.values() + \
|
||||||
# BMConnectionPool().outboundConnections.values():
|
# connectionpool.pool.outboundConnections.values():
|
||||||
# for k in connection.objectsNewToThem.keys():
|
# for k in connection.objectsNewToThem.keys():
|
||||||
# tmp[k] = True
|
# tmp[k] = True
|
||||||
# This probably isn't the correct logic so it's disabled
|
# This probably isn't the correct logic so it's disabled
|
||||||
|
|
|
@ -15,13 +15,13 @@ import helper_random
|
||||||
import l10n
|
import l10n
|
||||||
import protocol
|
import protocol
|
||||||
import state
|
import state
|
||||||
|
import connectionpool
|
||||||
from bmconfigparser import config
|
from bmconfigparser import config
|
||||||
from highlevelcrypto import randomBytes
|
from highlevelcrypto import randomBytes
|
||||||
from queues import invQueue, receiveDataQueue, UISignalQueue
|
from queues import invQueue, receiveDataQueue, UISignalQueue
|
||||||
from tr import _translate
|
from tr import _translate
|
||||||
|
|
||||||
import asyncore_pollchoose as asyncore
|
import asyncore_pollchoose as asyncore
|
||||||
import connectionpool
|
|
||||||
import knownnodes
|
import knownnodes
|
||||||
from network.advanceddispatcher import AdvancedDispatcher
|
from network.advanceddispatcher import AdvancedDispatcher
|
||||||
from network.bmproto import BMProto
|
from network.bmproto import BMProto
|
||||||
|
@ -267,7 +267,7 @@ class TCPConnection(BMProto, TLSDispatcher):
|
||||||
self.append_write_buf(
|
self.append_write_buf(
|
||||||
protocol.assembleVersionMessage(
|
protocol.assembleVersionMessage(
|
||||||
self.destination.host, self.destination.port,
|
self.destination.host, self.destination.port,
|
||||||
connectionpool.BMConnectionPool().streams,
|
connectionpool.pool.streams,
|
||||||
False, nodeid=self.nodeid))
|
False, nodeid=self.nodeid))
|
||||||
self.connectedAt = time.time()
|
self.connectedAt = time.time()
|
||||||
receiveDataQueue.put(self.destination)
|
receiveDataQueue.put(self.destination)
|
||||||
|
@ -318,7 +318,7 @@ class Socks5BMConnection(Socks5Connection, TCPConnection):
|
||||||
self.append_write_buf(
|
self.append_write_buf(
|
||||||
protocol.assembleVersionMessage(
|
protocol.assembleVersionMessage(
|
||||||
self.destination.host, self.destination.port,
|
self.destination.host, self.destination.port,
|
||||||
connectionpool.BMConnectionPool().streams,
|
connectionpool.pool.streams,
|
||||||
False, nodeid=self.nodeid))
|
False, nodeid=self.nodeid))
|
||||||
self.set_state("bm_header", expectBytes=protocol.Header.size)
|
self.set_state("bm_header", expectBytes=protocol.Header.size)
|
||||||
return True
|
return True
|
||||||
|
@ -342,7 +342,7 @@ class Socks4aBMConnection(Socks4aConnection, TCPConnection):
|
||||||
self.append_write_buf(
|
self.append_write_buf(
|
||||||
protocol.assembleVersionMessage(
|
protocol.assembleVersionMessage(
|
||||||
self.destination.host, self.destination.port,
|
self.destination.host, self.destination.port,
|
||||||
connectionpool.BMConnectionPool().streams,
|
connectionpool.pool.streams,
|
||||||
False, nodeid=self.nodeid))
|
False, nodeid=self.nodeid))
|
||||||
self.set_state("bm_header", expectBytes=protocol.Header.size)
|
self.set_state("bm_header", expectBytes=protocol.Header.size)
|
||||||
return True
|
return True
|
||||||
|
@ -430,7 +430,7 @@ class TCPServer(AdvancedDispatcher):
|
||||||
|
|
||||||
state.ownAddresses[Peer(*sock.getsockname())] = True
|
state.ownAddresses[Peer(*sock.getsockname())] = True
|
||||||
if (
|
if (
|
||||||
len(connectionpool.BMConnectionPool())
|
len(connectionpool.pool)
|
||||||
> config.safeGetInt(
|
> config.safeGetInt(
|
||||||
'bitmessagesettings', 'maxtotalconnections')
|
'bitmessagesettings', 'maxtotalconnections')
|
||||||
+ config.safeGetInt(
|
+ config.safeGetInt(
|
||||||
|
@ -442,7 +442,7 @@ class TCPServer(AdvancedDispatcher):
|
||||||
sock.close()
|
sock.close()
|
||||||
return
|
return
|
||||||
try:
|
try:
|
||||||
connectionpool.BMConnectionPool().addConnection(
|
connectionpool.pool.addConnection(
|
||||||
TCPConnection(sock=sock))
|
TCPConnection(sock=sock))
|
||||||
except socket.error:
|
except socket.error:
|
||||||
pass
|
pass
|
||||||
|
|
|
@ -6,7 +6,7 @@ import time
|
||||||
import helper_random
|
import helper_random
|
||||||
import protocol
|
import protocol
|
||||||
import state
|
import state
|
||||||
from network.connectionpool import BMConnectionPool
|
import connectionpool
|
||||||
from randomtrackingdict import RandomTrackingDict
|
from randomtrackingdict import RandomTrackingDict
|
||||||
from threads import StoppableThread
|
from threads import StoppableThread
|
||||||
|
|
||||||
|
@ -22,7 +22,7 @@ class UploadThread(StoppableThread):
|
||||||
while not self._stopped:
|
while not self._stopped:
|
||||||
uploaded = 0
|
uploaded = 0
|
||||||
# Choose uploading peers randomly
|
# Choose uploading peers randomly
|
||||||
connections = BMConnectionPool().establishedConnections()
|
connections = connectionpool.pool.establishedConnections()
|
||||||
helper_random.randomshuffle(connections)
|
helper_random.randomshuffle(connections)
|
||||||
for i in connections:
|
for i in connections:
|
||||||
now = time.time()
|
now = time.time()
|
||||||
|
|
|
@ -26,7 +26,7 @@ from helper_msgcoding import MsgEncode, MsgDecode
|
||||||
from helper_sql import sqlQuery
|
from helper_sql import sqlQuery
|
||||||
from network import asyncore_pollchoose as asyncore, knownnodes
|
from network import asyncore_pollchoose as asyncore, knownnodes
|
||||||
from network.bmproto import BMProto
|
from network.bmproto import BMProto
|
||||||
from network.connectionpool import BMConnectionPool
|
import network.connectionpool as connectionpool
|
||||||
from network.node import Node, Peer
|
from network.node import Node, Peer
|
||||||
from network.tcp import Socks4aBMConnection, Socks5BMConnection, TCPConnection
|
from network.tcp import Socks4aBMConnection, Socks5BMConnection, TCPConnection
|
||||||
from queues import excQueue
|
from queues import excQueue
|
||||||
|
@ -202,7 +202,7 @@ class TestCore(unittest.TestCase):
|
||||||
while c > 0:
|
while c > 0:
|
||||||
time.sleep(1)
|
time.sleep(1)
|
||||||
c -= 2
|
c -= 2
|
||||||
for peer, con in BMConnectionPool().outboundConnections.iteritems():
|
for peer, con in connectionpool.pool.outboundConnections.iteritems():
|
||||||
if (
|
if (
|
||||||
peer.host.startswith('bootstrap')
|
peer.host.startswith('bootstrap')
|
||||||
or peer.host == 'quzwelsuziwqgpt2.onion'
|
or peer.host == 'quzwelsuziwqgpt2.onion'
|
||||||
|
@ -232,7 +232,7 @@ class TestCore(unittest.TestCase):
|
||||||
def test_dontconnect(self):
|
def test_dontconnect(self):
|
||||||
"""all connections are closed 5 seconds after setting dontconnect"""
|
"""all connections are closed 5 seconds after setting dontconnect"""
|
||||||
self._initiate_bootstrap()
|
self._initiate_bootstrap()
|
||||||
self.assertEqual(len(BMConnectionPool().connections()), 0)
|
self.assertEqual(len(connectionpool.pool.connections()), 0)
|
||||||
|
|
||||||
def test_connection(self):
|
def test_connection(self):
|
||||||
"""test connection to bootstrap servers"""
|
"""test connection to bootstrap servers"""
|
||||||
|
@ -287,7 +287,7 @@ class TestCore(unittest.TestCase):
|
||||||
tried_hosts = set()
|
tried_hosts = set()
|
||||||
for _ in range(360):
|
for _ in range(360):
|
||||||
time.sleep(1)
|
time.sleep(1)
|
||||||
for peer in BMConnectionPool().outboundConnections:
|
for peer in connectionpool.pool.outboundConnections:
|
||||||
if peer.host.endswith('.onion'):
|
if peer.host.endswith('.onion'):
|
||||||
tried_hosts.add(peer.host)
|
tried_hosts.add(peer.host)
|
||||||
else:
|
else:
|
||||||
|
|
|
@ -27,7 +27,7 @@ class TestNetwork(TestPartialRun):
|
||||||
|
|
||||||
# beware of singleton
|
# beware of singleton
|
||||||
connectionpool.config = cls.config
|
connectionpool.config = cls.config
|
||||||
cls.pool = network.BMConnectionPool()
|
cls.pool = connectionpool.pool
|
||||||
cls.stats = stats
|
cls.stats = stats
|
||||||
|
|
||||||
network.start(cls.config, cls.state)
|
network.start(cls.config, cls.state)
|
||||||
|
|
|
@ -19,7 +19,7 @@ import state
|
||||||
import tr
|
import tr
|
||||||
from bmconfigparser import config
|
from bmconfigparser import config
|
||||||
from debug import logger
|
from debug import logger
|
||||||
from network import BMConnectionPool, knownnodes, StoppableThread
|
from network import connectionpool, knownnodes, StoppableThread
|
||||||
from network.node import Peer
|
from network.node import Peer
|
||||||
|
|
||||||
|
|
||||||
|
@ -228,7 +228,7 @@ class uPnPThread(StoppableThread):
|
||||||
# wait until asyncore binds so that we know the listening port
|
# wait until asyncore binds so that we know the listening port
|
||||||
bound = False
|
bound = False
|
||||||
while state.shutdown == 0 and not self._stopped and not bound:
|
while state.shutdown == 0 and not self._stopped and not bound:
|
||||||
for s in BMConnectionPool().listeningSockets.values():
|
for s in connectionpool.pool.listeningSockets.values():
|
||||||
if s.is_bound():
|
if s.is_bound():
|
||||||
bound = True
|
bound = True
|
||||||
if not bound:
|
if not bound:
|
||||||
|
|
Reference in New Issue
Block a user
how about we use a different name for the object than the class name? How about
network.connectionpool
for instance and we keepnetwork.BMConnectionPool
as the class name?I'm not sure this try/except is needed anymore once we fix the circular imports.
done, renamed as pool