diff --git a/src/bitmessagemain.py b/src/bitmessagemain.py index f5793ba5..1a091c4c 100755 --- a/src/bitmessagemain.py +++ b/src/bitmessagemain.py @@ -54,6 +54,7 @@ from bmconfigparser import BMConfigParser from inventory import Inventory from network.connectionpool import BMConnectionPool +from network.dandelion import DandelionStems from network.networkthread import BMNetworkThread from network.receivequeuethread import ReceiveQueueThread from network.announcethread import AnnounceThread @@ -248,6 +249,7 @@ class Main: sqlLookup.start() Inventory() # init + DandelionStems() # init, needs to be early because other thread may access it early # SMTP delivery thread if daemon and BMConfigParser().safeGet("bitmessagesettings", "smtpdeliver", '') != '': diff --git a/src/bmconfigparser.py b/src/bmconfigparser.py index acc4476a..bb4377a2 100644 --- a/src/bmconfigparser.py +++ b/src/bmconfigparser.py @@ -20,6 +20,7 @@ BMConfigDefaults = { }, "network": { "bind": '', + "dandelion": 0, }, "inventory": { "storage": "sqlite", diff --git a/src/class_singleCleaner.py b/src/class_singleCleaner.py index c37c3ecf..2e4140b6 100644 --- a/src/class_singleCleaner.py +++ b/src/class_singleCleaner.py @@ -10,6 +10,7 @@ from helper_sql import * from helper_threading import * from inventory import Inventory from network.connectionpool import BMConnectionPool +from network.dandelion import DandelionStems from debug import logger import knownnodes import queues @@ -126,6 +127,10 @@ class singleCleaner(threading.Thread, StoppableThread): # inv/object tracking for connection in BMConnectionPool().inboundConnections.values() + BMConnectionPool().outboundConnections.values(): connection.clean() + # dandelion fluff trigger by expiration + for h, t in DandelionStems().timeouts.items(): + if time.time() > t: + DandelionStems().remove(h) # discovery tracking exp = time.time() - singleCleaner.expireDiscoveredPeers diff --git a/src/network/bmobject.py b/src/network/bmobject.py index 98a14b5e..4cde0c4f 100644
--- a/src/network/bmobject.py +++ b/src/network/bmobject.py @@ -4,6 +4,7 @@ import time from addresses import calculateInventoryHash from debug import logger from inventory import Inventory +from network.dandelion import DandelionStems import protocol import state @@ -66,6 +67,9 @@ class BMObject(object): raise BMObjectUnwantedStreamError() def checkAlreadyHave(self): + # if it's a stem duplicate, pretend we don't have it + if self.inventoryHash in DandelionStems().stem: + return if self.inventoryHash in Inventory(): raise BMObjectAlreadyHaveError() diff --git a/src/network/bmproto.py b/src/network/bmproto.py index dbdc26d2..d5214471 100644 --- a/src/network/bmproto.py +++ b/src/network/bmproto.py @@ -1,6 +1,7 @@ import base64 import hashlib import time +import random import socket import struct @@ -9,6 +10,7 @@ from debug import logger from inventory import Inventory import knownnodes from network.advanceddispatcher import AdvancedDispatcher +from network.dandelion import DandelionStems, REASSIGN_INTERVAL from network.bmobject import BMObject, BMObjectInsufficientPOWError, BMObjectInvalidDataError, \ BMObjectExpiredError, BMObjectUnwantedStreamError, BMObjectInvalidError, BMObjectAlreadyHaveError import network.connectionpool @@ -61,6 +63,8 @@ class BMProto(AdvancedDispatcher, ObjectTracker): self.payloadOffset = 0 self.expectBytes = protocol.Header.size self.object = None + self.dandelionRoutes = [] + self.dandelionRefresh = 0 def state_bm_header(self): self.magic, self.command, self.payloadLength, self.checksum = protocol.Header.unpack(self.read_buf[:protocol.Header.size]) @@ -266,13 +270,23 @@ class BMProto(AdvancedDispatcher, ObjectTracker): # skip? 
if time.time() < self.skipUntil: return True - #TODO make this more asynchronous and allow reordering + #TODO make this more asynchronous + random.shuffle(items) for i in items: - try: - self.append_write_buf(protocol.CreatePacket('object', Inventory()[i].payload)) - except KeyError: + if i in DandelionStems().stem and \ + self not in DandelionStems().stem[i]: self.antiIntersectionDelay() - logger.info('%s asked for an object we don\'t have.', self.destination) + logger.info('%s asked for a stem object we didn\'t offer to it.', self.destination) + break + else: + try: + self.append_write_buf(protocol.CreatePacket('object', Inventory()[i].payload)) + except KeyError: + self.antiIntersectionDelay() + logger.info('%s asked for an object we don\'t have.', self.destination) + break + # I think that aborting after the first missing/stem object is more secure + # when using random reordering, as the recipient won't know exactly which objects we refuse to deliver return True def bm_command_inv(self): @@ -289,6 +303,34 @@ class BMProto(AdvancedDispatcher, ObjectTracker): return True + def bm_command_dinv(self): + """ + Dandelion stem announce + """ + items = self.decode_payload_content("l32s") + + if len(items) >= BMProto.maxObjectCount: + logger.error("Too many items in dinv message!") + raise BMProtoExcessiveDataError() + else: + pass + + # ignore command if dandelion turned off + if BMConfigParser().safeGetInt("network", "dandelion") == 0: + return True + + if self.dandelionRefresh < time.time(): + self.dandelionRoutes = network.connectionpool.BMConnectionPool().dandelionRouteSelector(self) + self.dandelionRefresh = time.time() + REASSIGN_INTERVAL + + for i in items: + # Fluff trigger by RNG, per item + if random.randint(1, 100) < BMConfigParser().safeGetInt("network", "dandelion"): + DandelionStems().add(i, self.dandelionRoutes) + self.handleReceivedInventory(i) + + return True + def bm_command_object(self): objectOffset = self.payloadOffset nonce, expiresTime, objectType, version,
streamNumber = self.decode_payload_content("QQIvv") diff --git a/src/network/connectionpool.py b/src/network/connectionpool.py index 3a7f9e6d..cde5c9eb 100644 --- a/src/network/connectionpool.py +++ b/src/network/connectionpool.py @@ -51,6 +51,22 @@ class BMConnectionPool(object): except KeyError: pass + def dandelionRouteSelector(self, node): + # Choose 2 peers randomly + # TODO: handle streams + peers = [] + connections = BMConnectionPool().inboundConnections.values() + \ + BMConnectionPool().outboundConnections.values() + random.shuffle(connections) + for i in connections: + if i == node: + continue + if i.services & protocol.NODE_DANDELION: + peers.append(i) + if len(peers) == 2: + break + return peers + def connectToStream(self, streamNumber): self.streams.append(streamNumber) diff --git a/src/network/dandelion.py b/src/network/dandelion.py new file mode 100644 index 00000000..ea27915f --- /dev/null +++ b/src/network/dandelion.py @@ -0,0 +1,29 @@ +import time +from threading import RLock + +import protocol +from singleton import Singleton + +# randomise routes after 600 seconds +REASSIGN_INTERVAL = 600 +FLUFF_TRIGGER_TIMEOUT = 300 +@Singleton +class DandelionStems(): + def __init__(self): + self.stem = {} + self.timeouts = {} + self.lock = RLock() + + def add(self, hashId, stems): + with self.lock: + self.stem[hashId] = stems + self.timeouts[hashId] = time.time() + + def remove(self, hashId): + with self.lock: + try: + del self.stem[hashId] + del self.timeouts[hashId] + except KeyError: + pass diff --git a/src/network/invthread.py b/src/network/invthread.py index d680ea13..a868ce95 100644 --- a/src/network/invthread.py +++ b/src/network/invthread.py @@ -4,6 +4,7 @@ import threading import addresses from helper_threading import StoppableThread from network.connectionpool import BMConnectionPool +from network.dandelion import DandelionStems from queues import invQueue import protocol import state @@ -39,6 +40,8 @@ class InvThread(threading.Thread,
StoppableThread): for inv in chunk: if inv[0] not in connection.streams: continue + if inv[1] in DandelionStems().stem and connection not in DandelionStems().stem[inv[1]]: + continue try: with connection.objectsNewToThemLock: del connection.objectsNewToThem[inv[1]] diff --git a/src/network/objectracker.py b/src/network/objectracker.py index 4541ea76..7149f4b1 100644 --- a/src/network/objectracker.py +++ b/src/network/objectracker.py @@ -4,6 +4,7 @@ from threading import RLock from debug import logger from inventory import Inventory +from network.dandelion import DandelionStems haveBloom = False @@ -83,6 +84,11 @@ class ObjectTracker(object): if hashId not in Inventory(): with self.objectsNewToMeLock: self.objectsNewToMe[hashId] = True + elif hashId in DandelionStems().stem: + # Fluff trigger by cycle detection + DandelionStems().remove(hashId) + with self.objectsNewToMeLock: + self.objectsNewToMe[hashId] = True def hasAddr(self, addr): if haveBloom: diff --git a/src/network/tcp.py b/src/network/tcp.py index 0fcbc160..60acb22c 100644 --- a/src/network/tcp.py +++ b/src/network/tcp.py @@ -18,6 +18,7 @@ from network.advanceddispatcher import AdvancedDispatcher from network.bmproto import BMProtoError, BMProtoInsufficientDataError, BMProtoExcessiveDataError, BMProto from network.bmobject import BMObject, BMObjectInsufficientPOWError, BMObjectInvalidDataError, BMObjectExpiredError, BMObjectUnwantedStreamError, BMObjectInvalidError, BMObjectAlreadyHaveError import network.connectionpool +from network.dandelion import DandelionStems from network.node import Node import network.asyncore_pollchoose as asyncore from network.proxy import Proxy, ProxyError, GeneralProxyError @@ -88,7 +89,7 @@ class TCPConnection(BMProto, TLSDispatcher): if self.skipUntil > time.time(): logger.debug("Initial skipping processing getdata for %.2fs", self.skipUntil - time.time()) else: - logger.debug("Skipping processing getdata due to missing object for %.2fs", self.skipUntil - time.time()) +
logger.debug("Skipping processing getdata due to missing object for %.2fs", delay) self.skipUntil = time.time() + delay def state_connection_fully_established(self): @@ -165,6 +166,9 @@ class TCPConnection(BMProto, TLSDispatcher): # may lock for a long time, but I think it's better than thousands of small locks with self.objectsNewToThemLock: for objHash in Inventory().unexpired_hashes_by_stream(stream): + # don't advertise stem objects on bigInv + if objHash in DandelionStems().stem: + continue bigInvList[objHash] = 0 self.objectsNewToThem[objHash] = time.time() objectCount = 0 diff --git a/src/protocol.py b/src/protocol.py index 7ad0db17..ef31b6c1 100644 --- a/src/protocol.py +++ b/src/protocol.py @@ -23,6 +23,7 @@ from version import softwareVersion #Service flags NODE_NETWORK = 1 NODE_SSL = 2 +NODE_DANDELION = 8 #Bitfield flags BITFIELD_DOESACK = 1 @@ -191,7 +192,12 @@ def CreatePacket(command, payload=''): def assembleVersionMessage(remoteHost, remotePort, participatingStreams, server = False, nodeid = None): payload = '' payload += pack('>L', 3) # protocol version. - payload += pack('>q', NODE_NETWORK|(NODE_SSL if haveSSL(server) else 0)) # bitflags of the services I offer. + # bitflags of the services I offer. + payload += pack('>q', + NODE_NETWORK | + (NODE_SSL if haveSSL(server) else 0) | + (NODE_DANDELION if BMConfigParser().safeGetInt('network', 'dandelion') > 0 else 0) + ) payload += pack('>q', int(time.time())) payload += pack( @@ -203,7 +209,12 @@ def assembleVersionMessage(remoteHost, remotePort, participatingStreams, server payload += encodeHost(remoteHost) payload += pack('>H', remotePort) # remote IPv6 and port - payload += pack('>q', NODE_NETWORK|(NODE_SSL if haveSSL(server) else 0)) # bitflags of the services I offer. + # bitflags of the services I offer. 
+ payload += pack('>q', + NODE_NETWORK | + (NODE_SSL if haveSSL(server) else 0) | + (NODE_DANDELION if BMConfigParser().safeGetInt('network', 'dandelion') > 0 else 0) + ) payload += '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF' + pack( '>L', 2130706433) # = 127.0.0.1. This will be ignored by the remote host. The actual remote connected IP will be used. # we have a separate extPort and