objectProcessorQueue fixes

- it didn't shutdown correctly
- it didn't handle exception correctly (however, if I understand
correctly, this will never be triggered if using blocking get, so it
doesn't affect PyBitmessage)
- flushing size check changed from 1 to 0 (I don't know why it was 1)
This commit is contained in:
mailchuck 2016-01-22 14:12:57 +01:00 committed by Peter Surda
parent 47f0df6c0b
commit ec4a16b388
2 changed files with 6 additions and 5 deletions

View File

@ -72,7 +72,7 @@ class objectProcessor(threading.Thread):
time.sleep(.5) # Wait just a moment for most of the connections to close time.sleep(.5) # Wait just a moment for most of the connections to close
numberOfObjectsThatWereInTheObjectProcessorQueue = 0 numberOfObjectsThatWereInTheObjectProcessorQueue = 0
with SqlBulkExecute() as sql: with SqlBulkExecute() as sql:
while shared.objectProcessorQueue.curSize > 1: while shared.objectProcessorQueue.curSize > 0:
objectType, data = shared.objectProcessorQueue.get() objectType, data = shared.objectProcessorQueue.get()
sql.execute('''INSERT INTO objectprocessorqueue VALUES (?,?)''', sql.execute('''INSERT INTO objectprocessorqueue VALUES (?,?)''',
objectType,data) objectType,data)

View File

@ -13,16 +13,17 @@ class ObjectProcessorQueue(Queue.Queue):
self.curSize = 0 # in Bytes. We maintain this to prevent nodes from flooing us with objects which take up too much memory. If this gets too big we'll sleep before asking for further objects. self.curSize = 0 # in Bytes. We maintain this to prevent nodes from flooing us with objects which take up too much memory. If this gets too big we'll sleep before asking for further objects.
def put(self, item, block = True, timeout = None): def put(self, item, block = True, timeout = None):
while self.curSize >= self.maxSize and not shared.shutdown: while self.curSize >= self.maxSize:
time.sleep(1) time.sleep(1)
if shared.shutdown:
return
with self.sizeLock: with self.sizeLock:
self.curSize += len(item[1]) self.curSize += len(item[1])
Queue.Queue.put(self, item, block, timeout) Queue.Queue.put(self, item, block, timeout)
def get(self, block = True, timeout = None): def get(self, block = True, timeout = None):
try:
item = Queue.Queue.get(self, block, timeout) item = Queue.Queue.get(self, block, timeout)
except Queue.Empty as e:
raise Queue.Empty()
with self.sizeLock: with self.sizeLock:
self.curSize -= len(item[1]) self.curSize -= len(item[1])
return item return item