From fd1a6c1fa14ab719f43d97ad52f464826fb32b4c Mon Sep 17 00:00:00 2001 From: Peter Surda Date: Sat, 3 Feb 2018 11:46:39 +0100 Subject: [PATCH] Dandelion update - dandelion fixes - try to wait as long as possible before expiration if there are no outbound connections - expire in invThread rather than singleCleaner thread - deduplication of code in inv and dinv command methods - turn on by default, seems to work correctly now - turn off dandelion if outbound connections are disabled - start tracking downloads earlier, and faster download loop - remove some obsolete lines - minor PEP8 updates --- src/bitmessagemain.py | 5 ++ src/bmconfigparser.py | 2 +- src/class_singleCleaner.py | 3 -- src/network/bmobject.py | 2 +- src/network/bmproto.py | 43 +++++++-------- src/network/connectionpool.py | 51 +++++------------- src/network/dandelion.py | 98 ++++++++++++++++++++--------------- src/network/downloadthread.py | 5 +- src/network/invthread.py | 33 ++++++------ src/network/objectracker.py | 37 +++++++++---- src/network/tcp.py | 2 +- src/protocol.py | 4 +- src/state.py | 2 + 13 files changed, 149 insertions(+), 138 deletions(-) diff --git a/src/bitmessagemain.py b/src/bitmessagemain.py index 74011f29..accd5740 100755 --- a/src/bitmessagemain.py +++ b/src/bitmessagemain.py @@ -232,6 +232,11 @@ class Main: helper_threading.set_thread_name("PyBitmessage") + state.dandelion = BMConfigParser().safeGetInt('network', 'dandelion') + # dandelion requires outbound connections, without them, stem objects will get stuck forever + if state.dandelion and not BMConfigParser().safeGetBoolean('bitmessagesettings', 'sendoutgoingconnections'): + state.dandelion = 0 + helper_bootstrap.knownNodes() # Start the address generation thread addressGeneratorThread = addressGenerator() diff --git a/src/bmconfigparser.py b/src/bmconfigparser.py index bb4377a2..6a598955 100644 --- a/src/bmconfigparser.py +++ b/src/bmconfigparser.py @@ -20,7 +20,7 @@ BMConfigDefaults = { }, "network": { "bind": '', - 
"dandelion": 0, + "dandelion": 90, }, "inventory": { "storage": "sqlite", diff --git a/src/class_singleCleaner.py b/src/class_singleCleaner.py index 1def6cdb..ca77881c 100644 --- a/src/class_singleCleaner.py +++ b/src/class_singleCleaner.py @@ -10,7 +10,6 @@ from helper_sql import * from helper_threading import * from inventory import Inventory from network.connectionpool import BMConnectionPool -from network.dandelion import Dandelion from debug import logger import knownnodes import queues @@ -133,8 +132,6 @@ class singleCleaner(threading.Thread, StoppableThread): # inv/object tracking for connection in BMConnectionPool().inboundConnections.values() + BMConnectionPool().outboundConnections.values(): connection.clean() - # dandelion fluff trigger by expiration - Dandelion().expire() # discovery tracking exp = time.time() - singleCleaner.expireDiscoveredPeers diff --git a/src/network/bmobject.py b/src/network/bmobject.py index 267cac58..2e7dd092 100644 --- a/src/network/bmobject.py +++ b/src/network/bmobject.py @@ -74,7 +74,7 @@ class BMObject(object): def checkAlreadyHave(self): # if it's a stem duplicate, pretend we don't have it - if self.inventoryHash in Dandelion().hashMap: + if Dandelion().hasHash(self.inventoryHash): return if self.inventoryHash in Inventory(): raise BMObjectAlreadyHaveError() diff --git a/src/network/bmproto.py b/src/network/bmproto.py index 4bf63bca..28277f52 100644 --- a/src/network/bmproto.py +++ b/src/network/bmproto.py @@ -66,8 +66,6 @@ class BMProto(AdvancedDispatcher, ObjectTracker): self.payloadOffset = 0 self.expectBytes = protocol.Header.size self.object = None - self.dandelionRoutes = [] - self.dandelionRefresh = 0 def state_bm_header(self): self.magic, self.command, self.payloadLength, self.checksum = protocol.Header.unpack(self.read_buf[:protocol.Header.size]) @@ -282,8 +280,8 @@ class BMProto(AdvancedDispatcher, ObjectTracker): #TODO make this more asynchronous random.shuffle(items) for i in map(str, items): - if i in 
Dandelion().hashMap and \ - self != Dandelion().hashMap[i]: + if Dandelion().hasHash(i) and \ + self != Dandelion().objectChildStem(i): self.antiIntersectionDelay() logger.info('%s asked for a stem object we didn\'t offer to it.', self.destination) break @@ -298,41 +296,36 @@ class BMProto(AdvancedDispatcher, ObjectTracker): # when using random reordering, as the recipient won't know exactly which objects we refuse to deliver return True - def bm_command_inv(self): + def _command_inv(self, dandelion=False): items = self.decode_payload_content("l32s") if len(items) >= BMProto.maxObjectCount: - logger.error("Too many items in inv message!") + logger.error("Too many items in %sinv message!", "d" if dandelion else "") raise BMProtoExcessiveDataError() else: pass + # ignore dinv if dandelion turned off + if dandelion and not state.dandelion: + return True + for i in map(str, items): + if i in Inventory() and not Dandelion().hasHash(i): + continue + if dandelion and not Dandelion().hasHash(i): + Dandelion().addHash(i, self) self.handleReceivedInventory(i) return True + def bm_command_inv(self): + return self._command_inv(False) + def bm_command_dinv(self): """ Dandelion stem announce """ - items = self.decode_payload_content("l32s") - - if len(items) >= BMProto.maxObjectCount: - logger.error("Too many items in dinv message!") - raise BMProtoExcessiveDataError() - else: - pass - - # ignore command if dandelion turned off - if BMConfigParser().safeGetBoolean("network", "dandelion") == 0: - return True - - for i in map(str, items): - self.handleReceivedInventory(i) - Dandelion().addHash(i, self) - - return True + return self._command_inv(True) def bm_command_object(self): objectOffset = self.payloadOffset @@ -368,8 +361,12 @@ class BMProto(AdvancedDispatcher, ObjectTracker): except KeyError: pass + if self.object.inventoryHash in Inventory() and Dandelion().hasHash(self.object.inventoryHash): + Dandelion().removeHash(self.object.inventoryHash, "cycle detection") + 
Inventory()[self.object.inventoryHash] = ( self.object.objectType, self.object.streamNumber, buffer(self.payload[objectOffset:]), self.object.expiresTime, buffer(self.object.tag)) + self.handleReceivedObject(self.object.streamNumber, self.object.inventoryHash) invQueue.put((self.object.streamNumber, self.object.inventoryHash, self.destination)) return True diff --git a/src/network/connectionpool.py b/src/network/connectionpool.py index 2f34e485..2c3b5054 100644 --- a/src/network/connectionpool.py +++ b/src/network/connectionpool.py @@ -8,10 +8,10 @@ from bmconfigparser import BMConfigParser from debug import logger import helper_bootstrap from network.proxy import Proxy -import network.bmproto +from network.bmproto import BMProto from network.dandelion import Dandelion -import network.tcp -import network.udp +from network.tcp import TCPServer, Socks5BMConnection, Socks4aBMConnection, TCPConnection +from network.udp import UDPSocket from network.connectionchooser import chooseConnection import network.asyncore_pollchoose as asyncore import protocol @@ -33,31 +33,6 @@ class BMConnectionPool(object): self.spawnWait = 2 self.bootstrapped = False - def handleReceivedObject(self, streamNumber, hashid, connection = None): - for i in self.inboundConnections.values() + self.outboundConnections.values(): - if not isinstance(i, network.bmproto.BMProto): - continue - if not i.fullyEstablished: - continue - try: - del i.objectsNewToMe[hashid] - except KeyError: - with i.objectsNewToThemLock: - i.objectsNewToThem[hashid] = time.time() - if i == connection: - try: - with i.objectsNewToThemLock: - del i.objectsNewToThem[hashid] - except KeyError: - pass - if hashid in Dandelion().fluff: - Dandelion().removeHash(hashid) - - def reRandomiseDandelionStems(self): - # Choose 2 peers randomly - # TODO: handle streams - Dandelion().reRandomiseStems(self.outboundConnections.values()) - def connectToStream(self, streamNumber): self.streams.append(streamNumber) @@ -88,7 +63,7 @@ class 
BMConnectionPool(object): return False def addConnection(self, connection): - if isinstance(connection, network.udp.UDPSocket): + if isinstance(connection, UDPSocket): return if connection.isOutbound: self.outboundConnections[connection.destination] = connection @@ -99,9 +74,9 @@ class BMConnectionPool(object): self.inboundConnections[connection.destination.host] = connection def removeConnection(self, connection): - if isinstance(connection, network.udp.UDPSocket): + if isinstance(connection, UDPSocket): del self.udpSockets[connection.listening.host] - elif isinstance(connection, network.tcp.TCPServer): + elif isinstance(connection, TCPServer): del self.listeningSockets[state.Peer(connection.destination.host, connection.destination.port)] elif connection.isOutbound: try: @@ -135,18 +110,18 @@ class BMConnectionPool(object): bind = self.getListeningIP() port = BMConfigParser().safeGetInt("bitmessagesettings", "port") # correct port even if it changed - ls = network.tcp.TCPServer(host=bind, port=port) + ls = TCPServer(host=bind, port=port) self.listeningSockets[ls.destination] = ls def startUDPSocket(self, bind=None): if bind is None: host = self.getListeningIP() - udpSocket = network.udp.UDPSocket(host=host, announcing=True) + udpSocket = UDPSocket(host=host, announcing=True) else: if bind is False: - udpSocket = network.udp.UDPSocket(announcing=False) + udpSocket = UDPSocket(announcing=False) else: - udpSocket = network.udp.UDPSocket(host=bind, announcing=True) + udpSocket = UDPSocket(host=bind, announcing=True) self.udpSockets[udpSocket.listening.host] = udpSocket def loop(self): @@ -192,11 +167,11 @@ class BMConnectionPool(object): # continue try: if (BMConfigParser().safeGet("bitmessagesettings", "socksproxytype") == "SOCKS5"): - self.addConnection(network.tcp.Socks5BMConnection(chosen)) + self.addConnection(Socks5BMConnection(chosen)) elif (BMConfigParser().safeGet("bitmessagesettings", "socksproxytype") == "SOCKS4a"): - 
self.addConnection(network.tcp.Socks4aBMConnection(chosen)) + self.addConnection(Socks4aBMConnection(chosen)) elif not chosen.host.endswith(".onion"): - self.addConnection(network.tcp.TCPConnection(chosen)) + self.addConnection(TCPConnection(chosen)) except socket.error as e: if e.errno == errno.ENETUNREACH: continue diff --git a/src/network/dandelion.py b/src/network/dandelion.py index 3e49d906..edd4fb5b 100644 --- a/src/network/dandelion.py +++ b/src/network/dandelion.py @@ -1,62 +1,78 @@ -from random import choice, shuffle +from collections import namedtuple +from random import choice, sample from threading import RLock from time import time from bmconfigparser import BMConfigParser +import network.connectionpool +from debug import logging +from queues import invQueue from singleton import Singleton +import state # randomise routes after 600 seconds REASSIGN_INTERVAL = 600 -FLUFF_TRIGGER_TIMEOUT = 300 +# trigger fluff due to expiration in 2 minutes +FLUFF_TRIGGER_TIMEOUT = 120 MAX_STEMS = 2 +Stem = namedtuple('Stem', ['child', 'stream', 'timeout']) + @Singleton class Dandelion(): def __init__(self): + # currently assignable child stems self.stem = [] + # currently assigned parent <-> child mappings self.nodeMap = {} + # currently existing objects in stem mode self.hashMap = {} - self.fluff = {} - self.timeout = {} + # when to rerandomise routes self.refresh = time() + REASSIGN_INTERVAL self.lock = RLock() - def addHash(self, hashId, source): - if BMConfigParser().safeGetInt('network', 'dandelion') == 0: + def addHash(self, hashId, source=None, stream=1): + if not state.dandelion: return with self.lock: - self.hashMap[hashId] = self.getNodeStem(source) - self.timeout[hashId] = time() + FLUFF_TRIGGER_TIMEOUT + self.hashMap[hashId] = Stem( + self.getNodeStem(source), + stream, + time() + FLUFF_TRIGGER_TIMEOUT) - def removeHash(self, hashId): + def setHashStream(self, hashId, stream=1): + with self.lock: + if hashId in self.hashMap: + self.hashMap[hashId] = Stem( + 
self.hashMap[hashId].child, + stream, + time() + FLUFF_TRIGGER_TIMEOUT) + + def removeHash(self, hashId, reason="no reason specified"): + logging.debug("%s entering fluff mode due to %s.", ''.join('%02x'%ord(i) for i in hashId), reason) with self.lock: try: del self.hashMap[hashId] except KeyError: pass - try: - del self.timeout[hashId] - except KeyError: - pass - try: - del self.fluff[hashId] - except KeyError: - pass - def fluffTrigger(self, hashId): - with self.lock: - self.fluff[hashId] = None + def hasHash(self, hashId): + return hashId in self.hashMap + + def objectChildStem(self, hashId): + return self.hashMap[hashId].child def maybeAddStem(self, connection): # fewer than MAX_STEMS outbound connections at last reshuffle? with self.lock: if len(self.stem) < MAX_STEMS: self.stem.append(connection) - # active mappings pointing nowhere - for k in (k for k, v in self.nodeMap.iteritems() if self.nodeMap[k] is None): + for k in (k for k, v in self.nodeMap.iteritems() if v is None): self.nodeMap[k] = connection - for k in (k for k, v in self.hashMap.iteritems() if self.hashMap[k] is None): - self.hashMap[k] = connection + for k, v in {k: v for k, v in self.hashMap.iteritems() if v.child is None}.iteritems(): + self.hashMap[k] = Stem(connection, v.stream, time() + FLUFF_TRIGGER_TIMEOUT) + invQueue.put((v.stream, k, v.child)) + def maybeRemoveStem(self, connection): # is the stem active? 
@@ -64,12 +80,10 @@ class Dandelion(): if connection in self.stem: self.stem.remove(connection) # active mappings to pointing to the removed node - for k in (k for k, v in self.nodeMap.iteritems() if self.nodeMap[k] == connection): + for k in (k for k, v in self.nodeMap.iteritems() if v == connection): self.nodeMap[k] = None - for k in (k for k, v in self.hashMap.iteritems() if self.hashMap[k] == connection): - self.hashMap[k] = None - if len(self.stem) < MAX_STEMS: - self.stem.append(connection) + for k, v in {k: v for k, v in self.hashMap.iteritems() if v.child == connection}.iteritems(): + self.hashMap[k] = Stem(None, v.stream, time() + FLUFF_TRIGGER_TIMEOUT) def pickStem(self, parent=None): try: @@ -92,26 +106,26 @@ class Dandelion(): try: return self.nodeMap[node] except KeyError: - self.nodeMap[node] = self.pickStem() + self.nodeMap[node] = self.pickStem(node) return self.nodeMap[node] - def getHashStem(self, hashId): - with self.lock: - return self.hashMap[hashId] - def expire(self): with self.lock: deadline = time() - toDelete = [k for k, v in self.hashMap.iteritems() if self.timeout[k] < deadline] - for k in toDelete: - del self.timeout[k] - del self.hashMap[k] + # only expire those that have a child node, i.e. 
those without a child node will stick around + toDelete = [[v.stream, k, v.child] for k, v in self.hashMap.iteritems() if v.timeout < deadline and v.child] + for row in toDelete: + self.removeHash(row[1], 'expiration') + invQueue.put((row[0], row[1], row[2])) - def reRandomiseStems(self, connections): - shuffle(connections) + def reRandomiseStems(self): with self.lock: - # random two connections - self.stem = connections[:MAX_STEMS] + try: + # random two connections + self.stem = sample(network.connectionpool.BMConnectionPool().outboundConnections.values(), MAX_STEMS) + # not enough stems available + except ValueError: + self.stem = network.connectionpool.BMConnectionPool().outboundConnections.values() self.nodeMap = {} # hashMap stays to cater for pending stems self.refresh = time() + REASSIGN_INTERVAL diff --git a/src/network/downloadthread.py b/src/network/downloadthread.py index 88f7c12e..7eee2761 100644 --- a/src/network/downloadthread.py +++ b/src/network/downloadthread.py @@ -3,6 +3,7 @@ import threading import time import addresses +from dandelion import Dandelion from debug import logger from helper_threading import StoppableThread from inventory import Inventory @@ -54,7 +55,7 @@ class DownloadThread(threading.Thread, StoppableThread): payload = bytearray() payload.extend(addresses.encodeVarint(len(request))) for chunk in request: - if chunk in Inventory(): + if chunk in Inventory() and not Dandelion().hasHash(chunk): try: del i.objectsNewToMe[chunk] except KeyError: @@ -70,4 +71,4 @@ class DownloadThread(threading.Thread, StoppableThread): if time.time() >= self.lastCleaned + DownloadThread.cleanInterval: self.cleanPending() if not requested: - self.stop.wait(5) + self.stop.wait(1) diff --git a/src/network/invthread.py b/src/network/invthread.py index 5852df0b..d0d758fb 100644 --- a/src/network/invthread.py +++ b/src/network/invthread.py @@ -18,26 +18,28 @@ class InvThread(threading.Thread, StoppableThread): self.initStop() self.name = "InvBroadcaster" + 
def handleLocallyGenerated(self, stream, hashId): + Dandelion().addHash(hashId, stream=stream) + for connection in BMConnectionPool().inboundConnections.values() + \ + BMConnectionPool().outboundConnections.values(): + if state.dandelion and connection != Dandelion().objectChildStem(hashId): + continue + connection.objectsNewToThem[hashId] = time() + def run(self): while not state.shutdown: chunk = [] while True: + # Dandelion fluff trigger by expiration + Dandelion().expire() try: data = invQueue.get(False) chunk.append((data[0], data[1])) # locally generated - if len(data) == 2: - Dandelion().addHash(data[1], None) - BMConnectionPool().handleReceivedObject(data[0], data[1]) - # came over the network - else: - source = BMConnectionPool().getConnectionByAddr(data[2]) - BMConnectionPool().handleReceivedObject(data[0], data[1], source) + if len(data) == 2 or data[2] is None: + self.handleLocallyGenerated(data[0], data[1]) except Queue.Empty: break - # connection not found, handle it as if generated locally - except KeyError: - BMConnectionPool().handleReceivedObject(data[0], data[1]) if chunk: for connection in BMConnectionPool().inboundConnections.values() + \ @@ -53,12 +55,13 @@ class InvThread(threading.Thread, StoppableThread): except KeyError: continue try: - if connection == Dandelion().hashMap[inv[1]]: + if connection == Dandelion().objectChildStem(inv[1]): # Fluff trigger by RNG # auto-ignore if config set to 0, i.e. 
dandelion is off - # send a normal inv if stem node doesn't support dandelion - if randint(1, 100) < BMConfigParser().safeGetBoolean("network", "dandelion") and \ - connection.services | protocol.NODE_DANDELION > 0: + if randint(1, 100) >= state.dandelion: + fluffs.append(inv[1]) + # send a dinv only if the stem node supports dandelion + elif connection.services & protocol.NODE_DANDELION > 0: stems.append(inv[1]) else: fluffs.append(inv[1]) @@ -79,6 +82,6 @@ class InvThread(threading.Thread, StoppableThread): invQueue.task_done() if Dandelion().refresh < time(): - BMConnectionPool().reRandomiseDandelionStems() + Dandelion().reRandomiseStems() self.stop.wait(1) diff --git a/src/network/objectracker.py b/src/network/objectracker.py index 327304d9..66b0685b 100644 --- a/src/network/objectracker.py +++ b/src/network/objectracker.py @@ -1,9 +1,8 @@ -from Queue import Queue import time from threading import RLock -from debug import logger from inventory import Inventory +import network.connectionpool from network.dandelion import Dandelion from randomtrackingdict import RandomTrackingDict from state import missingObjects @@ -80,13 +79,32 @@ class ObjectTracker(object): del self.objectsNewToThem[hashId] except KeyError: pass - # Fluff trigger by cycle detection - if hashId not in Inventory() or hashId in Dandelion().hashMap: - if hashId in Dandelion().hashMap: - Dandelion().fluffTrigger(hashId) - if hashId not in missingObjects: - missingObjects[hashId] = time.time() - self.objectsNewToMe[hashId] = True + if hashId not in missingObjects: + missingObjects[hashId] = time.time() + self.objectsNewToMe[hashId] = True + + def handleReceivedObject(self, streamNumber, hashid): + for i in network.connectionpool.BMConnectionPool().inboundConnections.values() + network.connectionpool.BMConnectionPool().outboundConnections.values(): + if not i.fullyEstablished: + continue + try: + del i.objectsNewToMe[hashid] + except KeyError: + if streamNumber in i.streams and \ + (not 
Dandelion().hasHash(hashid) or \ + Dandelion().objectChildStem(hashid) == i): + with i.objectsNewToThemLock: + i.objectsNewToThem[hashid] = time.time() + # update stream number, which we didn't have when we just received the dinv + # also resets expiration of the stem mode + Dandelion().setHashStream(hashid, streamNumber) + + if i == self: + try: + with i.objectsNewToThemLock: + del i.objectsNewToThem[hashid] + except KeyError: + pass def hasAddr(self, addr): if haveBloom: @@ -109,4 +127,3 @@ class ObjectTracker(object): # tracking inv # - per node hash of items that neither the remote node nor we have # - diff --git a/src/network/tcp.py b/src/network/tcp.py index 5a27aca3..33c4b6ca 100644 --- a/src/network/tcp.py +++ b/src/network/tcp.py @@ -168,7 +168,7 @@ class TCPConnection(BMProto, TLSDispatcher): with self.objectsNewToThemLock: for objHash in Inventory().unexpired_hashes_by_stream(stream): # don't advertise stem objects on bigInv - if objHash in Dandelion().hashMap: + if Dandelion().hasHash(objHash): continue bigInvList[objHash] = 0 self.objectsNewToThem[objHash] = time.time() diff --git a/src/protocol.py b/src/protocol.py index ef31b6c1..e5b2c5c2 100644 --- a/src/protocol.py +++ b/src/protocol.py @@ -196,7 +196,7 @@ def assembleVersionMessage(remoteHost, remotePort, participatingStreams, server payload += pack('>q', NODE_NETWORK | (NODE_SSL if haveSSL(server) else 0) | - (NODE_DANDELION if BMConfigParser().safeGetInt('network', 'dandelion') > 0 else 0) + (NODE_DANDELION if state.dandelion else 0) ) payload += pack('>q', int(time.time())) @@ -213,7 +213,7 @@ def assembleVersionMessage(remoteHost, remotePort, participatingStreams, server payload += pack('>q', NODE_NETWORK | (NODE_SSL if haveSSL(server) else 0) | - (NODE_DANDELION if BMConfigParser().safeGetInt('network', 'dandelion') > 0 else 0) + (NODE_DANDELION if state.dandelion else 0) ) payload += '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF' + pack( '>L', 2130706433) # = 127.0.0.1. 
This will be ignored by the remote host. The actual remote connected IP will be used. diff --git a/src/state.py b/src/state.py index 32433e2d..73e4f789 100644 --- a/src/state.py +++ b/src/state.py @@ -53,3 +53,5 @@ def resetNetworkProtocolAvailability(): networkProtocolAvailability = {'IPv4': None, 'IPv6': None, 'onion': None} resetNetworkProtocolAvailability() + +dandelion = 0