Dandelion updates

- fixes and feedback from @gfanti and @amiller
- addresses #1049
- minor refactoring
- two global child stems with fixed mapping between parent and
child stem
- allow child stems which don't support dandelion
- only allow outbound connections to be stems
- adjust stems if opening/closing outbound connections (should
allow partial dandelion functionality when not enough outbound
connections are available instead of breaking)
This commit is contained in:
Peter Šurda 2017-10-20 01:21:49 +02:00
parent 15857e6551
commit 2d34e73648
Signed by untrusted user: PeterSurda
GPG Key ID: 0C5F50C0B5F37D87
9 changed files with 122 additions and 68 deletions

View File

@ -54,7 +54,7 @@ from bmconfigparser import BMConfigParser
from inventory import Inventory
from network.connectionpool import BMConnectionPool
from network.dandelion import DandelionStems
from network.dandelion import Dandelion
from network.networkthread import BMNetworkThread
from network.receivequeuethread import ReceiveQueueThread
from network.announcethread import AnnounceThread
@ -251,7 +251,7 @@ class Main:
sqlLookup.start()
Inventory() # init
DandelionStems() # init, needs to be early because other thread may access it early
Dandelion() # init, needs to be early because other thread may access it early
# SMTP delivery thread
if daemon and BMConfigParser().safeGet("bitmessagesettings", "smtpdeliver", '') != '':

View File

@ -10,7 +10,7 @@ from helper_sql import *
from helper_threading import *
from inventory import Inventory
from network.connectionpool import BMConnectionPool
from network.dandelion import DandelionStems
from network.dandelion import Dandelion
from debug import logger
import knownnodes
import queues
@ -136,9 +136,7 @@ class singleCleaner(threading.Thread, StoppableThread):
for connection in BMConnectionPool().inboundConnections.values() + BMConnectionPool().outboundConnections.values():
connection.clean()
# dandelion fluff trigger by expiration
for h, t in DandelionStems().timeouts:
if time.time() > t:
DandelionStems().remove(h)
Dandelion().expire()
# discovery tracking
exp = time.time() - singleCleaner.expireDiscoveredPeers

View File

@ -4,7 +4,7 @@ import time
from addresses import calculateInventoryHash
from debug import logger
from inventory import Inventory
from network.dandelion import DandelionStems
from network.dandelion import Dandelion
import protocol
import state
@ -68,7 +68,7 @@ class BMObject(object):
def checkAlreadyHave(self):
# if it's a stem duplicate, pretend we don't have it
if self.inventoryHash in DandelionStems().stem:
if self.inventoryHash in Dandelion().hashMap:
return
if self.inventoryHash in Inventory():
raise BMObjectAlreadyHaveError()

View File

@ -10,7 +10,7 @@ from debug import logger
from inventory import Inventory
import knownnodes
from network.advanceddispatcher import AdvancedDispatcher
from network.dandelion import DandelionStems, REASSIGN_INTERVAL
from network.dandelion import Dandelion
from network.bmobject import BMObject, BMObjectInsufficientPOWError, BMObjectInvalidDataError, \
BMObjectExpiredError, BMObjectUnwantedStreamError, BMObjectInvalidError, BMObjectAlreadyHaveError
import network.connectionpool
@ -279,8 +279,8 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
#TODO make this more asynchronous
random.shuffle(items)
for i in map(str, items):
if i in DandelionStems().stem and \
self != DandelionStems().stem[i]:
if i in Dandelion().hashMap and \
self != Dandelion().hashMap[i]:
self.antiIntersectionDelay()
logger.info('%s asked for a stem object we didn\'t offer to it.', self.destination)
break
@ -325,12 +325,8 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
if BMConfigParser().safeGetBoolean("network", "dandelion") == 0:
return True
if self.dandelionRefresh < time.time():
self.dandelionRoutes = BMConnectionPool.dandelionRouteSelector(self)
self.dandelionRefresh = time.time() + REASSIGN_INTERVAL
for i in map(str, items):
DandelionStems().add(i, self, self.dandelionRoutes)
Dandelion().addHash(i, self)
self.handleReceivedInventory(i)
return True

View File

@ -9,6 +9,7 @@ from debug import logger
import helper_bootstrap
from network.proxy import Proxy
import network.bmproto
from network.dandelion import Dandelion
import network.tcp
import network.udp
from network.connectionchooser import chooseConnection
@ -51,23 +52,10 @@ class BMConnectionPool(object):
except KeyError:
pass
def dandelionRouteSelector(self, node):
def reRandomiseDandelionStems(self):
# Choose 2 peers randomly
# TODO: handle streams
peers = []
connections = self.outboundConnections.values()
random.shuffle(connections)
for i in connections:
if i == node:
continue
try:
if i.services | protocol.NODE_DANDELION:
peers.append(i)
if len(peers) == 2:
break
except AttributeError:
continue
return peers
Dandelion().reRandomiseStems(self.outboundConnections.values())
def connectToStream(self, streamNumber):
self.streams.append(streamNumber)

View File

@ -1,4 +1,4 @@
from random import choice
from random import choice, shuffle
from threading import RLock
from time import time
@ -8,31 +8,101 @@ from singleton import Singleton
# randomise routes after 600 seconds
REASSIGN_INTERVAL = 600
FLUFF_TRIGGER_TIMEOUT = 300
MAX_STEMS = 2
@Singleton
class DandelionStems():
class Dandelion():
def __init__(self):
self.stem = {}
self.source = {}
self.timeouts = {}
self.stem = []
self.nodeMap = {}
self.hashMap = {}
self.timeout = {}
self.refresh = time() + REASSIGN_INTERVAL
self.lock = RLock()
def add(self, hashId, source, stems):
def addHash(self, hashId, source):
if BMConfigParser().safeGetInt('network', 'dandelion') == 0:
return
with self.lock:
try:
self.stem[hashId] = choice(stems)
except IndexError:
self.stem = None
self.source[hashId] = source
self.timeouts[hashId] = time()
self.hashMap[hashId] = self.getNodeStem(source)
self.timeout[hashId] = time() + FLUFF_TRIGGER_TIMEOUT
def remove(self, hashId):
def removeHash(self, hashId):
with self.lock:
try:
del self.stem[hashId]
del self.source[hashId]
del self.timeouts[hashId]
del self.hashMap[hashId]
except KeyError:
pass
try:
del self.timeout[hashId]
except KeyError:
pass
def maybeAddStem(self, connection):
# fewer than MAX_STEMS outbound connections at last reshuffle?
with self.lock:
if len(self.stem) < MAX_STEMS:
self.stem.append(connection)
# active mappings pointing nowhere
for k in (k for k, v in self.nodeMap.iteritems() if self.nodeMap[k] is None):
self.nodeMap[k] = connection
for k in (k for k, v in self.hashMap.iteritems() if self.hashMap[k] is None):
self.hashMap[k] = connection
def maybeRemoveStem(self, connection):
# is the stem active?
with self.lock:
if connection in self.stem:
self.stem.remove(connection)
# active mappings to pointing to the removed node
for k in (k for k, v in self.nodeMap.iteritems() if self.nodeMap[k] == connection):
self.nodeMap[k] = None
for k in (k for k, v in self.hashMap.iteritems() if self.hashMap[k] == connection):
self.hashMap[k] = None
if len(self.stem) < MAX_STEMS:
self.stem.append(connection)
def pickStem(self, parent=None):
try:
# pick a random from available stems
stem = choice(range(len(self.stem)))
if self.stem[stem] == parent:
# one stem available and it's the parent
if len(self.stem) == 1:
return None
# else, pick the other one
return self.stem[1 - stem]
# all ok
return self.stem[stem]
except IndexError:
# no stems available
return None
def getNodeStem(self, node=None):
with self.lock:
try:
return self.nodeMap[node]
except KeyError:
self.nodeMap[node] = self.pickStem()
return self.nodeMap[node]
def getHashStem(self, hashId):
with self.lock:
return self.hashMap[hashId]
def expire(self):
with self.lock:
deadline = time()
toDelete = [k for k, v in self.hashMap.iteritems() if self.timeout[k] < deadline]
for k in toDelete:
del self.timeout[k]
del self.hashMap[k]
def reRandomiseStems(self, connections):
shuffle(connections)
with self.lock:
# random two connections
self.stem = connections[:MAX_STEMS]
self.nodeMap = {}
# hashMap stays to cater for pending stems
self.refresh = time() + REASSIGN_INTERVAL

View File

@ -7,7 +7,7 @@ import addresses
from bmconfigparser import BMConfigParser
from helper_threading import StoppableThread
from network.connectionpool import BMConnectionPool
from network.dandelion import DandelionStems, REASSIGN_INTERVAL
from network.dandelion import Dandelion
from queues import invQueue
import protocol
import state
@ -17,26 +17,17 @@ class InvThread(threading.Thread, StoppableThread):
threading.Thread.__init__(self, name="InvBroadcaster")
self.initStop()
self.name = "InvBroadcaster"
# for locally generated objects
self.dandelionRoutes = []
self.dandelionRefresh = 0
def dandelionLocalRouteRefresh(self):
if self.dandelionRefresh < time():
self.dandelionRoutes = BMConnectionPool().dandelionRouteSelector(None)
self.dandelionRefresh = time() + REASSIGN_INTERVAL
def run(self):
while not state.shutdown:
chunk = []
while True:
self.dandelionLocalRouteRefresh()
try:
data = invQueue.get(False)
chunk.append((data[0], data[1]))
# locally generated
if len(data) == 2:
DandelionStems().add(data[1], None, self.dandelionRoutes)
Dandelion().addHash(data[1], None)
BMConnectionPool().handleReceivedObject(data[0], data[1])
# came over the network
else:
@ -61,17 +52,19 @@ class InvThread(threading.Thread, StoppableThread):
del connection.objectsNewToThem[inv[1]]
except KeyError:
continue
if inv[1] in DandelionStems().stem:
if connection == DandelionStems().stem[inv[1]]:
try:
if connection == Dandelion().hashMap[inv[1]]:
# Fluff trigger by RNG
# auto-ignore if config set to 0, i.e. dandelion is off
if randint(1, 100) < BMConfigParser().safeGetBoolean("network", "dandelion"):
# send a normal inv if stem node doesn't support dandelion
if randint(1, 100) < BMConfigParser().safeGetBoolean("network", "dandelion") and \
connection.services | protocol.NODE_DANDELION > 0:
stems.append(inv[1])
else:
fluffs.append(inv[1])
continue
else:
except KeyError:
fluffs.append(inv[1])
if fluffs:
shuffle(fluffs)
connection.append_write_buf(protocol.CreatePacket('inv', \
@ -80,7 +73,12 @@ class InvThread(threading.Thread, StoppableThread):
shuffle(stems)
connection.append_write_buf(protocol.CreatePacket('dinv', \
addresses.encodeVarint(len(stems)) + "".join(stems)))
invQueue.iterate()
for i in range(len(chunk)):
invQueue.task_done()
if Dandelion().refresh < time():
BMConnectionPool().reRandomiseDandelionStems()
self.stop.wait(1)

View File

@ -4,7 +4,7 @@ from threading import RLock
from debug import logger
from inventory import Inventory
from network.dandelion import DandelionStems
from network.dandelion import Dandelion
haveBloom = False
@ -84,9 +84,9 @@ class ObjectTracker(object):
if hashId not in Inventory():
with self.objectsNewToMeLock:
self.objectsNewToMe[hashId] = True
elif hashId in DandelionStems().stem:
elif hashId in Dandelion().hashMap:
# Fluff trigger by cycle detection
DandelionStems().remove(hashId)
Dandelion().removeHash(hashId)
with self.objectsNewToMeLock:
self.objectsNewToMe[hashId] = True

View File

@ -18,7 +18,7 @@ from network.advanceddispatcher import AdvancedDispatcher
from network.bmproto import BMProtoError, BMProtoInsufficientDataError, BMProtoExcessiveDataError, BMProto
from network.bmobject import BMObject, BMObjectInsufficientPOWError, BMObjectInvalidDataError, BMObjectExpiredError, BMObjectUnwantedStreamError, BMObjectInvalidError, BMObjectAlreadyHaveError
import network.connectionpool
from network.dandelion import DandelionStems
from network.dandelion import Dandelion
from network.node import Node
import network.asyncore_pollchoose as asyncore
from network.proxy import Proxy, ProxyError, GeneralProxyError
@ -106,6 +106,8 @@ class TCPConnection(BMProto, TLSDispatcher):
self.fullyEstablished = True
if self.isOutbound:
knownnodes.increaseRating(self.destination)
if self.isOutbound:
Dandelion().maybeAddStem(self)
self.sendAddr()
self.sendBigInv()
@ -166,7 +168,7 @@ class TCPConnection(BMProto, TLSDispatcher):
with self.objectsNewToThemLock:
for objHash in Inventory().unexpired_hashes_by_stream(stream):
# don't advertise stem objects on bigInv
if objHash in DandelionStems().stem:
if objHash in Dandelion().hashMap:
continue
bigInvList[objHash] = 0
self.objectsNewToThem[objHash] = time.time()
@ -218,6 +220,8 @@ class TCPConnection(BMProto, TLSDispatcher):
knownnodes.decreaseRating(self.destination)
if self.fullyEstablished:
UISignalQueue.put(('updateNetworkStatusTab', (self.isOutbound, False, self.destination)))
if self.isOutbound:
Dandelion().maybeRemoveStem(self)
BMProto.handle_close(self)