diff --git a/src/bitmessagemain.py b/src/bitmessagemain.py index f51ee063..9acd1278 100755 --- a/src/bitmessagemain.py +++ b/src/bitmessagemain.py @@ -237,8 +237,7 @@ class Main(object): upnpThread = upnp.uPnPThread() upnpThread.start() else: - # Populate with hardcoded value (same as connectToStream above) - state.streamsInWhichIAmParticipating.append(1) + network.connectionpool.pool.connectToStream(1) if not daemon and state.enableGUI: if state.curses: diff --git a/src/bitmessageqt/settings.py b/src/bitmessageqt/settings.py index 161acf69..b0c3a62a 100644 --- a/src/bitmessageqt/settings.py +++ b/src/bitmessageqt/settings.py @@ -20,7 +20,7 @@ import bitmessageqt.widgets as widgets from bmconfigparser import config as config_obj from helper_sql import sqlExecute, sqlStoredProcedure from helper_startup import start_proxyconfig -from network import knownnodes +from network import connectionpool, knownnodes from network.announcethread import AnnounceThread from network.asyncore_pollchoose import set_rates from tr import _translate @@ -165,7 +165,7 @@ class SettingsDialog(QtWidgets.QDialog): if self._proxy_type: for node, info in six.iteritems( knownnodes.knownNodes.get( - min(state.streamsInWhichIAmParticipating), []) + min(connectionpool.pool.streams), []) ): if ( node.host.endswith('.onion') and len(node.host) > 22 diff --git a/src/class_singleCleaner.py b/src/class_singleCleaner.py index acbf36ab..06153dcf 100644 --- a/src/class_singleCleaner.py +++ b/src/class_singleCleaner.py @@ -108,7 +108,7 @@ class singleCleaner(StoppableThread): # Cleanup knownnodes and handle possible severe exception # while writing it to disk if state.enableNetwork: - knownnodes.cleanupKnownNodes() + knownnodes.cleanupKnownNodes(connectionpool.pool) except Exception as err: if "Errno 28" in str(err): self.logger.fatal( diff --git a/src/network/addrthread.py b/src/network/addrthread.py index f0e72f37..5d11dc7c 100644 --- a/src/network/addrthread.py +++ b/src/network/addrthread.py @@ -18,7 +18,7 @@ class AddrThread(StoppableThread): name = "AddrBroadcaster" def run(self): - while not state.shutdown: + while not self._stopped: chunk = [] while True: try: diff --git a/src/network/announcethread.py b/src/network/announcethread.py index a5bd37cc..2c8612aa 100644 --- a/src/network/announcethread.py +++ b/src/network/announcethread.py @@ -20,7 +20,7 @@ class AnnounceThread(StoppableThread): def run(self): lastSelfAnnounced = 0 - while not self._stopped and state.shutdown == 0: + while not self._stopped: processed = 0 if lastSelfAnnounced < time.time() - self.announceInterval: self.announceSelf() @@ -34,7 +34,7 @@ class AnnounceThread(StoppableThread): for connection in connectionpool.pool.udpSockets.values(): if not connection.announcing: continue - for stream in state.streamsInWhichIAmParticipating: + for stream in connectionpool.pool.streams: addr = ( stream, Peer( diff --git a/src/network/bmobject.py b/src/network/bmobject.py index 685e3c59..a5533fcd 100644 --- a/src/network/bmobject.py +++ b/src/network/bmobject.py @@ -6,6 +6,7 @@ import time import protocol import state +import network.connectionpool as connectionpool from highlevelcrypto import calculateInventoryHash logger = logging.getLogger('default') @@ -98,7 +99,7 @@ class BMObject(object): # pylint: disable=too-many-instance-attributes logger.warning( 'The object has invalid stream: %s', self.streamNumber) raise BMObjectInvalidError() - if self.streamNumber not in state.streamsInWhichIAmParticipating: + if self.streamNumber not in connectionpool.pool.streams: logger.debug( 'The streamNumber %i isn\'t one we are interested in.', self.streamNumber) diff --git a/src/network/bmproto.py b/src/network/bmproto.py index be7ea795..6333471f 100644 --- a/src/network/bmproto.py +++ b/src/network/bmproto.py @@ -444,7 +444,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker): # not using services for seenTime, stream, _, ip, port in self._decode_addr(): if ( - stream not in state.streamsInWhichIAmParticipating + stream not in connectionpool.pool.streams # FIXME: should check against complete list or ip.decode('utf-8', 'replace').startswith('bootstrap') ): diff --git a/src/network/connectionpool.py b/src/network/connectionpool.py index 3bb33b92..7e9de547 100644 --- a/src/network/connectionpool.py +++ b/src/network/connectionpool.py @@ -88,7 +88,6 @@ class BMConnectionPool(object): def connectToStream(self, streamNumber): """Connect to a bitmessage stream""" self.streams.append(streamNumber) - state.streamsInWhichIAmParticipating.append(streamNumber) def getConnectionByAddr(self, addr): """ diff --git a/src/network/knownnodes.py b/src/network/knownnodes.py index 93ce0b9e..ad806e36 100644 --- a/src/network/knownnodes.py +++ b/src/network/knownnodes.py @@ -232,7 +232,7 @@ def dns(): 1, Peer('bootstrap%s.bitmessage.org' % port, port)) -def cleanupKnownNodes(): +def cleanupKnownNodes(pool): """ Cleanup knownnodes: remove old nodes and nodes with low rating """ @@ -242,7 +242,7 @@ def cleanupKnownNodes(): with knownNodesLock: for stream in knownNodes: - if stream not in state.streamsInWhichIAmParticipating: + if stream not in pool.streams: continue keys = knownNodes[stream].keys() for node in keys: diff --git a/src/network/networkthread.py b/src/network/networkthread.py index 4a49896a..59c6db92 100644 --- a/src/network/networkthread.py +++ b/src/network/networkthread.py @@ -14,7 +14,7 @@ class BMNetworkThread(StoppableThread): def run(self): try: - while not self._stopped and state.shutdown == 0: + while not self._stopped: connectionpool.pool.loop() except Exception as e: excQueue.put((self.name, e)) diff --git a/src/network/receivequeuethread.py b/src/network/receivequeuethread.py index 68ad6124..002d78d7 100644 --- a/src/network/receivequeuethread.py +++ b/src/network/receivequeuethread.py @@ -19,13 +19,13 @@ class ReceiveQueueThread(StoppableThread): super(ReceiveQueueThread, self).__init__(name="ReceiveQueue_%i" % num) def run(self): - while not self._stopped and state.shutdown == 0: + while not self._stopped: try: dest = receiveDataQueue.get(block=True, timeout=1) except queue.Empty: continue - if self._stopped or state.shutdown: + if self._stopped: break # cycle as long as there is data diff --git a/src/network/udp.py b/src/network/udp.py index e0c15702..befc733e 100644 --- a/src/network/udp.py +++ b/src/network/udp.py @@ -8,6 +8,7 @@ import time # magic imports! import protocol import state +import network.connectionpool as connectionpool from queues import receiveDataQueue from .bmproto import BMProto @@ -81,7 +82,7 @@ class UDPSocket(BMProto): # pylint: disable=too-many-instance-attributes remoteport = False for seenTime, stream, _, ip, port in addresses: decodedIP = protocol.checkIPAddress(str(ip)) - if stream not in state.streamsInWhichIAmParticipating: + if stream not in connectionpool.pool.streams: continue if (seenTime < time.time() - protocol.MAX_TIME_OFFSET or seenTime > time.time() + protocol.MAX_TIME_OFFSET): diff --git a/src/state.py b/src/state.py index e3f49215..e530425d 100644 --- a/src/state.py +++ b/src/state.py @@ -3,7 +3,6 @@ Global runtime variables. """ neededPubkeys = {} -streamsInWhichIAmParticipating = [] extPort = None """For UPnP""" diff --git a/src/tests/core.py b/src/tests/core.py index 806c288e..f1a11a06 100644 --- a/src/tests/core.py +++ b/src/tests/core.py @@ -165,7 +165,7 @@ class TestCore(unittest.TestCase): """test knownnodes starvation leading to IndexError in Asyncore""" self._outdate_knownnodes() # time.sleep(303) # singleCleaner wakes up every 5 min - knownnodes.cleanupKnownNodes() + knownnodes.cleanupKnownNodes(connectionpool.pool) self.assertTrue(knownnodes.knownNodes[1]) while True: try: @@ -179,7 +179,7 @@ class TestCore(unittest.TestCase): config.set('bitmessagesettings', 'dontconnect', 'true') self._wipe_knownnodes() knownnodes.addKnownNode(1, Peer('127.0.0.1', 8444), is_self=True) - knownnodes.cleanupKnownNodes() + knownnodes.cleanupKnownNodes(connectionpool.pool) time.sleep(5) def _check_connection(self, full=False):