From f338c00f8efc8868743882875ba9b9b5d92d90a0 Mon Sep 17 00:00:00 2001 From: Peter Surda Date: Sun, 6 Aug 2017 21:29:54 +0200 Subject: [PATCH] Change peer discovery tracking from queue to a dict - with a queue, a situation could occur when new entries are appended but nothing is polling the queue --- src/class_singleCleaner.py | 9 ++++++ src/network/connectionchooser.py | 49 ++++++++++++++++++-------------- src/network/udp.py | 4 +-- src/queues.py | 1 - src/state.py | 2 ++ 5 files changed, 40 insertions(+), 25 deletions(-) diff --git a/src/class_singleCleaner.py b/src/class_singleCleaner.py index c21d8cb1..5cc35b41 100644 --- a/src/class_singleCleaner.py +++ b/src/class_singleCleaner.py @@ -38,6 +38,7 @@ resends msg messages in 5 days (then 10 days, then 20 days, etc...) class singleCleaner(threading.Thread, StoppableThread): cycleLength = 300 + expireDiscoveredPeers = 300 def __init__(self): threading.Thread.__init__(self, name="singleCleaner") @@ -126,6 +127,14 @@ class singleCleaner(threading.Thread, StoppableThread): for connection in BMConnectionPool().inboundConnections.values() + BMConnectionPool().outboundConnections.values(): connection.clean() + # discovery tracking + exp = time.time() - singleCleander.expireDiscoveredPeers + reaper = (k for k, v in state.discoveredPeers.items() if v < exp) + for k in reaper: + try: + del state.discoveredPeers[k] + except KeyError: + pass # TODO: cleanup pending upload / download if state.shutdown == 0: diff --git a/src/network/connectionchooser.py b/src/network/connectionchooser.py index 2cce5036..b4d7a37f 100644 --- a/src/network/connectionchooser.py +++ b/src/network/connectionchooser.py @@ -3,9 +3,15 @@ import random from bmconfigparser import BMConfigParser import knownnodes -from queues import portCheckerQueue, peerDiscoveryQueue +from queues import portCheckerQueue import state +def getDiscoveredPeer(stream): + try: + peer = random.choice(state.discoveredPeers.keys()) + except (IndexError, KeyError): + raise ValueError + def chooseConnection(stream): haveOnion = BMConfigParser().safeGet("bitmessagesettings", "socksproxytype")[0:5] == 'SOCKS' if state.trustedPeer: @@ -13,26 +19,25 @@ def chooseConnection(stream): try: retval = portCheckerQueue.get(False) portCheckerQueue.task_done() + return retval except Queue.Empty: + pass + if random.choice((False, True)): + return getDiscoveredPeer(stream) + for i in range(50): + peer = random.choice(knownnodes.knownNodes[stream].keys()) try: - retval = peerDiscoveryQueue.get(False) - peerDiscoveryQueue.task_done() - except Queue.Empty: - for i in range(50): - peer = random.choice(knownnodes.knownNodes[stream].keys()) - try: - rating = knownnodes.knownNodes[stream][peer]["rating"] - except TypeError: - print "Error in %s" % (peer) - rating = 0 - if haveOnion and peer.host.endswith('.onion') and rating > 0: - rating *= 10 - if rating > 1: - rating = 1 - try: - if 0.05/(1.0-rating) > random.random(): - return peer - except ZeroDivisionError: - return peer - raise ValueError - return retval + rating = knownnodes.knownNodes[stream][peer]["rating"] + except TypeError: + print "Error in %s" % (peer) + rating = 0 + if haveOnion and peer.host.endswith('.onion') and rating > 0: + rating *= 10 + if rating > 1: + rating = 1 + try: + if 0.05/(1.0-rating) > random.random(): + return peer + except ZeroDivisionError: + return peer + raise ValueError diff --git a/src/network/udp.py b/src/network/udp.py index bf52ebd5..910e2430 100644 --- a/src/network/udp.py +++ b/src/network/udp.py @@ -9,7 +9,7 @@ from network.bmobject import BMObject, BMObjectInsufficientPOWError, BMObjectInv import network.asyncore_pollchoose as asyncore from network.objectracker import ObjectTracker -from queues import objectProcessorQueue, peerDiscoveryQueue, UISignalQueue, receiveDataQueue +from queues import objectProcessorQueue, UISignalQueue, receiveDataQueue import state import protocol @@ -100,7 +100,7 @@ class UDPSocket(BMProto): return True logger.debug("received peer discovery from %s:%i (port %i):", self.destination.host, self.destination.port, remoteport) if self.local: - peerDiscoveryQueue.put(state.Peer(self.destination.host, remoteport)) + state.discoveredPeers[state.Peer(self.destination.host, remoteport)] = time.time return True def bm_command_portcheck(self): diff --git a/src/queues.py b/src/queues.py index f768c59f..e8923dbd 100644 --- a/src/queues.py +++ b/src/queues.py @@ -11,7 +11,6 @@ objectProcessorQueue = ObjectProcessorQueue() invQueue = MultiQueue() addrQueue = MultiQueue() portCheckerQueue = Queue.Queue() -peerDiscoveryQueue = Queue.Queue() receiveDataQueue = Queue.Queue() apiAddressGeneratorReturnQueue = Queue.Queue( ) # The address generator thread uses this queue to get information back to the API thread. diff --git a/src/state.py b/src/state.py index 08da36eb..c9cb3d1c 100644 --- a/src/state.py +++ b/src/state.py @@ -41,6 +41,8 @@ ownAddresses = {} # security. trustedPeer = None +discoveredPeers = {} + Peer = collections.namedtuple('Peer', ['host', 'port']) def resetNetworkProtocolAvailability():