diff --git a/src/helper_sql.py b/src/helper_sql.py
index 5bd2f0f7..cba98884 100644
--- a/src/helper_sql.py
+++ b/src/helper_sql.py
@@ -16,20 +16,14 @@ SQLite objects can only be used from one thread.
     or isn't thread-safe.
 """
 
-
-# import Queue
-import sys
-if sys.version_info[0] == 3:
-    import queue as Queue #python3
-else:
-    import Queue #python2
-
 
 import threading
+from six.moves import queue
 
-sqlSubmitQueue = Queue.Queue()
+
+sqlSubmitQueue = queue.Queue()
 """the queue for SQL"""
-sqlReturnQueue = Queue.Queue()
+sqlReturnQueue = queue.Queue()
 """the queue for results"""
 sql_lock = threading.Lock()
 """ lock to prevent queueing a new request until the previous response
diff --git a/src/multiqueue.py b/src/multiqueue.py
index 886d693d..88b6a4dd 100644
--- a/src/multiqueue.py
+++ b/src/multiqueue.py
@@ -3,21 +3,17 @@ A queue with multiple internal subqueues.
 Elements are added into a random subqueue, and retrieval rotates
 """
 
-import sys
-if sys.version_info[0] == 3:
-    import queue as Queue
-else:
-    import Queue
-
 from collections import deque
 
-if sys.version_info[0] == 3:
-    from . import helper_random
-else:
+from six.moves import queue
+
+try:
     import helper_random
+except ImportError:
+    from . import helper_random
 
 
-class MultiQueue(Queue.Queue):
+class MultiQueue(queue.Queue):
     """A base queue class"""
     # pylint: disable=redefined-builtin,attribute-defined-outside-init
     defaultQueueCount = 10
@@ -27,7 +23,7 @@ class MultiQueue(Queue.Queue):
             self.queueCount = MultiQueue.defaultQueueCount
         else:
             self.queueCount = count
-        Queue.Queue.__init__(self, maxsize)
+        queue.Queue.__init__(self, maxsize)
 
     # Initialize the queue representation
     def _init(self, maxsize):
@@ -42,7 +38,8 @@ class MultiQueue(Queue.Queue):
     # Put a new item in the queue
     def _put(self, item):
         # self.queue.append(item)
-        self.queues[helper_random.randomrandrange(self.queueCount)].append((item))
+        self.queues[helper_random.randomrandrange(self.queueCount)].append(
+            (item))
 
     # Get an item from the queue
     def _get(self):
diff --git a/src/network/connectionchooser.py b/src/network/connectionchooser.py
index badd98b7..c31bbb6a 100644
--- a/src/network/connectionchooser.py
+++ b/src/network/connectionchooser.py
@@ -9,7 +9,7 @@ import knownnodes
 import protocol
 import state
 from bmconfigparser import BMConfigParser
-from queues import Queue, portCheckerQueue
+from queues import queue, portCheckerQueue
 
 logger = logging.getLogger('default')
 
@@ -37,7 +37,7 @@ def chooseConnection(stream):
             retval = portCheckerQueue.get(False)
             portCheckerQueue.task_done()
             return retval
-        except Queue.Empty:
+        except queue.Empty:
             pass
     # with a probability of 0.5, connect to a discovered peer
     if random.choice((False, True)) and not haveOnion:
diff --git a/src/queues.py b/src/queues.py
index 0c03b251..4a9b98d2 100644
--- a/src/queues.py
+++ b/src/queues.py
@@ -2,10 +2,8 @@
 
 import threading
 import time
-try:
-    import queue as Queue
-except ImportError:
-    import Queue
+
+from six.moves import queue
 
 try:
     from multiqueue import MultiQueue
@@ -13,13 +11,13 @@ except ImportError:
     from .multiqueue import MultiQueue
 
 
-class ObjectProcessorQueue(Queue.Queue):
+class ObjectProcessorQueue(queue.Queue):
     """Special queue class using lock for `.threads.objectProcessor`"""
 
     maxSize = 32000000
 
     def __init__(self):
-        Queue.Queue.__init__(self)
+        queue.Queue.__init__(self)
         self.sizeLock = threading.Lock()
         #: in Bytes. We maintain this to prevent nodes from flooding us
         #: with objects which take up too much memory. If this gets
@@ -31,27 +29,27 @@ class ObjectProcessorQueue(Queue.Queue):
             time.sleep(1)
         with self.sizeLock:
             self.curSize += len(item[1])
-        Queue.Queue.put(self, item, block, timeout)
+        queue.Queue.put(self, item, block, timeout)
 
     def get(self, block=True, timeout=None):
-        item = Queue.Queue.get(self, block, timeout)
+        item = queue.Queue.get(self, block, timeout)
         with self.sizeLock:
             self.curSize -= len(item[1])
         return item
 
 
-workerQueue = Queue.Queue()
-UISignalQueue = Queue.Queue()
-addressGeneratorQueue = Queue.Queue()
+workerQueue = queue.Queue()
+UISignalQueue = queue.Queue()
+addressGeneratorQueue = queue.Queue()
 #: `.network.ReceiveQueueThread` instances dump objects they hear
 #: on the network into this queue to be processed.
 objectProcessorQueue = ObjectProcessorQueue()
 invQueue = MultiQueue()
 addrQueue = MultiQueue()
-portCheckerQueue = Queue.Queue()
-receiveDataQueue = Queue.Queue()
+portCheckerQueue = queue.Queue()
+receiveDataQueue = queue.Queue()
 #: The address generator thread uses this queue to get information back
 #: to the API thread.
-apiAddressGeneratorReturnQueue = Queue.Queue()
+apiAddressGeneratorReturnQueue = queue.Queue()
 #: for exceptions
-excQueue = Queue.Queue()
+excQueue = queue.Queue()
diff --git a/src/shutdown.py b/src/shutdown.py
index 74332a81..3e2b8ca8 100644
--- a/src/shutdown.py
+++ b/src/shutdown.py
@@ -1,9 +1,11 @@
 """shutdown function"""
+
 import os
-import Queue
 import threading
 import time
 
+from six.moves import queue
+
 import state
 from debug import logger
 from helper_sql import sqlQuery, sqlStoredProcedure
@@ -69,14 +71,14 @@ def doCleanShutdown():
     sqlStoredProcedure('exit')
 
     # flush queues
-    for queue in (
+    for q in (
            workerQueue, UISignalQueue, addressGeneratorQueue,
            objectProcessorQueue):
        while True:
            try:
-                queue.get(False)
-                queue.task_done()
-            except Queue.Empty:
+                q.get(False)
+                q.task_done()
+            except queue.Empty:
                break
 
     if state.thisapp.daemon or not state.enableGUI:
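
Note (reviewer sketch, not part of the patch): every hunk above replaces the Python-2-only Queue module, or a hand-rolled sys.version_info shim, with six.moves.queue, which resolves to the stdlib Queue module on Python 2 and queue on Python 3, so queue.Queue and queue.Empty work unchanged on both. The standalone snippet below only illustrates that usage, including the non-blocking drain pattern that doCleanShutdown() and chooseConnection() rely on; the queue instance q and its payload are hypothetical.

# Illustration of the six.moves.queue pattern adopted by this patch.
from six.moves import queue

q = queue.Queue()                 # same API under Python 2 and Python 3
q.put(('example', b'payload'))

# Drain without blocking, as the flush loop in shutdown.py does:
while True:
    try:
        q.get(False)              # raises queue.Empty once nothing is left
        q.task_done()
    except queue.Empty:
        break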