PyBitmessage-2021-04-27/src/network/downloadthread.py
Peter Surda ca7becb921
Download fixes
- in corner cases, download request could have contained an incorrect
  request length. I haven't actually checked if this can be triggered
  though
- wait for downloading until anti intersection delay expires. Doesn't
  necessarily mean that it will always avoid peer's anti intersection
  delay, but it's close enough
- tracks last time an object was received. If it was too long time ago,
  reset the download request queue. This avoid situations like when a
  request gets ignored during the anti intersection delay, but it will
  keep thinking there are still pending requests as long as not all
  missing objects have been requested. This caused staggered download
  (request 1000 items, wait 1 minute, request 1000 more, wait another
  minute, ...)
- with these fixes, you should end up downloading as fast as your
  network and CPU allow
- best tested with trustedpeer
2018-04-03 19:24:07 +02:00

81 lines
2.9 KiB
Python

import random
import threading
import time
import addresses
from dandelion import Dandelion
from debug import logger
from helper_threading import StoppableThread
from inventory import Inventory
from network.connectionpool import BMConnectionPool
import protocol
from state import missingObjects
import helper_random
class DownloadThread(threading.Thread, StoppableThread):
minPending = 200
maxRequestChunk = 1000
requestTimeout = 60
cleanInterval = 60
requestExpires = 3600
def __init__(self):
threading.Thread.__init__(self, name="Downloader")
self.initStop()
self.name = "Downloader"
logger.info("init download thread")
self.lastCleaned = time.time()
def cleanPending(self):
deadline = time.time() - DownloadThread.requestExpires
try:
toDelete = [k for k, v in missingObjects.iteritems() if v < deadline]
except RuntimeError:
pass
else:
for i in toDelete:
del missingObjects[i]
self.lastCleaned = time.time()
def run(self):
while not self._stopped:
requested = 0
# Choose downloading peers randomly
connections = [x for x in BMConnectionPool().inboundConnections.values() + BMConnectionPool().outboundConnections.values() if x.fullyEstablished]
helper_random.randomshuffle(connections)
try:
requestChunk = max(int(min(DownloadThread.maxRequestChunk, len(missingObjects)) / len(connections)), 1)
except ZeroDivisionError:
requestChunk = 1
for i in connections:
now = time.time()
# avoid unnecessary delay
if i.skipUntil >= now:
continue
try:
request = i.objectsNewToMe.randomKeys(requestChunk)
except KeyError:
continue
payload = bytearray()
chunkCount = 0
for chunk in request:
if chunk in Inventory() and not Dandelion().hasHash(chunk):
try:
del i.objectsNewToMe[chunk]
except KeyError:
pass
continue
payload.extend(chunk)
chunkCount += 1
missingObjects[chunk] = now
if not chunkCount:
continue
payload[0:0] = addresses.encodeVarint(chunkCount)
i.append_write_buf(protocol.CreatePacket('getdata', payload))
logger.debug("%s:%i Requesting %i objects", i.destination.host, i.destination.port, chunkCount)
requested += chunkCount
if time.time() >= self.lastCleaned + DownloadThread.cleanInterval:
self.cleanPending()
if not requested:
self.stop.wait(1)