Refactor sqlthread #2150

Open
shportix wants to merge 2 commits from shportix/sql-refactoring into v0.6
27 changed files with 1020 additions and 637 deletions

View File

@ -19,6 +19,7 @@ except ImportError:
app_dir = pathmagic.setup() app_dir = pathmagic.setup()
import depends import depends
depends.check_dependencies() depends.check_dependencies()
import getopt import getopt
@ -81,6 +82,7 @@ def signal_handler(signum, frame):
class Main(object): class Main(object):
"""Main PyBitmessage class""" """Main PyBitmessage class"""
def start(self): def start(self):
"""Start main application""" """Start main application"""
# pylint: disable=too-many-statements,too-many-branches,too-many-locals # pylint: disable=too-many-statements,too-many-branches,too-many-locals
@ -261,8 +263,8 @@ class Main(object):
while state.shutdown == 0: while state.shutdown == 0:
time.sleep(1) time.sleep(1)
if ( if (
state.testmode state.testmode
and time.time() - state.last_api_response >= 30 and time.time() - state.last_api_response >= 30
): ):
self.stop() self.stop()
elif not state.enableGUI: elif not state.enableGUI:
@ -385,7 +387,6 @@ def main():
if __name__ == "__main__": if __name__ == "__main__":
main() main()
# So far, the creation of and management of the Bitmessage protocol and this # So far, the creation of and management of the Bitmessage protocol and this
# client is a one-man operation. Bitcoin tips are quite appreciated. # client is a one-man operation. Bitcoin tips are quite appreciated.
# 1H5XaDA6fYENLbknwZyjiYXYPQaFjjLX2u # 1H5XaDA6fYENLbknwZyjiYXYPQaFjjLX2u

View File

@ -1,11 +1,11 @@
""" """
sqlThread is defined here SQLThread is defined here
""" """
import logging
import os import os
import shutil # used for moving the messages.dat file import shutil # used for moving the messages.dat file
import sqlite3 import sqlite3
import sys
import threading import threading
import time import time
@ -17,624 +17,376 @@ try:
import state import state
from addresses import encodeAddress from addresses import encodeAddress
from bmconfigparser import config, config_ready from bmconfigparser import config, config_ready
from debug import logger
from tr import _translate from tr import _translate
except ImportError: except ImportError:
from . import helper_sql, helper_startup, paths, queues, state from . import helper_sql, helper_startup, paths, queues, state
from .addresses import encodeAddress from .addresses import encodeAddress
from .bmconfigparser import config, config_ready from .bmconfigparser import config, config_ready
from .debug import logger
from .tr import _translate from .tr import _translate
logger = logging.getLogger('default')
class BitmessageDB(object):
""" Upgrade Db with respect to versions """
def __init__(self):
self._current_level = None
self.max_level = 11
self.conn = None
self.cur = None
self._connection_build()
def _connection_build(self):
self._connection_build_internal('messages.dat', False)
def _connection_build_internal(
self, file_name="messages.dat", memory=False
):
"""Establish SQL connection"""
self.conn = sqlite3.connect(
':memory:' if memory else os.path.join(state.appdata, file_name))
self.conn.text_factory = str
self.cur = self.conn.cursor()
self.cur.execute("PRAGMA secure_delete = true")
def __get_current_settings_version(self):
"""Get current setting Version"""
self.cur.execute(
"SELECT value FROM settings WHERE key='version'")
try:
return int(self.cur.fetchall()[0][0])
except (ValueError, IndexError):
return 0
def _upgrade_one_level_sql_statement(self, file_name):
"""Upgrade database versions with applying sql scripts"""
self.initialize_sql("init_version_{}".format(file_name))
def initialize_sql(self, file_name):
"""Initializing sql"""
try:
with open(os.path.join(
paths.codePath(), 'sql', '{}.sql'.format(file_name))
) as sql_file:
sql_as_string = sql_file.read()
self.cur.executescript(sql_as_string)
return True
except OSError as err:
logger.debug('The file is missing. Error message: %s\n',
str(err))
except IOError as err:
logger.debug(
'ERROR trying to initialize database. Error message: %s\n',
str(err))
except sqlite3.Error as err:
logger.error(err)
except Exception as err:
logger.debug(
'ERROR trying to initialize database. Error message: %s\n',
str(err))
return False
@property
def sql_schema_version(self):
"""Getter for get current schema version"""
return self.__get_current_settings_version()
def upgrade_to_latest(self):
"""Initialize upgrade level"""
self.cur.execute(
"SELECT name FROM sqlite_master WHERE type='table' AND name='settings'")
if not self.cur.fetchall():
# The settings table doesn't exist. We need to make it.
logger.debug(
"In messages.dat database, creating new 'settings' table.")
self.cur.execute(
"CREATE TABLE settings (key text, value blob, UNIQUE(key)"
" ON CONFLICT REPLACE)")
self.cur.execute("INSERT INTO settings VALUES('version','1')")
self.cur.execute(
"INSERT INTO settings VALUES('lastvacuumtime',?)",
(int(time.time()),))
logger.debug(
'In messages.dat database, removing an obsolete field'
'from the pubkeys table.')
# initiate sql file
self.initialize_sql("upg_sc_if_old_ver_1")
self.conn.commit()
# After code refactoring, the possible status values for sent messages
# have changed.
self.initialize_sql("upg_sc_if_old_ver_2")
self.conn.commit()
while self.sql_schema_version < self.max_level:
self._upgrade_one_level_sql_statement(self.sql_schema_version)
self.conn.commit()
def check_columns_can_store_binary_null(self):
"""Check if sqlite can store binary zeros."""
try:
testpayload = '\x00\x00'
t = ('1234', 1, testpayload, '12345678', 'no')
self.cur.execute("INSERT INTO pubkeys VALUES(?,?,?,?,?)", t)
self.conn.commit()
self.cur.execute(
"SELECT transmitdata FROM pubkeys WHERE address='1234' ")
transmitdata = self.cur.fetchall()[-1][0]
self.cur.execute("DELETE FROM pubkeys WHERE address='1234' ")
self.conn.commit()
if transmitdata != testpayload:
logger.fatal(
'Problem: The version of SQLite you have cannot store Null'
'values. Please download and install the latest revision'
'of your version of Python (for example, the latest '
'Python 2.7 revision) and try again.\n')
logger.fatal(
'PyBitmessage will now exit very abruptly.'
' You may now see threading errors related to this abrupt'
'exit but the problem you need to solve is related to'
'SQLite.\n\n')
os._exit(1)
except Exception as err:
sqlThread.error_handler(err, 'null value test')
def check_vacuum(self):
"""
Check vacuum and apply sql queries for different conditions.
Let us check to see the last time we vaccumed the messages.dat file.
If it has been more than a month let's do it now.
"""
self.cur.execute(
"SELECT value FROM settings WHERE key='lastvacuumtime'")
try:
date = self.cur.fetchall()[-1][0]
except IndexError:
return
if int(date) < int(time.time()) - 86400:
logger.info(
'It has been a long time since the messages.dat file'
' has been vacuumed. Vacuuming now...')
try:
self.cur.execute(''' VACUUM ''')
except Exception as err:
sqlThread.error_handler(err, 'VACUUM')
self.cur.execute(
"UPDATE settings SET value=? WHERE key='lastvacuumtime'",
(int(time.time()),))
def upgrade_config_parser_setting_version(self, settingsversion):
"""Upgrade schema with respect setting version"""
self.initialize_sql("config_setting_ver_{}".format(settingsversion))
def initialize_schema(self):
"""Initialize DB schema"""
try:
inbox_exists = list(self.cur.execute("PRAGMA table_info(inbox)"))
if not inbox_exists:
self.initialize_sql("initialize_schema")
self.conn.commit()
logger.info('Created messages database file')
except Exception as err:
if str(err) == 'table inbox already exists':
logger.debug('Database file already exists.')
else:
logger.fatal(
'Error trying to create database file (message.dat).'
' Error message: %s\n', str(err))
os._exit(1)
def create_sql_function(self):
"""Apply create_function to DB"""
try:
self.conn.create_function("enaddr", 3, func=encodeAddress, deterministic=True)
except (TypeError, sqlite3.NotSupportedError) as err:
logger.debug(
"Got error while pass deterministic in sqlite create function {}, Passing 3 params".format(err))
self.conn.create_function("enaddr", 3, encodeAddress)
class sqlThread(threading.Thread): class sqlThread(threading.Thread):
"""A thread for all SQL operations""" """A thread for all SQL operations"""
def __init__(self): def __init__(self):
threading.Thread.__init__(self, name="SQL") threading.Thread.__init__(self, name="SQL")
self.db = None
self.max_setting_level = 4
self.rowcount = 0
logger.debug('Init thread in sqlthread')
def run(self): # pylint: disable=too-many-locals, too-many-branches, too-many-statements @property
"""Process SQL queries from `.helper_sql.sqlSubmitQueue`""" def sql_config_settings_version(self):
helper_sql.sql_available = True """ Getter for BMConfigParser (obj) """
config_ready.wait()
self.conn = sqlite3.connect(state.appdata + 'messages.dat')
self.conn.text_factory = str
self.cur = self.conn.cursor()
self.cur.execute('PRAGMA secure_delete = true') return config.getint(
# call create_function for encode address
self.create_function()
try:
self.cur.execute(
'''CREATE TABLE inbox (msgid blob, toaddress text, fromaddress text, subject text,'''
''' received text, message text, folder text, encodingtype int, read bool, sighash blob,'''
''' UNIQUE(msgid) ON CONFLICT REPLACE)''')
self.cur.execute(
'''CREATE TABLE sent (msgid blob, toaddress text, toripe blob, fromaddress text, subject text,'''
''' message text, ackdata blob, senttime integer, lastactiontime integer,'''
''' sleeptill integer, status text, retrynumber integer, folder text, encodingtype int, ttl int)''')
self.cur.execute(
'''CREATE TABLE subscriptions (label text, address text, enabled bool)''')
self.cur.execute(
'''CREATE TABLE addressbook (label text, address text, UNIQUE(address) ON CONFLICT IGNORE)''')
self.cur.execute(
'''CREATE TABLE blacklist (label text, address text, enabled bool)''')
self.cur.execute(
'''CREATE TABLE whitelist (label text, address text, enabled bool)''')
self.cur.execute(
'''CREATE TABLE pubkeys (address text, addressversion int, transmitdata blob, time int,'''
''' usedpersonally text, UNIQUE(address) ON CONFLICT REPLACE)''')
self.cur.execute(
'''CREATE TABLE inventory (hash blob, objecttype int, streamnumber int, payload blob,'''
''' expirestime integer, tag blob, UNIQUE(hash) ON CONFLICT REPLACE)''')
self.cur.execute(
'''INSERT INTO subscriptions VALUES'''
'''('Bitmessage new releases/announcements','BM-GtovgYdgs7qXPkoYaRgrLFuFKz1SFpsw',1)''')
self.cur.execute(
'''CREATE TABLE settings (key blob, value blob, UNIQUE(key) ON CONFLICT REPLACE)''')
self.cur.execute('''INSERT INTO settings VALUES('version','11')''')
self.cur.execute('''INSERT INTO settings VALUES('lastvacuumtime',?)''', (
int(time.time()),))
self.cur.execute(
'''CREATE TABLE objectprocessorqueue'''
''' (objecttype int, data blob, UNIQUE(objecttype, data) ON CONFLICT REPLACE)''')
self.conn.commit()
logger.info('Created messages database file')
except Exception as err:
if str(err) == 'table inbox already exists':
logger.debug('Database file already exists.')
else:
sys.stderr.write(
'ERROR trying to create database file (message.dat). Error message: %s\n' % str(err))
os._exit(0)
# If the settings version is equal to 2 or 3 then the
# sqlThread will modify the pubkeys table and change
# the settings version to 4.
settingsversion = config.getint(
'bitmessagesettings', 'settingsversion') 'bitmessagesettings', 'settingsversion')
# People running earlier versions of PyBitmessage do not have the @sql_config_settings_version.setter
# usedpersonally field in their pubkeys table. Let's add it. def sql_config_settings_version(self, settingsversion): # pylint: disable=R0201, no-self-use
if settingsversion == 2: # Setter for BmConfigparser
item = '''ALTER TABLE pubkeys ADD usedpersonally text DEFAULT 'no' '''
parameters = ''
self.cur.execute(item, parameters)
self.conn.commit()
settingsversion = 3
# People running earlier versions of PyBitmessage do not have the
# encodingtype field in their inbox and sent tables or the read field
# in the inbox table. Let's add them.
if settingsversion == 3:
item = '''ALTER TABLE inbox ADD encodingtype int DEFAULT '2' '''
parameters = ''
self.cur.execute(item, parameters)
item = '''ALTER TABLE inbox ADD read bool DEFAULT '1' '''
parameters = ''
self.cur.execute(item, parameters)
item = '''ALTER TABLE sent ADD encodingtype int DEFAULT '2' '''
parameters = ''
self.cur.execute(item, parameters)
self.conn.commit()
settingsversion = 4
config.set( config.set(
'bitmessagesettings', 'settingsversion', str(settingsversion)) 'bitmessagesettings', 'settingsversion', str(int(settingsversion) + 1))
config.save() return config.save()
def upgrade_config_setting_version(self):
"""
upgrade config parser setting version.
If the settings version is equal to 2 or 3 then the
sqlThread will modify the pubkeys table and change
the settings version to 4.
"""
while self.sql_config_settings_version < self.max_setting_level:
self.db.upgrade_config_parser_setting_version(self.sql_config_settings_version)
self.sql_config_settings_version = self.sql_config_settings_version
@staticmethod
def error_handler(err, command, query=None, parameters=None):
"""Common error handler"""
if str(err) == 'database or disk is full':
logger.fatal(
"(While %s) Alert: Your disk or data storage volume is full. sqlThread will now exit.", command
)
queues.UISignalQueue.put((
'alert', (
_translate(
"MainWindow",
"Disk full"),
_translate(
"MainWindow",
'Alert: Your disk or data storage volume is full. Bitmessage will now exit.'),
True)))
else:
logger.fatal(
'Major error occurred when trying to execute a SQL statement within the sqlThread.'
' Please tell Atheros about this error message or post it in the forum!'
' Error occurred while trying to execute statement: "%s" Here are the parameters;'
' you might want to censor this data with asterisks (***)'
' as it can contain private information: %s.'
' Here is the actual error message thrown by the sqlThread: %s',
str(query),
str(repr(parameters)),
str(err))
logger.fatal('This program shall now abruptly exit!')
os._exit(0)
def is_query_commit(self):
"""When query == 'commit'"""
try:
self.db.conn.commit()
except Exception as err:
self.error_handler(err, 'committing')
def is_query_movemessagstoprog(self):
"""When query == 'movemessagstoprogs'"""
logger.debug('the sqlThread is moving the messages.dat file to the local program directory.')
try:
self.db.conn.commit()
except Exception as err:
self.error_handler(err, 'movemessagstoprog')
self.db.conn.close()
shutil.move(
paths.lookupAppdataFolder() + 'messages.dat', paths.lookupExeFolder() + 'messages.dat')
self.db.conn = sqlite3.connect(paths.lookupExeFolder() + 'messages.dat')
self.db.conn.text_factory = str
self.db.cur = self.db.conn.cursor()
def is_query_deleteandvacuume(self):
"""When query == 'deleteandvacuume'"""
try:
self.db.cur.execute(''' VACUUM ''')
except Exception as err:
self.error_handler(err, 'deleteandvacuume')
self.db.cur.execute('''delete from inbox where folder='trash' ''')
self.db.cur.execute('''delete from sent where folder='trash' ''')
self.db.conn.commit()
def is_query_other(self, query):
"""When the query can be default or other '"""
parameters = helper_sql.sqlSubmitQueue.get()
try:
self.db.cur.execute(query, parameters)
self.rowcount = self.db.cur.rowcount
return self.rowcount
except Exception as err:
self.error_handler(err, 'cur.execute', query, parameters)
def is_query_movemessagestoappdata(self):
"""When query == 'movemessagestoappdata'"""
logger.debug('the sqlThread is moving the messages.dat file to the Appdata folder.')
try:
self.db.conn.commit()
except Exception as err:
self.error_handler(err, 'movemessagstoappdata')
self.db.conn.close()
shutil.move(
paths.lookupExeFolder() + 'messages.dat', paths.lookupAppdataFolder() + 'messages.dat')
self.db.conn = sqlite3.connect(paths.lookupAppdataFolder() + 'messages.dat')
self.db.conn.text_factory = str
self.db.cur = self.db.conn.cursor()
def is_query_exit(self):
"""When query == 'exit'"""
self.db.conn.close()
logger.info('sqlThread exiting gracefully.')
def loop_queue(self):
"""Looping queue and process them"""
query = helper_sql.sqlSubmitQueue.get()
if query == 'commit':
self.is_query_commit()
elif query == 'exit':
self.is_query_exit()
return False
elif query == 'movemessagstoprog':
self.is_query_movemessagstoprog()
elif query == 'movemessagstoappdata':
self.is_query_movemessagestoappdata()
elif query == 'deleteandvacuume':
self.is_query_deleteandvacuume()
else:
self.rowcount = self.is_query_other(query)
helper_sql.sqlReturnQueue.put((self.db.cur.fetchall(), self.rowcount))
return True
def run(self): # pylint: disable=R0204, E501
"""Process SQL queries from `.helper_sql.sqlSubmitQueue`"""
logger.info('Init thread in sqlthread')
# pylint: disable=redefined-variable-type
if state.testmode:
self.db = TestDB()
else:
self.db = BitmessageDB()
helper_sql.sql_available = True
config_ready.wait()
self.db.create_sql_function()
self.db.initialize_schema()
self.upgrade_config_setting_version()
helper_startup.updateConfig() helper_startup.updateConfig()
# From now on, let us keep a 'version' embedded in the messages.dat self.db.upgrade_to_latest()
# file so that when we make changes to the database, the database
# version we are on can stay embedded in the messages.dat file. Let us
# check to see if the settings table exists yet.
item = '''SELECT name FROM sqlite_master WHERE type='table' AND name='settings';'''
parameters = ''
self.cur.execute(item, parameters)
if self.cur.fetchall() == []:
# The settings table doesn't exist. We need to make it.
logger.debug(
"In messages.dat database, creating new 'settings' table.")
self.cur.execute(
'''CREATE TABLE settings (key text, value blob, UNIQUE(key) ON CONFLICT REPLACE)''')
self.cur.execute('''INSERT INTO settings VALUES('version','1')''')
self.cur.execute('''INSERT INTO settings VALUES('lastvacuumtime',?)''', (
int(time.time()),))
logger.debug('In messages.dat database, removing an obsolete field from the pubkeys table.')
self.cur.execute(
'''CREATE TEMPORARY TABLE pubkeys_backup(hash blob, transmitdata blob, time int,'''
''' usedpersonally text, UNIQUE(hash) ON CONFLICT REPLACE);''')
self.cur.execute(
'''INSERT INTO pubkeys_backup SELECT hash, transmitdata, time, usedpersonally FROM pubkeys;''')
self.cur.execute('''DROP TABLE pubkeys''')
self.cur.execute(
'''CREATE TABLE pubkeys'''
''' (hash blob, transmitdata blob, time int, usedpersonally text, UNIQUE(hash) ON CONFLICT REPLACE)''')
self.cur.execute(
'''INSERT INTO pubkeys SELECT hash, transmitdata, time, usedpersonally FROM pubkeys_backup;''')
self.cur.execute('''DROP TABLE pubkeys_backup;''')
logger.debug(
'Deleting all pubkeys from inventory.'
' They will be redownloaded and then saved with the correct times.')
self.cur.execute(
'''delete from inventory where objecttype = 'pubkey';''')
logger.debug('replacing Bitmessage announcements mailing list with a new one.')
self.cur.execute(
'''delete from subscriptions where address='BM-BbkPSZbzPwpVcYZpU4yHwf9ZPEapN5Zx' ''')
self.cur.execute(
'''INSERT INTO subscriptions VALUES'''
'''('Bitmessage new releases/announcements','BM-GtovgYdgs7qXPkoYaRgrLFuFKz1SFpsw',1)''')
logger.debug('Commiting.')
self.conn.commit()
logger.debug('Vacuuming message.dat. You might notice that the file size gets much smaller.')
self.cur.execute(''' VACUUM ''')
# After code refactoring, the possible status values for sent messages self.db.check_columns_can_store_binary_null()
# have changed.
self.cur.execute(
'''update sent set status='doingmsgpow' where status='doingpow' ''')
self.cur.execute(
'''update sent set status='msgsent' where status='sentmessage' ''')
self.cur.execute(
'''update sent set status='doingpubkeypow' where status='findingpubkey' ''')
self.cur.execute(
'''update sent set status='broadcastqueued' where status='broadcastpending' ''')
self.conn.commit()
# Let's get rid of the first20bytesofencryptedmessage field in self.db.check_vacuum()
# the inventory table.
item = '''SELECT value FROM settings WHERE key='version';'''
parameters = ''
self.cur.execute(item, parameters)
if int(self.cur.fetchall()[0][0]) == 2:
logger.debug(
'In messages.dat database, removing an obsolete field from'
' the inventory table.')
self.cur.execute(
'''CREATE TEMPORARY TABLE inventory_backup'''
'''(hash blob, objecttype text, streamnumber int, payload blob,'''
''' receivedtime integer, UNIQUE(hash) ON CONFLICT REPLACE);''')
self.cur.execute(
'''INSERT INTO inventory_backup SELECT hash, objecttype, streamnumber, payload, receivedtime'''
''' FROM inventory;''')
self.cur.execute('''DROP TABLE inventory''')
self.cur.execute(
'''CREATE TABLE inventory'''
''' (hash blob, objecttype text, streamnumber int, payload blob, receivedtime integer,'''
''' UNIQUE(hash) ON CONFLICT REPLACE)''')
self.cur.execute(
'''INSERT INTO inventory SELECT hash, objecttype, streamnumber, payload, receivedtime'''
''' FROM inventory_backup;''')
self.cur.execute('''DROP TABLE inventory_backup;''')
item = '''update settings set value=? WHERE key='version';'''
parameters = (3,)
self.cur.execute(item, parameters)
# Add a new column to the inventory table to store tags.
item = '''SELECT value FROM settings WHERE key='version';'''
parameters = ''
self.cur.execute(item, parameters)
currentVersion = int(self.cur.fetchall()[0][0])
if currentVersion == 1 or currentVersion == 3:
logger.debug(
'In messages.dat database, adding tag field to'
' the inventory table.')
item = '''ALTER TABLE inventory ADD tag blob DEFAULT '' '''
parameters = ''
self.cur.execute(item, parameters)
item = '''update settings set value=? WHERE key='version';'''
parameters = (4,)
self.cur.execute(item, parameters)
# Add a new column to the pubkeys table to store the address version.
# We're going to trash all of our pubkeys and let them be redownloaded.
item = '''SELECT value FROM settings WHERE key='version';'''
parameters = ''
self.cur.execute(item, parameters)
currentVersion = int(self.cur.fetchall()[0][0])
if currentVersion == 4:
self.cur.execute('''DROP TABLE pubkeys''')
self.cur.execute(
'''CREATE TABLE pubkeys (hash blob, addressversion int, transmitdata blob, time int,'''
'''usedpersonally text, UNIQUE(hash, addressversion) ON CONFLICT REPLACE)''')
self.cur.execute(
'''delete from inventory where objecttype = 'pubkey';''')
item = '''update settings set value=? WHERE key='version';'''
parameters = (5,)
self.cur.execute(item, parameters)
# Add a new table: objectprocessorqueue with which to hold objects
# that have yet to be processed if the user shuts down Bitmessage.
item = '''SELECT value FROM settings WHERE key='version';'''
parameters = ''
self.cur.execute(item, parameters)
currentVersion = int(self.cur.fetchall()[0][0])
if currentVersion == 5:
self.cur.execute('''DROP TABLE knownnodes''')
self.cur.execute(
'''CREATE TABLE objectprocessorqueue'''
''' (objecttype text, data blob, UNIQUE(objecttype, data) ON CONFLICT REPLACE)''')
item = '''update settings set value=? WHERE key='version';'''
parameters = (6,)
self.cur.execute(item, parameters)
# changes related to protocol v3
# In table inventory and objectprocessorqueue, objecttype is now
# an integer (it was a human-friendly string previously)
item = '''SELECT value FROM settings WHERE key='version';'''
parameters = ''
self.cur.execute(item, parameters)
currentVersion = int(self.cur.fetchall()[0][0])
if currentVersion == 6:
logger.debug(
'In messages.dat database, dropping and recreating'
' the inventory table.')
self.cur.execute('''DROP TABLE inventory''')
self.cur.execute(
'''CREATE TABLE inventory'''
''' (hash blob, objecttype int, streamnumber int, payload blob, expirestime integer,'''
''' tag blob, UNIQUE(hash) ON CONFLICT REPLACE)''')
self.cur.execute('''DROP TABLE objectprocessorqueue''')
self.cur.execute(
'''CREATE TABLE objectprocessorqueue'''
''' (objecttype int, data blob, UNIQUE(objecttype, data) ON CONFLICT REPLACE)''')
item = '''update settings set value=? WHERE key='version';'''
parameters = (7,)
self.cur.execute(item, parameters)
logger.debug(
'Finished dropping and recreating the inventory table.')
# The format of data stored in the pubkeys table has changed. Let's
# clear it, and the pubkeys from inventory, so that they'll
# be re-downloaded.
item = '''SELECT value FROM settings WHERE key='version';'''
parameters = ''
self.cur.execute(item, parameters)
currentVersion = int(self.cur.fetchall()[0][0])
if currentVersion == 7:
logger.debug(
'In messages.dat database, clearing pubkeys table'
' because the data format has been updated.')
self.cur.execute(
'''delete from inventory where objecttype = 1;''')
self.cur.execute(
'''delete from pubkeys;''')
# Any sending messages for which we *thought* that we had
# the pubkey must be rechecked.
self.cur.execute(
'''UPDATE sent SET status='msgqueued' WHERE status='doingmsgpow' or status='badkey';''')
query = '''update settings set value=? WHERE key='version';'''
parameters = (8,)
self.cur.execute(query, parameters)
logger.debug('Finished clearing currently held pubkeys.')
# Add a new column to the inbox table to store the hash of
# the message signature. We'll use this as temporary message UUID
# in order to detect duplicates.
item = '''SELECT value FROM settings WHERE key='version';'''
parameters = ''
self.cur.execute(item, parameters)
currentVersion = int(self.cur.fetchall()[0][0])
if currentVersion == 8:
logger.debug(
'In messages.dat database, adding sighash field to'
' the inbox table.')
item = '''ALTER TABLE inbox ADD sighash blob DEFAULT '' '''
parameters = ''
self.cur.execute(item, parameters)
item = '''update settings set value=? WHERE key='version';'''
parameters = (9,)
self.cur.execute(item, parameters)
# We'll also need a `sleeptill` field and a `ttl` field. Also we
# can combine the pubkeyretrynumber and msgretrynumber into one.
item = '''SELECT value FROM settings WHERE key='version';'''
parameters = ''
self.cur.execute(item, parameters)
currentVersion = int(self.cur.fetchall()[0][0])
if currentVersion == 9:
logger.info(
'In messages.dat database, making TTL-related changes:'
' combining the pubkeyretrynumber and msgretrynumber'
' fields into the retrynumber field and adding the'
' sleeptill and ttl fields...')
self.cur.execute(
'''CREATE TEMPORARY TABLE sent_backup'''
''' (msgid blob, toaddress text, toripe blob, fromaddress text, subject text, message text,'''
''' ackdata blob, lastactiontime integer, status text, retrynumber integer,'''
''' folder text, encodingtype int)''')
self.cur.execute(
'''INSERT INTO sent_backup SELECT msgid, toaddress, toripe, fromaddress,'''
''' subject, message, ackdata, lastactiontime,'''
''' status, 0, folder, encodingtype FROM sent;''')
self.cur.execute('''DROP TABLE sent''')
self.cur.execute(
'''CREATE TABLE sent'''
''' (msgid blob, toaddress text, toripe blob, fromaddress text, subject text, message text,'''
''' ackdata blob, senttime integer, lastactiontime integer, sleeptill int, status text,'''
''' retrynumber integer, folder text, encodingtype int, ttl int)''')
self.cur.execute(
'''INSERT INTO sent SELECT msgid, toaddress, toripe, fromaddress, subject, message, ackdata,'''
''' lastactiontime, lastactiontime, 0, status, 0, folder, encodingtype, 216000 FROM sent_backup;''')
self.cur.execute('''DROP TABLE sent_backup''')
logger.info('In messages.dat database, finished making TTL-related changes.')
logger.debug('In messages.dat database, adding address field to the pubkeys table.')
# We're going to have to calculate the address for each row in the pubkeys
# table. Then we can take out the hash field.
self.cur.execute('''ALTER TABLE pubkeys ADD address text DEFAULT '' ;''')
# replica for loop to update hashed address
self.cur.execute('''UPDATE pubkeys SET address=(enaddr(pubkeys.addressversion, 1, hash)); ''')
# Now we can remove the hash field from the pubkeys table.
self.cur.execute(
'''CREATE TEMPORARY TABLE pubkeys_backup'''
''' (address text, addressversion int, transmitdata blob, time int,'''
''' usedpersonally text, UNIQUE(address) ON CONFLICT REPLACE)''')
self.cur.execute(
'''INSERT INTO pubkeys_backup'''
''' SELECT address, addressversion, transmitdata, time, usedpersonally FROM pubkeys;''')
self.cur.execute('''DROP TABLE pubkeys''')
self.cur.execute(
'''CREATE TABLE pubkeys'''
''' (address text, addressversion int, transmitdata blob, time int, usedpersonally text,'''
''' UNIQUE(address) ON CONFLICT REPLACE)''')
self.cur.execute(
'''INSERT INTO pubkeys SELECT'''
''' address, addressversion, transmitdata, time, usedpersonally FROM pubkeys_backup;''')
self.cur.execute('''DROP TABLE pubkeys_backup''')
logger.debug(
'In messages.dat database, done adding address field to the pubkeys table'
' and removing the hash field.')
self.cur.execute('''update settings set value=10 WHERE key='version';''')
# Update the address colunm to unique in addressbook table
item = '''SELECT value FROM settings WHERE key='version';'''
parameters = ''
self.cur.execute(item, parameters)
currentVersion = int(self.cur.fetchall()[0][0])
if currentVersion == 10:
logger.debug(
'In messages.dat database, updating address column to UNIQUE'
' in the addressbook table.')
self.cur.execute(
'''ALTER TABLE addressbook RENAME TO old_addressbook''')
self.cur.execute(
'''CREATE TABLE addressbook'''
''' (label text, address text, UNIQUE(address) ON CONFLICT IGNORE)''')
self.cur.execute(
'''INSERT INTO addressbook SELECT label, address FROM old_addressbook;''')
self.cur.execute('''DROP TABLE old_addressbook''')
self.cur.execute('''update settings set value=11 WHERE key='version';''')
# Are you hoping to add a new option to the keys.dat file of existing
# Bitmessage users or modify the SQLite database? Add it right
# above this line!
try:
testpayload = '\x00\x00'
t = ('1234', 1, testpayload, '12345678', 'no')
self.cur.execute('''INSERT INTO pubkeys VALUES(?,?,?,?,?)''', t)
self.conn.commit()
self.cur.execute(
'''SELECT transmitdata FROM pubkeys WHERE address='1234' ''')
queryreturn = self.cur.fetchall()
for row in queryreturn:
transmitdata, = row
self.cur.execute('''DELETE FROM pubkeys WHERE address='1234' ''')
self.conn.commit()
if transmitdata == '':
logger.fatal(
'Problem: The version of SQLite you have cannot store Null values.'
' Please download and install the latest revision of your version of Python'
' (for example, the latest Python 2.7 revision) and try again.\n')
logger.fatal(
'PyBitmessage will now exit very abruptly.'
' You may now see threading errors related to this abrupt exit'
' but the problem you need to solve is related to SQLite.\n\n')
os._exit(0)
except Exception as err:
if str(err) == 'database or disk is full':
logger.fatal(
'(While null value test) Alert: Your disk or data storage volume is full.'
' sqlThread will now exit.')
queues.UISignalQueue.put((
'alert', (
_translate(
"MainWindow",
"Disk full"),
_translate(
"MainWindow",
'Alert: Your disk or data storage volume is full. Bitmessage will now exit.'),
True)))
os._exit(0)
else:
logger.error(err)
# Let us check to see the last time we vaccumed the messages.dat file.
# If it has been more than a month let's do it now.
item = '''SELECT value FROM settings WHERE key='lastvacuumtime';'''
parameters = ''
self.cur.execute(item, parameters)
queryreturn = self.cur.fetchall()
for row in queryreturn:
value, = row
if int(value) < int(time.time()) - 86400:
logger.info('It has been a long time since the messages.dat file has been vacuumed. Vacuuming now...')
try:
self.cur.execute(''' VACUUM ''')
except Exception as err:
if str(err) == 'database or disk is full':
logger.fatal(
'(While VACUUM) Alert: Your disk or data storage volume is full.'
' sqlThread will now exit.')
queues.UISignalQueue.put((
'alert', (
_translate(
"MainWindow",
"Disk full"),
_translate(
"MainWindow",
'Alert: Your disk or data storage volume is full. Bitmessage will now exit.'),
True)))
os._exit(0)
item = '''update settings set value=? WHERE key='lastvacuumtime';'''
parameters = (int(time.time()),)
self.cur.execute(item, parameters)
helper_sql.sql_ready.set() helper_sql.sql_ready.set()
while True: while self.loop_queue():
item = helper_sql.sqlSubmitQueue.get() pass
if item == 'commit':
try:
self.conn.commit()
except Exception as err:
if str(err) == 'database or disk is full':
logger.fatal(
'(While committing) Alert: Your disk or data storage volume is full.'
' sqlThread will now exit.')
queues.UISignalQueue.put((
'alert', (
_translate(
"MainWindow",
"Disk full"),
_translate(
"MainWindow",
'Alert: Your disk or data storage volume is full. Bitmessage will now exit.'),
True)))
os._exit(0)
elif item == 'exit':
self.conn.close()
logger.info('sqlThread exiting gracefully.')
return
elif item == 'movemessagstoprog':
logger.debug('the sqlThread is moving the messages.dat file to the local program directory.')
try: class TestDB(BitmessageDB):
self.conn.commit() """Database connection build for test e"""
except Exception as err:
if str(err) == 'database or disk is full':
logger.fatal(
'(while movemessagstoprog) Alert: Your disk or data storage volume is full.'
' sqlThread will now exit.')
queues.UISignalQueue.put((
'alert', (
_translate(
"MainWindow",
"Disk full"),
_translate(
"MainWindow",
'Alert: Your disk or data storage volume is full. Bitmessage will now exit.'),
True)))
os._exit(0)
self.conn.close()
shutil.move(
paths.lookupAppdataFolder() + 'messages.dat', paths.lookupExeFolder() + 'messages.dat')
self.conn = sqlite3.connect(paths.lookupExeFolder() + 'messages.dat')
self.conn.text_factory = str
self.cur = self.conn.cursor()
elif item == 'movemessagstoappdata':
logger.debug('the sqlThread is moving the messages.dat file to the Appdata folder.')
try: def _connection_build(self):
self.conn.commit() self._connection_build_internal("memory", True)
except Exception as err: return self.conn, self.cur
if str(err) == 'database or disk is full':
logger.fatal(
'(while movemessagstoappdata) Alert: Your disk or data storage volume is full.'
' sqlThread will now exit.')
queues.UISignalQueue.put((
'alert', (
_translate(
"MainWindow",
"Disk full"),
_translate(
"MainWindow",
'Alert: Your disk or data storage volume is full. Bitmessage will now exit.'),
True)))
os._exit(0)
self.conn.close()
shutil.move(
paths.lookupExeFolder() + 'messages.dat', paths.lookupAppdataFolder() + 'messages.dat')
self.conn = sqlite3.connect(paths.lookupAppdataFolder() + 'messages.dat')
self.conn.text_factory = str
self.cur = self.conn.cursor()
elif item == 'deleteandvacuume':
self.cur.execute('''delete from inbox where folder='trash' ''')
self.cur.execute('''delete from sent where folder='trash' ''')
self.conn.commit()
try:
self.cur.execute(''' VACUUM ''')
except Exception as err:
if str(err) == 'database or disk is full':
logger.fatal(
'(while deleteandvacuume) Alert: Your disk or data storage volume is full.'
' sqlThread will now exit.')
queues.UISignalQueue.put((
'alert', (
_translate(
"MainWindow",
"Disk full"),
_translate(
"MainWindow",
'Alert: Your disk or data storage volume is full. Bitmessage will now exit.'),
True)))
os._exit(0)
else:
parameters = helper_sql.sqlSubmitQueue.get()
rowcount = 0
try:
self.cur.execute(item, parameters)
rowcount = self.cur.rowcount
except Exception as err:
if str(err) == 'database or disk is full':
logger.fatal(
'(while cur.execute) Alert: Your disk or data storage volume is full.'
' sqlThread will now exit.')
queues.UISignalQueue.put((
'alert', (
_translate(
"MainWindow",
"Disk full"),
_translate(
"MainWindow",
'Alert: Your disk or data storage volume is full. Bitmessage will now exit.'),
True)))
os._exit(0)
else:
logger.fatal(
'Major error occurred when trying to execute a SQL statement within the sqlThread.'
' Please tell Atheros about this error message or post it in the forum!'
' Error occurred while trying to execute statement: "%s" Here are the parameters;'
' you might want to censor this data with asterisks (***)'
' as it can contain private information: %s.'
' Here is the actual error message thrown by the sqlThread: %s',
str(item),
str(repr(parameters)),
str(err))
logger.fatal('This program shall now abruptly exit!')
os._exit(0)
helper_sql.sqlReturnQueue.put((self.cur.fetchall(), rowcount))
# helper_sql.sqlSubmitQueue.task_done()
def create_function(self):
# create_function
try:
self.conn.create_function("enaddr", 3, func=encodeAddress, deterministic=True)
except (TypeError, sqlite3.NotSupportedError) as err:
logger.debug(
"Got error while pass deterministic in sqlite create function {}, Passing 3 params".format(err))
self.conn.create_function("enaddr", 3, encodeAddress)

View File

@ -0,0 +1,3 @@
ALTER TABLE inventory ADD first20bytesofencryptedmessage blob DEFAULT '';
UPDATE settings SET value = 2 WHERE key = 'version';

View File

@ -13,3 +13,5 @@ CREATE TABLE `addressbook` (
INSERT INTO addressbook SELECT label, address FROM old_addressbook; INSERT INTO addressbook SELECT label, address FROM old_addressbook;
DROP TABLE old_addressbook; DROP TABLE old_addressbook;
UPDATE settings SET value = 11 WHERE key = 'version';

View File

@ -27,3 +27,5 @@ CREATE TABLE `inventory` (
INSERT INTO inventory SELECT hash, objecttype, streamnumber, payload, receivedtime FROM inventory_backup; INSERT INTO inventory SELECT hash, objecttype, streamnumber, payload, receivedtime FROM inventory_backup;
DROP TABLE inventory_backup; DROP TABLE inventory_backup;
UPDATE settings SET value = 3 WHERE key = 'version';

View File

@ -3,3 +3,5 @@
-- --
ALTER TABLE inventory ADD tag blob DEFAULT ''; ALTER TABLE inventory ADD tag blob DEFAULT '';
UPDATE settings SET value = 4 WHERE key = 'version';

View File

@ -15,3 +15,5 @@ CREATE TABLE `pubkeys` (
) ; ) ;
DELETE FROM inventory WHERE objecttype = 'pubkey'; DELETE FROM inventory WHERE objecttype = 'pubkey';
UPDATE settings SET value = 5 WHERE key = 'version';

View File

@ -10,3 +10,5 @@ CREATE TABLE `objectprocessorqueue` (
`data` blob, `data` blob,
UNIQUE(objecttype, data) ON CONFLICT REPLACE UNIQUE(objecttype, data) ON CONFLICT REPLACE
) ; ) ;
UPDATE settings SET value = 6 WHERE key = 'version';

View File

@ -23,3 +23,5 @@ CREATE TABLE `objectprocessorqueue` (
`data` blob, `data` blob,
UNIQUE(objecttype, data) ON CONFLICT REPLACE UNIQUE(objecttype, data) ON CONFLICT REPLACE
) ; ) ;
UPDATE settings SET value = 7 WHERE key = 'version';

View File

@ -9,3 +9,5 @@ DELETE FROM inventory WHERE objecttype = 1;
DELETE FROM pubkeys; DELETE FROM pubkeys;
UPDATE sent SET status='msgqueued' WHERE status='doingmsgpow' or status='badkey'; UPDATE sent SET status='msgqueued' WHERE status='doingmsgpow' or status='badkey';
UPDATE settings SET value = 8 WHERE key = 'version';

View File

@ -5,3 +5,5 @@
-- --
ALTER TABLE inbox ADD sighash blob DEFAULT ''; ALTER TABLE inbox ADD sighash blob DEFAULT '';
UPDATE settings SET value = 9 WHERE key = 'version';

View File

@ -27,7 +27,7 @@ CREATE TABLE `sent` (
`ackdata` blob, `ackdata` blob,
`senttime` integer, `senttime` integer,
`lastactiontime` integer, `lastactiontime` integer,
`sleeptill` int, `sleeptill` integer,
`status` text, `status` text,
`retrynumber` integer, `retrynumber` integer,
`folder` text, `folder` text,
@ -72,3 +72,5 @@ CREATE TABLE `pubkeys` (
INSERT INTO pubkeys SELECT address, addressversion, transmitdata, `time`, usedpersonally FROM pubkeys_backup; INSERT INTO pubkeys SELECT address, addressversion, transmitdata, `time`, usedpersonally FROM pubkeys_backup;
DROP TABLE pubkeys_backup; DROP TABLE pubkeys_backup;
UPDATE settings SET value = 10 WHERE key = 'version';

View File

@ -0,0 +1,92 @@
CREATE TABLE `inbox` (
`msgid` blob,
`toaddress` text,
`fromaddress` text,
`subject` text,
`received` text,
`message` text,
`folder` text,
`encodingtype` int,
`read` bool,
UNIQUE(msgid) ON CONFLICT REPLACE
);
CREATE TABLE `sent` (
`msgid` blob,
`toaddress` text,
`toripe` blob,
`fromaddress` text,
`subject` text,
`message` text,
`ackdata` blob,
`lastactiontime` integer,
`status` text,
`pubkeyretrynumber` integer,
`msgretrynumber` integer,
`folder` text,
`encodingtype` int
);
CREATE TABLE `subscriptions` (
`label` text,
`address` text,
`enabled` bool
);
CREATE TABLE `addressbook` (
`label` text,
`address` text
);
CREATE TABLE `blacklist` (
`label` text,
`address` text,
`enabled` bool
);
CREATE TABLE `whitelist` (
`label` text,
`address` text,
`enabled` bool
);
CREATE TABLE `pubkeys` (
`hash` blob,
`transmitdata` blob,
`time` int,
`usedpersonally` text,
UNIQUE(hash) ON CONFLICT REPLACE
);
CREATE TABLE `inventory` (
`hash` blob,
`objecttype` text,
`streamnumber` int,
`payload` blob,
`receivedtime` integer,
UNIQUE(hash) ON CONFLICT REPLACE
);
CREATE TABLE `knownnodes` (
`timelastseen` int,
`stream` int,
`services` blob,
`host` blob,
`port` blob,
UNIQUE(host, stream, port) ON CONFLICT REPLACE
);
CREATE TABLE `settings` (
`key` blob,
`value` blob,
UNIQUE(key) ON CONFLICT REPLACE
);
INSERT INTO subscriptions VALUES ('Bitmessage new releases/announcements', 'BM-GtovgYdgs7qXPkoYaRgrLFuFKz1SFpsw', 1);
INSERT INTO settings VALUES('version', 1);
INSERT INTO settings VALUES('lastvacuumtime', CAST(strftime('%s', 'now') AS STR) );
INSERT INTO inventory VALUES( '', 'pubkey', 1, '', 1);

View File

@ -23,7 +23,7 @@ import helper_addressbook
from bmconfigparser import config from bmconfigparser import config
from helper_msgcoding import MsgEncode, MsgDecode from helper_msgcoding import MsgEncode, MsgDecode
from helper_sql import sqlQuery from helper_sql import sqlQuery, sqlExecute
from network import asyncore_pollchoose as asyncore, knownnodes from network import asyncore_pollchoose as asyncore, knownnodes
from network.bmproto import BMProto from network.bmproto import BMProto
from network.connectionpool import BMConnectionPool from network.connectionpool import BMConnectionPool
@ -412,6 +412,22 @@ class TestCore(unittest.TestCase):
self.delete_address_from_addressbook(address1) self.delete_address_from_addressbook(address1)
self.delete_address_from_addressbook(address2) self.delete_address_from_addressbook(address2)
def test_sqlscripts(self):
""" Test sql statements"""
sqlExecute('create table if not exists testtbl (id integer)')
tables = list(sqlQuery("select name from sqlite_master where type is 'table'"))
res = [item for item in tables if 'testtbl' in item]
self.assertEqual(res[0][0], 'testtbl')
queryreturn = sqlExecute("INSERT INTO testtbl VALUES(101);")
self.assertEqual(queryreturn, 1)
queryreturn = sqlQuery('''SELECT * FROM testtbl''')
self.assertEqual(queryreturn[0][0], 101)
sqlQuery("DROP TABLE testtbl")
def run(): def run():
"""Starts all tests intended for core run""" """Starts all tests intended for core run"""

View File

@ -1 +0,0 @@
INSERT INTO `addressbook` VALUES ('test', "BM-2cWzMnxjJ7yRP3nLEWUV5LisTZyREWSxYz"), ('testone', "BM-2cWzMnxjJ7yRP3nLEWUV5LisTZyREWSxYz");

View File

@ -1 +0,0 @@
INSERT INTO `inventory` VALUES ('hash', 1, 1,1, 1,'test');

View File

@ -1 +0,0 @@
INSERT INTO `settings` VALUES ('version','3');

View File

@ -1 +0,0 @@
INSERT INTO `pubkeys` VALUES ('hash', 1, 1, 1,'test');

View File

@ -1 +0,0 @@
INSERT INTO `objectprocessorqueue` VALUES ('hash', 1);

View File

@ -1 +0,0 @@
INSERT INTO `inventory` VALUES ('hash', 1, 1, 1,'test','test');

View File

@ -1,3 +0,0 @@
INSERT INTO `sent` VALUES
(1,'BM-2cWzMnxjJ7yRP3nLEWUV5LisTZyREWSxYz',1,'BM-2cWzSnwjJ7yRP3nLEWUV5LisTZyREWSzUK','Test1 subject','message test 1','ackdata',1638176409,1638176409,1638176423,'msgqueued',1,'testfolder',1,2),
(2,'BM-2cWzMnxjJ7yRP3nLEWUV5LisTZyREWSxYz',1,'BM-2cWzSnwjJ7yRP3nLEWUV5LisTZyREWSzUK','Test2 subject','message test 2','ackdata',1638176423,1638176423,1638176423,'msgqueued',1,'testfolder',1,2);

View File

@ -1 +0,0 @@
INSERT INTO `inbox` VALUES (1, "poland", "malasia", "test", "yes", "test message", "folder", 1, 1, 1);

View File

@ -1,2 +0,0 @@
INSERT INTO `sent` VALUES
(1,'BM-2cWzMnxjJ7yRP3nLEWUV5LisTZyREWSxYz',1,'BM-2cWzSnwjJ7yRP3nLEWUV5LisTZyREWSzUK','Test1 subject','message test 1','ackdata',1638176409,1638176409,1638176423,'msgqueued',1,'testfolder',1,2);

View File

@ -0,0 +1,3 @@
INSERT INTO addressbook VALUES ('', '');
INSERT INTO addressbook VALUES ('', '');

View File

@ -0,0 +1,7 @@
INSERT INTO inventory VALUES( '', 1, 1, '', 1, '');
INSERT INTO pubkeys VALUES( '', 1, '', 1, '');
INSERT INTO sent VALUES( '', '', '', '', '', '', '', 1, 'doingmsgpow', 1, 1, '', 1);
INSERT INTO sent VALUES( '', '', '', '', '', '', '', 1, 'badkey', 1, 1, '', 1);

View File

@ -0,0 +1 @@
INSERT INTO pubkeys VALUES( x'0001010101010101010101010101010101010101', 3, '', 1, '');

View File

@ -2,43 +2,545 @@
# flake8: noqa:E402 # flake8: noqa:E402
import os import os
import tempfile import tempfile
import threading
import unittest import unittest
from .common import skip_python3
skip_python3()
os.environ['BITMESSAGE_HOME'] = tempfile.gettempdir() os.environ['BITMESSAGE_HOME'] = tempfile.gettempdir()
from pybitmessage.helper_sql import ( from pybitmessage.class_sqlThread import TestDB # noqa:E402
sqlQuery, sql_ready, sqlStoredProcedure) # noqa:E402
from pybitmessage.class_sqlThread import sqlThread # noqa:E402
from pybitmessage.addresses import encodeAddress # noqa:E402 from pybitmessage.addresses import encodeAddress # noqa:E402
class TestSqlThread(unittest.TestCase): class TestSqlBase(object): # pylint: disable=E1101, too-few-public-methods, E1004, W0232
"""Test case for SQL thread""" """ Base for test case """
@classmethod __name__ = None
def setUpClass(cls): root_path = os.path.dirname(os.path.dirname(__file__))
# Start SQL thread test_db = None
sqlLookup = sqlThread()
sqlLookup.daemon = True
sqlLookup.start()
sql_ready.wait()
@classmethod def _setup_db(self): # pylint: disable=W0622, redefined-builtin
def tearDownClass(cls): """
sqlStoredProcedure('exit') Drop all tables before each test case start
for thread in threading.enumerate(): """
if thread.name == "SQL": self.test_db = TestDB()
thread.join() self.test_db.create_sql_function()
self.test_db.initialize_schema()
def get_table_schema(self, table_name):
"""Get table list of column names and value types by table name"""
self.test_db.cur.execute("""PRAGMA table_info({})""".format(table_name))
res = self.test_db.cur.fetchall()
res = [[x[1], x[2].lower()] for x in res]
return res
def execute_test_script(self, test_db_cur, file_name): # pylint: disable=W0622, redefined-builtin
"""
Executing sql script from file
"""
with open(os.path.join(self.root_path, "tests/sql/{}.sql".format(file_name)), 'r') as sql_as_string:
sql_as_string = sql_as_string.read()
test_db_cur.cur.executescript(sql_as_string)
class TestFnBitmessageDB(TestSqlBase, unittest.TestCase): # pylint: disable=protected-access
""" Test case for Sql function"""
def setUp(self):
"""setup for test case"""
self._setup_db()
def test_create_function(self): def test_create_function(self):
"""Check the result of enaddr function""" """Check the result of enaddr function"""
encoded_str = encodeAddress(4, 1, "21122112211221122112") st = "21122112211221122112".encode()
encoded_str = encodeAddress(4, 1, st)
query = sqlQuery('SELECT enaddr(4, 1, "21122112211221122112")') item = '''SELECT enaddr(4, 1, ?);'''
self.assertEqual( parameters = (st, )
query[0][-1], encoded_str, "test case fail for create_function") self.test_db.cur.execute(item, parameters)
query = self.test_db.cur.fetchall()
self.assertEqual(query[0][-1], encoded_str, "test case fail for create_function")
class TestInitializerBitmessageDB(TestSqlBase, unittest.TestCase):
"""Test case for SQL initializer"""
def setUp(self):
"""
Setup DB schema before start.
And applying default schema for initializer test.
"""
self._setup_db()
def test_initializer(self):
"""
Test db initialization
"""
# check inbox table
res = self.get_table_schema("inbox")
check = [['msgid', 'blob'],
['toaddress', 'text'],
['fromaddress', 'text'],
['subject', 'text'],
['received', 'text'],
['message', 'text'],
['folder', 'text'],
['encodingtype', 'int'],
['read', 'bool'],
['sighash', 'blob']]
self.assertEqual(res, check, "inbox table not valid")
# check sent table
res = self.get_table_schema("sent")
check = [['msgid', 'blob'],
['toaddress', 'text'],
['toripe', 'blob'],
['fromaddress', 'text'],
['subject', 'text'],
['message', 'text'],
['ackdata', 'blob'],
['senttime', 'integer'],
['lastactiontime', 'integer'],
['sleeptill', 'integer'],
['status', 'text'],
['retrynumber', 'integer'],
['folder', 'text'],
['encodingtype', 'int'],
['ttl', 'int']]
self.assertEqual(res, check, "sent table not valid")
# check subscriptions table
res = self.get_table_schema("subscriptions")
check = [['label', 'text'],
['address', 'text'],
['enabled', 'bool']]
self.assertEqual(res, check, "subscriptions table not valid")
# check addressbook table
res = self.get_table_schema("addressbook")
check = [['label', 'text'],
['address', 'text']]
self.assertEqual(res, check, "addressbook table not valid")
# check blacklist table
res = self.get_table_schema("blacklist")
check = [['label', 'text'],
['address', 'text'],
['enabled', 'bool']]
self.assertEqual(res, check, "blacklist table not valid")
# check whitelist table
res = self.get_table_schema("whitelist")
check = [['label', 'text'],
['address', 'text'],
['enabled', 'bool']]
self.assertEqual(res, check, "whitelist table not valid")
# check pubkeys table
res = self.get_table_schema("pubkeys")
check = [['address', 'text'],
['addressversion', 'int'],
['transmitdata', 'blob'],
['time', 'int'],
['usedpersonally', 'text']]
self.assertEqual(res, check, "pubkeys table not valid")
# check inventory table
res = self.get_table_schema("inventory")
check = [['hash', 'blob'],
['objecttype', 'int'],
['streamnumber', 'int'],
['payload', 'blob'],
['expirestime', 'integer'],
['tag', 'blob']]
self.assertEqual(res, check, "inventory table not valid")
# check settings table
res = self.get_table_schema("settings")
check = [['key', 'blob'],
['value', 'blob']]
self.assertEqual(res, check, "settings table not valid")
# check objectprocessorqueue table
res = self.get_table_schema("objectprocessorqueue")
check = [['objecttype', 'int'],
['data', 'blob']]
self.assertEqual(res, check, "objectprocessorqueue table not valid")
class TestUpgradeBitmessageDB(TestSqlBase, unittest.TestCase): # pylint: disable=protected-access
"""Test case for SQL versions"""
def setUp(self):
"""
Setup DB schema before start.
And applying default schema for version test.
"""
self.test_db = TestDB()
self.test_db.create_sql_function()
self.test_db.initialize_sql("initialize_schema_v1")
self.test_db.conn.commit()
def version(self):
"""
Run SQL Scripts, Initialize DB with respect to versioning
and Upgrade DB schema for all versions
"""
def wrapper(*args):
"""
Run SQL and mocking DB for versions
"""
self = args[0]
func_name = func.__name__
version = func_name.rsplit('_', 1)[-1]
for i in range(1, int(version) + 1):
if i == 7 or i == 9 or i == 10:
self.execute_test_script(self.test_db, 'insert_test_values_version_{}'.format(i))
self.test_db.conn.commit()
self.test_db._upgrade_one_level_sql_statement(i) # pylint: disable= W0212, protected-access
return func(*args) # <-- use (self, ...)
func = self
return wrapper
@version
def test_bm_db_version_1(self):
"""
Test update from version 1 to 2
"""
# check version update in settings table
version = self.test_db.sql_schema_version
self.assertEqual(version, 2, "Settings version value not updated")
# check adding first20bytesofencryptedmessage column to inventory table
res = self.get_table_schema('inventory')
check = ['first20bytesofencryptedmessage', 'blob']
answ = (check in res)
self.assertEqual(answ, True, "No first20bytesofencryptedmessage in inventory table in second version")
@version
def test_bm_db_version_2(self):
"""
Test update from version 2 to 3
"""
# check version update in settings table
version = self.test_db.sql_schema_version
self.assertEqual(version, 3, "Settings version value not updated")
inventory_schema = self.get_table_schema('inventory')
check_column = ['first20bytesofencryptedmessage', 'blob']
answer = (check_column in inventory_schema)
# check deleting first20bytesofencryptedmessage column to inventory table
self.assertNotEqual(answer, True,
"Column first20bytesofencryptedmessage in table inventory not deleted in version 3")
# check deleting inventory_backup table
self.test_db.cur.execute(''' SELECT count(name) FROM sqlite_master
WHERE type='table' AND name='inventory_backup' ''')
res = self.test_db.cur.fetchall()[0][0]
self.assertNotEqual(res, 1, "Table inventory_backup not deleted in versioning 3")
@version
def test_bm_db_version_3(self):
"""
Test update from version 3 to 4
"""
# check version update in settings table
version = self.test_db.sql_schema_version
self.assertEqual(version, 4, "Settings version value not updated")
# check adding tag column to inventory table
inventory_schema = self.get_table_schema('inventory')
check_column = ['tag', 'blob']
answer = (check_column in inventory_schema)
self.assertEqual(answer, True, "No column tag in table inventory in version 4")
@version
def test_bm_db_version_4(self):
"""
Test update from version 4 to 5
"""
# check version update in settings table
version = self.test_db.sql_schema_version
self.assertEqual(version, 5, "Settings version value not updated")
# check changing column addressversion type to int in table pubkeys
pubkeys_schema = self.get_table_schema("pubkeys")
check_column = ["addressversion", "int"]
answer = check_column in pubkeys_schema
self.assertEqual(answer, True, "Column addressversion not changed to int in table pubkeys")
# check deleting pubkey objects from inventory table
self.test_db.cur.execute(''' SELECT COUNT(hash) FROM inventory WHERE objecttype = 'pubkey' ''')
res = self.test_db.cur.fetchall()[0][0]
self.assertEqual(res, 0, "Pubkey objects not deleted from inventory table in versioning 5")
@version
def test_bm_db_version_5(self):
"""
Test update from version 5 to 6
"""
# check version update in settings table
version = self.test_db.sql_schema_version
self.assertEqual(version, 6, "Settings version value not updated")
# check deleting knownnodes table
self.test_db.cur.execute(''' SELECT count(name) FROM sqlite_master
WHERE type='table' AND name='knownnodes' ''')
res = self.test_db.cur.fetchall()[0][0]
self.assertNotEqual(res, 1, "Table knownnodes not deleted in versioning 6")
# check creating objectprocessorqueue table
self.test_db.cur.execute(''' SELECT count(name) FROM sqlite_master
WHERE type='table' AND name='objectprocessorqueue' ''')
res = self.test_db.cur.fetchall()[0][0]
self.assertNotEqual(res, 0, "Table objectprocessorqueue not created in versioning 6")
# check objectprocessorqueue table schema
objectprocessorqueue_schema = self.get_table_schema("objectprocessorqueue")
check = [['objecttype', 'text'],
['data', 'blob']]
self.assertEqual(objectprocessorqueue_schema, check, "objectprocessorqueue table is not valid")
@version
def test_bm_db_version_6(self):
"""
Test update from version 6 to 7
"""
inventory_schema = self.get_table_schema("inventory")
objectprocessorqueue_schema = self.get_table_schema("objectprocessorqueue")
# check version update in settings table
version = self.test_db.sql_schema_version
self.assertEqual(version, 7, "Settings version value not updated")
# check changing objecttype column type to int in table objectprocessorqueue
check = ["objecttype", "int"]
answ = check in objectprocessorqueue_schema
self.assertEqual(answ, True, "Type of objecttype column in table objectprocessorqueue not changed to int")
# check changing objecttype column type to int in table inventory
check = ["objecttype", "int"]
answ = check in inventory_schema
self.assertEqual(answ, True, "Type of objecttype column in table inventory not changed to int")
# check adding expirestime column in table inventory
check = ["expirestime", "integer"]
answ = check in inventory_schema
self.assertEqual(answ, True, "expirestime column not added to table inventory")
# check deleting receivedtime column from table inventory
check = ["receivedtime", "integer"]
answ = check in inventory_schema
self.assertNotEqual(answ, True, "receivedtime column not deleted from table inventory")
@version
def test_bm_db_version_7(self):
"""
Test update from version 7 to 8
"""
# check version update in settings table
version = self.test_db.sql_schema_version
self.assertEqual(version, 8, "Settings version value not updated")
# check clearing pubkeys table
self.test_db.cur.execute('''SELECT * FROM pubkeys ''')
pubkeys = self.test_db.cur.fetchall()
self.assertEqual(pubkeys, [], "pubkeys table is not clear")
# check deleting pubkeys from table inventory
self.test_db.cur.execute('''SELECT * FROM inventory WHERE objecttype = 1''')
inventory = self.test_db.cur.fetchall()
self.assertEqual(inventory, [], "pubkeys not deleted from inventory table")
# check updating statuses in sent table
self.test_db.cur.execute('''SELECT status FROM sent ''')
sent = self.test_db.cur.fetchall()
self.assertEqual(sent, [('msgqueued',), ('msgqueued',)], "Statuses in sent table not updated")
@version
def test_bm_db_version_8(self):
"""
Test update from version 8 to 9
"""
# check version update in settings table
version = self.test_db.sql_schema_version
self.assertEqual(version, 9, "Settings version value not updated")
# check adding sighash column to inbox table
inbox_schema = self.get_table_schema("inbox")
check = ['sighash', 'blob']
answ = check in inbox_schema
self.assertEqual(answ, True, "sighash column not added to inbox table")
@version
def test_bm_db_version_9(self):
"""
Test update from version 9 to 10
"""
# check version update in settings table
version = self.test_db.sql_schema_version
self.assertEqual(version, 10, "Settings version value not updated")
sent_schema = self.get_table_schema('sent')
pubkeys_schema = self.get_table_schema('pubkeys')
# check pubkeys table schema updating
check = ['hash', 'blob']
answ = check in pubkeys_schema
self.assertNotEqual(answ, True, "Column hash not deleted from pubkeys table")
check = ['address', 'text']
answ = check in pubkeys_schema
self.assertEqual(answ, True, "Column address not added to pubkeys table")
# check sent table schema updating
check = ['pubkeyretrynumber', 'integer']
answ = check in sent_schema
self.assertNotEqual(answ, True, "Column pubkeyretrynumber not deleted from sent table")
check = ['msgretrynumber', 'integer']
answ = check in sent_schema
self.assertNotEqual(answ, True, "Column msgretrynumber not deleted from sent table")
check = ['senttime', 'integer']
answ = check in sent_schema
self.assertEqual(answ, True, "Column senttime not added to sent table")
check = ['sleeptill', 'integer']
answ = check in sent_schema
self.assertEqual(answ, True, "Column sleeptill not added to sent table")
check = ['retrynumber', 'integer']
answ = check in sent_schema
self.assertEqual(answ, True, "Column retrynumber not added to sent table")
check = ['ttl', 'int']
answ = check in sent_schema
self.assertEqual(answ, True, "Column ttl not added to sent table")
# check pubkeys_backup table deleting
self.test_db.cur.execute("SELECT count(name) FROM sqlite_master WHERE type='table' AND name='pubkeys_backup'") # noqa
res = self.test_db.cur.fetchall()
self.assertNotEqual(res[0][0], 1, "Table pubkeys_backup not deleted")
# check data migration
check_pubkey = [('BM-2D77qGjcBfFmqn3EGs85ojKJtCh7b3tutK', 3, '', 1, '')]
self.test_db.cur.execute('''SELECT * FROM pubkeys''')
res = self.test_db.cur.fetchall()
self.assertEqual(res, check_pubkey, "Migration pubkeys table data failed")
self.test_db.cur.execute('''SELECT * FROM sent''')
res = self.test_db.cur.fetchall()
check_sent = [('', '', '', '', '', '', '', 1, 1, 0, 'msgqueued', 0, '', 1, 216000),
('', '', '', '', '', '', '', 1, 1, 0, 'msgqueued', 0, '', 1, 216000)]
self.assertEqual(res, check_sent, "Migration sent table data failed")
@version
def test_bm_db_version_10(self):
"""
Test update from version 10 to 11
"""
# check version update in settings table
version = self.test_db.sql_schema_version
self.assertEqual(version, 11, "Settings version value not updated")
# check data migration in addressbook table
self.test_db.cur.execute('''SELECT * FROM addressbook''')
res = self.test_db.cur.fetchall()
self.assertEqual(res, [('', '')], "Migration addressbook table data failed")
def test_upgrade_to_latest(self):
"""
Test upgrade_to_latest func
"""
self.test_db.upgrade_to_latest()
# check inbox table
res = self.get_table_schema("inbox")
check = [['msgid', 'blob'],
['toaddress', 'text'],
['fromaddress', 'text'],
['subject', 'text'],
['received', 'text'],
['message', 'text'],
['folder', 'text'],
['encodingtype', 'int'],
['read', 'bool'],
['sighash', 'blob']]
self.assertEqual(res, check, "inbox table not valid")
# check sent table
res = self.get_table_schema("sent")
check = [['msgid', 'blob'],
['toaddress', 'text'],
['toripe', 'blob'],
['fromaddress', 'text'],
['subject', 'text'],
['message', 'text'],
['ackdata', 'blob'],
['senttime', 'integer'],
['lastactiontime', 'integer'],
['sleeptill', 'integer'],
['status', 'text'],
['retrynumber', 'integer'],
['folder', 'text'],
['encodingtype', 'int'],
['ttl', 'int']]
self.assertEqual(res, check, "sent table not valid")
# check subscriptions table
res = self.get_table_schema("subscriptions")
check = [['label', 'text'],
['address', 'text'],
['enabled', 'bool']]
self.assertEqual(res, check, "subscriptions table not valid")
# check addressbook table
res = self.get_table_schema("addressbook")
check = [['label', 'text'],
['address', 'text']]
self.assertEqual(res, check, "addressbook table not valid")
# check blacklist table
res = self.get_table_schema("blacklist")
check = [['label', 'text'],
['address', 'text'],
['enabled', 'bool']]
self.assertEqual(res, check, "blacklist table not valid")
# check whitelist table
res = self.get_table_schema("whitelist")
check = [['label', 'text'],
['address', 'text'],
['enabled', 'bool']]
self.assertEqual(res, check, "whitelist table not valid")
# check pubkeys table
res = self.get_table_schema("pubkeys")
check = [['address', 'text'],
['addressversion', 'int'],
['transmitdata', 'blob'],
['time', 'int'],
['usedpersonally', 'text']]
self.assertEqual(res, check, "pubkeys table not valid")
# check inventory table
res = self.get_table_schema("inventory")
check = [['hash', 'blob'],
['objecttype', 'int'],
['streamnumber', 'int'],
['payload', 'blob'],
['expirestime', 'integer'],
['tag', 'blob']]
self.assertEqual(res, check, "inventory table not valid")
# check settings table
res = self.get_table_schema("settings")
check = [['key', 'blob'],
['value', 'blob']]
self.assertEqual(res, check, "settings table not valid")
# check objectprocessorqueue table
res = self.get_table_schema("objectprocessorqueue")
check = [['objecttype', 'int'],
['data', 'blob']]
self.assertEqual(res, check, "objectprocessorqueue table not valid")