objectProcessorQueue fixes

- it didn't shutdown correctly
- it didn't handle exception correctly (however, if I understand
correctly, this will never be triggered if using blocking get, so it
doesn't affect PyBitmessage)
- flushing size check changed from 1 to 0 (I don't know why it was 1)
This commit is contained in:
mailchuck 2016-01-22 14:12:57 +01:00
parent fe0106bdce
commit d925ab61b3
Signed by untrusted user who does not match committer: PeterSurda
GPG Key ID: 0C5F50C0B5F37D87
2 changed files with 6 additions and 5 deletions

View File

@ -72,7 +72,7 @@ class objectProcessor(threading.Thread):
time.sleep(.5) # Wait just a moment for most of the connections to close time.sleep(.5) # Wait just a moment for most of the connections to close
numberOfObjectsThatWereInTheObjectProcessorQueue = 0 numberOfObjectsThatWereInTheObjectProcessorQueue = 0
with SqlBulkExecute() as sql: with SqlBulkExecute() as sql:
while shared.objectProcessorQueue.curSize > 1: while shared.objectProcessorQueue.curSize > 0:
objectType, data = shared.objectProcessorQueue.get() objectType, data = shared.objectProcessorQueue.get()
sql.execute('''INSERT INTO objectprocessorqueue VALUES (?,?)''', sql.execute('''INSERT INTO objectprocessorqueue VALUES (?,?)''',
objectType,data) objectType,data)

View File

@ -13,16 +13,17 @@ class ObjectProcessorQueue(Queue.Queue):
self.curSize = 0 # in Bytes. We maintain this to prevent nodes from flooing us with objects which take up too much memory. If this gets too big we'll sleep before asking for further objects. self.curSize = 0 # in Bytes. We maintain this to prevent nodes from flooing us with objects which take up too much memory. If this gets too big we'll sleep before asking for further objects.
def put(self, item, block = True, timeout = None): def put(self, item, block = True, timeout = None):
while self.curSize >= self.maxSize and not shared.shutdown: while self.curSize >= self.maxSize:
time.sleep(1) time.sleep(1)
if shared.shutdown:
return
with self.sizeLock: with self.sizeLock:
self.curSize += len(item[1]) self.curSize += len(item[1])
Queue.Queue.put(self, item, block, timeout) Queue.Queue.put(self, item, block, timeout)
def get(self, block = True, timeout = None): def get(self, block = True, timeout = None):
try:
item = Queue.Queue.get(self, block, timeout) item = Queue.Queue.get(self, block, timeout)
except Queue.Empty as e:
raise Queue.Empty()
with self.sizeLock: with self.sizeLock:
self.curSize -= len(item[1]) self.curSize -= len(item[1])
return item return item