This repository has been archived on 2025-01-19. You can view files and clone it, but cannot push or open issues or pull requests.
PyBitmessage-2025-01-19/src/network/objectracker.py

140 lines
4.5 KiB
Python
Raw Normal View History

"""
Module for tracking objects
"""
import time
from threading import RLock
import state
import network.connectionpool as connectionpool
2018-02-01 12:20:41 +01:00
from randomtrackingdict import RandomTrackingDict
# Probe for an optional bloom filter backend; the first importable one wins.
haveBloom = False
try:
    # pybloomfiltermmap
    from pybloomfilter import BloomFilter
except ImportError:
    try:
        # pybloom
        from pybloom import BloomFilter
    except ImportError:
        pass
    else:
        haveBloom = True
else:
    haveBloom = True

# it isn't actually implemented yet so no point in turning it on
# (this unconditionally overrides whatever the probing above decided)
haveBloom = False

# tracking pending downloads globally, for stats
missingObjects = {}
class ObjectTracker(object):
    """Object tracker mixin.

    Tracks, per instance, which inventory hashes are new to us
    (``objectsNewToMe``) and which are new to the remote side
    (``objectsNewToThem``, guarded by ``objectsNewToThemLock``).
    Optional bloom filter support is gated on the module-level
    ``haveBloom`` flag.
    """
    # minimum number of seconds between two effective clean() passes
    invCleanPeriod = 300
    # capacity and error rate handed to BloomFilter()
    invInitialCapacity = 50000
    invErrorRate = 0.03
    # objectsNewToThem entries older than this many seconds are dropped
    trackingExpires = 3600
    # NOTE(review): not referenced inside this class; presumably consumed
    # by code elsewhere -- confirm before changing
    initialTimeOffset = 60

    def __init__(self):
        self.objectsNewToMe = RandomTrackingDict()
        self.objectsNewToThem = {}
        self.objectsNewToThemLock = RLock()
        self.initInvBloom()
        self.initAddrBloom()
        self.lastCleaned = time.time()

    def initInvBloom(self):
        """Init bloom filter for tracking. WIP."""
        if haveBloom:
            # lock?
            self.invBloom = BloomFilter(
                capacity=ObjectTracker.invInitialCapacity,
                error_rate=ObjectTracker.invErrorRate)

    def initAddrBloom(self):
        """Init bloom filter for tracking addrs, WIP.

        This either needs to be moved to addrthread.py or removed.
        """
        if haveBloom:
            # lock?
            self.addrBloom = BloomFilter(
                capacity=ObjectTracker.invInitialCapacity,
                error_rate=ObjectTracker.invErrorRate)

    def clean(self):
        """Clean up tracking to prevent memory bloat.

        Does nothing unless at least ``invCleanPeriod`` seconds have
        passed since the previous pass. With bloom filters enabled the
        filters are re-created; otherwise ``objectsNewToThem`` entries
        older than ``trackingExpires`` seconds are discarded.
        """
        if self.lastCleaned >= time.time() - ObjectTracker.invCleanPeriod:
            return
        if haveBloom:
            # missingObjects is a dict, so test for emptiness; comparing
            # it with 0 is never true and the filter would never reset
            if not missingObjects:
                self.initInvBloom()
            self.initAddrBloom()
        else:
            # release memory
            deadline = time.time() - ObjectTracker.trackingExpires
            with self.objectsNewToThemLock:
                self.objectsNewToThem = {
                    k: v
                    for k, v in self.objectsNewToThem.items()
                    if v >= deadline}
        self.lastCleaned = time.time()

    def hasObj(self, hashid):
        """Do we already have object?

        Looks ``bytes(hashid)`` up in the inventory bloom filter when
        bloom support is on, otherwise in ``objectsNewToMe``.
        """
        hashid_bytes = bytes(hashid)
        if haveBloom:
            return hashid_bytes in self.invBloom
        return hashid_bytes in self.objectsNewToMe

    def handleReceivedInventory(self, hashId):
        """Handling received inventory.

        The announcing side evidently has the object, so it is removed
        from ``objectsNewToThem``, registered in the global
        ``missingObjects`` (first-seen timestamp is kept) and marked in
        ``objectsNewToMe``.
        """
        hashId_bytes = bytes(hashId)
        if haveBloom:
            self.invBloom.add(hashId_bytes)
        try:
            with self.objectsNewToThemLock:
                del self.objectsNewToThem[hashId_bytes]
        except KeyError:
            pass
        if hashId_bytes not in missingObjects:
            missingObjects[hashId_bytes] = time.time()
        # NOTE(review): keyed by the raw hashId while hasObj() looks up
        # bytes(hashid) -- presumably RandomTrackingDict normalises keys
        self.objectsNewToMe[hashId] = True

    def handleReceivedObject(self, streamNumber, hashid):
        """Handling received object.

        For every fully established connection in the pool: stop
        expecting the object from it, or, if it never announced the
        object, schedule it as new to that connection (subject to the
        stream and Dandelion checks). The object is never queued for
        ``self``.
        """
        hashid_bytes = bytes(hashid)
        for i in connectionpool.pool.connections():
            if not i.fullyEstablished:
                continue
            try:
                del i.objectsNewToMe[hashid]
            except KeyError:
                # this connection did not announce the object to us
                if streamNumber in i.streams and (
                        not state.Dandelion.hasHash(hashid)
                        or state.Dandelion.objectChildStem(hashid) == i):
                    with i.objectsNewToThemLock:
                        i.objectsNewToThem[hashid_bytes] = time.time()
                    # update stream number,
                    # which we didn't have when we just received the dinv
                    # also resets expiration of the stem mode
                    state.Dandelion.setHashStream(hashid, streamNumber)

            if i == self:
                try:
                    with i.objectsNewToThemLock:
                        del i.objectsNewToThem[hashid_bytes]
                except KeyError:
                    pass
        self.objectsNewToMe.setLastObject()

    def hasAddr(self, addr):
        """WIP, should be moved to addrthread.py or removed.

        Returns whether ``bytes(addr)`` is in the address bloom filter,
        or None when bloom support is off.
        """
        if haveBloom:
            # consult the address filter with the same bytes() key
            # normalisation that addAddr() uses when storing
            return bytes(addr) in self.addrBloom
        return None

    def addAddr(self, hashid):
        """WIP, should be moved to addrthread.py or removed"""
        if haveBloom:
            self.addrBloom.add(bytes(hashid))