Moved dandelion runtime var from state to network->dandelion
This commit is contained in:
parent
3a04e351cc
commit
a209d65a26
|
@ -14,7 +14,6 @@ def start(config, state):
|
|||
from .announcethread import AnnounceThread
|
||||
import connectionpool # pylint: disable=relative-import
|
||||
from .addrthread import AddrThread
|
||||
from .dandelion import Dandelion
|
||||
from .downloadthread import DownloadThread
|
||||
from .invthread import InvThread
|
||||
from .networkthread import BMNetworkThread
|
||||
|
@ -23,8 +22,6 @@ def start(config, state):
|
|||
from .uploadthread import UploadThread
|
||||
|
||||
readKnownNodes()
|
||||
# init, needs to be early because other thread may access it early
|
||||
state.Dandelion = Dandelion()
|
||||
connectionpool.pool.connectToStream(1)
|
||||
for thread in (
|
||||
BMNetworkThread(), InvThread(), AddrThread(),
|
||||
|
|
|
@ -6,6 +6,7 @@ import time
|
|||
|
||||
import protocol
|
||||
import state
|
||||
import dandelion
|
||||
import connectionpool
|
||||
from highlevelcrypto import calculateInventoryHash
|
||||
|
||||
|
@ -112,7 +113,7 @@ class BMObject(object): # pylint: disable=too-many-instance-attributes
|
|||
or advertise it unnecessarily)
|
||||
"""
|
||||
# if it's a stem duplicate, pretend we don't have it
|
||||
if state.Dandelion.hasHash(self.inventoryHash):
|
||||
if dandelion.instance.hasHash(self.inventoryHash):
|
||||
return
|
||||
if self.inventoryHash in state.Inventory:
|
||||
raise BMObjectAlreadyHaveError()
|
||||
|
|
|
@ -15,6 +15,7 @@ import addresses
|
|||
import knownnodes
|
||||
import protocol
|
||||
import state
|
||||
import dandelion
|
||||
import connectionpool
|
||||
from bmconfigparser import config
|
||||
from queues import invQueue, objectProcessorQueue, portCheckerQueue
|
||||
|
@ -337,27 +338,27 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
|
|||
self.pendingUpload[str(i)] = now
|
||||
return True
|
||||
|
||||
def _command_inv(self, dandelion=False):
|
||||
def _command_inv(self, extend_dandelion_stem=False):
|
||||
"""
|
||||
Common inv announce implementation:
|
||||
both inv and dinv depending on *dandelion* kwarg
|
||||
both inv and dinv depending on *extend_dandelion_stem* kwarg
|
||||
"""
|
||||
items = self.decode_payload_content("l32s")
|
||||
|
||||
if len(items) > protocol.MAX_OBJECT_COUNT:
|
||||
logger.error(
|
||||
'Too many items in %sinv message!', 'd' if dandelion else '')
|
||||
'Too many items in %sinv message!', 'd' if extend_dandelion_stem else '')
|
||||
raise BMProtoExcessiveDataError()
|
||||
|
||||
# ignore dinv if dandelion turned off
|
||||
if dandelion and not state.dandelion_enabled:
|
||||
if extend_dandelion_stem and not state.dandelion_enabled:
|
||||
return True
|
||||
|
||||
for i in map(str, items):
|
||||
if i in state.Inventory and not state.Dandelion.hasHash(i):
|
||||
if i in state.Inventory and not dandelion.instance.hasHash(i):
|
||||
continue
|
||||
if dandelion and not state.Dandelion.hasHash(i):
|
||||
state.Dandelion.addHash(i, self)
|
||||
if extend_dandelion_stem and not dandelion.instance.hasHash(i):
|
||||
dandelion.instance.addHash(i, self)
|
||||
self.handleReceivedInventory(i)
|
||||
|
||||
return True
|
||||
|
@ -419,9 +420,9 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
|
|||
except KeyError:
|
||||
pass
|
||||
|
||||
if self.object.inventoryHash in state.Inventory and state.Dandelion.hasHash(
|
||||
if self.object.inventoryHash in state.Inventory and dandelion.instance.hasHash(
|
||||
self.object.inventoryHash):
|
||||
state.Dandelion.removeHash(
|
||||
dandelion.instance.removeHash(
|
||||
self.object.inventoryHash, "cycle detection")
|
||||
|
||||
state.Inventory[self.object.inventoryHash] = (
|
||||
|
|
|
@ -192,3 +192,6 @@ class Dandelion: # pylint: disable=old-style-class
|
|||
self.nodeMap = {}
|
||||
# hashMap stays to cater for pending stems
|
||||
self.refresh = time() + REASSIGN_INTERVAL
|
||||
|
||||
|
||||
instance = Dandelion()
|
||||
|
|
|
@ -6,6 +6,7 @@ import state
|
|||
import addresses
|
||||
import helper_random
|
||||
import protocol
|
||||
import dandelion
|
||||
import connectionpool
|
||||
from objectracker import missingObjects
|
||||
from threads import StoppableThread
|
||||
|
@ -59,7 +60,7 @@ class DownloadThread(StoppableThread):
|
|||
payload = bytearray()
|
||||
chunkCount = 0
|
||||
for chunk in request:
|
||||
if chunk in state.Inventory and not state.Dandelion.hasHash(chunk):
|
||||
if chunk in state.Inventory and not dandelion.instance.hasHash(chunk):
|
||||
try:
|
||||
del i.objectsNewToMe[chunk]
|
||||
except KeyError:
|
||||
|
|
|
@ -8,6 +8,7 @@ from time import time
|
|||
import addresses
|
||||
import protocol
|
||||
import state
|
||||
import dandelion
|
||||
import connectionpool
|
||||
from queues import invQueue
|
||||
from threads import StoppableThread
|
||||
|
@ -39,10 +40,10 @@ class InvThread(StoppableThread):
|
|||
@staticmethod
|
||||
def handleLocallyGenerated(stream, hashId):
|
||||
"""Locally generated inventory items require special handling"""
|
||||
state.Dandelion.addHash(hashId, stream=stream)
|
||||
dandelion.instance.addHash(hashId, stream=stream)
|
||||
for connection in connectionpool.pool.connections():
|
||||
if state.dandelion_enabled and connection != \
|
||||
state.Dandelion.objectChildStem(hashId):
|
||||
dandelion.instance.objectChildStem(hashId):
|
||||
continue
|
||||
connection.objectsNewToThem[hashId] = time()
|
||||
|
||||
|
@ -51,7 +52,7 @@ class InvThread(StoppableThread):
|
|||
chunk = []
|
||||
while True:
|
||||
# Dandelion fluff trigger by expiration
|
||||
handleExpiredDandelion(state.Dandelion.expire())
|
||||
handleExpiredDandelion(dandelion.instance.expire())
|
||||
try:
|
||||
data = invQueue.get(False)
|
||||
chunk.append((data[0], data[1]))
|
||||
|
@ -74,7 +75,7 @@ class InvThread(StoppableThread):
|
|||
except KeyError:
|
||||
continue
|
||||
try:
|
||||
if connection == state.Dandelion.objectChildStem(inv[1]):
|
||||
if connection == dandelion.instance.objectChildStem(inv[1]):
|
||||
# Fluff trigger by RNG
|
||||
# auto-ignore if config set to 0, i.e. dandelion is off
|
||||
if random.randint(1, 100) >= state.dandelion_enabled: # nosec B311
|
||||
|
@ -104,7 +105,7 @@ class InvThread(StoppableThread):
|
|||
for _ in range(len(chunk)):
|
||||
invQueue.task_done()
|
||||
|
||||
if state.Dandelion.refresh < time():
|
||||
state.Dandelion.reRandomiseStems()
|
||||
if dandelion.instance.refresh < time():
|
||||
dandelion.instance.reRandomiseStems()
|
||||
|
||||
self.stop.wait(1)
|
||||
|
|
|
@ -4,7 +4,7 @@ Module for tracking objects
|
|||
import time
|
||||
from threading import RLock
|
||||
|
||||
import state
|
||||
import dandelion
|
||||
import connectionpool
|
||||
from randomtrackingdict import RandomTrackingDict
|
||||
|
||||
|
@ -107,14 +107,14 @@ class ObjectTracker(object):
|
|||
del i.objectsNewToMe[hashid]
|
||||
except KeyError:
|
||||
if streamNumber in i.streams and (
|
||||
not state.Dandelion.hasHash(hashid)
|
||||
or state.Dandelion.objectChildStem(hashid) == i):
|
||||
not dandelion.instance.hasHash(hashid)
|
||||
or dandelion.instance.objectChildStem(hashid) == i):
|
||||
with i.objectsNewToThemLock:
|
||||
i.objectsNewToThem[hashid] = time.time()
|
||||
# update stream number,
|
||||
# which we didn't have when we just received the dinv
|
||||
# also resets expiration of the stem mode
|
||||
state.Dandelion.setHashStream(hashid, streamNumber)
|
||||
dandelion.instance.setHashStream(hashid, streamNumber)
|
||||
|
||||
if i == self:
|
||||
try:
|
||||
|
|
|
@ -15,6 +15,7 @@ import helper_random
|
|||
import l10n
|
||||
import protocol
|
||||
import state
|
||||
import dandelion
|
||||
import connectionpool
|
||||
from bmconfigparser import config
|
||||
from highlevelcrypto import randomBytes
|
||||
|
@ -168,7 +169,7 @@ class TCPConnection(BMProto, TLSDispatcher):
|
|||
knownnodes.increaseRating(self.destination)
|
||||
knownnodes.addKnownNode(
|
||||
self.streams, self.destination, time.time())
|
||||
state.Dandelion.maybeAddStem(self)
|
||||
dandelion.instance.maybeAddStem(self)
|
||||
self.sendAddr()
|
||||
self.sendBigInv()
|
||||
|
||||
|
@ -230,7 +231,7 @@ class TCPConnection(BMProto, TLSDispatcher):
|
|||
with self.objectsNewToThemLock:
|
||||
for objHash in state.Inventory.unexpired_hashes_by_stream(stream):
|
||||
# don't advertise stem objects on bigInv
|
||||
if state.Dandelion.hasHash(objHash):
|
||||
if dandelion.instance.hasHash(objHash):
|
||||
continue
|
||||
bigInvList[objHash] = 0
|
||||
objectCount = 0
|
||||
|
@ -292,7 +293,7 @@ class TCPConnection(BMProto, TLSDispatcher):
|
|||
if host_is_global:
|
||||
knownnodes.addKnownNode(
|
||||
self.streams, self.destination, time.time())
|
||||
state.Dandelion.maybeRemoveStem(self)
|
||||
dandelion.instance.maybeRemoveStem(self)
|
||||
else:
|
||||
self.checkTimeOffsetNotification()
|
||||
if host_is_global:
|
||||
|
|
|
@ -6,6 +6,7 @@ import time
|
|||
import helper_random
|
||||
import protocol
|
||||
import state
|
||||
import dandelion
|
||||
import connectionpool
|
||||
from randomtrackingdict import RandomTrackingDict
|
||||
from threads import StoppableThread
|
||||
|
@ -40,8 +41,8 @@ class UploadThread(StoppableThread):
|
|||
chunk_count = 0
|
||||
for chunk in request:
|
||||
del i.pendingUpload[chunk]
|
||||
if state.Dandelion.hasHash(chunk) and \
|
||||
i != state.Dandelion.objectChildStem(chunk):
|
||||
if dandelion.instance.hasHash(chunk) and \
|
||||
i != dandelion.instance.objectChildStem(chunk):
|
||||
i.antiIntersectionDelay()
|
||||
self.logger.info(
|
||||
'%s asked for a stem object we didn\'t offer to it.',
|
||||
|
|
|
@ -96,6 +96,3 @@ class Placeholder(object): # pylint:disable=too-few-public-methods
|
|||
|
||||
|
||||
Inventory = Placeholder("Inventory")
|
||||
|
||||
|
||||
Dandelion = Placeholder("Dandelion")
|
||||
|
|
Reference in New Issue
Block a user