From ca7becb921e6fd506e52228cfdcc3c1b4dcfc41e Mon Sep 17 00:00:00 2001
From: Peter Surda
Date: Mon, 2 Apr 2018 19:33:41 +0200
Subject: [PATCH] Download fixes

- in corner cases, the download request could have contained an
  incorrect request length. I haven't actually checked whether this
  can be triggered, though
- wait with downloading until the anti-intersection delay expires.
  This doesn't necessarily mean that it will always avoid the peer's
  anti-intersection delay, but it's close enough
- track the last time an object was received. If that was too long
  ago, reset the download request queue. This avoids situations where
  a request gets ignored during the anti-intersection delay, yet the
  tracker keeps thinking there are still pending requests as long as
  not all missing objects have been requested. This caused staggered
  downloads (request 1000 items, wait 1 minute, request 1000 more,
  wait another minute, ...)
- with these fixes, you should end up downloading as fast as your
  network and CPU allow
- best tested with trustedpeer
---
 src/network/downloadthread.py | 13 +++++++++----
 src/network/objectracker.py   |  1 +
 src/randomtrackingdict.py     |  9 +++++++--
 3 files changed, 17 insertions(+), 6 deletions(-)

diff --git a/src/network/downloadthread.py b/src/network/downloadthread.py
index cd4b8018..ee0b0289 100644
--- a/src/network/downloadthread.py
+++ b/src/network/downloadthread.py
@@ -49,12 +49,15 @@ class DownloadThread(threading.Thread, StoppableThread):
                 requestChunk = 1
             for i in connections:
                 now = time.time()
+                # avoid unnecessary delay
+                if i.skipUntil >= now:
+                    continue
                 try:
                     request = i.objectsNewToMe.randomKeys(requestChunk)
                 except KeyError:
                     continue
                 payload = bytearray()
-                payload.extend(addresses.encodeVarint(len(request)))
+                chunkCount = 0
                 for chunk in request:
                     if chunk in Inventory() and not Dandelion().hasHash(chunk):
                         try:
@@ -63,12 +66,14 @@ class DownloadThread(threading.Thread, StoppableThread):
                             pass
                         continue
                     payload.extend(chunk)
+                    chunkCount += 1
                     missingObjects[chunk] = now
-                if not payload:
+                if not chunkCount:
                     continue
+                payload[0:0] = addresses.encodeVarint(chunkCount)
                 i.append_write_buf(protocol.CreatePacket('getdata', payload))
-                logger.debug("%s:%i Requesting %i objects", i.destination.host, i.destination.port, len(request))
-                requested += len(request)
+                logger.debug("%s:%i Requesting %i objects", i.destination.host, i.destination.port, chunkCount)
+                requested += chunkCount
             if time.time() >= self.lastCleaned + DownloadThread.cleanInterval:
                 self.cleanPending()
             if not requested:
diff --git a/src/network/objectracker.py b/src/network/objectracker.py
index ff2a9036..b5110212 100644
--- a/src/network/objectracker.py
+++ b/src/network/objectracker.py
@@ -104,6 +104,7 @@ class ObjectTracker(object):
                         del i.objectsNewToThem[hashid]
                 except KeyError:
                     pass
+        self.objectsNewToMe.setLastObject()
 
     def hasAddr(self, addr):
         if haveBloom:
diff --git a/src/randomtrackingdict.py b/src/randomtrackingdict.py
index ef5e6206..d583d939 100644
--- a/src/randomtrackingdict.py
+++ b/src/randomtrackingdict.py
@@ -12,6 +12,7 @@ class RandomTrackingDict(object):
         self.len = 0
         self.pendingLen = 0
         self.lastPoll = 0
+        self.lastObject = 0
         self.lock = RLock()
 
     def __len__(self):
@@ -71,14 +72,18 @@ class RandomTrackingDict(object):
     def setPendingTimeout(self, pendingTimeout):
         self.pendingTimeout = pendingTimeout
 
+    def setLastObject(self):
+        self.lastObject = time()
+
     def randomKeys(self, count=1):
         if self.len == 0 or ((self.pendingLen >= self.maxPending or
                               self.pendingLen == self.len) and self.lastPoll +
                              self.pendingTimeout > time()):
             raise KeyError
-        # reset if we've requested all
         with self.lock:
-            if self.pendingLen == self.len:
+            # reset if we've requested all
+            # or if last object received too long time ago
+            if self.pendingLen == self.len or self.lastObject + self.pendingTimeout > time():
                 self.pendingLen = 0
             available = self.len - self.pendingLen
             if count > available:
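
Note on the getdata length fix: the varint prefix must reflect only the hashes that actually end up in the payload, which is why the patch counts chunkCount inside the loop and splices the prefix in afterwards with payload[0:0]. Below is a minimal, self-contained sketch of that pattern; encode_varint, build_getdata_payload and the already_have set are illustrative stand-ins, not PyBitmessage's actual addresses.encodeVarint or Inventory()/Dandelion() APIs.

    import struct

    def encode_varint(n):
        # Illustrative stand-in for addresses.encodeVarint (Bitmessage var_int layout).
        if n < 0xfd:
            return struct.pack('>B', n)
        if n <= 0xffff:
            return b'\xfd' + struct.pack('>H', n)
        if n <= 0xffffffff:
            return b'\xfe' + struct.pack('>I', n)
        return b'\xff' + struct.pack('>Q', n)

    def build_getdata_payload(hashes, already_have):
        # Count only the hashes actually appended, then prepend the varint,
        # so the declared length can never disagree with the payload body.
        payload = bytearray()
        count = 0
        for h in hashes:
            if h in already_have:      # stands in for the Inventory()/Dandelion() skip
                continue
            payload.extend(h)
            count += 1
        if not count:
            return None
        payload[0:0] = encode_varint(count)  # splice the prefix in front, as the patch does
        return bytes(payload)

If 1000 hashes are passed in and 40 of them are skipped, the prefix then says 960 rather than 1000, which is the corner case the first bullet of the commit message refers to.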
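Note on the lastObject tracking: the mechanism can be pictured with a reduced model of the request queue, a pending set plus a timestamp of the last received object, where the pending state is cleared once nothing has arrived within the timeout. This is only a sketch of the behaviour the commit message describes; PendingTracker, its method names and the 60-second default are assumptions, not the actual RandomTrackingDict interface.

    from time import time

    class PendingTracker(object):
        # Toy model of the download queue's pending bookkeeping.

        def __init__(self, pending_timeout=60):
            self.pending = set()          # hashes requested but not yet received
            self.pending_timeout = pending_timeout
            self.last_object = time()     # when an object was last received

        def mark_requested(self, hashes):
            self.pending.update(hashes)

        def mark_received(self, hashid):
            # Corresponds to setLastObject() being called from handleReceivedObject().
            self.pending.discard(hashid)
            self.last_object = time()

        def maybe_reset(self):
            # If a getdata was ignored (e.g. during the peer's anti-intersection
            # delay), nothing arrives, the pending set never drains, and new
            # requests stay throttled: the staggered download described above.
            # Clearing the pending state once the last object is older than the
            # timeout lets the downloader re-request immediately.
            if self.last_object + self.pending_timeout < time():
                self.pending.clear()
                self.last_object = time()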