New class multiqueue
- to be used for invthread and addthread - updated invthread for multiqueue
This commit is contained in:
parent
f5a143d0b8
commit
cc3cf77759
35
src/multiqueue.py
Normal file
35
src/multiqueue.py
Normal file
|
@ -0,0 +1,35 @@
|
|||
from collections import deque
|
||||
import Queue
|
||||
import random
|
||||
|
||||
class MultiQueue(Queue.Queue):
|
||||
defaultQueueCount = 10
|
||||
def __init__(self, maxsize=0, count=0):
|
||||
if not count:
|
||||
self.queueCount = MultiQueue.defaultQueueCount
|
||||
else:
|
||||
self.queueCount = count
|
||||
Queue.Queue.__init__(self, maxsize)
|
||||
|
||||
# Initialize the queue representation
|
||||
def _init(self, maxsize):
|
||||
self.iter = 0
|
||||
self.queues = []
|
||||
for i in range(self.queueCount):
|
||||
self.queues.append(deque())
|
||||
|
||||
def _qsize(self, len=len):
|
||||
return len(self.queues[self.iter])
|
||||
|
||||
# Put a new item in the queue
|
||||
def _put(self, item):
|
||||
#self.queue.append(item)
|
||||
i = random.randrange(0, self.queueCount)
|
||||
self.queues[i].append((item))
|
||||
|
||||
# Get an item from the queue
|
||||
def _get(self):
|
||||
return self.queues[self.iter].popleft()
|
||||
|
||||
def iterate(self):
|
||||
self.iter = (self.iter + 1) % self.queueCount
|
|
@ -1,37 +1,22 @@
|
|||
from binascii import hexlify
|
||||
import collections
|
||||
import Queue
|
||||
import random
|
||||
import threading
|
||||
import time
|
||||
|
||||
import addresses
|
||||
from bmconfigparser import BMConfigParser
|
||||
from debug import logger
|
||||
from helper_threading import StoppableThread
|
||||
from network.bmproto import BMProto
|
||||
from network.connectionpool import BMConnectionPool
|
||||
from queues import invQueue
|
||||
import protocol
|
||||
import state
|
||||
|
||||
class InvThread(threading.Thread, StoppableThread):
|
||||
size = 10
|
||||
|
||||
def __init__(self):
|
||||
threading.Thread.__init__(self, name="InvThread")
|
||||
self.initStop()
|
||||
self.name = "InvThread"
|
||||
|
||||
self.shutdown = False
|
||||
|
||||
self.collectionOfInvs = []
|
||||
for i in range(InvThread.size):
|
||||
self.collectionOfInvs.append({})
|
||||
|
||||
def run(self):
|
||||
iterator = 0
|
||||
while not state.shutdown:
|
||||
chunk = []
|
||||
while True:
|
||||
try:
|
||||
data = invQueue.get(False)
|
||||
|
@ -39,50 +24,25 @@ class InvThread(threading.Thread, StoppableThread):
|
|||
BMConnectionPool().handleReceivedObject(data[0], data[1])
|
||||
else:
|
||||
BMConnectionPool().handleReceivedObject(data[0], data[1], data[2])
|
||||
self.holdHash (data[0], data[1])
|
||||
chunk.append((data[0], data[1]))
|
||||
except Queue.Empty:
|
||||
break
|
||||
|
||||
if self.collectionOfInvs[iterator]:
|
||||
for connection in BMConnectionPool().inboundConnections.values() + BMConnectionPool().outboundConnections.values():
|
||||
if chunk:
|
||||
for connection in BMConnectionPool().inboundConnections.values() + \
|
||||
BMConnectionPool().outboundConnections.values():
|
||||
hashes = []
|
||||
for stream in connection.streams:
|
||||
for inv in chunk:
|
||||
if inv[0] not in connection.streams:
|
||||
continue
|
||||
try:
|
||||
for hashId in self.collectionOfInvs[iterator][stream]:
|
||||
try:
|
||||
with connection.objectsNewToThemLock:
|
||||
del connection.objectsNewToThem[hashId]
|
||||
hashes.append(hashId)
|
||||
except KeyError:
|
||||
pass
|
||||
with connection.objectsNewToThemLock:
|
||||
del connection.objectsNewToThem[inv[1]]
|
||||
hashes.append(inv[1])
|
||||
except KeyError:
|
||||
continue
|
||||
if hashes:
|
||||
connection.writeQueue.put(protocol.CreatePacket('inv', addresses.encodeVarint(len(hashes)) + "".join(hashes)))
|
||||
self.collectionOfInvs[iterator] = {}
|
||||
iterator += 1
|
||||
iterator %= InvThread.size
|
||||
connection.writeQueue.put(protocol.CreatePacket('inv', \
|
||||
addresses.encodeVarint(len(hashes)) + "".join(hashes)))
|
||||
invQueue.iterate()
|
||||
self.stop.wait(1)
|
||||
|
||||
def holdHash(self, stream, hashId):
|
||||
i = random.randrange(0, InvThread.size)
|
||||
if stream not in self.collectionOfInvs[i]:
|
||||
self.collectionOfInvs[i][stream] = []
|
||||
self.collectionOfInvs[i][stream].append(hashId)
|
||||
|
||||
def hasHash(self, hashId):
|
||||
for streamlist in self.collectionOfInvs:
|
||||
for stream in streamlist:
|
||||
if hashId in streamlist[stream]:
|
||||
return True
|
||||
return False
|
||||
|
||||
def hashCount(self):
|
||||
retval = 0
|
||||
for streamlist in self.collectionOfInvs:
|
||||
for stream in streamlist:
|
||||
retval += len(streamlist[stream])
|
||||
return retval
|
||||
|
||||
def close(self):
|
||||
self.shutdown = True
|
||||
|
|
|
@ -1,12 +1,15 @@
|
|||
import Queue
|
||||
|
||||
from class_objectProcessorQueue import ObjectProcessorQueue
|
||||
from multiqueue import MultiQueue
|
||||
|
||||
workerQueue = Queue.Queue()
|
||||
UISignalQueue = Queue.Queue()
|
||||
addressGeneratorQueue = Queue.Queue()
|
||||
# receiveDataThreads dump objects they hear on the network into this queue to be processed.
|
||||
objectProcessorQueue = ObjectProcessorQueue()
|
||||
invQueue = Queue.Queue()
|
||||
invQueue = MultiQueue()
|
||||
addrQueue = MultiQueue()
|
||||
portCheckerQueue = Queue.Queue()
|
||||
peerDiscoveryQueue = Queue.Queue()
|
||||
apiAddressGeneratorReturnQueue = Queue.Queue(
|
||||
|
|
Reference in New Issue
Block a user