moved Dandelion in state - global runtime variable from singleton #2216
|
@ -156,12 +156,12 @@ class Main(object):
|
|||
|
||||
set_thread_name("PyBitmessage")
|
||||
|
||||
state.dandelion = config.safeGetInt('network', 'dandelion')
|
||||
state.dandelion_enabled = config.safeGetInt('network', 'dandelion')
|
||||
# dandelion requires outbound connections, without them,
|
||||
# stem objects will get stuck forever
|
||||
if state.dandelion and not config.safeGetBoolean(
|
||||
if state.dandelion_enabled and not config.safeGetBoolean(
|
||||
'bitmessagesettings', 'sendoutgoingconnections'):
|
||||
state.dandelion = 0
|
||||
state.dandelion_enabled = 0
|
||||
|
||||
if state.testmode or config.safeGetBoolean(
|
||||
'bitmessagesettings', 'extralowdifficulty'):
|
||||
|
|
|
@ -16,6 +16,7 @@ __all__ = ["AnnounceThread", "BMConnectionPool", "StoppableThread"]
|
|||
|
||||
def start(config, state):
|
||||
"""Start network threads"""
|
||||
import state
|
||||
from .addrthread import AddrThread
|
||||
from .dandelion import Dandelion
|
||||
from .downloadthread import DownloadThread
|
||||
|
@ -27,7 +28,7 @@ def start(config, state):
|
|||
|
||||
readKnownNodes()
|
||||
# init, needs to be early because other thread may access it early
|
||||
Dandelion()
|
||||
state.Dandelion = Dandelion()
|
||||
BMConnectionPool().connectToStream(1)
|
||||
for thread in (
|
||||
BMNetworkThread(), InvThread(), AddrThread(),
|
||||
|
|
|
@ -7,7 +7,6 @@ import time
|
|||
import protocol
|
||||
import state
|
||||
from highlevelcrypto import calculateInventoryHash
|
||||
from network.dandelion import Dandelion
|
||||
|
||||
logger = logging.getLogger('default')
|
||||
|
||||
|
@ -112,7 +111,7 @@ class BMObject(object): # pylint: disable=too-many-instance-attributes
|
|||
or advertise it unnecessarily)
|
||||
"""
|
||||
# if it's a stem duplicate, pretend we don't have it
|
||||
if Dandelion().hasHash(self.inventoryHash):
|
||||
if state.Dandelion.hasHash(self.inventoryHash):
|
||||
return
|
||||
if self.inventoryHash in state.Inventory:
|
||||
raise BMObjectAlreadyHaveError()
|
||||
|
|
|
@ -25,7 +25,6 @@ from network.bmobject import (
|
|||
BMObjectInsufficientPOWError, BMObjectInvalidError,
|
||||
BMObjectUnwantedStreamError
|
||||
)
|
||||
from network.dandelion import Dandelion
|
||||
from network.proxy import ProxyError
|
||||
|
||||
from node import Node, Peer
|
||||
|
@ -351,14 +350,14 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
|
|||
raise BMProtoExcessiveDataError()
|
||||
|
||||
# ignore dinv if dandelion turned off
|
||||
if dandelion and not state.dandelion:
|
||||
if dandelion and not state.dandelion_enabled:
|
||||
return True
|
||||
|
||||
for i in map(str, items):
|
||||
if i in state.Inventory and not Dandelion().hasHash(i):
|
||||
if i in state.Inventory and not state.Dandelion.hasHash(i):
|
||||
continue
|
||||
if dandelion and not Dandelion().hasHash(i):
|
||||
Dandelion().addHash(i, self)
|
||||
if dandelion and not state.Dandelion.hasHash(i):
|
||||
state.Dandelion.addHash(i, self)
|
||||
self.handleReceivedInventory(i)
|
||||
|
||||
return True
|
||||
|
@ -420,9 +419,9 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
|
|||
except KeyError:
|
||||
pass
|
||||
|
||||
if self.object.inventoryHash in state.Inventory and Dandelion().hasHash(
|
||||
if self.object.inventoryHash in state.Inventory and state.Dandelion.hasHash(
|
||||
self.object.inventoryHash):
|
||||
Dandelion().removeHash(
|
||||
state.Dandelion.removeHash(
|
||||
self.object.inventoryHash, "cycle detection")
|
||||
|
||||
state.Inventory[self.object.inventoryHash] = (
|
||||
|
|
|
@ -10,7 +10,6 @@ from time import time
|
|||
import connectionpool
|
||||
import state
|
||||
from queues import invQueue
|
||||
from singleton import Singleton
|
||||
|
||||
# randomise routes after 600 seconds
|
||||
REASSIGN_INTERVAL = 600
|
||||
|
@ -26,7 +25,6 @@ Stem = namedtuple('Stem', ['child', 'stream', 'timeout'])
|
|||
logger = logging.getLogger('default')
|
||||
|
||||
|
||||
@Singleton
|
||||
class Dandelion: # pylint: disable=old-style-class
|
||||
"""Dandelion class for tracking stem/fluff stages."""
|
||||
def __init__(self):
|
||||
|
@ -51,7 +49,7 @@ class Dandelion: # pylint: disable=old-style-class
|
|||
|
||||
def addHash(self, hashId, source=None, stream=1):
|
||||
"""Add inventory vector to dandelion stem"""
|
||||
if not state.dandelion:
|
||||
if not state.dandelion_enabled:
|
||||
return
|
||||
with self.lock:
|
||||
self.hashMap[hashId] = Stem(
|
||||
|
|
|
@ -6,7 +6,6 @@ import state
|
|||
import addresses
|
||||
import helper_random
|
||||
import protocol
|
||||
from dandelion import Dandelion
|
||||
from network.connectionpool import BMConnectionPool
|
||||
from objectracker import missingObjects
|
||||
from threads import StoppableThread
|
||||
|
@ -60,7 +59,7 @@ class DownloadThread(StoppableThread):
|
|||
payload = bytearray()
|
||||
chunkCount = 0
|
||||
for chunk in request:
|
||||
if chunk in state.Inventory and not Dandelion().hasHash(chunk):
|
||||
if chunk in state.Inventory and not state.Dandelion.hasHash(chunk):
|
||||
try:
|
||||
del i.objectsNewToMe[chunk]
|
||||
except KeyError:
|
||||
|
|
|
@ -9,7 +9,6 @@ import addresses
|
|||
import protocol
|
||||
import state
|
||||
from network.connectionpool import BMConnectionPool
|
||||
from network.dandelion import Dandelion
|
||||
from queues import invQueue
|
||||
from threads import StoppableThread
|
||||
|
||||
|
@ -40,10 +39,10 @@ class InvThread(StoppableThread):
|
|||
@staticmethod
|
||||
def handleLocallyGenerated(stream, hashId):
|
||||
"""Locally generated inventory items require special handling"""
|
||||
Dandelion().addHash(hashId, stream=stream)
|
||||
state.Dandelion.addHash(hashId, stream=stream)
|
||||
for connection in BMConnectionPool().connections():
|
||||
if state.dandelion and connection != \
|
||||
Dandelion().objectChildStem(hashId):
|
||||
if state.dandelion_enabled and connection != \
|
||||
state.Dandelion.objectChildStem(hashId):
|
||||
continue
|
||||
connection.objectsNewToThem[hashId] = time()
|
||||
|
||||
|
@ -52,7 +51,7 @@ class InvThread(StoppableThread):
|
|||
chunk = []
|
||||
while True:
|
||||
# Dandelion fluff trigger by expiration
|
||||
handleExpiredDandelion(Dandelion().expire())
|
||||
handleExpiredDandelion(state.Dandelion.expire())
|
||||
try:
|
||||
data = invQueue.get(False)
|
||||
chunk.append((data[0], data[1]))
|
||||
|
@ -75,10 +74,10 @@ class InvThread(StoppableThread):
|
|||
except KeyError:
|
||||
continue
|
||||
try:
|
||||
if connection == Dandelion().objectChildStem(inv[1]):
|
||||
if connection == state.Dandelion.objectChildStem(inv[1]):
|
||||
# Fluff trigger by RNG
|
||||
# auto-ignore if config set to 0, i.e. dandelion is off
|
||||
if random.randint(1, 100) >= state.dandelion: # nosec B311
|
||||
if random.randint(1, 100) >= state.dandelion_enabled: # nosec B311
|
||||
fluffs.append(inv[1])
|
||||
# send a dinv only if the stem node supports dandelion
|
||||
elif connection.services & protocol.NODE_DANDELION > 0:
|
||||
|
@ -105,7 +104,7 @@ class InvThread(StoppableThread):
|
|||
for _ in range(len(chunk)):
|
||||
invQueue.task_done()
|
||||
|
||||
if Dandelion().refresh < time():
|
||||
Dandelion().reRandomiseStems()
|
||||
if state.Dandelion.refresh < time():
|
||||
state.Dandelion.reRandomiseStems()
|
||||
|
||||
self.stop.wait(1)
|
||||
|
|
|
@ -4,8 +4,8 @@ Module for tracking objects
|
|||
import time
|
||||
from threading import RLock
|
||||
|
||||
import state
|
||||
import network.connectionpool
|
||||
from network.dandelion import Dandelion
|
||||
from randomtrackingdict import RandomTrackingDict
|
||||
|
||||
haveBloom = False
|
||||
|
@ -107,14 +107,14 @@ class ObjectTracker(object):
|
|||
del i.objectsNewToMe[hashid]
|
||||
except KeyError:
|
||||
if streamNumber in i.streams and (
|
||||
not Dandelion().hasHash(hashid)
|
||||
or Dandelion().objectChildStem(hashid) == i):
|
||||
not state.Dandelion.hasHash(hashid)
|
||||
or state.Dandelion.objectChildStem(hashid) == i):
|
||||
with i.objectsNewToThemLock:
|
||||
i.objectsNewToThem[hashid] = time.time()
|
||||
# update stream number,
|
||||
# which we didn't have when we just received the dinv
|
||||
# also resets expiration of the stem mode
|
||||
Dandelion().setHashStream(hashid, streamNumber)
|
||||
state.Dandelion.setHashStream(hashid, streamNumber)
|
||||
|
||||
if i == self:
|
||||
try:
|
||||
|
|
|
@ -25,7 +25,6 @@ import connectionpool
|
|||
import knownnodes
|
||||
from network.advanceddispatcher import AdvancedDispatcher
|
||||
from network.bmproto import BMProto
|
||||
from network.dandelion import Dandelion
|
||||
from network.objectracker import ObjectTracker
|
||||
from network.socks4a import Socks4aConnection
|
||||
from network.socks5 import Socks5Connection
|
||||
|
@ -169,7 +168,7 @@ class TCPConnection(BMProto, TLSDispatcher):
|
|||
knownnodes.increaseRating(self.destination)
|
||||
knownnodes.addKnownNode(
|
||||
self.streams, self.destination, time.time())
|
||||
Dandelion().maybeAddStem(self)
|
||||
state.Dandelion.maybeAddStem(self)
|
||||
self.sendAddr()
|
||||
self.sendBigInv()
|
||||
|
||||
|
@ -231,7 +230,7 @@ class TCPConnection(BMProto, TLSDispatcher):
|
|||
with self.objectsNewToThemLock:
|
||||
for objHash in state.Inventory.unexpired_hashes_by_stream(stream):
|
||||
# don't advertise stem objects on bigInv
|
||||
if Dandelion().hasHash(objHash):
|
||||
if state.Dandelion.hasHash(objHash):
|
||||
continue
|
||||
bigInvList[objHash] = 0
|
||||
objectCount = 0
|
||||
|
@ -293,7 +292,7 @@ class TCPConnection(BMProto, TLSDispatcher):
|
|||
if host_is_global:
|
||||
knownnodes.addKnownNode(
|
||||
self.streams, self.destination, time.time())
|
||||
Dandelion().maybeRemoveStem(self)
|
||||
state.Dandelion.maybeRemoveStem(self)
|
||||
else:
|
||||
self.checkTimeOffsetNotification()
|
||||
if host_is_global:
|
||||
|
|
|
@ -7,7 +7,6 @@ import helper_random
|
|||
import protocol
|
||||
import state
|
||||
from network.connectionpool import BMConnectionPool
|
||||
from network.dandelion import Dandelion
|
||||
from randomtrackingdict import RandomTrackingDict
|
||||
from threads import StoppableThread
|
||||
|
||||
|
@ -41,8 +40,8 @@ class UploadThread(StoppableThread):
|
|||
chunk_count = 0
|
||||
for chunk in request:
|
||||
del i.pendingUpload[chunk]
|
||||
if Dandelion().hasHash(chunk) and \
|
||||
i != Dandelion().objectChildStem(chunk):
|
||||
if state.Dandelion.hasHash(chunk) and \
|
||||
i != state.Dandelion.objectChildStem(chunk):
|
||||
i.antiIntersectionDelay()
|
||||
self.logger.info(
|
||||
'%s asked for a stem object we didn\'t offer to it.',
|
||||
|
|
|
@ -351,7 +351,7 @@ def assembleVersionMessage(
|
|||
'>q',
|
||||
NODE_NETWORK
|
||||
| (NODE_SSL if haveSSL(server) else 0)
|
||||
| (NODE_DANDELION if state.dandelion else 0)
|
||||
| (NODE_DANDELION if state.dandelion_enabled else 0)
|
||||
)
|
||||
payload += pack('>q', int(time.time()))
|
||||
|
||||
|
@ -375,7 +375,7 @@ def assembleVersionMessage(
|
|||
'>q',
|
||||
NODE_NETWORK
|
||||
| (NODE_SSL if haveSSL(server) else 0)
|
||||
| (NODE_DANDELION if state.dandelion else 0)
|
||||
| (NODE_DANDELION if state.dandelion_enabled else 0)
|
||||
)
|
||||
# = 127.0.0.1. This will be ignored by the remote host.
|
||||
# The actual remote connected IP will be used.
|
||||
|
|
|
@ -44,7 +44,7 @@ ownAddresses = {}
|
|||
|
||||
discoveredPeers = {}
|
||||
|
||||
dandelion = 0
|
||||
dandelion_enabled = 0
|
||||
|
||||
kivy = False
|
||||
|
||||
|
@ -97,3 +97,6 @@ class Placeholder(object): # pylint:disable=too-few-public-methods
|
|||
|
||||
|
||||
Inventory = Placeholder("Inventory")
|
||||
|
||||
|
||||
Dandelion = Placeholder("Dandelion")
|
||||
|
|
|
@ -324,7 +324,7 @@ class TestCore(unittest.TestCase):
|
|||
decoded = self._decode_msg(msg, "IQQiiQlsLv")
|
||||
peer, _, ua, streams = self._decode_msg(msg, "IQQiiQlsLv")[4:]
|
||||
self.assertEqual(
|
||||
peer, Node(11 if state.dandelion else 3, '127.0.0.1', 8444))
|
||||
peer, Node(11 if state.dandelion_enabled else 3, '127.0.0.1', 8444))
|
||||
self.assertEqual(ua, '/PyBitmessage:' + softwareVersion + '/')
|
||||
self.assertEqual(streams, [1])
|
||||
# with multiple streams
|
||||
|
|
Reference in New Issue
Block a user