Peter Surda
ca7becb921
- In corner cases, a download request could have contained an incorrect request length. I haven't actually checked whether this can be triggered, though.
- Wait to download until the anti-intersection delay expires. This doesn't necessarily mean the peer's anti-intersection delay is always avoided, but it's close enough.
- Track the last time an object was received. If it was too long ago, reset the download request queue (see the sketch below). This avoids the situation where a request is ignored during the anti-intersection delay, but the thread keeps thinking there are pending requests as long as not all missing objects have been requested. That caused staggered downloads (request 1000 items, wait 1 minute, request 1000 more, wait another minute, ...).
- With these fixes, you should end up downloading as fast as your network and CPU allow.
- Best tested with trustedpeer.
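The queue reset in the third bullet happens on the connection side rather than in the thread below. A minimal sketch of the idea, with hypothetical attribute names (lastObject, pendingRequests) standing in for whatever the connection objects actually use:

import time

def resetIfStale(connection, timeout=60):
    # If nothing has arrived from this peer for `timeout` seconds, the
    # earlier getdata was probably ignored (e.g. it fell into the peer's
    # anti-intersection delay), so forget the pending requests and let
    # the download thread ask for those objects again.
    if time.time() - connection.lastObject > timeout:
        connection.pendingRequests.clear()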
81 lines
2.9 KiB
Python
import random
import threading
import time

import addresses
import helper_random
import protocol
from dandelion import Dandelion
from debug import logger
from helper_threading import StoppableThread
from inventory import Inventory
from network.connectionpool import BMConnectionPool
from state import missingObjects


class DownloadThread(threading.Thread, StoppableThread):
    """Thread that requests missing objects from peers via getdata."""
    minPending = 200
    maxRequestChunk = 1000  # upper bound on hashes requested per getdata
    requestTimeout = 60
    cleanInterval = 60  # seconds between cleanPending() runs
    requestExpires = 3600  # pending requests older than this are forgotten

    def __init__(self):
        threading.Thread.__init__(self, name="Downloader")
        self.initStop()
        logger.info("init download thread")
        self.lastCleaned = time.time()

    def cleanPending(self):
        """Forget pending requests that are older than requestExpires."""
        deadline = time.time() - DownloadThread.requestExpires
        try:
            toDelete = [
                k for k, v in missingObjects.iteritems() if v < deadline]
        except RuntimeError:
            # missingObjects was modified by another thread during
            # iteration; skip this round and try again later
            pass
        else:
            for i in toDelete:
                del missingObjects[i]
            self.lastCleaned = time.time()

    def run(self):
        while not self._stopped:
            requested = 0
            # Choose downloading peers randomly
            connections = [
                x for x in
                BMConnectionPool().inboundConnections.values() +
                BMConnectionPool().outboundConnections.values()
                if x.fullyEstablished]
            helper_random.randomshuffle(connections)
            # spread the pending requests roughly evenly across the peers
            try:
                requestChunk = max(int(
                    min(DownloadThread.maxRequestChunk,
                        len(missingObjects)) / len(connections)), 1)
            except ZeroDivisionError:
                requestChunk = 1
            for i in connections:
                now = time.time()
                # honour the peer's anti-intersection delay
                if i.skipUntil >= now:
                    continue
                try:
                    request = i.objectsNewToMe.randomKeys(requestChunk)
                except KeyError:
                    continue
                payload = bytearray()
                chunkCount = 0
                for chunk in request:
                    # skip objects we already have, unless they are still
                    # in the Dandelion stem phase
                    if chunk in Inventory() and not Dandelion().hasHash(chunk):
                        try:
                            del i.objectsNewToMe[chunk]
                        except KeyError:
                            pass
                        continue
                    payload.extend(chunk)
                    chunkCount += 1
                    missingObjects[chunk] = now
                if not chunkCount:
                    continue
                # prefix the hashes with their count to complete the
                # getdata payload
                payload[0:0] = addresses.encodeVarint(chunkCount)
                i.append_write_buf(protocol.CreatePacket('getdata', payload))
                logger.debug(
                    "%s:%i Requesting %i objects",
                    i.destination.host, i.destination.port, chunkCount)
                requested += chunkCount
            if time.time() >= self.lastCleaned + DownloadThread.cleanInterval:
                self.cleanPending()
            if not requested:
                self.stop.wait(1)
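For reference, a DownloadThread is started and stopped like the other network threads; a minimal usage sketch, assuming the StoppableThread helpers (initStop/stopThread) used above:

downloadThread = DownloadThread()
downloadThread.daemon = True
downloadThread.start()
# ... later, on shutdown: signal the stop event and wait for the loop to exit
downloadThread.stopThread()
downloadThread.join()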