2017-11-22 13:49:18 +00:00
import gc
2013-06-20 21:23:03 +00:00
import threading
import shared
import time
2014-01-20 18:45:21 +00:00
import os
2013-09-09 23:26:32 +00:00
import tr #anslate
2017-02-22 08:34:54 +00:00
from bmconfigparser import BMConfigParser
2013-08-27 12:46:57 +00:00
from helper_sql import *
2015-11-24 00:55:17 +00:00
from helper_threading import *
2017-01-10 20:15:35 +00:00
from inventory import Inventory
2017-05-28 22:24:07 +00:00
from network . connectionpool import BMConnectionPool
2013-09-09 23:26:32 +00:00
from debug import logger
2017-02-08 12:41:56 +00:00
import knownnodes
import queues
2017-01-11 16:00:00 +00:00
import state
2013-06-20 21:23:03 +00:00
2014-08-27 07:14:32 +00:00
"""
The singleCleaner class is a timer - driven thread that cleans data structures
to free memory , resends messages when a remote node doesn ' t respond, and
sends pong messages to keep connections alive if the network isn ' t busy.
2013-06-20 21:23:03 +00:00
It cleans these data structures in memory :
2013-10-02 00:14:53 +00:00
inventory ( moves data to the on - disk sql database )
inventorySets ( clears then reloads data out of sql database )
2013-06-20 21:23:03 +00:00
It cleans these tables on the disk :
2014-08-27 07:14:32 +00:00
inventory ( clears expired objects )
2013-10-02 00:14:53 +00:00
pubkeys ( clears pubkeys older than 4 weeks old which we have not used personally )
2016-06-15 16:45:23 +00:00
knownNodes ( clears addresses which have not been online for over 3 days )
2013-06-20 21:23:03 +00:00
It resends messages when there has been no response :
2013-11-04 07:05:07 +00:00
resends getpubkey messages in 5 days ( then 10 days , then 20 days , etc . . . )
resends msg messages in 5 days ( then 10 days , then 20 days , etc . . . )
2013-06-20 21:23:03 +00:00
2014-08-27 07:14:32 +00:00
"""
2013-06-20 21:23:03 +00:00
2015-11-24 00:55:17 +00:00
class singleCleaner ( threading . Thread , StoppableThread ) :
2017-05-28 22:24:07 +00:00
cycleLength = 300
2017-08-06 19:29:54 +00:00
expireDiscoveredPeers = 300
2013-06-20 21:23:03 +00:00
def __init__ ( self ) :
2015-11-18 15:22:17 +00:00
threading . Thread . __init__ ( self , name = " singleCleaner " )
2015-11-24 00:55:17 +00:00
self . initStop ( )
2013-06-20 21:23:03 +00:00
def run ( self ) :
2017-11-22 13:49:18 +00:00
gc . disable ( )
2013-06-20 21:23:03 +00:00
timeWeLastClearedInventoryAndPubkeysTables = 0
2013-11-04 07:05:07 +00:00
try :
2017-01-11 13:27:19 +00:00
shared . maximumLengthOfTimeToBotherResendingMessages = ( float ( BMConfigParser ( ) . get ( ' bitmessagesettings ' , ' stopresendingafterxdays ' ) ) * 24 * 60 * 60 ) + ( float ( BMConfigParser ( ) . get ( ' bitmessagesettings ' , ' stopresendingafterxmonths ' ) ) * ( 60 * 60 * 24 * 365 ) / 12 )
2013-11-04 07:05:07 +00:00
except :
2013-11-06 04:22:51 +00:00
# Either the user hasn't set stopresendingafterxdays and stopresendingafterxmonths yet or the options are missing from the config file.
2013-11-04 07:05:07 +00:00
shared . maximumLengthOfTimeToBotherResendingMessages = float ( ' inf ' )
2013-06-20 21:23:03 +00:00
2017-05-27 17:01:14 +00:00
# initial wait
if state . shutdown == 0 :
2017-05-28 22:24:07 +00:00
self . stop . wait ( singleCleaner . cycleLength )
2017-05-27 17:01:14 +00:00
2017-01-14 22:20:15 +00:00
while state . shutdown == 0 :
2017-02-08 12:41:56 +00:00
queues . UISignalQueue . put ( (
2013-06-20 21:23:03 +00:00
' updateStatusBar ' , ' Doing housekeeping (Flushing inventory in memory to disk...) ' ) )
2017-01-10 20:15:35 +00:00
Inventory ( ) . flush ( )
2017-02-08 12:41:56 +00:00
queues . UISignalQueue . put ( ( ' updateStatusBar ' , ' ' ) )
2014-08-27 07:14:32 +00:00
2013-06-20 21:23:03 +00:00
# If we are running as a daemon then we are going to fill up the UI
# queue which will never be handled by a UI. We should clear it to
# save memory.
2018-04-09 04:38:48 +00:00
if shared . thisapp . daemon or not state . enableGUI : # FIXME redundant?
2017-02-08 12:41:56 +00:00
queues . UISignalQueue . queue . clear ( )
2013-06-20 21:23:03 +00:00
if timeWeLastClearedInventoryAndPubkeysTables < int ( time . time ( ) ) - 7380 :
timeWeLastClearedInventoryAndPubkeysTables = int ( time . time ( ) )
2017-01-10 20:15:35 +00:00
Inventory ( ) . clean ( )
2013-06-20 21:23:03 +00:00
# pubkeys
2013-08-27 12:46:57 +00:00
sqlExecute (
''' DELETE FROM pubkeys WHERE time<? AND usedpersonally= ' no ' ''' ,
int ( time . time ( ) ) - shared . lengthOfTimeToHoldOnToAllPubkeys )
2013-06-20 21:23:03 +00:00
2015-03-09 06:35:32 +00:00
# Let us resend getpubkey objects if we have not yet heard a pubkey, and also msg objects if we have not yet heard an acknowledgement
2013-08-27 12:46:57 +00:00
queryreturn = sqlQuery (
2015-03-09 06:35:32 +00:00
''' select toaddress, ackdata, status FROM sent WHERE ((status= ' awaitingpubkey ' OR status= ' msgsent ' ) AND folder= ' sent ' AND sleeptill<? AND senttime>?) ''' ,
int ( time . time ( ) ) ,
int ( time . time ( ) ) - shared . maximumLengthOfTimeToBotherResendingMessages )
2013-06-20 21:23:03 +00:00
for row in queryreturn :
2015-03-09 06:35:32 +00:00
if len ( row ) < 2 :
2015-11-18 15:22:17 +00:00
logger . error ( ' Something went wrong in the singleCleaner thread: a query did not return the requested fields. ' + repr ( row ) )
2015-11-24 00:55:17 +00:00
self . stop . wait ( 3 )
2013-06-20 21:23:03 +00:00
break
2015-03-09 06:35:32 +00:00
toAddress , ackData , status = row
2013-06-20 21:23:03 +00:00
if status == ' awaitingpubkey ' :
2015-03-09 06:35:32 +00:00
resendPubkeyRequest ( toAddress )
elif status == ' msgsent ' :
resendMsg ( ackData )
2013-11-06 04:22:51 +00:00
2016-06-15 16:45:23 +00:00
# cleanup old nodes
now = int ( time . time ( ) )
2017-02-09 10:53:33 +00:00
with knownnodes . knownNodesLock :
for stream in knownnodes . knownNodes :
2017-10-19 06:52:44 +00:00
keys = knownnodes . knownNodes [ stream ] . keys ( )
for node in keys :
2017-07-05 07:17:01 +00:00
try :
2017-10-19 06:52:44 +00:00
# scrap old nodes
2017-07-05 07:17:01 +00:00
if now - knownnodes . knownNodes [ stream ] [ node ] [ " lastseen " ] > 2419200 : # 28 days
2017-10-19 06:52:44 +00:00
shared . needToWriteKnownNodesToDisk = True
2017-07-05 07:17:01 +00:00
del knownnodes . knownNodes [ stream ] [ node ]
2017-10-19 06:52:44 +00:00
continue
# scrap old nodes with low rating
if now - knownnodes . knownNodes [ stream ] [ node ] [ " lastseen " ] > 10800 and knownnodes . knownNodes [ stream ] [ node ] [ " rating " ] < = knownnodes . knownNodesForgetRating :
shared . needToWriteKnownNodesToDisk = True
del knownnodes . knownNodes [ stream ] [ node ]
continue
2017-07-05 07:17:01 +00:00
except TypeError :
print " Error in %s " % ( str ( node ) )
2017-10-19 06:52:44 +00:00
keys = [ ]
2016-06-15 16:45:23 +00:00
2013-09-09 23:26:32 +00:00
# Let us write out the knowNodes to disk if there is anything new to write out.
if shared . needToWriteKnownNodesToDisk :
try :
2017-02-09 10:53:33 +00:00
knownnodes . saveKnownNodes ( )
2013-09-09 23:26:32 +00:00
except Exception as err :
if " Errno 28 " in str ( err ) :
2017-02-08 12:41:56 +00:00
logger . fatal ( ' (while receiveDataThread knownnodes.needToWriteKnownNodesToDisk) Alert: Your disk or data storage volume is full. ' )
queues . UISignalQueue . put ( ( ' alert ' , ( tr . _translate ( " MainWindow " , " Disk full " ) , tr . _translate ( " MainWindow " , ' Alert: Your disk or data storage volume is full. Bitmessage will now exit. ' ) , True ) ) )
2018-04-09 04:38:48 +00:00
if shared . thisapp . daemon or not state . enableGUI : # FIXME redundant?
2013-09-09 23:26:32 +00:00
os . _exit ( 0 )
shared . needToWriteKnownNodesToDisk = False
2017-02-06 10:02:03 +00:00
2017-11-22 20:13:35 +00:00
# # clear download queues
# for thread in threading.enumerate():
# if thread.isAlive() and hasattr(thread, 'downloadQueue'):
# thread.downloadQueue.clear()
2017-03-20 00:22:37 +00:00
2017-06-02 05:09:35 +00:00
# inv/object tracking
for connection in BMConnectionPool ( ) . inboundConnections . values ( ) + BMConnectionPool ( ) . outboundConnections . values ( ) :
connection . clean ( )
2017-08-06 19:29:54 +00:00
# discovery tracking
2017-08-06 19:38:23 +00:00
exp = time . time ( ) - singleCleaner . expireDiscoveredPeers
2017-08-06 19:29:54 +00:00
reaper = ( k for k , v in state . discoveredPeers . items ( ) if v < exp )
for k in reaper :
try :
del state . discoveredPeers [ k ]
except KeyError :
pass
2017-02-06 10:02:03 +00:00
# TODO: cleanup pending upload / download
2017-11-22 13:49:18 +00:00
gc . collect ( )
2017-02-02 14:52:32 +00:00
if state . shutdown == 0 :
2017-05-28 22:24:07 +00:00
self . stop . wait ( singleCleaner . cycleLength )
2013-11-04 07:05:07 +00:00
2015-03-09 06:35:32 +00:00
def resendPubkeyRequest ( address ) :
logger . debug ( ' It has been a long time and we haven \' t heard a response to our getpubkey request. Sending again. ' )
2013-10-03 14:29:50 +00:00
try :
2017-01-11 16:00:00 +00:00
del state . neededPubkeys [
2017-02-08 12:41:56 +00:00
address ] # We need to take this entry out of the neededPubkeys structure because the queues.workerQueue checks to see whether the entry is already present and will not do the POW and send the message because it assumes that it has already done it recently.
2013-10-03 14:29:50 +00:00
except :
pass
2013-11-06 04:22:51 +00:00
2017-02-08 12:41:56 +00:00
queues . UISignalQueue . put ( (
2013-10-03 14:29:50 +00:00
' updateStatusBar ' , ' Doing work necessary to again attempt to request a public key... ' ) )
sqlExecute (
2015-03-09 06:35:32 +00:00
''' UPDATE sent SET status= ' msgqueued ' WHERE toaddress=? ''' ,
address )
2017-02-08 12:41:56 +00:00
queues . workerQueue . put ( ( ' sendmessage ' , ' ' ) )
2013-10-03 14:29:50 +00:00
2015-03-09 06:35:32 +00:00
def resendMsg ( ackdata ) :
logger . debug ( ' It has been a long time and we haven \' t heard an acknowledgement to our msg. Sending again. ' )
2013-10-03 14:29:50 +00:00
sqlExecute (
2015-03-09 06:35:32 +00:00
''' UPDATE sent SET status= ' msgqueued ' WHERE ackdata=? ''' ,
ackdata )
2017-02-08 12:41:56 +00:00
queues . workerQueue . put ( ( ' sendmessage ' , ' ' ) )
queues . UISignalQueue . put ( (
2013-10-03 14:29:50 +00:00
' updateStatusBar ' , ' Doing work necessary to again attempt to deliver a message... ' ) )