class_singleCleaner uses helper_sql

This commit is contained in:
Grant T. Olson 2013-08-27 08:46:57 -04:00 committed by Grant T. Olson
parent 5b23d99907
commit 9e8cbd0f0e
1 changed files with 29 additions and 41 deletions

View File

@ -2,6 +2,7 @@ import threading
import shared
import time
import sys
from helper_sql import *
'''The singleCleaner class is a timer-driven thread that cleans data structures to free memory, resends messages when a remote node doesn't respond, and sends pong messages to keep connections alive if the network isn't busy.
It cleans these data structures in memory:
@ -27,21 +28,21 @@ class singleCleaner(threading.Thread):
timeWeLastClearedInventoryAndPubkeysTables = 0
while True:
shared.sqlLock.acquire()
shared.UISignalQueue.put((
'updateStatusBar', 'Doing housekeeping (Flushing inventory in memory to disk...)'))
for hash, storedValue in shared.inventory.items():
objectType, streamNumber, payload, receivedTime = storedValue
if int(time.time()) - 3600 > receivedTime:
t = (hash, objectType, streamNumber, payload, receivedTime,'')
shared.sqlSubmitQueue.put(
'''INSERT INTO inventory VALUES (?,?,?,?,?,?)''')
shared.sqlSubmitQueue.put(t)
shared.sqlReturnQueue.get()
sqlExecute(
'''INSERT INTO inventory VALUES (?,?,?,?,?,?)''',
hash,
objectType,
streamNumber,
payload,
receivedTime,
'')
del shared.inventory[hash]
shared.sqlSubmitQueue.put('commit')
shared.UISignalQueue.put(('updateStatusBar', ''))
shared.sqlLock.release()
shared.broadcastToSendDataQueues((
0, 'pong', 'no data')) # commands the sendData threads to send out a pong message if they haven't sent anything else in the last five minutes. The socket timeout-time is 10 minutes.
# If we are running as a daemon then we are going to fill up the UI
@ -53,29 +54,20 @@ class singleCleaner(threading.Thread):
timeWeLastClearedInventoryAndPubkeysTables = int(time.time())
# inventory (moves data from the inventory data structure to
# the on-disk sql database)
shared.sqlLock.acquire()
# inventory (clears pubkeys after 28 days and everything else
# after 2 days and 12 hours)
t = (int(time.time()) - shared.lengthOfTimeToLeaveObjectsInInventory, int(
time.time()) - shared.lengthOfTimeToHoldOnToAllPubkeys)
shared.sqlSubmitQueue.put(
'''DELETE FROM inventory WHERE (receivedtime<? AND objecttype<>'pubkey') OR (receivedtime<? AND objecttype='pubkey') ''')
shared.sqlSubmitQueue.put(t)
shared.sqlReturnQueue.get()
sqlExecute(
'''DELETE FROM inventory WHERE (receivedtime<? AND objecttype<>'pubkey') OR (receivedtime<? AND objecttype='pubkey') ''',
int(time.time()) - shared.lengthOfTimeToLeaveObjectsInInventory,
int(time.time()) - shared.lengthOfTimeToHoldOnToAllPubkeys)
# pubkeys
t = (int(time.time()) - shared.lengthOfTimeToHoldOnToAllPubkeys,)
shared.sqlSubmitQueue.put(
'''DELETE FROM pubkeys WHERE time<? AND usedpersonally='no' ''')
shared.sqlSubmitQueue.put(t)
shared.sqlReturnQueue.get()
shared.sqlSubmitQueue.put('commit')
sqlExecute(
'''DELETE FROM pubkeys WHERE time<? AND usedpersonally='no' ''',
int(time.time()) - shared.lengthOfTimeToHoldOnToAllPubkeys)
t = ()
shared.sqlSubmitQueue.put(
queryreturn = sqlQuery(
'''select toaddress, toripe, fromaddress, subject, message, ackdata, lastactiontime, status, pubkeyretrynumber, msgretrynumber FROM sent WHERE ((status='awaitingpubkey' OR status='msgsent') AND folder='sent') ''') # If the message's folder='trash' then we'll ignore it.
shared.sqlSubmitQueue.put(t)
queryreturn = shared.sqlReturnQueue.get()
for row in queryreturn:
if len(row) < 5:
with shared.printLock:
@ -96,27 +88,23 @@ class singleCleaner(threading.Thread):
shared.UISignalQueue.put((
'updateStatusBar', 'Doing work necessary to again attempt to request a public key...'))
t = (int(
time.time()), pubkeyretrynumber + 1, toripe)
shared.sqlSubmitQueue.put(
'''UPDATE sent SET lastactiontime=?, pubkeyretrynumber=?, status='msgqueued' WHERE toripe=?''')
shared.sqlSubmitQueue.put(t)
shared.sqlReturnQueue.get()
shared.sqlSubmitQueue.put('commit')
t = ()
sqlExecute(
'''UPDATE sent SET lastactiontime=?, pubkeyretrynumber=?, status='msgqueued' WHERE toripe=?''',
int(time.time()),
pubkeyretrynumber + 1,
toripe)
shared.workerQueue.put(('sendmessage', ''))
else: # status == msgsent
if int(time.time()) - lastactiontime > (shared.maximumAgeOfAnObjectThatIAmWillingToAccept * (2 ** (msgretrynumber))):
print 'It has been a long time and we haven\'t heard an acknowledgement to our msg. Sending again.'
t = (int(
time.time()), msgretrynumber + 1, 'msgqueued', ackdata)
shared.sqlSubmitQueue.put(
'''UPDATE sent SET lastactiontime=?, msgretrynumber=?, status=? WHERE ackdata=?''')
shared.sqlSubmitQueue.put(t)
shared.sqlReturnQueue.get()
shared.sqlSubmitQueue.put('commit')
sqlExecute(
'''UPDATE sent SET lastactiontime=?, msgretrynumber=?, status=? WHERE ackdata=?''',
int(time.time()),
msgretrynumber + 1,
'msgqueued',
ackdata)
shared.workerQueue.put(('sendmessage', ''))
shared.UISignalQueue.put((
'updateStatusBar', 'Doing work necessary to again attempt to deliver a message...'))
shared.sqlSubmitQueue.put('commit')
shared.sqlLock.release()
time.sleep(300)