From 657c1de16b3481506e609f991a54f104f4fef831 Mon Sep 17 00:00:00 2001 From: anand k Date: Mon, 13 May 2024 07:23:11 +0530 Subject: [PATCH 1/2] Remove state.shutdown or replaced with self._stopped from some network thread --- src/network/addrthread.py | 3 +-- src/network/announcethread.py | 2 +- src/network/networkthread.py | 3 +-- src/network/receivequeuethread.py | 5 ++--- 4 files changed, 5 insertions(+), 8 deletions(-) diff --git a/src/network/addrthread.py b/src/network/addrthread.py index 74a5d744..a0e869e3 100644 --- a/src/network/addrthread.py +++ b/src/network/addrthread.py @@ -4,7 +4,6 @@ Announce addresses as they are received from other hosts from six.moves import queue # magic imports! -import state import connectionpool from helper_random import randomshuffle from protocol import assembleAddrMessage @@ -18,7 +17,7 @@ class AddrThread(StoppableThread): name = "AddrBroadcaster" def run(self): - while not state.shutdown: + while not self._stopped: chunk = [] while True: try: diff --git a/src/network/announcethread.py b/src/network/announcethread.py index 003eb092..cd3eed2a 100644 --- a/src/network/announcethread.py +++ b/src/network/announcethread.py @@ -20,7 +20,7 @@ class AnnounceThread(StoppableThread): def run(self): lastSelfAnnounced = 0 - while not self._stopped and state.shutdown == 0: + while not self._stopped: processed = 0 if lastSelfAnnounced < time.time() - self.announceInterval: self.announceSelf() diff --git a/src/network/networkthread.py b/src/network/networkthread.py index dc5f616f..640d47a1 100644 --- a/src/network/networkthread.py +++ b/src/network/networkthread.py @@ -2,7 +2,6 @@ A thread to handle network concerns """ import network.asyncore_pollchoose as asyncore -import state import connectionpool from queues import excQueue from threads import StoppableThread @@ -14,7 +13,7 @@ class BMNetworkThread(StoppableThread): def run(self): try: - while not self._stopped and state.shutdown == 0: + while not self._stopped: connectionpool.pool.loop() except Exception as e: excQueue.put((self.name, e)) diff --git a/src/network/receivequeuethread.py b/src/network/receivequeuethread.py index cad1376c..10f2acea 100644 --- a/src/network/receivequeuethread.py +++ b/src/network/receivequeuethread.py @@ -5,7 +5,6 @@ import errno import Queue import socket -import state import connectionpool from network.advanceddispatcher import UnknownStateError from queues import receiveDataQueue @@ -19,13 +18,13 @@ class ReceiveQueueThread(StoppableThread): super(ReceiveQueueThread, self).__init__(name="ReceiveQueue_%i" % num) def run(self): - while not self._stopped and state.shutdown == 0: + while not self._stopped: try: dest = receiveDataQueue.get(block=True, timeout=1) except Queue.Empty: continue - if self._stopped or state.shutdown: + if self._stopped: break # cycle as long as there is data From e571ba8a511e99a6370df781bd23fcca5b1a50a0 Mon Sep 17 00:00:00 2001 From: anand k Date: Mon, 13 May 2024 15:46:46 +0530 Subject: [PATCH 2/2] Replaced state.streamsInWhichIAmParticipating with pool.streams --- src/bitmessagemain.py | 3 +-- src/bitmessageqt/settings.py | 4 ++-- src/class_singleCleaner.py | 2 +- src/network/announcethread.py | 3 +-- src/network/bmobject.py | 3 ++- src/network/bmproto.py | 2 +- src/network/connectionpool.py | 1 - src/network/knownnodes.py | 4 ++-- src/network/udp.py | 3 ++- src/state.py | 1 - src/tests/core.py | 4 ++-- 11 files changed, 14 insertions(+), 16 deletions(-) diff --git a/src/bitmessagemain.py b/src/bitmessagemain.py index f51ee063..9acd1278 100755 --- a/src/bitmessagemain.py +++ b/src/bitmessagemain.py @@ -237,8 +237,7 @@ class Main(object): upnpThread = upnp.uPnPThread() upnpThread.start() else: - # Populate with hardcoded value (same as connectToStream above) - state.streamsInWhichIAmParticipating.append(1) + network.connectionpool.pool.connectToStream(1) if not daemon and state.enableGUI: if state.curses: diff --git a/src/bitmessageqt/settings.py b/src/bitmessageqt/settings.py index 2d56c47f..3d05db25 100644 --- a/src/bitmessageqt/settings.py +++ b/src/bitmessageqt/settings.py @@ -20,7 +20,7 @@ import widgets from bmconfigparser import config as config_obj from helper_sql import sqlExecute, sqlStoredProcedure from helper_startup import start_proxyconfig -from network import knownnodes +from network import connectionpool, knownnodes from network.announcethread import AnnounceThread from network.asyncore_pollchoose import set_rates from tr import _translate @@ -165,7 +165,7 @@ class SettingsDialog(QtGui.QDialog): if self._proxy_type: for node, info in six.iteritems( knownnodes.knownNodes.get( - min(state.streamsInWhichIAmParticipating), []) + min(connectionpool.pool.streams), []) ): if ( node.host.endswith('.onion') and len(node.host) > 22 diff --git a/src/class_singleCleaner.py b/src/class_singleCleaner.py index acbf36ab..06153dcf 100644 --- a/src/class_singleCleaner.py +++ b/src/class_singleCleaner.py @@ -108,7 +108,7 @@ class singleCleaner(StoppableThread): # Cleanup knownnodes and handle possible severe exception # while writing it to disk if state.enableNetwork: - knownnodes.cleanupKnownNodes() + knownnodes.cleanupKnownNodes(connectionpool.pool) except Exception as err: if "Errno 28" in str(err): self.logger.fatal( diff --git a/src/network/announcethread.py b/src/network/announcethread.py index cd3eed2a..7cb35e77 100644 --- a/src/network/announcethread.py +++ b/src/network/announcethread.py @@ -4,7 +4,6 @@ Announce myself (node address) import time # magic imports! -import state import connectionpool from bmconfigparser import config from protocol import assembleAddrMessage @@ -34,7 +33,7 @@ class AnnounceThread(StoppableThread): for connection in connectionpool.pool.udpSockets.values(): if not connection.announcing: continue - for stream in state.streamsInWhichIAmParticipating: + for stream in connectionpool.pool.streams: addr = ( stream, Peer( diff --git a/src/network/bmobject.py b/src/network/bmobject.py index 685e3c59..c91bf1b3 100644 --- a/src/network/bmobject.py +++ b/src/network/bmobject.py @@ -6,6 +6,7 @@ import time import protocol import state +import connectionpool from highlevelcrypto import calculateInventoryHash logger = logging.getLogger('default') @@ -98,7 +99,7 @@ class BMObject(object): # pylint: disable=too-many-instance-attributes logger.warning( 'The object has invalid stream: %s', self.streamNumber) raise BMObjectInvalidError() - if self.streamNumber not in state.streamsInWhichIAmParticipating: + if self.streamNumber not in connectionpool.pool.streams: logger.debug( 'The streamNumber %i isn\'t one we are interested in.', self.streamNumber) diff --git a/src/network/bmproto.py b/src/network/bmproto.py index 41e163df..ed1d48c4 100644 --- a/src/network/bmproto.py +++ b/src/network/bmproto.py @@ -445,7 +445,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker): for seenTime, stream, _, ip, port in self._decode_addr(): ip = str(ip) if ( - stream not in state.streamsInWhichIAmParticipating + stream not in connectionpool.pool.streams # FIXME: should check against complete list or ip.startswith('bootstrap') ): diff --git a/src/network/connectionpool.py b/src/network/connectionpool.py index b756f8a4..36c91c18 100644 --- a/src/network/connectionpool.py +++ b/src/network/connectionpool.py @@ -88,7 +88,6 @@ class BMConnectionPool(object): def connectToStream(self, streamNumber): """Connect to a bitmessage stream""" self.streams.append(streamNumber) - state.streamsInWhichIAmParticipating.append(streamNumber) def getConnectionByAddr(self, addr): """ diff --git a/src/network/knownnodes.py b/src/network/knownnodes.py index b74c9a15..c53be2cd 100644 --- a/src/network/knownnodes.py +++ b/src/network/knownnodes.py @@ -226,7 +226,7 @@ def dns(): 1, Peer('bootstrap%s.bitmessage.org' % port, port)) -def cleanupKnownNodes(): +def cleanupKnownNodes(pool): """ Cleanup knownnodes: remove old nodes and nodes with low rating """ @@ -236,7 +236,7 @@ def cleanupKnownNodes(): with knownNodesLock: for stream in knownNodes: - if stream not in state.streamsInWhichIAmParticipating: + if stream not in pool.streams: continue keys = knownNodes[stream].keys() for node in keys: diff --git a/src/network/udp.py b/src/network/udp.py index 1a9891ec..b16146f9 100644 --- a/src/network/udp.py +++ b/src/network/udp.py @@ -8,6 +8,7 @@ import time # magic imports! import protocol import state +import connectionpool from queues import receiveDataQueue from bmproto import BMProto @@ -81,7 +82,7 @@ class UDPSocket(BMProto): # pylint: disable=too-many-instance-attributes remoteport = False for seenTime, stream, _, ip, port in addresses: decodedIP = protocol.checkIPAddress(str(ip)) - if stream not in state.streamsInWhichIAmParticipating: + if stream not in connectionpool.pool.streams: continue if (seenTime < time.time() - protocol.MAX_TIME_OFFSET or seenTime > time.time() + protocol.MAX_TIME_OFFSET): diff --git a/src/state.py b/src/state.py index e3f49215..e530425d 100644 --- a/src/state.py +++ b/src/state.py @@ -3,7 +3,6 @@ Global runtime variables. """ neededPubkeys = {} -streamsInWhichIAmParticipating = [] extPort = None """For UPnP""" diff --git a/src/tests/core.py b/src/tests/core.py index 806c288e..f1a11a06 100644 --- a/src/tests/core.py +++ b/src/tests/core.py @@ -165,7 +165,7 @@ class TestCore(unittest.TestCase): """test knownnodes starvation leading to IndexError in Asyncore""" self._outdate_knownnodes() # time.sleep(303) # singleCleaner wakes up every 5 min - knownnodes.cleanupKnownNodes() + knownnodes.cleanupKnownNodes(connectionpool.pool) self.assertTrue(knownnodes.knownNodes[1]) while True: try: @@ -179,7 +179,7 @@ class TestCore(unittest.TestCase): config.set('bitmessagesettings', 'dontconnect', 'true') self._wipe_knownnodes() knownnodes.addKnownNode(1, Peer('127.0.0.1', 8444), is_self=True) - knownnodes.cleanupKnownNodes() + knownnodes.cleanupKnownNodes(connectionpool.pool) time.sleep(5) def _check_connection(self, full=False):