From a090eea9b03ad7316e471743e1e61a411172bcc1 Mon Sep 17 00:00:00 2001 From: Peter Surda Date: Thu, 19 Oct 2017 08:56:48 +0200 Subject: [PATCH] Minor multiqueue updates - add task_done to addrthread and invthread - implement totalSize for multiqueue - order in invThread changed --- src/multiqueue.py | 6 ++++-- src/network/addrthread.py | 2 ++ src/network/invthread.py | 4 +++- 3 files changed, 9 insertions(+), 3 deletions(-) diff --git a/src/multiqueue.py b/src/multiqueue.py index 30326ee7..62b0fa87 100644 --- a/src/multiqueue.py +++ b/src/multiqueue.py @@ -24,8 +24,7 @@ class MultiQueue(Queue.Queue): # Put a new item in the queue def _put(self, item): #self.queue.append(item) - i = random.randrange(0, self.queueCount) - self.queues[i].append((item)) + self.queues[random.randrange(self.queueCount)].append((item)) # Get an item from the queue def _get(self): @@ -33,3 +32,6 @@ class MultiQueue(Queue.Queue): def iterate(self): self.iter = (self.iter + 1) % self.queueCount + + def totalSize(self): + return sum(len(x) for x in self.queues) diff --git a/src/network/addrthread.py b/src/network/addrthread.py index 8c78894f..5b0ea638 100644 --- a/src/network/addrthread.py +++ b/src/network/addrthread.py @@ -31,4 +31,6 @@ class AddrThread(threading.Thread, StoppableThread): #finish addrQueue.iterate() + for i in range(len(chunk)): + addrQueue.task_done() self.stop.wait(1) diff --git a/src/network/invthread.py b/src/network/invthread.py index cbed7a70..4f26c0fa 100644 --- a/src/network/invthread.py +++ b/src/network/invthread.py @@ -33,6 +33,7 @@ class InvThread(threading.Thread, StoppableThread): self.dandelionLocalRouteRefresh() try: data = invQueue.get(False) + chunk.append((data[0], data[1])) # locally generated if len(data) == 2: DandelionStems().add(data[1], None, self.dandelionRoutes) @@ -41,7 +42,6 @@ class InvThread(threading.Thread, StoppableThread): else: source = BMConnectionPool().getConnectionByAddr(data[2]) BMConnectionPool().handleReceivedObject(data[0], data[1], source) - chunk.append((data[0], data[1])) except Queue.Empty: break # connection not found, handle it as if generated locally @@ -81,4 +81,6 @@ class InvThread(threading.Thread, StoppableThread): connection.append_write_buf(protocol.CreatePacket('dinv', \ addresses.encodeVarint(len(stems)) + "".join(stems))) invQueue.iterate() + for i in range(len(chunk)): + invQueue.task_done() self.stop.wait(1)