This repository has been archived on 2025-02-01. You can view files and clone it, but cannot push or open issues or pull requests.
PyBitmessage-2025-02-01/src/queues.py

129 lines
4.6 KiB
Python
Executable File

"""Most of the queues used by bitmessage threads are defined here."""
import queue as Queue
import threading
import time
import traceback
try:
from multiqueue import MultiQueue
except ModuleNotFoundError:
from pybitmessage.multiqueue import MultiQueue
class ObjectProcessorQueue(Queue.Queue):
"""Special queue class using lock for `.threads.objectProcessor`"""
maxSize = 32000000
def __init__(self):
Queue.Queue.__init__(self)
self.sizeLock = threading.Lock()
#: in Bytes. We maintain this to prevent nodes from flooding us
#: with objects which take up too much memory. If this gets
#: too big we'll sleep before asking for further objects.
self.curSize = 0
def put(self, item, block=True, timeout=None):
"""Putting values in queues"""
while self.curSize >= self.maxSize:
time.sleep(1)
with self.sizeLock:
self.curSize += len(item[1])
Queue.Queue.put(self, item, block, timeout)
def get(self, block=True, timeout=None):
"""Getting values from queues"""
item = Queue.Queue.get(self, block, timeout)
with self.sizeLock:
self.curSize -= len(item[1])
return item
class addressGeneratorQueueClass(Queue.Queue):
debug_file = open("/tmp/addressgenerator.log", "a")
def __init__(self):
Queue.Queue.__init__(self)
def put(self, item, block =True, timeout=None):
self.debug_file.write('---this put condition--\n')
self.debug_file.write('this put condition- ')
self.debug_file.write('Current-thread-{} \n'.format(
threading.current_thread().name))
self.debug_file.write('Traceback-{} \n'.format(
str(traceback.format_stack())))
self.debug_file.write('Printig the put item-{}'.format(
item))
Queue.Queue.put(self, item, block, timeout)
self.debug_file.write('-------------------\n \n')
def get(self, block =True, timeout=None):
self.debug_file.write('---this get condition ---\n')
self.debug_file.write('Current-thread-{} \n '.format(
threading.current_thread().name))
self.debug_file.write('Traceback-{} \n'.format(
str(traceback.format_stack())))
item = Queue.Queue.get(self, block, timeout)
self.debug_file.write('Printig the get item-{}'.format(
str(item)))
self.debug_file.write('-------------------\n \n')
return item
class apiAddressGeneratorReturnQueueQueueClass(Queue.Queue):
debug_file = open("/tmp/apiAddressGeneratorReturnQueue.log", "a")
def __init__(self):
self.debug_file.write(
'apiAddressGeneratorReturnQueueQueue started.....\n'
)
Queue.Queue.__init__(self)
def put(self, item, block =True, timeout=None):
self.debug_file.write('-------------------\n')
self.debug_file.write('this put condition- ')
self.debug_file.write('@@@@@@ put 51 @@@@@@\n')
self.debug_file.write(threading.current_thread().name)
self.debug_file.write('@@@@@@ put 53 @@@@@@\n')
self.debug_file.write(str(traceback.print_exc()))
self.debug_file.write('@@@@@@ put 55 @@@@@@\n')
Queue.Queue.put(self, item, block, timeout)
self.debug_file.write('@@@@@@ put 57 @@@@@@\n')
self.debug_file.write('-------------------\n')
def get(self, block =True, timeout=None):
self.debug_file.write('-------------------\n')
self.debug_file.write('this get condition -')
self.debug_file.write('@@@@@@ get 64 @@@@@@')
self.debug_file.write(threading.current_thread().name)
self.debug_file.write('@@@@@@ get 66 @@@@@@')
self.debug_file.write(str(traceback.print_exc()))
self.debug_file.write('@@@@@@ get 68 @@@@@@')
item = Queue.Queue.get(self, block, timeout)
self.debug_file.write('@@@@@@ get 70 @@@@@@')
self.debug_file.write('-------------------\n')
return item
workerQueue = Queue.Queue()
UISignalQueue = Queue.Queue()
addressGeneratorQueue = addressGeneratorQueueClass()
#: `.network.ReceiveQueueThread` instances dump objects they hear
#: on the network into this queue to be processed.
objectProcessorQueue = ObjectProcessorQueue()
invQueue = MultiQueue()
addrQueue = MultiQueue()
portCheckerQueue = Queue.Queue()
receiveDataQueue = Queue.Queue()
#: The address generator thread uses this queue to get information back
#: to the API thread.
apiAddressGeneratorReturnQueue = apiAddressGeneratorReturnQueueQueueClass()
#: for exceptions
excQueue = Queue.Queue()
#new