From fc19e4119ad25e1436f71f63e64bab5e66dfeeb9 Mon Sep 17 00:00:00 2001
From: Peter Surda
Date: Wed, 5 Jul 2017 09:25:49 +0200
Subject: [PATCH] Download thread updates

- now tracks downloads globally too, so it doesn't request the same
  object from multiple peers at the same time
- retries at the earliest every minute
- stops trying to download an object after an hour
- minor fixes in retrying downloading invalid objects
---
 src/bitmessagemain.py             |  6 ++--
 src/network/bmproto.py            | 53 ++++++++++++++-----------
 src/network/connectionpool.py     |  2 +-
 src/network/downloadthread.py     | 29 +++++++++++++----
 src/network/objectracker.py       |  5 +--
 src/network/receivequeuethread.py |  2 +-
 src/state.py                      |  1 +
 7 files changed, 55 insertions(+), 43 deletions(-)

diff --git a/src/bitmessagemain.py b/src/bitmessagemain.py
index 461486f7..88fa3c1d 100755
--- a/src/bitmessagemain.py
+++ b/src/bitmessagemain.py
@@ -274,12 +274,12 @@ class Main:
         state.invThread = InvThread()
         state.invThread.daemon = True
         state.invThread.start()
-        downloadThread = DownloadThread()
-        downloadThread.daemon = True
-        downloadThread.start()
         state.addrThread = AddrThread()
         state.addrThread.daemon = True
         state.addrThread.start()
+        state.downloadThread = DownloadThread()
+        state.downloadThread.daemon = True
+        state.downloadThread.start()
 
         connectToStream(1)
 
diff --git a/src/network/bmproto.py b/src/network/bmproto.py
index 9ccd69ca..b08b6d61 100644
--- a/src/network/bmproto.py
+++ b/src/network/bmproto.py
@@ -275,47 +275,24 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
             self.object.checkEOLSanity()
             self.object.checkAlreadyHave()
         except (BMObjectExpiredError, BMObjectAlreadyHaveError) as e:
-            for connection in network.connectionpool.BMConnectionPool().inboundConnections.values() + \
-                    network.connectionpool.BMConnectionPool().outboundConnections.values():
-                try:
-                    with connection.objectsNewToThemLock:
-                        del connection.objectsNewToThem[self.object.inventoryHash]
-                except KeyError:
-                    pass
-                try:
-                    with connection.objectsNewToMeLock:
-                        del connection.objectsNewToMe[self.object.inventoryHash]
-                except KeyError:
-                    pass
+            BMProto.stopDownloadingObject(self.object.inventoryHash)
             raise e
         try:
             self.object.checkStream()
         except (BMObjectUnwantedStreamError,) as e:
-            for connection in network.connectionpool.BMConnectionPool().inboundConnections.values() + \
-                    network.connectionpool.BMConnectionPool().outboundConnections.values():
-                try:
-                    with connection.objectsNewToMeLock:
-                        del connection.objectsNewToMe[self.object.inventoryHash]
-                except KeyError:
-                    pass
-                if not BMConfigParser().get("inventory", "acceptmismatch"):
-                    try:
-                        with connection.objectsNewToThemLock:
-                            del connection.objectsNewToThem[self.object.inventoryHash]
-                    except KeyError:
-                        pass
+            BMProto.stopDownloadingObject(self.object.inventoryHash, BMConfigParser().get("inventory", "acceptmismatch"))
             if not BMConfigParser().get("inventory", "acceptmismatch"):
                 raise e
 
-        self.object.checkObjectByType()
+        try:
+            self.object.checkObjectByType()
+            objectProcessorQueue.put((self.object.objectType, self.object.data))
+        except BMObjectInvalidError as e:
+            BMProto.stopDownloadingObject(self.object.inventoryHash, True)
 
         Inventory()[self.object.inventoryHash] = (
                 self.object.objectType, self.object.streamNumber, self.payload[objectOffset:], self.object.expiresTime, self.object.tag)
-        objectProcessorQueue.put((self.object.objectType,self.object.data))
-        #DownloadQueue().task_done(self.object.inventoryHash)
         invQueue.put((self.object.streamNumber, self.object.inventoryHash, self))
-        #ObjUploadQueue().put(UploadElem(self.object.streamNumber, self.object.inventoryHash))
-        #broadcastToSendDataQueues((streamNumber, 'advertiseobject', inventoryHash))
         return True
 
     def _decode_addr(self):
@@ -471,6 +448,22 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
             retval += protocol.CreatePacket('addr', payload)
         return retval
 
+    @staticmethod
+    def stopDownloadingObject(hashId, forwardAnyway=False):
+        for connection in network.connectionpool.BMConnectionPool().inboundConnections.values() + \
+                network.connectionpool.BMConnectionPool().outboundConnections.values():
+            try:
+                with connection.objectsNewToMeLock:
+                    del connection.objectsNewToMe[hashId]
+            except KeyError:
+                pass
+            if not forwardAnyway:
+                try:
+                    with connection.objectsNewToThemLock:
+                        del connection.objectsNewToThem[hashId]
+                except KeyError:
+                    pass
+
     def handle_close(self, reason=None):
         self.set_state("close")
         if reason is None:
diff --git a/src/network/connectionpool.py b/src/network/connectionpool.py
index 9328720f..c4aa53d8 100644
--- a/src/network/connectionpool.py
+++ b/src/network/connectionpool.py
@@ -44,7 +44,7 @@ class BMConnectionPool(object):
                     del i.objectsNewToMe[hashid]
             except KeyError:
                 with i.objectsNewToThemLock:
-                    i.objectsNewToThem[hashid] = True
+                    i.objectsNewToThem[hashid] = time.time()
             if i == connection:
                 try:
                     with i.objectsNewToThemLock:
diff --git a/src/network/downloadthread.py b/src/network/downloadthread.py
index a8d3e1f7..9c7e92da 100644
--- a/src/network/downloadthread.py
+++ b/src/network/downloadthread.py
@@ -1,4 +1,5 @@
 import threading
+import time
 
 import addresses
 #from bmconfigparser import BMConfigParser
@@ -9,26 +10,38 @@ from network.connectionpool import BMConnectionPool
 import protocol
 
 class DownloadThread(threading.Thread, StoppableThread):
-    maxPending = 500
-    requestChunk = 1000
+    maxPending = 50
+    requestChunk = 100
+    requestTimeout = 60
+    cleanInterval = 60
+    requestExpires = 600
 
     def __init__(self):
         threading.Thread.__init__(self, name="DownloadThread")
         self.initStop()
         self.name = "DownloadThread"
         logger.info("init download thread")
+        self.pending = {}
+        self.lastCleaned = time.time()
+
+    def cleanPending(self):
+        deadline = time.time() - DownloadThread.requestExpires
+        self.pending = {k: v for k, v in self.pending.iteritems() if v >= deadline}
+        self.lastCleaned = time.time()
 
     def run(self):
         while not self._stopped:
             requested = 0
             for i in BMConnectionPool().inboundConnections.values() + BMConnectionPool().outboundConnections.values():
-                # this may take a while, but it needs a consistency so I think it's better
+                now = time.time()
+                timedOut = now - DownloadThread.requestTimeout
+                # this may take a while, but it needs a consistency so I think it's better to lock a bigger chunk
                 with i.objectsNewToMeLock:
-                    downloadPending = len(list((k for k, v in i.objectsNewToMe.iteritems() if not v)))
+                    downloadPending = len(list((k for k, v in i.objectsNewToMe.iteritems() if k in self.pending and self.pending[k] > timedOut)))
                     if downloadPending >= DownloadThread.maxPending:
                         continue
                     # keys with True values in the dict
-                    request = list((k for k, v in i.objectsNewToMe.iteritems() if v))
+                    request = list((k for k, v in i.objectsNewToMe.iteritems() if k not in self.pending or self.pending[k] < timedOut))
                     if not request:
                         continue
                     if len(request) > DownloadThread.requestChunk - downloadPending:
@@ -36,9 +49,13 @@ class DownloadThread(threading.Thread, StoppableThread):
                     # mark them as pending
                     for k in request:
                         i.objectsNewToMe[k] = False
+                        self.pending[k] = now
 
                payload = addresses.encodeVarint(len(request)) + ''.join(request)
                i.writeQueue.put(protocol.CreatePacket('getdata', payload))
                logger.debug("%s:%i Requesting %i objects", i.destination.host, i.destination.port, len(request))
                requested += len(request)
-            self.stop.wait(1)
+            if time.time() >= self.lastCleaned + DownloadThread.cleanInterval:
+                self.cleanPending()
+            if not requested:
+                self.stop.wait(1)
diff --git a/src/network/objectracker.py b/src/network/objectracker.py
index 5c0ad147..a7295c21 100644
--- a/src/network/objectracker.py
+++ b/src/network/objectracker.py
@@ -28,6 +28,7 @@ class ObjectTracker(object):
     invCleanPeriod = 300
     invInitialCapacity = 50000
     invErrorRate = 0.03
+    trackingExpires = 3600
 
     def __init__(self):
         self.objectsNewToMe = {}
@@ -62,9 +63,9 @@ class ObjectTracker(object):
             with self.objectsNewToMeLock:
                 tmp = self.objectsNewToMe.copy()
                 self.objectsNewToMe = tmp
+            deadline = time.time() - ObjectTracker.trackingExpires
             with self.objectsNewToThemLock:
-                tmp = self.objectsNewToThem.copy()
-                self.objectsNewToThem = tmp
+                self.objectsNewToThem = {k: v for k, v in self.objectsNewToThem.iteritems() if v >= deadline}
             self.lastCleaned = time.time()
 
     def hasObj(self, hashid):
diff --git a/src/network/receivequeuethread.py b/src/network/receivequeuethread.py
index 8360ab45..137131c5 100644
--- a/src/network/receivequeuethread.py
+++ b/src/network/receivequeuethread.py
@@ -70,7 +70,7 @@ class ReceiveQueueThread(threading.Thread, StoppableThread):
         with connection.objectsNewToThemLock:
             for objHash in Inventory().unexpired_hashes_by_stream(stream):
                 bigInvList[objHash] = 0
-                connection.objectsNewToThem[objHash] = True
+                connection.objectsNewToThem[objHash] = time.time()
         objectCount = 0
         payload = b''
         # Now let us start appending all of these hashes together. They will be
diff --git a/src/state.py b/src/state.py
index 1b12831e..08da36eb 100644
--- a/src/state.py
+++ b/src/state.py
@@ -25,6 +25,7 @@ maximumNumberOfHalfOpenConnections = 0
 
 invThread = None
 addrThread = None
+downloadThread = None
 
 ownAddresses = {}
 
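Note on the approach: the patch replaces the boolean markers in objectsNewToMe/objectsNewToThem and the download thread's bookkeeping with timestamps, so a getdata request may be retried once DownloadThread.requestTimeout (60 s) has passed, stale entries are dropped from the pending map after DownloadThread.requestExpires (600 s), and ObjectTracker.trackingExpires (3600 s) ages out the per-peer tracking. The standalone Python sketch below only illustrates that timestamp bookkeeping in isolation; PendingTracker, its methods, and its constants are hypothetical names for this example and do not exist in the PyBitmessage codebase.

    # Illustrative sketch only, not part of the patch: timestamp-based request
    # tracking that allows a retry after REQUEST_TIMEOUT seconds and gives up
    # after REQUEST_EXPIRES seconds, mirroring the idea behind
    # DownloadThread.requestTimeout / requestExpires in the diff above.
    import time

    REQUEST_TIMEOUT = 60    # earliest retry, like DownloadThread.requestTimeout
    REQUEST_EXPIRES = 600   # forget stale requests, like DownloadThread.requestExpires


    class PendingTracker(object):
        def __init__(self):
            # inventory hash -> time of the last getdata request for it
            self.pending = {}

        def should_request(self, hash_id, now=None):
            """True if the object was never requested, or the last request
            is older than REQUEST_TIMEOUT and may be retried."""
            now = time.time() if now is None else now
            last = self.pending.get(hash_id)
            return last is None or last < now - REQUEST_TIMEOUT

        def mark_requested(self, hash_id, now=None):
            # remember when we asked for this object
            self.pending[hash_id] = time.time() if now is None else now

        def clean(self, now=None):
            """Drop entries older than REQUEST_EXPIRES so objects that never
            arrive are eventually given up on."""
            now = time.time() if now is None else now
            deadline = now - REQUEST_EXPIRES
            self.pending = {k: v for k, v in self.pending.items() if v >= deadline}


    if __name__ == "__main__":
        t = PendingTracker()
        t0 = 1000.0
        assert t.should_request("obj", now=t0)            # never asked for it
        t.mark_requested("obj", now=t0)
        assert not t.should_request("obj", now=t0 + 30)   # too soon to retry
        assert t.should_request("obj", now=t0 + 61)       # a minute passed, retry allowed
        t.clean(now=t0 + REQUEST_EXPIRES + 1)
        assert "obj" not in t.pending                     # given up until advertised again

Storing a timestamp instead of True costs nothing extra per entry, yet the same dictionary can answer both "is it too early to retry?" and "should this request be forgotten?", which is what lets the thread coordinate requests across peers without a separate queue.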