Download fixes
- in corner cases, download request could have contained an incorrect request length. I haven't actually checked if this can be triggered though - wait for downloading until anti intersection delay expires. Doesn't necessarily mean that it will always avoid peer's anti intersection delay, but it's close enough - tracks last time an object was received. If it was too long time ago, reset the download request queue. This avoid situations like when a request gets ignored during the anti intersection delay, but it will keep thinking there are still pending requests as long as not all missing objects have been requested. This caused staggered download (request 1000 items, wait 1 minute, request 1000 more, wait another minute, ...) - with these fixes, you should end up downloading as fast as your network and CPU allow - best tested with trustedpeer
This commit is contained in:
parent
6eb2155497
commit
ca7becb921
|
@ -49,12 +49,15 @@ class DownloadThread(threading.Thread, StoppableThread):
|
|||
requestChunk = 1
|
||||
for i in connections:
|
||||
now = time.time()
|
||||
# avoid unnecessary delay
|
||||
if i.skipUntil >= now:
|
||||
continue
|
||||
try:
|
||||
request = i.objectsNewToMe.randomKeys(requestChunk)
|
||||
except KeyError:
|
||||
continue
|
||||
payload = bytearray()
|
||||
payload.extend(addresses.encodeVarint(len(request)))
|
||||
chunkCount = 0
|
||||
for chunk in request:
|
||||
if chunk in Inventory() and not Dandelion().hasHash(chunk):
|
||||
try:
|
||||
|
@ -63,12 +66,14 @@ class DownloadThread(threading.Thread, StoppableThread):
|
|||
pass
|
||||
continue
|
||||
payload.extend(chunk)
|
||||
chunkCount += 1
|
||||
missingObjects[chunk] = now
|
||||
if not payload:
|
||||
if not chunkCount:
|
||||
continue
|
||||
payload[0:0] = addresses.encodeVarint(chunkCount)
|
||||
i.append_write_buf(protocol.CreatePacket('getdata', payload))
|
||||
logger.debug("%s:%i Requesting %i objects", i.destination.host, i.destination.port, len(request))
|
||||
requested += len(request)
|
||||
logger.debug("%s:%i Requesting %i objects", i.destination.host, i.destination.port, chunkCount)
|
||||
requested += chunkCount
|
||||
if time.time() >= self.lastCleaned + DownloadThread.cleanInterval:
|
||||
self.cleanPending()
|
||||
if not requested:
|
||||
|
|
|
@ -104,6 +104,7 @@ class ObjectTracker(object):
|
|||
del i.objectsNewToThem[hashid]
|
||||
except KeyError:
|
||||
pass
|
||||
self.objectsNewToMe.setLastObject()
|
||||
|
||||
def hasAddr(self, addr):
|
||||
if haveBloom:
|
||||
|
|
|
@ -12,6 +12,7 @@ class RandomTrackingDict(object):
|
|||
self.len = 0
|
||||
self.pendingLen = 0
|
||||
self.lastPoll = 0
|
||||
self.lastObject = 0
|
||||
self.lock = RLock()
|
||||
|
||||
def __len__(self):
|
||||
|
@ -71,14 +72,18 @@ class RandomTrackingDict(object):
|
|||
def setPendingTimeout(self, pendingTimeout):
|
||||
self.pendingTimeout = pendingTimeout
|
||||
|
||||
def setLastObject(self):
|
||||
self.lastObject = time()
|
||||
|
||||
def randomKeys(self, count=1):
|
||||
if self.len == 0 or ((self.pendingLen >= self.maxPending or
|
||||
self.pendingLen == self.len) and self.lastPoll +
|
||||
self.pendingTimeout > time()):
|
||||
raise KeyError
|
||||
# reset if we've requested all
|
||||
with self.lock:
|
||||
if self.pendingLen == self.len:
|
||||
# reset if we've requested all
|
||||
# or if last object received too long time ago
|
||||
if self.pendingLen == self.len or self.lastObject + self.pendingTimeout > time():
|
||||
self.pendingLen = 0
|
||||
available = self.len - self.pendingLen
|
||||
if count > available:
|
||||
|
|
Reference in New Issue
Block a user