diff --git a/src/class_singleCleaner.py b/src/class_singleCleaner.py index d92a37ec..e5512e7c 100644 --- a/src/class_singleCleaner.py +++ b/src/class_singleCleaner.py @@ -2,6 +2,7 @@ import threading import shared import time import sys +from helper_sql import * '''The singleCleaner class is a timer-driven thread that cleans data structures to free memory, resends messages when a remote node doesn't respond, and sends pong messages to keep connections alive if the network isn't busy. It cleans these data structures in memory: @@ -27,21 +28,21 @@ class singleCleaner(threading.Thread): timeWeLastClearedInventoryAndPubkeysTables = 0 while True: - shared.sqlLock.acquire() shared.UISignalQueue.put(( 'updateStatusBar', 'Doing housekeeping (Flushing inventory in memory to disk...)')) for hash, storedValue in shared.inventory.items(): objectType, streamNumber, payload, receivedTime = storedValue if int(time.time()) - 3600 > receivedTime: - t = (hash, objectType, streamNumber, payload, receivedTime,'') - shared.sqlSubmitQueue.put( - '''INSERT INTO inventory VALUES (?,?,?,?,?,?)''') - shared.sqlSubmitQueue.put(t) - shared.sqlReturnQueue.get() + sqlExecute( + '''INSERT INTO inventory VALUES (?,?,?,?,?,?)''', + hash, + objectType, + streamNumber, + payload, + receivedTime, + '') del shared.inventory[hash] - shared.sqlSubmitQueue.put('commit') shared.UISignalQueue.put(('updateStatusBar', '')) - shared.sqlLock.release() shared.broadcastToSendDataQueues(( 0, 'pong', 'no data')) # commands the sendData threads to send out a pong message if they haven't sent anything else in the last five minutes. The socket timeout-time is 10 minutes. # If we are running as a daemon then we are going to fill up the UI @@ -53,29 +54,20 @@ class singleCleaner(threading.Thread): timeWeLastClearedInventoryAndPubkeysTables = int(time.time()) # inventory (moves data from the inventory data structure to # the on-disk sql database) - shared.sqlLock.acquire() # inventory (clears pubkeys after 28 days and everything else # after 2 days and 12 hours) - t = (int(time.time()) - shared.lengthOfTimeToLeaveObjectsInInventory, int( - time.time()) - shared.lengthOfTimeToHoldOnToAllPubkeys) - shared.sqlSubmitQueue.put( - '''DELETE FROM inventory WHERE (receivedtime'pubkey') OR (receivedtime'pubkey') OR (receivedtime (shared.maximumAgeOfAnObjectThatIAmWillingToAccept * (2 ** (msgretrynumber))): print 'It has been a long time and we haven\'t heard an acknowledgement to our msg. Sending again.' - t = (int( - time.time()), msgretrynumber + 1, 'msgqueued', ackdata) - shared.sqlSubmitQueue.put( - '''UPDATE sent SET lastactiontime=?, msgretrynumber=?, status=? WHERE ackdata=?''') - shared.sqlSubmitQueue.put(t) - shared.sqlReturnQueue.get() - shared.sqlSubmitQueue.put('commit') + sqlExecute( + '''UPDATE sent SET lastactiontime=?, msgretrynumber=?, status=? WHERE ackdata=?''', + int(time.time()), + msgretrynumber + 1, + 'msgqueued', + ackdata) shared.workerQueue.put(('sendmessage', '')) shared.UISignalQueue.put(( 'updateStatusBar', 'Doing work necessary to again attempt to deliver a message...')) - shared.sqlSubmitQueue.put('commit') - shared.sqlLock.release() time.sleep(300)