diff --git a/src/network/downloadthread.py b/src/network/downloadthread.py index 98b6df05..7cbe11ad 100644 --- a/src/network/downloadthread.py +++ b/src/network/downloadthread.py @@ -9,9 +9,10 @@ from helper_threading import StoppableThread #from inventory import Inventory from network.connectionpool import BMConnectionPool import protocol +from state import missingObjects class DownloadThread(threading.Thread, StoppableThread): - maxPending = 200 + minPending = 200 requestChunk = 1000 requestTimeout = 60 cleanInterval = 60 @@ -22,13 +23,18 @@ class DownloadThread(threading.Thread, StoppableThread): self.initStop() self.name = "Downloader" logger.info("init download thread") - self.pending = {} self.lastCleaned = time.time() def cleanPending(self): deadline = time.time() - DownloadThread.requestExpires - self.pending = {k: v for k, v in self.pending.iteritems() if v >= deadline} - self.lastCleaned = time.time() + try: + toDelete = [k for k, v in missingObjects.iteritems() if v < deadline] + except RuntimeError: + pass + else: + for i in toDelete: + del missingObjects[i] + self.lastCleaned = time.time() def run(self): while not self._stopped: @@ -41,11 +47,12 @@ class DownloadThread(threading.Thread, StoppableThread): timedOut = now - DownloadThread.requestTimeout # this may take a while, but it needs a consistency so I think it's better to lock a bigger chunk with i.objectsNewToMeLock: - downloadPending = len(list((k for k, v in i.objectsNewToMe.iteritems() if k in self.pending and self.pending[k] > timedOut))) - if downloadPending >= DownloadThread.maxPending: + downloadPending = len(list((k for k, v in i.objectsNewToMe.iteritems() if k in missingObjects and missingObjects[k] > timedOut))) + if downloadPending >= DownloadThread.minPending: continue # keys with True values in the dict - request = list((k for k, v in i.objectsNewToMe.iteritems() if k not in self.pending or self.pending[k] < timedOut)) + request = list((k for k, v in i.objectsNewToMe.iteritems() if k not in missingObjects or missingObjects[k] < timedOut)) + random.shuffle(request) if not request: continue if len(request) > DownloadThread.requestChunk - downloadPending: @@ -53,7 +60,7 @@ class DownloadThread(threading.Thread, StoppableThread): # mark them as pending for k in request: i.objectsNewToMe[k] = False - self.pending[k] = now + missingObjects[k] = now payload = bytearray() payload.extend(addresses.encodeVarint(len(request))) @@ -65,4 +72,4 @@ class DownloadThread(threading.Thread, StoppableThread): if time.time() >= self.lastCleaned + DownloadThread.cleanInterval: self.cleanPending() if not requested: - self.stop.wait(1) + self.stop.wait(5) diff --git a/src/network/objectracker.py b/src/network/objectracker.py index bfb75174..a86ec23f 100644 --- a/src/network/objectracker.py +++ b/src/network/objectracker.py @@ -83,7 +83,8 @@ class ObjectTracker(object): except KeyError: pass if hashId not in Inventory(): - missingObjects[hashId] = None + if hashId not in missingObjects: + missingObjects[hashId] = time.time() with self.objectsNewToMeLock: self.objectsNewToMe[hashId] = True elif hashId in Dandelion().hashMap: