This repository has been archived on 2024-12-01. You can view files and clone it, but cannot push or open issues or pull requests.
PyBitmessage-2024-12-01/src/network/objectracker.py
Peter Surda 3aa6f386db
Dandelion fixes
- dandelion would always think there is a cycle and trigger fluff
- cycle fluff trigger didn't correctly re-download and re-announce the
object. Now it remembers between (d)inv and object commands that it's
in a fluff trigger phase.
2017-11-18 09:47:17 +01:00

116 lines
3.6 KiB
Python

from Queue import Queue
import time
from threading import RLock
from debug import logger
from inventory import Inventory
from network.dandelion import Dandelion
from state import missingObjects
haveBloom = False
try:
# pybloomfiltermmap
from pybloomfilter import BloomFilter
haveBloom = True
except ImportError:
try:
# pybloom
from pybloom import BloomFilter
haveBloom = True
except ImportError:
pass
# it isn't actually implemented yet so no point in turning it on
haveBloom = False
class ObjectTracker(object):
invCleanPeriod = 300
invInitialCapacity = 50000
invErrorRate = 0.03
trackingExpires = 3600
def __init__(self):
self.objectsNewToMe = {}
self.objectsNewToMeLock = RLock()
self.objectsNewToThem = {}
self.objectsNewToThemLock = RLock()
self.initInvBloom()
self.initAddrBloom()
self.lastCleaned = time.time()
def initInvBloom(self):
if haveBloom:
# lock?
self.invBloom = BloomFilter(capacity=ObjectTracker.invInitialCapacity,
error_rate=ObjectTracker.invErrorRate)
def initAddrBloom(self):
if haveBloom:
# lock?
self.addrBloom = BloomFilter(capacity=ObjectTracker.invInitialCapacity,
error_rate=ObjectTracker.invErrorRate)
def clean(self):
if self.lastCleaned < time.time() - ObjectTracker.invCleanPeriod:
if haveBloom:
# FIXME
if PendingDownloadQueue().size() == 0:
self.initInvBloom()
self.initAddrBloom()
else:
# release memory
with self.objectsNewToMeLock:
tmp = self.objectsNewToMe.copy()
self.objectsNewToMe = tmp
deadline = time.time() - ObjectTracker.trackingExpires
with self.objectsNewToThemLock:
self.objectsNewToThem = {k: v for k, v in self.objectsNewToThem.iteritems() if v >= deadline}
self.lastCleaned = time.time()
def hasObj(self, hashid):
if haveBloom:
return hashid in self.invBloom
else:
return hashid in self.objectsNewToMe
def handleReceivedInventory(self, hashId):
if haveBloom:
self.invBloom.add(hashId)
try:
with self.objectsNewToThemLock:
del self.objectsNewToThem[hashId]
except KeyError:
pass
# Fluff trigger by cycle detection
if hashId not in Inventory() or hashId in Dandelion().hashMap:
if hashId in Dandelion().hashMap:
Dandelion().fluffTrigger(hashId)
if hashId not in missingObjects:
missingObjects[hashId] = time.time()
with self.objectsNewToMeLock:
self.objectsNewToMe[hashId] = True
def hasAddr(self, addr):
if haveBloom:
return addr in self.invBloom
def addAddr(self, hashid):
if haveBloom:
self.addrBloom.add(hashid)
# addr sending -> per node upload queue, and flush every minute or so
# inv sending -> if not in bloom, inv immediately, otherwise put into a per node upload queue and flush every minute or so
# data sending -> a simple queue
# no bloom
# - if inv arrives
# - if we don't have it, add tracking and download queue
# - if we do have it, remove from tracking
# tracking downloads
# - per node hash of items the node has but we don't
# tracking inv
# - per node hash of items that neither the remote node nor we have
#