From ebc62b9edc1d0c0ea9eba3b8d24f442bce9d82f9 Mon Sep 17 00:00:00 2001 From: Jordan Hall Date: Thu, 20 Jun 2013 22:23:03 +0100 Subject: [PATCH] Moving certain classes outside of bitmessagemain.py --- src/bitmessagemain.py | 444 +----------------------------------- src/class_singleCleaner.py | 122 ++++++++++ src/class_singleListener.py | 76 ++++++ src/class_sqlThread.py | 255 +++++++++++++++++++++ 4 files changed, 458 insertions(+), 439 deletions(-) create mode 100644 src/class_singleCleaner.py create mode 100644 src/class_singleListener.py create mode 100644 src/class_sqlThread.py diff --git a/src/bitmessagemain.py b/src/bitmessagemain.py index 798841ff..59479bb9 100644 --- a/src/bitmessagemain.py +++ b/src/bitmessagemain.py @@ -48,6 +48,11 @@ from subprocess import call # used when the API must execute an outside program import singleton import proofofwork +# Classes +from class_singleListener import * +from class_sqlThread import * +from class_singleCleaner import * + # For each stream to which we connect, several outgoingSynSender threads # will exist and will collectively create 8 connections with peers. @@ -205,78 +210,6 @@ class outgoingSynSender(threading.Thread): 'An exception has occurred in the outgoingSynSender thread that was not caught by other exception types: %s\n' % err) time.sleep(0.1) -# Only one singleListener thread will ever exist. It creates the -# receiveDataThread and sendDataThread for each incoming connection. Note -# that it cannot set the stream number because it is not known yet- the -# other node will have to tell us its stream number in a version message. -# If we don't care about their stream, we will close the connection -# (within the recversion function of the recieveData thread) - - -class singleListener(threading.Thread): - - def __init__(self): - threading.Thread.__init__(self) - - def run(self): - # We don't want to accept incoming connections if the user is using a - # SOCKS proxy. If they eventually select proxy 'none' then this will - # start listening for connections. - while shared.config.get('bitmessagesettings', 'socksproxytype')[0:5] == 'SOCKS': - time.sleep(300) - - shared.printLock.acquire() - print 'Listening for incoming connections.' - shared.printLock.release() - HOST = '' # Symbolic name meaning all available interfaces - PORT = shared.config.getint('bitmessagesettings', 'port') - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - # This option apparently avoids the TIME_WAIT state so that we can - # rebind faster - sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - sock.bind((HOST, PORT)) - sock.listen(2) - - while True: - # We don't want to accept incoming connections if the user is using - # a SOCKS proxy. If the user eventually select proxy 'none' then - # this will start listening for connections. - while shared.config.get('bitmessagesettings', 'socksproxytype')[0:5] == 'SOCKS': - time.sleep(10) - while len(shared.connectedHostsList) > 220: - shared.printLock.acquire() - print 'We are connected to too many people. Not accepting further incoming connections for ten seconds.' - shared.printLock.release() - time.sleep(10) - a, (HOST, PORT) = sock.accept() - - # The following code will, unfortunately, block an incoming - # connection if someone else on the same LAN is already connected - # because the two computers will share the same external IP. This - # is here to prevent connection flooding. - while HOST in shared.connectedHostsList: - shared.printLock.acquire() - print 'We are already connected to', HOST + '. Ignoring connection.' - shared.printLock.release() - a.close() - a, (HOST, PORT) = sock.accept() - objectsOfWhichThisRemoteNodeIsAlreadyAware = {} - a.settimeout(20) - - sd = sendDataThread() - sd.setup( - a, HOST, PORT, -1, objectsOfWhichThisRemoteNodeIsAlreadyAware) - sd.start() - - rd = receiveDataThread() - rd.daemon = True # close the main program even if there are threads left - rd.setup( - a, HOST, PORT, -1, objectsOfWhichThisRemoteNodeIsAlreadyAware) - rd.start() - - shared.printLock.acquire() - print self, 'connected to', HOST, 'during INCOMING request.' - shared.printLock.release() # This thread is created either by the synSenderThread(for outgoing # connections) or the singleListenerThread(for incoming connectiosn). @@ -2651,375 +2584,8 @@ def isHostInPrivateIPRange(host): return True return False -# This thread exists because SQLITE3 is so un-threadsafe that we must -# submit queries to it and it puts results back in a different queue. They -# won't let us just use locks. -class sqlThread(threading.Thread): - - def __init__(self): - threading.Thread.__init__(self) - - def run(self): - self.conn = sqlite3.connect(shared.appdata + 'messages.dat') - self.conn.text_factory = str - self.cur = self.conn.cursor() - try: - self.cur.execute( - '''CREATE TABLE inbox (msgid blob, toaddress text, fromaddress text, subject text, received text, message text, folder text, encodingtype int, read bool, UNIQUE(msgid) ON CONFLICT REPLACE)''' ) - self.cur.execute( - '''CREATE TABLE sent (msgid blob, toaddress text, toripe blob, fromaddress text, subject text, message text, ackdata blob, lastactiontime integer, status text, pubkeyretrynumber integer, msgretrynumber integer, folder text, encodingtype int)''' ) - self.cur.execute( - '''CREATE TABLE subscriptions (label text, address text, enabled bool)''' ) - self.cur.execute( - '''CREATE TABLE addressbook (label text, address text)''' ) - self.cur.execute( - '''CREATE TABLE blacklist (label text, address text, enabled bool)''' ) - self.cur.execute( - '''CREATE TABLE whitelist (label text, address text, enabled bool)''' ) - # Explanation of what is in the pubkeys table: - # The hash is the RIPEMD160 hash that is encoded in the Bitmessage address. - # transmitdata is literally the data that was included in the Bitmessage pubkey message when it arrived, except for the 24 byte protocol header- ie, it starts with the POW nonce. - # time is the time that the pubkey was broadcast on the network same as with every other type of Bitmessage object. - # usedpersonally is set to "yes" if we have used the key - # personally. This keeps us from deleting it because we may want to - # reply to a message in the future. This field is not a bool - # because we may need more flexability in the future and it doesn't - # take up much more space anyway. - self.cur.execute( - '''CREATE TABLE pubkeys (hash blob, transmitdata blob, time int, usedpersonally text, UNIQUE(hash) ON CONFLICT REPLACE)''' ) - self.cur.execute( - '''CREATE TABLE inventory (hash blob, objecttype text, streamnumber int, payload blob, receivedtime integer, UNIQUE(hash) ON CONFLICT REPLACE)''' ) - self.cur.execute( - '''CREATE TABLE knownnodes (timelastseen int, stream int, services blob, host blob, port blob, UNIQUE(host, stream, port) ON CONFLICT REPLACE)''' ) - # This table isn't used in the program yet but I - # have a feeling that we'll need it. - self.cur.execute( - '''INSERT INTO subscriptions VALUES('Bitmessage new releases/announcements','BM-GtovgYdgs7qXPkoYaRgrLFuFKz1SFpsw',1)''') - self.cur.execute( - '''CREATE TABLE settings (key blob, value blob, UNIQUE(key) ON CONFLICT REPLACE)''' ) - self.cur.execute( '''INSERT INTO settings VALUES('version','1')''') - self.cur.execute( '''INSERT INTO settings VALUES('lastvacuumtime',?)''', ( - int(time.time()),)) - self.conn.commit() - print 'Created messages database file' - except Exception as err: - if str(err) == 'table inbox already exists': - shared.printLock.acquire() - print 'Database file already exists.' - shared.printLock.release() - else: - sys.stderr.write( - 'ERROR trying to create database file (message.dat). Error message: %s\n' % str(err)) - os._exit(0) - - # People running earlier versions of PyBitmessage do not have the - # usedpersonally field in their pubkeys table. Let's add it. - if shared.config.getint('bitmessagesettings', 'settingsversion') == 2: - item = '''ALTER TABLE pubkeys ADD usedpersonally text DEFAULT 'no' ''' - parameters = '' - self.cur.execute(item, parameters) - self.conn.commit() - - shared.config.set('bitmessagesettings', 'settingsversion', '3') - with open(shared.appdata + 'keys.dat', 'wb') as configfile: - shared.config.write(configfile) - - # People running earlier versions of PyBitmessage do not have the - # encodingtype field in their inbox and sent tables or the read field - # in the inbox table. Let's add them. - if shared.config.getint('bitmessagesettings', 'settingsversion') == 3: - item = '''ALTER TABLE inbox ADD encodingtype int DEFAULT '2' ''' - parameters = '' - self.cur.execute(item, parameters) - - item = '''ALTER TABLE inbox ADD read bool DEFAULT '1' ''' - parameters = '' - self.cur.execute(item, parameters) - - item = '''ALTER TABLE sent ADD encodingtype int DEFAULT '2' ''' - parameters = '' - self.cur.execute(item, parameters) - self.conn.commit() - - shared.config.set('bitmessagesettings', 'settingsversion', '4') - with open(shared.appdata + 'keys.dat', 'wb') as configfile: - shared.config.write(configfile) - - if shared.config.getint('bitmessagesettings', 'settingsversion') == 4: - shared.config.set('bitmessagesettings', 'defaultnoncetrialsperbyte', str( - shared.networkDefaultProofOfWorkNonceTrialsPerByte)) - shared.config.set('bitmessagesettings', 'defaultpayloadlengthextrabytes', str( - shared.networkDefaultPayloadLengthExtraBytes)) - shared.config.set('bitmessagesettings', 'settingsversion', '5') - - if shared.config.getint('bitmessagesettings', 'settingsversion') == 5: - shared.config.set( - 'bitmessagesettings', 'maxacceptablenoncetrialsperbyte', '0') - shared.config.set( - 'bitmessagesettings', 'maxacceptablepayloadlengthextrabytes', '0') - shared.config.set('bitmessagesettings', 'settingsversion', '6') - with open(shared.appdata + 'keys.dat', 'wb') as configfile: - shared.config.write(configfile) - - # From now on, let us keep a 'version' embedded in the messages.dat - # file so that when we make changes to the database, the database - # version we are on can stay embedded in the messages.dat file. Let us - # check to see if the settings table exists yet. - item = '''SELECT name FROM sqlite_master WHERE type='table' AND name='settings';''' - parameters = '' - self.cur.execute(item, parameters) - if self.cur.fetchall() == []: - # The settings table doesn't exist. We need to make it. - print 'In messages.dat database, creating new \'settings\' table.' - self.cur.execute( - '''CREATE TABLE settings (key text, value blob, UNIQUE(key) ON CONFLICT REPLACE)''' ) - self.cur.execute( '''INSERT INTO settings VALUES('version','1')''') - self.cur.execute( '''INSERT INTO settings VALUES('lastvacuumtime',?)''', ( - int(time.time()),)) - print 'In messages.dat database, removing an obsolete field from the pubkeys table.' - self.cur.execute( - '''CREATE TEMPORARY TABLE pubkeys_backup(hash blob, transmitdata blob, time int, usedpersonally text, UNIQUE(hash) ON CONFLICT REPLACE);''') - self.cur.execute( - '''INSERT INTO pubkeys_backup SELECT hash, transmitdata, time, usedpersonally FROM pubkeys;''') - self.cur.execute( '''DROP TABLE pubkeys''') - self.cur.execute( - '''CREATE TABLE pubkeys (hash blob, transmitdata blob, time int, usedpersonally text, UNIQUE(hash) ON CONFLICT REPLACE)''' ) - self.cur.execute( - '''INSERT INTO pubkeys SELECT hash, transmitdata, time, usedpersonally FROM pubkeys_backup;''') - self.cur.execute( '''DROP TABLE pubkeys_backup;''') - print 'Deleting all pubkeys from inventory. They will be redownloaded and then saved with the correct times.' - self.cur.execute( - '''delete from inventory where objecttype = 'pubkey';''') - print 'replacing Bitmessage announcements mailing list with a new one.' - self.cur.execute( - '''delete from subscriptions where address='BM-BbkPSZbzPwpVcYZpU4yHwf9ZPEapN5Zx' ''') - self.cur.execute( - '''INSERT INTO subscriptions VALUES('Bitmessage new releases/announcements','BM-GtovgYdgs7qXPkoYaRgrLFuFKz1SFpsw',1)''') - print 'Commiting.' - self.conn.commit() - print 'Vacuuming message.dat. You might notice that the file size gets much smaller.' - self.cur.execute( ''' VACUUM ''') - - # After code refactoring, the possible status values for sent messages - # as changed. - self.cur.execute( - '''update sent set status='doingmsgpow' where status='doingpow' ''') - self.cur.execute( - '''update sent set status='msgsent' where status='sentmessage' ''') - self.cur.execute( - '''update sent set status='doingpubkeypow' where status='findingpubkey' ''') - self.cur.execute( - '''update sent set status='broadcastqueued' where status='broadcastpending' ''') - self.conn.commit() - - try: - testpayload = '\x00\x00' - t = ('1234', testpayload, '12345678', 'no') - self.cur.execute( '''INSERT INTO pubkeys VALUES(?,?,?,?)''', t) - self.conn.commit() - self.cur.execute( - '''SELECT transmitdata FROM pubkeys WHERE hash='1234' ''') - queryreturn = self.cur.fetchall() - for row in queryreturn: - transmitdata, = row - self.cur.execute('''DELETE FROM pubkeys WHERE hash='1234' ''') - self.conn.commit() - if transmitdata == '': - sys.stderr.write('Problem: The version of SQLite you have cannot store Null values. Please download and install the latest revision of your version of Python (for example, the latest Python 2.7 revision) and try again.\n') - sys.stderr.write('PyBitmessage will now exit very abruptly. You may now see threading errors related to this abrupt exit but the problem you need to solve is related to SQLite.\n\n') - os._exit(0) - except Exception as err: - print err - - # Let us check to see the last time we vaccumed the messages.dat file. - # If it has been more than a month let's do it now. - item = '''SELECT value FROM settings WHERE key='lastvacuumtime';''' - parameters = '' - self.cur.execute(item, parameters) - queryreturn = self.cur.fetchall() - for row in queryreturn: - value, = row - if int(value) < int(time.time()) - 2592000: - print 'It has been a long time since the messages.dat file has been vacuumed. Vacuuming now...' - self.cur.execute( ''' VACUUM ''') - item = '''update settings set value=? WHERE key='lastvacuumtime';''' - parameters = (int(time.time()),) - self.cur.execute(item, parameters) - - while True: - item = shared.sqlSubmitQueue.get() - if item == 'commit': - self.conn.commit() - elif item == 'exit': - self.conn.close() - shared.printLock.acquire() - print 'sqlThread exiting gracefully.' - shared.printLock.release() - return - elif item == 'movemessagstoprog': - shared.printLock.acquire() - print 'the sqlThread is moving the messages.dat file to the local program directory.' - shared.printLock.release() - self.conn.commit() - self.conn.close() - shutil.move( - shared.lookupAppdataFolder() + 'messages.dat', 'messages.dat') - self.conn = sqlite3.connect('messages.dat') - self.conn.text_factory = str - self.cur = self.conn.cursor() - elif item == 'movemessagstoappdata': - shared.printLock.acquire() - print 'the sqlThread is moving the messages.dat file to the Appdata folder.' - shared.printLock.release() - self.conn.commit() - self.conn.close() - shutil.move( - 'messages.dat', shared.lookupAppdataFolder() + 'messages.dat') - self.conn = sqlite3.connect(shared.appdata + 'messages.dat') - self.conn.text_factory = str - self.cur = self.conn.cursor() - elif item == 'deleteandvacuume': - self.cur.execute('''delete from inbox where folder='trash' ''') - self.cur.execute('''delete from sent where folder='trash' ''') - self.conn.commit() - self.cur.execute( ''' VACUUM ''') - else: - parameters = shared.sqlSubmitQueue.get() - # print 'item', item - # print 'parameters', parameters - try: - self.cur.execute(item, parameters) - except Exception as err: - shared.printLock.acquire() - sys.stderr.write('\nMajor error occurred when trying to execute a SQL statement within the sqlThread. Please tell Atheros about this error message or post it in the forum! Error occurred while trying to execute statement: "' + str( - item) + '" Here are the parameters; you might want to censor this data with asterisks (***) as it can contain private information: ' + str(repr(parameters)) + '\nHere is the actual error message thrown by the sqlThread: ' + str(err) + '\n') - sys.stderr.write('This program shall now abruptly exit!\n') - shared.printLock.release() - os._exit(0) - - shared.sqlReturnQueue.put(self.cur.fetchall()) - # shared.sqlSubmitQueue.task_done() - - -'''The singleCleaner class is a timer-driven thread that cleans data structures to free memory, resends messages when a remote node doesn't respond, and sends pong messages to keep connections alive if the network isn't busy. -It cleans these data structures in memory: - inventory (moves data to the on-disk sql database) - -It cleans these tables on the disk: - inventory (clears data more than 2 days and 12 hours old) - pubkeys (clears pubkeys older than 4 weeks old which we have not used personally) - -It resends messages when there has been no response: - resends getpubkey messages in 4 days (then 8 days, then 16 days, etc...) - resends msg messages in 4 days (then 8 days, then 16 days, etc...) - -''' - - -class singleCleaner(threading.Thread): - - def __init__(self): - threading.Thread.__init__(self) - - def run(self): - timeWeLastClearedInventoryAndPubkeysTables = 0 - - while True: - shared.sqlLock.acquire() - shared.UISignalQueue.put(( - 'updateStatusBar', 'Doing housekeeping (Flushing inventory in memory to disk...)')) - for hash, storedValue in shared.inventory.items(): - objectType, streamNumber, payload, receivedTime = storedValue - if int(time.time()) - 3600 > receivedTime: - t = (hash, objectType, streamNumber, payload, receivedTime) - shared.sqlSubmitQueue.put( - '''INSERT INTO inventory VALUES (?,?,?,?,?)''') - shared.sqlSubmitQueue.put(t) - shared.sqlReturnQueue.get() - del shared.inventory[hash] - shared.sqlSubmitQueue.put('commit') - shared.UISignalQueue.put(('updateStatusBar', '')) - shared.sqlLock.release() - shared.broadcastToSendDataQueues(( - 0, 'pong', 'no data')) # commands the sendData threads to send out a pong message if they haven't sent anything else in the last five minutes. The socket timeout-time is 10 minutes. - # If we are running as a daemon then we are going to fill up the UI - # queue which will never be handled by a UI. We should clear it to - # save memory. - if shared.safeConfigGetBoolean('bitmessagesettings', 'daemon'): - shared.UISignalQueue.queue.clear() - if timeWeLastClearedInventoryAndPubkeysTables < int(time.time()) - 7380: - timeWeLastClearedInventoryAndPubkeysTables = int(time.time()) - # inventory (moves data from the inventory data structure to - # the on-disk sql database) - shared.sqlLock.acquire() - # inventory (clears pubkeys after 28 days and everything else - # after 2 days and 12 hours) - t = (int(time.time()) - lengthOfTimeToLeaveObjectsInInventory, int( - time.time()) - lengthOfTimeToHoldOnToAllPubkeys) - shared.sqlSubmitQueue.put( - '''DELETE FROM inventory WHERE (receivedtime'pubkey') OR (receivedtime (maximumAgeOfAnObjectThatIAmWillingToAccept * (2 ** (pubkeyretrynumber))): - print 'It has been a long time and we haven\'t heard a response to our getpubkey request. Sending again.' - try: - del neededPubkeys[ - toripe] # We need to take this entry out of the neededPubkeys structure because the shared.workerQueue checks to see whether the entry is already present and will not do the POW and send the message because it assumes that it has already done it recently. - except: - pass - - shared.UISignalQueue.put(( - 'updateStatusBar', 'Doing work necessary to again attempt to request a public key...')) - t = (int( - time.time()), pubkeyretrynumber + 1, toripe) - shared.sqlSubmitQueue.put( - '''UPDATE sent SET lastactiontime=?, pubkeyretrynumber=?, status='msgqueued' WHERE toripe=?''') - shared.sqlSubmitQueue.put(t) - shared.sqlReturnQueue.get() - shared.sqlSubmitQueue.put('commit') - shared.workerQueue.put(('sendmessage', '')) - else: # status == msgsent - if int(time.time()) - lastactiontime > (maximumAgeOfAnObjectThatIAmWillingToAccept * (2 ** (msgretrynumber))): - print 'It has been a long time and we haven\'t heard an acknowledgement to our msg. Sending again.' - t = (int( - time.time()), msgretrynumber + 1, 'msgqueued', ackdata) - shared.sqlSubmitQueue.put( - '''UPDATE sent SET lastactiontime=?, msgretrynumber=?, status=? WHERE ackdata=?''') - shared.sqlSubmitQueue.put(t) - shared.sqlReturnQueue.get() - shared.sqlSubmitQueue.put('commit') - shared.workerQueue.put(('sendmessage', '')) - shared.UISignalQueue.put(( - 'updateStatusBar', 'Doing work necessary to again attempt to deliver a message...')) - shared.sqlSubmitQueue.put('commit') - shared.sqlLock.release() - time.sleep(300) # This thread, of which there is only one, does the heavy lifting: # calculating POWs. diff --git a/src/class_singleCleaner.py b/src/class_singleCleaner.py new file mode 100644 index 00000000..999cda26 --- /dev/null +++ b/src/class_singleCleaner.py @@ -0,0 +1,122 @@ +import threading +import shared +import time +from bitmessagemain import lengthOfTimeToLeaveObjectsInInventory, lengthOfTimeToHoldOnToAllPubkeys, maximumAgeOfAnObjectThatIAmWillingToAccept, maximumAgeOfObjectsThatIAdvertiseToOthers, maximumAgeOfNodesThatIAdvertiseToOthers + +'''The singleCleaner class is a timer-driven thread that cleans data structures to free memory, resends messages when a remote node doesn't respond, and sends pong messages to keep connections alive if the network isn't busy. +It cleans these data structures in memory: + inventory (moves data to the on-disk sql database) + +It cleans these tables on the disk: + inventory (clears data more than 2 days and 12 hours old) + pubkeys (clears pubkeys older than 4 weeks old which we have not used personally) + +It resends messages when there has been no response: + resends getpubkey messages in 4 days (then 8 days, then 16 days, etc...) + resends msg messages in 4 days (then 8 days, then 16 days, etc...) + +''' + + +class singleCleaner(threading.Thread): + + def __init__(self): + threading.Thread.__init__(self) + + def run(self): + timeWeLastClearedInventoryAndPubkeysTables = 0 + + while True: + shared.sqlLock.acquire() + shared.UISignalQueue.put(( + 'updateStatusBar', 'Doing housekeeping (Flushing inventory in memory to disk...)')) + for hash, storedValue in shared.inventory.items(): + objectType, streamNumber, payload, receivedTime = storedValue + if int(time.time()) - 3600 > receivedTime: + t = (hash, objectType, streamNumber, payload, receivedTime) + shared.sqlSubmitQueue.put( + '''INSERT INTO inventory VALUES (?,?,?,?,?)''') + shared.sqlSubmitQueue.put(t) + shared.sqlReturnQueue.get() + del shared.inventory[hash] + shared.sqlSubmitQueue.put('commit') + shared.UISignalQueue.put(('updateStatusBar', '')) + shared.sqlLock.release() + shared.broadcastToSendDataQueues(( + 0, 'pong', 'no data')) # commands the sendData threads to send out a pong message if they haven't sent anything else in the last five minutes. The socket timeout-time is 10 minutes. + # If we are running as a daemon then we are going to fill up the UI + # queue which will never be handled by a UI. We should clear it to + # save memory. + if shared.safeConfigGetBoolean('bitmessagesettings', 'daemon'): + shared.UISignalQueue.queue.clear() + if timeWeLastClearedInventoryAndPubkeysTables < int(time.time()) - 7380: + timeWeLastClearedInventoryAndPubkeysTables = int(time.time()) + # inventory (moves data from the inventory data structure to + # the on-disk sql database) + shared.sqlLock.acquire() + # inventory (clears pubkeys after 28 days and everything else + # after 2 days and 12 hours) + t = (int(time.time()) - lengthOfTimeToLeaveObjectsInInventory, int( + time.time()) - lengthOfTimeToHoldOnToAllPubkeys) + shared.sqlSubmitQueue.put( + '''DELETE FROM inventory WHERE (receivedtime'pubkey') OR (receivedtime (maximumAgeOfAnObjectThatIAmWillingToAccept * (2 ** (pubkeyretrynumber))): + print 'It has been a long time and we haven\'t heard a response to our getpubkey request. Sending again.' + try: + del neededPubkeys[ + toripe] # We need to take this entry out of the neededPubkeys structure because the shared.workerQueue checks to see whether the entry is already present and will not do the POW and send the message because it assumes that it has already done it recently. + except: + pass + + shared.UISignalQueue.put(( + 'updateStatusBar', 'Doing work necessary to again attempt to request a public key...')) + t = (int( + time.time()), pubkeyretrynumber + 1, toripe) + shared.sqlSubmitQueue.put( + '''UPDATE sent SET lastactiontime=?, pubkeyretrynumber=?, status='msgqueued' WHERE toripe=?''') + shared.sqlSubmitQueue.put(t) + shared.sqlReturnQueue.get() + shared.sqlSubmitQueue.put('commit') + shared.workerQueue.put(('sendmessage', '')) + else: # status == msgsent + if int(time.time()) - lastactiontime > (maximumAgeOfAnObjectThatIAmWillingToAccept * (2 ** (msgretrynumber))): + print 'It has been a long time and we haven\'t heard an acknowledgement to our msg. Sending again.' + t = (int( + time.time()), msgretrynumber + 1, 'msgqueued', ackdata) + shared.sqlSubmitQueue.put( + '''UPDATE sent SET lastactiontime=?, msgretrynumber=?, status=? WHERE ackdata=?''') + shared.sqlSubmitQueue.put(t) + shared.sqlReturnQueue.get() + shared.sqlSubmitQueue.put('commit') + shared.workerQueue.put(('sendmessage', '')) + shared.UISignalQueue.put(( + 'updateStatusBar', 'Doing work necessary to again attempt to deliver a message...')) + shared.sqlSubmitQueue.put('commit') + shared.sqlLock.release() + time.sleep(300) diff --git a/src/class_singleListener.py b/src/class_singleListener.py new file mode 100644 index 00000000..9e982a24 --- /dev/null +++ b/src/class_singleListener.py @@ -0,0 +1,76 @@ +import threading +import shared +import socket + +# Only one singleListener thread will ever exist. It creates the +# receiveDataThread and sendDataThread for each incoming connection. Note +# that it cannot set the stream number because it is not known yet- the +# other node will have to tell us its stream number in a version message. +# If we don't care about their stream, we will close the connection +# (within the recversion function of the recieveData thread) + + +class singleListener(threading.Thread): + + def __init__(self): + threading.Thread.__init__(self) + + def run(self): + # We don't want to accept incoming connections if the user is using a + # SOCKS proxy. If they eventually select proxy 'none' then this will + # start listening for connections. + while shared.config.get('bitmessagesettings', 'socksproxytype')[0:5] == 'SOCKS': + time.sleep(300) + + shared.printLock.acquire() + print 'Listening for incoming connections.' + shared.printLock.release() + HOST = '' # Symbolic name meaning all available interfaces + PORT = shared.config.getint('bitmessagesettings', 'port') + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + # This option apparently avoids the TIME_WAIT state so that we can + # rebind faster + sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + sock.bind((HOST, PORT)) + sock.listen(2) + + while True: + # We don't want to accept incoming connections if the user is using + # a SOCKS proxy. If the user eventually select proxy 'none' then + # this will start listening for connections. + while shared.config.get('bitmessagesettings', 'socksproxytype')[0:5] == 'SOCKS': + time.sleep(10) + while len(shared.connectedHostsList) > 220: + shared.printLock.acquire() + print 'We are connected to too many people. Not accepting further incoming connections for ten seconds.' + shared.printLock.release() + time.sleep(10) + a, (HOST, PORT) = sock.accept() + + # The following code will, unfortunately, block an incoming + # connection if someone else on the same LAN is already connected + # because the two computers will share the same external IP. This + # is here to prevent connection flooding. + while HOST in shared.connectedHostsList: + shared.printLock.acquire() + print 'We are already connected to', HOST + '. Ignoring connection.' + shared.printLock.release() + a.close() + a, (HOST, PORT) = sock.accept() + objectsOfWhichThisRemoteNodeIsAlreadyAware = {} + a.settimeout(20) + + sd = sendDataThread() + sd.setup( + a, HOST, PORT, -1, objectsOfWhichThisRemoteNodeIsAlreadyAware) + sd.start() + + rd = receiveDataThread() + rd.daemon = True # close the main program even if there are threads left + rd.setup( + a, HOST, PORT, -1, objectsOfWhichThisRemoteNodeIsAlreadyAware) + rd.start() + + shared.printLock.acquire() + print self, 'connected to', HOST, 'during INCOMING request.' + shared.printLock.release() diff --git a/src/class_sqlThread.py b/src/class_sqlThread.py new file mode 100644 index 00000000..e483284f --- /dev/null +++ b/src/class_sqlThread.py @@ -0,0 +1,255 @@ +import threading +import shared +import sqlite3 +import time + +# This thread exists because SQLITE3 is so un-threadsafe that we must +# submit queries to it and it puts results back in a different queue. They +# won't let us just use locks. + + +class sqlThread(threading.Thread): + + def __init__(self): + threading.Thread.__init__(self) + + def run(self): + self.conn = sqlite3.connect(shared.appdata + 'messages.dat') + self.conn.text_factory = str + self.cur = self.conn.cursor() + try: + self.cur.execute( + '''CREATE TABLE inbox (msgid blob, toaddress text, fromaddress text, subject text, received text, message text, folder text, encodingtype int, read bool, UNIQUE(msgid) ON CONFLICT REPLACE)''' ) + self.cur.execute( + '''CREATE TABLE sent (msgid blob, toaddress text, toripe blob, fromaddress text, subject text, message text, ackdata blob, lastactiontime integer, status text, pubkeyretrynumber integer, msgretrynumber integer, folder text, encodingtype int)''' ) + self.cur.execute( + '''CREATE TABLE subscriptions (label text, address text, enabled bool)''' ) + self.cur.execute( + '''CREATE TABLE addressbook (label text, address text)''' ) + self.cur.execute( + '''CREATE TABLE blacklist (label text, address text, enabled bool)''' ) + self.cur.execute( + '''CREATE TABLE whitelist (label text, address text, enabled bool)''' ) + # Explanation of what is in the pubkeys table: + # The hash is the RIPEMD160 hash that is encoded in the Bitmessage address. + # transmitdata is literally the data that was included in the Bitmessage pubkey message when it arrived, except for the 24 byte protocol header- ie, it starts with the POW nonce. + # time is the time that the pubkey was broadcast on the network same as with every other type of Bitmessage object. + # usedpersonally is set to "yes" if we have used the key + # personally. This keeps us from deleting it because we may want to + # reply to a message in the future. This field is not a bool + # because we may need more flexability in the future and it doesn't + # take up much more space anyway. + self.cur.execute( + '''CREATE TABLE pubkeys (hash blob, transmitdata blob, time int, usedpersonally text, UNIQUE(hash) ON CONFLICT REPLACE)''' ) + self.cur.execute( + '''CREATE TABLE inventory (hash blob, objecttype text, streamnumber int, payload blob, receivedtime integer, UNIQUE(hash) ON CONFLICT REPLACE)''' ) + self.cur.execute( + '''CREATE TABLE knownnodes (timelastseen int, stream int, services blob, host blob, port blob, UNIQUE(host, stream, port) ON CONFLICT REPLACE)''' ) + # This table isn't used in the program yet but I + # have a feeling that we'll need it. + self.cur.execute( + '''INSERT INTO subscriptions VALUES('Bitmessage new releases/announcements','BM-GtovgYdgs7qXPkoYaRgrLFuFKz1SFpsw',1)''') + self.cur.execute( + '''CREATE TABLE settings (key blob, value blob, UNIQUE(key) ON CONFLICT REPLACE)''' ) + self.cur.execute( '''INSERT INTO settings VALUES('version','1')''') + self.cur.execute( '''INSERT INTO settings VALUES('lastvacuumtime',?)''', ( + int(time.time()),)) + self.conn.commit() + print 'Created messages database file' + except Exception as err: + if str(err) == 'table inbox already exists': + shared.printLock.acquire() + print 'Database file already exists.' + shared.printLock.release() + else: + sys.stderr.write( + 'ERROR trying to create database file (message.dat). Error message: %s\n' % str(err)) + os._exit(0) + + # People running earlier versions of PyBitmessage do not have the + # usedpersonally field in their pubkeys table. Let's add it. + if shared.config.getint('bitmessagesettings', 'settingsversion') == 2: + item = '''ALTER TABLE pubkeys ADD usedpersonally text DEFAULT 'no' ''' + parameters = '' + self.cur.execute(item, parameters) + self.conn.commit() + + shared.config.set('bitmessagesettings', 'settingsversion', '3') + with open(shared.appdata + 'keys.dat', 'wb') as configfile: + shared.config.write(configfile) + + # People running earlier versions of PyBitmessage do not have the + # encodingtype field in their inbox and sent tables or the read field + # in the inbox table. Let's add them. + if shared.config.getint('bitmessagesettings', 'settingsversion') == 3: + item = '''ALTER TABLE inbox ADD encodingtype int DEFAULT '2' ''' + parameters = '' + self.cur.execute(item, parameters) + + item = '''ALTER TABLE inbox ADD read bool DEFAULT '1' ''' + parameters = '' + self.cur.execute(item, parameters) + + item = '''ALTER TABLE sent ADD encodingtype int DEFAULT '2' ''' + parameters = '' + self.cur.execute(item, parameters) + self.conn.commit() + + shared.config.set('bitmessagesettings', 'settingsversion', '4') + with open(shared.appdata + 'keys.dat', 'wb') as configfile: + shared.config.write(configfile) + + if shared.config.getint('bitmessagesettings', 'settingsversion') == 4: + shared.config.set('bitmessagesettings', 'defaultnoncetrialsperbyte', str( + shared.networkDefaultProofOfWorkNonceTrialsPerByte)) + shared.config.set('bitmessagesettings', 'defaultpayloadlengthextrabytes', str( + shared.networkDefaultPayloadLengthExtraBytes)) + shared.config.set('bitmessagesettings', 'settingsversion', '5') + + if shared.config.getint('bitmessagesettings', 'settingsversion') == 5: + shared.config.set( + 'bitmessagesettings', 'maxacceptablenoncetrialsperbyte', '0') + shared.config.set( + 'bitmessagesettings', 'maxacceptablepayloadlengthextrabytes', '0') + shared.config.set('bitmessagesettings', 'settingsversion', '6') + with open(shared.appdata + 'keys.dat', 'wb') as configfile: + shared.config.write(configfile) + + # From now on, let us keep a 'version' embedded in the messages.dat + # file so that when we make changes to the database, the database + # version we are on can stay embedded in the messages.dat file. Let us + # check to see if the settings table exists yet. + item = '''SELECT name FROM sqlite_master WHERE type='table' AND name='settings';''' + parameters = '' + self.cur.execute(item, parameters) + if self.cur.fetchall() == []: + # The settings table doesn't exist. We need to make it. + print 'In messages.dat database, creating new \'settings\' table.' + self.cur.execute( + '''CREATE TABLE settings (key text, value blob, UNIQUE(key) ON CONFLICT REPLACE)''' ) + self.cur.execute( '''INSERT INTO settings VALUES('version','1')''') + self.cur.execute( '''INSERT INTO settings VALUES('lastvacuumtime',?)''', ( + int(time.time()),)) + print 'In messages.dat database, removing an obsolete field from the pubkeys table.' + self.cur.execute( + '''CREATE TEMPORARY TABLE pubkeys_backup(hash blob, transmitdata blob, time int, usedpersonally text, UNIQUE(hash) ON CONFLICT REPLACE);''') + self.cur.execute( + '''INSERT INTO pubkeys_backup SELECT hash, transmitdata, time, usedpersonally FROM pubkeys;''') + self.cur.execute( '''DROP TABLE pubkeys''') + self.cur.execute( + '''CREATE TABLE pubkeys (hash blob, transmitdata blob, time int, usedpersonally text, UNIQUE(hash) ON CONFLICT REPLACE)''' ) + self.cur.execute( + '''INSERT INTO pubkeys SELECT hash, transmitdata, time, usedpersonally FROM pubkeys_backup;''') + self.cur.execute( '''DROP TABLE pubkeys_backup;''') + print 'Deleting all pubkeys from inventory. They will be redownloaded and then saved with the correct times.' + self.cur.execute( + '''delete from inventory where objecttype = 'pubkey';''') + print 'replacing Bitmessage announcements mailing list with a new one.' + self.cur.execute( + '''delete from subscriptions where address='BM-BbkPSZbzPwpVcYZpU4yHwf9ZPEapN5Zx' ''') + self.cur.execute( + '''INSERT INTO subscriptions VALUES('Bitmessage new releases/announcements','BM-GtovgYdgs7qXPkoYaRgrLFuFKz1SFpsw',1)''') + print 'Commiting.' + self.conn.commit() + print 'Vacuuming message.dat. You might notice that the file size gets much smaller.' + self.cur.execute( ''' VACUUM ''') + + # After code refactoring, the possible status values for sent messages + # as changed. + self.cur.execute( + '''update sent set status='doingmsgpow' where status='doingpow' ''') + self.cur.execute( + '''update sent set status='msgsent' where status='sentmessage' ''') + self.cur.execute( + '''update sent set status='doingpubkeypow' where status='findingpubkey' ''') + self.cur.execute( + '''update sent set status='broadcastqueued' where status='broadcastpending' ''') + self.conn.commit() + + try: + testpayload = '\x00\x00' + t = ('1234', testpayload, '12345678', 'no') + self.cur.execute( '''INSERT INTO pubkeys VALUES(?,?,?,?)''', t) + self.conn.commit() + self.cur.execute( + '''SELECT transmitdata FROM pubkeys WHERE hash='1234' ''') + queryreturn = self.cur.fetchall() + for row in queryreturn: + transmitdata, = row + self.cur.execute('''DELETE FROM pubkeys WHERE hash='1234' ''') + self.conn.commit() + if transmitdata == '': + sys.stderr.write('Problem: The version of SQLite you have cannot store Null values. Please download and install the latest revision of your version of Python (for example, the latest Python 2.7 revision) and try again.\n') + sys.stderr.write('PyBitmessage will now exit very abruptly. You may now see threading errors related to this abrupt exit but the problem you need to solve is related to SQLite.\n\n') + os._exit(0) + except Exception as err: + print err + + # Let us check to see the last time we vaccumed the messages.dat file. + # If it has been more than a month let's do it now. + item = '''SELECT value FROM settings WHERE key='lastvacuumtime';''' + parameters = '' + self.cur.execute(item, parameters) + queryreturn = self.cur.fetchall() + for row in queryreturn: + value, = row + if int(value) < int(time.time()) - 2592000: + print 'It has been a long time since the messages.dat file has been vacuumed. Vacuuming now...' + self.cur.execute( ''' VACUUM ''') + item = '''update settings set value=? WHERE key='lastvacuumtime';''' + parameters = (int(time.time()),) + self.cur.execute(item, parameters) + + while True: + item = shared.sqlSubmitQueue.get() + if item == 'commit': + self.conn.commit() + elif item == 'exit': + self.conn.close() + shared.printLock.acquire() + print 'sqlThread exiting gracefully.' + shared.printLock.release() + return + elif item == 'movemessagstoprog': + shared.printLock.acquire() + print 'the sqlThread is moving the messages.dat file to the local program directory.' + shared.printLock.release() + self.conn.commit() + self.conn.close() + shutil.move( + shared.lookupAppdataFolder() + 'messages.dat', 'messages.dat') + self.conn = sqlite3.connect('messages.dat') + self.conn.text_factory = str + self.cur = self.conn.cursor() + elif item == 'movemessagstoappdata': + shared.printLock.acquire() + print 'the sqlThread is moving the messages.dat file to the Appdata folder.' + shared.printLock.release() + self.conn.commit() + self.conn.close() + shutil.move( + 'messages.dat', shared.lookupAppdataFolder() + 'messages.dat') + self.conn = sqlite3.connect(shared.appdata + 'messages.dat') + self.conn.text_factory = str + self.cur = self.conn.cursor() + elif item == 'deleteandvacuume': + self.cur.execute('''delete from inbox where folder='trash' ''') + self.cur.execute('''delete from sent where folder='trash' ''') + self.conn.commit() + self.cur.execute( ''' VACUUM ''') + else: + parameters = shared.sqlSubmitQueue.get() + # print 'item', item + # print 'parameters', parameters + try: + self.cur.execute(item, parameters) + except Exception as err: + shared.printLock.acquire() + sys.stderr.write('\nMajor error occurred when trying to execute a SQL statement within the sqlThread. Please tell Atheros about this error message or post it in the forum! Error occurred while trying to execute statement: "' + str( + item) + '" Here are the parameters; you might want to censor this data with asterisks (***) as it can contain private information: ' + str(repr(parameters)) + '\nHere is the actual error message thrown by the sqlThread: ' + str(err) + '\n') + sys.stderr.write('This program shall now abruptly exit!\n') + shared.printLock.release() + os._exit(0) + + shared.sqlReturnQueue.put(self.cur.fetchall()) + # shared.sqlSubmitQueue.task_done()