From fe0106bdcecb73d21506dc9c5f7f998386edcea9 Mon Sep 17 00:00:00 2001 From: Peter Surda Date: Fri, 22 Jan 2016 13:49:28 +0100 Subject: [PATCH] Object processor queue class Previous commit didn't include the class. This class takes care of queue size monitoring so that the system doesn't run out of memory. --- src/class_objectProcessorQueue.py | 28 ++++++++++++++++++++++++++++ 1 file changed, 28 insertions(+) create mode 100644 src/class_objectProcessorQueue.py diff --git a/src/class_objectProcessorQueue.py b/src/class_objectProcessorQueue.py new file mode 100644 index 00000000..6d3d4804 --- /dev/null +++ b/src/class_objectProcessorQueue.py @@ -0,0 +1,28 @@ +import shared + +import Queue +import threading +import time + +class ObjectProcessorQueue(Queue.Queue): + maxSize = 32000000 + + def __init__(self): + Queue.Queue.__init__(self) + self.sizeLock = threading.Lock() + self.curSize = 0 # in Bytes. We maintain this to prevent nodes from flooing us with objects which take up too much memory. If this gets too big we'll sleep before asking for further objects. + + def put(self, item, block = True, timeout = None): + while self.curSize >= self.maxSize and not shared.shutdown: + time.sleep(1) + if shared.shutdown: + return + with self.sizeLock: + self.curSize += len(item[1]) + Queue.Queue.put(self, item, block, timeout) + + def get(self, block = True, timeout = None): + item = Queue.Queue.get(self, block, timeout) + with self.sizeLock: + self.curSize -= len(item[1]) + return item