diff --git a/src/network/__init__.py b/src/network/__init__.py index 678d9f59..58fca104 100644 --- a/src/network/__init__.py +++ b/src/network/__init__.py @@ -14,7 +14,6 @@ def start(config, state): from .announcethread import AnnounceThread from network import connectionpool from .addrthread import AddrThread - from .dandelion import Dandelion from .downloadthread import DownloadThread from .invthread import InvThread from .networkthread import BMNetworkThread @@ -23,8 +22,6 @@ def start(config, state): from .uploadthread import UploadThread readKnownNodes() - # init, needs to be early because other thread may access it early - state.Dandelion = Dandelion() connectionpool.pool.connectToStream(1) for thread in ( BMNetworkThread(), InvThread(), AddrThread(), diff --git a/src/network/bmobject.py b/src/network/bmobject.py index a8b0d761..35798bb9 100644 --- a/src/network/bmobject.py +++ b/src/network/bmobject.py @@ -7,6 +7,7 @@ import time import protocol import state import network.connectionpool # use long name to address recursive import +import dandelion from highlevelcrypto import calculateInventoryHash logger = logging.getLogger('default') @@ -112,7 +113,7 @@ class BMObject(object): # pylint: disable=too-many-instance-attributes or advertise it unnecessarily) """ # if it's a stem duplicate, pretend we don't have it - if state.Dandelion.hasHash(self.inventoryHash): + if dandelion.instance.hasHash(self.inventoryHash): return if self.inventoryHash in state.Inventory: raise BMObjectAlreadyHaveError() diff --git a/src/network/bmproto.py b/src/network/bmproto.py index a981ce0e..8e3841a4 100644 --- a/src/network/bmproto.py +++ b/src/network/bmproto.py @@ -17,6 +17,7 @@ from network import knownnodes import protocol import state import network.connectionpool # use long name to address recursive import +import dandelion from bmconfigparser import config from queues import invQueue, objectProcessorQueue, portCheckerQueue from randomtrackingdict import RandomTrackingDict @@ -350,27 +351,27 @@ class BMProto(AdvancedDispatcher, ObjectTracker): self.pendingUpload[i] = now return True - def _command_inv(self, dandelion=False): + def _command_inv(self, extend_dandelion_stem=False): """ Common inv announce implementation: - both inv and dinv depending on *dandelion* kwarg + both inv and dinv depending on *extend_dandelion_stem* kwarg """ items = self.decode_payload_content("l32s") if len(items) > protocol.MAX_OBJECT_COUNT: logger.error( - 'Too many items in %sinv message!', 'd' if dandelion else '') + 'Too many items in %sinv message!', 'd' if extend_dandelion_stem else '') raise BMProtoExcessiveDataError() # ignore dinv if dandelion turned off - if dandelion and not state.dandelion_enabled: + if extend_dandelion_stem and not state.dandelion_enabled: return True for i in items: - if i in state.Inventory and not state.Dandelion.hasHash(i): + if i in state.Inventory and not dandelion.instance.hasHash(i): continue - if dandelion and not state.Dandelion.hasHash(i): - state.Dandelion.addHash(i, self) + if extend_dandelion_stem and not dandelion.instance.hasHash(i): + dandelion.instance.addHash(i, self) self.handleReceivedInventory(i) return True @@ -436,9 +437,9 @@ class BMProto(AdvancedDispatcher, ObjectTracker): except KeyError: pass - if self.object.inventoryHash in state.Inventory and state.Dandelion.hasHash( + if self.object.inventoryHash in state.Inventory and dandelion.instance.hasHash( self.object.inventoryHash): - state.Dandelion.removeHash( + dandelion.instance.removeHash( self.object.inventoryHash, "cycle detection") if six.PY2: diff --git a/src/network/dandelion.py b/src/network/dandelion.py index 3e67d1a7..0736a80a 100644 --- a/src/network/dandelion.py +++ b/src/network/dandelion.py @@ -194,3 +194,6 @@ class Dandelion: # pylint: disable=old-style-class self.nodeMap = {} # hashMap stays to cater for pending stems self.refresh = time() + REASSIGN_INTERVAL + + +instance = Dandelion() diff --git a/src/network/downloadthread.py b/src/network/downloadthread.py index f6fa7025..34c4b5c3 100644 --- a/src/network/downloadthread.py +++ b/src/network/downloadthread.py @@ -10,6 +10,7 @@ import protocol from network import connectionpool from .objectracker import missingObjects from .threads import StoppableThread +import dandelion class DownloadThread(StoppableThread): @@ -60,7 +61,7 @@ class DownloadThread(StoppableThread): payload = bytearray() chunkCount = 0 for chunk in request: - if chunk in state.Inventory and not state.Dandelion.hasHash(chunk): + if chunk in state.Inventory and not dandelion.instance.hasHash(chunk): try: del i.objectsNewToMe[chunk] except KeyError: diff --git a/src/network/invthread.py b/src/network/invthread.py index a7830a8c..8bc1f837 100644 --- a/src/network/invthread.py +++ b/src/network/invthread.py @@ -9,6 +9,7 @@ import addresses import protocol import state from network import connectionpool +import dandelion from queues import invQueue from .threads import StoppableThread @@ -39,10 +40,10 @@ class InvThread(StoppableThread): @staticmethod def handleLocallyGenerated(stream, hashId): """Locally generated inventory items require special handling""" - state.Dandelion.addHash(hashId, stream=stream) + dandelion.instance.addHash(hashId, stream=stream) for connection in connectionpool.pool.connections(): if state.dandelion_enabled and connection != \ - state.Dandelion.objectChildStem(hashId): + dandelion.instance.objectChildStem(hashId): continue connection.objectsNewToThem[hashId] = time() @@ -51,7 +52,7 @@ class InvThread(StoppableThread): chunk = [] while True: # Dandelion fluff trigger by expiration - handleExpiredDandelion(state.Dandelion.expire()) + handleExpiredDandelion(dandelion.instance.expire()) try: data = invQueue.get(False) chunk.append((data[0], data[1])) @@ -74,7 +75,7 @@ class InvThread(StoppableThread): except KeyError: continue try: - if connection == state.Dandelion.objectChildStem(inv[1]): + if connection == dandelion.instance.objectChildStem(inv[1]): # Fluff trigger by RNG # auto-ignore if config set to 0, i.e. dandelion is off if random.randint(1, 100) >= state.dandelion_enabled: # nosec B311 @@ -104,7 +105,7 @@ class InvThread(StoppableThread): for _ in range(len(chunk)): invQueue.task_done() - if state.Dandelion.refresh < time(): - state.Dandelion.reRandomiseStems() + if dandelion.instance.refresh < time(): + dandelion.instance.reRandomiseStems() self.stop.wait(1) diff --git a/src/network/objectracker.py b/src/network/objectracker.py index bacf8134..eccbaee3 100644 --- a/src/network/objectracker.py +++ b/src/network/objectracker.py @@ -5,8 +5,8 @@ import time from threading import RLock import six -import state import network.connectionpool # use long name to address recursive import +import dandelion from randomtrackingdict import RandomTrackingDict haveBloom = False @@ -111,14 +111,14 @@ class ObjectTracker(object): del i.objectsNewToMe[hashid] except KeyError: if streamNumber in i.streams and ( - not state.Dandelion.hasHash(hashid) - or state.Dandelion.objectChildStem(hashid) == i): + not dandelion.instance.hasHash(hashid) + or dandelion.instance.objectChildStem(hashid) == i): with i.objectsNewToThemLock: i.objectsNewToThem[hashid_bytes] = time.time() # update stream number, # which we didn't have when we just received the dinv # also resets expiration of the stem mode - state.Dandelion.setHashStream(hashid, streamNumber) + dandelion.instance.setHashStream(hashid, streamNumber) if i == self: try: diff --git a/src/network/tcp.py b/src/network/tcp.py index 95b7eb58..3b27c0f9 100644 --- a/src/network/tcp.py +++ b/src/network/tcp.py @@ -17,6 +17,7 @@ import l10n import protocol import state import network.connectionpool # use long name to address recursive import +import dandelion from bmconfigparser import config from highlevelcrypto import randomBytes from queues import invQueue, receiveDataQueue, UISignalQueue @@ -175,7 +176,7 @@ class TCPConnection(BMProto, TLSDispatcher): knownnodes.increaseRating(self.destination) knownnodes.addKnownNode( self.streams, self.destination, time.time()) - state.Dandelion.maybeAddStem(self) + dandelion.instance.maybeAddStem(self) self.sendAddr() self.sendBigInv() @@ -237,7 +238,7 @@ class TCPConnection(BMProto, TLSDispatcher): with self.objectsNewToThemLock: for objHash in state.Inventory.unexpired_hashes_by_stream(stream): # don't advertise stem objects on bigInv - if state.Dandelion.hasHash(objHash): + if dandelion.instance.hasHash(objHash): continue bigInvList[objHash] = 0 objectCount = 0 @@ -299,7 +300,7 @@ class TCPConnection(BMProto, TLSDispatcher): if host_is_global: knownnodes.addKnownNode( self.streams, self.destination, time.time()) - state.Dandelion.maybeRemoveStem(self) + dandelion.instance.maybeRemoveStem(self) else: self.checkTimeOffsetNotification() if host_is_global: diff --git a/src/network/uploadthread.py b/src/network/uploadthread.py index 7290d139..e1e8d121 100644 --- a/src/network/uploadthread.py +++ b/src/network/uploadthread.py @@ -7,6 +7,7 @@ import helper_random import protocol import state from network import connectionpool +import dandelion from randomtrackingdict import RandomTrackingDict from .threads import StoppableThread @@ -40,8 +41,8 @@ class UploadThread(StoppableThread): chunk_count = 0 for chunk in request: del i.pendingUpload[chunk] - if state.Dandelion.hasHash(chunk) and \ - i != state.Dandelion.objectChildStem(chunk): + if dandelion.instance.hasHash(chunk) and \ + i != dandelion.instance.objectChildStem(chunk): i.antiIntersectionDelay() self.logger.info( '%s asked for a stem object we didn\'t offer to it.', diff --git a/src/state.py b/src/state.py index e530425d..a40ebbc2 100644 --- a/src/state.py +++ b/src/state.py @@ -96,6 +96,3 @@ class Placeholder(object): # pylint:disable=too-few-public-methods Inventory = Placeholder("Inventory") - - -Dandelion = Placeholder("Dandelion")