diff --git a/src/network/__init__.py b/src/network/__init__.py index d89670a7..61e1d9d1 100644 --- a/src/network/__init__.py +++ b/src/network/__init__.py @@ -14,7 +14,6 @@ def start(config, state): from .announcethread import AnnounceThread import connectionpool # pylint: disable=relative-import from .addrthread import AddrThread - from .dandelion import Dandelion from .downloadthread import DownloadThread from .invthread import InvThread from .networkthread import BMNetworkThread @@ -23,8 +22,6 @@ def start(config, state): from .uploadthread import UploadThread readKnownNodes() - # init, needs to be early because other thread may access it early - state.Dandelion = Dandelion() connectionpool.pool.connectToStream(1) for thread in ( BMNetworkThread(), InvThread(), AddrThread(), diff --git a/src/network/bmobject.py b/src/network/bmobject.py index c91bf1b3..2f9fc0a7 100644 --- a/src/network/bmobject.py +++ b/src/network/bmobject.py @@ -6,6 +6,7 @@ import time import protocol import state +import dandelion import connectionpool from highlevelcrypto import calculateInventoryHash @@ -112,7 +113,7 @@ class BMObject(object): # pylint: disable=too-many-instance-attributes or advertise it unnecessarily) """ # if it's a stem duplicate, pretend we don't have it - if state.Dandelion.hasHash(self.inventoryHash): + if dandelion.instance.hasHash(self.inventoryHash): return if self.inventoryHash in state.Inventory: raise BMObjectAlreadyHaveError() diff --git a/src/network/bmproto.py b/src/network/bmproto.py index ed1d48c4..cbe39017 100644 --- a/src/network/bmproto.py +++ b/src/network/bmproto.py @@ -15,6 +15,7 @@ import addresses import knownnodes import protocol import state +import dandelion import connectionpool from bmconfigparser import config from queues import invQueue, objectProcessorQueue, portCheckerQueue @@ -337,27 +338,27 @@ class BMProto(AdvancedDispatcher, ObjectTracker): self.pendingUpload[str(i)] = now return True - def _command_inv(self, dandelion=False): + def _command_inv(self, extend_dandelion_stem=False): """ Common inv announce implementation: - both inv and dinv depending on *dandelion* kwarg + both inv and dinv depending on *extend_dandelion_stem* kwarg """ items = self.decode_payload_content("l32s") if len(items) > protocol.MAX_OBJECT_COUNT: logger.error( - 'Too many items in %sinv message!', 'd' if dandelion else '') + 'Too many items in %sinv message!', 'd' if extend_dandelion_stem else '') raise BMProtoExcessiveDataError() # ignore dinv if dandelion turned off - if dandelion and not state.dandelion_enabled: + if extend_dandelion_stem and not state.dandelion_enabled: return True for i in map(str, items): - if i in state.Inventory and not state.Dandelion.hasHash(i): + if i in state.Inventory and not dandelion.instance.hasHash(i): continue - if dandelion and not state.Dandelion.hasHash(i): - state.Dandelion.addHash(i, self) + if extend_dandelion_stem and not dandelion.instance.hasHash(i): + dandelion.instance.addHash(i, self) self.handleReceivedInventory(i) return True @@ -419,9 +420,9 @@ class BMProto(AdvancedDispatcher, ObjectTracker): except KeyError: pass - if self.object.inventoryHash in state.Inventory and state.Dandelion.hasHash( + if self.object.inventoryHash in state.Inventory and dandelion.instance.hasHash( self.object.inventoryHash): - state.Dandelion.removeHash( + dandelion.instance.removeHash( self.object.inventoryHash, "cycle detection") state.Inventory[self.object.inventoryHash] = ( diff --git a/src/network/dandelion.py b/src/network/dandelion.py index 35e70c95..d4c51cad 100644 --- a/src/network/dandelion.py +++ b/src/network/dandelion.py @@ -192,3 +192,6 @@ class Dandelion: # pylint: disable=old-style-class self.nodeMap = {} # hashMap stays to cater for pending stems self.refresh = time() + REASSIGN_INTERVAL + + +instance = Dandelion() diff --git a/src/network/downloadthread.py b/src/network/downloadthread.py index 4f108c72..4fd1c668 100644 --- a/src/network/downloadthread.py +++ b/src/network/downloadthread.py @@ -6,6 +6,7 @@ import state import addresses import helper_random import protocol +import dandelion import connectionpool from objectracker import missingObjects from threads import StoppableThread @@ -59,7 +60,7 @@ class DownloadThread(StoppableThread): payload = bytearray() chunkCount = 0 for chunk in request: - if chunk in state.Inventory and not state.Dandelion.hasHash(chunk): + if chunk in state.Inventory and not dandelion.instance.hasHash(chunk): try: del i.objectsNewToMe[chunk] except KeyError: diff --git a/src/network/invthread.py b/src/network/invthread.py index b55408d4..8b90e5f7 100644 --- a/src/network/invthread.py +++ b/src/network/invthread.py @@ -8,6 +8,7 @@ from time import time import addresses import protocol import state +import dandelion import connectionpool from queues import invQueue from threads import StoppableThread @@ -39,10 +40,10 @@ class InvThread(StoppableThread): @staticmethod def handleLocallyGenerated(stream, hashId): """Locally generated inventory items require special handling""" - state.Dandelion.addHash(hashId, stream=stream) + dandelion.instance.addHash(hashId, stream=stream) for connection in connectionpool.pool.connections(): if state.dandelion_enabled and connection != \ - state.Dandelion.objectChildStem(hashId): + dandelion.instance.objectChildStem(hashId): continue connection.objectsNewToThem[hashId] = time() @@ -51,7 +52,7 @@ class InvThread(StoppableThread): chunk = [] while True: # Dandelion fluff trigger by expiration - handleExpiredDandelion(state.Dandelion.expire()) + handleExpiredDandelion(dandelion.instance.expire()) try: data = invQueue.get(False) chunk.append((data[0], data[1])) @@ -74,7 +75,7 @@ class InvThread(StoppableThread): except KeyError: continue try: - if connection == state.Dandelion.objectChildStem(inv[1]): + if connection == dandelion.instance.objectChildStem(inv[1]): # Fluff trigger by RNG # auto-ignore if config set to 0, i.e. dandelion is off if random.randint(1, 100) >= state.dandelion_enabled: # nosec B311 @@ -104,7 +105,7 @@ class InvThread(StoppableThread): for _ in range(len(chunk)): invQueue.task_done() - if state.Dandelion.refresh < time(): - state.Dandelion.reRandomiseStems() + if dandelion.instance.refresh < time(): + dandelion.instance.reRandomiseStems() self.stop.wait(1) diff --git a/src/network/objectracker.py b/src/network/objectracker.py index a458e5d2..0e8268cf 100644 --- a/src/network/objectracker.py +++ b/src/network/objectracker.py @@ -4,7 +4,7 @@ Module for tracking objects import time from threading import RLock -import state +import dandelion import connectionpool from randomtrackingdict import RandomTrackingDict @@ -107,14 +107,14 @@ class ObjectTracker(object): del i.objectsNewToMe[hashid] except KeyError: if streamNumber in i.streams and ( - not state.Dandelion.hasHash(hashid) - or state.Dandelion.objectChildStem(hashid) == i): + not dandelion.instance.hasHash(hashid) + or dandelion.instance.objectChildStem(hashid) == i): with i.objectsNewToThemLock: i.objectsNewToThem[hashid] = time.time() # update stream number, # which we didn't have when we just received the dinv # also resets expiration of the stem mode - state.Dandelion.setHashStream(hashid, streamNumber) + dandelion.instance.setHashStream(hashid, streamNumber) if i == self: try: diff --git a/src/network/tcp.py b/src/network/tcp.py index 139715a6..35937aa8 100644 --- a/src/network/tcp.py +++ b/src/network/tcp.py @@ -15,6 +15,7 @@ import helper_random import l10n import protocol import state +import dandelion import connectionpool from bmconfigparser import config from highlevelcrypto import randomBytes @@ -168,7 +169,7 @@ class TCPConnection(BMProto, TLSDispatcher): knownnodes.increaseRating(self.destination) knownnodes.addKnownNode( self.streams, self.destination, time.time()) - state.Dandelion.maybeAddStem(self) + dandelion.instance.maybeAddStem(self) self.sendAddr() self.sendBigInv() @@ -230,7 +231,7 @@ class TCPConnection(BMProto, TLSDispatcher): with self.objectsNewToThemLock: for objHash in state.Inventory.unexpired_hashes_by_stream(stream): # don't advertise stem objects on bigInv - if state.Dandelion.hasHash(objHash): + if dandelion.instance.hasHash(objHash): continue bigInvList[objHash] = 0 objectCount = 0 @@ -292,7 +293,7 @@ class TCPConnection(BMProto, TLSDispatcher): if host_is_global: knownnodes.addKnownNode( self.streams, self.destination, time.time()) - state.Dandelion.maybeRemoveStem(self) + dandelion.instance.maybeRemoveStem(self) else: self.checkTimeOffsetNotification() if host_is_global: diff --git a/src/network/uploadthread.py b/src/network/uploadthread.py index 90048c0a..1fc3734e 100644 --- a/src/network/uploadthread.py +++ b/src/network/uploadthread.py @@ -6,6 +6,7 @@ import time import helper_random import protocol import state +import dandelion import connectionpool from randomtrackingdict import RandomTrackingDict from threads import StoppableThread @@ -40,8 +41,8 @@ class UploadThread(StoppableThread): chunk_count = 0 for chunk in request: del i.pendingUpload[chunk] - if state.Dandelion.hasHash(chunk) and \ - i != state.Dandelion.objectChildStem(chunk): + if dandelion.instance.hasHash(chunk) and \ + i != dandelion.instance.objectChildStem(chunk): i.antiIntersectionDelay() self.logger.info( '%s asked for a stem object we didn\'t offer to it.', diff --git a/src/state.py b/src/state.py index e530425d..a40ebbc2 100644 --- a/src/state.py +++ b/src/state.py @@ -96,6 +96,3 @@ class Placeholder(object): # pylint:disable=too-few-public-methods Inventory = Placeholder("Inventory") - - -Dandelion = Placeholder("Dandelion")