Dandelion update

- dandelion fixes
- try to wait as long as possible before expiration if there are no
outbound connections
- expire in invThread rather than singleCleaner thread
- deduplication of code in inv and dinv command methods
- turn on by default, seems to work correctly now
- turn off dandelion if outbound connections are disabled
- start tracking downloads earlier, and faster download loop
- remove some obsolete lines
- minor PEP8 updates
This commit is contained in:
Peter Šurda 2018-02-03 11:46:39 +01:00
parent 9db0f5bcb3
commit fd1a6c1fa1
Signed by untrusted user: PeterSurda
GPG Key ID: 0C5F50C0B5F37D87
13 changed files with 149 additions and 138 deletions

View File

@ -232,6 +232,11 @@ class Main:
helper_threading.set_thread_name("PyBitmessage")
state.dandelion = BMConfigParser().safeGetInt('network', 'dandelion')
# dandelion requires outbound connections, without them, stem objects will get stuck forever
if state.dandelion and not BMConfigParser().safeGetBoolean('bitmessagesettings', 'sendoutgoingconnections'):
state.dandelion = 0
helper_bootstrap.knownNodes()
# Start the address generation thread
addressGeneratorThread = addressGenerator()

View File

@ -20,7 +20,7 @@ BMConfigDefaults = {
},
"network": {
"bind": '',
"dandelion": 0,
"dandelion": 90,
},
"inventory": {
"storage": "sqlite",

View File

@ -10,7 +10,6 @@ from helper_sql import *
from helper_threading import *
from inventory import Inventory
from network.connectionpool import BMConnectionPool
from network.dandelion import Dandelion
from debug import logger
import knownnodes
import queues
@ -133,8 +132,6 @@ class singleCleaner(threading.Thread, StoppableThread):
# inv/object tracking
for connection in BMConnectionPool().inboundConnections.values() + BMConnectionPool().outboundConnections.values():
connection.clean()
# dandelion fluff trigger by expiration
Dandelion().expire()
# discovery tracking
exp = time.time() - singleCleaner.expireDiscoveredPeers

View File

@ -74,7 +74,7 @@ class BMObject(object):
def checkAlreadyHave(self):
# if it's a stem duplicate, pretend we don't have it
if self.inventoryHash in Dandelion().hashMap:
if Dandelion().hasHash(self.inventoryHash):
return
if self.inventoryHash in Inventory():
raise BMObjectAlreadyHaveError()

View File

@ -66,8 +66,6 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
self.payloadOffset = 0
self.expectBytes = protocol.Header.size
self.object = None
self.dandelionRoutes = []
self.dandelionRefresh = 0
def state_bm_header(self):
self.magic, self.command, self.payloadLength, self.checksum = protocol.Header.unpack(self.read_buf[:protocol.Header.size])
@ -282,8 +280,8 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
#TODO make this more asynchronous
random.shuffle(items)
for i in map(str, items):
if i in Dandelion().hashMap and \
self != Dandelion().hashMap[i]:
if Dandelion().hasHash(i) and \
self != Dandelion().objectChildStem(i):
self.antiIntersectionDelay()
logger.info('%s asked for a stem object we didn\'t offer to it.', self.destination)
break
@ -298,41 +296,36 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
# when using random reordering, as the recipient won't know exactly which objects we refuse to deliver
return True
def bm_command_inv(self):
def _command_inv(self, dandelion=False):
items = self.decode_payload_content("l32s")
if len(items) >= BMProto.maxObjectCount:
logger.error("Too many items in inv message!")
logger.error("Too many items in %sinv message!", "d" if dandelion else "")
raise BMProtoExcessiveDataError()
else:
pass
# ignore dinv if dandelion turned off
if dandelion and not state.dandelion:
return True
for i in map(str, items):
if i in Inventory() and not Dandelion().hasHash(i):
continue
if dandelion and not Dandelion().hasHash(i):
Dandelion().addHash(i, self)
self.handleReceivedInventory(i)
return True
def bm_command_inv(self):
return self._command_inv(False)
def bm_command_dinv(self):
"""
Dandelion stem announce
"""
items = self.decode_payload_content("l32s")
if len(items) >= BMProto.maxObjectCount:
logger.error("Too many items in dinv message!")
raise BMProtoExcessiveDataError()
else:
pass
# ignore command if dandelion turned off
if BMConfigParser().safeGetBoolean("network", "dandelion") == 0:
return True
for i in map(str, items):
self.handleReceivedInventory(i)
Dandelion().addHash(i, self)
return True
return self._command_inv(True)
def bm_command_object(self):
objectOffset = self.payloadOffset
@ -368,8 +361,12 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
except KeyError:
pass
if self.object.inventoryHash in Inventory() and Dandelion().hasHash(self.object.inventoryHash):
Dandelion().removeHash(self.object.inventoryHash, "cycle detection")
Inventory()[self.object.inventoryHash] = (
self.object.objectType, self.object.streamNumber, buffer(self.payload[objectOffset:]), self.object.expiresTime, buffer(self.object.tag))
self.handleReceivedObject(self.object.streamNumber, self.object.inventoryHash)
invQueue.put((self.object.streamNumber, self.object.inventoryHash, self.destination))
return True

View File

@ -8,10 +8,10 @@ from bmconfigparser import BMConfigParser
from debug import logger
import helper_bootstrap
from network.proxy import Proxy
import network.bmproto
from network.bmproto import BMProto
from network.dandelion import Dandelion
import network.tcp
import network.udp
from network.tcp import TCPServer, Socks5BMConnection, Socks4aBMConnection, TCPConnection
from network.udp import UDPSocket
from network.connectionchooser import chooseConnection
import network.asyncore_pollchoose as asyncore
import protocol
@ -33,31 +33,6 @@ class BMConnectionPool(object):
self.spawnWait = 2
self.bootstrapped = False
def handleReceivedObject(self, streamNumber, hashid, connection = None):
for i in self.inboundConnections.values() + self.outboundConnections.values():
if not isinstance(i, network.bmproto.BMProto):
continue
if not i.fullyEstablished:
continue
try:
del i.objectsNewToMe[hashid]
except KeyError:
with i.objectsNewToThemLock:
i.objectsNewToThem[hashid] = time.time()
if i == connection:
try:
with i.objectsNewToThemLock:
del i.objectsNewToThem[hashid]
except KeyError:
pass
if hashid in Dandelion().fluff:
Dandelion().removeHash(hashid)
def reRandomiseDandelionStems(self):
# Choose 2 peers randomly
# TODO: handle streams
Dandelion().reRandomiseStems(self.outboundConnections.values())
def connectToStream(self, streamNumber):
self.streams.append(streamNumber)
@ -88,7 +63,7 @@ class BMConnectionPool(object):
return False
def addConnection(self, connection):
if isinstance(connection, network.udp.UDPSocket):
if isinstance(connection, UDPSocket):
return
if connection.isOutbound:
self.outboundConnections[connection.destination] = connection
@ -99,9 +74,9 @@ class BMConnectionPool(object):
self.inboundConnections[connection.destination.host] = connection
def removeConnection(self, connection):
if isinstance(connection, network.udp.UDPSocket):
if isinstance(connection, UDPSocket):
del self.udpSockets[connection.listening.host]
elif isinstance(connection, network.tcp.TCPServer):
elif isinstance(connection, TCPServer):
del self.listeningSockets[state.Peer(connection.destination.host, connection.destination.port)]
elif connection.isOutbound:
try:
@ -135,18 +110,18 @@ class BMConnectionPool(object):
bind = self.getListeningIP()
port = BMConfigParser().safeGetInt("bitmessagesettings", "port")
# correct port even if it changed
ls = network.tcp.TCPServer(host=bind, port=port)
ls = TCPServer(host=bind, port=port)
self.listeningSockets[ls.destination] = ls
def startUDPSocket(self, bind=None):
if bind is None:
host = self.getListeningIP()
udpSocket = network.udp.UDPSocket(host=host, announcing=True)
udpSocket = UDPSocket(host=host, announcing=True)
else:
if bind is False:
udpSocket = network.udp.UDPSocket(announcing=False)
udpSocket = UDPSocket(announcing=False)
else:
udpSocket = network.udp.UDPSocket(host=bind, announcing=True)
udpSocket = UDPSocket(host=bind, announcing=True)
self.udpSockets[udpSocket.listening.host] = udpSocket
def loop(self):
@ -192,11 +167,11 @@ class BMConnectionPool(object):
# continue
try:
if (BMConfigParser().safeGet("bitmessagesettings", "socksproxytype") == "SOCKS5"):
self.addConnection(network.tcp.Socks5BMConnection(chosen))
self.addConnection(Socks5BMConnection(chosen))
elif (BMConfigParser().safeGet("bitmessagesettings", "socksproxytype") == "SOCKS4a"):
self.addConnection(network.tcp.Socks4aBMConnection(chosen))
self.addConnection(Socks4aBMConnection(chosen))
elif not chosen.host.endswith(".onion"):
self.addConnection(network.tcp.TCPConnection(chosen))
self.addConnection(TCPConnection(chosen))
except socket.error as e:
if e.errno == errno.ENETUNREACH:
continue

View File

@ -1,62 +1,78 @@
from random import choice, shuffle
from collections import namedtuple
from random import choice, sample
from threading import RLock
from time import time
from bmconfigparser import BMConfigParser
import network.connectionpool
from debug import logging
from queues import invQueue
from singleton import Singleton
import state
# randomise routes after 600 seconds
REASSIGN_INTERVAL = 600
FLUFF_TRIGGER_TIMEOUT = 300
# trigger fluff due to expiration in 2 minutes
FLUFF_TRIGGER_TIMEOUT = 120
MAX_STEMS = 2
Stem = namedtuple('Stem', ['child', 'stream', 'timeout'])
@Singleton
class Dandelion():
def __init__(self):
# currently assignable child stems
self.stem = []
# currently assigned parent <-> child mappings
self.nodeMap = {}
# currently existing objects in stem mode
self.hashMap = {}
self.fluff = {}
self.timeout = {}
# when to rerandomise routes
self.refresh = time() + REASSIGN_INTERVAL
self.lock = RLock()
def addHash(self, hashId, source):
if BMConfigParser().safeGetInt('network', 'dandelion') == 0:
def addHash(self, hashId, source=None, stream=1):
if not state.dandelion:
return
with self.lock:
self.hashMap[hashId] = self.getNodeStem(source)
self.timeout[hashId] = time() + FLUFF_TRIGGER_TIMEOUT
self.hashMap[hashId] = Stem(
self.getNodeStem(source),
stream,
time() + FLUFF_TRIGGER_TIMEOUT)
def removeHash(self, hashId):
def setHashStream(self, hashId, stream=1):
with self.lock:
if hashId in self.hashMap:
self.hashMap[hashId] = Stem(
self.hashMap[hashId].child,
stream,
time() + FLUFF_TRIGGER_TIMEOUT)
def removeHash(self, hashId, reason="no reason specified"):
logging.debug("%s entering fluff mode due to %s.", ''.join('%02x'%ord(i) for i in hashId), reason)
with self.lock:
try:
del self.hashMap[hashId]
except KeyError:
pass
try:
del self.timeout[hashId]
except KeyError:
pass
try:
del self.fluff[hashId]
except KeyError:
pass
def fluffTrigger(self, hashId):
with self.lock:
self.fluff[hashId] = None
def hasHash(self, hashId):
return hashId in self.hashMap
def objectChildStem(self, hashId):
return self.hashMap[hashId].child
def maybeAddStem(self, connection):
# fewer than MAX_STEMS outbound connections at last reshuffle?
with self.lock:
if len(self.stem) < MAX_STEMS:
self.stem.append(connection)
# active mappings pointing nowhere
for k in (k for k, v in self.nodeMap.iteritems() if self.nodeMap[k] is None):
for k in (k for k, v in self.nodeMap.iteritems() if v is None):
self.nodeMap[k] = connection
for k in (k for k, v in self.hashMap.iteritems() if self.hashMap[k] is None):
self.hashMap[k] = connection
for k, v in {k: v for k, v in self.hashMap.iteritems() if v.child is None}.iteritems():
self.hashMap[k] = Stem(connection, v.stream, time() + FLUFF_TRIGGER_TIMEOUT)
invQueue.put((v.stream, k, v.child))
def maybeRemoveStem(self, connection):
# is the stem active?
@ -64,12 +80,10 @@ class Dandelion():
if connection in self.stem:
self.stem.remove(connection)
# active mappings to pointing to the removed node
for k in (k for k, v in self.nodeMap.iteritems() if self.nodeMap[k] == connection):
for k in (k for k, v in self.nodeMap.iteritems() if v == connection):
self.nodeMap[k] = None
for k in (k for k, v in self.hashMap.iteritems() if self.hashMap[k] == connection):
self.hashMap[k] = None
if len(self.stem) < MAX_STEMS:
self.stem.append(connection)
for k, v in {k: v for k, v in self.hashMap.iteritems() if v.child == connection}.iteritems():
self.hashMap[k] = Stem(None, v.stream, time() + FLUFF_TRIGGER_TIMEOUT)
def pickStem(self, parent=None):
try:
@ -92,26 +106,26 @@ class Dandelion():
try:
return self.nodeMap[node]
except KeyError:
self.nodeMap[node] = self.pickStem()
self.nodeMap[node] = self.pickStem(node)
return self.nodeMap[node]
def getHashStem(self, hashId):
with self.lock:
return self.hashMap[hashId]
def expire(self):
with self.lock:
deadline = time()
toDelete = [k for k, v in self.hashMap.iteritems() if self.timeout[k] < deadline]
for k in toDelete:
del self.timeout[k]
del self.hashMap[k]
# only expire those that have a child node, i.e. those without a child not will stick around
toDelete = [[v.stream, k, v.child] for k, v in self.hashMap.iteritems() if v.timeout < deadline and v.child]
for row in toDelete:
self.removeHash(row[1], 'expiration')
invQueue.put((row[0], row[1], row[2]))
def reRandomiseStems(self, connections):
shuffle(connections)
def reRandomiseStems(self):
with self.lock:
# random two connections
self.stem = connections[:MAX_STEMS]
try:
# random two connections
self.stem = sample(network.connectionpool.BMConnectionPool().outboundConnections.values(), MAX_STEMS)
# not enough stems available
except ValueError:
self.stem = network.connectionpool.BMConnectionPool().outboundConnections.values()
self.nodeMap = {}
# hashMap stays to cater for pending stems
self.refresh = time() + REASSIGN_INTERVAL

View File

@ -3,6 +3,7 @@ import threading
import time
import addresses
from dandelion import Dandelion
from debug import logger
from helper_threading import StoppableThread
from inventory import Inventory
@ -54,7 +55,7 @@ class DownloadThread(threading.Thread, StoppableThread):
payload = bytearray()
payload.extend(addresses.encodeVarint(len(request)))
for chunk in request:
if chunk in Inventory():
if chunk in Inventory() and not Dandelion().hasHash(chunk):
try:
del i.objectsNewToMe[chunk]
except KeyError:
@ -70,4 +71,4 @@ class DownloadThread(threading.Thread, StoppableThread):
if time.time() >= self.lastCleaned + DownloadThread.cleanInterval:
self.cleanPending()
if not requested:
self.stop.wait(5)
self.stop.wait(1)

View File

@ -18,26 +18,28 @@ class InvThread(threading.Thread, StoppableThread):
self.initStop()
self.name = "InvBroadcaster"
def handleLocallyGenerated(self, stream, hashId):
Dandelion().addHash(hashId, stream=stream)
for connection in BMConnectionPool().inboundConnections.values() + \
BMConnectionPool().outboundConnections.values():
if state.dandelion and connection != Dandelion().objectChildStem(hashId):
continue
connection.objectsNewToThem[hashId] = time()
def run(self):
while not state.shutdown:
chunk = []
while True:
# Dandelion fluff trigger by expiration
Dandelion().expire()
try:
data = invQueue.get(False)
chunk.append((data[0], data[1]))
# locally generated
if len(data) == 2:
Dandelion().addHash(data[1], None)
BMConnectionPool().handleReceivedObject(data[0], data[1])
# came over the network
else:
source = BMConnectionPool().getConnectionByAddr(data[2])
BMConnectionPool().handleReceivedObject(data[0], data[1], source)
if len(data) == 2 or data[2] is None:
self.handleLocallyGenerated(data[0], data[1])
except Queue.Empty:
break
# connection not found, handle it as if generated locally
except KeyError:
BMConnectionPool().handleReceivedObject(data[0], data[1])
if chunk:
for connection in BMConnectionPool().inboundConnections.values() + \
@ -53,12 +55,13 @@ class InvThread(threading.Thread, StoppableThread):
except KeyError:
continue
try:
if connection == Dandelion().hashMap[inv[1]]:
if connection == Dandelion().objectChildStem(inv[1]):
# Fluff trigger by RNG
# auto-ignore if config set to 0, i.e. dandelion is off
# send a normal inv if stem node doesn't support dandelion
if randint(1, 100) < BMConfigParser().safeGetBoolean("network", "dandelion") and \
connection.services | protocol.NODE_DANDELION > 0:
if randint(1, 100) >= state.dandelion:
fluffs.append(inv[1])
# send a dinv only if the stem node supports dandelion
elif connection.services & protocol.NODE_DANDELION > 0:
stems.append(inv[1])
else:
fluffs.append(inv[1])
@ -79,6 +82,6 @@ class InvThread(threading.Thread, StoppableThread):
invQueue.task_done()
if Dandelion().refresh < time():
BMConnectionPool().reRandomiseDandelionStems()
Dandelion().reRandomiseStems()
self.stop.wait(1)

View File

@ -1,9 +1,8 @@
from Queue import Queue
import time
from threading import RLock
from debug import logger
from inventory import Inventory
import network.connectionpool
from network.dandelion import Dandelion
from randomtrackingdict import RandomTrackingDict
from state import missingObjects
@ -80,13 +79,32 @@ class ObjectTracker(object):
del self.objectsNewToThem[hashId]
except KeyError:
pass
# Fluff trigger by cycle detection
if hashId not in Inventory() or hashId in Dandelion().hashMap:
if hashId in Dandelion().hashMap:
Dandelion().fluffTrigger(hashId)
if hashId not in missingObjects:
missingObjects[hashId] = time.time()
self.objectsNewToMe[hashId] = True
if hashId not in missingObjects:
missingObjects[hashId] = time.time()
self.objectsNewToMe[hashId] = True
def handleReceivedObject(self, streamNumber, hashid):
for i in network.connectionpool.BMConnectionPool().inboundConnections.values() + network.connectionpool.BMConnectionPool().outboundConnections.values():
if not i.fullyEstablished:
continue
try:
del i.objectsNewToMe[hashid]
except KeyError:
if streamNumber in i.streams and \
(not Dandelion().hasHash(hashid) or \
Dandelion().objectChildStem(hashid) == i):
with i.objectsNewToThemLock:
i.objectsNewToThem[hashid] = time.time()
# update stream number, which we didn't have when we just received the dinv
# also resets expiration of the stem mode
Dandelion().setHashStream(hashid, streamNumber)
if i == self:
try:
with i.objectsNewToThemLock:
del i.objectsNewToThem[hashid]
except KeyError:
pass
def hasAddr(self, addr):
if haveBloom:
@ -109,4 +127,3 @@ class ObjectTracker(object):
# tracking inv
# - per node hash of items that neither the remote node nor we have
#

View File

@ -168,7 +168,7 @@ class TCPConnection(BMProto, TLSDispatcher):
with self.objectsNewToThemLock:
for objHash in Inventory().unexpired_hashes_by_stream(stream):
# don't advertise stem objects on bigInv
if objHash in Dandelion().hashMap:
if Dandelion().hasHash(objHash):
continue
bigInvList[objHash] = 0
self.objectsNewToThem[objHash] = time.time()

View File

@ -196,7 +196,7 @@ def assembleVersionMessage(remoteHost, remotePort, participatingStreams, server
payload += pack('>q',
NODE_NETWORK |
(NODE_SSL if haveSSL(server) else 0) |
(NODE_DANDELION if BMConfigParser().safeGetInt('network', 'dandelion') > 0 else 0)
(NODE_DANDELION if state.dandelion else 0)
)
payload += pack('>q', int(time.time()))
@ -213,7 +213,7 @@ def assembleVersionMessage(remoteHost, remotePort, participatingStreams, server
payload += pack('>q',
NODE_NETWORK |
(NODE_SSL if haveSSL(server) else 0) |
(NODE_DANDELION if BMConfigParser().safeGetInt('network', 'dandelion') > 0 else 0)
(NODE_DANDELION if state.dandelion else 0)
)
payload += '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF' + pack(
'>L', 2130706433) # = 127.0.0.1. This will be ignored by the remote host. The actual remote connected IP will be used.

View File

@ -53,3 +53,5 @@ def resetNetworkProtocolAvailability():
networkProtocolAvailability = {'IPv4': None, 'IPv6': None, 'onion': None}
resetNetworkProtocolAvailability()
dandelion = 0