moved dandelion_enabled from state to Dandelion class as enabled attr

This commit is contained in:
anand k 2024-05-29 09:18:53 +05:30
parent a209d65a26
commit a71b44e95c
No known key found for this signature in database
GPG Key ID: 515AC24FA525DDE0
13 changed files with 76 additions and 68 deletions

View File

@ -12,6 +12,7 @@ The PyBitmessage startup script
import os import os
import sys import sys
try: try:
import pathmagic import pathmagic
except ImportError: except ImportError:
@ -156,13 +157,6 @@ class Main(object):
set_thread_name("PyBitmessage") set_thread_name("PyBitmessage")
state.dandelion_enabled = config.safeGetInt('network', 'dandelion')
# dandelion requires outbound connections, without them,
# stem objects will get stuck forever
if state.dandelion_enabled and not config.safeGetBoolean(
'bitmessagesettings', 'sendoutgoingconnections'):
state.dandelion_enabled = 0
if state.testmode or config.safeGetBoolean( if state.testmode or config.safeGetBoolean(
'bitmessagesettings', 'extralowdifficulty'): 'bitmessagesettings', 'extralowdifficulty'):
defaults.networkDefaultProofOfWorkNonceTrialsPerByte = int( defaults.networkDefaultProofOfWorkNonceTrialsPerByte = int(

View File

@ -1,9 +1,10 @@
""" """
Network subsystem package Network subsystem package
""" """
from .dandelion import Dandelion
from .threads import StoppableThread from .threads import StoppableThread
dandelion_ins = Dandelion()
__all__ = ["StoppableThread"] __all__ = ["StoppableThread"]
@ -21,6 +22,11 @@ def start(config, state):
from .receivequeuethread import ReceiveQueueThread from .receivequeuethread import ReceiveQueueThread
from .uploadthread import UploadThread from .uploadthread import UploadThread
# check and set dandelion enabled value at network startup
dandelion_ins.init_dandelion_enabled(config)
# pass pool instance into dandelion class instance
dandelion_ins.init_pool(connectionpool.pool)
readKnownNodes() readKnownNodes()
connectionpool.pool.connectToStream(1) connectionpool.pool.connectToStream(1)
for thread in ( for thread in (

View File

@ -6,8 +6,8 @@ import time
import protocol import protocol
import state import state
import dandelion
import connectionpool import connectionpool
from network import dandelion_ins
from highlevelcrypto import calculateInventoryHash from highlevelcrypto import calculateInventoryHash
logger = logging.getLogger('default') logger = logging.getLogger('default')
@ -113,7 +113,7 @@ class BMObject(object): # pylint: disable=too-many-instance-attributes
or advertise it unnecessarily) or advertise it unnecessarily)
""" """
# if it's a stem duplicate, pretend we don't have it # if it's a stem duplicate, pretend we don't have it
if dandelion.instance.hasHash(self.inventoryHash): if dandelion_ins.hasHash(self.inventoryHash):
return return
if self.inventoryHash in state.Inventory: if self.inventoryHash in state.Inventory:
raise BMObjectAlreadyHaveError() raise BMObjectAlreadyHaveError()

View File

@ -15,7 +15,6 @@ import addresses
import knownnodes import knownnodes
import protocol import protocol
import state import state
import dandelion
import connectionpool import connectionpool
from bmconfigparser import config from bmconfigparser import config
from queues import invQueue, objectProcessorQueue, portCheckerQueue from queues import invQueue, objectProcessorQueue, portCheckerQueue
@ -27,7 +26,7 @@ from network.bmobject import (
BMObjectUnwantedStreamError BMObjectUnwantedStreamError
) )
from network.proxy import ProxyError from network.proxy import ProxyError
from network import dandelion_ins
from node import Node, Peer from node import Node, Peer
from objectracker import ObjectTracker, missingObjects from objectracker import ObjectTracker, missingObjects
@ -351,14 +350,14 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
raise BMProtoExcessiveDataError() raise BMProtoExcessiveDataError()
# ignore dinv if dandelion turned off # ignore dinv if dandelion turned off
if extend_dandelion_stem and not state.dandelion_enabled: if extend_dandelion_stem and not dandelion_ins.enabled:
return True return True
for i in map(str, items): for i in map(str, items):
if i in state.Inventory and not dandelion.instance.hasHash(i): if i in state.Inventory and not dandelion_ins.hasHash(i):
continue continue
if extend_dandelion_stem and not dandelion.instance.hasHash(i): if extend_dandelion_stem and not dandelion_ins.hasHash(i):
dandelion.instance.addHash(i, self) dandelion_ins.addHash(i, self)
self.handleReceivedInventory(i) self.handleReceivedInventory(i)
return True return True
@ -420,9 +419,9 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
except KeyError: except KeyError:
pass pass
if self.object.inventoryHash in state.Inventory and dandelion.instance.hasHash( if self.object.inventoryHash in state.Inventory and dandelion_ins.hasHash(
self.object.inventoryHash): self.object.inventoryHash):
dandelion.instance.removeHash( dandelion_ins.removeHash(
self.object.inventoryHash, "cycle detection") self.object.inventoryHash, "cycle detection")
state.Inventory[self.object.inventoryHash] = ( state.Inventory[self.object.inventoryHash] = (
@ -541,7 +540,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
if not self.isOutbound: if not self.isOutbound:
self.append_write_buf(protocol.assembleVersionMessage( self.append_write_buf(protocol.assembleVersionMessage(
self.destination.host, self.destination.port, self.destination.host, self.destination.port,
connectionpool.pool.streams, True, connectionpool.pool.streams, dandelion_ins.enabled, True,
nodeid=self.nodeid)) nodeid=self.nodeid))
logger.debug( logger.debug(
'%(host)s:%(port)i sending version', '%(host)s:%(port)i sending version',

View File

@ -7,9 +7,6 @@ from random import choice, expovariate, sample
from threading import RLock from threading import RLock
from time import time from time import time
import connectionpool
import state
from queues import invQueue
# randomise routes after 600 seconds # randomise routes after 600 seconds
REASSIGN_INTERVAL = 600 REASSIGN_INTERVAL = 600
@ -37,6 +34,8 @@ class Dandelion: # pylint: disable=old-style-class
# when to rerandomise routes # when to rerandomise routes
self.refresh = time() + REASSIGN_INTERVAL self.refresh = time() + REASSIGN_INTERVAL
self.lock = RLock() self.lock = RLock()
self.enabled = None
self.pool = None
@staticmethod @staticmethod
def poissonTimeout(start=None, average=0): def poissonTimeout(start=None, average=0):
@ -47,10 +46,23 @@ class Dandelion: # pylint: disable=old-style-class
average = FLUFF_TRIGGER_MEAN_DELAY average = FLUFF_TRIGGER_MEAN_DELAY
return start + expovariate(1.0 / average) + FLUFF_TRIGGER_FIXED_DELAY return start + expovariate(1.0 / average) + FLUFF_TRIGGER_FIXED_DELAY
def init_pool(self, pool):
"""pass pool instance"""
self.pool = pool
def init_dandelion_enabled(self, config):
"""Check if Dandelion is enabled and set value in enabled attribute"""
dandelion_enabled = config.safeGetInt('network', 'dandelion')
# dandelion requires outbound connections, without them,
# stem objects will get stuck forever
if not config.safeGetBoolean(
'bitmessagesettings', 'sendoutgoingconnections'):
dandelion_enabled = 0
self.enabled = dandelion_enabled
def addHash(self, hashId, source=None, stream=1): def addHash(self, hashId, source=None, stream=1):
"""Add inventory vector to dandelion stem""" """Add inventory vector to dandelion stem return status of dandelion enabled"""
if not state.dandelion_enabled: assert self.enabled is not None
return
with self.lock: with self.lock:
self.hashMap[hashId] = Stem( self.hashMap[hashId] = Stem(
self.getNodeStem(source), self.getNodeStem(source),
@ -89,7 +101,7 @@ class Dandelion: # pylint: disable=old-style-class
"""Child (i.e. next) node for an inventory vector during stem mode""" """Child (i.e. next) node for an inventory vector during stem mode"""
return self.hashMap[hashId].child return self.hashMap[hashId].child
def maybeAddStem(self, connection): def maybeAddStem(self, connection, invQueue):
""" """
If we had too few outbound connections, add the current one to the If we had too few outbound connections, add the current one to the
current stem list. Dandelion as designed by the authors should current stem list. Dandelion as designed by the authors should
@ -163,7 +175,7 @@ class Dandelion: # pylint: disable=old-style-class
self.nodeMap[node] = self.pickStem(node) self.nodeMap[node] = self.pickStem(node)
return self.nodeMap[node] return self.nodeMap[node]
def expire(self): def expire(self, invQueue):
"""Switch expired objects from stem to fluff mode""" """Switch expired objects from stem to fluff mode"""
with self.lock: with self.lock:
deadline = time() deadline = time()
@ -179,19 +191,18 @@ class Dandelion: # pylint: disable=old-style-class
def reRandomiseStems(self): def reRandomiseStems(self):
"""Re-shuffle stem mapping (parent <-> child pairs)""" """Re-shuffle stem mapping (parent <-> child pairs)"""
assert self.pool is not None
if self.refresh > time():
return
with self.lock: with self.lock:
try: try:
# random two connections # random two connections
self.stem = sample( self.stem = sample(
connectionpool.BMConnectionPool( self.pool.outboundConnections.values(), MAX_STEMS)
).outboundConnections.values(), MAX_STEMS)
# not enough stems available # not enough stems available
except ValueError: except ValueError:
self.stem = connectionpool.BMConnectionPool( self.stem = self.pool.outboundConnections.values()
).outboundConnections.values()
self.nodeMap = {} self.nodeMap = {}
# hashMap stays to cater for pending stems # hashMap stays to cater for pending stems
self.refresh = time() + REASSIGN_INTERVAL self.refresh = time() + REASSIGN_INTERVAL
instance = Dandelion()

View File

@ -6,8 +6,8 @@ import state
import addresses import addresses
import helper_random import helper_random
import protocol import protocol
import dandelion
import connectionpool import connectionpool
from network import dandelion_ins
from objectracker import missingObjects from objectracker import missingObjects
from threads import StoppableThread from threads import StoppableThread
@ -60,7 +60,7 @@ class DownloadThread(StoppableThread):
payload = bytearray() payload = bytearray()
chunkCount = 0 chunkCount = 0
for chunk in request: for chunk in request:
if chunk in state.Inventory and not dandelion.instance.hasHash(chunk): if chunk in state.Inventory and not dandelion_ins.hasHash(chunk):
try: try:
del i.objectsNewToMe[chunk] del i.objectsNewToMe[chunk]
except KeyError: except KeyError:

View File

@ -8,8 +8,8 @@ from time import time
import addresses import addresses
import protocol import protocol
import state import state
import dandelion
import connectionpool import connectionpool
from network import dandelion_ins
from queues import invQueue from queues import invQueue
from threads import StoppableThread from threads import StoppableThread
@ -40,10 +40,10 @@ class InvThread(StoppableThread):
@staticmethod @staticmethod
def handleLocallyGenerated(stream, hashId): def handleLocallyGenerated(stream, hashId):
"""Locally generated inventory items require special handling""" """Locally generated inventory items require special handling"""
dandelion.instance.addHash(hashId, stream=stream) dandelion_ins.addHash(hashId, stream=stream)
for connection in connectionpool.pool.connections(): for connection in connectionpool.pool.connections():
if state.dandelion_enabled and connection != \ if dandelion_ins.enabled and connection != \
dandelion.instance.objectChildStem(hashId): dandelion_ins.objectChildStem(hashId):
continue continue
connection.objectsNewToThem[hashId] = time() connection.objectsNewToThem[hashId] = time()
@ -52,7 +52,7 @@ class InvThread(StoppableThread):
chunk = [] chunk = []
while True: while True:
# Dandelion fluff trigger by expiration # Dandelion fluff trigger by expiration
handleExpiredDandelion(dandelion.instance.expire()) handleExpiredDandelion(dandelion_ins.expire(invQueue))
try: try:
data = invQueue.get(False) data = invQueue.get(False)
chunk.append((data[0], data[1])) chunk.append((data[0], data[1]))
@ -75,10 +75,10 @@ class InvThread(StoppableThread):
except KeyError: except KeyError:
continue continue
try: try:
if connection == dandelion.instance.objectChildStem(inv[1]): if connection == dandelion_ins.objectChildStem(inv[1]):
# Fluff trigger by RNG # Fluff trigger by RNG
# auto-ignore if config set to 0, i.e. dandelion is off # auto-ignore if config set to 0, i.e. dandelion is off
if random.randint(1, 100) >= state.dandelion_enabled: # nosec B311 if random.randint(1, 100) >= dandelion_ins.enabled: # nosec B311
fluffs.append(inv[1]) fluffs.append(inv[1])
# send a dinv only if the stem node supports dandelion # send a dinv only if the stem node supports dandelion
elif connection.services & protocol.NODE_DANDELION > 0: elif connection.services & protocol.NODE_DANDELION > 0:
@ -105,7 +105,6 @@ class InvThread(StoppableThread):
for _ in range(len(chunk)): for _ in range(len(chunk)):
invQueue.task_done() invQueue.task_done()
if dandelion.instance.refresh < time(): dandelion_ins.reRandomiseStems()
dandelion.instance.reRandomiseStems()
self.stop.wait(1) self.stop.wait(1)

View File

@ -4,8 +4,8 @@ Module for tracking objects
import time import time
from threading import RLock from threading import RLock
import dandelion
import connectionpool import connectionpool
from network import dandelion_ins
from randomtrackingdict import RandomTrackingDict from randomtrackingdict import RandomTrackingDict
haveBloom = False haveBloom = False
@ -107,14 +107,14 @@ class ObjectTracker(object):
del i.objectsNewToMe[hashid] del i.objectsNewToMe[hashid]
except KeyError: except KeyError:
if streamNumber in i.streams and ( if streamNumber in i.streams and (
not dandelion.instance.hasHash(hashid) not dandelion_ins.hasHash(hashid)
or dandelion.instance.objectChildStem(hashid) == i): or dandelion_ins.objectChildStem(hashid) == i):
with i.objectsNewToThemLock: with i.objectsNewToThemLock:
i.objectsNewToThem[hashid] = time.time() i.objectsNewToThem[hashid] = time.time()
# update stream number, # update stream number,
# which we didn't have when we just received the dinv # which we didn't have when we just received the dinv
# also resets expiration of the stem mode # also resets expiration of the stem mode
dandelion.instance.setHashStream(hashid, streamNumber) dandelion_ins.setHashStream(hashid, streamNumber)
if i == self: if i == self:
try: try:

View File

@ -15,10 +15,10 @@ import helper_random
import l10n import l10n
import protocol import protocol
import state import state
import dandelion
import connectionpool import connectionpool
from bmconfigparser import config from bmconfigparser import config
from highlevelcrypto import randomBytes from highlevelcrypto import randomBytes
from network import dandelion_ins
from queues import invQueue, receiveDataQueue, UISignalQueue from queues import invQueue, receiveDataQueue, UISignalQueue
from tr import _translate from tr import _translate
@ -169,7 +169,7 @@ class TCPConnection(BMProto, TLSDispatcher):
knownnodes.increaseRating(self.destination) knownnodes.increaseRating(self.destination)
knownnodes.addKnownNode( knownnodes.addKnownNode(
self.streams, self.destination, time.time()) self.streams, self.destination, time.time())
dandelion.instance.maybeAddStem(self) dandelion_ins.maybeAddStem(self, invQueue)
self.sendAddr() self.sendAddr()
self.sendBigInv() self.sendBigInv()
@ -231,7 +231,7 @@ class TCPConnection(BMProto, TLSDispatcher):
with self.objectsNewToThemLock: with self.objectsNewToThemLock:
for objHash in state.Inventory.unexpired_hashes_by_stream(stream): for objHash in state.Inventory.unexpired_hashes_by_stream(stream):
# don't advertise stem objects on bigInv # don't advertise stem objects on bigInv
if dandelion.instance.hasHash(objHash): if dandelion_ins.hasHash(objHash):
continue continue
bigInvList[objHash] = 0 bigInvList[objHash] = 0
objectCount = 0 objectCount = 0
@ -268,7 +268,7 @@ class TCPConnection(BMProto, TLSDispatcher):
self.append_write_buf( self.append_write_buf(
protocol.assembleVersionMessage( protocol.assembleVersionMessage(
self.destination.host, self.destination.port, self.destination.host, self.destination.port,
connectionpool.pool.streams, connectionpool.pool.streams, dandelion_ins.enabled,
False, nodeid=self.nodeid)) False, nodeid=self.nodeid))
self.connectedAt = time.time() self.connectedAt = time.time()
receiveDataQueue.put(self.destination) receiveDataQueue.put(self.destination)
@ -293,7 +293,7 @@ class TCPConnection(BMProto, TLSDispatcher):
if host_is_global: if host_is_global:
knownnodes.addKnownNode( knownnodes.addKnownNode(
self.streams, self.destination, time.time()) self.streams, self.destination, time.time())
dandelion.instance.maybeRemoveStem(self) dandelion_ins.maybeRemoveStem(self)
else: else:
self.checkTimeOffsetNotification() self.checkTimeOffsetNotification()
if host_is_global: if host_is_global:
@ -319,7 +319,7 @@ class Socks5BMConnection(Socks5Connection, TCPConnection):
self.append_write_buf( self.append_write_buf(
protocol.assembleVersionMessage( protocol.assembleVersionMessage(
self.destination.host, self.destination.port, self.destination.host, self.destination.port,
connectionpool.pool.streams, connectionpool.pool.streams, dandelion_ins.enabled,
False, nodeid=self.nodeid)) False, nodeid=self.nodeid))
self.set_state("bm_header", expectBytes=protocol.Header.size) self.set_state("bm_header", expectBytes=protocol.Header.size)
return True return True
@ -343,7 +343,7 @@ class Socks4aBMConnection(Socks4aConnection, TCPConnection):
self.append_write_buf( self.append_write_buf(
protocol.assembleVersionMessage( protocol.assembleVersionMessage(
self.destination.host, self.destination.port, self.destination.host, self.destination.port,
connectionpool.pool.streams, connectionpool.pool.streams, dandelion_ins.enabled,
False, nodeid=self.nodeid)) False, nodeid=self.nodeid))
self.set_state("bm_header", expectBytes=protocol.Header.size) self.set_state("bm_header", expectBytes=protocol.Header.size)
return True return True

View File

@ -6,9 +6,9 @@ import time
import helper_random import helper_random
import protocol import protocol
import state import state
import dandelion
import connectionpool import connectionpool
from randomtrackingdict import RandomTrackingDict from randomtrackingdict import RandomTrackingDict
from network import dandelion_ins
from threads import StoppableThread from threads import StoppableThread
@ -41,8 +41,8 @@ class UploadThread(StoppableThread):
chunk_count = 0 chunk_count = 0
for chunk in request: for chunk in request:
del i.pendingUpload[chunk] del i.pendingUpload[chunk]
if dandelion.instance.hasHash(chunk) and \ if dandelion_ins.hasHash(chunk) and \
i != dandelion.instance.objectChildStem(chunk): i != dandelion_ins.objectChildStem(chunk):
i.antiIntersectionDelay() i.antiIntersectionDelay()
self.logger.info( self.logger.info(
'%s asked for a stem object we didn\'t offer to it.', '%s asked for a stem object we didn\'t offer to it.',

View File

@ -336,8 +336,8 @@ def assembleAddrMessage(peerList):
return retval return retval
def assembleVersionMessage( def assembleVersionMessage( # pylint: disable=too-many-arguments
remoteHost, remotePort, participatingStreams, server=False, nodeid=None remoteHost, remotePort, participatingStreams, dandelion_enabled=True, server=False, nodeid=None,
): ):
""" """
Construct the payload of a version message, Construct the payload of a version message,
@ -350,7 +350,7 @@ def assembleVersionMessage(
'>q', '>q',
NODE_NETWORK NODE_NETWORK
| (NODE_SSL if haveSSL(server) else 0) | (NODE_SSL if haveSSL(server) else 0)
| (NODE_DANDELION if state.dandelion_enabled else 0) | (NODE_DANDELION if dandelion_enabled else 0)
) )
payload += pack('>q', int(time.time())) payload += pack('>q', int(time.time()))
@ -374,7 +374,7 @@ def assembleVersionMessage(
'>q', '>q',
NODE_NETWORK NODE_NETWORK
| (NODE_SSL if haveSSL(server) else 0) | (NODE_SSL if haveSSL(server) else 0)
| (NODE_DANDELION if state.dandelion_enabled else 0) | (NODE_DANDELION if dandelion_enabled else 0)
) )
# = 127.0.0.1. This will be ignored by the remote host. # = 127.0.0.1. This will be ignored by the remote host.
# The actual remote connected IP will be used. # The actual remote connected IP will be used.

View File

@ -43,8 +43,6 @@ ownAddresses = {}
discoveredPeers = {} discoveredPeers = {}
dandelion_enabled = 0
kivy = False kivy = False
kivyapp = None kivyapp = None

View File

@ -319,16 +319,17 @@ class TestCore(unittest.TestCase):
def test_version(self): def test_version(self):
"""check encoding/decoding of the version message""" """check encoding/decoding of the version message"""
dandelion_enabled = True
# with single stream # with single stream
msg = protocol.assembleVersionMessage('127.0.0.1', 8444, [1]) msg = protocol.assembleVersionMessage('127.0.0.1', 8444, [1], dandelion_enabled)
decoded = self._decode_msg(msg, "IQQiiQlsLv") decoded = self._decode_msg(msg, "IQQiiQlsLv")
peer, _, ua, streams = self._decode_msg(msg, "IQQiiQlsLv")[4:] peer, _, ua, streams = self._decode_msg(msg, "IQQiiQlsLv")[4:]
self.assertEqual( self.assertEqual(
peer, Node(11 if state.dandelion_enabled else 3, '127.0.0.1', 8444)) peer, Node(11 if dandelion_enabled else 3, '127.0.0.1', 8444))
self.assertEqual(ua, '/PyBitmessage:' + softwareVersion + '/') self.assertEqual(ua, '/PyBitmessage:' + softwareVersion + '/')
self.assertEqual(streams, [1]) self.assertEqual(streams, [1])
# with multiple streams # with multiple streams
msg = protocol.assembleVersionMessage('127.0.0.1', 8444, [1, 2, 3]) msg = protocol.assembleVersionMessage('127.0.0.1', 8444, [1, 2, 3], dandelion_enabled)
decoded = self._decode_msg(msg, "IQQiiQlslv") decoded = self._decode_msg(msg, "IQQiiQlslv")
peer, _, ua = decoded[4:7] peer, _, ua = decoded[4:7]
streams = decoded[7:] streams = decoded[7:]