Put uploads into a separate thread

- instead of being processed in the ReceiveQueue thread, uploads are now done
  in a dedicated thread. Only the parsing is done in ReceiveQueue thread.
- the UploadThread is modelled based on the DownloadThred, but simpler.
- it checks for intersection attack, eliminates duplicates and restricts the
  write buffer size to 2MB (may still grow slightly higher if too many big
  objects are requested, but the absolute limit appears to be about 4.5MB in the
  worst case scenario).
- the restriction of the write buffer may cause some upload throttling (to
  about 2MB per second per connection), but can be optimised later
- fixes #1414
This commit is contained in:
Peter Šurda 2018-12-18 22:47:34 +01:00
parent 2457643426
commit ca567acab3
Signed by: PeterSurda
GPG Key ID: 0C5F50C0B5F37D87
5 changed files with 81 additions and 16 deletions

View File

@ -58,6 +58,7 @@ from network.announcethread import AnnounceThread
from network.invthread import InvThread from network.invthread import InvThread
from network.addrthread import AddrThread from network.addrthread import AddrThread
from network.downloadthread import DownloadThread from network.downloadthread import DownloadThread
from network.uploadthread import UploadThread
# Helper Functions # Helper Functions
import helper_generic import helper_generic
@ -333,6 +334,9 @@ class Main:
state.downloadThread = DownloadThread() state.downloadThread = DownloadThread()
state.downloadThread.daemon = True state.downloadThread.daemon = True
state.downloadThread.start() state.downloadThread.start()
state.uploadThread = UploadThread()
state.uploadThread.daemon = True
state.uploadThread.start()
connectToStream(1) connectToStream(1)

View File

@ -18,6 +18,8 @@ from network.node import Node
from network.objectracker import ObjectTracker from network.objectracker import ObjectTracker
from network.proxy import ProxyError from network.proxy import ProxyError
from objectracker import missingObjects from objectracker import missingObjects
from randomtrackingdict import RandomTrackingDict
import addresses import addresses
from queues import objectProcessorQueue, portCheckerQueue, invQueue, addrQueue from queues import objectProcessorQueue, portCheckerQueue, invQueue, addrQueue
@ -57,6 +59,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
self.isOutbound = False self.isOutbound = False
# packet/connection from a local IP # packet/connection from a local IP
self.local = False self.local = False
self.pendingUpload = RandomTrackingDict()
def bm_proto_reset(self): def bm_proto_reset(self):
self.magic = None self.magic = None
@ -279,23 +282,9 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
# skip? # skip?
if time.time() < self.skipUntil: if time.time() < self.skipUntil:
return True return True
#TODO make this more asynchronous
helper_random.randomshuffle(items) helper_random.randomshuffle(items)
for i in map(str, items): for i in map(str, items):
if Dandelion().hasHash(i) and \ self.pendingUpload[i] = time.time()
self != Dandelion().objectChildStem(i):
self.antiIntersectionDelay()
logger.info('%s asked for a stem object we didn\'t offer to it.', self.destination)
break
else:
try:
self.append_write_buf(protocol.CreatePacket('object', Inventory()[i].payload))
except KeyError:
self.antiIntersectionDelay()
logger.info('%s asked for an object we don\'t have.', self.destination)
break
# I think that aborting after the first missing/stem object is more secure
# when using random reordering, as the recipient won't know exactly which objects we refuse to deliver
return True return True
def _command_inv(self, dandelion=False): def _command_inv(self, dandelion=False):

View File

@ -0,0 +1,71 @@
"""
src/network/uploadthread.py
"""
# pylint: disable=unsubscriptable-object
import threading
import time
import helper_random
import protocol
from debug import logger
from helper_threading import StoppableThread
from inventory import Inventory
from network.connectionpool import BMConnectionPool
from network.dandelion import Dandelion
from randomtrackingdict import RandomTrackingDict
class UploadThread(threading.Thread, StoppableThread):
"""This is a thread that uploads the objects that the peers requested from me """
maxBufSize = 2097152 # 2MB
def __init__(self):
threading.Thread.__init__(self, name="Uploader")
self.initStop()
self.name = "Uploader"
logger.info("init upload thread")
def run(self):
while not self._stopped:
uploaded = 0
# Choose downloading peers randomly
connections = [x for x in BMConnectionPool().inboundConnections.values() +
BMConnectionPool().outboundConnections.values() if x.fullyEstablished]
helper_random.randomshuffle(connections)
for i in connections:
now = time.time()
# avoid unnecessary delay
if i.skipUntil >= now:
continue
if len(i.write_buf) > UploadThread.maxBufSize:
continue
try:
request = i.pendingUpload.randomKeys(RandomTrackingDict.maxPending)
except KeyError:
continue
payload = bytearray()
chunk_count = 0
for chunk in request:
del i.pendingUpload[chunk]
if Dandelion().hasHash(chunk) and \
i != Dandelion().objectChildStem(chunk):
i.antiIntersectionDelay()
logger.info('%s asked for a stem object we didn\'t offer to it.',
i.destination)
break
try:
payload.extend(protocol.CreatePacket('object',
Inventory()[chunk].payload))
chunk_count += 1
except KeyError:
i.antiIntersectionDelay()
logger.info('%s asked for an object we don\'t have.', i.destination)
break
if not chunk_count:
continue
i.append_write_buf(payload)
logger.debug("%s:%i Uploading %i objects",
i.destination.host, i.destination.port, chunk_count)
uploaded += chunk_count
if not uploaded:
self.stop.wait(1)

View File

@ -34,6 +34,7 @@ maximumNumberOfHalfOpenConnections = 0
invThread = None invThread = None
addrThread = None addrThread = None
downloadThread = None downloadThread = None
uploadThread = None
ownAddresses = {} ownAddresses = {}

View File

@ -23,7 +23,7 @@ class TestProcessProto(unittest.TestCase):
it starts pybitmessage in setUpClass() and stops it in tearDownClass() it starts pybitmessage in setUpClass() and stops it in tearDownClass()
""" """
_process_cmd = ['pybitmessage', '-d'] _process_cmd = ['pybitmessage', '-d']
_threads_count = 14 _threads_count = 15
_files = ( _files = (
'keys.dat', 'debug.log', 'messages.dat', 'knownnodes.dat', 'keys.dat', 'debug.log', 'messages.dat', 'knownnodes.dat',
'.api_started', 'unittest.lock' '.api_started', 'unittest.lock'