diff --git a/src/bitmessagemain.py b/src/bitmessagemain.py index 9acd1278..ab131a4c 100755 --- a/src/bitmessagemain.py +++ b/src/bitmessagemain.py @@ -12,6 +12,7 @@ The PyBitmessage startup script import os import sys + try: import pathmagic except ImportError: @@ -156,13 +157,6 @@ class Main(object): set_thread_name("PyBitmessage") - state.dandelion_enabled = config.safeGetInt('network', 'dandelion') - # dandelion requires outbound connections, without them, - # stem objects will get stuck forever - if state.dandelion_enabled and not config.safeGetBoolean( - 'bitmessagesettings', 'sendoutgoingconnections'): - state.dandelion_enabled = 0 - if state.testmode or config.safeGetBoolean( 'bitmessagesettings', 'extralowdifficulty'): defaults.networkDefaultProofOfWorkNonceTrialsPerByte = int( diff --git a/src/network/__init__.py b/src/network/__init__.py index 61e1d9d1..42e9d035 100644 --- a/src/network/__init__.py +++ b/src/network/__init__.py @@ -1,9 +1,10 @@ """ Network subsystem package """ - +from .dandelion import Dandelion from .threads import StoppableThread +dandelion_ins = Dandelion() __all__ = ["StoppableThread"] @@ -21,6 +22,11 @@ def start(config, state): from .receivequeuethread import ReceiveQueueThread from .uploadthread import UploadThread + # check and set dandelion enabled value at network startup + dandelion_ins.init_dandelion_enabled(config) + # pass pool instance into dandelion class instance + dandelion_ins.init_pool(connectionpool.pool) + readKnownNodes() connectionpool.pool.connectToStream(1) for thread in ( diff --git a/src/network/bmobject.py b/src/network/bmobject.py index 2f9fc0a7..83311b9b 100644 --- a/src/network/bmobject.py +++ b/src/network/bmobject.py @@ -6,8 +6,8 @@ import time import protocol import state -import dandelion import connectionpool +from network import dandelion_ins from highlevelcrypto import calculateInventoryHash logger = logging.getLogger('default') @@ -113,7 +113,7 @@ class BMObject(object): # pylint: disable=too-many-instance-attributes or advertise it unnecessarily) """ # if it's a stem duplicate, pretend we don't have it - if dandelion.instance.hasHash(self.inventoryHash): + if dandelion_ins.hasHash(self.inventoryHash): return if self.inventoryHash in state.Inventory: raise BMObjectAlreadyHaveError() diff --git a/src/network/bmproto.py b/src/network/bmproto.py index cbe39017..797dab5e 100644 --- a/src/network/bmproto.py +++ b/src/network/bmproto.py @@ -15,7 +15,6 @@ import addresses import knownnodes import protocol import state -import dandelion import connectionpool from bmconfigparser import config from queues import invQueue, objectProcessorQueue, portCheckerQueue @@ -27,7 +26,7 @@ from network.bmobject import ( BMObjectUnwantedStreamError ) from network.proxy import ProxyError - +from network import dandelion_ins from node import Node, Peer from objectracker import ObjectTracker, missingObjects @@ -351,14 +350,14 @@ class BMProto(AdvancedDispatcher, ObjectTracker): raise BMProtoExcessiveDataError() # ignore dinv if dandelion turned off - if extend_dandelion_stem and not state.dandelion_enabled: + if extend_dandelion_stem and not dandelion_ins.enabled: return True for i in map(str, items): - if i in state.Inventory and not dandelion.instance.hasHash(i): + if i in state.Inventory and not dandelion_ins.hasHash(i): continue - if extend_dandelion_stem and not dandelion.instance.hasHash(i): - dandelion.instance.addHash(i, self) + if extend_dandelion_stem and not dandelion_ins.hasHash(i): + dandelion_ins.addHash(i, self) self.handleReceivedInventory(i) return True @@ -420,9 +419,9 @@ class BMProto(AdvancedDispatcher, ObjectTracker): except KeyError: pass - if self.object.inventoryHash in state.Inventory and dandelion.instance.hasHash( + if self.object.inventoryHash in state.Inventory and dandelion_ins.hasHash( self.object.inventoryHash): - dandelion.instance.removeHash( + dandelion_ins.removeHash( self.object.inventoryHash, "cycle detection") state.Inventory[self.object.inventoryHash] = ( @@ -541,7 +540,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker): if not self.isOutbound: self.append_write_buf(protocol.assembleVersionMessage( self.destination.host, self.destination.port, - connectionpool.pool.streams, True, + connectionpool.pool.streams, dandelion_ins.enabled, True, nodeid=self.nodeid)) logger.debug( '%(host)s:%(port)i sending version', diff --git a/src/network/dandelion.py b/src/network/dandelion.py index d4c51cad..564a35f9 100644 --- a/src/network/dandelion.py +++ b/src/network/dandelion.py @@ -7,9 +7,6 @@ from random import choice, expovariate, sample from threading import RLock from time import time -import connectionpool -import state -from queues import invQueue # randomise routes after 600 seconds REASSIGN_INTERVAL = 600 @@ -37,6 +34,8 @@ class Dandelion: # pylint: disable=old-style-class # when to rerandomise routes self.refresh = time() + REASSIGN_INTERVAL self.lock = RLock() + self.enabled = None + self.pool = None @staticmethod def poissonTimeout(start=None, average=0): @@ -47,10 +46,23 @@ class Dandelion: # pylint: disable=old-style-class average = FLUFF_TRIGGER_MEAN_DELAY return start + expovariate(1.0 / average) + FLUFF_TRIGGER_FIXED_DELAY + def init_pool(self, pool): + """pass pool instance""" + self.pool = pool + + def init_dandelion_enabled(self, config): + """Check if Dandelion is enabled and set value in enabled attribute""" + dandelion_enabled = config.safeGetInt('network', 'dandelion') + # dandelion requires outbound connections, without them, + # stem objects will get stuck forever + if not config.safeGetBoolean( + 'bitmessagesettings', 'sendoutgoingconnections'): + dandelion_enabled = 0 + self.enabled = dandelion_enabled + def addHash(self, hashId, source=None, stream=1): - """Add inventory vector to dandelion stem""" - if not state.dandelion_enabled: - return + """Add inventory vector to dandelion stem return status of dandelion enabled""" + assert self.enabled is not None with self.lock: self.hashMap[hashId] = Stem( self.getNodeStem(source), @@ -89,7 +101,7 @@ class Dandelion: # pylint: disable=old-style-class """Child (i.e. next) node for an inventory vector during stem mode""" return self.hashMap[hashId].child - def maybeAddStem(self, connection): + def maybeAddStem(self, connection, invQueue): """ If we had too few outbound connections, add the current one to the current stem list. Dandelion as designed by the authors should @@ -163,7 +175,7 @@ class Dandelion: # pylint: disable=old-style-class self.nodeMap[node] = self.pickStem(node) return self.nodeMap[node] - def expire(self): + def expire(self, invQueue): """Switch expired objects from stem to fluff mode""" with self.lock: deadline = time() @@ -179,19 +191,18 @@ class Dandelion: # pylint: disable=old-style-class def reRandomiseStems(self): """Re-shuffle stem mapping (parent <-> child pairs)""" + assert self.pool is not None + if self.refresh > time(): + return + with self.lock: try: # random two connections self.stem = sample( - connectionpool.BMConnectionPool( - ).outboundConnections.values(), MAX_STEMS) + self.pool.outboundConnections.values(), MAX_STEMS) # not enough stems available except ValueError: - self.stem = connectionpool.BMConnectionPool( - ).outboundConnections.values() + self.stem = self.pool.outboundConnections.values() self.nodeMap = {} # hashMap stays to cater for pending stems self.refresh = time() + REASSIGN_INTERVAL - - -instance = Dandelion() diff --git a/src/network/downloadthread.py b/src/network/downloadthread.py index 4fd1c668..30a3f2fe 100644 --- a/src/network/downloadthread.py +++ b/src/network/downloadthread.py @@ -6,8 +6,8 @@ import state import addresses import helper_random import protocol -import dandelion import connectionpool +from network import dandelion_ins from objectracker import missingObjects from threads import StoppableThread @@ -60,7 +60,7 @@ class DownloadThread(StoppableThread): payload = bytearray() chunkCount = 0 for chunk in request: - if chunk in state.Inventory and not dandelion.instance.hasHash(chunk): + if chunk in state.Inventory and not dandelion_ins.hasHash(chunk): try: del i.objectsNewToMe[chunk] except KeyError: diff --git a/src/network/invthread.py b/src/network/invthread.py index 8b90e5f7..0b79710a 100644 --- a/src/network/invthread.py +++ b/src/network/invthread.py @@ -8,8 +8,8 @@ from time import time import addresses import protocol import state -import dandelion import connectionpool +from network import dandelion_ins from queues import invQueue from threads import StoppableThread @@ -40,10 +40,10 @@ class InvThread(StoppableThread): @staticmethod def handleLocallyGenerated(stream, hashId): """Locally generated inventory items require special handling""" - dandelion.instance.addHash(hashId, stream=stream) + dandelion_ins.addHash(hashId, stream=stream) for connection in connectionpool.pool.connections(): - if state.dandelion_enabled and connection != \ - dandelion.instance.objectChildStem(hashId): + if dandelion_ins.enabled and connection != \ + dandelion_ins.objectChildStem(hashId): continue connection.objectsNewToThem[hashId] = time() @@ -52,7 +52,7 @@ class InvThread(StoppableThread): chunk = [] while True: # Dandelion fluff trigger by expiration - handleExpiredDandelion(dandelion.instance.expire()) + handleExpiredDandelion(dandelion_ins.expire(invQueue)) try: data = invQueue.get(False) chunk.append((data[0], data[1])) @@ -75,10 +75,10 @@ class InvThread(StoppableThread): except KeyError: continue try: - if connection == dandelion.instance.objectChildStem(inv[1]): + if connection == dandelion_ins.objectChildStem(inv[1]): # Fluff trigger by RNG # auto-ignore if config set to 0, i.e. dandelion is off - if random.randint(1, 100) >= state.dandelion_enabled: # nosec B311 + if random.randint(1, 100) >= dandelion_ins.enabled: # nosec B311 fluffs.append(inv[1]) # send a dinv only if the stem node supports dandelion elif connection.services & protocol.NODE_DANDELION > 0: @@ -105,7 +105,6 @@ class InvThread(StoppableThread): for _ in range(len(chunk)): invQueue.task_done() - if dandelion.instance.refresh < time(): - dandelion.instance.reRandomiseStems() + dandelion_ins.reRandomiseStems() self.stop.wait(1) diff --git a/src/network/objectracker.py b/src/network/objectracker.py index 0e8268cf..91bb0552 100644 --- a/src/network/objectracker.py +++ b/src/network/objectracker.py @@ -4,8 +4,8 @@ Module for tracking objects import time from threading import RLock -import dandelion import connectionpool +from network import dandelion_ins from randomtrackingdict import RandomTrackingDict haveBloom = False @@ -107,14 +107,14 @@ class ObjectTracker(object): del i.objectsNewToMe[hashid] except KeyError: if streamNumber in i.streams and ( - not dandelion.instance.hasHash(hashid) - or dandelion.instance.objectChildStem(hashid) == i): + not dandelion_ins.hasHash(hashid) + or dandelion_ins.objectChildStem(hashid) == i): with i.objectsNewToThemLock: i.objectsNewToThem[hashid] = time.time() # update stream number, # which we didn't have when we just received the dinv # also resets expiration of the stem mode - dandelion.instance.setHashStream(hashid, streamNumber) + dandelion_ins.setHashStream(hashid, streamNumber) if i == self: try: diff --git a/src/network/tcp.py b/src/network/tcp.py index 35937aa8..a739e256 100644 --- a/src/network/tcp.py +++ b/src/network/tcp.py @@ -15,10 +15,10 @@ import helper_random import l10n import protocol import state -import dandelion import connectionpool from bmconfigparser import config from highlevelcrypto import randomBytes +from network import dandelion_ins from queues import invQueue, receiveDataQueue, UISignalQueue from tr import _translate @@ -169,7 +169,7 @@ class TCPConnection(BMProto, TLSDispatcher): knownnodes.increaseRating(self.destination) knownnodes.addKnownNode( self.streams, self.destination, time.time()) - dandelion.instance.maybeAddStem(self) + dandelion_ins.maybeAddStem(self, invQueue) self.sendAddr() self.sendBigInv() @@ -231,7 +231,7 @@ class TCPConnection(BMProto, TLSDispatcher): with self.objectsNewToThemLock: for objHash in state.Inventory.unexpired_hashes_by_stream(stream): # don't advertise stem objects on bigInv - if dandelion.instance.hasHash(objHash): + if dandelion_ins.hasHash(objHash): continue bigInvList[objHash] = 0 objectCount = 0 @@ -268,7 +268,7 @@ class TCPConnection(BMProto, TLSDispatcher): self.append_write_buf( protocol.assembleVersionMessage( self.destination.host, self.destination.port, - connectionpool.pool.streams, + connectionpool.pool.streams, dandelion_ins.enabled, False, nodeid=self.nodeid)) self.connectedAt = time.time() receiveDataQueue.put(self.destination) @@ -293,7 +293,7 @@ class TCPConnection(BMProto, TLSDispatcher): if host_is_global: knownnodes.addKnownNode( self.streams, self.destination, time.time()) - dandelion.instance.maybeRemoveStem(self) + dandelion_ins.maybeRemoveStem(self) else: self.checkTimeOffsetNotification() if host_is_global: @@ -319,7 +319,7 @@ class Socks5BMConnection(Socks5Connection, TCPConnection): self.append_write_buf( protocol.assembleVersionMessage( self.destination.host, self.destination.port, - connectionpool.pool.streams, + connectionpool.pool.streams, dandelion_ins.enabled, False, nodeid=self.nodeid)) self.set_state("bm_header", expectBytes=protocol.Header.size) return True @@ -343,7 +343,7 @@ class Socks4aBMConnection(Socks4aConnection, TCPConnection): self.append_write_buf( protocol.assembleVersionMessage( self.destination.host, self.destination.port, - connectionpool.pool.streams, + connectionpool.pool.streams, dandelion_ins.enabled, False, nodeid=self.nodeid)) self.set_state("bm_header", expectBytes=protocol.Header.size) return True diff --git a/src/network/uploadthread.py b/src/network/uploadthread.py index 1fc3734e..e91f08fa 100644 --- a/src/network/uploadthread.py +++ b/src/network/uploadthread.py @@ -6,9 +6,9 @@ import time import helper_random import protocol import state -import dandelion import connectionpool from randomtrackingdict import RandomTrackingDict +from network import dandelion_ins from threads import StoppableThread @@ -41,8 +41,8 @@ class UploadThread(StoppableThread): chunk_count = 0 for chunk in request: del i.pendingUpload[chunk] - if dandelion.instance.hasHash(chunk) and \ - i != dandelion.instance.objectChildStem(chunk): + if dandelion_ins.hasHash(chunk) and \ + i != dandelion_ins.objectChildStem(chunk): i.antiIntersectionDelay() self.logger.info( '%s asked for a stem object we didn\'t offer to it.', diff --git a/src/protocol.py b/src/protocol.py index 7f9830e5..96c980bb 100644 --- a/src/protocol.py +++ b/src/protocol.py @@ -336,8 +336,8 @@ def assembleAddrMessage(peerList): return retval -def assembleVersionMessage( - remoteHost, remotePort, participatingStreams, server=False, nodeid=None +def assembleVersionMessage( # pylint: disable=too-many-arguments + remoteHost, remotePort, participatingStreams, dandelion_enabled=True, server=False, nodeid=None, ): """ Construct the payload of a version message, @@ -350,7 +350,7 @@ def assembleVersionMessage( '>q', NODE_NETWORK | (NODE_SSL if haveSSL(server) else 0) - | (NODE_DANDELION if state.dandelion_enabled else 0) + | (NODE_DANDELION if dandelion_enabled else 0) ) payload += pack('>q', int(time.time())) @@ -374,7 +374,7 @@ def assembleVersionMessage( '>q', NODE_NETWORK | (NODE_SSL if haveSSL(server) else 0) - | (NODE_DANDELION if state.dandelion_enabled else 0) + | (NODE_DANDELION if dandelion_enabled else 0) ) # = 127.0.0.1. This will be ignored by the remote host. # The actual remote connected IP will be used. diff --git a/src/state.py b/src/state.py index a40ebbc2..90c9cf0d 100644 --- a/src/state.py +++ b/src/state.py @@ -43,8 +43,6 @@ ownAddresses = {} discoveredPeers = {} -dandelion_enabled = 0 - kivy = False kivyapp = None diff --git a/src/tests/core.py b/src/tests/core.py index f1a11a06..fd9b0d08 100644 --- a/src/tests/core.py +++ b/src/tests/core.py @@ -319,16 +319,17 @@ class TestCore(unittest.TestCase): def test_version(self): """check encoding/decoding of the version message""" + dandelion_enabled = True # with single stream - msg = protocol.assembleVersionMessage('127.0.0.1', 8444, [1]) + msg = protocol.assembleVersionMessage('127.0.0.1', 8444, [1], dandelion_enabled) decoded = self._decode_msg(msg, "IQQiiQlsLv") peer, _, ua, streams = self._decode_msg(msg, "IQQiiQlsLv")[4:] self.assertEqual( - peer, Node(11 if state.dandelion_enabled else 3, '127.0.0.1', 8444)) + peer, Node(11 if dandelion_enabled else 3, '127.0.0.1', 8444)) self.assertEqual(ua, '/PyBitmessage:' + softwareVersion + '/') self.assertEqual(streams, [1]) # with multiple streams - msg = protocol.assembleVersionMessage('127.0.0.1', 8444, [1, 2, 3]) + msg = protocol.assembleVersionMessage('127.0.0.1', 8444, [1, 2, 3], dandelion_enabled) decoded = self._decode_msg(msg, "IQQiiQlslv") peer, _, ua = decoded[4:7] streams = decoded[7:]