Dandelion updates & fixes

- Addresses #1049
- Add dandelion routes for locally generated objects
- Minor bugfixes
- Send dinv commands on stem objects (instead of always sending inv
command)
This commit is contained in:
Peter Šurda 2017-09-25 08:49:21 +02:00
parent 6ce86b1d0a
commit d574b167d8
Signed by untrusted user: PeterSurda
GPG Key ID: 0C5F50C0B5F37D87
3 changed files with 35 additions and 10 deletions

View File

@ -320,7 +320,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
return True return True
if self.dandelionRefresh < time.time(): if self.dandelionRefresh < time.time():
self.dandelionRoutes = network.connectionpool.dandelionRouteSelector(self) self.dandelionRoutes = BMConnectionPool.dandelionRouteSelector(self)
self.dandelionRefresh = time.time() + REASSIGN_INTERVAL self.dandelionRefresh = time.time() + REASSIGN_INTERVAL
for i in items: for i in items:

View File

@ -51,12 +51,12 @@ class BMConnectionPool(object):
except KeyError: except KeyError:
pass pass
def dandelionRouteSelector(node): def dandelionRouteSelector(self, node):
# Choose 2 peers randomly # Choose 2 peers randomly
# TODO: handle streams # TODO: handle streams
peers = [] peers = []
connections = BMConnectionPool().inboundConnections.values() + \ connections = self.inboundConnections.values() + \
BMConnectionPool().outboundConnections.values() self.outboundConnections.values()
random.shuffle(connections) random.shuffle(connections)
for i in connections: for i in connections:
if i == node: if i == node:

View File

@ -1,10 +1,13 @@
import Queue import Queue
from random import randint
import threading import threading
from time import time
import addresses import addresses
from bmconfigparser import BMConfigParser
from helper_threading import StoppableThread from helper_threading import StoppableThread
from network.connectionpool import BMConnectionPool from network.connectionpool import BMConnectionPool
from network.dandelion import DandelionStems from network.dandelion import DandelionStems, REASSIGN_INTERVAL
from queues import invQueue from queues import invQueue
import protocol import protocol
import state import state
@ -14,15 +17,30 @@ class InvThread(threading.Thread, StoppableThread):
threading.Thread.__init__(self, name="InvBroadcaster") threading.Thread.__init__(self, name="InvBroadcaster")
self.initStop() self.initStop()
self.name = "InvBroadcaster" self.name = "InvBroadcaster"
# for locally generated objects
self.dandelionRoutes = []
self.dandelionRefresh = 0
def dandelionLocalRouteRefresh(self):
if self.dandelionRefresh < time():
self.dandelionRoutes = BMConnectionPool().dandelionRouteSelector(None)
self.dandelionRefresh = time() + REASSIGN_INTERVAL
def run(self): def run(self):
while not state.shutdown: while not state.shutdown:
chunk = [] chunk = []
while True: while True:
self.dandelionLocalRouteRefresh()
try: try:
data = invQueue.get(False) data = invQueue.get(False)
# locally generated
if len(data) == 2: if len(data) == 2:
BMConnectionPool().handleReceivedObject(data[0], data[1]) BMConnectionPool().handleReceivedObject(data[0], data[1])
# Fluff trigger by RNG
# auto-ignore if config set to 0, i.e. dandelion is off
if randint(1, 100) < BMConfigParser().safeGetBoolean("network", "dandelion"):
DandelionStems.add(data[1], self.dandelionRoutes)
# came over the network
else: else:
source = BMConnectionPool().getConnectionByAddr(data[2]) source = BMConnectionPool().getConnectionByAddr(data[2])
BMConnectionPool().handleReceivedObject(data[0], data[1], source) BMConnectionPool().handleReceivedObject(data[0], data[1], source)
@ -36,20 +54,27 @@ class InvThread(threading.Thread, StoppableThread):
if chunk: if chunk:
for connection in BMConnectionPool().inboundConnections.values() + \ for connection in BMConnectionPool().inboundConnections.values() + \
BMConnectionPool().outboundConnections.values(): BMConnectionPool().outboundConnections.values():
hashes = [] fluffs = []
stems = []
for inv in chunk: for inv in chunk:
if inv[0] not in connection.streams: if inv[0] not in connection.streams:
continue continue
if inv in DandelionStems().stem and connection not in DandelionStems().stem[inv]: if inv[1] in DandelionStems().stem:
if connection in DandelionStems().stem[inv[1]]:
stems.append(inv[1])
continue continue
# else
try: try:
with connection.objectsNewToThemLock: with connection.objectsNewToThemLock:
del connection.objectsNewToThem[inv[1]] del connection.objectsNewToThem[inv[1]]
hashes.append(inv[1]) fluffs.append(inv[1])
except KeyError: except KeyError:
continue continue
if hashes: if fluffs:
connection.append_write_buf(protocol.CreatePacket('inv', \ connection.append_write_buf(protocol.CreatePacket('inv', \
addresses.encodeVarint(len(hashes)) + "".join(hashes))) addresses.encodeVarint(len(fluffs)) + "".join(fluffs)))
if stems:
connection.append_write_buf(protocol.CreatePacket('dinv', \
addresses.encodeVarint(len(stems)) + "".join(stems)))
invQueue.iterate() invQueue.iterate()
self.stop.wait(1) self.stop.wait(1)