# PyBitmessage/src/class_singleCleaner.py
"""
2018-05-02 17:29:55 +02:00
The singleCleaner class is a timer-driven thread that cleans data structures
to free memory, resends messages when a remote node doesn't respond, and
2014-08-27 09:14:32 +02:00
sends pong messages to keep connections alive if the network isn't busy.
It cleans these data structures in memory:
2013-10-02 02:14:53 +02:00
inventory (moves data to the on-disk sql database)
inventorySets (clears then reloads data out of sql database)
It cleans these tables on the disk:
2014-08-27 09:14:32 +02:00
inventory (clears expired objects)
pubkeys (clears pubkeys older than 4 weeks old which we have not used
personally)
knownNodes (clears addresses which have not been online for over 3 days)
It resends messages when there has been no response:
resends getpubkey messages in 5 days (then 10 days, then 20 days, etc...)
resends msg messages in 5 days (then 10 days, then 20 days, etc...)
2014-08-27 09:14:32 +02:00
"""
2018-05-02 17:29:55 +02:00
import gc
import os
2018-05-02 17:29:55 +02:00
import shared
import threading
2018-05-02 17:29:55 +02:00
import time
import tr
2018-05-02 17:29:55 +02:00
from bmconfigparser import BMConfigParser
from helper_sql import sqlQuery, sqlExecute
from helper_threading import StoppableThread
2018-05-02 17:29:55 +02:00
from inventory import Inventory
from network.connectionpool import BMConnectionPool
from debug import logger
import knownnodes
import queues
import state
class singleCleaner(threading.Thread, StoppableThread):
    """
    Timer-driven housekeeping thread: flushes the in-memory inventory to
    disk, expires stale inventory/pubkey rows and known nodes, and requeues
    sent messages that never received a response (see module docstring).
    """

    # Seconds between housekeeping passes.
    cycleLength = 300
    # Discovered peers not refreshed within this many seconds are dropped.
    expireDiscoveredPeers = 300

    def __init__(self):
        threading.Thread.__init__(self, name="singleCleaner")
        self.initStop()

    def run(self):
        # Automatic garbage collection is disabled; we call gc.collect()
        # explicitly once per cleaning cycle instead.
        gc.disable()
        timeWeLastClearedInventoryAndPubkeysTables = 0
        try:
            shared.maximumLengthOfTimeToBotherResendingMessages = (
                float(BMConfigParser().get(
                    'bitmessagesettings', 'stopresendingafterxdays')) *
                24 * 60 * 60
            ) + (
                float(BMConfigParser().get(
                    'bitmessagesettings', 'stopresendingafterxmonths')) *
                (60 * 60 * 24 * 365)/12)
        # FIX: was a bare except:, which also swallows SystemExit and
        # KeyboardInterrupt; narrowed to Exception while keeping the
        # original best-effort fallback.
        except Exception:
            # Either the user hasn't set stopresendingafterxdays and
            # stopresendingafterxmonths yet or the options are missing
            # from the config file: never give up resending.
            shared.maximumLengthOfTimeToBotherResendingMessages = float('inf')

        # initial wait
        if state.shutdown == 0:
            self.stop.wait(singleCleaner.cycleLength)

        while state.shutdown == 0:
            queues.UISignalQueue.put((
                'updateStatusBar',
                'Doing housekeeping (Flushing inventory in memory to disk...)'
            ))
            Inventory().flush()
            queues.UISignalQueue.put(('updateStatusBar', ''))

            # If we are running as a daemon then we are going to fill up the UI
            # queue which will never be handled by a UI. We should clear it to
            # save memory.
            # FIXME redundant?
            if shared.thisapp.daemon or not state.enableGUI:
                queues.UISignalQueue.queue.clear()

            # Expire inventory and unused pubkeys at most once per
            # 7380 seconds (a little over two hours).
            if timeWeLastClearedInventoryAndPubkeysTables < \
                    int(time.time()) - 7380:
                timeWeLastClearedInventoryAndPubkeysTables = int(time.time())
                Inventory().clean()
                # pubkeys
                sqlExecute(
                    "DELETE FROM pubkeys WHERE time<? AND usedpersonally='no'",
                    int(time.time()) - shared.lengthOfTimeToHoldOnToAllPubkeys)

                # Let us resend getpubkey objects if we have not yet heard
                # a pubkey, and also msg objects if we have not yet heard
                # an acknowledgement
                queryreturn = sqlQuery(
                    "SELECT toaddress, ackdata, status FROM sent"
                    " WHERE ((status='awaitingpubkey' OR status='msgsent')"
                    " AND folder='sent' AND sleeptill<? AND senttime>?)",
                    int(time.time()),
                    int(time.time())
                    - shared.maximumLengthOfTimeToBotherResendingMessages
                )
                for row in queryreturn:
                    if len(row) < 2:
                        logger.error(
                            'Something went wrong in the singleCleaner thread:'
                            ' a query did not return the requested fields. %r',
                            row
                        )
                        self.stop.wait(3)
                        break
                    toAddress, ackData, status = row
                    if status == 'awaitingpubkey':
                        resendPubkeyRequest(toAddress)
                    elif status == 'msgsent':
                        resendMsg(ackData)

            # cleanup old nodes
            now = int(time.time())
            with knownnodes.knownNodesLock:
                for stream in knownnodes.knownNodes:
                    # .keys() snapshots the keys (Python 2 list) so entries
                    # can be deleted while iterating.
                    keys = knownnodes.knownNodes[stream].keys()
                    for node in keys:
                        try:
                            # scrap old nodes
                            if now - knownnodes.knownNodes[stream][node]["lastseen"] > 2419200:  # 28 days
                                shared.needToWriteKnownNodesToDisk = True
                                del knownnodes.knownNodes[stream][node]
                                continue
                            # scrap old nodes with low rating
                            if now - knownnodes.knownNodes[stream][node]["lastseen"] > 10800 \
                                    and knownnodes.knownNodes[stream][node]["rating"] <= knownnodes.knownNodesForgetRating:
                                shared.needToWriteKnownNodesToDisk = True
                                del knownnodes.knownNodes[stream][node]
                                continue
                        except TypeError:
                            # FIX: was a bare Python 2 print statement;
                            # route through the logger like the rest of
                            # this module.
                            logger.warning('Error in %s', node)

            # Let us write out the knowNodes to disk
            # if there is anything new to write out.
            if shared.needToWriteKnownNodesToDisk:
                try:
                    knownnodes.saveKnownNodes()
                except Exception as err:
                    if "Errno 28" in str(err):
                        logger.fatal(
                            '(while receiveDataThread'
                            ' knownnodes.needToWriteKnownNodesToDisk)'
                            ' Alert: Your disk or data storage volume'
                            ' is full. '
                        )
                        queues.UISignalQueue.put((
                            'alert',
                            (tr._translate("MainWindow", "Disk full"),
                             tr._translate(
                                "MainWindow",
                                'Alert: Your disk or data storage volume'
                                ' is full. Bitmessage will now exit.'),
                             True)
                        ))
                        # FIXME redundant?
                        # FIX: was shared.daemon; every other check in this
                        # method uses shared.thisapp.daemon, and a plain
                        # shared.daemon would raise AttributeError here.
                        if shared.thisapp.daemon or not state.enableGUI:
                            os._exit(0)
                shared.needToWriteKnownNodesToDisk = False

            # # clear download queues
            # for thread in threading.enumerate():
            #     if thread.isAlive() and hasattr(thread, 'downloadQueue'):
            #         thread.downloadQueue.clear()

            # inv/object tracking
            for connection in \
                    BMConnectionPool().inboundConnections.values() + \
                    BMConnectionPool().outboundConnections.values():
                connection.clean()

            # discovery tracking
            exp = time.time() - singleCleaner.expireDiscoveredPeers
            reaper = (k for k, v in state.discoveredPeers.items() if v < exp)
            for k in reaper:
                try:
                    del state.discoveredPeers[k]
                except KeyError:
                    pass
            # TODO: cleanup pending upload / download

            gc.collect()

            if state.shutdown == 0:
                self.stop.wait(singleCleaner.cycleLength)

def resendPubkeyRequest(address):
    """
    Re-request the pubkey for *address*: requeue the stalled message so the
    worker thread sends a fresh getpubkey object, and tell the UI about it.
    """
    logger.debug(
        'It has been a long time and we haven\'t heard a response to our'
        ' getpubkey request. Sending again.'
    )
    try:
        # We need to take this entry out of the neededPubkeys structure
        # because the queues.workerQueue checks to see whether the entry
        # is already present and will not do the POW and send the message
        # because it assumes that it has already done it recently.
        del state.neededPubkeys[address]
    # FIX: was a bare except:, which hides real bugs; only a missing
    # key is expected and safe to ignore here.
    except KeyError:
        pass

    queues.UISignalQueue.put((
        'updateStatusBar',
        'Doing work necessary to again attempt to request a public key...'))
    sqlExecute(
        '''UPDATE sent SET status='msgqueued' WHERE toaddress=?''',
        address)
    queues.workerQueue.put(('sendmessage', ''))

def resendMsg(ackdata):
    """
    Requeue a sent message whose acknowledgement never arrived: flip its
    status back to 'msgqueued' so the worker thread redoes the POW and
    sends it again, then update the status bar.
    """
    logger.debug(
        'It has been a long time and we haven\'t heard an acknowledgement'
        ' to our msg. Sending again.'
    )

    requeue_query = '''UPDATE sent SET status='msgqueued' WHERE ackdata=?'''
    sqlExecute(requeue_query, ackdata)

    queues.workerQueue.put(('sendmessage', ''))

    status_text = (
        'Doing work necessary to again attempt to deliver a message...'
    )
    queues.UISignalQueue.put(('updateStatusBar', status_text))