Import queue from six.moves in the top level module

This commit is contained in:
Dmitri Bogomolov 2021-07-07 22:08:45 +03:00
parent db21f3ea6b
commit f9c49fbeb3
Signed by untrusted user: g1itch
GPG Key ID: 720A756F18DEED13
5 changed files with 35 additions and 44 deletions

View File

@ -16,20 +16,14 @@ SQLite objects can only be used from one thread.
or isn't thread-safe. or isn't thread-safe.
""" """
# import Queue
import sys
if sys.version_info[0] == 3:
import queue as Queue #python3
else:
import Queue #python2
import threading import threading
from six.moves import queue
sqlSubmitQueue = Queue.Queue()
sqlSubmitQueue = queue.Queue()
"""the queue for SQL""" """the queue for SQL"""
sqlReturnQueue = Queue.Queue() sqlReturnQueue = queue.Queue()
"""the queue for results""" """the queue for results"""
sql_lock = threading.Lock() sql_lock = threading.Lock()
""" lock to prevent queueing a new request until the previous response """ lock to prevent queueing a new request until the previous response

View File

@ -3,21 +3,17 @@ A queue with multiple internal subqueues.
Elements are added into a random subqueue, and retrieval rotates Elements are added into a random subqueue, and retrieval rotates
""" """
import sys
if sys.version_info[0] == 3:
import queue as Queue
else:
import Queue
from collections import deque from collections import deque
if sys.version_info[0] == 3: from six.moves import queue
from . import helper_random
else: try:
import helper_random import helper_random
except ImportError:
from . import helper_random
class MultiQueue(Queue.Queue): class MultiQueue(queue.Queue):
"""A base queue class""" """A base queue class"""
# pylint: disable=redefined-builtin,attribute-defined-outside-init # pylint: disable=redefined-builtin,attribute-defined-outside-init
defaultQueueCount = 10 defaultQueueCount = 10
@ -27,7 +23,7 @@ class MultiQueue(Queue.Queue):
self.queueCount = MultiQueue.defaultQueueCount self.queueCount = MultiQueue.defaultQueueCount
else: else:
self.queueCount = count self.queueCount = count
Queue.Queue.__init__(self, maxsize) queue.Queue.__init__(self, maxsize)
# Initialize the queue representation # Initialize the queue representation
def _init(self, maxsize): def _init(self, maxsize):
@ -42,7 +38,8 @@ class MultiQueue(Queue.Queue):
# Put a new item in the queue # Put a new item in the queue
def _put(self, item): def _put(self, item):
# self.queue.append(item) # self.queue.append(item)
self.queues[helper_random.randomrandrange(self.queueCount)].append((item)) self.queues[helper_random.randomrandrange(self.queueCount)].append(
(item))
# Get an item from the queue # Get an item from the queue
def _get(self): def _get(self):

View File

@ -9,7 +9,7 @@ import knownnodes
import protocol import protocol
import state import state
from bmconfigparser import BMConfigParser from bmconfigparser import BMConfigParser
from queues import Queue, portCheckerQueue from queues import queue, portCheckerQueue
logger = logging.getLogger('default') logger = logging.getLogger('default')
@ -37,7 +37,7 @@ def chooseConnection(stream):
retval = portCheckerQueue.get(False) retval = portCheckerQueue.get(False)
portCheckerQueue.task_done() portCheckerQueue.task_done()
return retval return retval
except Queue.Empty: except queue.Empty:
pass pass
# with a probability of 0.5, connect to a discovered peer # with a probability of 0.5, connect to a discovered peer
if random.choice((False, True)) and not haveOnion: if random.choice((False, True)) and not haveOnion:

View File

@ -2,10 +2,8 @@
import threading import threading
import time import time
try:
import queue as Queue from six.moves import queue
except ImportError:
import Queue
try: try:
from multiqueue import MultiQueue from multiqueue import MultiQueue
@ -13,13 +11,13 @@ except ImportError:
from .multiqueue import MultiQueue from .multiqueue import MultiQueue
class ObjectProcessorQueue(Queue.Queue): class ObjectProcessorQueue(queue.Queue):
"""Special queue class using lock for `.threads.objectProcessor`""" """Special queue class using lock for `.threads.objectProcessor`"""
maxSize = 32000000 maxSize = 32000000
def __init__(self): def __init__(self):
Queue.Queue.__init__(self) queue.Queue.__init__(self)
self.sizeLock = threading.Lock() self.sizeLock = threading.Lock()
#: in Bytes. We maintain this to prevent nodes from flooding us #: in Bytes. We maintain this to prevent nodes from flooding us
#: with objects which take up too much memory. If this gets #: with objects which take up too much memory. If this gets
@ -31,27 +29,27 @@ class ObjectProcessorQueue(Queue.Queue):
time.sleep(1) time.sleep(1)
with self.sizeLock: with self.sizeLock:
self.curSize += len(item[1]) self.curSize += len(item[1])
Queue.Queue.put(self, item, block, timeout) queue.Queue.put(self, item, block, timeout)
def get(self, block=True, timeout=None): def get(self, block=True, timeout=None):
item = Queue.Queue.get(self, block, timeout) item = queue.Queue.get(self, block, timeout)
with self.sizeLock: with self.sizeLock:
self.curSize -= len(item[1]) self.curSize -= len(item[1])
return item return item
workerQueue = Queue.Queue() workerQueue = queue.Queue()
UISignalQueue = Queue.Queue() UISignalQueue = queue.Queue()
addressGeneratorQueue = Queue.Queue() addressGeneratorQueue = queue.Queue()
#: `.network.ReceiveQueueThread` instances dump objects they hear #: `.network.ReceiveQueueThread` instances dump objects they hear
#: on the network into this queue to be processed. #: on the network into this queue to be processed.
objectProcessorQueue = ObjectProcessorQueue() objectProcessorQueue = ObjectProcessorQueue()
invQueue = MultiQueue() invQueue = MultiQueue()
addrQueue = MultiQueue() addrQueue = MultiQueue()
portCheckerQueue = Queue.Queue() portCheckerQueue = queue.Queue()
receiveDataQueue = Queue.Queue() receiveDataQueue = queue.Queue()
#: The address generator thread uses this queue to get information back #: The address generator thread uses this queue to get information back
#: to the API thread. #: to the API thread.
apiAddressGeneratorReturnQueue = Queue.Queue() apiAddressGeneratorReturnQueue = queue.Queue()
#: for exceptions #: for exceptions
excQueue = Queue.Queue() excQueue = queue.Queue()

View File

@ -1,9 +1,11 @@
"""shutdown function""" """shutdown function"""
import os import os
import Queue
import threading import threading
import time import time
from six.moves import queue
import state import state
from debug import logger from debug import logger
from helper_sql import sqlQuery, sqlStoredProcedure from helper_sql import sqlQuery, sqlStoredProcedure
@ -69,14 +71,14 @@ def doCleanShutdown():
sqlStoredProcedure('exit') sqlStoredProcedure('exit')
# flush queues # flush queues
for queue in ( for q in (
workerQueue, UISignalQueue, addressGeneratorQueue, workerQueue, UISignalQueue, addressGeneratorQueue,
objectProcessorQueue): objectProcessorQueue):
while True: while True:
try: try:
queue.get(False) q.get(False)
queue.task_done() q.task_done()
except Queue.Empty: except queue.Empty:
break break
if state.thisapp.daemon or not state.enableGUI: if state.thisapp.daemon or not state.enableGUI: