Moved ObjectProcessorQueue to queues, added some doc

This commit is contained in:
Dmitri Bogomolov 2019-10-31 14:13:36 +02:00
parent 341651973a
commit 4d8d9b169f
Signed by untrusted user: g1itch
GPG Key ID: 720A756F18DEED13
3 changed files with 39 additions and 32 deletions

View File

@ -229,7 +229,7 @@ apidoc_excluded_paths = [
'bitmessageqt/newaddresswizard.py', 'bitmessageqt/newaddresswizard.py',
'class_objectProcessor.py', 'defaults.py', 'helper_startup.py', 'class_objectProcessor.py', 'defaults.py', 'helper_startup.py',
'kivymd', 'main.py', 'navigationdrawer', 'network/http*', 'kivymd', 'main.py', 'navigationdrawer', 'network/http*',
'pybitmessage', 'queues.py', 'tests', 'version.py' 'pybitmessage', 'tests', 'version.py'
] ]
apidoc_module_first = True apidoc_module_first = True
apidoc_separate_modules = True apidoc_separate_modules = True

View File

@ -1,24 +0,0 @@
import Queue
import threading
import time
class ObjectProcessorQueue(Queue.Queue):
maxSize = 32000000
def __init__(self):
Queue.Queue.__init__(self)
self.sizeLock = threading.Lock()
self.curSize = 0 # in Bytes. We maintain this to prevent nodes from flooing us with objects which take up too much memory. If this gets too big we'll sleep before asking for further objects.
def put(self, item, block = True, timeout = None):
while self.curSize >= self.maxSize:
time.sleep(1)
with self.sizeLock:
self.curSize += len(item[1])
Queue.Queue.put(self, item, block, timeout)
def get(self, block = True, timeout = None):
item = Queue.Queue.get(self, block, timeout)
with self.sizeLock:
self.curSize -= len(item[1])
return item

View File

@ -1,20 +1,51 @@
import Queue """Most of the queues used by bitmessage threads are defined here."""
import Queue
import threading
import time
from class_objectProcessorQueue import ObjectProcessorQueue
from multiqueue import MultiQueue from multiqueue import MultiQueue
class ObjectProcessorQueue(Queue.Queue):
"""Special queue class using lock for `.threads.objectProcessor`"""
maxSize = 32000000
def __init__(self):
Queue.Queue.__init__(self)
self.sizeLock = threading.Lock()
#: in Bytes. We maintain this to prevent nodes from flooding us
#: with objects which take up too much memory. If this gets
#: too big we'll sleep before asking for further objects.
self.curSize = 0
def put(self, item, block=True, timeout=None):
while self.curSize >= self.maxSize:
time.sleep(1)
with self.sizeLock:
self.curSize += len(item[1])
Queue.Queue.put(self, item, block, timeout)
def get(self, block=True, timeout=None):
item = Queue.Queue.get(self, block, timeout)
with self.sizeLock:
self.curSize -= len(item[1])
return item
workerQueue = Queue.Queue() workerQueue = Queue.Queue()
UISignalQueue = Queue.Queue() UISignalQueue = Queue.Queue()
addressGeneratorQueue = Queue.Queue() addressGeneratorQueue = Queue.Queue()
# receiveDataThreads dump objects they hear on the network into this #: receiveDataThreads dump objects they hear on the network into this
# queue to be processed. #: queue to be processed.
objectProcessorQueue = ObjectProcessorQueue() objectProcessorQueue = ObjectProcessorQueue()
invQueue = MultiQueue() invQueue = MultiQueue()
addrQueue = MultiQueue() addrQueue = MultiQueue()
portCheckerQueue = Queue.Queue() portCheckerQueue = Queue.Queue()
receiveDataQueue = Queue.Queue() receiveDataQueue = Queue.Queue()
# The address generator thread uses this queue to get information back #: The address generator thread uses this queue to get information back
# to the API thread. #: to the API thread.
apiAddressGeneratorReturnQueue = Queue.Queue() apiAddressGeneratorReturnQueue = Queue.Queue()
# Exceptions #: for exceptions
excQueue = Queue.Queue() excQueue = Queue.Queue()