PyBitmessage/src/class_singleCleaner.py

194 lines
7.6 KiB
Python
Raw Normal View History

2014-08-27 07:14:32 +00:00
"""
The `singleCleaner` class is a timer-driven thread that cleans data structures
2018-05-02 15:29:55 +00:00
to free memory, resends messages when a remote node doesn't respond, and
2014-08-27 07:14:32 +00:00
sends pong messages to keep connections alive if the network isn't busy.
It cleans these data structures in memory:
- inventory (moves data to the on-disk sql database)
- inventorySets (clears then reloads data out of sql database)
It cleans these tables on the disk:
- inventory (clears expired objects)
- pubkeys (clears pubkeys older than 4 weeks old which we have not used
personally)
- knownNodes (clears addresses which have not been online for over 3 days)
It resends messages when there has been no response:
- resends getpubkey messages in 5 days (then 10 days, then 20 days, etc...)
- resends msg messages in 5 days (then 10 days, then 20 days, etc...)
2014-08-27 07:14:32 +00:00
"""
2018-05-02 15:29:55 +00:00
import gc
import os
2018-05-02 15:29:55 +00:00
import time
import knownnodes
import queues
2019-11-04 14:47:36 +00:00
import shared
import state
import tr
2018-05-02 15:29:55 +00:00
from bmconfigparser import BMConfigParser
from helper_sql import sqlQuery, sqlExecute
2018-05-02 15:29:55 +00:00
from inventory import Inventory
from network import BMConnectionPool, StoppableThread
2018-05-02 15:29:55 +00:00
class singleCleaner(StoppableThread):
2019-11-04 14:47:36 +00:00
"""The singleCleaner thread class"""
name = "singleCleaner"
cycleLength = 300
expireDiscoveredPeers = 300
2019-11-04 14:47:36 +00:00
def run(self): # pylint: disable=too-many-branches
gc.disable()
timeWeLastClearedInventoryAndPubkeysTables = 0
try:
shared.maximumLengthOfTimeToBotherResendingMessages = (
float(BMConfigParser().get(
'bitmessagesettings', 'stopresendingafterxdays'))
* 24 * 60 * 60
) + (
float(BMConfigParser().get(
'bitmessagesettings', 'stopresendingafterxmonths'))
* (60 * 60 * 24 * 365) / 12)
except:
# Either the user hasn't set stopresendingafterxdays and
# stopresendingafterxmonths yet or the options are missing
# from the config file.
shared.maximumLengthOfTimeToBotherResendingMessages = float('inf')
2017-05-27 17:01:14 +00:00
# initial wait
if state.shutdown == 0:
self.stop.wait(singleCleaner.cycleLength)
2017-05-27 17:01:14 +00:00
while state.shutdown == 0:
queues.UISignalQueue.put((
'updateStatusBar',
'Doing housekeeping (Flushing inventory in memory to disk...)'
))
Inventory().flush()
queues.UISignalQueue.put(('updateStatusBar', ''))
2018-05-02 15:29:55 +00:00
# If we are running as a daemon then we are going to fill up the UI
# queue which will never be handled by a UI. We should clear it to
# save memory.
# FIXME redundant?
if shared.thisapp.daemon or not state.enableGUI:
queues.UISignalQueue.queue.clear()
if timeWeLastClearedInventoryAndPubkeysTables < \
int(time.time()) - 7380:
timeWeLastClearedInventoryAndPubkeysTables = int(time.time())
Inventory().clean()
queues.workerQueue.put(('sendOnionPeerObj', ''))
# pubkeys
2013-08-27 12:46:57 +00:00
sqlExecute(
"DELETE FROM pubkeys WHERE time<? AND usedpersonally='no'",
2013-08-27 12:46:57 +00:00
int(time.time()) - shared.lengthOfTimeToHoldOnToAllPubkeys)
# Let us resend getpubkey objects if we have not yet heard
# a pubkey, and also msg objects if we have not yet heard
# an acknowledgement
2013-08-27 12:46:57 +00:00
queryreturn = sqlQuery(
"SELECT toaddress, ackdata, status FROM sent"
" WHERE ((status='awaitingpubkey' OR status='msgsent')"
" AND folder='sent' AND sleeptill<? AND senttime>?)",
int(time.time()), int(time.time())
- shared.maximumLengthOfTimeToBotherResendingMessages
)
for row in queryreturn:
2015-03-09 06:35:32 +00:00
if len(row) < 2:
self.logger.error(
'Something went wrong in the singleCleaner thread:'
' a query did not return the requested fields. %r',
row
)
self.stop.wait(3)
break
2015-03-09 06:35:32 +00:00
toAddress, ackData, status = row
if status == 'awaitingpubkey':
self.resendPubkeyRequest(toAddress)
2015-03-09 06:35:32 +00:00
elif status == 'msgsent':
self.resendMsg(ackData)
try:
# Cleanup knownnodes and handle possible severe exception
# while writing it to disk
knownnodes.cleanupKnownNodes()
except Exception as err:
2019-11-04 14:47:36 +00:00
# pylint: disable=protected-access
if "Errno 28" in str(err):
self.logger.fatal(
'(while writing knownnodes to disk)'
' Alert: Your disk or data storage volume is full.'
)
queues.UISignalQueue.put((
'alert',
(tr._translate("MainWindow", "Disk full"),
tr._translate(
"MainWindow",
'Alert: Your disk or data storage volume'
' is full. Bitmessage will now exit.'),
2019-11-04 14:47:36 +00:00
True)
))
2019-11-04 14:47:36 +00:00
if shared.thisapp.daemon or not state.enableGUI:
os._exit(1)
# inv/object tracking
for connection in BMConnectionPool().connections():
connection.clean()
# discovery tracking
2017-08-06 19:38:23 +00:00
exp = time.time() - singleCleaner.expireDiscoveredPeers
reaper = (k for k, v in state.discoveredPeers.items() if v < exp)
for k in reaper:
try:
del state.discoveredPeers[k]
except KeyError:
pass
2019-11-04 14:47:36 +00:00
# ..todo:: cleanup pending upload / download
gc.collect()
if state.shutdown == 0:
self.stop.wait(singleCleaner.cycleLength)
def resendPubkeyRequest(self, address):
"""Resend pubkey request for address"""
self.logger.debug(
'It has been a long time and we haven\'t heard a response to our'
' getpubkey request. Sending again.'
)
try:
# We need to take this entry out of the neededPubkeys structure
# because the queues.workerQueue checks to see whether the entry
# is already present and will not do the POW and send the message
# because it assumes that it has already done it recently.
del state.neededPubkeys[address]
except:
pass
queues.UISignalQueue.put((
'updateStatusBar',
'Doing work necessary to again attempt to request a public key...'
))
sqlExecute(
'''UPDATE sent SET status='msgqueued' WHERE toaddress=?''',
address)
queues.workerQueue.put(('sendmessage', ''))
def resendMsg(self, ackdata):
"""Resend message by ackdata"""
self.logger.debug(
'It has been a long time and we haven\'t heard an acknowledgement'
' to our msg. Sending again.'
)
sqlExecute(
'''UPDATE sent SET status='msgqueued' WHERE ackdata=?''',
ackdata)
queues.workerQueue.put(('sendmessage', ''))
queues.UISignalQueue.put((
'updateStatusBar',
'Doing work necessary to again attempt to deliver a message...'
))