|
|
|
@ -1,3 +1,7 @@ |
|
|
|
|
""" |
|
|
|
|
src/network/objectracker.py |
|
|
|
|
=========================== |
|
|
|
|
""" |
|
|
|
|
import time |
|
|
|
|
from threading import RLock |
|
|
|
|
|
|
|
|
@ -27,6 +31,7 @@ missingObjects = {} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class ObjectTracker(object): |
|
|
|
|
"""Object tracker mixin""" |
|
|
|
|
invCleanPeriod = 300 |
|
|
|
|
invInitialCapacity = 50000 |
|
|
|
|
invErrorRate = 0.03 |
|
|
|
@ -42,21 +47,24 @@ class ObjectTracker(object): |
|
|
|
|
self.lastCleaned = time.time() |
|
|
|
|
|
|
|
|
|
def initInvBloom(self): |
|
|
|
|
"""Init bloom filter for tracking. WIP.""" |
|
|
|
|
if haveBloom: |
|
|
|
|
# lock? |
|
|
|
|
self.invBloom = BloomFilter(capacity=ObjectTracker.invInitialCapacity, |
|
|
|
|
error_rate=ObjectTracker.invErrorRate) |
|
|
|
|
|
|
|
|
|
def initAddrBloom(self): |
|
|
|
|
"""Init bloom filter for tracking addrs, WIP. This either needs to be moved to addrthread.py or removed.""" |
|
|
|
|
if haveBloom: |
|
|
|
|
# lock? |
|
|
|
|
self.addrBloom = BloomFilter(capacity=ObjectTracker.invInitialCapacity, |
|
|
|
|
error_rate=ObjectTracker.invErrorRate) |
|
|
|
|
|
|
|
|
|
def clean(self): |
|
|
|
|
"""Clean up tracking to prevent memory bloat""" |
|
|
|
|
if self.lastCleaned < time.time() - ObjectTracker.invCleanPeriod: |
|
|
|
|
if haveBloom: |
|
|
|
|
if len(missingObjects) == 0: |
|
|
|
|
if missingObjects == 0: |
|
|
|
|
self.initInvBloom() |
|
|
|
|
self.initAddrBloom() |
|
|
|
|
else: |
|
|
|
@ -67,12 +75,13 @@ class ObjectTracker(object): |
|
|
|
|
self.lastCleaned = time.time() |
|
|
|
|
|
|
|
|
|
def hasObj(self, hashid): |
|
|
|
|
"""Do we already have object?""" |
|
|
|
|
if haveBloom: |
|
|
|
|
return hashid in self.invBloom |
|
|
|
|
else: |
|
|
|
|
return hashid in self.objectsNewToMe |
|
|
|
|
return hashid in self.objectsNewToMe |
|
|
|
|
|
|
|
|
|
def handleReceivedInventory(self, hashId): |
|
|
|
|
"""Handling received inventory""" |
|
|
|
|
if haveBloom: |
|
|
|
|
self.invBloom.add(hashId) |
|
|
|
|
try: |
|
|
|
@ -85,6 +94,7 @@ class ObjectTracker(object): |
|
|
|
|
self.objectsNewToMe[hashId] = True |
|
|
|
|
|
|
|
|
|
def handleReceivedObject(self, streamNumber, hashid): |
|
|
|
|
"""Handling received object""" |
|
|
|
|
for i in network.connectionpool.BMConnectionPool().inboundConnections.values( |
|
|
|
|
) + network.connectionpool.BMConnectionPool().outboundConnections.values(): |
|
|
|
|
if not i.fullyEstablished: |
|
|
|
@ -109,25 +119,12 @@ class ObjectTracker(object): |
|
|
|
|
self.objectsNewToMe.setLastObject() |
|
|
|
|
|
|
|
|
|
def hasAddr(self, addr): |
|
|
|
|
"""WIP, should be moved to addrthread.py or removed""" |
|
|
|
|
if haveBloom: |
|
|
|
|
return addr in self.invBloom |
|
|
|
|
return None |
|
|
|
|
|
|
|
|
|
def addAddr(self, hashid): |
|
|
|
|
"""WIP, should be moved to addrthread.py or removed""" |
|
|
|
|
if haveBloom: |
|
|
|
|
self.addrBloom.add(hashid) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
""" |
|
|
|
|
addr sending -> per node upload queue, and flush every minute or so |
|
|
|
|
inv sending -> if not in bloom, inv immediately, otherwise put into a per node upload queue |
|
|
|
|
and flush every minute or so |
|
|
|
|
data sending -> a simple queue |
|
|
|
|
no bloom |
|
|
|
|
- if inv arrives |
|
|
|
|
- if we don't have it, add tracking and download queue |
|
|
|
|
- if we do have it, remove from tracking |
|
|
|
|
tracking downloads |
|
|
|
|
- per node hash of items the node has but we don't |
|
|
|
|
tracking inv |
|
|
|
|
- per node hash of items that neither the remote node nor we have |
|
|
|
|
""" |
|
|
|
|