From 2d34e7364899401bbb63668670ce83bbeee621d4 Mon Sep 17 00:00:00 2001 From: Peter Surda Date: Fri, 20 Oct 2017 01:21:49 +0200 Subject: [PATCH] Dandelion updates - fixes and feedback from @gfanti and @amiller - addresses #1049 - minor refactoring - two global child stems with fixed mapping between parent and child stem - allow child stems which don't support dandelion - only allow outbound connections to be stems - adjust stems if opening/closing outbound connections (should allow partial dandelion functionality when not enough outbound connections are available instead of breaking) --- src/bitmessagemain.py | 4 +- src/class_singleCleaner.py | 6 +- src/network/bmobject.py | 4 +- src/network/bmproto.py | 12 ++-- src/network/connectionpool.py | 18 +----- src/network/dandelion.py | 102 ++++++++++++++++++++++++++++------ src/network/invthread.py | 30 +++++----- src/network/objectracker.py | 6 +- src/network/tcp.py | 8 ++- 9 files changed, 122 insertions(+), 68 deletions(-) diff --git a/src/bitmessagemain.py b/src/bitmessagemain.py index 06d89ba8..83a41919 100755 --- a/src/bitmessagemain.py +++ b/src/bitmessagemain.py @@ -54,7 +54,7 @@ from bmconfigparser import BMConfigParser from inventory import Inventory from network.connectionpool import BMConnectionPool -from network.dandelion import DandelionStems +from network.dandelion import Dandelion from network.networkthread import BMNetworkThread from network.receivequeuethread import ReceiveQueueThread from network.announcethread import AnnounceThread @@ -251,7 +251,7 @@ class Main: sqlLookup.start() Inventory() # init - DandelionStems() # init, needs to be early because other thread may access it early + Dandelion() # init, needs to be early because other thread may access it early # SMTP delivery thread if daemon and BMConfigParser().safeGet("bitmessagesettings", "smtpdeliver", '') != '': diff --git a/src/class_singleCleaner.py b/src/class_singleCleaner.py index 3068910d..f3125806 100644 --- a/src/class_singleCleaner.py +++ b/src/class_singleCleaner.py @@ -10,7 +10,7 @@ from helper_sql import * from helper_threading import * from inventory import Inventory from network.connectionpool import BMConnectionPool -from network.dandelion import DandelionStems +from network.dandelion import Dandelion from debug import logger import knownnodes import queues @@ -136,9 +136,7 @@ class singleCleaner(threading.Thread, StoppableThread): for connection in BMConnectionPool().inboundConnections.values() + BMConnectionPool().outboundConnections.values(): connection.clean() # dandelion fluff trigger by expiration - for h, t in DandelionStems().timeouts: - if time.time() > t: - DandelionStems().remove(h) + Dandelion().expire() # discovery tracking exp = time.time() - singleCleaner.expireDiscoveredPeers diff --git a/src/network/bmobject.py b/src/network/bmobject.py index 4cde0c4f..f4c883ca 100644 --- a/src/network/bmobject.py +++ b/src/network/bmobject.py @@ -4,7 +4,7 @@ import time from addresses import calculateInventoryHash from debug import logger from inventory import Inventory -from network.dandelion import DandelionStems +from network.dandelion import Dandelion import protocol import state @@ -68,7 +68,7 @@ class BMObject(object): def checkAlreadyHave(self): # if it's a stem duplicate, pretend we don't have it - if self.inventoryHash in DandelionStems().stem: + if self.inventoryHash in Dandelion().hashMap: return if self.inventoryHash in Inventory(): raise BMObjectAlreadyHaveError() diff --git a/src/network/bmproto.py b/src/network/bmproto.py index 445af9a9..2469d6e4 100644 --- a/src/network/bmproto.py +++ b/src/network/bmproto.py @@ -10,7 +10,7 @@ from debug import logger from inventory import Inventory import knownnodes from network.advanceddispatcher import AdvancedDispatcher -from network.dandelion import DandelionStems, REASSIGN_INTERVAL +from network.dandelion import Dandelion from network.bmobject import BMObject, BMObjectInsufficientPOWError, BMObjectInvalidDataError, \ BMObjectExpiredError, BMObjectUnwantedStreamError, BMObjectInvalidError, BMObjectAlreadyHaveError import network.connectionpool @@ -279,8 +279,8 @@ class BMProto(AdvancedDispatcher, ObjectTracker): #TODO make this more asynchronous random.shuffle(items) for i in map(str, items): - if i in DandelionStems().stem and \ - self != DandelionStems().stem[i]: + if i in Dandelion().hashMap and \ + self != Dandelion().hashMap[i]: self.antiIntersectionDelay() logger.info('%s asked for a stem object we didn\'t offer to it.', self.destination) break @@ -325,12 +325,8 @@ class BMProto(AdvancedDispatcher, ObjectTracker): if BMConfigParser().safeGetBoolean("network", "dandelion") == 0: return True - if self.dandelionRefresh < time.time(): - self.dandelionRoutes = BMConnectionPool.dandelionRouteSelector(self) - self.dandelionRefresh = time.time() + REASSIGN_INTERVAL - for i in map(str, items): - DandelionStems().add(i, self, self.dandelionRoutes) + Dandelion().addHash(i, self) self.handleReceivedInventory(i) return True diff --git a/src/network/connectionpool.py b/src/network/connectionpool.py index 1e50eb1c..04336b00 100644 --- a/src/network/connectionpool.py +++ b/src/network/connectionpool.py @@ -9,6 +9,7 @@ from debug import logger import helper_bootstrap from network.proxy import Proxy import network.bmproto +from network.dandelion import Dandelion import network.tcp import network.udp from network.connectionchooser import chooseConnection @@ -51,23 +52,10 @@ class BMConnectionPool(object): except KeyError: pass - def dandelionRouteSelector(self, node): + def reRandomiseDandelionStems(self): # Choose 2 peers randomly # TODO: handle streams - peers = [] - connections = self.outboundConnections.values() - random.shuffle(connections) - for i in connections: - if i == node: - continue - try: - if i.services | protocol.NODE_DANDELION: - peers.append(i) - if len(peers) == 2: - break - except AttributeError: - continue - return peers + Dandelion().reRandomiseStems(self.outboundConnections.values()) def connectToStream(self, streamNumber): self.streams.append(streamNumber) diff --git a/src/network/dandelion.py b/src/network/dandelion.py index 840cc909..a7ef4083 100644 --- a/src/network/dandelion.py +++ b/src/network/dandelion.py @@ -1,4 +1,4 @@ -from random import choice +from random import choice, shuffle from threading import RLock from time import time @@ -8,31 +8,101 @@ from singleton import Singleton # randomise routes after 600 seconds REASSIGN_INTERVAL = 600 FLUFF_TRIGGER_TIMEOUT = 300 +MAX_STEMS = 2 @Singleton -class DandelionStems(): +class Dandelion(): def __init__(self): - self.stem = {} - self.source = {} - self.timeouts = {} + self.stem = [] + self.nodeMap = {} + self.hashMap = {} + self.timeout = {} + self.refresh = time() + REASSIGN_INTERVAL self.lock = RLock() - def add(self, hashId, source, stems): + def addHash(self, hashId, source): if BMConfigParser().safeGetInt('network', 'dandelion') == 0: return with self.lock: - try: - self.stem[hashId] = choice(stems) - except IndexError: - self.stem = None - self.source[hashId] = source - self.timeouts[hashId] = time() + self.hashMap[hashId] = self.getNodeStem(source) + self.timeout[hashId] = time() + FLUFF_TRIGGER_TIMEOUT - def remove(self, hashId): + def removeHash(self, hashId): with self.lock: try: - del self.stem[hashId] - del self.source[hashId] - del self.timeouts[hashId] + del self.hashMap[hashId] except KeyError: pass + try: + del self.timeout[hashId] + except KeyError: + pass + + def maybeAddStem(self, connection): + # fewer than MAX_STEMS outbound connections at last reshuffle? + with self.lock: + if len(self.stem) < MAX_STEMS: + self.stem.append(connection) + # active mappings pointing nowhere + for k in (k for k, v in self.nodeMap.iteritems() if self.nodeMap[k] is None): + self.nodeMap[k] = connection + for k in (k for k, v in self.hashMap.iteritems() if self.hashMap[k] is None): + self.hashMap[k] = connection + + def maybeRemoveStem(self, connection): + # is the stem active? + with self.lock: + if connection in self.stem: + self.stem.remove(connection) + # active mappings to pointing to the removed node + for k in (k for k, v in self.nodeMap.iteritems() if self.nodeMap[k] == connection): + self.nodeMap[k] = None + for k in (k for k, v in self.hashMap.iteritems() if self.hashMap[k] == connection): + self.hashMap[k] = None + if len(self.stem) < MAX_STEMS: + self.stem.append(connection) + + def pickStem(self, parent=None): + try: + # pick a random from available stems + stem = choice(range(len(self.stem))) + if self.stem[stem] == parent: + # one stem available and it's the parent + if len(self.stem) == 1: + return None + # else, pick the other one + return self.stem[1 - stem] + # all ok + return self.stem[stem] + except IndexError: + # no stems available + return None + + def getNodeStem(self, node=None): + with self.lock: + try: + return self.nodeMap[node] + except KeyError: + self.nodeMap[node] = self.pickStem() + return self.nodeMap[node] + + def getHashStem(self, hashId): + with self.lock: + return self.hashMap[hashId] + + def expire(self): + with self.lock: + deadline = time() + toDelete = [k for k, v in self.hashMap.iteritems() if self.timeout[k] < deadline] + for k in toDelete: + del self.timeout[k] + del self.hashMap[k] + + def reRandomiseStems(self, connections): + shuffle(connections) + with self.lock: + # random two connections + self.stem = connections[:MAX_STEMS] + self.nodeMap = {} + # hashMap stays to cater for pending stems + self.refresh = time() + REASSIGN_INTERVAL diff --git a/src/network/invthread.py b/src/network/invthread.py index 4f26c0fa..5852df0b 100644 --- a/src/network/invthread.py +++ b/src/network/invthread.py @@ -7,7 +7,7 @@ import addresses from bmconfigparser import BMConfigParser from helper_threading import StoppableThread from network.connectionpool import BMConnectionPool -from network.dandelion import DandelionStems, REASSIGN_INTERVAL +from network.dandelion import Dandelion from queues import invQueue import protocol import state @@ -17,26 +17,17 @@ class InvThread(threading.Thread, StoppableThread): threading.Thread.__init__(self, name="InvBroadcaster") self.initStop() self.name = "InvBroadcaster" - # for locally generated objects - self.dandelionRoutes = [] - self.dandelionRefresh = 0 - - def dandelionLocalRouteRefresh(self): - if self.dandelionRefresh < time(): - self.dandelionRoutes = BMConnectionPool().dandelionRouteSelector(None) - self.dandelionRefresh = time() + REASSIGN_INTERVAL def run(self): while not state.shutdown: chunk = [] while True: - self.dandelionLocalRouteRefresh() try: data = invQueue.get(False) chunk.append((data[0], data[1])) # locally generated if len(data) == 2: - DandelionStems().add(data[1], None, self.dandelionRoutes) + Dandelion().addHash(data[1], None) BMConnectionPool().handleReceivedObject(data[0], data[1]) # came over the network else: @@ -61,17 +52,19 @@ class InvThread(threading.Thread, StoppableThread): del connection.objectsNewToThem[inv[1]] except KeyError: continue - if inv[1] in DandelionStems().stem: - if connection == DandelionStems().stem[inv[1]]: + try: + if connection == Dandelion().hashMap[inv[1]]: # Fluff trigger by RNG # auto-ignore if config set to 0, i.e. dandelion is off - if randint(1, 100) < BMConfigParser().safeGetBoolean("network", "dandelion"): + # send a normal inv if stem node doesn't support dandelion + if randint(1, 100) < BMConfigParser().safeGetBoolean("network", "dandelion") and \ + connection.services | protocol.NODE_DANDELION > 0: stems.append(inv[1]) else: fluffs.append(inv[1]) - continue - else: + except KeyError: fluffs.append(inv[1]) + if fluffs: shuffle(fluffs) connection.append_write_buf(protocol.CreatePacket('inv', \ @@ -80,7 +73,12 @@ class InvThread(threading.Thread, StoppableThread): shuffle(stems) connection.append_write_buf(protocol.CreatePacket('dinv', \ addresses.encodeVarint(len(stems)) + "".join(stems))) + invQueue.iterate() for i in range(len(chunk)): invQueue.task_done() + + if Dandelion().refresh < time(): + BMConnectionPool().reRandomiseDandelionStems() + self.stop.wait(1) diff --git a/src/network/objectracker.py b/src/network/objectracker.py index 7149f4b1..62016d75 100644 --- a/src/network/objectracker.py +++ b/src/network/objectracker.py @@ -4,7 +4,7 @@ from threading import RLock from debug import logger from inventory import Inventory -from network.dandelion import DandelionStems +from network.dandelion import Dandelion haveBloom = False @@ -84,9 +84,9 @@ class ObjectTracker(object): if hashId not in Inventory(): with self.objectsNewToMeLock: self.objectsNewToMe[hashId] = True - elif hashId in DandelionStems().stem: + elif hashId in Dandelion().hashMap: # Fluff trigger by cycle detection - DandelionStems().remove(hashId) + Dandelion().removeHash(hashId) with self.objectsNewToMeLock: self.objectsNewToMe[hashId] = True diff --git a/src/network/tcp.py b/src/network/tcp.py index ab282fb4..70e22e08 100644 --- a/src/network/tcp.py +++ b/src/network/tcp.py @@ -18,7 +18,7 @@ from network.advanceddispatcher import AdvancedDispatcher from network.bmproto import BMProtoError, BMProtoInsufficientDataError, BMProtoExcessiveDataError, BMProto from network.bmobject import BMObject, BMObjectInsufficientPOWError, BMObjectInvalidDataError, BMObjectExpiredError, BMObjectUnwantedStreamError, BMObjectInvalidError, BMObjectAlreadyHaveError import network.connectionpool -from network.dandelion import DandelionStems +from network.dandelion import Dandelion from network.node import Node import network.asyncore_pollchoose as asyncore from network.proxy import Proxy, ProxyError, GeneralProxyError @@ -106,6 +106,8 @@ class TCPConnection(BMProto, TLSDispatcher): self.fullyEstablished = True if self.isOutbound: knownnodes.increaseRating(self.destination) + if self.isOutbound: + Dandelion().maybeAddStem(self) self.sendAddr() self.sendBigInv() @@ -166,7 +168,7 @@ class TCPConnection(BMProto, TLSDispatcher): with self.objectsNewToThemLock: for objHash in Inventory().unexpired_hashes_by_stream(stream): # don't advertise stem objects on bigInv - if objHash in DandelionStems().stem: + if objHash in Dandelion().hashMap: continue bigInvList[objHash] = 0 self.objectsNewToThem[objHash] = time.time() @@ -218,6 +220,8 @@ class TCPConnection(BMProto, TLSDispatcher): knownnodes.decreaseRating(self.destination) if self.fullyEstablished: UISignalQueue.put(('updateNetworkStatusTab', (self.isOutbound, False, self.destination))) + if self.isOutbound: + Dandelion().maybeRemoveStem(self) BMProto.handle_close(self)