Object processor queue class
Previous commit didn't include the class. This class takes care of queue size monitoring so that the system doesn't run out of memory.
This commit is contained in:
parent
e4f31d25fc
commit
47f0df6c0b
28
src/class_objectProcessorQueue.py
Normal file
28
src/class_objectProcessorQueue.py
Normal file
|
@ -0,0 +1,28 @@
|
||||||
|
import shared
|
||||||
|
|
||||||
|
import Queue
|
||||||
|
import threading
|
||||||
|
import time
|
||||||
|
|
||||||
|
class ObjectProcessorQueue(Queue.Queue):
|
||||||
|
maxSize = 32000000
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
Queue.Queue.__init__(self)
|
||||||
|
self.sizeLock = threading.Lock()
|
||||||
|
self.curSize = 0 # in Bytes. We maintain this to prevent nodes from flooing us with objects which take up too much memory. If this gets too big we'll sleep before asking for further objects.
|
||||||
|
|
||||||
|
def put(self, item, block = True, timeout = None):
|
||||||
|
while self.curSize >= self.maxSize and not shared.shutdown:
|
||||||
|
time.sleep(1)
|
||||||
|
if shared.shutdown:
|
||||||
|
return
|
||||||
|
with self.sizeLock:
|
||||||
|
self.curSize += len(item[1])
|
||||||
|
Queue.Queue.put(self, item, block, timeout)
|
||||||
|
|
||||||
|
def get(self, block = True, timeout = None):
|
||||||
|
item = Queue.Queue.get(self, block, timeout)
|
||||||
|
with self.sizeLock:
|
||||||
|
self.curSize -= len(item[1])
|
||||||
|
return item
|
Reference in New Issue
Block a user