diff --git a/src/network/downloadthread.py b/src/network/downloadthread.py index cd4b8018..ee0b0289 100644 --- a/src/network/downloadthread.py +++ b/src/network/downloadthread.py @@ -49,12 +49,15 @@ class DownloadThread(threading.Thread, StoppableThread): requestChunk = 1 for i in connections: now = time.time() + # avoid unnecessary delay + if i.skipUntil >= now: + continue try: request = i.objectsNewToMe.randomKeys(requestChunk) except KeyError: continue payload = bytearray() - payload.extend(addresses.encodeVarint(len(request))) + chunkCount = 0 for chunk in request: if chunk in Inventory() and not Dandelion().hasHash(chunk): try: @@ -63,12 +66,14 @@ class DownloadThread(threading.Thread, StoppableThread): pass continue payload.extend(chunk) + chunkCount += 1 missingObjects[chunk] = now - if not payload: + if not chunkCount: continue + payload[0:0] = addresses.encodeVarint(chunkCount) i.append_write_buf(protocol.CreatePacket('getdata', payload)) - logger.debug("%s:%i Requesting %i objects", i.destination.host, i.destination.port, len(request)) - requested += len(request) + logger.debug("%s:%i Requesting %i objects", i.destination.host, i.destination.port, chunkCount) + requested += chunkCount if time.time() >= self.lastCleaned + DownloadThread.cleanInterval: self.cleanPending() if not requested: diff --git a/src/network/objectracker.py b/src/network/objectracker.py index ff2a9036..b5110212 100644 --- a/src/network/objectracker.py +++ b/src/network/objectracker.py @@ -104,6 +104,7 @@ class ObjectTracker(object): del i.objectsNewToThem[hashid] except KeyError: pass + self.objectsNewToMe.setLastObject() def hasAddr(self, addr): if haveBloom: diff --git a/src/randomtrackingdict.py b/src/randomtrackingdict.py index ef5e6206..d583d939 100644 --- a/src/randomtrackingdict.py +++ b/src/randomtrackingdict.py @@ -12,6 +12,7 @@ class RandomTrackingDict(object): self.len = 0 self.pendingLen = 0 self.lastPoll = 0 + self.lastObject = 0 self.lock = RLock() def __len__(self): @@ -71,14 +72,18 @@ class RandomTrackingDict(object): def setPendingTimeout(self, pendingTimeout): self.pendingTimeout = pendingTimeout + def setLastObject(self): + self.lastObject = time() + def randomKeys(self, count=1): if self.len == 0 or ((self.pendingLen >= self.maxPending or self.pendingLen == self.len) and self.lastPoll + self.pendingTimeout > time()): raise KeyError - # reset if we've requested all with self.lock: - if self.pendingLen == self.len: + # reset if we've requested all + # or if last object received too long time ago + if self.pendingLen == self.len or self.lastObject + self.pendingTimeout > time(): self.pendingLen = 0 available = self.len - self.pendingLen if count > available: