moved Dandelion in state - global runtime variable from singleton #2216

Merged
anand-skss merged 2 commits from test into v0.6 2024-04-22 07:13:39 +02:00
13 changed files with 38 additions and 42 deletions

View File

@ -156,12 +156,12 @@ class Main(object):
set_thread_name("PyBitmessage") set_thread_name("PyBitmessage")
state.dandelion = config.safeGetInt('network', 'dandelion') state.dandelion_enabled = config.safeGetInt('network', 'dandelion')
# dandelion requires outbound connections, without them, # dandelion requires outbound connections, without them,
# stem objects will get stuck forever # stem objects will get stuck forever
if state.dandelion and not config.safeGetBoolean( if state.dandelion_enabled and not config.safeGetBoolean(
'bitmessagesettings', 'sendoutgoingconnections'): 'bitmessagesettings', 'sendoutgoingconnections'):
state.dandelion = 0 state.dandelion_enabled = 0
if state.testmode or config.safeGetBoolean( if state.testmode or config.safeGetBoolean(
'bitmessagesettings', 'extralowdifficulty'): 'bitmessagesettings', 'extralowdifficulty'):

View File

@ -16,6 +16,7 @@ __all__ = ["AnnounceThread", "BMConnectionPool", "StoppableThread"]
def start(config, state): def start(config, state):
"""Start network threads""" """Start network threads"""
import state
from .addrthread import AddrThread from .addrthread import AddrThread
from .dandelion import Dandelion from .dandelion import Dandelion
from .downloadthread import DownloadThread from .downloadthread import DownloadThread
@ -27,7 +28,7 @@ def start(config, state):
readKnownNodes() readKnownNodes()
# init, needs to be early because other thread may access it early # init, needs to be early because other thread may access it early
Dandelion() state.Dandelion = Dandelion()
BMConnectionPool().connectToStream(1) BMConnectionPool().connectToStream(1)
for thread in ( for thread in (
BMNetworkThread(), InvThread(), AddrThread(), BMNetworkThread(), InvThread(), AddrThread(),

View File

@ -7,7 +7,6 @@ import time
import protocol import protocol
import state import state
from highlevelcrypto import calculateInventoryHash from highlevelcrypto import calculateInventoryHash
from network.dandelion import Dandelion
logger = logging.getLogger('default') logger = logging.getLogger('default')
@ -112,7 +111,7 @@ class BMObject(object): # pylint: disable=too-many-instance-attributes
or advertise it unnecessarily) or advertise it unnecessarily)
""" """
# if it's a stem duplicate, pretend we don't have it # if it's a stem duplicate, pretend we don't have it
if Dandelion().hasHash(self.inventoryHash): if state.Dandelion.hasHash(self.inventoryHash):
return return
if self.inventoryHash in state.Inventory: if self.inventoryHash in state.Inventory:
raise BMObjectAlreadyHaveError() raise BMObjectAlreadyHaveError()

View File

@ -25,7 +25,6 @@ from network.bmobject import (
BMObjectInsufficientPOWError, BMObjectInvalidError, BMObjectInsufficientPOWError, BMObjectInvalidError,
BMObjectUnwantedStreamError BMObjectUnwantedStreamError
) )
from network.dandelion import Dandelion
from network.proxy import ProxyError from network.proxy import ProxyError
from node import Node, Peer from node import Node, Peer
@ -351,14 +350,14 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
raise BMProtoExcessiveDataError() raise BMProtoExcessiveDataError()
# ignore dinv if dandelion turned off # ignore dinv if dandelion turned off
if dandelion and not state.dandelion: if dandelion and not state.dandelion_enabled:
return True return True
for i in map(str, items): for i in map(str, items):
if i in state.Inventory and not Dandelion().hasHash(i): if i in state.Inventory and not state.Dandelion.hasHash(i):
continue continue
if dandelion and not Dandelion().hasHash(i): if dandelion and not state.Dandelion.hasHash(i):
Dandelion().addHash(i, self) state.Dandelion.addHash(i, self)
self.handleReceivedInventory(i) self.handleReceivedInventory(i)
return True return True
@ -420,9 +419,9 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
except KeyError: except KeyError:
pass pass
if self.object.inventoryHash in state.Inventory and Dandelion().hasHash( if self.object.inventoryHash in state.Inventory and state.Dandelion.hasHash(
self.object.inventoryHash): self.object.inventoryHash):
Dandelion().removeHash( state.Dandelion.removeHash(
self.object.inventoryHash, "cycle detection") self.object.inventoryHash, "cycle detection")
state.Inventory[self.object.inventoryHash] = ( state.Inventory[self.object.inventoryHash] = (

View File

@ -10,7 +10,6 @@ from time import time
import connectionpool import connectionpool
import state import state
from queues import invQueue from queues import invQueue
from singleton import Singleton
# randomise routes after 600 seconds # randomise routes after 600 seconds
REASSIGN_INTERVAL = 600 REASSIGN_INTERVAL = 600
@ -26,7 +25,6 @@ Stem = namedtuple('Stem', ['child', 'stream', 'timeout'])
logger = logging.getLogger('default') logger = logging.getLogger('default')
@Singleton
class Dandelion: # pylint: disable=old-style-class class Dandelion: # pylint: disable=old-style-class
"""Dandelion class for tracking stem/fluff stages.""" """Dandelion class for tracking stem/fluff stages."""
def __init__(self): def __init__(self):
@ -51,7 +49,7 @@ class Dandelion: # pylint: disable=old-style-class
def addHash(self, hashId, source=None, stream=1): def addHash(self, hashId, source=None, stream=1):
"""Add inventory vector to dandelion stem""" """Add inventory vector to dandelion stem"""
if not state.dandelion: if not state.dandelion_enabled:
return return
with self.lock: with self.lock:
self.hashMap[hashId] = Stem( self.hashMap[hashId] = Stem(

View File

@ -6,7 +6,6 @@ import state
import addresses import addresses
import helper_random import helper_random
import protocol import protocol
from dandelion import Dandelion
from network.connectionpool import BMConnectionPool from network.connectionpool import BMConnectionPool
from objectracker import missingObjects from objectracker import missingObjects
from threads import StoppableThread from threads import StoppableThread
@ -60,7 +59,7 @@ class DownloadThread(StoppableThread):
payload = bytearray() payload = bytearray()
chunkCount = 0 chunkCount = 0
for chunk in request: for chunk in request:
if chunk in state.Inventory and not Dandelion().hasHash(chunk): if chunk in state.Inventory and not state.Dandelion.hasHash(chunk):
try: try:
del i.objectsNewToMe[chunk] del i.objectsNewToMe[chunk]
except KeyError: except KeyError:

View File

@ -9,7 +9,6 @@ import addresses
import protocol import protocol
import state import state
from network.connectionpool import BMConnectionPool from network.connectionpool import BMConnectionPool
from network.dandelion import Dandelion
from queues import invQueue from queues import invQueue
from threads import StoppableThread from threads import StoppableThread
@ -40,10 +39,10 @@ class InvThread(StoppableThread):
@staticmethod @staticmethod
def handleLocallyGenerated(stream, hashId): def handleLocallyGenerated(stream, hashId):
"""Locally generated inventory items require special handling""" """Locally generated inventory items require special handling"""
Dandelion().addHash(hashId, stream=stream) state.Dandelion.addHash(hashId, stream=stream)
for connection in BMConnectionPool().connections(): for connection in BMConnectionPool().connections():
if state.dandelion and connection != \ if state.dandelion_enabled and connection != \
Dandelion().objectChildStem(hashId): state.Dandelion.objectChildStem(hashId):
continue continue
connection.objectsNewToThem[hashId] = time() connection.objectsNewToThem[hashId] = time()
@ -52,7 +51,7 @@ class InvThread(StoppableThread):
chunk = [] chunk = []
while True: while True:
# Dandelion fluff trigger by expiration # Dandelion fluff trigger by expiration
handleExpiredDandelion(Dandelion().expire()) handleExpiredDandelion(state.Dandelion.expire())
try: try:
data = invQueue.get(False) data = invQueue.get(False)
chunk.append((data[0], data[1])) chunk.append((data[0], data[1]))
@ -75,10 +74,10 @@ class InvThread(StoppableThread):
except KeyError: except KeyError:
continue continue
try: try:
if connection == Dandelion().objectChildStem(inv[1]): if connection == state.Dandelion.objectChildStem(inv[1]):
# Fluff trigger by RNG # Fluff trigger by RNG
# auto-ignore if config set to 0, i.e. dandelion is off # auto-ignore if config set to 0, i.e. dandelion is off
if random.randint(1, 100) >= state.dandelion: # nosec B311 if random.randint(1, 100) >= state.dandelion_enabled: # nosec B311
fluffs.append(inv[1]) fluffs.append(inv[1])
# send a dinv only if the stem node supports dandelion # send a dinv only if the stem node supports dandelion
elif connection.services & protocol.NODE_DANDELION > 0: elif connection.services & protocol.NODE_DANDELION > 0:
@ -105,7 +104,7 @@ class InvThread(StoppableThread):
for _ in range(len(chunk)): for _ in range(len(chunk)):
invQueue.task_done() invQueue.task_done()
if Dandelion().refresh < time(): if state.Dandelion.refresh < time():
Dandelion().reRandomiseStems() state.Dandelion.reRandomiseStems()
self.stop.wait(1) self.stop.wait(1)

View File

@ -4,8 +4,8 @@ Module for tracking objects
import time import time
from threading import RLock from threading import RLock
import state
import network.connectionpool import network.connectionpool
from network.dandelion import Dandelion
from randomtrackingdict import RandomTrackingDict from randomtrackingdict import RandomTrackingDict
haveBloom = False haveBloom = False
@ -107,14 +107,14 @@ class ObjectTracker(object):
del i.objectsNewToMe[hashid] del i.objectsNewToMe[hashid]
except KeyError: except KeyError:
if streamNumber in i.streams and ( if streamNumber in i.streams and (
not Dandelion().hasHash(hashid) not state.Dandelion.hasHash(hashid)
or Dandelion().objectChildStem(hashid) == i): or state.Dandelion.objectChildStem(hashid) == i):
with i.objectsNewToThemLock: with i.objectsNewToThemLock:
i.objectsNewToThem[hashid] = time.time() i.objectsNewToThem[hashid] = time.time()
# update stream number, # update stream number,
# which we didn't have when we just received the dinv # which we didn't have when we just received the dinv
# also resets expiration of the stem mode # also resets expiration of the stem mode
Dandelion().setHashStream(hashid, streamNumber) state.Dandelion.setHashStream(hashid, streamNumber)
if i == self: if i == self:
try: try:

View File

@ -25,7 +25,6 @@ import connectionpool
import knownnodes import knownnodes
from network.advanceddispatcher import AdvancedDispatcher from network.advanceddispatcher import AdvancedDispatcher
from network.bmproto import BMProto from network.bmproto import BMProto
from network.dandelion import Dandelion
from network.objectracker import ObjectTracker from network.objectracker import ObjectTracker
from network.socks4a import Socks4aConnection from network.socks4a import Socks4aConnection
from network.socks5 import Socks5Connection from network.socks5 import Socks5Connection
@ -169,7 +168,7 @@ class TCPConnection(BMProto, TLSDispatcher):
knownnodes.increaseRating(self.destination) knownnodes.increaseRating(self.destination)
knownnodes.addKnownNode( knownnodes.addKnownNode(
self.streams, self.destination, time.time()) self.streams, self.destination, time.time())
Dandelion().maybeAddStem(self) state.Dandelion.maybeAddStem(self)
self.sendAddr() self.sendAddr()
self.sendBigInv() self.sendBigInv()
@ -231,7 +230,7 @@ class TCPConnection(BMProto, TLSDispatcher):
with self.objectsNewToThemLock: with self.objectsNewToThemLock:
for objHash in state.Inventory.unexpired_hashes_by_stream(stream): for objHash in state.Inventory.unexpired_hashes_by_stream(stream):
# don't advertise stem objects on bigInv # don't advertise stem objects on bigInv
if Dandelion().hasHash(objHash): if state.Dandelion.hasHash(objHash):
continue continue
bigInvList[objHash] = 0 bigInvList[objHash] = 0
objectCount = 0 objectCount = 0
@ -293,7 +292,7 @@ class TCPConnection(BMProto, TLSDispatcher):
if host_is_global: if host_is_global:
knownnodes.addKnownNode( knownnodes.addKnownNode(
self.streams, self.destination, time.time()) self.streams, self.destination, time.time())
Dandelion().maybeRemoveStem(self) state.Dandelion.maybeRemoveStem(self)
else: else:
self.checkTimeOffsetNotification() self.checkTimeOffsetNotification()
if host_is_global: if host_is_global:

View File

@ -7,7 +7,6 @@ import helper_random
import protocol import protocol
import state import state
from network.connectionpool import BMConnectionPool from network.connectionpool import BMConnectionPool
from network.dandelion import Dandelion
from randomtrackingdict import RandomTrackingDict from randomtrackingdict import RandomTrackingDict
from threads import StoppableThread from threads import StoppableThread
@ -41,8 +40,8 @@ class UploadThread(StoppableThread):
chunk_count = 0 chunk_count = 0
for chunk in request: for chunk in request:
del i.pendingUpload[chunk] del i.pendingUpload[chunk]
if Dandelion().hasHash(chunk) and \ if state.Dandelion.hasHash(chunk) and \
i != Dandelion().objectChildStem(chunk): i != state.Dandelion.objectChildStem(chunk):
i.antiIntersectionDelay() i.antiIntersectionDelay()
self.logger.info( self.logger.info(
'%s asked for a stem object we didn\'t offer to it.', '%s asked for a stem object we didn\'t offer to it.',

View File

@ -351,7 +351,7 @@ def assembleVersionMessage(
'>q', '>q',
NODE_NETWORK NODE_NETWORK
| (NODE_SSL if haveSSL(server) else 0) | (NODE_SSL if haveSSL(server) else 0)
| (NODE_DANDELION if state.dandelion else 0) | (NODE_DANDELION if state.dandelion_enabled else 0)
) )
payload += pack('>q', int(time.time())) payload += pack('>q', int(time.time()))
@ -375,7 +375,7 @@ def assembleVersionMessage(
'>q', '>q',
NODE_NETWORK NODE_NETWORK
| (NODE_SSL if haveSSL(server) else 0) | (NODE_SSL if haveSSL(server) else 0)
| (NODE_DANDELION if state.dandelion else 0) | (NODE_DANDELION if state.dandelion_enabled else 0)
) )
# = 127.0.0.1. This will be ignored by the remote host. # = 127.0.0.1. This will be ignored by the remote host.
# The actual remote connected IP will be used. # The actual remote connected IP will be used.

View File

@ -44,7 +44,7 @@ ownAddresses = {}
discoveredPeers = {} discoveredPeers = {}
dandelion = 0 dandelion_enabled = 0
kivy = False kivy = False
@ -97,3 +97,6 @@ class Placeholder(object): # pylint:disable=too-few-public-methods
Inventory = Placeholder("Inventory") Inventory = Placeholder("Inventory")
Dandelion = Placeholder("Dandelion")

View File

@ -324,7 +324,7 @@ class TestCore(unittest.TestCase):
decoded = self._decode_msg(msg, "IQQiiQlsLv") decoded = self._decode_msg(msg, "IQQiiQlsLv")
peer, _, ua, streams = self._decode_msg(msg, "IQQiiQlsLv")[4:] peer, _, ua, streams = self._decode_msg(msg, "IQQiiQlsLv")[4:]
self.assertEqual( self.assertEqual(
peer, Node(11 if state.dandelion else 3, '127.0.0.1', 8444)) peer, Node(11 if state.dandelion_enabled else 3, '127.0.0.1', 8444))
self.assertEqual(ua, '/PyBitmessage:' + softwareVersion + '/') self.assertEqual(ua, '/PyBitmessage:' + softwareVersion + '/')
self.assertEqual(streams, [1]) self.assertEqual(streams, [1])
# with multiple streams # with multiple streams