moved Dandelion in state - global runtime variable from singleton

This commit is contained in:
anand k 2024-04-21 20:54:04 +05:30
parent 5faef8d40e
commit 1c8ae8fef3
No known key found for this signature in database
GPG Key ID: 515AC24FA525DDE0
10 changed files with 27 additions and 31 deletions

View File

@ -16,6 +16,7 @@ __all__ = ["AnnounceThread", "BMConnectionPool", "StoppableThread"]
def start(config, state):
"""Start network threads"""
import state
from .addrthread import AddrThread
from .dandelion import Dandelion
from .downloadthread import DownloadThread
@ -27,7 +28,7 @@ def start(config, state):
readKnownNodes()
# init, needs to be early because other thread may access it early
Dandelion()
state.Dandelion = Dandelion()
BMConnectionPool().connectToStream(1)
for thread in (
BMNetworkThread(), InvThread(), AddrThread(),

View File

@ -7,7 +7,6 @@ import time
import protocol
import state
from highlevelcrypto import calculateInventoryHash
from network.dandelion import Dandelion
logger = logging.getLogger('default')
@ -112,7 +111,7 @@ class BMObject(object): # pylint: disable=too-many-instance-attributes
or advertise it unnecessarily)
"""
# if it's a stem duplicate, pretend we don't have it
if Dandelion().hasHash(self.inventoryHash):
if state.Dandelion.hasHash(self.inventoryHash):
return
if self.inventoryHash in state.Inventory:
raise BMObjectAlreadyHaveError()

View File

@ -25,7 +25,6 @@ from network.bmobject import (
BMObjectInsufficientPOWError, BMObjectInvalidError,
BMObjectUnwantedStreamError
)
from network.dandelion import Dandelion
from network.proxy import ProxyError
from node import Node, Peer
@ -355,10 +354,10 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
return True
for i in map(str, items):
if i in state.Inventory and not Dandelion().hasHash(i):
if i in state.Inventory and not state.Dandelion.hasHash(i):
continue
if dandelion and not Dandelion().hasHash(i):
Dandelion().addHash(i, self)
if dandelion and not state.Dandelion.hasHash(i):
state.Dandelion.addHash(i, self)
self.handleReceivedInventory(i)
return True
@ -420,9 +419,9 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
except KeyError:
pass
if self.object.inventoryHash in state.Inventory and Dandelion().hasHash(
if self.object.inventoryHash in state.Inventory and state.Dandelion.hasHash(
self.object.inventoryHash):
Dandelion().removeHash(
state.Dandelion.removeHash(
self.object.inventoryHash, "cycle detection")
state.Inventory[self.object.inventoryHash] = (

View File

@ -10,7 +10,6 @@ from time import time
import connectionpool
import state
from queues import invQueue
from singleton import Singleton
# randomise routes after 600 seconds
REASSIGN_INTERVAL = 600
@ -26,7 +25,6 @@ Stem = namedtuple('Stem', ['child', 'stream', 'timeout'])
logger = logging.getLogger('default')
@Singleton
class Dandelion: # pylint: disable=old-style-class
"""Dandelion class for tracking stem/fluff stages."""
def __init__(self):

View File

@ -6,7 +6,6 @@ import state
import addresses
import helper_random
import protocol
from dandelion import Dandelion
from network.connectionpool import BMConnectionPool
from objectracker import missingObjects
from threads import StoppableThread
@ -60,7 +59,7 @@ class DownloadThread(StoppableThread):
payload = bytearray()
chunkCount = 0
for chunk in request:
if chunk in state.Inventory and not Dandelion().hasHash(chunk):
if chunk in state.Inventory and not state.Dandelion.hasHash(chunk):
try:
del i.objectsNewToMe[chunk]
except KeyError:

View File

@ -9,7 +9,6 @@ import addresses
import protocol
import state
from network.connectionpool import BMConnectionPool
from network.dandelion import Dandelion
from queues import invQueue
from threads import StoppableThread
@ -40,10 +39,10 @@ class InvThread(StoppableThread):
@staticmethod
def handleLocallyGenerated(stream, hashId):
"""Locally generated inventory items require special handling"""
Dandelion().addHash(hashId, stream=stream)
state.Dandelion.addHash(hashId, stream=stream)
for connection in BMConnectionPool().connections():
if state.dandelion and connection != \
Dandelion().objectChildStem(hashId):
state.Dandelion.objectChildStem(hashId):
continue
connection.objectsNewToThem[hashId] = time()
@ -52,7 +51,7 @@ class InvThread(StoppableThread):
chunk = []
while True:
# Dandelion fluff trigger by expiration
handleExpiredDandelion(Dandelion().expire())
handleExpiredDandelion(state.Dandelion.expire())
try:
data = invQueue.get(False)
chunk.append((data[0], data[1]))
@ -75,7 +74,7 @@ class InvThread(StoppableThread):
except KeyError:
continue
try:
if connection == Dandelion().objectChildStem(inv[1]):
if connection == state.Dandelion.objectChildStem(inv[1]):
# Fluff trigger by RNG
# auto-ignore if config set to 0, i.e. dandelion is off
if random.randint(1, 100) >= state.dandelion: # nosec B311
@ -105,7 +104,7 @@ class InvThread(StoppableThread):
for _ in range(len(chunk)):
invQueue.task_done()
if Dandelion().refresh < time():
Dandelion().reRandomiseStems()
if state.Dandelion.refresh < time():
state.Dandelion.reRandomiseStems()
self.stop.wait(1)

View File

@ -4,8 +4,8 @@ Module for tracking objects
import time
from threading import RLock
import state
import network.connectionpool
from network.dandelion import Dandelion
from randomtrackingdict import RandomTrackingDict
haveBloom = False
@ -107,14 +107,14 @@ class ObjectTracker(object):
del i.objectsNewToMe[hashid]
except KeyError:
if streamNumber in i.streams and (
not Dandelion().hasHash(hashid)
or Dandelion().objectChildStem(hashid) == i):
not state.Dandelion.hasHash(hashid)
or state.Dandelion.objectChildStem(hashid) == i):
with i.objectsNewToThemLock:
i.objectsNewToThem[hashid] = time.time()
# update stream number,
# which we didn't have when we just received the dinv
# also resets expiration of the stem mode
Dandelion().setHashStream(hashid, streamNumber)
state.Dandelion.setHashStream(hashid, streamNumber)
if i == self:
try:

View File

@ -25,7 +25,6 @@ import connectionpool
import knownnodes
from network.advanceddispatcher import AdvancedDispatcher
from network.bmproto import BMProto
from network.dandelion import Dandelion
from network.objectracker import ObjectTracker
from network.socks4a import Socks4aConnection
from network.socks5 import Socks5Connection
@ -169,7 +168,7 @@ class TCPConnection(BMProto, TLSDispatcher):
knownnodes.increaseRating(self.destination)
knownnodes.addKnownNode(
self.streams, self.destination, time.time())
Dandelion().maybeAddStem(self)
state.Dandelion.maybeAddStem(self)
self.sendAddr()
self.sendBigInv()
@ -231,7 +230,7 @@ class TCPConnection(BMProto, TLSDispatcher):
with self.objectsNewToThemLock:
for objHash in state.Inventory.unexpired_hashes_by_stream(stream):
# don't advertise stem objects on bigInv
if Dandelion().hasHash(objHash):
if state.Dandelion.hasHash(objHash):
continue
bigInvList[objHash] = 0
objectCount = 0
@ -293,7 +292,7 @@ class TCPConnection(BMProto, TLSDispatcher):
if host_is_global:
knownnodes.addKnownNode(
self.streams, self.destination, time.time())
Dandelion().maybeRemoveStem(self)
state.Dandelion.maybeRemoveStem(self)
else:
self.checkTimeOffsetNotification()
if host_is_global:

View File

@ -7,7 +7,6 @@ import helper_random
import protocol
import state
from network.connectionpool import BMConnectionPool
from network.dandelion import Dandelion
from randomtrackingdict import RandomTrackingDict
from threads import StoppableThread
@ -41,8 +40,8 @@ class UploadThread(StoppableThread):
chunk_count = 0
for chunk in request:
del i.pendingUpload[chunk]
if Dandelion().hasHash(chunk) and \
i != Dandelion().objectChildStem(chunk):
if state.Dandelion.hasHash(chunk) and \
i != state.Dandelion.objectChildStem(chunk):
i.antiIntersectionDelay()
self.logger.info(
'%s asked for a stem object we didn\'t offer to it.',

View File

@ -97,3 +97,6 @@ class Placeholder(object): # pylint:disable=too-few-public-methods
Inventory = Placeholder("Inventory")
Dandelion = Placeholder("Dandelion")