From 8d8e43b1fc828c67d497521f2f46a7accc387956 Mon Sep 17 00:00:00 2001 From: "Grant T. Olson" Date: Sat, 31 Aug 2013 10:40:11 -0400 Subject: [PATCH] Added SqlBulkExecute class so we can update inventory without a million commits --- src/class_singleCleaner.py | 26 ++++++++++++++------------ src/helper_sql.py | 28 ++++++++++++++++++++++++++++ src/shared.py | 12 ++++++------ 3 files changed, 48 insertions(+), 18 deletions(-) diff --git a/src/class_singleCleaner.py b/src/class_singleCleaner.py index e5512e7c..3cc80868 100644 --- a/src/class_singleCleaner.py +++ b/src/class_singleCleaner.py @@ -30,18 +30,20 @@ class singleCleaner(threading.Thread): while True: shared.UISignalQueue.put(( 'updateStatusBar', 'Doing housekeeping (Flushing inventory in memory to disk...)')) - for hash, storedValue in shared.inventory.items(): - objectType, streamNumber, payload, receivedTime = storedValue - if int(time.time()) - 3600 > receivedTime: - sqlExecute( - '''INSERT INTO inventory VALUES (?,?,?,?,?,?)''', - hash, - objectType, - streamNumber, - payload, - receivedTime, - '') - del shared.inventory[hash] + + with SqlBulkExecute() as sql: + for hash, storedValue in shared.inventory.items(): + objectType, streamNumber, payload, receivedTime = storedValue + if int(time.time()) - 3600 > receivedTime: + sql.execute( + '''INSERT INTO inventory VALUES (?,?,?,?,?,?)''', + hash, + objectType, + streamNumber, + payload, + receivedTime, + '') + del shared.inventory[hash] shared.UISignalQueue.put(('updateStatusBar', '')) shared.broadcastToSendDataQueues(( 0, 'pong', 'no data')) # commands the sendData threads to send out a pong message if they haven't sent anything else in the last five minutes. The socket timeout-time is 10 minutes. diff --git a/src/helper_sql.py b/src/helper_sql.py index 706fce0c..0353f9ae 100644 --- a/src/helper_sql.py +++ b/src/helper_sql.py @@ -36,3 +36,31 @@ def sqlStoredProcedure(procName): sqlLock.acquire() sqlSubmitQueue.put(procName) sqlLock.release() + +class SqlBulkExecute: + def __enter__(self): + sqlLock.acquire() + return self + + def __exit__(self, type, value, traceback): + sqlSubmitQueue.put('commit') + sqlLock.release() + + def execute(self, sqlStatement, *args): + sqlSubmitQueue.put(sqlStatement) + + if args == (): + sqlSubmitQueue.put('') + else: + sqlSubmitQueue.put(args) + sqlReturnQueue.get() + + def query(self, sqlStatement, *args): + sqlSubmitQueue.put(sqlStatement) + + if args == (): + sqlSubmitQueue.put('') + else: + sqlSubmitQueue.put(args) + return sqlReturnQueue.get() + diff --git a/src/shared.py b/src/shared.py index 5061f902..0ff80978 100644 --- a/src/shared.py +++ b/src/shared.py @@ -305,12 +305,12 @@ def broadcastToSendDataQueues(data): def flushInventory(): #Note that the singleCleanerThread clears out the inventory dictionary from time to time, although it only clears things that have been in the dictionary for a long time. This clears the inventory dictionary Now. - for hash, storedValue in inventory.items(): - objectType, streamNumber, payload, receivedTime = storedValue - t = () - sqlExecute('''INSERT INTO inventory VALUES (?,?,?,?,?,?)''', - hash,objectType,streamNumber,payload,receivedTime,'') - del inventory[hash] + with SqlBulkExecute() as sql: + for hash, storedValue in inventory.items(): + objectType, streamNumber, payload, receivedTime = storedValue + sql.execute('''INSERT INTO inventory VALUES (?,?,?,?,?,?)''', + hash,objectType,streamNumber,payload,receivedTime,'') + del inventory[hash] def fixPotentiallyInvalidUTF8Data(text): try: