From 107e1e0ef8c894a485c8ba719069eb07a51b3ab0 Mon Sep 17 00:00:00 2001 From: Muzahid Date: Thu, 3 Jun 2021 20:55:36 +0530 Subject: [PATCH] Refactor sqlthread --- src/class_sqlThread.py | 875 ++++++++++++++---------------------- src/tests/core.py | 18 +- src/tests/sql/__init__.py | 0 src/tests/test_sqlthread.py | 232 ++++++++-- 4 files changed, 558 insertions(+), 567 deletions(-) create mode 100644 src/tests/sql/__init__.py diff --git a/src/class_sqlThread.py b/src/class_sqlThread.py index 7df9e253..0d1ed086 100644 --- a/src/class_sqlThread.py +++ b/src/class_sqlThread.py @@ -1,11 +1,13 @@ +# pylint: disable=protected-access, too-many-locals, too-many-branches, too-many-statements + """ sqlThread is defined here """ +import logging import os import shutil # used for moving the messages.dat file import sqlite3 -import sys import threading import time @@ -17,128 +19,113 @@ try: import state from addresses import encodeAddress from bmconfigparser import config, config_ready - from debug import logger from tr import _translate except ImportError: from . import helper_sql, helper_startup, paths, queues, state from .addresses import encodeAddress from .bmconfigparser import config, config_ready - from .debug import logger from .tr import _translate +logger = logging.getLogger('default') -class sqlThread(threading.Thread): - """A thread for all SQL operations""" + +class BitmessageDB(object): + """ Upgrade Db with respect to versions """ def __init__(self): - threading.Thread.__init__(self, name="SQL") + self._current_level = None + self.max_level = 11 + self.conn = None + self.cur = None + self._connection_build() + self.root_path = os.path.dirname(os.path.dirname(__file__)) - def run(self): # pylint: disable=too-many-locals, too-many-branches, too-many-statements - """Process SQL queries from `.helper_sql.sqlSubmitQueue`""" - helper_sql.sql_available = True - config_ready.wait() - self.conn = sqlite3.connect(state.appdata + 'messages.dat') + def _connection_build(self): + self._connection_build_internal('messages.dat', False) + + def _connection_build_internal(self, file_name="messages.dat", memory=False): + """ + Stablish SQL connection + """ + if memory: + self.conn = sqlite3.connect(':memory:') + else: + self.conn = sqlite3.connect(os.path.join(state.appdata + file_name)) self.conn.text_factory = str self.cur = self.conn.cursor() - self.cur.execute('PRAGMA secure_delete = true') - # call create_function for encode address - self.create_function() - + def __get_current_settings_version(self): + """ + Get current setting Version + """ + query = "SELECT value FROM settings WHERE key='version'" + parameters = () + self.cur.execute(query, parameters) try: - self.cur.execute( - '''CREATE TABLE inbox (msgid blob, toaddress text, fromaddress text, subject text,''' - ''' received text, message text, folder text, encodingtype int, read bool, sighash blob,''' - ''' UNIQUE(msgid) ON CONFLICT REPLACE)''') - self.cur.execute( - '''CREATE TABLE sent (msgid blob, toaddress text, toripe blob, fromaddress text, subject text,''' - ''' message text, ackdata blob, senttime integer, lastactiontime integer,''' - ''' sleeptill integer, status text, retrynumber integer, folder text, encodingtype int, ttl int)''') - self.cur.execute( - '''CREATE TABLE subscriptions (label text, address text, enabled bool)''') - self.cur.execute( - '''CREATE TABLE addressbook (label text, address text, UNIQUE(address) ON CONFLICT IGNORE)''') - self.cur.execute( - '''CREATE TABLE blacklist (label text, address text, enabled bool)''') - self.cur.execute( - '''CREATE TABLE whitelist (label text, address text, enabled bool)''') - self.cur.execute( - '''CREATE TABLE pubkeys (address text, addressversion int, transmitdata blob, time int,''' - ''' usedpersonally text, UNIQUE(address) ON CONFLICT REPLACE)''') - self.cur.execute( - '''CREATE TABLE inventory (hash blob, objecttype int, streamnumber int, payload blob,''' - ''' expirestime integer, tag blob, UNIQUE(hash) ON CONFLICT REPLACE)''') - self.cur.execute( - '''INSERT INTO subscriptions VALUES''' - '''('Bitmessage new releases/announcements','BM-GtovgYdgs7qXPkoYaRgrLFuFKz1SFpsw',1)''') - self.cur.execute( - '''CREATE TABLE settings (key blob, value blob, UNIQUE(key) ON CONFLICT REPLACE)''') - self.cur.execute('''INSERT INTO settings VALUES('version','11')''') - self.cur.execute('''INSERT INTO settings VALUES('lastvacuumtime',?)''', ( - int(time.time()),)) - self.cur.execute( - '''CREATE TABLE objectprocessorqueue''' - ''' (objecttype int, data blob, UNIQUE(objecttype, data) ON CONFLICT REPLACE)''') - self.conn.commit() - logger.info('Created messages database file') + return int(self.cur.fetchall()[0][0]) + except (ValueError, IndexError): + return 0 + + def _upgrade_one_level_sql_statement(self, file_name): + """ + Upgrade database versions with applying sql scripts + """ + self.initialize_sql("init_version_{}".format(file_name)) + + def initialize_sql(self, file_name): + """ + Initializing sql + """ + try: + with open(os.path.join(self.root_path, "pybitmessage/sql/{}.sql".format(file_name))) as sql_file: + sql_as_string = sql_file.read() + self.cur.executescript(sql_as_string) + return True + except OSError as err: + logger.debug('The file is missing. Error message: %s\n', str(err)) + except IOError as err: + logger.debug( + 'ERROR trying to initialize database. Error message: %s\n', str(err)) + except sqlite3.Error as err: + logger.error(err) except Exception as err: - if str(err) == 'table inbox already exists': - logger.debug('Database file already exists.') + logger.debug( + 'ERROR trying to initialize database. Error message: %s\n', str(err)) + return False - else: - sys.stderr.write( - 'ERROR trying to create database file (message.dat). Error message: %s\n' % str(err)) - os._exit(0) + @property + def sql_schema_version(self): + """ + Getter for get current schema version + """ + return self.__get_current_settings_version() - # If the settings version is equal to 2 or 3 then the - # sqlThread will modify the pubkeys table and change - # the settings version to 4. - settingsversion = config.getint( - 'bitmessagesettings', 'settingsversion') + @sql_schema_version.setter + def sql_schema_version(self, setter): + """ + Update version with one level + """ - # People running earlier versions of PyBitmessage do not have the - # usedpersonally field in their pubkeys table. Let's add it. - if settingsversion == 2: - item = '''ALTER TABLE pubkeys ADD usedpersonally text DEFAULT 'no' ''' - parameters = '' - self.cur.execute(item, parameters) - self.conn.commit() + if setter: + query = "UPDATE settings SET value=CAST(value AS INT) + 1 WHERE key = 'version'" + self.cur.execute(query) - settingsversion = 3 + def upgrade_to_latest(self): + """ + Initialize upgrade level + """ - # People running earlier versions of PyBitmessage do not have the - # encodingtype field in their inbox and sent tables or the read field - # in the inbox table. Let's add them. - if settingsversion == 3: - item = '''ALTER TABLE inbox ADD encodingtype int DEFAULT '2' ''' - parameters = '' - self.cur.execute(item, parameters) + while self.sql_schema_version < self.max_level: + self._upgrade_one_level_sql_statement(self.sql_schema_version) + self.sql_schema_version = True # bump sql_schema_version by one - item = '''ALTER TABLE inbox ADD read bool DEFAULT '1' ''' - parameters = '' - self.cur.execute(item, parameters) + def upgrade_schema_if_old_version(self): + """ check settings table exists """ - item = '''ALTER TABLE sent ADD encodingtype int DEFAULT '2' ''' - parameters = '' - self.cur.execute(item, parameters) - self.conn.commit() - - settingsversion = 4 - - config.set( - 'bitmessagesettings', 'settingsversion', str(settingsversion)) - config.save() - - helper_startup.updateConfig() - - # From now on, let us keep a 'version' embedded in the messages.dat - # file so that when we make changes to the database, the database - # version we are on can stay embedded in the messages.dat file. Let us - # check to see if the settings table exists yet. - item = '''SELECT name FROM sqlite_master WHERE type='table' AND name='settings';''' - parameters = '' - self.cur.execute(item, parameters) + query = "SELECT name FROM sqlite_master WHERE type='table' AND name='settings'" + parameters = () + self.cur.execute(query, parameters) if self.cur.fetchall() == []: # The settings table doesn't exist. We need to make it. logger.debug( @@ -149,277 +136,17 @@ class sqlThread(threading.Thread): self.cur.execute('''INSERT INTO settings VALUES('lastvacuumtime',?)''', ( int(time.time()),)) logger.debug('In messages.dat database, removing an obsolete field from the pubkeys table.') - self.cur.execute( - '''CREATE TEMPORARY TABLE pubkeys_backup(hash blob, transmitdata blob, time int,''' - ''' usedpersonally text, UNIQUE(hash) ON CONFLICT REPLACE);''') - self.cur.execute( - '''INSERT INTO pubkeys_backup SELECT hash, transmitdata, time, usedpersonally FROM pubkeys;''') - self.cur.execute('''DROP TABLE pubkeys''') - self.cur.execute( - '''CREATE TABLE pubkeys''' - ''' (hash blob, transmitdata blob, time int, usedpersonally text, UNIQUE(hash) ON CONFLICT REPLACE)''') - self.cur.execute( - '''INSERT INTO pubkeys SELECT hash, transmitdata, time, usedpersonally FROM pubkeys_backup;''') - self.cur.execute('''DROP TABLE pubkeys_backup;''') - logger.debug( - 'Deleting all pubkeys from inventory.' - ' They will be redownloaded and then saved with the correct times.') - self.cur.execute( - '''delete from inventory where objecttype = 'pubkey';''') - logger.debug('replacing Bitmessage announcements mailing list with a new one.') - self.cur.execute( - '''delete from subscriptions where address='BM-BbkPSZbzPwpVcYZpU4yHwf9ZPEapN5Zx' ''') - self.cur.execute( - '''INSERT INTO subscriptions VALUES''' - '''('Bitmessage new releases/announcements','BM-GtovgYdgs7qXPkoYaRgrLFuFKz1SFpsw',1)''') - logger.debug('Commiting.') - self.conn.commit() - logger.debug('Vacuuming message.dat. You might notice that the file size gets much smaller.') - self.cur.execute(''' VACUUM ''') + # initiate sql file + self.initialize_sql("upg_sc_if_old_ver_1") # After code refactoring, the possible status values for sent messages # have changed. - self.cur.execute( - '''update sent set status='doingmsgpow' where status='doingpow' ''') - self.cur.execute( - '''update sent set status='msgsent' where status='sentmessage' ''') - self.cur.execute( - '''update sent set status='doingpubkeypow' where status='findingpubkey' ''') - self.cur.execute( - '''update sent set status='broadcastqueued' where status='broadcastpending' ''') - self.conn.commit() - - # Let's get rid of the first20bytesofencryptedmessage field in - # the inventory table. - item = '''SELECT value FROM settings WHERE key='version';''' - parameters = '' - self.cur.execute(item, parameters) - if int(self.cur.fetchall()[0][0]) == 2: - logger.debug( - 'In messages.dat database, removing an obsolete field from' - ' the inventory table.') - self.cur.execute( - '''CREATE TEMPORARY TABLE inventory_backup''' - '''(hash blob, objecttype text, streamnumber int, payload blob,''' - ''' receivedtime integer, UNIQUE(hash) ON CONFLICT REPLACE);''') - self.cur.execute( - '''INSERT INTO inventory_backup SELECT hash, objecttype, streamnumber, payload, receivedtime''' - ''' FROM inventory;''') - self.cur.execute('''DROP TABLE inventory''') - self.cur.execute( - '''CREATE TABLE inventory''' - ''' (hash blob, objecttype text, streamnumber int, payload blob, receivedtime integer,''' - ''' UNIQUE(hash) ON CONFLICT REPLACE)''') - self.cur.execute( - '''INSERT INTO inventory SELECT hash, objecttype, streamnumber, payload, receivedtime''' - ''' FROM inventory_backup;''') - self.cur.execute('''DROP TABLE inventory_backup;''') - item = '''update settings set value=? WHERE key='version';''' - parameters = (3,) - self.cur.execute(item, parameters) - - # Add a new column to the inventory table to store tags. - item = '''SELECT value FROM settings WHERE key='version';''' - parameters = '' - self.cur.execute(item, parameters) - currentVersion = int(self.cur.fetchall()[0][0]) - if currentVersion == 1 or currentVersion == 3: - logger.debug( - 'In messages.dat database, adding tag field to' - ' the inventory table.') - item = '''ALTER TABLE inventory ADD tag blob DEFAULT '' ''' - parameters = '' - self.cur.execute(item, parameters) - item = '''update settings set value=? WHERE key='version';''' - parameters = (4,) - self.cur.execute(item, parameters) - - # Add a new column to the pubkeys table to store the address version. - # We're going to trash all of our pubkeys and let them be redownloaded. - item = '''SELECT value FROM settings WHERE key='version';''' - parameters = '' - self.cur.execute(item, parameters) - currentVersion = int(self.cur.fetchall()[0][0]) - if currentVersion == 4: - self.cur.execute('''DROP TABLE pubkeys''') - self.cur.execute( - '''CREATE TABLE pubkeys (hash blob, addressversion int, transmitdata blob, time int,''' - '''usedpersonally text, UNIQUE(hash, addressversion) ON CONFLICT REPLACE)''') - self.cur.execute( - '''delete from inventory where objecttype = 'pubkey';''') - item = '''update settings set value=? WHERE key='version';''' - parameters = (5,) - self.cur.execute(item, parameters) - - # Add a new table: objectprocessorqueue with which to hold objects - # that have yet to be processed if the user shuts down Bitmessage. - item = '''SELECT value FROM settings WHERE key='version';''' - parameters = '' - self.cur.execute(item, parameters) - currentVersion = int(self.cur.fetchall()[0][0]) - if currentVersion == 5: - self.cur.execute('''DROP TABLE knownnodes''') - self.cur.execute( - '''CREATE TABLE objectprocessorqueue''' - ''' (objecttype text, data blob, UNIQUE(objecttype, data) ON CONFLICT REPLACE)''') - item = '''update settings set value=? WHERE key='version';''' - parameters = (6,) - self.cur.execute(item, parameters) - - # changes related to protocol v3 - # In table inventory and objectprocessorqueue, objecttype is now - # an integer (it was a human-friendly string previously) - item = '''SELECT value FROM settings WHERE key='version';''' - parameters = '' - self.cur.execute(item, parameters) - currentVersion = int(self.cur.fetchall()[0][0]) - if currentVersion == 6: - logger.debug( - 'In messages.dat database, dropping and recreating' - ' the inventory table.') - self.cur.execute('''DROP TABLE inventory''') - self.cur.execute( - '''CREATE TABLE inventory''' - ''' (hash blob, objecttype int, streamnumber int, payload blob, expirestime integer,''' - ''' tag blob, UNIQUE(hash) ON CONFLICT REPLACE)''') - self.cur.execute('''DROP TABLE objectprocessorqueue''') - self.cur.execute( - '''CREATE TABLE objectprocessorqueue''' - ''' (objecttype int, data blob, UNIQUE(objecttype, data) ON CONFLICT REPLACE)''') - item = '''update settings set value=? WHERE key='version';''' - parameters = (7,) - self.cur.execute(item, parameters) - logger.debug( - 'Finished dropping and recreating the inventory table.') - - # The format of data stored in the pubkeys table has changed. Let's - # clear it, and the pubkeys from inventory, so that they'll - # be re-downloaded. - item = '''SELECT value FROM settings WHERE key='version';''' - parameters = '' - self.cur.execute(item, parameters) - currentVersion = int(self.cur.fetchall()[0][0]) - if currentVersion == 7: - logger.debug( - 'In messages.dat database, clearing pubkeys table' - ' because the data format has been updated.') - self.cur.execute( - '''delete from inventory where objecttype = 1;''') - self.cur.execute( - '''delete from pubkeys;''') - # Any sending messages for which we *thought* that we had - # the pubkey must be rechecked. - self.cur.execute( - '''UPDATE sent SET status='msgqueued' WHERE status='doingmsgpow' or status='badkey';''') - query = '''update settings set value=? WHERE key='version';''' - parameters = (8,) - self.cur.execute(query, parameters) - logger.debug('Finished clearing currently held pubkeys.') - - # Add a new column to the inbox table to store the hash of - # the message signature. We'll use this as temporary message UUID - # in order to detect duplicates. - item = '''SELECT value FROM settings WHERE key='version';''' - parameters = '' - self.cur.execute(item, parameters) - currentVersion = int(self.cur.fetchall()[0][0]) - if currentVersion == 8: - logger.debug( - 'In messages.dat database, adding sighash field to' - ' the inbox table.') - item = '''ALTER TABLE inbox ADD sighash blob DEFAULT '' ''' - parameters = '' - self.cur.execute(item, parameters) - item = '''update settings set value=? WHERE key='version';''' - parameters = (9,) - self.cur.execute(item, parameters) - - # We'll also need a `sleeptill` field and a `ttl` field. Also we - # can combine the pubkeyretrynumber and msgretrynumber into one. - - item = '''SELECT value FROM settings WHERE key='version';''' - parameters = '' - self.cur.execute(item, parameters) - currentVersion = int(self.cur.fetchall()[0][0]) - if currentVersion == 9: - logger.info( - 'In messages.dat database, making TTL-related changes:' - ' combining the pubkeyretrynumber and msgretrynumber' - ' fields into the retrynumber field and adding the' - ' sleeptill and ttl fields...') - self.cur.execute( - '''CREATE TEMPORARY TABLE sent_backup''' - ''' (msgid blob, toaddress text, toripe blob, fromaddress text, subject text, message text,''' - ''' ackdata blob, lastactiontime integer, status text, retrynumber integer,''' - ''' folder text, encodingtype int)''') - self.cur.execute( - '''INSERT INTO sent_backup SELECT msgid, toaddress, toripe, fromaddress,''' - ''' subject, message, ackdata, lastactiontime,''' - ''' status, 0, folder, encodingtype FROM sent;''') - self.cur.execute('''DROP TABLE sent''') - self.cur.execute( - '''CREATE TABLE sent''' - ''' (msgid blob, toaddress text, toripe blob, fromaddress text, subject text, message text,''' - ''' ackdata blob, senttime integer, lastactiontime integer, sleeptill int, status text,''' - ''' retrynumber integer, folder text, encodingtype int, ttl int)''') - self.cur.execute( - '''INSERT INTO sent SELECT msgid, toaddress, toripe, fromaddress, subject, message, ackdata,''' - ''' lastactiontime, lastactiontime, 0, status, 0, folder, encodingtype, 216000 FROM sent_backup;''') - self.cur.execute('''DROP TABLE sent_backup''') - logger.info('In messages.dat database, finished making TTL-related changes.') - logger.debug('In messages.dat database, adding address field to the pubkeys table.') - # We're going to have to calculate the address for each row in the pubkeys - # table. Then we can take out the hash field. - self.cur.execute('''ALTER TABLE pubkeys ADD address text DEFAULT '' ;''') - - # replica for loop to update hashed address - self.cur.execute('''UPDATE pubkeys SET address=(enaddr(pubkeys.addressversion, 1, hash)); ''') - - # Now we can remove the hash field from the pubkeys table. - self.cur.execute( - '''CREATE TEMPORARY TABLE pubkeys_backup''' - ''' (address text, addressversion int, transmitdata blob, time int,''' - ''' usedpersonally text, UNIQUE(address) ON CONFLICT REPLACE)''') - self.cur.execute( - '''INSERT INTO pubkeys_backup''' - ''' SELECT address, addressversion, transmitdata, time, usedpersonally FROM pubkeys;''') - self.cur.execute('''DROP TABLE pubkeys''') - self.cur.execute( - '''CREATE TABLE pubkeys''' - ''' (address text, addressversion int, transmitdata blob, time int, usedpersonally text,''' - ''' UNIQUE(address) ON CONFLICT REPLACE)''') - self.cur.execute( - '''INSERT INTO pubkeys SELECT''' - ''' address, addressversion, transmitdata, time, usedpersonally FROM pubkeys_backup;''') - self.cur.execute('''DROP TABLE pubkeys_backup''') - logger.debug( - 'In messages.dat database, done adding address field to the pubkeys table' - ' and removing the hash field.') - self.cur.execute('''update settings set value=10 WHERE key='version';''') - - # Update the address colunm to unique in addressbook table - item = '''SELECT value FROM settings WHERE key='version';''' - parameters = '' - self.cur.execute(item, parameters) - currentVersion = int(self.cur.fetchall()[0][0]) - if currentVersion == 10: - logger.debug( - 'In messages.dat database, updating address column to UNIQUE' - ' in the addressbook table.') - self.cur.execute( - '''ALTER TABLE addressbook RENAME TO old_addressbook''') - self.cur.execute( - '''CREATE TABLE addressbook''' - ''' (label text, address text, UNIQUE(address) ON CONFLICT IGNORE)''') - self.cur.execute( - '''INSERT INTO addressbook SELECT label, address FROM old_addressbook;''') - self.cur.execute('''DROP TABLE old_addressbook''') - self.cur.execute('''update settings set value=11 WHERE key='version';''') - - # Are you hoping to add a new option to the keys.dat file of existing - # Bitmessage users or modify the SQLite database? Add it right - # above this line! + self.initialize_sql("upg_sc_if_old_ver_2") + def check_columns_can_store_binary_null(self): + """ + Check if sqlite can store binary zeros. + """ try: testpayload = '\x00\x00' t = ('1234', 1, testpayload, '12345678', 'no') @@ -443,28 +170,19 @@ class sqlThread(threading.Thread): ' but the problem you need to solve is related to SQLite.\n\n') os._exit(0) except Exception as err: - if str(err) == 'database or disk is full': - logger.fatal( - '(While null value test) Alert: Your disk or data storage volume is full.' - ' sqlThread will now exit.') - queues.UISignalQueue.put(( - 'alert', ( - _translate( - "MainWindow", - "Disk full"), - _translate( - "MainWindow", - 'Alert: Your disk or data storage volume is full. Bitmessage will now exit.'), - True))) - os._exit(0) - else: - logger.error(err) + sqlThread().error_handler(err, 'null value test') - # Let us check to see the last time we vaccumed the messages.dat file. - # If it has been more than a month let's do it now. - item = '''SELECT value FROM settings WHERE key='lastvacuumtime';''' - parameters = '' - self.cur.execute(item, parameters) + def check_vacuum(self): # pylint: disable=too-many-locals, too-many-branches, too-many-statements, + # Redefinition-of-parameters-type-from-tuple-to-str, R0204, line-too-long, E501 + """ + Check vacuum and apply sql queries for different different conditions. + Let us check to see the last time we vaccumed the messages.dat file. + If it has been more than a month let's do it now. + """ + + query = "SELECT value FROM settings WHERE key='lastvacuumtime'" + parameters = () + self.cur.execute(query, parameters) queryreturn = self.cur.fetchall() for row in queryreturn: value, = row @@ -473,168 +191,247 @@ class sqlThread(threading.Thread): try: self.cur.execute(''' VACUUM ''') except Exception as err: - if str(err) == 'database or disk is full': - logger.fatal( - '(While VACUUM) Alert: Your disk or data storage volume is full.' - ' sqlThread will now exit.') - queues.UISignalQueue.put(( - 'alert', ( - _translate( - "MainWindow", - "Disk full"), - _translate( - "MainWindow", - 'Alert: Your disk or data storage volume is full. Bitmessage will now exit.'), - True))) - os._exit(0) - item = '''update settings set value=? WHERE key='lastvacuumtime';''' + sqlThread().error_handler(err, 'VACUUM') + query = "update settings set value=? WHERE key='lastvacuumtime'" parameters = (int(time.time()),) - self.cur.execute(item, parameters) + self.cur.execute(query, parameters) - helper_sql.sql_ready.set() + def upgrade_config_parser_setting_version(self, settingsversion): + """ + Upgrade schema with respect setting version + """ - while True: - item = helper_sql.sqlSubmitQueue.get() - if item == 'commit': - try: - self.conn.commit() - except Exception as err: - if str(err) == 'database or disk is full': - logger.fatal( - '(While committing) Alert: Your disk or data storage volume is full.' - ' sqlThread will now exit.') - queues.UISignalQueue.put(( - 'alert', ( - _translate( - "MainWindow", - "Disk full"), - _translate( - "MainWindow", - 'Alert: Your disk or data storage volume is full. Bitmessage will now exit.'), - True))) - os._exit(0) - elif item == 'exit': - self.conn.close() - logger.info('sqlThread exiting gracefully.') + self.initialize_sql("config_setting_ver_{}".format(settingsversion)) - return - elif item == 'movemessagstoprog': - logger.debug('the sqlThread is moving the messages.dat file to the local program directory.') - - try: - self.conn.commit() - except Exception as err: - if str(err) == 'database or disk is full': - logger.fatal( - '(while movemessagstoprog) Alert: Your disk or data storage volume is full.' - ' sqlThread will now exit.') - queues.UISignalQueue.put(( - 'alert', ( - _translate( - "MainWindow", - "Disk full"), - _translate( - "MainWindow", - 'Alert: Your disk or data storage volume is full. Bitmessage will now exit.'), - True))) - os._exit(0) - self.conn.close() - shutil.move( - paths.lookupAppdataFolder() + 'messages.dat', paths.lookupExeFolder() + 'messages.dat') - self.conn = sqlite3.connect(paths.lookupExeFolder() + 'messages.dat') - self.conn.text_factory = str - self.cur = self.conn.cursor() - elif item == 'movemessagstoappdata': - logger.debug('the sqlThread is moving the messages.dat file to the Appdata folder.') - - try: - self.conn.commit() - except Exception as err: - if str(err) == 'database or disk is full': - logger.fatal( - '(while movemessagstoappdata) Alert: Your disk or data storage volume is full.' - ' sqlThread will now exit.') - queues.UISignalQueue.put(( - 'alert', ( - _translate( - "MainWindow", - "Disk full"), - _translate( - "MainWindow", - 'Alert: Your disk or data storage volume is full. Bitmessage will now exit.'), - True))) - os._exit(0) - self.conn.close() - shutil.move( - paths.lookupExeFolder() + 'messages.dat', paths.lookupAppdataFolder() + 'messages.dat') - self.conn = sqlite3.connect(paths.lookupAppdataFolder() + 'messages.dat') - self.conn.text_factory = str - self.cur = self.conn.cursor() - elif item == 'deleteandvacuume': - self.cur.execute('''delete from inbox where folder='trash' ''') - self.cur.execute('''delete from sent where folder='trash' ''') + def initialize_schema(self): + """ + Initialize DB schema + """ + try: + inbox_exists = list(self.cur.execute('PRAGMA table_info(inbox)')) + if not inbox_exists: + self.initialize_sql("initialize_schema") self.conn.commit() - try: - self.cur.execute(''' VACUUM ''') - except Exception as err: - if str(err) == 'database or disk is full': - logger.fatal( - '(while deleteandvacuume) Alert: Your disk or data storage volume is full.' - ' sqlThread will now exit.') - queues.UISignalQueue.put(( - 'alert', ( - _translate( - "MainWindow", - "Disk full"), - _translate( - "MainWindow", - 'Alert: Your disk or data storage volume is full. Bitmessage will now exit.'), - True))) - os._exit(0) + logger.info('Created messages database file') + except Exception as err: + if str(err) == 'table inbox already exists': + logger.debug('Database file already exists.') else: - parameters = helper_sql.sqlSubmitQueue.get() - rowcount = 0 - try: - self.cur.execute(item, parameters) - rowcount = self.cur.rowcount - except Exception as err: - if str(err) == 'database or disk is full': - logger.fatal( - '(while cur.execute) Alert: Your disk or data storage volume is full.' - ' sqlThread will now exit.') - queues.UISignalQueue.put(( - 'alert', ( - _translate( - "MainWindow", - "Disk full"), - _translate( - "MainWindow", - 'Alert: Your disk or data storage volume is full. Bitmessage will now exit.'), - True))) - os._exit(0) - else: - logger.fatal( - 'Major error occurred when trying to execute a SQL statement within the sqlThread.' - ' Please tell Atheros about this error message or post it in the forum!' - ' Error occurred while trying to execute statement: "%s" Here are the parameters;' - ' you might want to censor this data with asterisks (***)' - ' as it can contain private information: %s.' - ' Here is the actual error message thrown by the sqlThread: %s', - str(item), - str(repr(parameters)), - str(err)) - logger.fatal('This program shall now abruptly exit!') + os._exit( + 'ERROR trying to create database file (message.dat). Error message: %s\n' % str(err)) - os._exit(0) - - helper_sql.sqlReturnQueue.put((self.cur.fetchall(), rowcount)) - # helper_sql.sqlSubmitQueue.task_done() - - def create_function(self): - # create_function + def create_sql_function(self): + """ + Apply create_function to DB + """ try: self.conn.create_function("enaddr", 3, func=encodeAddress, deterministic=True) except (TypeError, sqlite3.NotSupportedError) as err: logger.debug( "Got error while pass deterministic in sqlite create function {}, Passing 3 params".format(err)) self.conn.create_function("enaddr", 3, encodeAddress) + + +class sqlThread(threading.Thread): + """A thread for all SQL operations""" + + def __init__(self): + threading.Thread.__init__(self, name="SQL") + self.db = None + self.max_setting_level = 4 + self.rowcount = 0 + logger.debug('Init thread in sqlthread') + + @property + def sql_config_settings_version(self): + """ Getter for BMConfigParser (obj) """ + + return config.getint( + 'bitmessagesettings', 'settingsversion') + + @sql_config_settings_version.setter + def sql_config_settings_version(self, settingsversion): # pylint: disable=R0201, no-self-use + # Setter for BmConfigparser + + config.set( + 'bitmessagesettings', 'settingsversion', str(int(settingsversion) + 1)) + return config.save() + + def upgrade_config_setting_version(self): + """ + upgrade config parser setting version. + If the settings version is equal to 2 or 3 then the + sqlThread will modify the pubkeys table and change + the settings version to 4. + """ + while self.sql_config_settings_version < self.max_setting_level: + self.db.upgrade_config_parser_setting_version(self.sql_config_settings_version) + self.sql_config_settings_version = self.sql_config_settings_version + + @staticmethod + def error_handler(err, command, query=None, parameters=None): + """ + Common error handler + """ + if str(err) == 'database or disk is full': + logger.fatal( + "(While %s) Alert: Your disk or data storage volume is full. sqlThread will now exit.", command + ) + queues.UISignalQueue.put(( + 'alert', ( + _translate( + "MainWindow", + "Disk full"), + _translate( + "MainWindow", + 'Alert: Your disk or data storage volume is full. Bitmessage will now exit.'), + True))) + else: + logger.fatal( + 'Major error occurred when trying to execute a SQL statement within the sqlThread.' + ' Please tell Atheros about this error message or post it in the forum!' + ' Error occurred while trying to execute statement: "%s" Here are the parameters;' + ' you might want to censor this data with asterisks (***)' + ' as it can contain private information: %s.' + ' Here is the actual error message thrown by the sqlThread: %s', + str(query), + str(repr(parameters)), + str(err)) + logger.fatal('This program shall now abruptly exit!') + os._exit(0) + + def is_query_commit(self): + """ + When query == 'commit' + """ + try: + self.db.conn.commit() + except Exception as err: + self.error_handler(err, 'committing') + + def is_query_movemessagstoprog(self): + """ + When query == 'movemessagstoprogs' + """ + logger.debug('the sqlThread is moving the messages.dat file to the local program directory.') + try: + self.db.conn.commit() + except Exception as err: + self.error_handler(err, 'movemessagstoprog') + self.db.conn.close() + shutil.move( + paths.lookupAppdataFolder() + 'messages.dat', paths.lookupExeFolder() + 'messages.dat') + self.db.conn = sqlite3.connect(paths.lookupExeFolder() + 'messages.dat') + self.db.conn.text_factory = str + self.db.cur = self.db.conn.cursor() + + def is_query_deleteandvacuume(self): + """ + When query == 'deleteandvacuume' + """ + try: + self.db.cur.execute(''' VACUUM ''') + except Exception as err: + self.error_handler(err, 'deleteandvacuume') + self.db.cur.execute('''delete from inbox where folder='trash' ''') + self.db.cur.execute('''delete from sent where folder='trash' ''') + self.db.conn.commit() + + def is_query_other(self, query): + """ + When the query can be default or other ' + """ + parameters = helper_sql.sqlSubmitQueue.get() + try: + self.db.cur.execute(query, parameters) + self.rowcount = self.db.cur.rowcount + return self.rowcount + except Exception as err: + self.error_handler(err, 'cur.execute', query, parameters) + + def is_query_movemessagestoappdata(self): + """ + When query == 'movemessagestoappdata' + """ + logger.debug('the sqlThread is moving the messages.dat file to the Appdata folder.') + try: + self.db.conn.commit() + except Exception as err: + self.error_handler(err, 'movemessagstoappdata') + self.db.conn.close() + shutil.move( + paths.lookupExeFolder() + 'messages.dat', paths.lookupAppdataFolder() + 'messages.dat') + self.db.conn = sqlite3.connect(paths.lookupAppdataFolder() + 'messages.dat') + self.db.conn.text_factory = str + self.db.cur = self.db.conn.cursor() + + def is_query_exit(self): + """ + When query == 'exit' + """ + self.db.conn.close() + logger.info('sqlThread exiting gracefully.') + + def loop_queue(self): + """ + Looping queue and process them + """ + query = helper_sql.sqlSubmitQueue.get() + if query == 'commit': + self.is_query_commit() + elif query == 'exit': + self.is_query_exit() + return False + elif query == 'movemessagstoprog': + self.is_query_movemessagstoprog() + elif query == 'movemessagstoappdata': + self.is_query_movemessagestoappdata() + elif query == 'deleteandvacuume': + self.is_query_deleteandvacuume() + else: + self.rowcount = self.is_query_other(query) + helper_sql.sqlReturnQueue.put((self.db.cur.fetchall(), self.rowcount)) + return True + + def run(self): # pylint: disable=R0204, E501 + """Process SQL queries from `.helper_sql.sqlSubmitQueue`""" + + logger.info('Init thread in sqlthread') + # pylint: disable=redefined-variable-type + if state.testmode: + self.db = TestDB() + else: + self.db = BitmessageDB() + helper_sql.sql_available = True + + config_ready.wait() + + self.db.create_sql_function() + + self.db.initialize_schema() + + self.upgrade_config_setting_version() + + helper_startup.updateConfig() + + self.db.upgrade_schema_if_old_version() + + self.db.upgrade_to_latest() + + self.db.check_columns_can_store_binary_null() + + self.db.check_vacuum() + + helper_sql.sql_ready.set() + + while self.loop_queue(): + pass + + +class TestDB(BitmessageDB): + """ + Database connection build for test e + """ + def _connection_build(self): + self._connection_build_internal("memory", True) + return self.conn, self.cur diff --git a/src/tests/core.py b/src/tests/core.py index a7247971..4fc9d0b3 100644 --- a/src/tests/core.py +++ b/src/tests/core.py @@ -23,7 +23,7 @@ import helper_addressbook from bmconfigparser import config from helper_msgcoding import MsgEncode, MsgDecode -from helper_sql import sqlQuery +from helper_sql import sqlQuery, sqlExecute from network import asyncore_pollchoose as asyncore, knownnodes from network.bmproto import BMProto from network.connectionpool import BMConnectionPool @@ -412,6 +412,22 @@ class TestCore(unittest.TestCase): self.delete_address_from_addressbook(address1) self.delete_address_from_addressbook(address2) + def test_sqlscripts(self): + """ Test sql statements""" + + sqlExecute('create table if not exists testtbl (id integer)') + tables = list(sqlQuery("select name from sqlite_master where type is 'table'")) + res = [item for item in tables if 'testtbl' in item] + self.assertEqual(res[0][0], 'testtbl') + + queryreturn = sqlExecute("INSERT INTO testtbl VALUES(101);") + self.assertEqual(queryreturn, 1) + + queryreturn = sqlQuery('''SELECT * FROM testtbl''') + self.assertEqual(queryreturn[0][0], 101) + + sqlQuery("DROP TABLE testtbl") + def run(): """Starts all tests intended for core run""" diff --git a/src/tests/sql/__init__.py b/src/tests/sql/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/tests/test_sqlthread.py b/src/tests/test_sqlthread.py index a612df3a..ae823593 100644 --- a/src/tests/test_sqlthread.py +++ b/src/tests/test_sqlthread.py @@ -2,43 +2,221 @@ # flake8: noqa:E402 import os import tempfile -import threading import unittest -from .common import skip_python3 - -skip_python3() - os.environ['BITMESSAGE_HOME'] = tempfile.gettempdir() -from pybitmessage.helper_sql import ( - sqlQuery, sql_ready, sqlStoredProcedure) # noqa:E402 -from pybitmessage.class_sqlThread import sqlThread # noqa:E402 +from pybitmessage.class_sqlThread import TestDB # noqa:E402 from pybitmessage.addresses import encodeAddress # noqa:E402 -class TestSqlThread(unittest.TestCase): - """Test case for SQL thread""" +def filter_table_column(schema, column): + """ + Filter column from schema + """ + for x in schema: + for y in x: + if y == column: + yield y - @classmethod - def setUpClass(cls): - # Start SQL thread - sqlLookup = sqlThread() - sqlLookup.daemon = True - sqlLookup.start() - sql_ready.wait() - @classmethod - def tearDownClass(cls): - sqlStoredProcedure('exit') - for thread in threading.enumerate(): - if thread.name == "SQL": - thread.join() +class TestSqlBase(object): # pylint: disable=E1101, too-few-public-methods, E1004, W0232 + """ Base for test case """ + + __name__ = None + root_path = os.path.dirname(os.path.dirname(__file__)) + test_db = None + + def _setup_db(self): # pylint: disable=W0622, redefined-builtin + """ + Drop all tables before each test case start + """ + self.test_db = TestDB() + self.test_db.create_sql_function() + self.test_db.initialize_schema() + + def initialise_database(self, test_db_cur, file): # pylint: disable=W0622, redefined-builtin + """ + Initialise DB + """ + + with open(os.path.join(self.root_path, "tests/sql/{}.sql".format(file)), 'r') as sql_as_string: + sql_as_string = sql_as_string.read() + + test_db_cur.cur.executescript(sql_as_string) + + +class TestFnBitmessageDB(TestSqlBase, unittest.TestCase): # pylint: disable=protected-access + """ Test case for Sql function""" + + def setUp(self): + """ + setup for test case + """ + self._setup_db() def test_create_function(self): """Check the result of enaddr function""" - encoded_str = encodeAddress(4, 1, "21122112211221122112") + st = "21122112211221122112".encode() + encoded_str = encodeAddress(4, 1, st) - query = sqlQuery('SELECT enaddr(4, 1, "21122112211221122112")') - self.assertEqual( - query[0][-1], encoded_str, "test case fail for create_function") + item = '''SELECT enaddr(4, 1, ?);''' + parameters = (st, ) + self.test_db.cur.execute(item, parameters) + query = self.test_db.cur.fetchall() + self.assertEqual(query[0][-1], encoded_str, "test case fail for create_function") + + +class TestUpgradeBitmessageDB(TestSqlBase, unittest.TestCase): # pylint: disable=protected-access + """Test case for SQL versions""" + + def setUp(self): + """ + Setup DB schema before start. + And applying default schema for version test. + """ + self._setup_db() + self.test_db.cur.execute('''INSERT INTO settings VALUES('version','2')''') + + def version(self): + """ + Run SQL Scripts, Initialize DB with respect to versioning + and Upgrade DB schema for all versions + """ + def wrapper(*args): + """ + Run SQL and mocking DB for versions + """ + self = args[0] + func_name = func.__name__ + version = func_name.rsplit('_', 1)[-1] + self.test_db._upgrade_one_level_sql_statement(int(version)) # pylint: disable= W0212, protected-access + + # Update versions DB mocking + self.initialise_database(self.test_db, "init_version_{}".format(version)) + + return func(*args) # <-- use (self, ...) + func = self + return wrapper + + @version + def test_bm_db_version_2(self): + """ + Test with version 2 + """ + + res = self.test_db.cur.execute(''' SELECT count(name) FROM sqlite_master + WHERE type='table' AND name='inventory_backup' ''') + self.assertNotEqual(res, 1, "Table inventory_backup not deleted in versioning 2") + + @version + def test_bm_db_version_3(self): + """ + Test with version 1 + Version 1 and 3 are same so will skip 3 + """ + + res = self.test_db.cur.execute('''PRAGMA table_info('inventory');''') + result = list(filter_table_column(res, "tag")) + self.assertEqual(result, ['tag'], "Data not migrated for version 3") + + @version + def test_bm_db_version_4(self): + """ + Test with version 4 + """ + + self.test_db.cur.execute("select * from pubkeys where addressversion = '1';") + res = self.test_db.cur.fetchall() + self.assertEqual(len(res), 1, "Table inventory not deleted in versioning 4") + + @version + def test_bm_db_version_5(self): + """ + Test with version 5 + """ + + self.test_db.cur.execute(''' SELECT count(name) FROM sqlite_master WHERE type='table' AND name='knownnodes' ''') # noqa + res = self.test_db.cur.fetchall() + self.assertNotEqual(res[0][0], 1, "Table knownnodes not deleted in versioning 5") + self.test_db.cur.execute(''' SELECT count(name) FROM sqlite_master + WHERE type='table' AND name='objectprocessorqueue'; ''') + res = self.test_db.cur.fetchall() + self.assertNotEqual(len(res), 0, "Table objectprocessorqueue not created in versioning 5") + self.test_db.cur.execute(''' SELECT * FROM objectprocessorqueue where objecttype='hash' ; ''') + res = self.test_db.cur.fetchall() + self.assertNotEqual(len(res), 0, "Table objectprocessorqueue not created in versioning 5") + + @version + def test_bm_db_version_6(self): + """ + Test with version 6 + """ + + self.test_db.cur.execute('''PRAGMA table_info('inventory');''') + inventory = self.test_db.cur.fetchall() + inventory = list(filter_table_column(inventory, "expirestime")) + self.assertEqual(inventory, ['expirestime'], "Data not migrated for version 6") + + self.test_db.cur.execute('''PRAGMA table_info('objectprocessorqueue');''') + objectprocessorqueue = self.test_db.cur.fetchall() + objectprocessorqueue = list(filter_table_column(objectprocessorqueue, "objecttype")) + self.assertEqual(objectprocessorqueue, ['objecttype'], "Data not migrated for version 6") + + @version + def test_bm_db_version_7(self): + """ + Test with version 7 + """ + + self.test_db.cur.execute('''SELECT * FROM pubkeys ''') + pubkeys = self.test_db.cur.fetchall() + self.assertEqual(pubkeys, [], "Data not migrated for version 7") + + self.test_db.cur.execute('''SELECT * FROM inventory ''') + inventory = self.test_db.cur.fetchall() + self.assertEqual(inventory, [], "Data not migrated for version 7") + + self.test_db.cur.execute('''SELECT status FROM sent ''') + sent = self.test_db.cur.fetchall() + self.assertEqual(sent, [('msgqueued',), ('msgqueued',)], "Data not migrated for version 7") + + @version + def test_bm_db_version_8(self): + """ + Test with version 8 + """ + + self.test_db.cur.execute('''PRAGMA table_info('inbox');''') + res = self.test_db.cur.fetchall() + result = list(filter_table_column(res, "sighash")) + self.assertEqual(result, ['sighash'], "Data not migrated for version 8") + + @version + def test_bm_db_version_9(self): + """ + Test with version 9 + """ + + self.test_db.cur.execute("SELECT count(name) FROM sqlite_master WHERE type='table' AND name='pubkeys_backup'") # noqa + res = self.test_db.cur.fetchall() + self.assertNotEqual(res[0][0], 1, "Table pubkeys_backup not deleted") + + @version + def test_bm_db_version_10(self): + """ + Test with version 10 + """ + + label = "test" + self.test_db.cur.execute("SELECT * FROM addressbook WHERE label='test' ") # noqa + res = self.test_db.cur.fetchall() + self.assertEqual(res[0][0], label, "Data not migrated for version 10") + + def test_bm_db_version_type(self): + """ + Test version type + """ + self.test_db.cur.execute('''INSERT INTO settings VALUES('version','test_string')''') # noqa + res = self.test_db.sql_schema_version + self.assertEqual(type(res), int)