Dandelion fixes
- expiration wasn't handled correctly - objects with no child stems never expired. While this is better for anonymity, it can cause objects getting stuck - upon expiration the nodes weren't marked as not having the object, causing it to not be advertised - I haven't tested it but at least I don't think can make things worse
This commit is contained in:
parent
e07cd1462e
commit
465a276c02
|
@ -122,11 +122,13 @@ class Dandelion():
|
||||||
def expire(self):
|
def expire(self):
|
||||||
with self.lock:
|
with self.lock:
|
||||||
deadline = time()
|
deadline = time()
|
||||||
# only expire those that have a child node, i.e. those without a child not will stick around
|
toDelete = [[v.stream, k, v.child]
|
||||||
toDelete = [[v.stream, k, v.child] for k, v in self.hashMap.iteritems() if v.timeout < deadline and v.child]
|
for k, v in self.hashMap.iteritems()
|
||||||
|
if v.timeout < deadline]
|
||||||
for row in toDelete:
|
for row in toDelete:
|
||||||
self.removeHash(row[1], 'expiration')
|
self.removeHash(row[1], 'expiration')
|
||||||
invQueue.put((row[0], row[1], row[2]))
|
invQueue.put((row[0], row[1], row[2]))
|
||||||
|
return toDelete
|
||||||
|
|
||||||
def reRandomiseStems(self):
|
def reRandomiseStems(self):
|
||||||
with self.lock:
|
with self.lock:
|
||||||
|
|
|
@ -12,6 +12,27 @@ from queues import invQueue
|
||||||
import protocol
|
import protocol
|
||||||
import state
|
import state
|
||||||
|
|
||||||
|
|
||||||
|
def handleExpiredDandelion(expired):
|
||||||
|
"""For expired dandelion objects, mark all remotes as not having
|
||||||
|
the object"""
|
||||||
|
if not expired:
|
||||||
|
return
|
||||||
|
for i in \
|
||||||
|
BMConnectionPool().inboundConnections.values() + \
|
||||||
|
BMConnectionPool().outboundConnections.values():
|
||||||
|
if not i.fullyEstablished:
|
||||||
|
continue
|
||||||
|
for x in expired:
|
||||||
|
streamNumber, hashid, _ = x
|
||||||
|
try:
|
||||||
|
del i.objectsNewToMe[hashid]
|
||||||
|
except KeyError:
|
||||||
|
if streamNumber in i.streams:
|
||||||
|
with i.objectsNewToThemLock:
|
||||||
|
i.objectsNewToThem[hashid] = time()
|
||||||
|
|
||||||
|
|
||||||
class InvThread(threading.Thread, StoppableThread):
|
class InvThread(threading.Thread, StoppableThread):
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
threading.Thread.__init__(self, name="InvBroadcaster")
|
threading.Thread.__init__(self, name="InvBroadcaster")
|
||||||
|
@ -20,8 +41,9 @@ class InvThread(threading.Thread, StoppableThread):
|
||||||
|
|
||||||
def handleLocallyGenerated(self, stream, hashId):
|
def handleLocallyGenerated(self, stream, hashId):
|
||||||
Dandelion().addHash(hashId, stream=stream)
|
Dandelion().addHash(hashId, stream=stream)
|
||||||
for connection in BMConnectionPool().inboundConnections.values() + \
|
for connection in \
|
||||||
BMConnectionPool().outboundConnections.values():
|
BMConnectionPool().inboundConnections.values() + \
|
||||||
|
BMConnectionPool().outboundConnections.values():
|
||||||
if state.dandelion and connection != Dandelion().objectChildStem(hashId):
|
if state.dandelion and connection != Dandelion().objectChildStem(hashId):
|
||||||
continue
|
continue
|
||||||
connection.objectsNewToThem[hashId] = time()
|
connection.objectsNewToThem[hashId] = time()
|
||||||
|
@ -31,7 +53,7 @@ class InvThread(threading.Thread, StoppableThread):
|
||||||
chunk = []
|
chunk = []
|
||||||
while True:
|
while True:
|
||||||
# Dandelion fluff trigger by expiration
|
# Dandelion fluff trigger by expiration
|
||||||
Dandelion().expire()
|
handleExpiredDandelion(Dandelion().expire())
|
||||||
try:
|
try:
|
||||||
data = invQueue.get(False)
|
data = invQueue.get(False)
|
||||||
chunk.append((data[0], data[1]))
|
chunk.append((data[0], data[1]))
|
||||||
|
|
Reference in New Issue
Block a user