diff --git a/src/bitmessagemain.py b/src/bitmessagemain.py index 2c3d10db..b3775b60 100755 --- a/src/bitmessagemain.py +++ b/src/bitmessagemain.py @@ -58,6 +58,7 @@ from network.announcethread import AnnounceThread from network.invthread import InvThread from network.addrthread import AddrThread from network.downloadthread import DownloadThread +from network.uploadthread import UploadThread # Helper Functions import helper_generic @@ -333,6 +334,9 @@ class Main: state.downloadThread = DownloadThread() state.downloadThread.daemon = True state.downloadThread.start() + state.uploadThread = UploadThread() + state.uploadThread.daemon = True + state.uploadThread.start() connectToStream(1) diff --git a/src/network/bmproto.py b/src/network/bmproto.py index ec86290b..c5bc9263 100644 --- a/src/network/bmproto.py +++ b/src/network/bmproto.py @@ -18,6 +18,8 @@ from network.node import Node from network.objectracker import ObjectTracker from network.proxy import ProxyError from objectracker import missingObjects +from randomtrackingdict import RandomTrackingDict + import addresses from queues import objectProcessorQueue, portCheckerQueue, invQueue, addrQueue @@ -57,6 +59,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker): self.isOutbound = False # packet/connection from a local IP self.local = False + self.pendingUpload = RandomTrackingDict() def bm_proto_reset(self): self.magic = None @@ -279,23 +282,9 @@ class BMProto(AdvancedDispatcher, ObjectTracker): # skip? if time.time() < self.skipUntil: return True - #TODO make this more asynchronous helper_random.randomshuffle(items) for i in map(str, items): - if Dandelion().hasHash(i) and \ - self != Dandelion().objectChildStem(i): - self.antiIntersectionDelay() - logger.info('%s asked for a stem object we didn\'t offer to it.', self.destination) - break - else: - try: - self.append_write_buf(protocol.CreatePacket('object', Inventory()[i].payload)) - except KeyError: - self.antiIntersectionDelay() - logger.info('%s asked for an object we don\'t have.', self.destination) - break - # I think that aborting after the first missing/stem object is more secure - # when using random reordering, as the recipient won't know exactly which objects we refuse to deliver + self.pendingUpload[i] = time.time() return True def _command_inv(self, dandelion=False): diff --git a/src/network/uploadthread.py b/src/network/uploadthread.py new file mode 100644 index 00000000..61ee6fab --- /dev/null +++ b/src/network/uploadthread.py @@ -0,0 +1,71 @@ +""" +src/network/uploadthread.py +""" +# pylint: disable=unsubscriptable-object +import threading +import time + +import helper_random +import protocol +from debug import logger +from helper_threading import StoppableThread +from inventory import Inventory +from network.connectionpool import BMConnectionPool +from network.dandelion import Dandelion +from randomtrackingdict import RandomTrackingDict + + +class UploadThread(threading.Thread, StoppableThread): + """This is a thread that uploads the objects that the peers requested from me """ + maxBufSize = 2097152 # 2MB + + def __init__(self): + threading.Thread.__init__(self, name="Uploader") + self.initStop() + self.name = "Uploader" + logger.info("init upload thread") + + def run(self): + while not self._stopped: + uploaded = 0 + # Choose downloading peers randomly + connections = [x for x in BMConnectionPool().inboundConnections.values() + + BMConnectionPool().outboundConnections.values() if x.fullyEstablished] + helper_random.randomshuffle(connections) + for i in connections: + now = time.time() + # avoid unnecessary delay + if i.skipUntil >= now: + continue + if len(i.write_buf) > UploadThread.maxBufSize: + continue + try: + request = i.pendingUpload.randomKeys(RandomTrackingDict.maxPending) + except KeyError: + continue + payload = bytearray() + chunk_count = 0 + for chunk in request: + del i.pendingUpload[chunk] + if Dandelion().hasHash(chunk) and \ + i != Dandelion().objectChildStem(chunk): + i.antiIntersectionDelay() + logger.info('%s asked for a stem object we didn\'t offer to it.', + i.destination) + break + try: + payload.extend(protocol.CreatePacket('object', + Inventory()[chunk].payload)) + chunk_count += 1 + except KeyError: + i.antiIntersectionDelay() + logger.info('%s asked for an object we don\'t have.', i.destination) + break + if not chunk_count: + continue + i.append_write_buf(payload) + logger.debug("%s:%i Uploading %i objects", + i.destination.host, i.destination.port, chunk_count) + uploaded += chunk_count + if not uploaded: + self.stop.wait(1) diff --git a/src/state.py b/src/state.py index 87022d9c..2cbc3a7c 100644 --- a/src/state.py +++ b/src/state.py @@ -34,6 +34,7 @@ maximumNumberOfHalfOpenConnections = 0 invThread = None addrThread = None downloadThread = None +uploadThread = None ownAddresses = {} diff --git a/src/tests/test_process.py b/src/tests/test_process.py index b32cfeb6..5033045e 100644 --- a/src/tests/test_process.py +++ b/src/tests/test_process.py @@ -23,7 +23,7 @@ class TestProcessProto(unittest.TestCase): it starts pybitmessage in setUpClass() and stops it in tearDownClass() """ _process_cmd = ['pybitmessage', '-d'] - _threads_count = 14 + _threads_count = 15 _files = ( 'keys.dat', 'debug.log', 'messages.dat', 'knownnodes.dat', '.api_started', 'unittest.lock'