From d574b167d8be0d463ab30918591f8a4b67e60920 Mon Sep 17 00:00:00 2001 From: Peter Surda Date: Mon, 25 Sep 2017 08:49:21 +0200 Subject: [PATCH] Dandelion updates & fixes - Addresses #1049 - Add dandelion routes for locally generated objects - Minor bugfixes - Send dinv commands on stem objects (instead of always sending inv command) --- src/network/bmproto.py | 2 +- src/network/connectionpool.py | 6 +++--- src/network/invthread.py | 37 +++++++++++++++++++++++++++++------ 3 files changed, 35 insertions(+), 10 deletions(-) diff --git a/src/network/bmproto.py b/src/network/bmproto.py index d5214471..66f066ef 100644 --- a/src/network/bmproto.py +++ b/src/network/bmproto.py @@ -320,7 +320,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker): return True if self.dandelionRefresh < time.time(): - self.dandelionRoutes = network.connectionpool.dandelionRouteSelector(self) + self.dandelionRoutes = BMConnectionPool.dandelionRouteSelector(self) self.dandelionRefresh = time.time() + REASSIGN_INTERVAL for i in items: diff --git a/src/network/connectionpool.py b/src/network/connectionpool.py index cde5c9eb..fae509c7 100644 --- a/src/network/connectionpool.py +++ b/src/network/connectionpool.py @@ -51,12 +51,12 @@ class BMConnectionPool(object): except KeyError: pass - def dandelionRouteSelector(node): + def dandelionRouteSelector(self, node): # Choose 2 peers randomly # TODO: handle streams peers = [] - connections = BMConnectionPool().inboundConnections.values() + \ - BMConnectionPool().outboundConnections.values() + connections = self.inboundConnections.values() + \ + self.outboundConnections.values() random.shuffle(connections) for i in connections: if i == node: diff --git a/src/network/invthread.py b/src/network/invthread.py index a868ce95..992930db 100644 --- a/src/network/invthread.py +++ b/src/network/invthread.py @@ -1,10 +1,13 @@ import Queue +from random import randint import threading +from time import time import addresses +from bmconfigparser import BMConfigParser from helper_threading import StoppableThread from network.connectionpool import BMConnectionPool -from network.dandelion import DandelionStems +from network.dandelion import DandelionStems, REASSIGN_INTERVAL from queues import invQueue import protocol import state @@ -14,15 +17,30 @@ class InvThread(threading.Thread, StoppableThread): threading.Thread.__init__(self, name="InvBroadcaster") self.initStop() self.name = "InvBroadcaster" + # for locally generated objects + self.dandelionRoutes = [] + self.dandelionRefresh = 0 + + def dandelionLocalRouteRefresh(self): + if self.dandelionRefresh < time(): + self.dandelionRoutes = BMConnectionPool().dandelionRouteSelector(None) + self.dandelionRefresh = time() + REASSIGN_INTERVAL def run(self): while not state.shutdown: chunk = [] while True: + self.dandelionLocalRouteRefresh() try: data = invQueue.get(False) + # locally generated if len(data) == 2: BMConnectionPool().handleReceivedObject(data[0], data[1]) + # Fluff trigger by RNG + # auto-ignore if config set to 0, i.e. dandelion is off + if randint(1, 100) < BMConfigParser().safeGetBoolean("network", "dandelion"): + DandelionStems.add(data[1], self.dandelionRoutes) + # came over the network else: source = BMConnectionPool().getConnectionByAddr(data[2]) BMConnectionPool().handleReceivedObject(data[0], data[1], source) @@ -36,20 +54,27 @@ class InvThread(threading.Thread, StoppableThread): if chunk: for connection in BMConnectionPool().inboundConnections.values() + \ BMConnectionPool().outboundConnections.values(): - hashes = [] + fluffs = [] + stems = [] for inv in chunk: if inv[0] not in connection.streams: continue - if inv in DandelionStems().stem and connection not in DandelionStems().stem[inv]: + if inv[1] in DandelionStems().stem: + if connection in DandelionStems().stem[inv[1]]: + stems.append(inv[1]) continue + # else try: with connection.objectsNewToThemLock: del connection.objectsNewToThem[inv[1]] - hashes.append(inv[1]) + fluffs.append(inv[1]) except KeyError: continue - if hashes: + if fluffs: connection.append_write_buf(protocol.CreatePacket('inv', \ - addresses.encodeVarint(len(hashes)) + "".join(hashes))) + addresses.encodeVarint(len(fluffs)) + "".join(fluffs))) + if stems: + connection.append_write_buf(protocol.CreatePacket('dinv', \ + addresses.encodeVarint(len(stems)) + "".join(stems))) invQueue.iterate() self.stop.wait(1)