From cc3cf777596f43be63310daeae804bd640b015f9 Mon Sep 17 00:00:00 2001 From: Peter Surda Date: Tue, 27 Jun 2017 13:25:12 +0200 Subject: [PATCH] New class multiqueue - to be used for invthread and addthread - updated invthread for multiqueue --- src/multiqueue.py | 35 +++++++++++++++++++++ src/network/invthread.py | 68 +++++++++------------------------------- src/queues.py | 5 ++- 3 files changed, 53 insertions(+), 55 deletions(-) create mode 100644 src/multiqueue.py diff --git a/src/multiqueue.py b/src/multiqueue.py new file mode 100644 index 00000000..30326ee7 --- /dev/null +++ b/src/multiqueue.py @@ -0,0 +1,35 @@ +from collections import deque +import Queue +import random + +class MultiQueue(Queue.Queue): + defaultQueueCount = 10 + def __init__(self, maxsize=0, count=0): + if not count: + self.queueCount = MultiQueue.defaultQueueCount + else: + self.queueCount = count + Queue.Queue.__init__(self, maxsize) + + # Initialize the queue representation + def _init(self, maxsize): + self.iter = 0 + self.queues = [] + for i in range(self.queueCount): + self.queues.append(deque()) + + def _qsize(self, len=len): + return len(self.queues[self.iter]) + + # Put a new item in the queue + def _put(self, item): + #self.queue.append(item) + i = random.randrange(0, self.queueCount) + self.queues[i].append((item)) + + # Get an item from the queue + def _get(self): + return self.queues[self.iter].popleft() + + def iterate(self): + self.iter = (self.iter + 1) % self.queueCount diff --git a/src/network/invthread.py b/src/network/invthread.py index 9d05aec4..398fecf0 100644 --- a/src/network/invthread.py +++ b/src/network/invthread.py @@ -1,37 +1,22 @@ -from binascii import hexlify -import collections import Queue -import random import threading -import time import addresses -from bmconfigparser import BMConfigParser -from debug import logger from helper_threading import StoppableThread -from network.bmproto import BMProto from network.connectionpool import BMConnectionPool from queues import invQueue import protocol import state class InvThread(threading.Thread, StoppableThread): - size = 10 - def __init__(self): threading.Thread.__init__(self, name="InvThread") self.initStop() self.name = "InvThread" - self.shutdown = False - - self.collectionOfInvs = [] - for i in range(InvThread.size): - self.collectionOfInvs.append({}) - def run(self): - iterator = 0 while not state.shutdown: + chunk = [] while True: try: data = invQueue.get(False) @@ -39,50 +24,25 @@ class InvThread(threading.Thread, StoppableThread): BMConnectionPool().handleReceivedObject(data[0], data[1]) else: BMConnectionPool().handleReceivedObject(data[0], data[1], data[2]) - self.holdHash (data[0], data[1]) + chunk.append((data[0], data[1])) except Queue.Empty: break - if self.collectionOfInvs[iterator]: - for connection in BMConnectionPool().inboundConnections.values() + BMConnectionPool().outboundConnections.values(): + if chunk: + for connection in BMConnectionPool().inboundConnections.values() + \ + BMConnectionPool().outboundConnections.values(): hashes = [] - for stream in connection.streams: + for inv in chunk: + if inv[0] not in connection.streams: + continue try: - for hashId in self.collectionOfInvs[iterator][stream]: - try: - with connection.objectsNewToThemLock: - del connection.objectsNewToThem[hashId] - hashes.append(hashId) - except KeyError: - pass + with connection.objectsNewToThemLock: + del connection.objectsNewToThem[inv[1]] + hashes.append(inv[1]) except KeyError: continue if hashes: - connection.writeQueue.put(protocol.CreatePacket('inv', addresses.encodeVarint(len(hashes)) + "".join(hashes))) - self.collectionOfInvs[iterator] = {} - iterator += 1 - iterator %= InvThread.size + connection.writeQueue.put(protocol.CreatePacket('inv', \ + addresses.encodeVarint(len(hashes)) + "".join(hashes))) + invQueue.iterate() self.stop.wait(1) - - def holdHash(self, stream, hashId): - i = random.randrange(0, InvThread.size) - if stream not in self.collectionOfInvs[i]: - self.collectionOfInvs[i][stream] = [] - self.collectionOfInvs[i][stream].append(hashId) - - def hasHash(self, hashId): - for streamlist in self.collectionOfInvs: - for stream in streamlist: - if hashId in streamlist[stream]: - return True - return False - - def hashCount(self): - retval = 0 - for streamlist in self.collectionOfInvs: - for stream in streamlist: - retval += len(streamlist[stream]) - return retval - - def close(self): - self.shutdown = True diff --git a/src/queues.py b/src/queues.py index 7c36d54a..223c7c3b 100644 --- a/src/queues.py +++ b/src/queues.py @@ -1,12 +1,15 @@ import Queue + from class_objectProcessorQueue import ObjectProcessorQueue +from multiqueue import MultiQueue workerQueue = Queue.Queue() UISignalQueue = Queue.Queue() addressGeneratorQueue = Queue.Queue() # receiveDataThreads dump objects they hear on the network into this queue to be processed. objectProcessorQueue = ObjectProcessorQueue() -invQueue = Queue.Queue() +invQueue = MultiQueue() +addrQueue = MultiQueue() portCheckerQueue = Queue.Queue() peerDiscoveryQueue = Queue.Queue() apiAddressGeneratorReturnQueue = Queue.Queue(