Minor multiqueue updates
- add task_done to addrthread and invthread - implement totalSize for multiqueue - order in invThread changed
This commit is contained in:
parent
d44c6c6464
commit
a090eea9b0
|
@ -24,8 +24,7 @@ class MultiQueue(Queue.Queue):
|
||||||
# Put a new item in the queue
|
# Put a new item in the queue
|
||||||
def _put(self, item):
|
def _put(self, item):
|
||||||
#self.queue.append(item)
|
#self.queue.append(item)
|
||||||
i = random.randrange(0, self.queueCount)
|
self.queues[random.randrange(self.queueCount)].append((item))
|
||||||
self.queues[i].append((item))
|
|
||||||
|
|
||||||
# Get an item from the queue
|
# Get an item from the queue
|
||||||
def _get(self):
|
def _get(self):
|
||||||
|
@ -33,3 +32,6 @@ class MultiQueue(Queue.Queue):
|
||||||
|
|
||||||
def iterate(self):
|
def iterate(self):
|
||||||
self.iter = (self.iter + 1) % self.queueCount
|
self.iter = (self.iter + 1) % self.queueCount
|
||||||
|
|
||||||
|
def totalSize(self):
|
||||||
|
return sum(len(x) for x in self.queues)
|
||||||
|
|
|
@ -31,4 +31,6 @@ class AddrThread(threading.Thread, StoppableThread):
|
||||||
#finish
|
#finish
|
||||||
|
|
||||||
addrQueue.iterate()
|
addrQueue.iterate()
|
||||||
|
for i in range(len(chunk)):
|
||||||
|
addrQueue.task_done()
|
||||||
self.stop.wait(1)
|
self.stop.wait(1)
|
||||||
|
|
|
@ -33,6 +33,7 @@ class InvThread(threading.Thread, StoppableThread):
|
||||||
self.dandelionLocalRouteRefresh()
|
self.dandelionLocalRouteRefresh()
|
||||||
try:
|
try:
|
||||||
data = invQueue.get(False)
|
data = invQueue.get(False)
|
||||||
|
chunk.append((data[0], data[1]))
|
||||||
# locally generated
|
# locally generated
|
||||||
if len(data) == 2:
|
if len(data) == 2:
|
||||||
DandelionStems().add(data[1], None, self.dandelionRoutes)
|
DandelionStems().add(data[1], None, self.dandelionRoutes)
|
||||||
|
@ -41,7 +42,6 @@ class InvThread(threading.Thread, StoppableThread):
|
||||||
else:
|
else:
|
||||||
source = BMConnectionPool().getConnectionByAddr(data[2])
|
source = BMConnectionPool().getConnectionByAddr(data[2])
|
||||||
BMConnectionPool().handleReceivedObject(data[0], data[1], source)
|
BMConnectionPool().handleReceivedObject(data[0], data[1], source)
|
||||||
chunk.append((data[0], data[1]))
|
|
||||||
except Queue.Empty:
|
except Queue.Empty:
|
||||||
break
|
break
|
||||||
# connection not found, handle it as if generated locally
|
# connection not found, handle it as if generated locally
|
||||||
|
@ -81,4 +81,6 @@ class InvThread(threading.Thread, StoppableThread):
|
||||||
connection.append_write_buf(protocol.CreatePacket('dinv', \
|
connection.append_write_buf(protocol.CreatePacket('dinv', \
|
||||||
addresses.encodeVarint(len(stems)) + "".join(stems)))
|
addresses.encodeVarint(len(stems)) + "".join(stems)))
|
||||||
invQueue.iterate()
|
invQueue.iterate()
|
||||||
|
for i in range(len(chunk)):
|
||||||
|
invQueue.task_done()
|
||||||
self.stop.wait(1)
|
self.stop.wait(1)
|
||||||
|
|
Reference in New Issue
Block a user