Refactor sqlthread and add create test cases #1999
|
@ -1,11 +1,13 @@
|
||||||
|
# pylint: disable=protected-access, too-many-locals, too-many-branches, too-many-statements
|
||||||
|
|
||||||
"""
|
"""
|
||||||
sqlThread is defined here
|
sqlThread is defined here
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
import logging
|
||||||
import os
|
import os
|
||||||
import shutil # used for moving the messages.dat file
|
import shutil # used for moving the messages.dat file
|
||||||
import sqlite3
|
import sqlite3
|
||||||
import sys
|
|
||||||
import threading
|
import threading
|
||||||
import time
|
import time
|
||||||
|
|
||||||
|
@ -17,128 +19,113 @@ try:
|
||||||
import state
|
import state
|
||||||
from addresses import encodeAddress
|
from addresses import encodeAddress
|
||||||
from bmconfigparser import config, config_ready
|
from bmconfigparser import config, config_ready
|
||||||
from debug import logger
|
|
||||||
from tr import _translate
|
from tr import _translate
|
||||||
except ImportError:
|
except ImportError:
|
||||||
from . import helper_sql, helper_startup, paths, queues, state
|
from . import helper_sql, helper_startup, paths, queues, state
|
||||||
from .addresses import encodeAddress
|
from .addresses import encodeAddress
|
||||||
from .bmconfigparser import config, config_ready
|
from .bmconfigparser import config, config_ready
|
||||||
from .debug import logger
|
|
||||||
from .tr import _translate
|
from .tr import _translate
|
||||||
|
|
||||||
|
logger = logging.getLogger('default')
|
||||||
|
|
||||||
class sqlThread(threading.Thread):
|
|
||||||
"""A thread for all SQL operations"""
|
class BitmessageDB(object):
|
||||||
|
""" Upgrade Db with respect to versions """
|
||||||
|
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
threading.Thread.__init__(self, name="SQL")
|
self._current_level = None
|
||||||
|
self.max_level = 11
|
||||||
|
self.conn = None
|
||||||
|
self.cur = None
|
||||||
|
self._connection_build()
|
||||||
|
self.root_path = os.path.dirname(os.path.dirname(__file__))
|
||||||
|
|
||||||
def run(self): # pylint: disable=too-many-locals, too-many-branches, too-many-statements
|
def _connection_build(self):
|
||||||
"""Process SQL queries from `.helper_sql.sqlSubmitQueue`"""
|
self._connection_build_internal('messages.dat', False)
|
||||||
helper_sql.sql_available = True
|
|
||||||
config_ready.wait()
|
def _connection_build_internal(self, file_name="messages.dat", memory=False):
|
||||||
self.conn = sqlite3.connect(state.appdata + 'messages.dat')
|
"""
|
||||||
|
Stablish SQL connection
|
||||||
|
"""
|
||||||
|
if memory:
|
||||||
|
self.conn = sqlite3.connect(':memory:')
|
||||||
|
else:
|
||||||
|
self.conn = sqlite3.connect(os.path.join(state.appdata + file_name))
|
||||||
self.conn.text_factory = str
|
self.conn.text_factory = str
|
||||||
self.cur = self.conn.cursor()
|
self.cur = self.conn.cursor()
|
||||||
|
|
||||||
self.cur.execute('PRAGMA secure_delete = true')
|
self.cur.execute('PRAGMA secure_delete = true')
|
||||||
|
|
||||||
# call create_function for encode address
|
def __get_current_settings_version(self):
|
||||||
self.create_function()
|
"""
|
||||||
|
Get current setting Version
|
||||||
|
"""
|
||||||
|
query = "SELECT value FROM settings WHERE key='version'"
|
||||||
|
parameters = ()
|
||||||
|
self.cur.execute(query, parameters)
|
||||||
try:
|
try:
|
||||||
self.cur.execute(
|
return int(self.cur.fetchall()[0][0])
|
||||||
'''CREATE TABLE inbox (msgid blob, toaddress text, fromaddress text, subject text,'''
|
except (ValueError, IndexError):
|
||||||
''' received text, message text, folder text, encodingtype int, read bool, sighash blob,'''
|
return 0
|
||||||
''' UNIQUE(msgid) ON CONFLICT REPLACE)''')
|
|
||||||
self.cur.execute(
|
def _upgrade_one_level_sql_statement(self, file_name):
|
||||||
'''CREATE TABLE sent (msgid blob, toaddress text, toripe blob, fromaddress text, subject text,'''
|
"""
|
||||||
''' message text, ackdata blob, senttime integer, lastactiontime integer,'''
|
Upgrade database versions with applying sql scripts
|
||||||
''' sleeptill integer, status text, retrynumber integer, folder text, encodingtype int, ttl int)''')
|
"""
|
||||||
self.cur.execute(
|
self.initialize_sql("init_version_{}".format(file_name))
|
||||||
'''CREATE TABLE subscriptions (label text, address text, enabled bool)''')
|
|
||||||
self.cur.execute(
|
def initialize_sql(self, file_name):
|
||||||
'''CREATE TABLE addressbook (label text, address text, UNIQUE(address) ON CONFLICT IGNORE)''')
|
"""
|
||||||
self.cur.execute(
|
Initializing sql
|
||||||
'''CREATE TABLE blacklist (label text, address text, enabled bool)''')
|
"""
|
||||||
self.cur.execute(
|
try:
|
||||||
'''CREATE TABLE whitelist (label text, address text, enabled bool)''')
|
with open(os.path.join(self.root_path, "pybitmessage/sql/{}.sql".format(file_name))) as sql_file:
|
||||||
self.cur.execute(
|
sql_as_string = sql_file.read()
|
||||||
'''CREATE TABLE pubkeys (address text, addressversion int, transmitdata blob, time int,'''
|
self.cur.executescript(sql_as_string)
|
||||||
''' usedpersonally text, UNIQUE(address) ON CONFLICT REPLACE)''')
|
return True
|
||||||
self.cur.execute(
|
except OSError as err:
|
||||||
'''CREATE TABLE inventory (hash blob, objecttype int, streamnumber int, payload blob,'''
|
logger.debug('The file is missing. Error message: %s\n', str(err))
|
||||||
''' expirestime integer, tag blob, UNIQUE(hash) ON CONFLICT REPLACE)''')
|
except IOError as err:
|
||||||
self.cur.execute(
|
logger.debug(
|
||||||
'''INSERT INTO subscriptions VALUES'''
|
'ERROR trying to initialize database. Error message: %s\n', str(err))
|
||||||
'''('Bitmessage new releases/announcements','BM-GtovgYdgs7qXPkoYaRgrLFuFKz1SFpsw',1)''')
|
except sqlite3.Error as err:
|
||||||
self.cur.execute(
|
logger.error(err)
|
||||||
'''CREATE TABLE settings (key blob, value blob, UNIQUE(key) ON CONFLICT REPLACE)''')
|
|
||||||
self.cur.execute('''INSERT INTO settings VALUES('version','11')''')
|
|
||||||
self.cur.execute('''INSERT INTO settings VALUES('lastvacuumtime',?)''', (
|
|
||||||
int(time.time()),))
|
|
||||||
self.cur.execute(
|
|
||||||
'''CREATE TABLE objectprocessorqueue'''
|
|
||||||
''' (objecttype int, data blob, UNIQUE(objecttype, data) ON CONFLICT REPLACE)''')
|
|
||||||
self.conn.commit()
|
|
||||||
logger.info('Created messages database file')
|
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
if str(err) == 'table inbox already exists':
|
logger.debug(
|
||||||
logger.debug('Database file already exists.')
|
'ERROR trying to initialize database. Error message: %s\n', str(err))
|
||||||
|
return False
|
||||||
|
|
||||||
else:
|
@property
|
||||||
sys.stderr.write(
|
def sql_schema_version(self):
|
||||||
'ERROR trying to create database file (message.dat). Error message: %s\n' % str(err))
|
"""
|
||||||
os._exit(0)
|
Getter for get current schema version
|
||||||
|
"""
|
||||||
|
return self.__get_current_settings_version()
|
||||||
|
|
||||||
# If the settings version is equal to 2 or 3 then the
|
@sql_schema_version.setter
|
||||||
# sqlThread will modify the pubkeys table and change
|
def sql_schema_version(self, setter):
|
||||||
# the settings version to 4.
|
"""
|
||||||
settingsversion = config.getint(
|
Update version with one level
|
||||||
'bitmessagesettings', 'settingsversion')
|
"""
|
||||||
|
|
||||||
# People running earlier versions of PyBitmessage do not have the
|
if setter:
|
||||||
# usedpersonally field in their pubkeys table. Let's add it.
|
query = "UPDATE settings SET value=CAST(value AS INT) + 1 WHERE key = 'version'"
|
||||||
if settingsversion == 2:
|
self.cur.execute(query)
|
||||||
item = '''ALTER TABLE pubkeys ADD usedpersonally text DEFAULT 'no' '''
|
|
||||||
parameters = ''
|
|
||||||
self.cur.execute(item, parameters)
|
|
||||||
self.conn.commit()
|
|
||||||
|
|
||||||
settingsversion = 3
|
def upgrade_to_latest(self):
|
||||||
|
"""
|
||||||
|
Initialize upgrade level
|
||||||
|
"""
|
||||||
|
|
||||||
# People running earlier versions of PyBitmessage do not have the
|
while self.sql_schema_version < self.max_level:
|
||||||
# encodingtype field in their inbox and sent tables or the read field
|
self._upgrade_one_level_sql_statement(self.sql_schema_version)
|
||||||
# in the inbox table. Let's add them.
|
self.sql_schema_version = True # bump sql_schema_version by one
|
||||||
if settingsversion == 3:
|
|
||||||
item = '''ALTER TABLE inbox ADD encodingtype int DEFAULT '2' '''
|
|
||||||
parameters = ''
|
|
||||||
self.cur.execute(item, parameters)
|
|
||||||
|
|
||||||
item = '''ALTER TABLE inbox ADD read bool DEFAULT '1' '''
|
def upgrade_schema_if_old_version(self):
|
||||||
parameters = ''
|
""" check settings table exists """
|
||||||
self.cur.execute(item, parameters)
|
|
||||||
|
|
||||||
item = '''ALTER TABLE sent ADD encodingtype int DEFAULT '2' '''
|
query = "SELECT name FROM sqlite_master WHERE type='table' AND name='settings'"
|
||||||
parameters = ''
|
parameters = ()
|
||||||
self.cur.execute(item, parameters)
|
self.cur.execute(query, parameters)
|
||||||
self.conn.commit()
|
|
||||||
|
|
||||||
settingsversion = 4
|
|
||||||
|
|
||||||
config.set(
|
|
||||||
'bitmessagesettings', 'settingsversion', str(settingsversion))
|
|
||||||
config.save()
|
|
||||||
|
|
||||||
helper_startup.updateConfig()
|
|
||||||
|
|
||||||
# From now on, let us keep a 'version' embedded in the messages.dat
|
|
||||||
# file so that when we make changes to the database, the database
|
|
||||||
# version we are on can stay embedded in the messages.dat file. Let us
|
|
||||||
# check to see if the settings table exists yet.
|
|
||||||
item = '''SELECT name FROM sqlite_master WHERE type='table' AND name='settings';'''
|
|
||||||
parameters = ''
|
|
||||||
self.cur.execute(item, parameters)
|
|
||||||
if self.cur.fetchall() == []:
|
if self.cur.fetchall() == []:
|
||||||
# The settings table doesn't exist. We need to make it.
|
# The settings table doesn't exist. We need to make it.
|
||||||
logger.debug(
|
logger.debug(
|
||||||
|
@ -149,277 +136,17 @@ class sqlThread(threading.Thread):
|
||||||
self.cur.execute('''INSERT INTO settings VALUES('lastvacuumtime',?)''', (
|
self.cur.execute('''INSERT INTO settings VALUES('lastvacuumtime',?)''', (
|
||||||
int(time.time()),))
|
int(time.time()),))
|
||||||
logger.debug('In messages.dat database, removing an obsolete field from the pubkeys table.')
|
logger.debug('In messages.dat database, removing an obsolete field from the pubkeys table.')
|
||||||
self.cur.execute(
|
|
||||||
'''CREATE TEMPORARY TABLE pubkeys_backup(hash blob, transmitdata blob, time int,'''
|
|
||||||
''' usedpersonally text, UNIQUE(hash) ON CONFLICT REPLACE);''')
|
|
||||||
self.cur.execute(
|
|
||||||
'''INSERT INTO pubkeys_backup SELECT hash, transmitdata, time, usedpersonally FROM pubkeys;''')
|
|
||||||
self.cur.execute('''DROP TABLE pubkeys''')
|
|
||||||
self.cur.execute(
|
|
||||||
'''CREATE TABLE pubkeys'''
|
|
||||||
''' (hash blob, transmitdata blob, time int, usedpersonally text, UNIQUE(hash) ON CONFLICT REPLACE)''')
|
|
||||||
self.cur.execute(
|
|
||||||
'''INSERT INTO pubkeys SELECT hash, transmitdata, time, usedpersonally FROM pubkeys_backup;''')
|
|
||||||
self.cur.execute('''DROP TABLE pubkeys_backup;''')
|
|
||||||
logger.debug(
|
|
||||||
'Deleting all pubkeys from inventory.'
|
|
||||||
' They will be redownloaded and then saved with the correct times.')
|
|
||||||
self.cur.execute(
|
|
||||||
'''delete from inventory where objecttype = 'pubkey';''')
|
|
||||||
logger.debug('replacing Bitmessage announcements mailing list with a new one.')
|
|
||||||
self.cur.execute(
|
|
||||||
'''delete from subscriptions where address='BM-BbkPSZbzPwpVcYZpU4yHwf9ZPEapN5Zx' ''')
|
|
||||||
self.cur.execute(
|
|
||||||
'''INSERT INTO subscriptions VALUES'''
|
|
||||||
'''('Bitmessage new releases/announcements','BM-GtovgYdgs7qXPkoYaRgrLFuFKz1SFpsw',1)''')
|
|
||||||
logger.debug('Commiting.')
|
|
||||||
self.conn.commit()
|
|
||||||
logger.debug('Vacuuming message.dat. You might notice that the file size gets much smaller.')
|
|
||||||
self.cur.execute(''' VACUUM ''')
|
|
||||||
|
|
||||||
|
# initiate sql file
|
||||||
|
self.initialize_sql("upg_sc_if_old_ver_1")
|
||||||
# After code refactoring, the possible status values for sent messages
|
# After code refactoring, the possible status values for sent messages
|
||||||
# have changed.
|
# have changed.
|
||||||
self.cur.execute(
|
self.initialize_sql("upg_sc_if_old_ver_2")
|
||||||
'''update sent set status='doingmsgpow' where status='doingpow' ''')
|
|
||||||
self.cur.execute(
|
|
||||||
'''update sent set status='msgsent' where status='sentmessage' ''')
|
|
||||||
self.cur.execute(
|
|
||||||
'''update sent set status='doingpubkeypow' where status='findingpubkey' ''')
|
|
||||||
self.cur.execute(
|
|
||||||
'''update sent set status='broadcastqueued' where status='broadcastpending' ''')
|
|
||||||
self.conn.commit()
|
|
||||||
|
|
||||||
# Let's get rid of the first20bytesofencryptedmessage field in
|
|
||||||
# the inventory table.
|
|
||||||
item = '''SELECT value FROM settings WHERE key='version';'''
|
|
||||||
parameters = ''
|
|
||||||
self.cur.execute(item, parameters)
|
|
||||||
if int(self.cur.fetchall()[0][0]) == 2:
|
|
||||||
logger.debug(
|
|
||||||
'In messages.dat database, removing an obsolete field from'
|
|
||||||
' the inventory table.')
|
|
||||||
self.cur.execute(
|
|
||||||
'''CREATE TEMPORARY TABLE inventory_backup'''
|
|
||||||
'''(hash blob, objecttype text, streamnumber int, payload blob,'''
|
|
||||||
''' receivedtime integer, UNIQUE(hash) ON CONFLICT REPLACE);''')
|
|
||||||
self.cur.execute(
|
|
||||||
'''INSERT INTO inventory_backup SELECT hash, objecttype, streamnumber, payload, receivedtime'''
|
|
||||||
''' FROM inventory;''')
|
|
||||||
self.cur.execute('''DROP TABLE inventory''')
|
|
||||||
self.cur.execute(
|
|
||||||
'''CREATE TABLE inventory'''
|
|
||||||
''' (hash blob, objecttype text, streamnumber int, payload blob, receivedtime integer,'''
|
|
||||||
''' UNIQUE(hash) ON CONFLICT REPLACE)''')
|
|
||||||
self.cur.execute(
|
|
||||||
'''INSERT INTO inventory SELECT hash, objecttype, streamnumber, payload, receivedtime'''
|
|
||||||
''' FROM inventory_backup;''')
|
|
||||||
self.cur.execute('''DROP TABLE inventory_backup;''')
|
|
||||||
item = '''update settings set value=? WHERE key='version';'''
|
|
||||||
parameters = (3,)
|
|
||||||
self.cur.execute(item, parameters)
|
|
||||||
|
|
||||||
# Add a new column to the inventory table to store tags.
|
|
||||||
item = '''SELECT value FROM settings WHERE key='version';'''
|
|
||||||
parameters = ''
|
|
||||||
self.cur.execute(item, parameters)
|
|
||||||
currentVersion = int(self.cur.fetchall()[0][0])
|
|
||||||
if currentVersion == 1 or currentVersion == 3:
|
|
||||||
logger.debug(
|
|
||||||
'In messages.dat database, adding tag field to'
|
|
||||||
' the inventory table.')
|
|
||||||
item = '''ALTER TABLE inventory ADD tag blob DEFAULT '' '''
|
|
||||||
parameters = ''
|
|
||||||
self.cur.execute(item, parameters)
|
|
||||||
item = '''update settings set value=? WHERE key='version';'''
|
|
||||||
parameters = (4,)
|
|
||||||
self.cur.execute(item, parameters)
|
|
||||||
|
|
||||||
# Add a new column to the pubkeys table to store the address version.
|
|
||||||
# We're going to trash all of our pubkeys and let them be redownloaded.
|
|
||||||
item = '''SELECT value FROM settings WHERE key='version';'''
|
|
||||||
parameters = ''
|
|
||||||
self.cur.execute(item, parameters)
|
|
||||||
currentVersion = int(self.cur.fetchall()[0][0])
|
|
||||||
if currentVersion == 4:
|
|
||||||
self.cur.execute('''DROP TABLE pubkeys''')
|
|
||||||
self.cur.execute(
|
|
||||||
'''CREATE TABLE pubkeys (hash blob, addressversion int, transmitdata blob, time int,'''
|
|
||||||
'''usedpersonally text, UNIQUE(hash, addressversion) ON CONFLICT REPLACE)''')
|
|
||||||
self.cur.execute(
|
|
||||||
'''delete from inventory where objecttype = 'pubkey';''')
|
|
||||||
item = '''update settings set value=? WHERE key='version';'''
|
|
||||||
parameters = (5,)
|
|
||||||
self.cur.execute(item, parameters)
|
|
||||||
|
|
||||||
# Add a new table: objectprocessorqueue with which to hold objects
|
|
||||||
# that have yet to be processed if the user shuts down Bitmessage.
|
|
||||||
item = '''SELECT value FROM settings WHERE key='version';'''
|
|
||||||
parameters = ''
|
|
||||||
self.cur.execute(item, parameters)
|
|
||||||
currentVersion = int(self.cur.fetchall()[0][0])
|
|
||||||
if currentVersion == 5:
|
|
||||||
self.cur.execute('''DROP TABLE knownnodes''')
|
|
||||||
self.cur.execute(
|
|
||||||
'''CREATE TABLE objectprocessorqueue'''
|
|
||||||
''' (objecttype text, data blob, UNIQUE(objecttype, data) ON CONFLICT REPLACE)''')
|
|
||||||
item = '''update settings set value=? WHERE key='version';'''
|
|
||||||
parameters = (6,)
|
|
||||||
self.cur.execute(item, parameters)
|
|
||||||
|
|
||||||
# changes related to protocol v3
|
|
||||||
# In table inventory and objectprocessorqueue, objecttype is now
|
|
||||||
# an integer (it was a human-friendly string previously)
|
|
||||||
item = '''SELECT value FROM settings WHERE key='version';'''
|
|
||||||
parameters = ''
|
|
||||||
self.cur.execute(item, parameters)
|
|
||||||
currentVersion = int(self.cur.fetchall()[0][0])
|
|
||||||
if currentVersion == 6:
|
|
||||||
logger.debug(
|
|
||||||
'In messages.dat database, dropping and recreating'
|
|
||||||
' the inventory table.')
|
|
||||||
self.cur.execute('''DROP TABLE inventory''')
|
|
||||||
self.cur.execute(
|
|
||||||
'''CREATE TABLE inventory'''
|
|
||||||
''' (hash blob, objecttype int, streamnumber int, payload blob, expirestime integer,'''
|
|
||||||
''' tag blob, UNIQUE(hash) ON CONFLICT REPLACE)''')
|
|
||||||
self.cur.execute('''DROP TABLE objectprocessorqueue''')
|
|
||||||
self.cur.execute(
|
|
||||||
'''CREATE TABLE objectprocessorqueue'''
|
|
||||||
''' (objecttype int, data blob, UNIQUE(objecttype, data) ON CONFLICT REPLACE)''')
|
|
||||||
item = '''update settings set value=? WHERE key='version';'''
|
|
||||||
parameters = (7,)
|
|
||||||
self.cur.execute(item, parameters)
|
|
||||||
logger.debug(
|
|
||||||
'Finished dropping and recreating the inventory table.')
|
|
||||||
|
|
||||||
# The format of data stored in the pubkeys table has changed. Let's
|
|
||||||
# clear it, and the pubkeys from inventory, so that they'll
|
|
||||||
# be re-downloaded.
|
|
||||||
item = '''SELECT value FROM settings WHERE key='version';'''
|
|
||||||
parameters = ''
|
|
||||||
self.cur.execute(item, parameters)
|
|
||||||
currentVersion = int(self.cur.fetchall()[0][0])
|
|
||||||
if currentVersion == 7:
|
|
||||||
logger.debug(
|
|
||||||
'In messages.dat database, clearing pubkeys table'
|
|
||||||
' because the data format has been updated.')
|
|
||||||
self.cur.execute(
|
|
||||||
'''delete from inventory where objecttype = 1;''')
|
|
||||||
self.cur.execute(
|
|
||||||
'''delete from pubkeys;''')
|
|
||||||
# Any sending messages for which we *thought* that we had
|
|
||||||
# the pubkey must be rechecked.
|
|
||||||
self.cur.execute(
|
|
||||||
'''UPDATE sent SET status='msgqueued' WHERE status='doingmsgpow' or status='badkey';''')
|
|
||||||
query = '''update settings set value=? WHERE key='version';'''
|
|
||||||
parameters = (8,)
|
|
||||||
self.cur.execute(query, parameters)
|
|
||||||
logger.debug('Finished clearing currently held pubkeys.')
|
|
||||||
|
|
||||||
# Add a new column to the inbox table to store the hash of
|
|
||||||
# the message signature. We'll use this as temporary message UUID
|
|
||||||
# in order to detect duplicates.
|
|
||||||
item = '''SELECT value FROM settings WHERE key='version';'''
|
|
||||||
parameters = ''
|
|
||||||
self.cur.execute(item, parameters)
|
|
||||||
currentVersion = int(self.cur.fetchall()[0][0])
|
|
||||||
if currentVersion == 8:
|
|
||||||
logger.debug(
|
|
||||||
'In messages.dat database, adding sighash field to'
|
|
||||||
' the inbox table.')
|
|
||||||
item = '''ALTER TABLE inbox ADD sighash blob DEFAULT '' '''
|
|
||||||
parameters = ''
|
|
||||||
self.cur.execute(item, parameters)
|
|
||||||
item = '''update settings set value=? WHERE key='version';'''
|
|
||||||
parameters = (9,)
|
|
||||||
self.cur.execute(item, parameters)
|
|
||||||
|
|
||||||
# We'll also need a `sleeptill` field and a `ttl` field. Also we
|
|
||||||
# can combine the pubkeyretrynumber and msgretrynumber into one.
|
|
||||||
|
|
||||||
item = '''SELECT value FROM settings WHERE key='version';'''
|
|
||||||
parameters = ''
|
|
||||||
self.cur.execute(item, parameters)
|
|
||||||
currentVersion = int(self.cur.fetchall()[0][0])
|
|
||||||
if currentVersion == 9:
|
|
||||||
logger.info(
|
|
||||||
'In messages.dat database, making TTL-related changes:'
|
|
||||||
' combining the pubkeyretrynumber and msgretrynumber'
|
|
||||||
' fields into the retrynumber field and adding the'
|
|
||||||
' sleeptill and ttl fields...')
|
|
||||||
self.cur.execute(
|
|
||||||
'''CREATE TEMPORARY TABLE sent_backup'''
|
|
||||||
''' (msgid blob, toaddress text, toripe blob, fromaddress text, subject text, message text,'''
|
|
||||||
''' ackdata blob, lastactiontime integer, status text, retrynumber integer,'''
|
|
||||||
''' folder text, encodingtype int)''')
|
|
||||||
self.cur.execute(
|
|
||||||
'''INSERT INTO sent_backup SELECT msgid, toaddress, toripe, fromaddress,'''
|
|
||||||
''' subject, message, ackdata, lastactiontime,'''
|
|
||||||
''' status, 0, folder, encodingtype FROM sent;''')
|
|
||||||
self.cur.execute('''DROP TABLE sent''')
|
|
||||||
self.cur.execute(
|
|
||||||
'''CREATE TABLE sent'''
|
|
||||||
''' (msgid blob, toaddress text, toripe blob, fromaddress text, subject text, message text,'''
|
|
||||||
''' ackdata blob, senttime integer, lastactiontime integer, sleeptill int, status text,'''
|
|
||||||
''' retrynumber integer, folder text, encodingtype int, ttl int)''')
|
|
||||||
self.cur.execute(
|
|
||||||
'''INSERT INTO sent SELECT msgid, toaddress, toripe, fromaddress, subject, message, ackdata,'''
|
|
||||||
''' lastactiontime, lastactiontime, 0, status, 0, folder, encodingtype, 216000 FROM sent_backup;''')
|
|
||||||
self.cur.execute('''DROP TABLE sent_backup''')
|
|
||||||
logger.info('In messages.dat database, finished making TTL-related changes.')
|
|
||||||
logger.debug('In messages.dat database, adding address field to the pubkeys table.')
|
|
||||||
# We're going to have to calculate the address for each row in the pubkeys
|
|
||||||
# table. Then we can take out the hash field.
|
|
||||||
self.cur.execute('''ALTER TABLE pubkeys ADD address text DEFAULT '' ;''')
|
|
||||||
|
|
||||||
# replica for loop to update hashed address
|
|
||||||
self.cur.execute('''UPDATE pubkeys SET address=(enaddr(pubkeys.addressversion, 1, hash)); ''')
|
|
||||||
|
|
||||||
# Now we can remove the hash field from the pubkeys table.
|
|
||||||
self.cur.execute(
|
|
||||||
'''CREATE TEMPORARY TABLE pubkeys_backup'''
|
|
||||||
''' (address text, addressversion int, transmitdata blob, time int,'''
|
|
||||||
''' usedpersonally text, UNIQUE(address) ON CONFLICT REPLACE)''')
|
|
||||||
self.cur.execute(
|
|
||||||
'''INSERT INTO pubkeys_backup'''
|
|
||||||
''' SELECT address, addressversion, transmitdata, time, usedpersonally FROM pubkeys;''')
|
|
||||||
self.cur.execute('''DROP TABLE pubkeys''')
|
|
||||||
self.cur.execute(
|
|
||||||
'''CREATE TABLE pubkeys'''
|
|
||||||
''' (address text, addressversion int, transmitdata blob, time int, usedpersonally text,'''
|
|
||||||
''' UNIQUE(address) ON CONFLICT REPLACE)''')
|
|
||||||
self.cur.execute(
|
|
||||||
'''INSERT INTO pubkeys SELECT'''
|
|
||||||
''' address, addressversion, transmitdata, time, usedpersonally FROM pubkeys_backup;''')
|
|
||||||
self.cur.execute('''DROP TABLE pubkeys_backup''')
|
|
||||||
logger.debug(
|
|
||||||
'In messages.dat database, done adding address field to the pubkeys table'
|
|
||||||
' and removing the hash field.')
|
|
||||||
self.cur.execute('''update settings set value=10 WHERE key='version';''')
|
|
||||||
|
|
||||||
# Update the address colunm to unique in addressbook table
|
|
||||||
item = '''SELECT value FROM settings WHERE key='version';'''
|
|
||||||
parameters = ''
|
|
||||||
self.cur.execute(item, parameters)
|
|
||||||
currentVersion = int(self.cur.fetchall()[0][0])
|
|
||||||
if currentVersion == 10:
|
|
||||||
logger.debug(
|
|
||||||
'In messages.dat database, updating address column to UNIQUE'
|
|
||||||
' in the addressbook table.')
|
|
||||||
self.cur.execute(
|
|
||||||
'''ALTER TABLE addressbook RENAME TO old_addressbook''')
|
|
||||||
self.cur.execute(
|
|
||||||
'''CREATE TABLE addressbook'''
|
|
||||||
''' (label text, address text, UNIQUE(address) ON CONFLICT IGNORE)''')
|
|
||||||
self.cur.execute(
|
|
||||||
'''INSERT INTO addressbook SELECT label, address FROM old_addressbook;''')
|
|
||||||
self.cur.execute('''DROP TABLE old_addressbook''')
|
|
||||||
self.cur.execute('''update settings set value=11 WHERE key='version';''')
|
|
||||||
|
|
||||||
# Are you hoping to add a new option to the keys.dat file of existing
|
|
||||||
# Bitmessage users or modify the SQLite database? Add it right
|
|
||||||
# above this line!
|
|
||||||
|
|
||||||
|
def check_columns_can_store_binary_null(self):
|
||||||
|
"""
|
||||||
|
Check if sqlite can store binary zeros.
|
||||||
|
|||||||
|
"""
|
||||||
try:
|
try:
|
||||||
It may be helpful to have a common error handler or at least common error messages for errors. Or, perhaps a decorator would be helpful? It would wrap the whole method in a It may be helpful to have a common error handler or at least common error messages for errors. Or, perhaps a decorator would be helpful? It would wrap the whole method in a `try`/`except`, assuming we can refactor the rest of the code so that all the SQL operations can be done by dedicated methods.
|
|||||||
testpayload = '\x00\x00'
|
testpayload = '\x00\x00'
|
||||||
t = ('1234', 1, testpayload, '12345678', 'no')
|
t = ('1234', 1, testpayload, '12345678', 'no')
|
||||||
|
@ -443,28 +170,19 @@ class sqlThread(threading.Thread):
|
||||||
' but the problem you need to solve is related to SQLite.\n\n')
|
' but the problem you need to solve is related to SQLite.\n\n')
|
||||||
os._exit(0)
|
os._exit(0)
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
if str(err) == 'database or disk is full':
|
sqlThread().error_handler(err, 'null value test')
|
||||||
logger.fatal(
|
|
||||||
'(While null value test) Alert: Your disk or data storage volume is full.'
|
|
||||||
' sqlThread will now exit.')
|
|
||||||
queues.UISignalQueue.put((
|
|
||||||
'alert', (
|
|
||||||
_translate(
|
|
||||||
"MainWindow",
|
|
||||||
"Disk full"),
|
|
||||||
_translate(
|
|
||||||
"MainWindow",
|
|
||||||
'Alert: Your disk or data storage volume is full. Bitmessage will now exit.'),
|
|
||||||
True)))
|
|
||||||
os._exit(0)
|
|
||||||
else:
|
|
||||||
logger.error(err)
|
|
||||||
|
|
||||||
# Let us check to see the last time we vaccumed the messages.dat file.
|
def check_vacuum(self): # pylint: disable=too-many-locals, too-many-branches, too-many-statements,
|
||||||
# If it has been more than a month let's do it now.
|
# Redefinition-of-parameters-type-from-tuple-to-str, R0204, line-too-long, E501
|
||||||
item = '''SELECT value FROM settings WHERE key='lastvacuumtime';'''
|
"""
|
||||||
parameters = ''
|
Check vacuum and apply sql queries for different different conditions.
|
||||||
self.cur.execute(item, parameters)
|
Let us check to see the last time we vaccumed the messages.dat file.
|
||||||
|
If it has been more than a month let's do it now.
|
||||||
|
"""
|
||||||
|
|
||||||
|
query = "SELECT value FROM settings WHERE key='lastvacuumtime'"
|
||||||
|
parameters = ()
|
||||||
|
self.cur.execute(query, parameters)
|
||||||
queryreturn = self.cur.fetchall()
|
queryreturn = self.cur.fetchall()
|
||||||
for row in queryreturn:
|
for row in queryreturn:
|
||||||
value, = row
|
value, = row
|
||||||
|
@ -473,135 +191,92 @@ class sqlThread(threading.Thread):
|
||||||
try:
|
try:
|
||||||
self.cur.execute(''' VACUUM ''')
|
self.cur.execute(''' VACUUM ''')
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
replace with replace with `error_handler`
`error_handler` is in `else` block.
the error handling should be unified, this here isn't a special case the error handling should be unified, this here isn't a special case
Added inside Added inside `error_handler`.
|
|||||||
if str(err) == 'database or disk is full':
|
sqlThread().error_handler(err, 'VACUUM')
|
||||||
logger.fatal(
|
query = "update settings set value=? WHERE key='lastvacuumtime'"
|
||||||
'(While VACUUM) Alert: Your disk or data storage volume is full.'
|
|
||||||
' sqlThread will now exit.')
|
|
||||||
queues.UISignalQueue.put((
|
|
||||||
'alert', (
|
|
||||||
_translate(
|
|
||||||
"MainWindow",
|
|
||||||
"Disk full"),
|
|
||||||
_translate(
|
|
||||||
"MainWindow",
|
|
||||||
'Alert: Your disk or data storage volume is full. Bitmessage will now exit.'),
|
|
||||||
True)))
|
|
||||||
os._exit(0)
|
|
||||||
item = '''update settings set value=? WHERE key='lastvacuumtime';'''
|
|
||||||
parameters = (int(time.time()),)
|
parameters = (int(time.time()),)
|
||||||
self.cur.execute(item, parameters)
|
self.cur.execute(query, parameters)
|
||||||
|
|
||||||
helper_sql.sql_ready.set()
|
def upgrade_config_parser_setting_version(self, settingsversion):
|
||||||
|
"""
|
||||||
|
Upgrade schema with respect setting version
|
||||||
|
"""
|
||||||
|
|
||||||
while True:
|
self.initialize_sql("config_setting_ver_{}".format(settingsversion))
|
||||||
item = helper_sql.sqlSubmitQueue.get()
|
|
||||||
this should be split into multiple methods this should be split into multiple methods
|
|||||||
if item == 'commit':
|
def initialize_schema(self):
|
||||||
|
"""
|
||||||
|
Initialize DB schema
|
||||||
|
"""
|
||||||
try:
|
try:
|
||||||
|
inbox_exists = list(self.cur.execute('PRAGMA table_info(inbox)'))
|
||||||
|
if not inbox_exists:
|
||||||
|
self.initialize_sql("initialize_schema")
|
||||||
instead of instead of `custom_error` maybe `command`
|
|||||||
self.conn.commit()
|
self.conn.commit()
|
||||||
|
logger.info('Created messages database file')
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
if str(err) == 'database or disk is full':
|
if str(err) == 'table inbox already exists':
|
||||||
We should also handle We should also handle `FileNotFoundError`, then log debug that the file is missing, and return.
I have checked that
or
And I have checked that `FileNotFoundError` is for `python3`.
or we can do like
```
if six.PY2:
FileNotFoundError = IOError
```
or
```
try:
FileNotFoundError
except NameError:
# py2
FileNotFoundError = IOError
```
And `IOError` raises `IOError(2, 'No such file or directory')`
we seem to be missing a sqlite error handler. This should also log, but stronger severity (e.g. we seem to be missing a sqlite error handler. This should also log, but stronger severity (e.g. `error`), not `debug`
`return False`
`return False` should be after the `try/except` block so that it's consistent (returns `True` if no exception, otherwise returns `False`)
|
|||||||
logger.fatal(
|
logger.debug('Database file already exists.')
|
||||||
these sections should also be in a separate function/method these sections should also be in a separate function/method
I don't know if there needs to be a variable `command, why not just use the string directly? I don't know if there needs to be a variable `command, why not just use the string directly?
|
|||||||
'(While committing) Alert: Your disk or data storage volume is full.'
|
|
||||||
' sqlThread will now exit.')
|
|
||||||
queues.UISignalQueue.put((
|
|
||||||
'alert', (
|
|
||||||
_translate(
|
|
||||||
"MainWindow",
|
|
||||||
"Disk full"),
|
|
||||||
_translate(
|
|
||||||
"MainWindow",
|
|
||||||
'Alert: Your disk or data storage volume is full. Bitmessage will now exit.'),
|
|
||||||
True)))
|
|
||||||
os._exit(0)
|
|
||||||
elif item == 'exit':
|
|
||||||
self.conn.close()
|
|
||||||
logger.info('sqlThread exiting gracefully.')
|
|
||||||
|
|
||||||
return
|
|
||||||
elif item == 'movemessagstoprog':
|
|
||||||
logger.debug('the sqlThread is moving the messages.dat file to the local program directory.')
|
|
||||||
|
|
||||||
try:
|
|
||||||
self.conn.commit()
|
|
||||||
except Exception as err:
|
|
||||||
if str(err) == 'database or disk is full':
|
|
||||||
logger.fatal(
|
|
||||||
'(while movemessagstoprog) Alert: Your disk or data storage volume is full.'
|
|
||||||
' sqlThread will now exit.')
|
|
||||||
queues.UISignalQueue.put((
|
|
||||||
'alert', (
|
|
||||||
_translate(
|
|
||||||
"MainWindow",
|
|
||||||
"Disk full"),
|
|
||||||
_translate(
|
|
||||||
"MainWindow",
|
|
||||||
'Alert: Your disk or data storage volume is full. Bitmessage will now exit.'),
|
|
||||||
True)))
|
|
||||||
os._exit(0)
|
|
||||||
self.conn.close()
|
|
||||||
shutil.move(
|
|
||||||
paths.lookupAppdataFolder() + 'messages.dat', paths.lookupExeFolder() + 'messages.dat')
|
|
||||||
self.conn = sqlite3.connect(paths.lookupExeFolder() + 'messages.dat')
|
|
||||||
self.conn.text_factory = str
|
|
||||||
self.cur = self.conn.cursor()
|
|
||||||
elif item == 'movemessagstoappdata':
|
|
||||||
logger.debug('the sqlThread is moving the messages.dat file to the Appdata folder.')
|
|
||||||
|
|
||||||
try:
|
|
||||||
self.conn.commit()
|
|
||||||
except Exception as err:
|
|
||||||
if str(err) == 'database or disk is full':
|
|
||||||
logger.fatal(
|
|
||||||
'(while movemessagstoappdata) Alert: Your disk or data storage volume is full.'
|
|
||||||
' sqlThread will now exit.')
|
|
||||||
queues.UISignalQueue.put((
|
|
||||||
'alert', (
|
|
||||||
_translate(
|
|
||||||
"MainWindow",
|
|
||||||
"Disk full"),
|
|
||||||
_translate(
|
|
||||||
"MainWindow",
|
|
||||||
'Alert: Your disk or data storage volume is full. Bitmessage will now exit.'),
|
|
||||||
True)))
|
|
||||||
os._exit(0)
|
|
||||||
self.conn.close()
|
|
||||||
shutil.move(
|
|
||||||
paths.lookupExeFolder() + 'messages.dat', paths.lookupAppdataFolder() + 'messages.dat')
|
|
||||||
self.conn = sqlite3.connect(paths.lookupAppdataFolder() + 'messages.dat')
|
|
||||||
self.conn.text_factory = str
|
|
||||||
self.cur = self.conn.cursor()
|
|
||||||
elif item == 'deleteandvacuume':
|
|
||||||
self.cur.execute('''delete from inbox where folder='trash' ''')
|
|
||||||
self.cur.execute('''delete from sent where folder='trash' ''')
|
|
||||||
self.conn.commit()
|
|
||||||
try:
|
|
||||||
self.cur.execute(''' VACUUM ''')
|
|
||||||
except Exception as err:
|
|
||||||
if str(err) == 'database or disk is full':
|
|
||||||
logger.fatal(
|
|
||||||
'(while deleteandvacuume) Alert: Your disk or data storage volume is full.'
|
|
||||||
' sqlThread will now exit.')
|
|
||||||
queues.UISignalQueue.put((
|
|
||||||
'alert', (
|
|
||||||
_translate(
|
|
||||||
"MainWindow",
|
|
||||||
"Disk full"),
|
|
||||||
_translate(
|
|
||||||
"MainWindow",
|
|
||||||
'Alert: Your disk or data storage volume is full. Bitmessage will now exit.'),
|
|
||||||
True)))
|
|
||||||
os._exit(0)
|
|
||||||
else:
|
else:
|
||||||
parameters = helper_sql.sqlSubmitQueue.get()
|
os._exit(
|
||||||
rowcount = 0
|
'ERROR trying to create database file (message.dat). Error message: %s\n' % str(err))
|
||||||
|
|
||||||
|
def create_sql_function(self):
|
||||||
|
"""
|
||||||
|
Apply create_function to DB
|
||||||
|
"""
|
||||||
try:
|
try:
|
||||||
self.cur.execute(item, parameters)
|
self.conn.create_function("enaddr", 3, func=encodeAddress, deterministic=True)
|
||||||
rowcount = self.cur.rowcount
|
except (TypeError, sqlite3.NotSupportedError) as err:
|
||||||
except Exception as err:
|
logger.debug(
|
||||||
we should have a test of what happens if there is legacy string in the database when retrieving the value we should have a test of what happens if there is legacy string in the database when retrieving the value
|
|||||||
|
"Got error while pass deterministic in sqlite create function {}, Passing 3 params".format(err))
|
||||||
|
self.conn.create_function("enaddr", 3, encodeAddress)
|
||||||
|
|
||||||
|
|
||||||
|
class sqlThread(threading.Thread):
|
||||||
|
"""A thread for all SQL operations"""
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
threading.Thread.__init__(self, name="SQL")
|
||||||
|
self.db = None
|
||||||
|
self.max_setting_level = 4
|
||||||
|
self.rowcount = 0
|
||||||
|
logger.debug('Init thread in sqlthread')
|
||||||
|
|
||||||
|
@property
|
||||||
|
def sql_config_settings_version(self):
|
||||||
|
""" Getter for BMConfigParser (obj) """
|
||||||
|
|
||||||
|
return config.getint(
|
||||||
|
'bitmessagesettings', 'settingsversion')
|
||||||
|
|
||||||
|
@sql_config_settings_version.setter
|
||||||
|
def sql_config_settings_version(self, settingsversion): # pylint: disable=R0201, no-self-use
|
||||||
|
# Setter for BmConfigparser
|
||||||
|
|
||||||
|
config.set(
|
||||||
|
'bitmessagesettings', 'settingsversion', str(int(settingsversion) + 1))
|
||||||
|
return config.save()
|
||||||
|
|
||||||
|
def upgrade_config_setting_version(self):
|
||||||
|
"""
|
||||||
|
upgrade config parser setting version.
|
||||||
|
If the settings version is equal to 2 or 3 then the
|
||||||
|
sqlThread will modify the pubkeys table and change
|
||||||
|
the settings version to 4.
|
||||||
|
"""
|
||||||
|
while self.sql_config_settings_version < self.max_setting_level:
|
||||||
|
self.db.upgrade_config_parser_setting_version(self.sql_config_settings_version)
|
||||||
|
self.sql_config_settings_version = self.sql_config_settings_version
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def error_handler(err, command, query=None, parameters=None):
|
||||||
|
"""
|
||||||
|
Common error handler
|
||||||
|
"""
|
||||||
if str(err) == 'database or disk is full':
|
if str(err) == 'database or disk is full':
|
||||||
logger.fatal(
|
logger.fatal(
|
||||||
'(while cur.execute) Alert: Your disk or data storage volume is full.'
|
"(While %s) Alert: Your disk or data storage volume is full. sqlThread will now exit.", command
|
||||||
' sqlThread will now exit.')
|
)
|
||||||
queues.UISignalQueue.put((
|
queues.UISignalQueue.put((
|
||||||
'alert', (
|
'alert', (
|
||||||
_translate(
|
_translate(
|
||||||
|
@ -611,7 +286,6 @@ class sqlThread(threading.Thread):
|
||||||
"MainWindow",
|
"MainWindow",
|
||||||
'Alert: Your disk or data storage volume is full. Bitmessage will now exit.'),
|
'Alert: Your disk or data storage volume is full. Bitmessage will now exit.'),
|
||||||
True)))
|
True)))
|
||||||
os._exit(0)
|
|
||||||
else:
|
else:
|
||||||
logger.fatal(
|
logger.fatal(
|
||||||
'Major error occurred when trying to execute a SQL statement within the sqlThread.'
|
'Major error occurred when trying to execute a SQL statement within the sqlThread.'
|
||||||
|
@ -620,21 +294,144 @@ class sqlThread(threading.Thread):
|
||||||
' you might want to censor this data with asterisks (***)'
|
' you might want to censor this data with asterisks (***)'
|
||||||
' as it can contain private information: %s.'
|
' as it can contain private information: %s.'
|
||||||
' Here is the actual error message thrown by the sqlThread: %s',
|
' Here is the actual error message thrown by the sqlThread: %s',
|
||||||
str(item),
|
str(query),
|
||||||
str(repr(parameters)),
|
str(repr(parameters)),
|
||||||
str(err))
|
str(err))
|
||||||
logger.fatal('This program shall now abruptly exit!')
|
logger.fatal('This program shall now abruptly exit!')
|
||||||
|
|
||||||
os._exit(0)
|
os._exit(0)
|
||||||
|
|
||||||
helper_sql.sqlReturnQueue.put((self.cur.fetchall(), rowcount))
|
def is_query_commit(self):
|
||||||
# helper_sql.sqlSubmitQueue.task_done()
|
"""
|
||||||
|
When query == 'commit'
|
||||||
def create_function(self):
|
"""
|
||||||
# create_function
|
|
||||||
try:
|
try:
|
||||||
self.conn.create_function("enaddr", 3, func=encodeAddress, deterministic=True)
|
self.db.conn.commit()
|
||||||
except (TypeError, sqlite3.NotSupportedError) as err:
|
except Exception as err:
|
||||||
logger.debug(
|
self.error_handler(err, 'committing')
|
||||||
"Got error while pass deterministic in sqlite create function {}, Passing 3 params".format(err))
|
|
||||||
self.conn.create_function("enaddr", 3, encodeAddress)
|
def is_query_movemessagstoprog(self):
|
||||||
|
"""
|
||||||
|
When query == 'movemessagstoprogs'
|
||||||
|
"""
|
||||||
|
logger.debug('the sqlThread is moving the messages.dat file to the local program directory.')
|
||||||
|
try:
|
||||||
|
self.db.conn.commit()
|
||||||
|
except Exception as err:
|
||||||
|
self.error_handler(err, 'movemessagstoprog')
|
||||||
|
self.db.conn.close()
|
||||||
|
shutil.move(
|
||||||
|
paths.lookupAppdataFolder() + 'messages.dat', paths.lookupExeFolder() + 'messages.dat')
|
||||||
|
self.db.conn = sqlite3.connect(paths.lookupExeFolder() + 'messages.dat')
|
||||||
|
self.db.conn.text_factory = str
|
||||||
|
self.db.cur = self.db.conn.cursor()
|
||||||
|
|
||||||
|
def is_query_deleteandvacuume(self):
|
||||||
|
"""
|
||||||
|
When query == 'deleteandvacuume'
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
self.db.cur.execute(''' VACUUM ''')
|
||||||
|
except Exception as err:
|
||||||
|
self.error_handler(err, 'deleteandvacuume')
|
||||||
|
self.db.cur.execute('''delete from inbox where folder='trash' ''')
|
||||||
|
self.db.cur.execute('''delete from sent where folder='trash' ''')
|
||||||
|
self.db.conn.commit()
|
||||||
|
|
||||||
|
def is_query_other(self, query):
|
||||||
|
"""
|
||||||
|
When the query can be default or other '
|
||||||
|
"""
|
||||||
|
parameters = helper_sql.sqlSubmitQueue.get()
|
||||||
|
try:
|
||||||
|
self.db.cur.execute(query, parameters)
|
||||||
|
self.rowcount = self.db.cur.rowcount
|
||||||
|
return self.rowcount
|
||||||
|
except Exception as err:
|
||||||
|
self.error_handler(err, 'cur.execute', query, parameters)
|
||||||
|
|
||||||
|
def is_query_movemessagestoappdata(self):
|
||||||
|
"""
|
||||||
|
When query == 'movemessagestoappdata'
|
||||||
|
"""
|
||||||
|
logger.debug('the sqlThread is moving the messages.dat file to the Appdata folder.')
|
||||||
|
try:
|
||||||
|
self.db.conn.commit()
|
||||||
|
except Exception as err:
|
||||||
|
self.error_handler(err, 'movemessagstoappdata')
|
||||||
|
self.db.conn.close()
|
||||||
|
shutil.move(
|
||||||
|
paths.lookupExeFolder() + 'messages.dat', paths.lookupAppdataFolder() + 'messages.dat')
|
||||||
|
self.db.conn = sqlite3.connect(paths.lookupAppdataFolder() + 'messages.dat')
|
||||||
|
self.db.conn.text_factory = str
|
||||||
|
self.db.cur = self.db.conn.cursor()
|
||||||
|
|
||||||
|
def is_query_exit(self):
|
||||||
|
"""
|
||||||
|
When query == 'exit'
|
||||||
|
"""
|
||||||
|
self.db.conn.close()
|
||||||
|
logger.info('sqlThread exiting gracefully.')
|
||||||
|
|
||||||
|
def loop_queue(self):
|
||||||
|
"""
|
||||||
|
Looping queue and process them
|
||||||
|
"""
|
||||||
|
query = helper_sql.sqlSubmitQueue.get()
|
||||||
|
if query == 'commit':
|
||||||
|
self.is_query_commit()
|
||||||
|
elif query == 'exit':
|
||||||
|
self.is_query_exit()
|
||||||
|
return False
|
||||||
|
elif query == 'movemessagstoprog':
|
||||||
|
self.is_query_movemessagstoprog()
|
||||||
|
elif query == 'movemessagstoappdata':
|
||||||
|
self.is_query_movemessagestoappdata()
|
||||||
|
elif query == 'deleteandvacuume':
|
||||||
|
self.is_query_deleteandvacuume()
|
||||||
|
else:
|
||||||
|
self.rowcount = self.is_query_other(query)
|
||||||
|
helper_sql.sqlReturnQueue.put((self.db.cur.fetchall(), self.rowcount))
|
||||||
|
return True
|
||||||
|
|
||||||
|
def run(self): # pylint: disable=R0204, E501
|
||||||
|
"""Process SQL queries from `.helper_sql.sqlSubmitQueue`"""
|
||||||
|
|
||||||
|
logger.info('Init thread in sqlthread')
|
||||||
|
# pylint: disable=redefined-variable-type
|
||||||
|
if state.testmode:
|
||||||
|
self.db = TestDB()
|
||||||
|
else:
|
||||||
|
self.db = BitmessageDB()
|
||||||
|
helper_sql.sql_available = True
|
||||||
|
|
||||||
|
config_ready.wait()
|
||||||
|
|
||||||
|
self.db.create_sql_function()
|
||||||
|
|
||||||
|
self.db.initialize_schema()
|
||||||
|
|
||||||
|
self.upgrade_config_setting_version()
|
||||||
|
|
||||||
|
helper_startup.updateConfig()
|
||||||
|
|
||||||
|
self.db.upgrade_schema_if_old_version()
|
||||||
|
|
||||||
|
self.db.upgrade_to_latest()
|
||||||
|
|
||||||
|
self.db.check_columns_can_store_binary_null()
|
||||||
|
|
||||||
|
self.db.check_vacuum()
|
||||||
|
|
||||||
|
helper_sql.sql_ready.set()
|
||||||
|
|
||||||
|
while self.loop_queue():
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class TestDB(BitmessageDB):
|
||||||
|
"""
|
||||||
|
Database connection build for test e
|
||||||
|
"""
|
||||||
|
def _connection_build(self):
|
||||||
|
self._connection_build_internal("memory", True)
|
||||||
|
return self.conn, self.cur
|
||||||
|
|
|
@ -23,7 +23,7 @@ import helper_addressbook
|
||||||
|
|
||||||
from bmconfigparser import config
|
from bmconfigparser import config
|
||||||
from helper_msgcoding import MsgEncode, MsgDecode
|
from helper_msgcoding import MsgEncode, MsgDecode
|
||||||
from helper_sql import sqlQuery
|
from helper_sql import sqlQuery, sqlExecute
|
||||||
from network import asyncore_pollchoose as asyncore, knownnodes
|
from network import asyncore_pollchoose as asyncore, knownnodes
|
||||||
from network.bmproto import BMProto
|
from network.bmproto import BMProto
|
||||||
from network.connectionpool import BMConnectionPool
|
from network.connectionpool import BMConnectionPool
|
||||||
|
@ -412,6 +412,22 @@ class TestCore(unittest.TestCase):
|
||||||
self.delete_address_from_addressbook(address1)
|
self.delete_address_from_addressbook(address1)
|
||||||
self.delete_address_from_addressbook(address2)
|
self.delete_address_from_addressbook(address2)
|
||||||
|
|
||||||
|
def test_sqlscripts(self):
|
||||||
|
""" Test sql statements"""
|
||||||
|
|
||||||
|
sqlExecute('create table if not exists testtbl (id integer)')
|
||||||
|
tables = list(sqlQuery("select name from sqlite_master where type is 'table'"))
|
||||||
|
res = [item for item in tables if 'testtbl' in item]
|
||||||
|
self.assertEqual(res[0][0], 'testtbl')
|
||||||
|
|
||||||
|
queryreturn = sqlExecute("INSERT INTO testtbl VALUES(101);")
|
||||||
|
self.assertEqual(queryreturn, 1)
|
||||||
|
|
||||||
|
queryreturn = sqlQuery('''SELECT * FROM testtbl''')
|
||||||
|
self.assertEqual(queryreturn[0][0], 101)
|
||||||
|
|
||||||
|
sqlQuery("DROP TABLE testtbl")
|
||||||
|
|
||||||
|
|
||||||
def run():
|
def run():
|
||||||
"""Starts all tests intended for core run"""
|
"""Starts all tests intended for core run"""
|
||||||
|
|
0
src/tests/sql/__init__.py
Normal file
|
@ -2,43 +2,221 @@
|
||||||
# flake8: noqa:E402
|
# flake8: noqa:E402
|
||||||
import os
|
import os
|
||||||
import tempfile
|
import tempfile
|
||||||
import threading
|
|
||||||
import unittest
|
import unittest
|
||||||
|
|
||||||
from .common import skip_python3
|
|
||||||
|
|
||||||
skip_python3()
|
|
||||||
|
|
||||||
os.environ['BITMESSAGE_HOME'] = tempfile.gettempdir()
|
os.environ['BITMESSAGE_HOME'] = tempfile.gettempdir()
|
||||||
|
|
||||||
from pybitmessage.helper_sql import (
|
from pybitmessage.class_sqlThread import TestDB # noqa:E402
|
||||||
sqlQuery, sql_ready, sqlStoredProcedure) # noqa:E402
|
|
||||||
from pybitmessage.class_sqlThread import sqlThread # noqa:E402
|
|
||||||
from pybitmessage.addresses import encodeAddress # noqa:E402
|
from pybitmessage.addresses import encodeAddress # noqa:E402
|
||||||
|
|
||||||
|
|
||||||
class TestSqlThread(unittest.TestCase):
|
def filter_table_column(schema, column):
|
||||||
"""Test case for SQL thread"""
|
"""
|
||||||
|
Filter column from schema
|
||||||
|
"""
|
||||||
|
for x in schema:
|
||||||
|
for y in x:
|
||||||
|
if y == column:
|
||||||
|
yield y
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def setUpClass(cls):
|
|
||||||
# Start SQL thread
|
|
||||||
sqlLookup = sqlThread()
|
|
||||||
sqlLookup.daemon = True
|
|
||||||
sqlLookup.start()
|
|
||||||
sql_ready.wait()
|
|
||||||
|
|
||||||
@classmethod
|
class TestSqlBase(object): # pylint: disable=E1101, too-few-public-methods, E1004, W0232
|
||||||
def tearDownClass(cls):
|
""" Base for test case """
|
||||||
sqlStoredProcedure('exit')
|
|
||||||
for thread in threading.enumerate():
|
__name__ = None
|
||||||
if thread.name == "SQL":
|
root_path = os.path.dirname(os.path.dirname(__file__))
|
||||||
thread.join()
|
test_db = None
|
||||||
|
|
||||||
|
def _setup_db(self): # pylint: disable=W0622, redefined-builtin
|
||||||
|
"""
|
||||||
|
Drop all tables before each test case start
|
||||||
|
"""
|
||||||
|
self.test_db = TestDB()
|
||||||
|
self.test_db.create_sql_function()
|
||||||
|
self.test_db.initialize_schema()
|
||||||
|
|
||||||
|
def initialise_database(self, test_db_cur, file): # pylint: disable=W0622, redefined-builtin
|
||||||
|
"""
|
||||||
|
Initialise DB
|
||||||
|
"""
|
||||||
|
|
||||||
|
with open(os.path.join(self.root_path, "tests/sql/{}.sql".format(file)), 'r') as sql_as_string:
|
||||||
|
sql_as_string = sql_as_string.read()
|
||||||
|
|
||||||
|
test_db_cur.cur.executescript(sql_as_string)
|
||||||
|
|
||||||
|
|
||||||
|
class TestFnBitmessageDB(TestSqlBase, unittest.TestCase): # pylint: disable=protected-access
|
||||||
|
""" Test case for Sql function"""
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
"""
|
||||||
|
setup for test case
|
||||||
|
"""
|
||||||
|
self._setup_db()
|
||||||
|
|
||||||
def test_create_function(self):
|
def test_create_function(self):
|
||||||
"""Check the result of enaddr function"""
|
"""Check the result of enaddr function"""
|
||||||
encoded_str = encodeAddress(4, 1, "21122112211221122112")
|
st = "21122112211221122112".encode()
|
||||||
|
encoded_str = encodeAddress(4, 1, st)
|
||||||
|
|
||||||
query = sqlQuery('SELECT enaddr(4, 1, "21122112211221122112")')
|
item = '''SELECT enaddr(4, 1, ?);'''
|
||||||
self.assertEqual(
|
parameters = (st, )
|
||||||
query[0][-1], encoded_str, "test case fail for create_function")
|
self.test_db.cur.execute(item, parameters)
|
||||||
|
query = self.test_db.cur.fetchall()
|
||||||
|
self.assertEqual(query[0][-1], encoded_str, "test case fail for create_function")
|
||||||
|
|
||||||
|
|
||||||
|
class TestUpgradeBitmessageDB(TestSqlBase, unittest.TestCase): # pylint: disable=protected-access
|
||||||
|
"""Test case for SQL versions"""
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
"""
|
||||||
|
Setup DB schema before start.
|
||||||
|
And applying default schema for version test.
|
||||||
|
"""
|
||||||
|
self._setup_db()
|
||||||
|
self.test_db.cur.execute('''INSERT INTO settings VALUES('version','2')''')
|
||||||
|
|
||||||
|
def version(self):
|
||||||
|
"""
|
||||||
|
Run SQL Scripts, Initialize DB with respect to versioning
|
||||||
|
and Upgrade DB schema for all versions
|
||||||
|
"""
|
||||||
|
def wrapper(*args):
|
||||||
|
"""
|
||||||
|
Run SQL and mocking DB for versions
|
||||||
|
"""
|
||||||
|
self = args[0]
|
||||||
|
func_name = func.__name__
|
||||||
|
version = func_name.rsplit('_', 1)[-1]
|
||||||
|
self.test_db._upgrade_one_level_sql_statement(int(version)) # pylint: disable= W0212, protected-access
|
||||||
|
|
||||||
|
# Update versions DB mocking
|
||||||
|
self.initialise_database(self.test_db, "init_version_{}".format(version))
|
||||||
|
|
||||||
|
return func(*args) # <-- use (self, ...)
|
||||||
|
func = self
|
||||||
|
return wrapper
|
||||||
|
|
||||||
|
@version
|
||||||
|
def test_bm_db_version_2(self):
|
||||||
|
"""
|
||||||
|
Test with version 2
|
||||||
|
"""
|
||||||
|
|
||||||
|
res = self.test_db.cur.execute(''' SELECT count(name) FROM sqlite_master
|
||||||
|
WHERE type='table' AND name='inventory_backup' ''')
|
||||||
|
self.assertNotEqual(res, 1, "Table inventory_backup not deleted in versioning 2")
|
||||||
|
|
||||||
|
@version
|
||||||
|
def test_bm_db_version_3(self):
|
||||||
|
"""
|
||||||
|
Test with version 1
|
||||||
|
Version 1 and 3 are same so will skip 3
|
||||||
|
"""
|
||||||
|
|
||||||
|
res = self.test_db.cur.execute('''PRAGMA table_info('inventory');''')
|
||||||
|
result = list(filter_table_column(res, "tag"))
|
||||||
|
self.assertEqual(result, ['tag'], "Data not migrated for version 3")
|
||||||
|
|
||||||
|
@version
|
||||||
|
def test_bm_db_version_4(self):
|
||||||
|
"""
|
||||||
|
Test with version 4
|
||||||
|
"""
|
||||||
|
|
||||||
|
self.test_db.cur.execute("select * from pubkeys where addressversion = '1';")
|
||||||
|
res = self.test_db.cur.fetchall()
|
||||||
|
self.assertEqual(len(res), 1, "Table inventory not deleted in versioning 4")
|
||||||
|
|
||||||
|
@version
|
||||||
|
def test_bm_db_version_5(self):
|
||||||
|
"""
|
||||||
|
Test with version 5
|
||||||
|
"""
|
||||||
|
|
||||||
|
self.test_db.cur.execute(''' SELECT count(name) FROM sqlite_master WHERE type='table' AND name='knownnodes' ''') # noqa
|
||||||
|
res = self.test_db.cur.fetchall()
|
||||||
|
self.assertNotEqual(res[0][0], 1, "Table knownnodes not deleted in versioning 5")
|
||||||
|
self.test_db.cur.execute(''' SELECT count(name) FROM sqlite_master
|
||||||
|
WHERE type='table' AND name='objectprocessorqueue'; ''')
|
||||||
|
res = self.test_db.cur.fetchall()
|
||||||
|
self.assertNotEqual(len(res), 0, "Table objectprocessorqueue not created in versioning 5")
|
||||||
|
self.test_db.cur.execute(''' SELECT * FROM objectprocessorqueue where objecttype='hash' ; ''')
|
||||||
|
res = self.test_db.cur.fetchall()
|
||||||
|
self.assertNotEqual(len(res), 0, "Table objectprocessorqueue not created in versioning 5")
|
||||||
|
|
||||||
|
@version
|
||||||
|
def test_bm_db_version_6(self):
|
||||||
|
"""
|
||||||
|
Test with version 6
|
||||||
|
"""
|
||||||
|
|
||||||
|
self.test_db.cur.execute('''PRAGMA table_info('inventory');''')
|
||||||
|
inventory = self.test_db.cur.fetchall()
|
||||||
|
inventory = list(filter_table_column(inventory, "expirestime"))
|
||||||
|
self.assertEqual(inventory, ['expirestime'], "Data not migrated for version 6")
|
||||||
|
|
||||||
|
self.test_db.cur.execute('''PRAGMA table_info('objectprocessorqueue');''')
|
||||||
|
objectprocessorqueue = self.test_db.cur.fetchall()
|
||||||
|
objectprocessorqueue = list(filter_table_column(objectprocessorqueue, "objecttype"))
|
||||||
|
self.assertEqual(objectprocessorqueue, ['objecttype'], "Data not migrated for version 6")
|
||||||
|
|
||||||
|
@version
|
||||||
|
def test_bm_db_version_7(self):
|
||||||
|
"""
|
||||||
|
Test with version 7
|
||||||
|
"""
|
||||||
|
|
||||||
|
self.test_db.cur.execute('''SELECT * FROM pubkeys ''')
|
||||||
|
pubkeys = self.test_db.cur.fetchall()
|
||||||
|
self.assertEqual(pubkeys, [], "Data not migrated for version 7")
|
||||||
|
|
||||||
|
self.test_db.cur.execute('''SELECT * FROM inventory ''')
|
||||||
|
inventory = self.test_db.cur.fetchall()
|
||||||
|
self.assertEqual(inventory, [], "Data not migrated for version 7")
|
||||||
|
|
||||||
|
self.test_db.cur.execute('''SELECT status FROM sent ''')
|
||||||
|
sent = self.test_db.cur.fetchall()
|
||||||
|
self.assertEqual(sent, [('msgqueued',), ('msgqueued',)], "Data not migrated for version 7")
|
||||||
|
|
||||||
|
@version
|
||||||
|
def test_bm_db_version_8(self):
|
||||||
|
"""
|
||||||
|
Test with version 8
|
||||||
|
"""
|
||||||
|
|
||||||
|
self.test_db.cur.execute('''PRAGMA table_info('inbox');''')
|
||||||
|
res = self.test_db.cur.fetchall()
|
||||||
|
result = list(filter_table_column(res, "sighash"))
|
||||||
|
self.assertEqual(result, ['sighash'], "Data not migrated for version 8")
|
||||||
|
|
||||||
|
@version
|
||||||
|
def test_bm_db_version_9(self):
|
||||||
|
"""
|
||||||
|
Test with version 9
|
||||||
|
"""
|
||||||
|
|
||||||
|
self.test_db.cur.execute("SELECT count(name) FROM sqlite_master WHERE type='table' AND name='pubkeys_backup'") # noqa
|
||||||
|
res = self.test_db.cur.fetchall()
|
||||||
|
self.assertNotEqual(res[0][0], 1, "Table pubkeys_backup not deleted")
|
||||||
|
|
||||||
|
@version
|
||||||
|
def test_bm_db_version_10(self):
|
||||||
|
"""
|
||||||
|
Test with version 10
|
||||||
|
"""
|
||||||
|
|
||||||
|
label = "test"
|
||||||
|
self.test_db.cur.execute("SELECT * FROM addressbook WHERE label='test' ") # noqa
|
||||||
|
res = self.test_db.cur.fetchall()
|
||||||
|
self.assertEqual(res[0][0], label, "Data not migrated for version 10")
|
||||||
|
|
||||||
|
def test_bm_db_version_type(self):
|
||||||
|
"""
|
||||||
|
Test version type
|
||||||
|
"""
|
||||||
|
self.test_db.cur.execute('''INSERT INTO settings VALUES('version','test_string')''') # noqa
|
||||||
|
res = self.test_db.sql_schema_version
|
||||||
|
self.assertEqual(type(res), int)
|
||||||
|
|
return True