From ec4a16b388001edce052c8278235fc536d8512f5 Mon Sep 17 00:00:00 2001 From: mailchuck Date: Fri, 22 Jan 2016 14:12:57 +0100 Subject: [PATCH] objectProcessorQueue fixes - it didn't shutdown correctly - it didn't handle exception correctly (however, if I understand correctly, this will never be triggered if using blocking get, so it doesn't affect PyBitmessage) - flushing size check changed from 1 to 0 (I don't know why it was 1) --- src/class_objectProcessor.py | 2 +- src/class_objectProcessorQueue.py | 9 +++++---- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/src/class_objectProcessor.py b/src/class_objectProcessor.py index 48e90a22..0035dce9 100644 --- a/src/class_objectProcessor.py +++ b/src/class_objectProcessor.py @@ -72,7 +72,7 @@ class objectProcessor(threading.Thread): time.sleep(.5) # Wait just a moment for most of the connections to close numberOfObjectsThatWereInTheObjectProcessorQueue = 0 with SqlBulkExecute() as sql: - while shared.objectProcessorQueue.curSize > 1: + while shared.objectProcessorQueue.curSize > 0: objectType, data = shared.objectProcessorQueue.get() sql.execute('''INSERT INTO objectprocessorqueue VALUES (?,?)''', objectType,data) diff --git a/src/class_objectProcessorQueue.py b/src/class_objectProcessorQueue.py index 6d3d4804..9bf3f82a 100644 --- a/src/class_objectProcessorQueue.py +++ b/src/class_objectProcessorQueue.py @@ -13,16 +13,17 @@ class ObjectProcessorQueue(Queue.Queue): self.curSize = 0 # in Bytes. We maintain this to prevent nodes from flooing us with objects which take up too much memory. If this gets too big we'll sleep before asking for further objects. def put(self, item, block = True, timeout = None): - while self.curSize >= self.maxSize and not shared.shutdown: + while self.curSize >= self.maxSize: time.sleep(1) - if shared.shutdown: - return with self.sizeLock: self.curSize += len(item[1]) Queue.Queue.put(self, item, block, timeout) def get(self, block = True, timeout = None): - item = Queue.Queue.get(self, block, timeout) + try: + item = Queue.Queue.get(self, block, timeout) + except Queue.Empty as e: + raise Queue.Empty() with self.sizeLock: self.curSize -= len(item[1]) return item