Merge branch 'v0.6' into py3

This commit is contained in:
Kashiko Koibumi 2024-05-30 18:38:31 +09:00
commit 35abdf204a
No known key found for this signature in database
GPG Key ID: 8F06E069E37C40C4
10 changed files with 35 additions and 32 deletions

View File

@ -14,7 +14,6 @@ def start(config, state):
from .announcethread import AnnounceThread
from network import connectionpool
from .addrthread import AddrThread
from .dandelion import Dandelion
from .downloadthread import DownloadThread
from .invthread import InvThread
from .networkthread import BMNetworkThread
@ -23,8 +22,6 @@ def start(config, state):
from .uploadthread import UploadThread
readKnownNodes()
# init, needs to be early because other thread may access it early
state.Dandelion = Dandelion()
connectionpool.pool.connectToStream(1)
for thread in (
BMNetworkThread(), InvThread(), AddrThread(),

View File

@ -7,6 +7,7 @@ import time
import protocol
import state
import network.connectionpool # use long name to address recursive import
import dandelion
from highlevelcrypto import calculateInventoryHash
logger = logging.getLogger('default')
@ -112,7 +113,7 @@ class BMObject(object): # pylint: disable=too-many-instance-attributes
or advertise it unnecessarily)
"""
# if it's a stem duplicate, pretend we don't have it
if state.Dandelion.hasHash(self.inventoryHash):
if dandelion.instance.hasHash(self.inventoryHash):
return
if self.inventoryHash in state.Inventory:
raise BMObjectAlreadyHaveError()

View File

@ -17,6 +17,7 @@ from network import knownnodes
import protocol
import state
import network.connectionpool # use long name to address recursive import
import dandelion
from bmconfigparser import config
from queues import invQueue, objectProcessorQueue, portCheckerQueue
from randomtrackingdict import RandomTrackingDict
@ -350,27 +351,27 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
self.pendingUpload[i] = now
return True
def _command_inv(self, dandelion=False):
def _command_inv(self, extend_dandelion_stem=False):
"""
Common inv announce implementation:
both inv and dinv depending on *dandelion* kwarg
both inv and dinv depending on *extend_dandelion_stem* kwarg
"""
items = self.decode_payload_content("l32s")
if len(items) > protocol.MAX_OBJECT_COUNT:
logger.error(
'Too many items in %sinv message!', 'd' if dandelion else '')
'Too many items in %sinv message!', 'd' if extend_dandelion_stem else '')
raise BMProtoExcessiveDataError()
# ignore dinv if dandelion turned off
if dandelion and not state.dandelion_enabled:
if extend_dandelion_stem and not state.dandelion_enabled:
return True
for i in items:
if i in state.Inventory and not state.Dandelion.hasHash(i):
if i in state.Inventory and not dandelion.instance.hasHash(i):
continue
if dandelion and not state.Dandelion.hasHash(i):
state.Dandelion.addHash(i, self)
if extend_dandelion_stem and not dandelion.instance.hasHash(i):
dandelion.instance.addHash(i, self)
self.handleReceivedInventory(i)
return True
@ -436,9 +437,9 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
except KeyError:
pass
if self.object.inventoryHash in state.Inventory and state.Dandelion.hasHash(
if self.object.inventoryHash in state.Inventory and dandelion.instance.hasHash(
self.object.inventoryHash):
state.Dandelion.removeHash(
dandelion.instance.removeHash(
self.object.inventoryHash, "cycle detection")
if six.PY2:

View File

@ -194,3 +194,6 @@ class Dandelion: # pylint: disable=old-style-class
self.nodeMap = {}
# hashMap stays to cater for pending stems
self.refresh = time() + REASSIGN_INTERVAL
instance = Dandelion()

View File

@ -10,6 +10,7 @@ import protocol
from network import connectionpool
from .objectracker import missingObjects
from .threads import StoppableThread
import dandelion
class DownloadThread(StoppableThread):
@ -60,7 +61,7 @@ class DownloadThread(StoppableThread):
payload = bytearray()
chunkCount = 0
for chunk in request:
if chunk in state.Inventory and not state.Dandelion.hasHash(chunk):
if chunk in state.Inventory and not dandelion.instance.hasHash(chunk):
try:
del i.objectsNewToMe[chunk]
except KeyError:

View File

@ -9,6 +9,7 @@ import addresses
import protocol
import state
from network import connectionpool
import dandelion
from queues import invQueue
from .threads import StoppableThread
@ -39,10 +40,10 @@ class InvThread(StoppableThread):
@staticmethod
def handleLocallyGenerated(stream, hashId):
"""Locally generated inventory items require special handling"""
state.Dandelion.addHash(hashId, stream=stream)
dandelion.instance.addHash(hashId, stream=stream)
for connection in connectionpool.pool.connections():
if state.dandelion_enabled and connection != \
state.Dandelion.objectChildStem(hashId):
dandelion.instance.objectChildStem(hashId):
continue
connection.objectsNewToThem[hashId] = time()
@ -51,7 +52,7 @@ class InvThread(StoppableThread):
chunk = []
while True:
# Dandelion fluff trigger by expiration
handleExpiredDandelion(state.Dandelion.expire())
handleExpiredDandelion(dandelion.instance.expire())
try:
data = invQueue.get(False)
chunk.append((data[0], data[1]))
@ -74,7 +75,7 @@ class InvThread(StoppableThread):
except KeyError:
continue
try:
if connection == state.Dandelion.objectChildStem(inv[1]):
if connection == dandelion.instance.objectChildStem(inv[1]):
# Fluff trigger by RNG
# auto-ignore if config set to 0, i.e. dandelion is off
if random.randint(1, 100) >= state.dandelion_enabled: # nosec B311
@ -104,7 +105,7 @@ class InvThread(StoppableThread):
for _ in range(len(chunk)):
invQueue.task_done()
if state.Dandelion.refresh < time():
state.Dandelion.reRandomiseStems()
if dandelion.instance.refresh < time():
dandelion.instance.reRandomiseStems()
self.stop.wait(1)

View File

@ -5,8 +5,8 @@ import time
from threading import RLock
import six
import state
import network.connectionpool # use long name to address recursive import
import dandelion
from randomtrackingdict import RandomTrackingDict
haveBloom = False
@ -111,14 +111,14 @@ class ObjectTracker(object):
del i.objectsNewToMe[hashid]
except KeyError:
if streamNumber in i.streams and (
not state.Dandelion.hasHash(hashid)
or state.Dandelion.objectChildStem(hashid) == i):
not dandelion.instance.hasHash(hashid)
or dandelion.instance.objectChildStem(hashid) == i):
with i.objectsNewToThemLock:
i.objectsNewToThem[hashid_bytes] = time.time()
# update stream number,
# which we didn't have when we just received the dinv
# also resets expiration of the stem mode
state.Dandelion.setHashStream(hashid, streamNumber)
dandelion.instance.setHashStream(hashid, streamNumber)
if i == self:
try:

View File

@ -17,6 +17,7 @@ import l10n
import protocol
import state
import network.connectionpool # use long name to address recursive import
import dandelion
from bmconfigparser import config
from highlevelcrypto import randomBytes
from queues import invQueue, receiveDataQueue, UISignalQueue
@ -175,7 +176,7 @@ class TCPConnection(BMProto, TLSDispatcher):
knownnodes.increaseRating(self.destination)
knownnodes.addKnownNode(
self.streams, self.destination, time.time())
state.Dandelion.maybeAddStem(self)
dandelion.instance.maybeAddStem(self)
self.sendAddr()
self.sendBigInv()
@ -237,7 +238,7 @@ class TCPConnection(BMProto, TLSDispatcher):
with self.objectsNewToThemLock:
for objHash in state.Inventory.unexpired_hashes_by_stream(stream):
# don't advertise stem objects on bigInv
if state.Dandelion.hasHash(objHash):
if dandelion.instance.hasHash(objHash):
continue
bigInvList[objHash] = 0
objectCount = 0
@ -299,7 +300,7 @@ class TCPConnection(BMProto, TLSDispatcher):
if host_is_global:
knownnodes.addKnownNode(
self.streams, self.destination, time.time())
state.Dandelion.maybeRemoveStem(self)
dandelion.instance.maybeRemoveStem(self)
else:
self.checkTimeOffsetNotification()
if host_is_global:

View File

@ -7,6 +7,7 @@ import helper_random
import protocol
import state
from network import connectionpool
import dandelion
from randomtrackingdict import RandomTrackingDict
from .threads import StoppableThread
@ -40,8 +41,8 @@ class UploadThread(StoppableThread):
chunk_count = 0
for chunk in request:
del i.pendingUpload[chunk]
if state.Dandelion.hasHash(chunk) and \
i != state.Dandelion.objectChildStem(chunk):
if dandelion.instance.hasHash(chunk) and \
i != dandelion.instance.objectChildStem(chunk):
i.antiIntersectionDelay()
self.logger.info(
'%s asked for a stem object we didn\'t offer to it.',

View File

@ -96,6 +96,3 @@ class Placeholder(object): # pylint:disable=too-few-public-methods
Inventory = Placeholder("Inventory")
Dandelion = Placeholder("Dandelion")