This repository has been archived on 2024-12-10. You can view files and clone it, but cannot push or open issues or pull requests.
PyBitmessage-2024-12-10/src/network/objectracker.py

137 lines
4.3 KiB
Python

"""
Module for tracking objects
"""
import time
from threading import RLock
import connectionpool
from network import dandelion_ins
from randomtrackingdict import RandomTrackingDict
haveBloom = False
try:
# pybloomfiltermmap
from pybloomfilter import BloomFilter
haveBloom = True
except ImportError:
try:
# pybloom
from pybloom import BloomFilter
haveBloom = True
except ImportError:
pass
# it isn't actually implemented yet so no point in turning it on
haveBloom = False
# tracking pending downloads globally, for stats
missingObjects = {}
class ObjectTracker(object):
"""Object tracker mixin"""
invCleanPeriod = 300
invInitialCapacity = 50000
invErrorRate = 0.03
trackingExpires = 3600
initialTimeOffset = 60
def __init__(self):
self.objectsNewToMe = RandomTrackingDict()
self.objectsNewToThem = {}
self.objectsNewToThemLock = RLock()
self.initInvBloom()
self.initAddrBloom()
self.lastCleaned = time.time()
def initInvBloom(self):
"""Init bloom filter for tracking. WIP."""
if haveBloom:
# lock?
self.invBloom = BloomFilter(
capacity=ObjectTracker.invInitialCapacity,
error_rate=ObjectTracker.invErrorRate)
def initAddrBloom(self):
"""Init bloom filter for tracking addrs, WIP.
This either needs to be moved to addrthread.py or removed."""
if haveBloom:
# lock?
self.addrBloom = BloomFilter(
capacity=ObjectTracker.invInitialCapacity,
error_rate=ObjectTracker.invErrorRate)
def clean(self):
"""Clean up tracking to prevent memory bloat"""
if self.lastCleaned < time.time() - ObjectTracker.invCleanPeriod:
if haveBloom:
if missingObjects == 0:
self.initInvBloom()
self.initAddrBloom()
else:
# release memory
deadline = time.time() - ObjectTracker.trackingExpires
with self.objectsNewToThemLock:
self.objectsNewToThem = {
k: v
for k, v in self.objectsNewToThem.iteritems()
if v >= deadline}
self.lastCleaned = time.time()
def hasObj(self, hashid):
"""Do we already have object?"""
if haveBloom:
return hashid in self.invBloom
return hashid in self.objectsNewToMe
def handleReceivedInventory(self, hashId):
"""Handling received inventory"""
if haveBloom:
self.invBloom.add(hashId)
try:
with self.objectsNewToThemLock:
del self.objectsNewToThem[hashId]
except KeyError:
pass
if hashId not in missingObjects:
missingObjects[hashId] = time.time()
self.objectsNewToMe[hashId] = True
def handleReceivedObject(self, streamNumber, hashid):
"""Handling received object"""
for i in connectionpool.pool.connections():
if not i.fullyEstablished:
continue
try:
del i.objectsNewToMe[hashid]
except KeyError:
if streamNumber in i.streams and (
not dandelion_ins.hasHash(hashid)
or dandelion_ins.objectChildStem(hashid) == i):
with i.objectsNewToThemLock:
i.objectsNewToThem[hashid] = time.time()
# update stream number,
# which we didn't have when we just received the dinv
# also resets expiration of the stem mode
dandelion_ins.setHashStream(hashid, streamNumber)
if i == self:
try:
with i.objectsNewToThemLock:
del i.objectsNewToThem[hashid]
except KeyError:
pass
self.objectsNewToMe.setLastObject()
def hasAddr(self, addr):
"""WIP, should be moved to addrthread.py or removed"""
if haveBloom:
return addr in self.invBloom
return None
def addAddr(self, hashid):
"""WIP, should be moved to addrthread.py or removed"""
if haveBloom:
self.addrBloom.add(hashid)