From 184664d7583096e6cd7b828e08d112ff1727d81b Mon Sep 17 00:00:00 2001
From: Peter Surda
Date: Fri, 1 Jan 2021 13:51:35 +0100
Subject: [PATCH] SQL operations asserts and code quality

- complain if trying to execute SQL statements without a running
  `.threads.sqlThread`. This is to give better test feedback if used
  incorrectly
- refactor `.helper_sql.sql_ready` as a `threading.Event`
- code quality
---
 src/class_singleWorker.py |  5 ++--
 src/class_sqlThread.py    |  3 +-
 src/helper_sql.py         | 62 +++++++++++++++++++++++----------------
 src/state.py              |  3 --
 4 files changed, 42 insertions(+), 31 deletions(-)

diff --git a/src/class_singleWorker.py b/src/class_singleWorker.py
index 499cb82f..1e7552a8 100644
--- a/src/class_singleWorker.py
+++ b/src/class_singleWorker.py
@@ -16,6 +16,7 @@ import defaults
 import helper_inbox
 import helper_msgcoding
 import helper_random
+import helper_sql
 import highlevelcrypto
 import l10n
 import proofofwork
@@ -62,8 +63,8 @@ class singleWorker(StoppableThread):
 
     def run(self):
         # pylint: disable=attribute-defined-outside-init
 
-        while not state.sqlReady and state.shutdown == 0:
-            self.stop.wait(2)
+        while not helper_sql.sql_ready.wait(1.0) and state.shutdown == 0:
+            self.stop.wait(1.0)
         if state.shutdown > 0:
             return
diff --git a/src/class_sqlThread.py b/src/class_sqlThread.py
index 7e9eb6c5..563dc472 100644
--- a/src/class_sqlThread.py
+++ b/src/class_sqlThread.py
@@ -28,6 +28,7 @@ class sqlThread(threading.Thread):
 
     def run(self):  # pylint: disable=too-many-locals, too-many-branches, too-many-statements
         """Process SQL queries from `.helper_sql.sqlSubmitQueue`"""
+        helper_sql.sql_available = True
         self.conn = sqlite3.connect(state.appdata + 'messages.dat')
         self.conn.text_factory = str
         self.cur = self.conn.cursor()
@@ -464,7 +465,7 @@
             parameters = (int(time.time()),)
             self.cur.execute(item, parameters)
 
-        state.sqlReady = True
+        helper_sql.sql_ready.set()
 
         while True:
             item = helper_sql.sqlSubmitQueue.get()
diff --git a/src/helper_sql.py b/src/helper_sql.py
index 9b5dc29d..043bccf2 100644
--- a/src/helper_sql.py
+++ b/src/helper_sql.py
@@ -23,19 +23,27 @@ sqlSubmitQueue = Queue.Queue()
 """the queue for SQL"""
 sqlReturnQueue = Queue.Queue()
 """the queue for results"""
-sqlLock = threading.Lock()
+sql_lock = threading.Lock()
+""" lock to prevent queueing a new request until the previous response
+    is available """
+sql_available = False
+"""set to True by `.threads.sqlThread` immediately upon start"""
+sql_ready = threading.Event()
+"""set by `.threads.sqlThread` when ready for processing (after
+    initialization is done)"""
 
 
-def sqlQuery(sqlStatement, *args):
+def sqlQuery(sql_statement, *args):
     """
     Query sqlite and return results
 
-    :param str sqlStatement: SQL statement string
+    :param str sql_statement: SQL statement string
     :param list args: SQL query parameters
     :rtype: list
     """
-    sqlLock.acquire()
-    sqlSubmitQueue.put(sqlStatement)
+    assert sql_available
+    sql_lock.acquire()
+    sqlSubmitQueue.put(sql_statement)
 
     if args == ():
         sqlSubmitQueue.put('')
@@ -44,46 +52,48 @@
     else:
         sqlSubmitQueue.put(args)
     queryreturn, _ = sqlReturnQueue.get()
-    sqlLock.release()
+    sql_lock.release()
 
     return queryreturn
 
 
-def sqlExecuteChunked(sqlStatement, idCount, *args):
+def sqlExecuteChunked(sql_statement, idCount, *args):
     """Execute chunked SQL statement to avoid argument limit"""
     # SQLITE_MAX_VARIABLE_NUMBER,
    # unfortunately getting/setting isn't exposed to python
+    assert sql_available
     sqlExecuteChunked.chunkSize = 999
 
     if idCount == 0 or idCount > len(args):
         return 0
 
-    totalRowCount = 0
-    with sqlLock:
+    total_row_count = 0
+    with sql_lock:
         for i in range(
-            len(args) - idCount, len(args),
-            sqlExecuteChunked.chunkSize - (len(args) - idCount)
+                len(args) - idCount, len(args),
+                sqlExecuteChunked.chunkSize - (len(args) - idCount)
         ):
             chunk_slice = args[
                 i:i + sqlExecuteChunked.chunkSize - (len(args) - idCount)
             ]
             sqlSubmitQueue.put(
-                sqlStatement.format(','.join('?' * len(chunk_slice)))
+                sql_statement.format(','.join('?' * len(chunk_slice)))
             )
             # first static args, and then iterative chunk
             sqlSubmitQueue.put(
                 args[0:len(args) - idCount] + chunk_slice
             )
-            retVal = sqlReturnQueue.get()
-            totalRowCount += retVal[1]
+            ret_val = sqlReturnQueue.get()
+            total_row_count += ret_val[1]
         sqlSubmitQueue.put('commit')
-    return totalRowCount
+    return total_row_count
 
 
-def sqlExecute(sqlStatement, *args):
+def sqlExecute(sql_statement, *args):
     """Execute SQL statement (optionally with arguments)"""
-    sqlLock.acquire()
-    sqlSubmitQueue.put(sqlStatement)
+    assert sql_available
+    sql_lock.acquire()
+    sqlSubmitQueue.put(sql_statement)
 
     if args == ():
         sqlSubmitQueue.put('')
@@ -91,32 +101,34 @@
         sqlSubmitQueue.put(args)
     _, rowcount = sqlReturnQueue.get()
     sqlSubmitQueue.put('commit')
-    sqlLock.release()
+    sql_lock.release()
     return rowcount
 
 
 def sqlStoredProcedure(procName):
     """Schedule procName to be run"""
-    sqlLock.acquire()
+    assert sql_available
+    sql_lock.acquire()
     sqlSubmitQueue.put(procName)
-    sqlLock.release()
+    sql_lock.release()
 
 
 class SqlBulkExecute(object):
     """This is used when you have to execute the same statement in a cycle."""
 
     def __enter__(self):
-        sqlLock.acquire()
+        sql_lock.acquire()
         return self
 
     def __exit__(self, exc_type, value, traceback):
         sqlSubmitQueue.put('commit')
-        sqlLock.release()
+        sql_lock.release()
 
     @staticmethod
-    def execute(sqlStatement, *args):
+    def execute(sql_statement, *args):
         """Used for statements that do not return results."""
-        sqlSubmitQueue.put(sqlStatement)
+        assert sql_available
+        sqlSubmitQueue.put(sql_statement)
 
         if args == ():
             sqlSubmitQueue.put('')
diff --git a/src/state.py b/src/state.py
index 8bfaf7b9..aa703fae 100644
--- a/src/state.py
+++ b/src/state.py
@@ -34,9 +34,6 @@ enableSTDIO = False
 """enable STDIO threads"""
 curses = False
 
-sqlReady = False
-"""set to true by `.threads.sqlThread` when ready for processing"""
-
 maximumNumberOfHalfOpenConnections = 0
 
 maximumLengthOfTimeToBotherResendingMessages = 0
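
Note for reviewers: a minimal consumer-side sketch of the pattern this patch enables. The worker class below is hypothetical and only for illustration; `helper_sql.sql_ready`, `helper_sql.sqlQuery` and `state.shutdown` are the names used by the patch and the existing codebase.

    import threading

    import helper_sql
    import state


    class ExampleWorker(threading.Thread):
        """Hypothetical worker: wait for the SQL thread, then query."""

        def run(self):
            # sql_ready.wait(1.0) blocks for at most one second per loop
            # iteration, so the worker can still notice a shutdown request
            # while sqlThread is initialising the messages.dat schema.
            while not helper_sql.sql_ready.wait(1.0) and state.shutdown == 0:
                pass
            if state.shutdown > 0:
                return
            # By now sqlThread has set sql_available, so the new asserts in
            # sqlQuery/sqlExecute pass and the request queue is being served.
            rows = helper_sql.sqlQuery('SELECT COUNT(*) FROM inbox')
            print(rows)

With the previous polling flag a caller could only sleep and re-check `state.sqlReady`; the `threading.Event` lets it block directly, and the `assert sql_available` calls make tests fail loudly if a helper is used before `.threads.sqlThread` has been started at all.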