From 4d8d9b169f7326b189928f5b485aa7d88207245c Mon Sep 17 00:00:00 2001 From: Dmitri Bogomolov <4glitch@gmail.com> Date: Thu, 31 Oct 2019 14:13:36 +0200 Subject: [PATCH] Moved ObjectProcessorQueue to queues, added some doc --- docs/conf.py | 2 +- src/class_objectProcessorQueue.py | 24 ----------------- src/queues.py | 45 ++++++++++++++++++++++++++----- 3 files changed, 39 insertions(+), 32 deletions(-) delete mode 100644 src/class_objectProcessorQueue.py diff --git a/docs/conf.py b/docs/conf.py index b6e75cc1..f9283f38 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -229,7 +229,7 @@ apidoc_excluded_paths = [ 'bitmessageqt/newaddresswizard.py', 'class_objectProcessor.py', 'defaults.py', 'helper_startup.py', 'kivymd', 'main.py', 'navigationdrawer', 'network/http*', - 'pybitmessage', 'queues.py', 'tests', 'version.py' + 'pybitmessage', 'tests', 'version.py' ] apidoc_module_first = True apidoc_separate_modules = True diff --git a/src/class_objectProcessorQueue.py b/src/class_objectProcessorQueue.py deleted file mode 100644 index b6628816..00000000 --- a/src/class_objectProcessorQueue.py +++ /dev/null @@ -1,24 +0,0 @@ -import Queue -import threading -import time - -class ObjectProcessorQueue(Queue.Queue): - maxSize = 32000000 - - def __init__(self): - Queue.Queue.__init__(self) - self.sizeLock = threading.Lock() - self.curSize = 0 # in Bytes. We maintain this to prevent nodes from flooing us with objects which take up too much memory. If this gets too big we'll sleep before asking for further objects. - - def put(self, item, block = True, timeout = None): - while self.curSize >= self.maxSize: - time.sleep(1) - with self.sizeLock: - self.curSize += len(item[1]) - Queue.Queue.put(self, item, block, timeout) - - def get(self, block = True, timeout = None): - item = Queue.Queue.get(self, block, timeout) - with self.sizeLock: - self.curSize -= len(item[1]) - return item diff --git a/src/queues.py b/src/queues.py index 7b6bbade..d0ac77d0 100644 --- a/src/queues.py +++ b/src/queues.py @@ -1,20 +1,51 @@ -import Queue +"""Most of the queues used by bitmessage threads are defined here.""" + +import Queue +import threading +import time -from class_objectProcessorQueue import ObjectProcessorQueue from multiqueue import MultiQueue + +class ObjectProcessorQueue(Queue.Queue): + """Special queue class using lock for `.threads.objectProcessor`""" + + maxSize = 32000000 + + def __init__(self): + Queue.Queue.__init__(self) + self.sizeLock = threading.Lock() + #: in Bytes. We maintain this to prevent nodes from flooding us + #: with objects which take up too much memory. If this gets + #: too big we'll sleep before asking for further objects. + self.curSize = 0 + + def put(self, item, block=True, timeout=None): + while self.curSize >= self.maxSize: + time.sleep(1) + with self.sizeLock: + self.curSize += len(item[1]) + Queue.Queue.put(self, item, block, timeout) + + def get(self, block=True, timeout=None): + item = Queue.Queue.get(self, block, timeout) + with self.sizeLock: + self.curSize -= len(item[1]) + return item + + workerQueue = Queue.Queue() UISignalQueue = Queue.Queue() addressGeneratorQueue = Queue.Queue() -# receiveDataThreads dump objects they hear on the network into this -# queue to be processed. +#: receiveDataThreads dump objects they hear on the network into this +#: queue to be processed. objectProcessorQueue = ObjectProcessorQueue() invQueue = MultiQueue() addrQueue = MultiQueue() portCheckerQueue = Queue.Queue() receiveDataQueue = Queue.Queue() -# The address generator thread uses this queue to get information back -# to the API thread. +#: The address generator thread uses this queue to get information back +#: to the API thread. apiAddressGeneratorReturnQueue = Queue.Queue() -# Exceptions +#: for exceptions excQueue = Queue.Queue()