From bdf61489aebdcad5b5238602727894fb8801fe2e Mon Sep 17 00:00:00 2001 From: Peter Surda Date: Mon, 10 Jul 2017 07:08:10 +0200 Subject: [PATCH] Allow multiple ReceiveQueue threads - defaults to 3 --- src/bitmessagemain.py | 7 ++++--- src/bmconfigparser.py | 3 +++ src/helper_threading.py | 15 +++++++++++++++ src/network/advanceddispatcher.py | 7 +++++-- src/network/receivequeuethread.py | 10 +++++----- 5 files changed, 32 insertions(+), 10 deletions(-) diff --git a/src/bitmessagemain.py b/src/bitmessagemain.py index b142676b..3e0a1c84 100755 --- a/src/bitmessagemain.py +++ b/src/bitmessagemain.py @@ -267,9 +267,10 @@ class Main: asyncoreThread = BMNetworkThread() asyncoreThread.daemon = True asyncoreThread.start() - receiveQueueThread = ReceiveQueueThread() - receiveQueueThread.daemon = True - receiveQueueThread.start() + for i in range(BMConfigParser().getint("threads", "receive")): + receiveQueueThread = ReceiveQueueThread(i) + receiveQueueThread.daemon = True + receiveQueueThread.start() announceThread = AnnounceThread() announceThread.daemon = True announceThread.start() diff --git a/src/bmconfigparser.py b/src/bmconfigparser.py index 913df9c0..094cd73d 100644 --- a/src/bmconfigparser.py +++ b/src/bmconfigparser.py @@ -15,6 +15,9 @@ BMConfigDefaults = { "maxtotalconnections": 200, "maxuploadrate": 0, }, + "threads": { + "receive": 3, + }, "network": { "asyncore": True, "bind": None, diff --git a/src/helper_threading.py b/src/helper_threading.py index 7ea4a12d..56c5b8b2 100644 --- a/src/helper_threading.py +++ b/src/helper_threading.py @@ -1,4 +1,6 @@ +from contextlib import contextmanager import threading + try: import prctl def set_thread_name(name): prctl.set_name(name) @@ -20,3 +22,16 @@ class StoppableThread(object): def stopThread(self): self._stopped = True self.stop.set() + +class BusyError(threading.ThreadError): + pass + +@contextmanager +def nonBlocking(lock): + locked = lock.acquire(False) + if not locked: + raise BusyError + try: + yield + finally: + lock.release() diff --git a/src/network/advanceddispatcher.py b/src/network/advanceddispatcher.py index 338e3bba..dbe65e39 100644 --- a/src/network/advanceddispatcher.py +++ b/src/network/advanceddispatcher.py @@ -6,6 +6,7 @@ import time import asyncore_pollchoose as asyncore from debug import logger +from helper_threading import BusyError, nonBlocking class AdvancedDispatcher(asyncore.dispatcher): _buf_len = 2097152 # 2MB @@ -22,6 +23,7 @@ class AdvancedDispatcher(asyncore.dispatcher): self.expectBytes = 0 self.readLock = threading.RLock() self.writeLock = threading.RLock() + self.processingLock = threading.RLock() def append_write_buf(self, data): if data: @@ -43,8 +45,9 @@ class AdvancedDispatcher(asyncore.dispatcher): return False while len(self.read_buf) >= self.expectBytes: try: - if getattr(self, "state_" + str(self.state))() is False: - break + with nonBlocking(self.processingLock): + if getattr(self, "state_" + str(self.state))() is False: + break except AttributeError: raise return False diff --git a/src/network/receivequeuethread.py b/src/network/receivequeuethread.py index b6810a3c..a899851f 100644 --- a/src/network/receivequeuethread.py +++ b/src/network/receivequeuethread.py @@ -15,17 +15,16 @@ import protocol import state class ReceiveQueueThread(threading.Thread, StoppableThread): - def __init__(self): - threading.Thread.__init__(self, name="ReceiveQueueThread") + def __init__(self, num=0): + threading.Thread.__init__(self, name="ReceiveQueue_%i" %(num)) self.initStop() - self.name = "ReceiveQueueThread" - logger.info("init receive queue thread") + self.name = "ReceiveQueue_%i" % (num) + logger.info("init receive queue thread %i", num) def run(self): while not self._stopped and state.shutdown == 0: try: dest = receiveDataQueue.get(block=True, timeout=1) - receiveDataQueue.task_done() except Queue.Empty: continue @@ -44,3 +43,4 @@ class ReceiveQueueThread(threading.Thread, StoppableThread): # AttributeError = state isn't implemented except (KeyError, AttributeError): pass + receiveDataQueue.task_done()