Refactor sqlthread #1794
|
@ -2,10 +2,10 @@
|
||||||
sqlThread is defined here
|
sqlThread is defined here
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
import logging
|
||||||
import os
|
import os
|
||||||
import shutil # used for moving the messages.dat file
|
import shutil # used for moving the messages.dat file
|
||||||
|
|||||||
import sqlite3
|
import sqlite3
|
||||||
import sys
|
|
||||||
import threading
|
import threading
|
||||||
import time
|
import time
|
||||||
|
|
||||||
|
@ -17,128 +17,100 @@ try:
|
||||||
import state
|
import state
|
||||||
this should contain an argument that allows to use an in memory database instead of the file. this should contain an argument that allows to use an in memory database instead of the file.
here allow to call it optionally with a parameter for in memory database here allow to call it optionally with a parameter for in memory database
this should be in parent class this should be in parent class
only this should be in this class, the other functionality should be refactored into methods and put into parent. only this should be in this class, the other functionality should be refactored into methods and put into parent.
maybe an maybe an `UpgradeDB` object would be an attribute of the `sqlThread` rarther than `sqlThread` inheriting it. That way we isolate the DB stuff and can test it without threading, and the thread would need to use the same interfaces as the test.
Done Done
pass arguments with DB file name pass arguments with DB file name
Inherit connection vars from UpgradeDB class Inherit connection vars from UpgradeDB class
Changes done, Changes done,
Refactored code and kept in Upgrade DB,
done done
Moved in to Moved in to ```__init__```
strange formatting strange formatting
No need for semicolon No need for semicolon
Why are you introducing more pylint warnings here? Why are you introducing more pylint warnings here?
I think I think `logger.debug('Error while trying to initialize...', exc_info=True)` is enough here. No need for `sys.stderr.write()` and err.
It doesn't look like the method docstring It doesn't look like the method docstring
There was a problem, we found out what it is, will be fixed in next commit. There was a problem, we found out what it is, will be fixed in next commit.
|
|||||||
from addresses import encodeAddress
|
from addresses import encodeAddress
|
||||||
from bmconfigparser import config, config_ready
|
from bmconfigparser import config, config_ready
|
||||||
from debug import logger
|
|
||||||
from tr import _translate
|
from tr import _translate
|
||||||
except ImportError:
|
except ImportError:
|
||||||
from . import helper_sql, helper_startup, paths, queues, state
|
from . import helper_sql, helper_startup, paths, queues, state
|
||||||
from .addresses import encodeAddress
|
from .addresses import encodeAddress
|
||||||
from .bmconfigparser import config, config_ready
|
from .bmconfigparser import config, config_ready
|
||||||
from .debug import logger
|
|
||||||
from .tr import _translate
|
from .tr import _translate
|
||||||
```
def connection_build(self, file_name='messages.dat'):
```
instead of instead of `current_level` use `ignore`
variable variable `self.current_level` shouldn't be needed
just just
`self._connection_build('messages.dat')`
instead of return, just
and then in instead of return, just
```
self.conn = conn
self.cur = cur
```
and then in `MockDB` just override `__init__` to call `self._connection_build(':memory:')
Then you can remove all ifs here
removed current_level removed current_level
Now its
Now its
``` @sql_schema_version.setter
def sql_schema_version(self):
"""
Update version with one level
"""
item = '''update settings set value=value+1 WHERE key='version';'''
self.cur.execute(item)
self._current_level = self.__get_current_settings_version()
```
Done Done
make it a class variable make it a class variable
```
self._connection_build()
```
no static method no static method
no return, just directly assign the no return, just directly assign the `self.con` and `self.cur` here
also declare
also declare
```
self.cur = None
self.con = None
```
rename to
rename to
```
def __connection_build_internal(self, file_name="messages.dat")
```
this should be internal to the object this should be internal to the object
get rid of get rid of `self._current_level`
no need for caching, this is uses only during startup and tests, and only a dozen times, no need to optimize, just use the property no need for caching, this is uses only during startup and tests, and only a dozen times, no need to optimize, just use the property
applied changes method name to applied changes method name to ```__connection_build_internal``` and pass file_name
Its changed into
Because its not taking Its changed into
``` item = '''update settings set value=? WHERE key='version';'''
self.cur.execute(item, self.__get_current_settings_version() + 1)
```
Because its not taking ```value+1```
Changed Changed
``` def __init__(self, db_name="messages.dat"):
self._current_level = None
self.max_level = 11
self.conn = None
self.cur = None
self.__connection_build_internal(db_name)
def __connection_build_internal(self, file_name="messages.dat"):
"""
Stablish SQL connection
"""
if file_name == "memory":
self.conn = sqlite3.connect(':memory:')
else:
self.conn = sqlite3.connect(state.appdata + file_name)
self.conn.text_factory = str
self.cur = self.conn.cursor()
```
Done Done
Done Done
Removed static method Removed static method
Done Done
Done Done
Done Done
```
def __connection_build_internal(self, file_name='messages.dat', memory=False)
if memory:
self.conn = sqlite3.connect(':memory:')
else
self.conn = sqlite3.connect(os.path.join(state.appdata, file_name))
```
use getter use getter
and create an empty sql script for level 1 (one that doesn't change anything), like ```
increment = True
while self.settings_version < self.max_level:
self._upgrade()
self.settings_version = increment
```
and create an empty sql script for level 1 (one that doesn't change anything), like `SELECT NULL`
`os.path.join`
not not `upgrade`, `initialize`
also also `initialize` instead of `upgrade`
don't exit, log an error message don't exit, log an error message
|
|||||||
|
|
||||||
|
logger = logging.getLogger('default')
|
||||||
|
|
||||||
class sqlThread(threading.Thread):
|
|
||||||
"""A thread for all SQL operations"""
|
class BitmessageDB(object):
|
||||||
|
""" Upgrade Db with respect to versions """
|
||||||
|
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
threading.Thread.__init__(self, name="SQL")
|
self._current_level = None
|
||||||
|
self.max_level = 11
|
||||||
|
self.conn = None
|
||||||
|
self.cur = None
|
||||||
|
self._connection_build()
|
||||||
|
self.root_path = os.path.dirname(os.path.dirname(__file__))
|
||||||
|
|
||||||
def run(self): # pylint: disable=too-many-locals, too-many-branches, too-many-statements
|
def _connection_build(self):
|
||||||
"""Process SQL queries from `.helper_sql.sqlSubmitQueue`"""
|
self._connection_build_internal('messages.dat', False)
|
||||||
helper_sql.sql_available = True
|
|
||||||
config_ready.wait()
|
def _connection_build_internal(self, file_name="messages.dat", memory=False):
|
||||||
self.conn = sqlite3.connect(state.appdata + 'messages.dat')
|
"""
|
||||||
|
Stablish SQL connection
|
||||||
|
"""
|
||||||
|
if memory:
|
||||||
|
self.conn = sqlite3.connect(':memory:')
|
||||||
|
else:
|
||||||
|
self.conn = sqlite3.connect(os.path.join(state.appdata + file_name))
|
||||||
self.conn.text_factory = str
|
self.conn.text_factory = str
|
||||||
self.cur = self.conn.cursor()
|
self.cur = self.conn.cursor()
|
||||||
|
|
||||||
self.cur.execute('PRAGMA secure_delete = true')
|
self.cur.execute('PRAGMA secure_delete = true')
|
||||||
|
|
||||||
# call create_function for encode address
|
def __get_current_settings_version(self):
|
||||||
self.create_function()
|
"""
|
||||||
|
Get current setting Version
|
||||||
|
"""
|
||||||
|
query = "SELECT value FROM settings WHERE key='version'"
|
||||||
|
parameters = ()
|
||||||
|
self.cur.execute(query, parameters)
|
||||||
|
return int(self.cur.fetchall()[0][0])
|
||||||
|
|
||||||
|
def _upgrade_one_level_sql_statement(self, file_name):
|
||||||
|
"""
|
||||||
|
Upgrade database versions with applying sql scripts
|
||||||
|
"""
|
||||||
|
self.initialise_sql("init_version_{}".format(file_name))
|
||||||
|
|
||||||
|
def initialise_sql(self, file_name):
|
||||||
try:
|
try:
|
||||||
add a new method
add a new method
```
def _connection_build(self)
self.__connection_build_internal(self, 'messages.dat')
````
```
def _connection_build(self, db_name='messages.dat'):
self.__connection_build_internal(file_name=db_name)
```
Done Done
|
|||||||
self.cur.execute(
|
with open(os.path.join(self.root_path, "pybitmessage/sql/{}.sql".format(file_name))) as sql_file:
|
||||||
'''CREATE TABLE inbox (msgid blob, toaddress text, fromaddress text, subject text,'''
|
sql_as_string = sql_file.read()
|
||||||
''' received text, message text, folder text, encodingtype int, read bool, sighash blob,'''
|
self.cur.executescript(sql_as_string)
|
||||||
''' UNIQUE(msgid) ON CONFLICT REPLACE)''')
|
except IOError as err:
|
||||||
self.cur.execute(
|
logger.debug(
|
||||||
'''CREATE TABLE sent (msgid blob, toaddress text, toripe blob, fromaddress text, subject text,'''
|
'ERROR trying to initialize database. Error message: %s\n', str(err))
|
||||||
''' message text, ackdata blob, senttime integer, lastactiontime integer,'''
|
|
||||||
''' sleeptill integer, status text, retrynumber integer, folder text, encodingtype int, ttl int)''')
|
|
||||||
self.cur.execute(
|
|
||||||
'''CREATE TABLE subscriptions (label text, address text, enabled bool)''')
|
|
||||||
self.cur.execute(
|
|
||||||
'''CREATE TABLE addressbook (label text, address text, UNIQUE(address) ON CONFLICT IGNORE)''')
|
|
||||||
self.cur.execute(
|
|
||||||
'''CREATE TABLE blacklist (label text, address text, enabled bool)''')
|
|
||||||
self.cur.execute(
|
|
||||||
'''CREATE TABLE whitelist (label text, address text, enabled bool)''')
|
|
||||||
self.cur.execute(
|
|
||||||
'''CREATE TABLE pubkeys (address text, addressversion int, transmitdata blob, time int,'''
|
|
||||||
''' usedpersonally text, UNIQUE(address) ON CONFLICT REPLACE)''')
|
|
||||||
self.cur.execute(
|
|
||||||
'''CREATE TABLE inventory (hash blob, objecttype int, streamnumber int, payload blob,'''
|
|
||||||
''' expirestime integer, tag blob, UNIQUE(hash) ON CONFLICT REPLACE)''')
|
|
||||||
self.cur.execute(
|
|
||||||
'''INSERT INTO subscriptions VALUES'''
|
|
||||||
'''('Bitmessage new releases/announcements','BM-GtovgYdgs7qXPkoYaRgrLFuFKz1SFpsw',1)''')
|
|
||||||
self.cur.execute(
|
|
||||||
'''CREATE TABLE settings (key blob, value blob, UNIQUE(key) ON CONFLICT REPLACE)''')
|
|
||||||
self.cur.execute('''INSERT INTO settings VALUES('version','11')''')
|
|
||||||
self.cur.execute('''INSERT INTO settings VALUES('lastvacuumtime',?)''', (
|
|
||||||
int(time.time()),))
|
|
||||||
self.cur.execute(
|
|
||||||
'''CREATE TABLE objectprocessorqueue'''
|
|
||||||
''' (objecttype int, data blob, UNIQUE(objecttype, data) ON CONFLICT REPLACE)''')
|
|
||||||
self.conn.commit()
|
|
||||||
logger.info('Created messages database file')
|
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
if str(err) == 'table inbox already exists':
|
logger.debug(
|
||||||
logger.debug('Database file already exists.')
|
'ERROR trying to initialize database. Error message: %s\n', str(err))
|
||||||
```@getter something```
Already using getter for the same Already using getter for the same
|
|||||||
|
|
||||||
else:
|
@property
|
||||||
sys.stderr.write(
|
def sql_schema_version(self):
|
||||||
'ERROR trying to create database file (message.dat). Error message: %s\n' % str(err))
|
"""
|
||||||
os._exit(0)
|
Getter for get current schema version
|
||||||
|
"""
|
||||||
this should remain here this should remain here
|
|||||||
|
return self.__get_current_settings_version()
|
||||||
|
|
||||||
# If the settings version is equal to 2 or 3 then the
|
@sql_schema_version.setter
|
||||||
To make life easier, this should remain. To make life easier, this should remain.
|
|||||||
# sqlThread will modify the pubkeys table and change
|
def sql_schema_version(self, setter):
|
||||||
# the settings version to 4.
|
"""
|
||||||
settingsversion = config.getint(
|
Update version with one level
|
||||||
'bitmessagesettings', 'settingsversion')
|
"""
|
||||||
|
if setter:
|
||||||
|
query = "UPDATE settings SET value=CAST(value AS INT) + 1 WHERE key = 'version'"
|
||||||
|
self.cur.execute(query)
|
||||||
|
|
||||||
# People running earlier versions of PyBitmessage do not have the
|
def upgrade_to_latest(self):
|
||||||
# usedpersonally field in their pubkeys table. Let's add it.
|
"""
|
||||||
if settingsversion == 2:
|
Initialise upgrade level
|
||||||
item = '''ALTER TABLE pubkeys ADD usedpersonally text DEFAULT 'no' '''
|
"""
|
||||||
parameters = ''
|
|
||||||
self.cur.execute(item, parameters)
|
|
||||||
self.conn.commit()
|
|
||||||
|
|
||||||
settingsversion = 3
|
while self.sql_schema_version < self.max_level:
|
||||||
|
self._upgrade_one_level_sql_statement(self.sql_schema_version)
|
||||||
|
self.sql_schema_version = True
|
||||||
|
|
||||||
# People running earlier versions of PyBitmessage do not have the
|
def upgrade_schema_if_old_version(self):
|
||||||
# encodingtype field in their inbox and sent tables or the read field
|
""" check settings table exists """
|
||||||
# in the inbox table. Let's add them.
|
|
||||||
if settingsversion == 3:
|
|
||||||
item = '''ALTER TABLE inbox ADD encodingtype int DEFAULT '2' '''
|
|
||||||
parameters = ''
|
|
||||||
self.cur.execute(item, parameters)
|
|
||||||
|
|
||||||
item = '''ALTER TABLE inbox ADD read bool DEFAULT '1' '''
|
query = "SELECT name FROM sqlite_master WHERE type='table' AND name='settings'"
|
||||||
parameters = ''
|
parameters = ()
|
||||||
self.cur.execute(item, parameters)
|
self.cur.execute(query, parameters)
|
||||||
|
|
||||||
item = '''ALTER TABLE sent ADD encodingtype int DEFAULT '2' '''
|
|
||||||
parameters = ''
|
|
||||||
self.cur.execute(item, parameters)
|
|
||||||
self.conn.commit()
|
|
||||||
|
|
||||||
settingsversion = 4
|
|
||||||
|
|
||||||
config.set(
|
|
||||||
'bitmessagesettings', 'settingsversion', str(settingsversion))
|
|
||||||
config.save()
|
|
||||||
|
|
||||||
helper_startup.updateConfig()
|
|
||||||
|
|
||||||
# From now on, let us keep a 'version' embedded in the messages.dat
|
|
||||||
# file so that when we make changes to the database, the database
|
|
||||||
# version we are on can stay embedded in the messages.dat file. Let us
|
|
||||||
# check to see if the settings table exists yet.
|
|
||||||
item = '''SELECT name FROM sqlite_master WHERE type='table' AND name='settings';'''
|
|
||||||
parameters = ''
|
|
||||||
self.cur.execute(item, parameters)
|
|
||||||
if self.cur.fetchall() == []:
|
if self.cur.fetchall() == []:
|
||||||
# The settings table doesn't exist. We need to make it.
|
# The settings table doesn't exist. We need to make it.
|
||||||
logger.debug(
|
logger.debug(
|
||||||
|
@ -149,277 +121,18 @@ class sqlThread(threading.Thread):
|
||||||
self.cur.execute('''INSERT INTO settings VALUES('lastvacuumtime',?)''', (
|
self.cur.execute('''INSERT INTO settings VALUES('lastvacuumtime',?)''', (
|
||||||
int(time.time()),))
|
int(time.time()),))
|
||||||
logger.debug('In messages.dat database, removing an obsolete field from the pubkeys table.')
|
logger.debug('In messages.dat database, removing an obsolete field from the pubkeys table.')
|
||||||
self.cur.execute(
|
|
||||||
'''CREATE TEMPORARY TABLE pubkeys_backup(hash blob, transmitdata blob, time int,'''
|
|
||||||
''' usedpersonally text, UNIQUE(hash) ON CONFLICT REPLACE);''')
|
|
||||||
self.cur.execute(
|
|
||||||
'''INSERT INTO pubkeys_backup SELECT hash, transmitdata, time, usedpersonally FROM pubkeys;''')
|
|
||||||
self.cur.execute('''DROP TABLE pubkeys''')
|
|
||||||
self.cur.execute(
|
|
||||||
'''CREATE TABLE pubkeys'''
|
|
||||||
''' (hash blob, transmitdata blob, time int, usedpersonally text, UNIQUE(hash) ON CONFLICT REPLACE)''')
|
|
||||||
self.cur.execute(
|
|
||||||
'''INSERT INTO pubkeys SELECT hash, transmitdata, time, usedpersonally FROM pubkeys_backup;''')
|
|
||||||
self.cur.execute('''DROP TABLE pubkeys_backup;''')
|
|
||||||
logger.debug(
|
|
||||||
'Deleting all pubkeys from inventory.'
|
|
||||||
' They will be redownloaded and then saved with the correct times.')
|
|
||||||
self.cur.execute(
|
|
||||||
'''delete from inventory where objecttype = 'pubkey';''')
|
|
||||||
logger.debug('replacing Bitmessage announcements mailing list with a new one.')
|
|
||||||
self.cur.execute(
|
|
||||||
'''delete from subscriptions where address='BM-BbkPSZbzPwpVcYZpU4yHwf9ZPEapN5Zx' ''')
|
|
||||||
self.cur.execute(
|
|
||||||
'''INSERT INTO subscriptions VALUES'''
|
|
||||||
'''('Bitmessage new releases/announcements','BM-GtovgYdgs7qXPkoYaRgrLFuFKz1SFpsw',1)''')
|
|
||||||
logger.debug('Commiting.')
|
|
||||||
self.conn.commit()
|
|
||||||
logger.debug('Vacuuming message.dat. You might notice that the file size gets much smaller.')
|
|
||||||
self.cur.execute(''' VACUUM ''')
|
|
||||||
|
|
||||||
|
# initiate sql file
|
||||||
|
self.initialise_sql("upg_sc_if_old_ver_1")
|
||||||
# After code refactoring, the possible status values for sent messages
|
# After code refactoring, the possible status values for sent messages
|
||||||
# have changed.
|
# have changed.
|
||||||
self.cur.execute(
|
self.initialise_sql("upg_sc_if_old_ver_2")
|
||||||
'''update sent set status='doingmsgpow' where status='doingpow' ''')
|
|
||||||
self.cur.execute(
|
|
||||||
'''update sent set status='msgsent' where status='sentmessage' ''')
|
|
||||||
self.cur.execute(
|
|
||||||
'''update sent set status='doingpubkeypow' where status='findingpubkey' ''')
|
|
||||||
self.cur.execute(
|
|
||||||
'''update sent set status='broadcastqueued' where status='broadcastpending' ''')
|
|
||||||
self.conn.commit()
|
|
||||||
|
|
||||||
# Let's get rid of the first20bytesofencryptedmessage field in
|
|
||||||
# the inventory table.
|
|
||||||
item = '''SELECT value FROM settings WHERE key='version';'''
|
|
||||||
parameters = ''
|
|
||||||
self.cur.execute(item, parameters)
|
|
||||||
if int(self.cur.fetchall()[0][0]) == 2:
|
|
||||||
logger.debug(
|
|
||||||
'In messages.dat database, removing an obsolete field from'
|
|
||||||
' the inventory table.')
|
|
||||||
self.cur.execute(
|
|
||||||
'''CREATE TEMPORARY TABLE inventory_backup'''
|
|
||||||
'''(hash blob, objecttype text, streamnumber int, payload blob,'''
|
|
||||||
''' receivedtime integer, UNIQUE(hash) ON CONFLICT REPLACE);''')
|
|
||||||
self.cur.execute(
|
|
||||||
'''INSERT INTO inventory_backup SELECT hash, objecttype, streamnumber, payload, receivedtime'''
|
|
||||||
''' FROM inventory;''')
|
|
||||||
self.cur.execute('''DROP TABLE inventory''')
|
|
||||||
self.cur.execute(
|
|
||||||
'''CREATE TABLE inventory'''
|
|
||||||
''' (hash blob, objecttype text, streamnumber int, payload blob, receivedtime integer,'''
|
|
||||||
''' UNIQUE(hash) ON CONFLICT REPLACE)''')
|
|
||||||
self.cur.execute(
|
|
||||||
'''INSERT INTO inventory SELECT hash, objecttype, streamnumber, payload, receivedtime'''
|
|
||||||
''' FROM inventory_backup;''')
|
|
||||||
self.cur.execute('''DROP TABLE inventory_backup;''')
|
|
||||||
item = '''update settings set value=? WHERE key='version';'''
|
|
||||||
parameters = (3,)
|
|
||||||
self.cur.execute(item, parameters)
|
|
||||||
|
|
||||||
# Add a new column to the inventory table to store tags.
|
|
||||||
item = '''SELECT value FROM settings WHERE key='version';'''
|
|
||||||
parameters = ''
|
|
||||||
self.cur.execute(item, parameters)
|
|
||||||
currentVersion = int(self.cur.fetchall()[0][0])
|
|
||||||
if currentVersion == 1 or currentVersion == 3:
|
|
||||||
logger.debug(
|
|
||||||
'In messages.dat database, adding tag field to'
|
|
||||||
' the inventory table.')
|
|
||||||
item = '''ALTER TABLE inventory ADD tag blob DEFAULT '' '''
|
|
||||||
parameters = ''
|
|
||||||
self.cur.execute(item, parameters)
|
|
||||||
item = '''update settings set value=? WHERE key='version';'''
|
|
||||||
parameters = (4,)
|
|
||||||
self.cur.execute(item, parameters)
|
|
||||||
|
|
||||||
# Add a new column to the pubkeys table to store the address version.
|
|
||||||
# We're going to trash all of our pubkeys and let them be redownloaded.
|
|
||||||
item = '''SELECT value FROM settings WHERE key='version';'''
|
|
||||||
parameters = ''
|
|
||||||
self.cur.execute(item, parameters)
|
|
||||||
currentVersion = int(self.cur.fetchall()[0][0])
|
|
||||||
if currentVersion == 4:
|
|
||||||
self.cur.execute('''DROP TABLE pubkeys''')
|
|
||||||
self.cur.execute(
|
|
||||||
'''CREATE TABLE pubkeys (hash blob, addressversion int, transmitdata blob, time int,'''
|
|
||||||
'''usedpersonally text, UNIQUE(hash, addressversion) ON CONFLICT REPLACE)''')
|
|
||||||
self.cur.execute(
|
|
||||||
'''delete from inventory where objecttype = 'pubkey';''')
|
|
||||||
item = '''update settings set value=? WHERE key='version';'''
|
|
||||||
parameters = (5,)
|
|
||||||
self.cur.execute(item, parameters)
|
|
||||||
|
|
||||||
# Add a new table: objectprocessorqueue with which to hold objects
|
|
||||||
# that have yet to be processed if the user shuts down Bitmessage.
|
|
||||||
item = '''SELECT value FROM settings WHERE key='version';'''
|
|
||||||
parameters = ''
|
|
||||||
self.cur.execute(item, parameters)
|
|
||||||
currentVersion = int(self.cur.fetchall()[0][0])
|
|
||||||
if currentVersion == 5:
|
|
||||||
self.cur.execute('''DROP TABLE knownnodes''')
|
|
||||||
self.cur.execute(
|
|
||||||
'''CREATE TABLE objectprocessorqueue'''
|
|
||||||
''' (objecttype text, data blob, UNIQUE(objecttype, data) ON CONFLICT REPLACE)''')
|
|
||||||
item = '''update settings set value=? WHERE key='version';'''
|
|
||||||
parameters = (6,)
|
|
||||||
self.cur.execute(item, parameters)
|
|
||||||
|
|
||||||
# changes related to protocol v3
|
|
||||||
# In table inventory and objectprocessorqueue, objecttype is now
|
|
||||||
# an integer (it was a human-friendly string previously)
|
|
||||||
item = '''SELECT value FROM settings WHERE key='version';'''
|
|
||||||
parameters = ''
|
|
||||||
self.cur.execute(item, parameters)
|
|
||||||
currentVersion = int(self.cur.fetchall()[0][0])
|
|
||||||
if currentVersion == 6:
|
|
||||||
logger.debug(
|
|
||||||
'In messages.dat database, dropping and recreating'
|
|
||||||
' the inventory table.')
|
|
||||||
self.cur.execute('''DROP TABLE inventory''')
|
|
||||||
self.cur.execute(
|
|
||||||
'''CREATE TABLE inventory'''
|
|
||||||
''' (hash blob, objecttype int, streamnumber int, payload blob, expirestime integer,'''
|
|
||||||
''' tag blob, UNIQUE(hash) ON CONFLICT REPLACE)''')
|
|
||||||
self.cur.execute('''DROP TABLE objectprocessorqueue''')
|
|
||||||
self.cur.execute(
|
|
||||||
'''CREATE TABLE objectprocessorqueue'''
|
|
||||||
''' (objecttype int, data blob, UNIQUE(objecttype, data) ON CONFLICT REPLACE)''')
|
|
||||||
item = '''update settings set value=? WHERE key='version';'''
|
|
||||||
parameters = (7,)
|
|
||||||
self.cur.execute(item, parameters)
|
|
||||||
logger.debug(
|
|
||||||
'Finished dropping and recreating the inventory table.')
|
|
||||||
|
|
||||||
# The format of data stored in the pubkeys table has changed. Let's
|
|
||||||
# clear it, and the pubkeys from inventory, so that they'll
|
|
||||||
# be re-downloaded.
|
|
||||||
item = '''SELECT value FROM settings WHERE key='version';'''
|
|
||||||
parameters = ''
|
|
||||||
self.cur.execute(item, parameters)
|
|
||||||
currentVersion = int(self.cur.fetchall()[0][0])
|
|
||||||
if currentVersion == 7:
|
|
||||||
logger.debug(
|
|
||||||
'In messages.dat database, clearing pubkeys table'
|
|
||||||
' because the data format has been updated.')
|
|
||||||
self.cur.execute(
|
|
||||||
'''delete from inventory where objecttype = 1;''')
|
|
||||||
self.cur.execute(
|
|
||||||
'''delete from pubkeys;''')
|
|
||||||
# Any sending messages for which we *thought* that we had
|
|
||||||
# the pubkey must be rechecked.
|
|
||||||
self.cur.execute(
|
|
||||||
'''UPDATE sent SET status='msgqueued' WHERE status='doingmsgpow' or status='badkey';''')
|
|
||||||
query = '''update settings set value=? WHERE key='version';'''
|
|
||||||
parameters = (8,)
|
|
||||||
self.cur.execute(query, parameters)
|
|
||||||
logger.debug('Finished clearing currently held pubkeys.')
|
|
||||||
|
|
||||||
# Add a new column to the inbox table to store the hash of
|
|
||||||
# the message signature. We'll use this as temporary message UUID
|
|
||||||
# in order to detect duplicates.
|
|
||||||
item = '''SELECT value FROM settings WHERE key='version';'''
|
|
||||||
parameters = ''
|
|
||||||
self.cur.execute(item, parameters)
|
|
||||||
currentVersion = int(self.cur.fetchall()[0][0])
|
|
||||||
if currentVersion == 8:
|
|
||||||
logger.debug(
|
|
||||||
'In messages.dat database, adding sighash field to'
|
|
||||||
' the inbox table.')
|
|
||||||
item = '''ALTER TABLE inbox ADD sighash blob DEFAULT '' '''
|
|
||||||
parameters = ''
|
|
||||||
self.cur.execute(item, parameters)
|
|
||||||
item = '''update settings set value=? WHERE key='version';'''
|
|
||||||
parameters = (9,)
|
|
||||||
self.cur.execute(item, parameters)
|
|
||||||
|
|
||||||
# We'll also need a `sleeptill` field and a `ttl` field. Also we
|
|
||||||
# can combine the pubkeyretrynumber and msgretrynumber into one.
|
|
||||||
|
|
||||||
item = '''SELECT value FROM settings WHERE key='version';'''
|
|
||||||
parameters = ''
|
|
||||||
self.cur.execute(item, parameters)
|
|
||||||
currentVersion = int(self.cur.fetchall()[0][0])
|
|
||||||
if currentVersion == 9:
|
|
||||||
logger.info(
|
|
||||||
'In messages.dat database, making TTL-related changes:'
|
|
||||||
' combining the pubkeyretrynumber and msgretrynumber'
|
|
||||||
' fields into the retrynumber field and adding the'
|
|
||||||
' sleeptill and ttl fields...')
|
|
||||||
self.cur.execute(
|
|
||||||
'''CREATE TEMPORARY TABLE sent_backup'''
|
|
||||||
''' (msgid blob, toaddress text, toripe blob, fromaddress text, subject text, message text,'''
|
|
||||||
''' ackdata blob, lastactiontime integer, status text, retrynumber integer,'''
|
|
||||||
''' folder text, encodingtype int)''')
|
|
||||||
self.cur.execute(
|
|
||||||
'''INSERT INTO sent_backup SELECT msgid, toaddress, toripe, fromaddress,'''
|
|
||||||
''' subject, message, ackdata, lastactiontime,'''
|
|
||||||
''' status, 0, folder, encodingtype FROM sent;''')
|
|
||||||
self.cur.execute('''DROP TABLE sent''')
|
|
||||||
self.cur.execute(
|
|
||||||
'''CREATE TABLE sent'''
|
|
||||||
''' (msgid blob, toaddress text, toripe blob, fromaddress text, subject text, message text,'''
|
|
||||||
''' ackdata blob, senttime integer, lastactiontime integer, sleeptill int, status text,'''
|
|
||||||
''' retrynumber integer, folder text, encodingtype int, ttl int)''')
|
|
||||||
self.cur.execute(
|
|
||||||
'''INSERT INTO sent SELECT msgid, toaddress, toripe, fromaddress, subject, message, ackdata,'''
|
|
||||||
''' lastactiontime, lastactiontime, 0, status, 0, folder, encodingtype, 216000 FROM sent_backup;''')
|
|
||||||
self.cur.execute('''DROP TABLE sent_backup''')
|
|
||||||
logger.info('In messages.dat database, finished making TTL-related changes.')
|
|
||||||
logger.debug('In messages.dat database, adding address field to the pubkeys table.')
|
|
||||||
# We're going to have to calculate the address for each row in the pubkeys
|
|
||||||
# table. Then we can take out the hash field.
|
|
||||||
self.cur.execute('''ALTER TABLE pubkeys ADD address text DEFAULT '' ;''')
|
|
||||||
|
|
||||||
# replica for loop to update hashed address
|
|
||||||
self.cur.execute('''UPDATE pubkeys SET address=(enaddr(pubkeys.addressversion, 1, hash)); ''')
|
|
||||||
|
|
||||||
# Now we can remove the hash field from the pubkeys table.
|
|
||||||
self.cur.execute(
|
|
||||||
'''CREATE TEMPORARY TABLE pubkeys_backup'''
|
|
||||||
''' (address text, addressversion int, transmitdata blob, time int,'''
|
|
||||||
''' usedpersonally text, UNIQUE(address) ON CONFLICT REPLACE)''')
|
|
||||||
self.cur.execute(
|
|
||||||
'''INSERT INTO pubkeys_backup'''
|
|
||||||
''' SELECT address, addressversion, transmitdata, time, usedpersonally FROM pubkeys;''')
|
|
||||||
self.cur.execute('''DROP TABLE pubkeys''')
|
|
||||||
self.cur.execute(
|
|
||||||
'''CREATE TABLE pubkeys'''
|
|
||||||
''' (address text, addressversion int, transmitdata blob, time int, usedpersonally text,'''
|
|
||||||
''' UNIQUE(address) ON CONFLICT REPLACE)''')
|
|
||||||
self.cur.execute(
|
|
||||||
'''INSERT INTO pubkeys SELECT'''
|
|
||||||
''' address, addressversion, transmitdata, time, usedpersonally FROM pubkeys_backup;''')
|
|
||||||
self.cur.execute('''DROP TABLE pubkeys_backup''')
|
|
||||||
logger.debug(
|
|
||||||
'In messages.dat database, done adding address field to the pubkeys table'
|
|
||||||
' and removing the hash field.')
|
|
||||||
self.cur.execute('''update settings set value=10 WHERE key='version';''')
|
|
||||||
|
|
||||||
# Update the address colunm to unique in addressbook table
|
|
||||||
item = '''SELECT value FROM settings WHERE key='version';'''
|
|
||||||
parameters = ''
|
|
||||||
self.cur.execute(item, parameters)
|
|
||||||
currentVersion = int(self.cur.fetchall()[0][0])
|
|
||||||
if currentVersion == 10:
|
|
||||||
logger.debug(
|
|
||||||
'In messages.dat database, updating address column to UNIQUE'
|
|
||||||
' in the addressbook table.')
|
|
||||||
self.cur.execute(
|
|
||||||
'''ALTER TABLE addressbook RENAME TO old_addressbook''')
|
|
||||||
self.cur.execute(
|
|
||||||
'''CREATE TABLE addressbook'''
|
|
||||||
''' (label text, address text, UNIQUE(address) ON CONFLICT IGNORE)''')
|
|
||||||
self.cur.execute(
|
|
||||||
'''INSERT INTO addressbook SELECT label, address FROM old_addressbook;''')
|
|
||||||
self.cur.execute('''DROP TABLE old_addressbook''')
|
|
||||||
self.cur.execute('''update settings set value=11 WHERE key='version';''')
|
|
||||||
|
|
||||||
# Are you hoping to add a new option to the keys.dat file of existing
|
|
||||||
# Bitmessage users or modify the SQLite database? Add it right
|
|
||||||
# above this line!
|
|
||||||
|
|
||||||
|
def check_columns_can_store_binary_null(self): # pylint: disable=too-many-locals,
|
||||||
|
# too-many-branches, too-many-statements
|
||||||
|
"""
|
||||||
|
Check if sqlite can store binary zeros.
|
||||||
why line 149 and 150? Makes no sense to put it here. why line 149 and 150? Makes no sense to put it here.
|
|||||||
|
"""
|
||||||
we should at least differenciate between SQL exceptions and other exceptions (e.g. file not found) we should at least differenciate between SQL exceptions and other exceptions (e.g. file not found)
Differed between Differed between
```except IOError as err:``` and ```except Exception as err:```
Not add others exceptions
|
|||||||
try:
|
try:
|
||||||
testpayload = '\x00\x00'
|
testpayload = '\x00\x00'
|
||||||
t = ('1234', 1, testpayload, '12345678', 'no')
|
t = ('1234', 1, testpayload, '12345678', 'no')
|
||||||
|
@ -460,11 +173,17 @@ class sqlThread(threading.Thread):
|
||||||
else:
|
else:
|
||||||
logger.error(err)
|
logger.error(err)
|
||||||
|
|
||||||
# Let us check to see the last time we vaccumed the messages.dat file.
|
def check_vacuum(self): # pylint: disable=too-many-locals, too-many-branches, too-many-statements,
|
||||||
# If it has been more than a month let's do it now.
|
# Redefinition-of-parameters-type-from-tuple-to-str, R0204, line-too-long, E501
|
||||||
item = '''SELECT value FROM settings WHERE key='lastvacuumtime';'''
|
"""
|
||||||
parameters = ''
|
Check vacuum and apply sql queries for different different conditions.
|
||||||
self.cur.execute(item, parameters)
|
Let us check to see the last time we vaccumed the messages.dat file.
|
||||||
|
If it has been more than a month let's do it now.
|
||||||
|
"""
|
||||||
|
|
||||||
|
query = "SELECT value FROM settings WHERE key='lastvacuumtime'"
|
||||||
|
parameters = ()
|
||||||
|
self.cur.execute(query, parameters)
|
||||||
queryreturn = self.cur.fetchall()
|
queryreturn = self.cur.fetchall()
|
||||||
for row in queryreturn:
|
for row in queryreturn:
|
||||||
value, = row
|
value, = row
|
||||||
|
@ -487,17 +206,89 @@ class sqlThread(threading.Thread):
|
||||||
'Alert: Your disk or data storage volume is full. Bitmessage will now exit.'),
|
'Alert: Your disk or data storage volume is full. Bitmessage will now exit.'),
|
||||||
True)))
|
True)))
|
||||||
os._exit(0)
|
os._exit(0)
|
||||||
item = '''update settings set value=? WHERE key='lastvacuumtime';'''
|
query = "update settings set value=? WHERE key='lastvacuumtime'"
|
||||||
`item` is a bad name. Use `query`.
|
|||||||
parameters = (int(time.time()),)
|
parameters = (int(time.time()),)
|
||||||
self.cur.execute(item, parameters)
|
self.cur.execute(query, parameters)
|
||||||
|
|
||||||
this should be in parent class this should be in parent class
Done Done
|
|||||||
helper_sql.sql_ready.set()
|
def upgrade_config_parser_setting_version(self, settingsversion):
|
||||||
|
"""
|
||||||
|
Upgrade schema with respect setting version
|
||||||
|
"""
|
||||||
|
|
||||||
while True:
|
self.initialise_sql("config_setting_ver_{}".format(settingsversion))
|
||||||
item = helper_sql.sqlSubmitQueue.get()
|
|
||||||
if item == 'commit':
|
def initialize_schema(self):
|
||||||
|
"""
|
||||||
|
Initialise Db schema
|
||||||
|
"""
|
||||||
I don't understand this I don't understand this `os._exit(0)`. Why don't you use `sys.exit('Error while trying to create database file..')`? What is it mean "in1111"?
`BMConfigParser` (obj)
|
|||||||
try:
|
try:
|
||||||
|
inbox_exists = list(self.cur.execute('PRAGMA table_info(inbox)'))
|
||||||
|
if not inbox_exists:
|
||||||
|
self.initialise_sql("initialize_schema")
|
||||||
self.conn.commit()
|
self.conn.commit()
|
||||||
|
logger.info('Created messages database file')
|
||||||
|
except Exception as err:
|
||||||
|
if str(err) == 'table inbox already exists':
|
||||||
|
logger.debug('Database file already exists.')
|
||||||
something like this
something like this
```
while self.loop_queue():
pass
```
```
return False
```
|
|||||||
|
else:
|
||||||
|
os._exit(
|
||||||
|
'ERROR trying to create database file (message.dat). Error message: %s\n' % str(err))
|
||||||
|
|
||||||
|
def create_sql_function(self):
|
||||||
|
"""
|
||||||
|
Apply create_function to DB
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
self.conn.create_function("enaddr", 3, func=encodeAddress, deterministic=True)
|
||||||
|
except (TypeError, sqlite3.NotSupportedError) as err:
|
||||||
|
logger.debug(
|
||||||
|
"Got error while pass deterministic in sqlite create function {}, Passing 3 params".format(err))
|
||||||
|
self.conn.create_function("enaddr", 3, encodeAddress)
|
||||||
|
|
||||||
|
|
||||||
|
class sqlThread(threading.Thread):
|
||||||
|
"""A thread for all SQL operations"""
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
threading.Thread.__init__(self, name="SQL")
|
||||||
|
self.db = None
|
||||||
|
self.max_setting_level = 4
|
||||||
|
logger.debug('Init thread in sqlthread')
|
||||||
|
|
||||||
|
@property
|
||||||
|
def sql_config_settings_version(self):
|
||||||
|
""" Getter for BMConfigParser (obj) """
|
||||||
|
|
||||||
|
return config.getint(
|
||||||
|
'bitmessagesettings', 'settingsversion')
|
||||||
|
|
||||||
|
@sql_config_settings_version.setter
|
||||||
|
def sql_config_settings_version(self, settingsversion): # pylint: disable=R0201, no-self-use
|
||||||
|
# Setter for BmConfigparser
|
||||||
|
|
||||||
|
config.set(
|
||||||
|
'bitmessagesettings', 'settingsversion', str(int(settingsversion) + 1))
|
||||||
|
return config.save()
|
||||||
|
|
||||||
|
def upgrade_config_setting_version(self):
|
||||||
|
"""
|
||||||
|
upgrade config parser setting version.
|
||||||
|
If the settings version is equal to 2 or 3 then the
|
||||||
|
sqlThread will modify the pubkeys table and change
|
||||||
|
the settings version to 4.
|
||||||
|
"""
|
||||||
|
while self.sql_config_settings_version < self.max_setting_level:
|
||||||
|
self.db.upgrade_config_parser_setting_version(self.sql_config_settings_version)
|
||||||
|
self.sql_config_settings_version = self.sql_config_settings_version
|
||||||
|
|
||||||
|
def loop_queue(self): # pylint: disable=too-many-locals, too-many-branches, too-many-statements
|
||||||
|
"""
|
||||||
|
Looping queue and process them
|
||||||
|
"""
|
||||||
|
query = helper_sql.sqlSubmitQueue.get()
|
||||||
|
if query == 'commit':
|
||||||
|
try:
|
||||||
|
self.db.conn.commit()
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
if str(err) == 'database or disk is full':
|
if str(err) == 'database or disk is full':
|
||||||
logger.fatal(
|
logger.fatal(
|
||||||
|
@ -513,16 +304,15 @@ class sqlThread(threading.Thread):
|
||||||
'Alert: Your disk or data storage volume is full. Bitmessage will now exit.'),
|
'Alert: Your disk or data storage volume is full. Bitmessage will now exit.'),
|
||||||
True)))
|
True)))
|
||||||
os._exit(0)
|
os._exit(0)
|
||||||
elif item == 'exit':
|
elif query == 'exit':
|
||||||
self.conn.close()
|
self.db.conn.close()
|
||||||
logger.info('sqlThread exiting gracefully.')
|
logger.info('sqlThread exiting gracefully.')
|
||||||
|
return False
|
||||||
return
|
elif query == 'movemessagstoprog':
|
||||||
elif item == 'movemessagstoprog':
|
|
||||||
logger.debug('the sqlThread is moving the messages.dat file to the local program directory.')
|
logger.debug('the sqlThread is moving the messages.dat file to the local program directory.')
|
||||||
|
|
||||||
try:
|
try:
|
||||||
self.conn.commit()
|
self.db.conn.commit()
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
if str(err) == 'database or disk is full':
|
if str(err) == 'database or disk is full':
|
||||||
logger.fatal(
|
logger.fatal(
|
||||||
|
@ -538,17 +328,17 @@ class sqlThread(threading.Thread):
|
||||||
'Alert: Your disk or data storage volume is full. Bitmessage will now exit.'),
|
'Alert: Your disk or data storage volume is full. Bitmessage will now exit.'),
|
||||||
True)))
|
True)))
|
||||||
os._exit(0)
|
os._exit(0)
|
||||||
self.conn.close()
|
self.db.conn.close()
|
||||||
shutil.move(
|
shutil.move(
|
||||||
paths.lookupAppdataFolder() + 'messages.dat', paths.lookupExeFolder() + 'messages.dat')
|
paths.lookupAppdataFolder() + 'messages.dat', paths.lookupExeFolder() + 'messages.dat')
|
||||||
self.conn = sqlite3.connect(paths.lookupExeFolder() + 'messages.dat')
|
self.db.conn = sqlite3.connect(paths.lookupExeFolder() + 'messages.dat')
|
||||||
self.conn.text_factory = str
|
self.db.conn.text_factory = str
|
||||||
self.cur = self.conn.cursor()
|
self.db.cur = self.db.conn.cursor()
|
||||||
elif item == 'movemessagstoappdata':
|
elif query == 'movemessagstoappdata':
|
||||||
logger.debug('the sqlThread is moving the messages.dat file to the Appdata folder.')
|
logger.debug('the sqlThread is moving the messages.dat file to the Appdata folder.')
|
||||||
|
|
||||||
try:
|
try:
|
||||||
self.conn.commit()
|
self.db.conn.commit()
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
if str(err) == 'database or disk is full':
|
if str(err) == 'database or disk is full':
|
||||||
logger.fatal(
|
logger.fatal(
|
||||||
|
@ -564,18 +354,18 @@ class sqlThread(threading.Thread):
|
||||||
'Alert: Your disk or data storage volume is full. Bitmessage will now exit.'),
|
'Alert: Your disk or data storage volume is full. Bitmessage will now exit.'),
|
||||||
True)))
|
True)))
|
||||||
os._exit(0)
|
os._exit(0)
|
||||||
self.conn.close()
|
self.db.conn.close()
|
||||||
shutil.move(
|
shutil.move(
|
||||||
paths.lookupExeFolder() + 'messages.dat', paths.lookupAppdataFolder() + 'messages.dat')
|
paths.lookupExeFolder() + 'messages.dat', paths.lookupAppdataFolder() + 'messages.dat')
|
||||||
self.conn = sqlite3.connect(paths.lookupAppdataFolder() + 'messages.dat')
|
self.db.conn = sqlite3.connect(paths.lookupAppdataFolder() + 'messages.dat')
|
||||||
self.conn.text_factory = str
|
self.db.conn.text_factory = str
|
||||||
self.cur = self.conn.cursor()
|
self.db.cur = self.db.conn.cursor()
|
||||||
elif item == 'deleteandvacuume':
|
elif query == 'deleteandvacuume':
|
||||||
self.cur.execute('''delete from inbox where folder='trash' ''')
|
self.db.cur.execute('''delete from inbox where folder='trash' ''')
|
||||||
self.cur.execute('''delete from sent where folder='trash' ''')
|
self.db.cur.execute('''delete from sent where folder='trash' ''')
|
||||||
self.conn.commit()
|
self.db.conn.commit()
|
||||||
try:
|
try:
|
||||||
self.cur.execute(''' VACUUM ''')
|
self.db.cur.execute(''' VACUUM ''')
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
if str(err) == 'database or disk is full':
|
if str(err) == 'database or disk is full':
|
||||||
logger.fatal(
|
logger.fatal(
|
||||||
|
@ -595,8 +385,8 @@ class sqlThread(threading.Thread):
|
||||||
parameters = helper_sql.sqlSubmitQueue.get()
|
parameters = helper_sql.sqlSubmitQueue.get()
|
||||||
rowcount = 0
|
rowcount = 0
|
||||||
try:
|
try:
|
||||||
self.cur.execute(item, parameters)
|
self.db.cur.execute(query, parameters)
|
||||||
rowcount = self.cur.rowcount
|
rowcount = self.db.cur.rowcount
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
if str(err) == 'database or disk is full':
|
if str(err) == 'database or disk is full':
|
||||||
logger.fatal(
|
logger.fatal(
|
||||||
|
@ -620,21 +410,54 @@ class sqlThread(threading.Thread):
|
||||||
' you might want to censor this data with asterisks (***)'
|
' you might want to censor this data with asterisks (***)'
|
||||||
make this into a separate method make this into a separate method
no no
```
class TestDB(UpgradeDB)
```
```
def _connection_build(self, db_name=None)
self.__connection_build_internal(memory=True)
```
Changed approach Changed approach
Done Done
```CAST(strftime('%s', 'now') AS STR)```
|
|||||||
' as it can contain private information: %s.'
|
' as it can contain private information: %s.'
|
||||||
' Here is the actual error message thrown by the sqlThread: %s',
|
' Here is the actual error message thrown by the sqlThread: %s',
|
||||||
str(item),
|
str(query),
|
||||||
str(repr(parameters)),
|
str(repr(parameters)),
|
||||||
str(err))
|
str(err))
|
||||||
logger.fatal('This program shall now abruptly exit!')
|
logger.fatal('This program shall now abruptly exit!')
|
||||||
|
|
||||||
os._exit(0)
|
os._exit(0)
|
||||||
|
|
||||||
helper_sql.sqlReturnQueue.put((self.cur.fetchall(), rowcount))
|
helper_sql.sqlReturnQueue.put((self.db.cur.fetchall(), rowcount))
|
||||||
# helper_sql.sqlSubmitQueue.task_done()
|
# helper_sql.sqlSubmitQueue.task_done()
|
||||||
|
return True
|
||||||
|
|
||||||
def create_function(self):
|
def run(self): # pylint: disable=too-many-locals, too-many-branches, too-many-statements,
|
||||||
# create_function
|
# Redefinition-of-parameters-type-from-tuple-to-str, R0204, line-too-long, E501
|
||||||
try:
|
"""Process SQL queries from `.helper_sql.sqlSubmitQueue`"""
|
||||||
self.conn.create_function("enaddr", 3, func=encodeAddress, deterministic=True)
|
|
||||||
except (TypeError, sqlite3.NotSupportedError) as err:
|
logger.info('Init thread in sqlthread')
|
||||||
logger.debug(
|
|
||||||
"Got error while pass deterministic in sqlite create function {}, Passing 3 params".format(err))
|
self.db = BitmessageDB()
|
||||||
self.conn.create_function("enaddr", 3, encodeAddress)
|
|
||||||
|
helper_sql.sql_available = True
|
||||||
|
|
||||||
|
config_ready.wait()
|
||||||
this should all be in parent class this should all be in parent class
Done Done
|
|||||||
|
|
||||||
|
self.db.create_sql_function()
|
||||||
|
|
||||||
|
self.db.initialize_schema()
|
||||||
|
|
||||||
|
self.upgrade_config_setting_version()
|
||||||
|
|
||||||
|
helper_startup.updateConfig()
|
||||||
|
|
||||||
|
self.db.upgrade_schema_if_old_version()
|
||||||
|
|
||||||
|
self.db.upgrade_to_latest()
|
||||||
|
|
||||||
|
self.db.check_columns_can_store_binary_null()
|
||||||
|
|
||||||
|
self.db.check_vacuum()
|
||||||
|
|
||||||
|
helper_sql.sql_ready.set()
|
||||||
|
|
||||||
|
while self.loop_queue():
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class TestDB(BitmessageDB):
|
||||||
|
"""
|
||||||
|
Database connection build for test case
|
||||||
|
"""
|
||||||
|
def _connection_build(self):
|
||||||
|
self._connection_build_internal("memory", True)
|
||||||
|
return self.conn, self.cur
|
||||||
|
|
|
@ -23,7 +23,7 @@ import helper_addressbook
|
||||||
|
|
||||||
from bmconfigparser import config
|
from bmconfigparser import config
|
||||||
from helper_msgcoding import MsgEncode, MsgDecode
|
from helper_msgcoding import MsgEncode, MsgDecode
|
||||||
from helper_sql import sqlQuery
|
from helper_sql import sqlQuery, sqlExecute
|
||||||
from network import asyncore_pollchoose as asyncore, knownnodes
|
from network import asyncore_pollchoose as asyncore, knownnodes
|
||||||
from network.bmproto import BMProto
|
from network.bmproto import BMProto
|
||||||
from network.connectionpool import BMConnectionPool
|
from network.connectionpool import BMConnectionPool
|
||||||
|
@ -409,6 +409,22 @@ class TestCore(unittest.TestCase):
|
||||||
self.delete_address_from_addressbook(address1)
|
self.delete_address_from_addressbook(address1)
|
||||||
self.delete_address_from_addressbook(address2)
|
self.delete_address_from_addressbook(address2)
|
||||||
|
|
||||||
|
def test_sqlscripts(self):
|
||||||
|
""" Test sql statements"""
|
||||||
|
|
||||||
|
sqlExecute('create table if not exists testtbl (id integer)')
|
||||||
|
tables = list(sqlQuery("select name from sqlite_master where type is 'table'"))
|
||||||
|
res = [item for item in tables if 'testtbl' in item]
|
||||||
|
self.assertEqual(res[0][0], 'testtbl')
|
||||||
|
|
||||||
|
queryreturn = sqlExecute("INSERT INTO testtbl VALUES(101);")
|
||||||
|
self.assertEqual(queryreturn, 1)
|
||||||
|
|
||||||
|
queryreturn = sqlQuery('''SELECT * FROM testtbl''')
|
||||||
|
self.assertEqual(queryreturn[0][0], 101)
|
||||||
|
|
||||||
|
sqlQuery("DROP TABLE testtbl")
|
||||||
|
|
||||||
|
|
||||||
def run():
|
def run():
|
||||||
"""Starts all tests defined in this module"""
|
"""Starts all tests defined in this module"""
|
||||||
|
|
0
src/tests/sql/__init__.py
Normal file
|
@ -2,43 +2,214 @@
|
||||||
# flake8: noqa:E402
|
# flake8: noqa:E402
|
||||||
import os
|
import os
|
||||||
import tempfile
|
import tempfile
|
||||||
should be called something like should be called something like `TestUpgradeDB`
```
self.db = TestDB()
res = self.db.execute()
self.assertwhatever
```
maybe here in the decorator create the maybe here in the decorator create the `self.db` object
Done Done
Done Done
remove this remove this
remove this remove this
remove this method remove this method
```
cls.test_db = TestDB()
```
delete his method delete his method
delete this method delete this method
why do you need this? how about just using why do you need this? how about just using `cls.test_db.conn` and `cls.test_db.cur`?
67-70 not needed anymore 67-70 not needed anymore
just remove this section, no comment just remove this section, no comment
duplicate line duplicate line
this is not helpful this is not helpful
`setUp` shouldn't be a class method
|
|||||||
import threading
|
|
||||||
import unittest
|
import unittest
|
||||||
|
|
||||||
from .common import skip_python3
|
|
||||||
|
|
||||||
skip_python3()
|
|
||||||
|
|
||||||
os.environ['BITMESSAGE_HOME'] = tempfile.gettempdir()
|
os.environ['BITMESSAGE_HOME'] = tempfile.gettempdir()
|
||||||
|
|
||||||
from pybitmessage.helper_sql import (
|
from pybitmessage.class_sqlThread import TestDB # noqa:E402
|
||||||
sqlQuery, sql_ready, sqlStoredProcedure) # noqa:E402
|
|
||||||
from pybitmessage.class_sqlThread import sqlThread # noqa:E402
|
|
||||||
from pybitmessage.addresses import encodeAddress # noqa:E402
|
from pybitmessage.addresses import encodeAddress # noqa:E402
|
||||||
|
|
||||||
|
|
||||||
class TestSqlThread(unittest.TestCase):
|
def filter_table_column(schema, column):
|
||||||
"""Test case for SQL thread"""
|
"""
|
||||||
|
Filter column from schema
|
||||||
|
"""
|
||||||
|
for x in schema:
|
||||||
|
for y in x:
|
||||||
|
if y == column:
|
||||||
|
yield y
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def setUpClass(cls):
|
|
||||||
# Start SQL thread
|
|
||||||
sqlLookup = sqlThread()
|
|
||||||
sqlLookup.daemon = True
|
|
||||||
sqlLookup.start()
|
|
||||||
sql_ready.wait()
|
|
||||||
|
|
||||||
@classmethod
|
class TestSqlBase(object): # pylint: disable=E1101, too-few-public-methods, E1004, W0232
|
||||||
def tearDownClass(cls):
|
""" Base for test case """
|
||||||
sqlStoredProcedure('exit')
|
|
||||||
for thread in threading.enumerate():
|
__name__ = None
|
||||||
if thread.name == "SQL":
|
root_path = os.path.dirname(os.path.dirname(__file__))
|
||||||
thread.join()
|
test_db = None
|
||||||
|
|
||||||
|
def _setup_db(self): # pylint: disable=W0622, redefined-builtin
|
||||||
|
"""
|
||||||
|
Drop all tables before each test case start
|
||||||
|
"""
|
||||||
|
self.test_db = TestDB()
|
||||||
|
self.test_db.create_sql_function()
|
||||||
|
self.test_db.initialize_schema()
|
||||||
|
|
||||||
|
def initialise_database(self, test_db_cur, file): # pylint: disable=W0622, redefined-builtin
|
||||||
|
"""
|
||||||
|
Initialise DB
|
||||||
|
"""
|
||||||
|
|
||||||
|
with open(os.path.join(self.root_path, "tests/sql/{}.sql".format(file)), 'r') as sql_as_string:
|
||||||
|
sql_as_string = sql_as_string.read()
|
||||||
|
|
||||||
|
test_db_cur.cur.executescript(sql_as_string)
|
||||||
|
|
||||||
|
|
||||||
|
class TestFnBitmessageDB(TestSqlBase, unittest.TestCase): # pylint: disable=protected-access
|
||||||
|
""" Test case for Sql function"""
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
"""
|
||||||
|
setup for test case
|
||||||
|
"""
|
||||||
|
self._setup_db()
|
||||||
|
|
||||||
def test_create_function(self):
|
def test_create_function(self):
|
||||||
"""Check the result of enaddr function"""
|
"""Check the result of enaddr function"""
|
||||||
encoded_str = encodeAddress(4, 1, "21122112211221122112")
|
st = "21122112211221122112".encode()
|
||||||
|
encoded_str = encodeAddress(4, 1, st)
|
||||||
|
|
||||||
query = sqlQuery('SELECT enaddr(4, 1, "21122112211221122112")')
|
item = '''SELECT enaddr(4, 1, ?);'''
|
||||||
self.assertEqual(
|
parameters = (st, )
|
||||||
query[0][-1], encoded_str, "test case fail for create_function")
|
self.test_db.cur.execute(item, parameters)
|
||||||
|
query = self.test_db.cur.fetchall()
|
||||||
|
self.assertEqual(query[0][-1], encoded_str, "test case fail for create_function")
|
||||||
|
|
||||||
|
|
||||||
|
class TestUpgradeBitmessageDB(TestSqlBase, unittest.TestCase): # pylint: disable=protected-access
|
||||||
|
"""Test case for SQL versions"""
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
"""
|
||||||
|
Setup DB schema before start.
|
||||||
|
And applying default schema for version test.
|
||||||
|
"""
|
||||||
|
self._setup_db()
|
||||||
|
self.test_db.cur.execute('''INSERT INTO settings VALUES('version','2')''')
|
||||||
|
|
||||||
|
def version(self):
|
||||||
|
"""
|
||||||
|
Run SQL Scripts, Initialize DB with respect to versioning
|
||||||
|
and Upgrade DB schema for all versions
|
||||||
|
"""
|
||||||
|
def wrapper(*args):
|
||||||
|
"""
|
||||||
```
# setup
self.test_db.create_function()
```
|
|||||||
|
Run SQL and mocking DB for versions
|
||||||
|
"""
|
||||||
|
self = args[0]
|
||||||
|
func_name = func.__name__
|
||||||
|
version = func_name.rsplit('_', 1)[-1]
|
||||||
|
|
||||||
|
self.test_db._upgrade_one_level_sql_statement(int(version)) # pylint: disable= W0212, protected-access
|
||||||
|
|
||||||
|
# Update versions DB mocking
|
||||||
|
self.initialise_database(self.test_db, "init_version_{}".format(version))
|
||||||
|
|
||||||
|
return func(*args) # <-- use (self, ...)
|
||||||
|
func = self
|
||||||
|
return wrapper
|
||||||
|
|
||||||
|
@version
|
||||||
|
def test_bm_db_version_2(self):
|
||||||
|
"""
|
||||||
|
Test with version 2
|
||||||
|
"""
|
||||||
|
|
||||||
|
res = self.test_db.cur.execute(''' SELECT count(name) FROM sqlite_master
|
||||||
|
WHERE type='table' AND name='inventory_backup' ''')
|
||||||
|
self.assertNotEqual(res, 1, "Table inventory_backup not deleted in versioning 2")
|
||||||
|
|
||||||
|
@version
|
||||||
|
def test_bm_db_version_3(self):
|
||||||
|
"""
|
||||||
|
Test with version 1
|
||||||
|
Version 1 and 3 are same so will skip 3
|
||||||
|
"""
|
||||||
|
|
||||||
|
res = self.test_db.cur.execute('''PRAGMA table_info('inventory');''')
|
||||||
|
result = list(filter_table_column(res, "tag"))
|
||||||
|
self.assertEqual(result, ['tag'], "Data not migrated for version 3")
|
||||||
|
|
||||||
|
@version
|
||||||
|
def test_bm_db_version_4(self):
|
||||||
|
"""
|
||||||
|
Test with version 4
|
||||||
|
"""
|
||||||
|
|
||||||
|
self.test_db.cur.execute("select * from pubkeys where addressversion = '1';")
|
||||||
|
res = self.test_db.cur.fetchall()
|
||||||
|
self.assertEqual(len(res), 1, "Table inventory not deleted in versioning 4")
|
||||||
|
|
||||||
|
@version
|
||||||
|
def test_bm_db_version_5(self):
|
||||||
|
"""
|
||||||
|
Test with version 5
|
||||||
|
"""
|
||||||
|
|
||||||
|
self.test_db.cur.execute(''' SELECT count(name) FROM sqlite_master WHERE type='table' AND name='knownnodes' ''') # noqa
|
||||||
|
res = self.test_db.cur.fetchall()
|
||||||
|
self.assertNotEqual(res[0][0], 1, "Table knownnodes not deleted in versioning 5")
|
||||||
|
self.test_db.cur.execute(''' SELECT count(name) FROM sqlite_master
|
||||||
|
WHERE type='table' AND name='objectprocessorqueue'; ''')
|
||||||
|
res = self.test_db.cur.fetchall()
|
||||||
|
self.assertNotEqual(len(res), 0, "Table objectprocessorqueue not created in versioning 5")
|
||||||
|
self.test_db.cur.execute(''' SELECT * FROM objectprocessorqueue where objecttype='hash' ; ''')
|
||||||
|
res = self.test_db.cur.fetchall()
|
||||||
|
self.assertNotEqual(len(res), 0, "Table objectprocessorqueue not created in versioning 5")
|
||||||
|
|
||||||
|
@version
|
||||||
|
def test_bm_db_version_6(self):
|
||||||
|
"""
|
||||||
|
Test with version 6
|
||||||
|
"""
|
||||||
|
|
||||||
|
self.test_db.cur.execute('''PRAGMA table_info('inventory');''')
|
||||||
|
inventory = self.test_db.cur.fetchall()
|
||||||
|
inventory = list(filter_table_column(inventory, "expirestime"))
|
||||||
|
self.assertEqual(inventory, ['expirestime'], "Data not migrated for version 6")
|
||||||
|
|
||||||
|
self.test_db.cur.execute('''PRAGMA table_info('objectprocessorqueue');''')
|
||||||
|
objectprocessorqueue = self.test_db.cur.fetchall()
|
||||||
|
objectprocessorqueue = list(filter_table_column(objectprocessorqueue, "objecttype"))
|
||||||
|
self.assertEqual(objectprocessorqueue, ['objecttype'], "Data not migrated for version 6")
|
||||||
|
|
||||||
|
@version
|
||||||
|
def test_bm_db_version_7(self):
|
||||||
|
"""
|
||||||
|
Test with version 7
|
||||||
|
"""
|
||||||
|
|
||||||
|
self.test_db.cur.execute('''SELECT * FROM pubkeys ''')
|
||||||
|
pubkeys = self.test_db.cur.fetchall()
|
||||||
|
self.assertEqual(pubkeys, [], "Data not migrated for version 7")
|
||||||
|
|
||||||
|
self.test_db.cur.execute('''SELECT * FROM inventory ''')
|
||||||
|
inventory = self.test_db.cur.fetchall()
|
||||||
|
self.assertEqual(inventory, [], "Data not migrated for version 7")
|
||||||
|
|
||||||
|
self.test_db.cur.execute('''SELECT status FROM sent ''')
|
||||||
|
sent = self.test_db.cur.fetchall()
|
||||||
|
self.assertEqual(sent, [('msgqueued',), ('msgqueued',)], "Data not migrated for version 7")
|
||||||
|
|
||||||
|
@version
|
||||||
|
def test_bm_db_version_8(self):
|
||||||
|
"""
|
||||||
|
Test with version 8
|
||||||
|
"""
|
||||||
|
|
||||||
|
self.test_db.cur.execute('''PRAGMA table_info('inbox');''')
|
||||||
|
res = self.test_db.cur.fetchall()
|
||||||
|
result = list(filter_table_column(res, "sighash"))
|
||||||
|
self.assertEqual(result, ['sighash'], "Data not migrated for version 8")
|
||||||
|
|
||||||
|
@version
|
||||||
|
def test_bm_db_version_9(self):
|
||||||
|
"""
|
||||||
|
Test with version 9
|
||||||
|
"""
|
||||||
|
|
||||||
|
self.test_db.cur.execute("SELECT count(name) FROM sqlite_master WHERE type='table' AND name='pubkeys_backup'") # noqa
|
||||||
|
res = self.test_db.cur.fetchall()
|
||||||
|
self.assertNotEqual(res[0][0], 1, "Table pubkeys_backup not deleted")
|
||||||
|
|
||||||
|
@version
|
||||||
|
def test_bm_db_version_10(self):
|
||||||
|
"""
|
||||||
|
Test with version 10
|
||||||
|
"""
|
||||||
|
|
||||||
|
label = "test"
|
||||||
|
self.test_db.cur.execute("SELECT * FROM addressbook WHERE label='test' ") # noqa
|
||||||
|
res = self.test_db.cur.fetchall()
|
||||||
|
self.assertEqual(res[0][0], label, "Data not migrated for version 10")
|
||||||
|
|
isort