Change peer discovery tracking from queue to a dict
- with a queue, a situation could occur when new entries are appended but nothing is polling the queue
This commit is contained in:
parent
4564d37f5b
commit
f338c00f8e
|
@ -38,6 +38,7 @@ resends msg messages in 5 days (then 10 days, then 20 days, etc...)
|
||||||
|
|
||||||
class singleCleaner(threading.Thread, StoppableThread):
|
class singleCleaner(threading.Thread, StoppableThread):
|
||||||
cycleLength = 300
|
cycleLength = 300
|
||||||
|
expireDiscoveredPeers = 300
|
||||||
|
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
threading.Thread.__init__(self, name="singleCleaner")
|
threading.Thread.__init__(self, name="singleCleaner")
|
||||||
|
@ -126,6 +127,14 @@ class singleCleaner(threading.Thread, StoppableThread):
|
||||||
for connection in BMConnectionPool().inboundConnections.values() + BMConnectionPool().outboundConnections.values():
|
for connection in BMConnectionPool().inboundConnections.values() + BMConnectionPool().outboundConnections.values():
|
||||||
connection.clean()
|
connection.clean()
|
||||||
|
|
||||||
|
# discovery tracking
|
||||||
|
exp = time.time() - singleCleander.expireDiscoveredPeers
|
||||||
|
reaper = (k for k, v in state.discoveredPeers.items() if v < exp)
|
||||||
|
for k in reaper:
|
||||||
|
try:
|
||||||
|
del state.discoveredPeers[k]
|
||||||
|
except KeyError:
|
||||||
|
pass
|
||||||
# TODO: cleanup pending upload / download
|
# TODO: cleanup pending upload / download
|
||||||
|
|
||||||
if state.shutdown == 0:
|
if state.shutdown == 0:
|
||||||
|
|
|
@ -3,9 +3,15 @@ import random
|
||||||
|
|
||||||
from bmconfigparser import BMConfigParser
|
from bmconfigparser import BMConfigParser
|
||||||
import knownnodes
|
import knownnodes
|
||||||
from queues import portCheckerQueue, peerDiscoveryQueue
|
from queues import portCheckerQueue
|
||||||
import state
|
import state
|
||||||
|
|
||||||
|
def getDiscoveredPeer(stream):
|
||||||
|
try:
|
||||||
|
peer = random.choice(state.discoveredPeers.keys())
|
||||||
|
except (IndexError, KeyError):
|
||||||
|
raise ValueError
|
||||||
|
|
||||||
def chooseConnection(stream):
|
def chooseConnection(stream):
|
||||||
haveOnion = BMConfigParser().safeGet("bitmessagesettings", "socksproxytype")[0:5] == 'SOCKS'
|
haveOnion = BMConfigParser().safeGet("bitmessagesettings", "socksproxytype")[0:5] == 'SOCKS'
|
||||||
if state.trustedPeer:
|
if state.trustedPeer:
|
||||||
|
@ -13,11 +19,11 @@ def chooseConnection(stream):
|
||||||
try:
|
try:
|
||||||
retval = portCheckerQueue.get(False)
|
retval = portCheckerQueue.get(False)
|
||||||
portCheckerQueue.task_done()
|
portCheckerQueue.task_done()
|
||||||
|
return retval
|
||||||
except Queue.Empty:
|
except Queue.Empty:
|
||||||
try:
|
pass
|
||||||
retval = peerDiscoveryQueue.get(False)
|
if random.choice((False, True)):
|
||||||
peerDiscoveryQueue.task_done()
|
return getDiscoveredPeer(stream)
|
||||||
except Queue.Empty:
|
|
||||||
for i in range(50):
|
for i in range(50):
|
||||||
peer = random.choice(knownnodes.knownNodes[stream].keys())
|
peer = random.choice(knownnodes.knownNodes[stream].keys())
|
||||||
try:
|
try:
|
||||||
|
@ -35,4 +41,3 @@ def chooseConnection(stream):
|
||||||
except ZeroDivisionError:
|
except ZeroDivisionError:
|
||||||
return peer
|
return peer
|
||||||
raise ValueError
|
raise ValueError
|
||||||
return retval
|
|
||||||
|
|
|
@ -9,7 +9,7 @@ from network.bmobject import BMObject, BMObjectInsufficientPOWError, BMObjectInv
|
||||||
import network.asyncore_pollchoose as asyncore
|
import network.asyncore_pollchoose as asyncore
|
||||||
from network.objectracker import ObjectTracker
|
from network.objectracker import ObjectTracker
|
||||||
|
|
||||||
from queues import objectProcessorQueue, peerDiscoveryQueue, UISignalQueue, receiveDataQueue
|
from queues import objectProcessorQueue, UISignalQueue, receiveDataQueue
|
||||||
import state
|
import state
|
||||||
import protocol
|
import protocol
|
||||||
|
|
||||||
|
@ -100,7 +100,7 @@ class UDPSocket(BMProto):
|
||||||
return True
|
return True
|
||||||
logger.debug("received peer discovery from %s:%i (port %i):", self.destination.host, self.destination.port, remoteport)
|
logger.debug("received peer discovery from %s:%i (port %i):", self.destination.host, self.destination.port, remoteport)
|
||||||
if self.local:
|
if self.local:
|
||||||
peerDiscoveryQueue.put(state.Peer(self.destination.host, remoteport))
|
state.discoveredPeers[state.Peer(self.destination.host, remoteport)] = time.time
|
||||||
return True
|
return True
|
||||||
|
|
||||||
def bm_command_portcheck(self):
|
def bm_command_portcheck(self):
|
||||||
|
|
|
@ -11,7 +11,6 @@ objectProcessorQueue = ObjectProcessorQueue()
|
||||||
invQueue = MultiQueue()
|
invQueue = MultiQueue()
|
||||||
addrQueue = MultiQueue()
|
addrQueue = MultiQueue()
|
||||||
portCheckerQueue = Queue.Queue()
|
portCheckerQueue = Queue.Queue()
|
||||||
peerDiscoveryQueue = Queue.Queue()
|
|
||||||
receiveDataQueue = Queue.Queue()
|
receiveDataQueue = Queue.Queue()
|
||||||
apiAddressGeneratorReturnQueue = Queue.Queue(
|
apiAddressGeneratorReturnQueue = Queue.Queue(
|
||||||
) # The address generator thread uses this queue to get information back to the API thread.
|
) # The address generator thread uses this queue to get information back to the API thread.
|
||||||
|
|
|
@ -41,6 +41,8 @@ ownAddresses = {}
|
||||||
# security.
|
# security.
|
||||||
trustedPeer = None
|
trustedPeer = None
|
||||||
|
|
||||||
|
discoveredPeers = {}
|
||||||
|
|
||||||
Peer = collections.namedtuple('Peer', ['host', 'port'])
|
Peer = collections.namedtuple('Peer', ['host', 'port'])
|
||||||
|
|
||||||
def resetNetworkProtocolAvailability():
|
def resetNetworkProtocolAvailability():
|
||||||
|
|
Loading…
Reference in New Issue
Block a user