Added SqlBulkExecute class so we can update inventory without a million commits

This commit is contained in:
Grant T. Olson 2013-08-31 10:40:11 -04:00
parent b83781cefb
commit 8d8e43b1fc
3 changed files with 48 additions and 18 deletions

View File

@ -30,18 +30,20 @@ class singleCleaner(threading.Thread):
while True: while True:
shared.UISignalQueue.put(( shared.UISignalQueue.put((
'updateStatusBar', 'Doing housekeeping (Flushing inventory in memory to disk...)')) 'updateStatusBar', 'Doing housekeeping (Flushing inventory in memory to disk...)'))
for hash, storedValue in shared.inventory.items():
objectType, streamNumber, payload, receivedTime = storedValue with SqlBulkExecute() as sql:
if int(time.time()) - 3600 > receivedTime: for hash, storedValue in shared.inventory.items():
sqlExecute( objectType, streamNumber, payload, receivedTime = storedValue
'''INSERT INTO inventory VALUES (?,?,?,?,?,?)''', if int(time.time()) - 3600 > receivedTime:
hash, sql.execute(
objectType, '''INSERT INTO inventory VALUES (?,?,?,?,?,?)''',
streamNumber, hash,
payload, objectType,
receivedTime, streamNumber,
'') payload,
del shared.inventory[hash] receivedTime,
'')
del shared.inventory[hash]
shared.UISignalQueue.put(('updateStatusBar', '')) shared.UISignalQueue.put(('updateStatusBar', ''))
shared.broadcastToSendDataQueues(( shared.broadcastToSendDataQueues((
0, 'pong', 'no data')) # commands the sendData threads to send out a pong message if they haven't sent anything else in the last five minutes. The socket timeout-time is 10 minutes. 0, 'pong', 'no data')) # commands the sendData threads to send out a pong message if they haven't sent anything else in the last five minutes. The socket timeout-time is 10 minutes.

View File

@ -36,3 +36,31 @@ def sqlStoredProcedure(procName):
sqlLock.acquire() sqlLock.acquire()
sqlSubmitQueue.put(procName) sqlSubmitQueue.put(procName)
sqlLock.release() sqlLock.release()
class SqlBulkExecute:
def __enter__(self):
sqlLock.acquire()
return self
def __exit__(self, type, value, traceback):
sqlSubmitQueue.put('commit')
sqlLock.release()
def execute(self, sqlStatement, *args):
sqlSubmitQueue.put(sqlStatement)
if args == ():
sqlSubmitQueue.put('')
else:
sqlSubmitQueue.put(args)
sqlReturnQueue.get()
def query(self, sqlStatement, *args):
sqlSubmitQueue.put(sqlStatement)
if args == ():
sqlSubmitQueue.put('')
else:
sqlSubmitQueue.put(args)
return sqlReturnQueue.get()

View File

@ -305,12 +305,12 @@ def broadcastToSendDataQueues(data):
def flushInventory(): def flushInventory():
#Note that the singleCleanerThread clears out the inventory dictionary from time to time, although it only clears things that have been in the dictionary for a long time. This clears the inventory dictionary Now. #Note that the singleCleanerThread clears out the inventory dictionary from time to time, although it only clears things that have been in the dictionary for a long time. This clears the inventory dictionary Now.
for hash, storedValue in inventory.items(): with SqlBulkExecute() as sql:
objectType, streamNumber, payload, receivedTime = storedValue for hash, storedValue in inventory.items():
t = () objectType, streamNumber, payload, receivedTime = storedValue
sqlExecute('''INSERT INTO inventory VALUES (?,?,?,?,?,?)''', sql.execute('''INSERT INTO inventory VALUES (?,?,?,?,?,?)''',
hash,objectType,streamNumber,payload,receivedTime,'') hash,objectType,streamNumber,payload,receivedTime,'')
del inventory[hash] del inventory[hash]
def fixPotentiallyInvalidUTF8Data(text): def fixPotentiallyInvalidUTF8Data(text):
try: try: