moved dandelion_enabled from state to dandelion instance as enabled attribute #2246
|
@ -12,6 +12,7 @@ The PyBitmessage startup script
|
|||
import os
|
||||
import sys
|
||||
|
||||
|
||||
try:
|
||||
import pathmagic
|
||||
except ImportError:
|
||||
|
@ -156,13 +157,6 @@ class Main(object):
|
|||
|
||||
set_thread_name("PyBitmessage")
|
||||
|
||||
state.dandelion_enabled = config.safeGetInt('network', 'dandelion')
|
||||
# dandelion requires outbound connections, without them,
|
||||
# stem objects will get stuck forever
|
||||
if state.dandelion_enabled and not config.safeGetBoolean(
|
||||
'bitmessagesettings', 'sendoutgoingconnections'):
|
||||
state.dandelion_enabled = 0
|
||||
|
||||
if state.testmode or config.safeGetBoolean(
|
||||
'bitmessagesettings', 'extralowdifficulty'):
|
||||
defaults.networkDefaultProofOfWorkNonceTrialsPerByte = int(
|
||||
|
|
|
@ -1,9 +1,10 @@
|
|||
"""
|
||||
Network subsystem package
|
||||
"""
|
||||
|
||||
from .dandelion import Dandelion
|
||||
from .threads import StoppableThread
|
||||
|
||||
dandelion_ins = Dandelion()
|
||||
|
||||
__all__ = ["StoppableThread"]
|
||||
|
||||
|
@ -21,6 +22,11 @@ def start(config, state):
|
|||
from .receivequeuethread import ReceiveQueueThread
|
||||
from .uploadthread import UploadThread
|
||||
|
||||
# check and set dandelion enabled value at network startup
|
||||
dandelion_ins.init_dandelion_enabled(config)
|
||||
# pass pool instance into dandelion class instance
|
||||
dandelion_ins.init_pool(connectionpool.pool)
|
||||
|
||||
readKnownNodes()
|
||||
connectionpool.pool.connectToStream(1)
|
||||
for thread in (
|
||||
|
|
|
@ -6,8 +6,8 @@ import time
|
|||
|
||||
import protocol
|
||||
import state
|
||||
import dandelion
|
||||
import connectionpool
|
||||
from network import dandelion_ins
|
||||
from highlevelcrypto import calculateInventoryHash
|
||||
|
||||
logger = logging.getLogger('default')
|
||||
|
@ -113,7 +113,7 @@ class BMObject(object): # pylint: disable=too-many-instance-attributes
|
|||
or advertise it unnecessarily)
|
||||
"""
|
||||
# if it's a stem duplicate, pretend we don't have it
|
||||
if dandelion.instance.hasHash(self.inventoryHash):
|
||||
if dandelion_ins.hasHash(self.inventoryHash):
|
||||
return
|
||||
if self.inventoryHash in state.Inventory:
|
||||
raise BMObjectAlreadyHaveError()
|
||||
|
|
|
@ -15,7 +15,6 @@ import addresses
|
|||
import knownnodes
|
||||
import protocol
|
||||
import state
|
||||
import dandelion
|
||||
import connectionpool
|
||||
from bmconfigparser import config
|
||||
from queues import invQueue, objectProcessorQueue, portCheckerQueue
|
||||
|
@ -27,7 +26,7 @@ from network.bmobject import (
|
|||
BMObjectUnwantedStreamError
|
||||
)
|
||||
from network.proxy import ProxyError
|
||||
|
||||
from network import dandelion_ins
|
||||
from node import Node, Peer
|
||||
from objectracker import ObjectTracker, missingObjects
|
||||
|
||||
|
@ -351,14 +350,14 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
|
|||
raise BMProtoExcessiveDataError()
|
||||
|
||||
|
||||
# ignore dinv if dandelion turned off
|
||||
if extend_dandelion_stem and not state.dandelion_enabled:
|
||||
if extend_dandelion_stem and not dandelion_ins.enabled:
|
||||
return True
|
||||
|
||||
for i in map(str, items):
|
||||
if i in state.Inventory and not dandelion.instance.hasHash(i):
|
||||
if i in state.Inventory and not dandelion_ins.hasHash(i):
|
||||
continue
|
||||
if extend_dandelion_stem and not dandelion.instance.hasHash(i):
|
||||
dandelion.instance.addHash(i, self)
|
||||
if extend_dandelion_stem and not dandelion_ins.hasHash(i):
|
||||
dandelion_ins.addHash(i, self)
|
||||
self.handleReceivedInventory(i)
|
||||
|
||||
return True
|
||||
|
@ -420,9 +419,9 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
|
|||
except KeyError:
|
||||
pass
|
||||
|
||||
if self.object.inventoryHash in state.Inventory and dandelion.instance.hasHash(
|
||||
if self.object.inventoryHash in state.Inventory and dandelion_ins.hasHash(
|
||||
self.object.inventoryHash):
|
||||
dandelion.instance.removeHash(
|
||||
dandelion_ins.removeHash(
|
||||
self.object.inventoryHash, "cycle detection")
|
||||
|
||||
state.Inventory[self.object.inventoryHash] = (
|
||||
|
@ -541,7 +540,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
|
|||
if not self.isOutbound:
|
||||
self.append_write_buf(protocol.assembleVersionMessage(
|
||||
self.destination.host, self.destination.port,
|
||||
connectionpool.pool.streams, True,
|
||||
connectionpool.pool.streams, dandelion_ins.enabled, True,
|
||||
nodeid=self.nodeid))
|
||||
logger.debug(
|
||||
'%(host)s:%(port)i sending version',
|
||||
|
|
|
@ -7,9 +7,6 @@ from random import choice, expovariate, sample
|
|||
from threading import RLock
|
||||
from time import time
|
||||
|
||||
import connectionpool
|
||||
import state
|
||||
from queues import invQueue
|
||||
|
||||
# randomise routes after 600 seconds
|
||||
REASSIGN_INTERVAL = 600
|
||||
|
@ -37,6 +34,8 @@ class Dandelion: # pylint: disable=old-style-class
|
|||
# when to rerandomise routes
|
||||
self.refresh = time() + REASSIGN_INTERVAL
|
||||
self.lock = RLock()
|
||||
self.enabled = None
|
||||
self.pool = None
|
||||
|
||||
@staticmethod
|
||||
def poissonTimeout(start=None, average=0):
|
||||
|
@ -47,10 +46,23 @@ class Dandelion: # pylint: disable=old-style-class
|
|||
average = FLUFF_TRIGGER_MEAN_DELAY
|
||||
return start + expovariate(1.0 / average) + FLUFF_TRIGGER_FIXED_DELAY
|
||||
|
||||
Maybe "pass pool instance" Maybe "pass pool instance"
Do we need a separate Do we need a separate `init_is_enabled` and `is_dandelion_enabled`?
merged it into merged it into `is_dandelion_enabled` by changing it to classmethod from staticmethod. and removed `init_is_enabled`.
Done Done
I would call this I would call this `init_dandelion_enabled`
We probably don't need to check for We probably don't need to check for `dandelion_enabled` here.
Value is coming from config at startup. so i think it is required. Value is coming from config at startup. so i think it is required.
done done
I mean we're checking twice, should work ok if we just check once. I mean we're checking twice, should work ok if we just check once.
|
||||
def init_pool(self, pool):
|
||||
"""pass pool instance"""
|
||||
self.pool = pool
|
||||
|
||||
def init_dandelion_enabled(self, config):
|
||||
"""Check if Dandelion is enabled and set value in enabled attribute"""
|
||||
dandelion_enabled = config.safeGetInt('network', 'dandelion')
|
||||
# dandelion requires outbound connections, without them,
|
||||
# stem objects will get stuck forever
|
||||
if not config.safeGetBoolean(
|
||||
'bitmessagesettings', 'sendoutgoingconnections'):
|
||||
dandelion_enabled = 0
|
||||
self.enabled = dandelion_enabled
|
||||
|
||||
def addHash(self, hashId, source=None, stream=1):
|
||||
"""Add inventory vector to dandelion stem"""
|
||||
if not state.dandelion_enabled:
|
||||
return
|
||||
"""Add inventory vector to dandelion stem return status of dandelion enabled"""
|
||||
assert self.enabled is not None
|
||||
with self.lock:
|
||||
self.hashMap[hashId] = Stem(
|
||||
self.getNodeStem(source),
|
||||
|
@ -89,7 +101,7 @@ class Dandelion: # pylint: disable=old-style-class
|
|||
"""Child (i.e. next) node for an inventory vector during stem mode"""
|
||||
return self.hashMap[hashId].child
|
||||
|
||||
def maybeAddStem(self, connection):
|
||||
def maybeAddStem(self, connection, invQueue):
|
||||
"""
|
||||
If we had too few outbound connections, add the current one to the
|
||||
current stem list. Dandelion as designed by the authors should
|
||||
|
@ -163,7 +175,7 @@ class Dandelion: # pylint: disable=old-style-class
|
|||
self.nodeMap[node] = self.pickStem(node)
|
||||
return self.nodeMap[node]
|
||||
|
||||
def expire(self):
|
||||
def expire(self, invQueue):
|
||||
"""Switch expired objects from stem to fluff mode"""
|
||||
with self.lock:
|
||||
deadline = time()
|
||||
|
@ -179,19 +191,18 @@ class Dandelion: # pylint: disable=old-style-class
|
|||
|
||||
def reRandomiseStems(self):
|
||||
"""Re-shuffle stem mapping (parent <-> child pairs)"""
|
||||
assert self.pool is not None
|
||||
We could use lazy initialisation on We could use lazy initialisation on `pool` too and keep it as an attribute, then we don't need to pass it later.
Done Done
|
||||
if self.refresh > time():
|
||||
return
|
||||
|
||||
with self.lock:
|
||||
try:
|
||||
# random two connections
|
||||
self.stem = sample(
|
||||
duplicate duplicate
Fixed Fixed
|
||||
connectionpool.BMConnectionPool(
|
||||
).outboundConnections.values(), MAX_STEMS)
|
||||
self.pool.outboundConnections.values(), MAX_STEMS)
|
||||
# not enough stems available
|
||||
except ValueError:
|
||||
self.stem = connectionpool.BMConnectionPool(
|
||||
).outboundConnections.values()
|
||||
self.stem = self.pool.outboundConnections.values()
|
||||
self.nodeMap = {}
|
||||
# hashMap stays to cater for pending stems
|
||||
self.refresh = time() + REASSIGN_INTERVAL
|
||||
|
||||
|
||||
instance = Dandelion()
|
||||
|
|
|
@ -6,8 +6,8 @@ import state
|
|||
import addresses
|
||||
import helper_random
|
||||
import protocol
|
||||
import dandelion
|
||||
import connectionpool
|
||||
from network import dandelion_ins
|
||||
from objectracker import missingObjects
|
||||
from threads import StoppableThread
|
||||
|
||||
|
@ -60,7 +60,7 @@ class DownloadThread(StoppableThread):
|
|||
payload = bytearray()
|
||||
chunkCount = 0
|
||||
for chunk in request:
|
||||
if chunk in state.Inventory and not dandelion.instance.hasHash(chunk):
|
||||
if chunk in state.Inventory and not dandelion_ins.hasHash(chunk):
|
||||
try:
|
||||
del i.objectsNewToMe[chunk]
|
||||
except KeyError:
|
||||
|
|
|
@ -8,8 +8,8 @@ from time import time
|
|||
import addresses
|
||||
import protocol
|
||||
import state
|
||||
import dandelion
|
||||
import connectionpool
|
||||
from network import dandelion_ins
|
||||
from queues import invQueue
|
||||
from threads import StoppableThread
|
||||
|
||||
|
@ -40,10 +40,10 @@ class InvThread(StoppableThread):
|
|||
@staticmethod
|
||||
def handleLocallyGenerated(stream, hashId):
|
||||
here, here, `addHash` could return `True` / `False` depending on whether dandelion is on/off, and we could use this for further checks.
here, here, `objectChildStem` could return value depending on whether dandelion is enabled or not
|
||||
"""Locally generated inventory items require special handling"""
|
||||
dandelion.instance.addHash(hashId, stream=stream)
|
||||
dandelion_ins.addHash(hashId, stream=stream)
|
||||
for connection in connectionpool.pool.connections():
|
||||
if state.dandelion_enabled and connection != \
|
||||
dandelion.instance.objectChildStem(hashId):
|
||||
if dandelion_ins.enabled and connection != \
|
||||
dandelion_ins.objectChildStem(hashId):
|
||||
continue
|
||||
connection.objectsNewToThem[hashId] = time()
|
||||
|
||||
|
@ -52,7 +52,7 @@ class InvThread(StoppableThread):
|
|||
chunk = []
|
||||
while True:
|
||||
# Dandelion fluff trigger by expiration
|
||||
handleExpiredDandelion(dandelion.instance.expire())
|
||||
handleExpiredDandelion(dandelion_ins.expire(invQueue))
|
||||
try:
|
||||
data = invQueue.get(False)
|
||||
chunk.append((data[0], data[1]))
|
||||
|
@ -75,10 +75,10 @@ class InvThread(StoppableThread):
|
|||
except KeyError:
|
||||
continue
|
||||
try:
|
||||
Hmm, I forgot, the value is actually an int between 0 and 100. We need to rethink. Hmm, I forgot, the value is actually an int between 0 and 100. We need to rethink.
when dandelion is on what would be the probable return value. that help us resolve the fixing comparison between when dandelion is on what would be the probable return value. that help us resolve the fixing comparison between `int` and `bool`
|
||||
if connection == dandelion.instance.objectChildStem(inv[1]):
|
||||
if connection == dandelion_ins.objectChildStem(inv[1]):
|
||||
# Fluff trigger by RNG
|
||||
# auto-ignore if config set to 0, i.e. dandelion is off
|
||||
if random.randint(1, 100) >= state.dandelion_enabled: # nosec B311
|
||||
if random.randint(1, 100) >= dandelion_ins.enabled: # nosec B311
|
||||
fluffs.append(inv[1])
|
||||
# send a dinv only if the stem node supports dandelion
|
||||
elif connection.services & protocol.NODE_DANDELION > 0:
|
||||
|
@ -105,7 +105,6 @@ class InvThread(StoppableThread):
|
|||
for _ in range(len(chunk)):
|
||||
this check could also be moved down (i.e. this check could also be moved down (i.e. `reRandomiseStems` could check check the time.
|
||||
invQueue.task_done()
|
||||
|
||||
if dandelion.instance.refresh < time():
|
||||
dandelion.instance.reRandomiseStems()
|
||||
dandelion_ins.reRandomiseStems()
|
||||
|
||||
self.stop.wait(1)
|
||||
|
|
|
@ -4,8 +4,8 @@ Module for tracking objects
|
|||
import time
|
||||
from threading import RLock
|
||||
|
||||
import dandelion
|
||||
import connectionpool
|
||||
from network import dandelion_ins
|
||||
from randomtrackingdict import RandomTrackingDict
|
||||
|
||||
haveBloom = False
|
||||
|
@ -107,14 +107,14 @@ class ObjectTracker(object):
|
|||
del i.objectsNewToMe[hashid]
|
||||
except KeyError:
|
||||
if streamNumber in i.streams and (
|
||||
not dandelion.instance.hasHash(hashid)
|
||||
or dandelion.instance.objectChildStem(hashid) == i):
|
||||
not dandelion_ins.hasHash(hashid)
|
||||
or dandelion_ins.objectChildStem(hashid) == i):
|
||||
with i.objectsNewToThemLock:
|
||||
i.objectsNewToThem[hashid] = time.time()
|
||||
# update stream number,
|
||||
# which we didn't have when we just received the dinv
|
||||
# also resets expiration of the stem mode
|
||||
dandelion.instance.setHashStream(hashid, streamNumber)
|
||||
dandelion_ins.setHashStream(hashid, streamNumber)
|
||||
|
||||
if i == self:
|
||||
try:
|
||||
|
|
|
@ -15,10 +15,10 @@ import helper_random
|
|||
import l10n
|
||||
import protocol
|
||||
import state
|
||||
import dandelion
|
||||
import connectionpool
|
||||
from bmconfigparser import config
|
||||
from highlevelcrypto import randomBytes
|
||||
from network import dandelion_ins
|
||||
from queues import invQueue, receiveDataQueue, UISignalQueue
|
||||
from tr import _translate
|
||||
|
||||
|
@ -169,7 +169,7 @@ class TCPConnection(BMProto, TLSDispatcher):
|
|||
knownnodes.increaseRating(self.destination)
|
||||
knownnodes.addKnownNode(
|
||||
self.streams, self.destination, time.time())
|
||||
dandelion.instance.maybeAddStem(self)
|
||||
dandelion_ins.maybeAddStem(self, invQueue)
|
||||
self.sendAddr()
|
||||
self.sendBigInv()
|
||||
|
||||
|
@ -231,7 +231,7 @@ class TCPConnection(BMProto, TLSDispatcher):
|
|||
with self.objectsNewToThemLock:
|
||||
for objHash in state.Inventory.unexpired_hashes_by_stream(stream):
|
||||
# don't advertise stem objects on bigInv
|
||||
if dandelion.instance.hasHash(objHash):
|
||||
if dandelion_ins.hasHash(objHash):
|
||||
continue
|
||||
bigInvList[objHash] = 0
|
||||
objectCount = 0
|
||||
|
@ -268,7 +268,7 @@ class TCPConnection(BMProto, TLSDispatcher):
|
|||
self.append_write_buf(
|
||||
protocol.assembleVersionMessage(
|
||||
self.destination.host, self.destination.port,
|
||||
connectionpool.pool.streams,
|
||||
connectionpool.pool.streams, dandelion_ins.enabled,
|
||||
False, nodeid=self.nodeid))
|
||||
self.connectedAt = time.time()
|
||||
receiveDataQueue.put(self.destination)
|
||||
|
@ -293,7 +293,7 @@ class TCPConnection(BMProto, TLSDispatcher):
|
|||
if host_is_global:
|
||||
knownnodes.addKnownNode(
|
||||
self.streams, self.destination, time.time())
|
||||
dandelion.instance.maybeRemoveStem(self)
|
||||
dandelion_ins.maybeRemoveStem(self)
|
||||
else:
|
||||
self.checkTimeOffsetNotification()
|
||||
if host_is_global:
|
||||
|
@ -319,7 +319,7 @@ class Socks5BMConnection(Socks5Connection, TCPConnection):
|
|||
self.append_write_buf(
|
||||
protocol.assembleVersionMessage(
|
||||
self.destination.host, self.destination.port,
|
||||
connectionpool.pool.streams,
|
||||
connectionpool.pool.streams, dandelion_ins.enabled,
|
||||
False, nodeid=self.nodeid))
|
||||
self.set_state("bm_header", expectBytes=protocol.Header.size)
|
||||
return True
|
||||
|
@ -343,7 +343,7 @@ class Socks4aBMConnection(Socks4aConnection, TCPConnection):
|
|||
self.append_write_buf(
|
||||
protocol.assembleVersionMessage(
|
||||
self.destination.host, self.destination.port,
|
||||
connectionpool.pool.streams,
|
||||
connectionpool.pool.streams, dandelion_ins.enabled,
|
||||
False, nodeid=self.nodeid))
|
||||
self.set_state("bm_header", expectBytes=protocol.Header.size)
|
||||
return True
|
||||
|
|
|
@ -6,9 +6,9 @@ import time
|
|||
import helper_random
|
||||
import protocol
|
||||
import state
|
||||
import dandelion
|
||||
import connectionpool
|
||||
from randomtrackingdict import RandomTrackingDict
|
||||
from network import dandelion_ins
|
||||
from threads import StoppableThread
|
||||
|
||||
|
||||
|
@ -41,8 +41,8 @@ class UploadThread(StoppableThread):
|
|||
chunk_count = 0
|
||||
for chunk in request:
|
||||
del i.pendingUpload[chunk]
|
||||
if dandelion.instance.hasHash(chunk) and \
|
||||
i != dandelion.instance.objectChildStem(chunk):
|
||||
if dandelion_ins.hasHash(chunk) and \
|
||||
i != dandelion_ins.objectChildStem(chunk):
|
||||
i.antiIntersectionDelay()
|
||||
self.logger.info(
|
||||
'%s asked for a stem object we didn\'t offer to it.',
|
||||
|
|
|
@ -336,8 +336,8 @@ def assembleAddrMessage(peerList):
|
|||
return retval
|
||||
We can have We can have `dandelion_enabled` default to `True`. Then we can put it at the end of arguments as well.
|
||||
|
||||
|
||||
def assembleVersionMessage(
|
||||
remoteHost, remotePort, participatingStreams, server=False, nodeid=None
|
||||
def assembleVersionMessage( # pylint: disable=too-many-arguments
|
||||
remoteHost, remotePort, participatingStreams, dandelion_enabled=True, server=False, nodeid=None,
|
||||
):
|
||||
"""
|
||||
Construct the payload of a version message,
|
||||
|
@ -350,7 +350,7 @@ def assembleVersionMessage(
|
|||
'>q',
|
||||
NODE_NETWORK
|
||||
| (NODE_SSL if haveSSL(server) else 0)
|
||||
| (NODE_DANDELION if state.dandelion_enabled else 0)
|
||||
| (NODE_DANDELION if dandelion_enabled else 0)
|
||||
)
|
||||
payload += pack('>q', int(time.time()))
|
||||
|
||||
|
@ -374,7 +374,7 @@ def assembleVersionMessage(
|
|||
'>q',
|
||||
NODE_NETWORK
|
||||
| (NODE_SSL if haveSSL(server) else 0)
|
||||
| (NODE_DANDELION if state.dandelion_enabled else 0)
|
||||
| (NODE_DANDELION if dandelion_enabled else 0)
|
||||
)
|
||||
# = 127.0.0.1. This will be ignored by the remote host.
|
||||
# The actual remote connected IP will be used.
|
||||
|
|
|
@ -43,8 +43,6 @@ ownAddresses = {}
|
|||
|
||||
discoveredPeers = {}
|
||||
|
||||
dandelion_enabled = 0
|
||||
|
||||
kivy = False
|
||||
|
||||
kivyapp = None
|
||||
|
|
|
@ -319,16 +319,17 @@ class TestCore(unittest.TestCase):
|
|||
|
||||
def test_version(self):
|
||||
we don't really need an instance of dandelion here. We just need a variable and we pass the same variable to we don't really need an instance of dandelion here. We just need a variable and we pass the same variable to `assembleVersionMessage` and `assertEqual`
Done Done
|
||||
"""check encoding/decoding of the version message"""
|
||||
dandelion_enabled = True
|
||||
# with single stream
|
||||
msg = protocol.assembleVersionMessage('127.0.0.1', 8444, [1])
|
||||
msg = protocol.assembleVersionMessage('127.0.0.1', 8444, [1], dandelion_enabled)
|
||||
decoded = self._decode_msg(msg, "IQQiiQlsLv")
|
||||
peer, _, ua, streams = self._decode_msg(msg, "IQQiiQlsLv")[4:]
|
||||
self.assertEqual(
|
||||
peer, Node(11 if state.dandelion_enabled else 3, '127.0.0.1', 8444))
|
||||
peer, Node(11 if dandelion_enabled else 3, '127.0.0.1', 8444))
|
||||
self.assertEqual(ua, '/PyBitmessage:' + softwareVersion + '/')
|
||||
self.assertEqual(streams, [1])
|
||||
# with multiple streams
|
||||
msg = protocol.assembleVersionMessage('127.0.0.1', 8444, [1, 2, 3])
|
||||
msg = protocol.assembleVersionMessage('127.0.0.1', 8444, [1, 2, 3], dandelion_enabled)
|
||||
decoded = self._decode_msg(msg, "IQQiiQlslv")
|
||||
peer, _, ua = decoded[4:7]
|
||||
streams = decoded[7:]
|
||||
|
|
what we can do here is to move this check into the
Dandelion.hasHash
method. Then bmproto doesn't need to check.we can move into Dandelion.hasHash. but
hasHash
here used in some checks in iteration. i think that could not work same as here, like return direct True.The easiest solution is just to use the same approach in
if
as infor
. It would result in dummy loops but my guess is that it's not a big performance hit. It could be optimised for performance later if needed.