Download thread updates
- now tracks downloads globally too, so it doesn't request the same object from multiple peers at the same time - retries at the earliest every minute - stops trying to download an object after an hour - minor fixes in retrying downloading invalid objects
This commit is contained in:
parent
846fced0a2
commit
fc19e4119a
|
@ -274,12 +274,12 @@ class Main:
|
||||||
state.invThread = InvThread()
|
state.invThread = InvThread()
|
||||||
state.invThread.daemon = True
|
state.invThread.daemon = True
|
||||||
state.invThread.start()
|
state.invThread.start()
|
||||||
downloadThread = DownloadThread()
|
|
||||||
downloadThread.daemon = True
|
|
||||||
downloadThread.start()
|
|
||||||
state.addrThread = AddrThread()
|
state.addrThread = AddrThread()
|
||||||
state.addrThread.daemon = True
|
state.addrThread.daemon = True
|
||||||
state.addrThread.start()
|
state.addrThread.start()
|
||||||
|
state.downloadThread = DownloadThread()
|
||||||
|
state.downloadThread.daemon = True
|
||||||
|
state.downloadThread.start()
|
||||||
|
|
||||||
connectToStream(1)
|
connectToStream(1)
|
||||||
|
|
||||||
|
|
|
@ -275,47 +275,24 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
|
||||||
self.object.checkEOLSanity()
|
self.object.checkEOLSanity()
|
||||||
self.object.checkAlreadyHave()
|
self.object.checkAlreadyHave()
|
||||||
except (BMObjectExpiredError, BMObjectAlreadyHaveError) as e:
|
except (BMObjectExpiredError, BMObjectAlreadyHaveError) as e:
|
||||||
for connection in network.connectionpool.BMConnectionPool().inboundConnections.values() + \
|
BMProto.stopDownloadingObject(self.object.inventoryHash)
|
||||||
network.connectionpool.BMConnectionPool().outboundConnections.values():
|
|
||||||
try:
|
|
||||||
with connection.objectsNewToThemLock:
|
|
||||||
del connection.objectsNewToThem[self.object.inventoryHash]
|
|
||||||
except KeyError:
|
|
||||||
pass
|
|
||||||
try:
|
|
||||||
with connection.objectsNewToMeLock:
|
|
||||||
del connection.objectsNewToMe[self.object.inventoryHash]
|
|
||||||
except KeyError:
|
|
||||||
pass
|
|
||||||
raise e
|
raise e
|
||||||
try:
|
try:
|
||||||
self.object.checkStream()
|
self.object.checkStream()
|
||||||
except (BMObjectUnwantedStreamError,) as e:
|
except (BMObjectUnwantedStreamError,) as e:
|
||||||
for connection in network.connectionpool.BMConnectionPool().inboundConnections.values() + \
|
BMProto.stopDownloadingObject(self.object.inventoryHash, BMConfigParser().get("inventory", "acceptmismatch"))
|
||||||
network.connectionpool.BMConnectionPool().outboundConnections.values():
|
|
||||||
try:
|
|
||||||
with connection.objectsNewToMeLock:
|
|
||||||
del connection.objectsNewToMe[self.object.inventoryHash]
|
|
||||||
except KeyError:
|
|
||||||
pass
|
|
||||||
if not BMConfigParser().get("inventory", "acceptmismatch"):
|
|
||||||
try:
|
|
||||||
with connection.objectsNewToThemLock:
|
|
||||||
del connection.objectsNewToThem[self.object.inventoryHash]
|
|
||||||
except KeyError:
|
|
||||||
pass
|
|
||||||
if not BMConfigParser().get("inventory", "acceptmismatch"):
|
if not BMConfigParser().get("inventory", "acceptmismatch"):
|
||||||
raise e
|
raise e
|
||||||
|
|
||||||
self.object.checkObjectByType()
|
try:
|
||||||
|
self.object.checkObjectByType()
|
||||||
|
objectProcessorQueue.put((self.object.objectType, self.object.data))
|
||||||
|
except BMObjectInvalidError as e:
|
||||||
|
BMProto.stopDownloadingObject(self.object.inventoryHash, True)
|
||||||
|
|
||||||
Inventory()[self.object.inventoryHash] = (
|
Inventory()[self.object.inventoryHash] = (
|
||||||
self.object.objectType, self.object.streamNumber, self.payload[objectOffset:], self.object.expiresTime, self.object.tag)
|
self.object.objectType, self.object.streamNumber, self.payload[objectOffset:], self.object.expiresTime, self.object.tag)
|
||||||
objectProcessorQueue.put((self.object.objectType,self.object.data))
|
|
||||||
#DownloadQueue().task_done(self.object.inventoryHash)
|
|
||||||
invQueue.put((self.object.streamNumber, self.object.inventoryHash, self))
|
invQueue.put((self.object.streamNumber, self.object.inventoryHash, self))
|
||||||
#ObjUploadQueue().put(UploadElem(self.object.streamNumber, self.object.inventoryHash))
|
|
||||||
#broadcastToSendDataQueues((streamNumber, 'advertiseobject', inventoryHash))
|
|
||||||
return True
|
return True
|
||||||
|
|
||||||
def _decode_addr(self):
|
def _decode_addr(self):
|
||||||
|
@ -471,6 +448,22 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
|
||||||
retval += protocol.CreatePacket('addr', payload)
|
retval += protocol.CreatePacket('addr', payload)
|
||||||
return retval
|
return retval
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def stopDownloadingObject(hashId, forwardAnyway=False):
|
||||||
|
for connection in network.connectionpool.BMConnectionPool().inboundConnections.values() + \
|
||||||
|
network.connectionpool.BMConnectionPool().outboundConnections.values():
|
||||||
|
try:
|
||||||
|
with connection.objectsNewToMeLock:
|
||||||
|
del connection.objectsNewToMe[hashId]
|
||||||
|
except KeyError:
|
||||||
|
pass
|
||||||
|
if not forwardAnyway:
|
||||||
|
try:
|
||||||
|
with connection.objectsNewToThemLock:
|
||||||
|
del connection.objectsNewToThem[hashId]
|
||||||
|
except KeyError:
|
||||||
|
pass
|
||||||
|
|
||||||
def handle_close(self, reason=None):
|
def handle_close(self, reason=None):
|
||||||
self.set_state("close")
|
self.set_state("close")
|
||||||
if reason is None:
|
if reason is None:
|
||||||
|
|
|
@ -44,7 +44,7 @@ class BMConnectionPool(object):
|
||||||
del i.objectsNewToMe[hashid]
|
del i.objectsNewToMe[hashid]
|
||||||
except KeyError:
|
except KeyError:
|
||||||
with i.objectsNewToThemLock:
|
with i.objectsNewToThemLock:
|
||||||
i.objectsNewToThem[hashid] = True
|
i.objectsNewToThem[hashid] = time.time()
|
||||||
if i == connection:
|
if i == connection:
|
||||||
try:
|
try:
|
||||||
with i.objectsNewToThemLock:
|
with i.objectsNewToThemLock:
|
||||||
|
|
|
@ -1,4 +1,5 @@
|
||||||
import threading
|
import threading
|
||||||
|
import time
|
||||||
|
|
||||||
import addresses
|
import addresses
|
||||||
#from bmconfigparser import BMConfigParser
|
#from bmconfigparser import BMConfigParser
|
||||||
|
@ -9,26 +10,38 @@ from network.connectionpool import BMConnectionPool
|
||||||
import protocol
|
import protocol
|
||||||
|
|
||||||
class DownloadThread(threading.Thread, StoppableThread):
|
class DownloadThread(threading.Thread, StoppableThread):
|
||||||
maxPending = 500
|
maxPending = 50
|
||||||
requestChunk = 1000
|
requestChunk = 100
|
||||||
|
requestTimeout = 60
|
||||||
|
cleanInterval = 60
|
||||||
|
requestExpires = 600
|
||||||
|
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
threading.Thread.__init__(self, name="DownloadThread")
|
threading.Thread.__init__(self, name="DownloadThread")
|
||||||
self.initStop()
|
self.initStop()
|
||||||
self.name = "DownloadThread"
|
self.name = "DownloadThread"
|
||||||
logger.info("init download thread")
|
logger.info("init download thread")
|
||||||
|
self.pending = {}
|
||||||
|
self.lastCleaned = time.time()
|
||||||
|
|
||||||
|
def cleanPending(self):
|
||||||
|
deadline = time.time() - DownloadThread.requestExpires
|
||||||
|
self.pending = {k: v for k, v in self.pending.iteritems() if v >= deadline}
|
||||||
|
self.lastCleaned = time.time()
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
while not self._stopped:
|
while not self._stopped:
|
||||||
requested = 0
|
requested = 0
|
||||||
for i in BMConnectionPool().inboundConnections.values() + BMConnectionPool().outboundConnections.values():
|
for i in BMConnectionPool().inboundConnections.values() + BMConnectionPool().outboundConnections.values():
|
||||||
# this may take a while, but it needs a consistency so I think it's better
|
now = time.time()
|
||||||
|
timedOut = now - DownloadThread.requestTimeout
|
||||||
|
# this may take a while, but it needs a consistency so I think it's better to lock a bigger chunk
|
||||||
with i.objectsNewToMeLock:
|
with i.objectsNewToMeLock:
|
||||||
downloadPending = len(list((k for k, v in i.objectsNewToMe.iteritems() if not v)))
|
downloadPending = len(list((k for k, v in i.objectsNewToMe.iteritems() if k in self.pending and self.pending[k] > timedOut)))
|
||||||
if downloadPending >= DownloadThread.maxPending:
|
if downloadPending >= DownloadThread.maxPending:
|
||||||
continue
|
continue
|
||||||
# keys with True values in the dict
|
# keys with True values in the dict
|
||||||
request = list((k for k, v in i.objectsNewToMe.iteritems() if v))
|
request = list((k for k, v in i.objectsNewToMe.iteritems() if k not in self.pending or self.pending[k] < timedOut))
|
||||||
if not request:
|
if not request:
|
||||||
continue
|
continue
|
||||||
if len(request) > DownloadThread.requestChunk - downloadPending:
|
if len(request) > DownloadThread.requestChunk - downloadPending:
|
||||||
|
@ -36,9 +49,13 @@ class DownloadThread(threading.Thread, StoppableThread):
|
||||||
# mark them as pending
|
# mark them as pending
|
||||||
for k in request:
|
for k in request:
|
||||||
i.objectsNewToMe[k] = False
|
i.objectsNewToMe[k] = False
|
||||||
|
self.pending[k] = now
|
||||||
|
|
||||||
payload = addresses.encodeVarint(len(request)) + ''.join(request)
|
payload = addresses.encodeVarint(len(request)) + ''.join(request)
|
||||||
i.writeQueue.put(protocol.CreatePacket('getdata', payload))
|
i.writeQueue.put(protocol.CreatePacket('getdata', payload))
|
||||||
logger.debug("%s:%i Requesting %i objects", i.destination.host, i.destination.port, len(request))
|
logger.debug("%s:%i Requesting %i objects", i.destination.host, i.destination.port, len(request))
|
||||||
requested += len(request)
|
requested += len(request)
|
||||||
self.stop.wait(1)
|
if time.time() >= self.lastCleaned + DownloadThread.cleanInterval:
|
||||||
|
self.cleanPending()
|
||||||
|
if not requested:
|
||||||
|
self.stop.wait(1)
|
||||||
|
|
|
@ -28,6 +28,7 @@ class ObjectTracker(object):
|
||||||
invCleanPeriod = 300
|
invCleanPeriod = 300
|
||||||
invInitialCapacity = 50000
|
invInitialCapacity = 50000
|
||||||
invErrorRate = 0.03
|
invErrorRate = 0.03
|
||||||
|
trackingExpires = 3600
|
||||||
|
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self.objectsNewToMe = {}
|
self.objectsNewToMe = {}
|
||||||
|
@ -62,9 +63,9 @@ class ObjectTracker(object):
|
||||||
with self.objectsNewToMeLock:
|
with self.objectsNewToMeLock:
|
||||||
tmp = self.objectsNewToMe.copy()
|
tmp = self.objectsNewToMe.copy()
|
||||||
self.objectsNewToMe = tmp
|
self.objectsNewToMe = tmp
|
||||||
|
deadline = time.time() - ObjectTracker.trackingExpires
|
||||||
with self.objectsNewToThemLock:
|
with self.objectsNewToThemLock:
|
||||||
tmp = self.objectsNewToThem.copy()
|
self.objectsNewToThem = {k: v for k, v in self.objectsNewToThem.iteritems() if v >= deadline}
|
||||||
self.objectsNewToThem = tmp
|
|
||||||
self.lastCleaned = time.time()
|
self.lastCleaned = time.time()
|
||||||
|
|
||||||
def hasObj(self, hashid):
|
def hasObj(self, hashid):
|
||||||
|
|
|
@ -70,7 +70,7 @@ class ReceiveQueueThread(threading.Thread, StoppableThread):
|
||||||
with connection.objectsNewToThemLock:
|
with connection.objectsNewToThemLock:
|
||||||
for objHash in Inventory().unexpired_hashes_by_stream(stream):
|
for objHash in Inventory().unexpired_hashes_by_stream(stream):
|
||||||
bigInvList[objHash] = 0
|
bigInvList[objHash] = 0
|
||||||
connection.objectsNewToThem[objHash] = True
|
connection.objectsNewToThem[objHash] = time.time()
|
||||||
objectCount = 0
|
objectCount = 0
|
||||||
payload = b''
|
payload = b''
|
||||||
# Now let us start appending all of these hashes together. They will be
|
# Now let us start appending all of these hashes together. They will be
|
||||||
|
|
|
@ -25,6 +25,7 @@ maximumNumberOfHalfOpenConnections = 0
|
||||||
|
|
||||||
invThread = None
|
invThread = None
|
||||||
addrThread = None
|
addrThread = None
|
||||||
|
downloadThread = None
|
||||||
|
|
||||||
ownAddresses = {}
|
ownAddresses = {}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue
Block a user