From ca567acab329e35b7dd7f15d51115eb03d59ce7f Mon Sep 17 00:00:00 2001 From: Peter Surda Date: Tue, 18 Dec 2018 22:47:34 +0100 Subject: [PATCH] Put uploads into a separate thread - instead of being processed in the ReceiveQueue thread, uploads are now done in a dedicated thread. Only the parsing is done in ReceiveQueue thread. - the UploadThread is modelled on the DownloadThread, but simpler. - it checks for intersection attack, eliminates duplicates and restricts the write buffer size to 2MB (may still grow slightly higher if too many big objects are requested, but the absolute limit appears to be about 4.5MB in the worst case scenario). - the restriction of the write buffer may cause some upload throttling (to about 2MB per second per connection), but can be optimised later - fixes #1414 --- src/bitmessagemain.py | 4 +++ src/network/bmproto.py | 19 +++------- src/network/uploadthread.py | 71 +++++++++++++++++++++++++++++++++++++ src/state.py | 1 + src/tests/test_process.py | 2 +- 5 files changed, 81 insertions(+), 16 deletions(-) create mode 100644 src/network/uploadthread.py diff --git a/src/bitmessagemain.py b/src/bitmessagemain.py index 2c3d10db..b3775b60 100755 --- a/src/bitmessagemain.py +++ b/src/bitmessagemain.py @@ -58,6 +58,7 @@ from network.announcethread import AnnounceThread from network.invthread import InvThread from network.addrthread import AddrThread from network.downloadthread import DownloadThread +from network.uploadthread import UploadThread # Helper Functions import helper_generic @@ -333,6 +334,9 @@ class Main: state.downloadThread = DownloadThread() state.downloadThread.daemon = True state.downloadThread.start() + state.uploadThread = UploadThread() + state.uploadThread.daemon = True + state.uploadThread.start() connectToStream(1) diff --git a/src/network/bmproto.py b/src/network/bmproto.py index ec86290b..c5bc9263 100644 --- a/src/network/bmproto.py +++ b/src/network/bmproto.py @@ -18,6 +18,8 @@ from network.node import Node from 
network.objectracker import ObjectTracker from network.proxy import ProxyError from objectracker import missingObjects +from randomtrackingdict import RandomTrackingDict + import addresses from queues import objectProcessorQueue, portCheckerQueue, invQueue, addrQueue @@ -57,6 +59,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker): self.isOutbound = False # packet/connection from a local IP self.local = False + self.pendingUpload = RandomTrackingDict() def bm_proto_reset(self): self.magic = None @@ -279,23 +282,9 @@ class BMProto(AdvancedDispatcher, ObjectTracker): # skip? if time.time() < self.skipUntil: return True - #TODO make this more asynchronous helper_random.randomshuffle(items) for i in map(str, items): - if Dandelion().hasHash(i) and \ - self != Dandelion().objectChildStem(i): - self.antiIntersectionDelay() - logger.info('%s asked for a stem object we didn\'t offer to it.', self.destination) - break - else: - try: - self.append_write_buf(protocol.CreatePacket('object', Inventory()[i].payload)) - except KeyError: - self.antiIntersectionDelay() - logger.info('%s asked for an object we don\'t have.', self.destination) - break - # I think that aborting after the first missing/stem object is more secure - # when using random reordering, as the recipient won't know exactly which objects we refuse to deliver + self.pendingUpload[i] = time.time() return True def _command_inv(self, dandelion=False): diff --git a/src/network/uploadthread.py b/src/network/uploadthread.py new file mode 100644 index 00000000..61ee6fab --- /dev/null +++ b/src/network/uploadthread.py @@ -0,0 +1,71 @@ +""" +src/network/uploadthread.py +""" +# pylint: disable=unsubscriptable-object +import threading +import time + +import helper_random +import protocol +from debug import logger +from helper_threading import StoppableThread +from inventory import Inventory +from network.connectionpool import BMConnectionPool +from network.dandelion import Dandelion +from randomtrackingdict import 
RandomTrackingDict + + +class UploadThread(threading.Thread, StoppableThread): + """This is a thread that uploads the objects that the peers requested from me """ + maxBufSize = 2097152 # 2MB + + def __init__(self): + threading.Thread.__init__(self, name="Uploader") + self.initStop() + self.name = "Uploader" + logger.info("init upload thread") + + def run(self): + while not self._stopped: + uploaded = 0 + # Choose downloading peers randomly + connections = [x for x in BMConnectionPool().inboundConnections.values() + + BMConnectionPool().outboundConnections.values() if x.fullyEstablished] + helper_random.randomshuffle(connections) + for i in connections: + now = time.time() + # avoid unnecessary delay + if i.skipUntil >= now: + continue + if len(i.write_buf) > UploadThread.maxBufSize: + continue + try: + request = i.pendingUpload.randomKeys(RandomTrackingDict.maxPending) + except KeyError: + continue + payload = bytearray() + chunk_count = 0 + for chunk in request: + del i.pendingUpload[chunk] + if Dandelion().hasHash(chunk) and \ + i != Dandelion().objectChildStem(chunk): + i.antiIntersectionDelay() + logger.info('%s asked for a stem object we didn\'t offer to it.', + i.destination) + break + try: + payload.extend(protocol.CreatePacket('object', + Inventory()[chunk].payload)) + chunk_count += 1 + except KeyError: + i.antiIntersectionDelay() + logger.info('%s asked for an object we don\'t have.', i.destination) + break + if not chunk_count: + continue + i.append_write_buf(payload) + logger.debug("%s:%i Uploading %i objects", + i.destination.host, i.destination.port, chunk_count) + uploaded += chunk_count + if not uploaded: + self.stop.wait(1) diff --git a/src/state.py b/src/state.py index 87022d9c..2cbc3a7c 100644 --- a/src/state.py +++ b/src/state.py @@ -34,6 +34,7 @@ maximumNumberOfHalfOpenConnections = 0 invThread = None addrThread = None downloadThread = None +uploadThread = None ownAddresses = {} diff --git a/src/tests/test_process.py 
b/src/tests/test_process.py index b32cfeb6..5033045e 100644 --- a/src/tests/test_process.py +++ b/src/tests/test_process.py @@ -23,7 +23,7 @@ class TestProcessProto(unittest.TestCase): it starts pybitmessage in setUpClass() and stops it in tearDownClass() """ _process_cmd = ['pybitmessage', '-d'] - _threads_count = 14 + _threads_count = 15 _files = ( 'keys.dat', 'debug.log', 'messages.dat', 'knownnodes.dat', '.api_started', 'unittest.lock'