SQL operations asserts and code quality

- complain if trying to execute SQL statements without a running
  `.threads.sqlThread`. This is to give better test feedback if used
  incorrectly
- refactor `.helper_sql.sql_ready` as a `threading.Event`
- code quality
This commit is contained in:
Peter Šurda 2021-01-01 13:51:35 +01:00
parent 7a010441c3
commit 184664d758
Signed by: PeterSurda
GPG Key ID: 0C5F50C0B5F37D87
4 changed files with 42 additions and 31 deletions

View File

@ -16,6 +16,7 @@ import defaults
import helper_inbox import helper_inbox
import helper_msgcoding import helper_msgcoding
import helper_random import helper_random
import helper_sql
import highlevelcrypto import highlevelcrypto
import l10n import l10n
import proofofwork import proofofwork
@ -62,8 +63,8 @@ class singleWorker(StoppableThread):
def run(self): def run(self):
# pylint: disable=attribute-defined-outside-init # pylint: disable=attribute-defined-outside-init
while not state.sqlReady and state.shutdown == 0: while not helper_sql.sql_ready.wait(1.0) and state.shutdown == 0:
self.stop.wait(2) self.stop.wait(1.0)
if state.shutdown > 0: if state.shutdown > 0:
return return

View File

@ -28,6 +28,7 @@ class sqlThread(threading.Thread):
def run(self): # pylint: disable=too-many-locals, too-many-branches, too-many-statements def run(self): # pylint: disable=too-many-locals, too-many-branches, too-many-statements
"""Process SQL queries from `.helper_sql.sqlSubmitQueue`""" """Process SQL queries from `.helper_sql.sqlSubmitQueue`"""
helper_sql.sql_available = True
self.conn = sqlite3.connect(state.appdata + 'messages.dat') self.conn = sqlite3.connect(state.appdata + 'messages.dat')
self.conn.text_factory = str self.conn.text_factory = str
self.cur = self.conn.cursor() self.cur = self.conn.cursor()
@ -464,7 +465,7 @@ class sqlThread(threading.Thread):
parameters = (int(time.time()),) parameters = (int(time.time()),)
self.cur.execute(item, parameters) self.cur.execute(item, parameters)
state.sqlReady = True helper_sql.sql_ready.set()
while True: while True:
item = helper_sql.sqlSubmitQueue.get() item = helper_sql.sqlSubmitQueue.get()

View File

@ -23,19 +23,27 @@ sqlSubmitQueue = Queue.Queue()
"""the queue for SQL""" """the queue for SQL"""
sqlReturnQueue = Queue.Queue() sqlReturnQueue = Queue.Queue()
"""the queue for results""" """the queue for results"""
sqlLock = threading.Lock() sql_lock = threading.Lock()
""" lock to prevent queueing a new request until the previous response
is available """
sql_available = False
"""set to True by `.threads.sqlThread` immediately upon start"""
sql_ready = threading.Event()
"""set by `.threads.sqlThread` when ready for processing (after
initialization is done)"""
def sqlQuery(sqlStatement, *args): def sqlQuery(sql_statement, *args):
""" """
Query sqlite and return results Query sqlite and return results
:param str sqlStatement: SQL statement string :param str sql_statement: SQL statement string
:param list args: SQL query parameters :param list args: SQL query parameters
:rtype: list :rtype: list
""" """
sqlLock.acquire() assert sql_available
sqlSubmitQueue.put(sqlStatement) sql_lock.acquire()
sqlSubmitQueue.put(sql_statement)
if args == (): if args == ():
sqlSubmitQueue.put('') sqlSubmitQueue.put('')
@ -44,46 +52,48 @@ def sqlQuery(sqlStatement, *args):
else: else:
sqlSubmitQueue.put(args) sqlSubmitQueue.put(args)
queryreturn, _ = sqlReturnQueue.get() queryreturn, _ = sqlReturnQueue.get()
sqlLock.release() sql_lock.release()
return queryreturn return queryreturn
def sqlExecuteChunked(sqlStatement, idCount, *args): def sqlExecuteChunked(sql_statement, idCount, *args):
"""Execute chunked SQL statement to avoid argument limit""" """Execute chunked SQL statement to avoid argument limit"""
# SQLITE_MAX_VARIABLE_NUMBER, # SQLITE_MAX_VARIABLE_NUMBER,
# unfortunately getting/setting isn't exposed to python # unfortunately getting/setting isn't exposed to python
assert sql_available
sqlExecuteChunked.chunkSize = 999 sqlExecuteChunked.chunkSize = 999
if idCount == 0 or idCount > len(args): if idCount == 0 or idCount > len(args):
return 0 return 0
totalRowCount = 0 total_row_count = 0
with sqlLock: with sql_lock:
for i in range( for i in range(
len(args) - idCount, len(args), len(args) - idCount, len(args),
sqlExecuteChunked.chunkSize - (len(args) - idCount) sqlExecuteChunked.chunkSize - (len(args) - idCount)
): ):
chunk_slice = args[ chunk_slice = args[
i:i + sqlExecuteChunked.chunkSize - (len(args) - idCount) i:i + sqlExecuteChunked.chunkSize - (len(args) - idCount)
] ]
sqlSubmitQueue.put( sqlSubmitQueue.put(
sqlStatement.format(','.join('?' * len(chunk_slice))) sql_statement.format(','.join('?' * len(chunk_slice)))
) )
# first static args, and then iterative chunk # first static args, and then iterative chunk
sqlSubmitQueue.put( sqlSubmitQueue.put(
args[0:len(args) - idCount] + chunk_slice args[0:len(args) - idCount] + chunk_slice
) )
retVal = sqlReturnQueue.get() ret_val = sqlReturnQueue.get()
totalRowCount += retVal[1] total_row_count += ret_val[1]
sqlSubmitQueue.put('commit') sqlSubmitQueue.put('commit')
return totalRowCount return total_row_count
def sqlExecute(sqlStatement, *args): def sqlExecute(sql_statement, *args):
"""Execute SQL statement (optionally with arguments)""" """Execute SQL statement (optionally with arguments)"""
sqlLock.acquire() assert sql_available
sqlSubmitQueue.put(sqlStatement) sql_lock.acquire()
sqlSubmitQueue.put(sql_statement)
if args == (): if args == ():
sqlSubmitQueue.put('') sqlSubmitQueue.put('')
@ -91,32 +101,34 @@ def sqlExecute(sqlStatement, *args):
sqlSubmitQueue.put(args) sqlSubmitQueue.put(args)
_, rowcount = sqlReturnQueue.get() _, rowcount = sqlReturnQueue.get()
sqlSubmitQueue.put('commit') sqlSubmitQueue.put('commit')
sqlLock.release() sql_lock.release()
return rowcount return rowcount
def sqlStoredProcedure(procName): def sqlStoredProcedure(procName):
"""Schedule procName to be run""" """Schedule procName to be run"""
sqlLock.acquire() assert sql_available
sql_lock.acquire()
sqlSubmitQueue.put(procName) sqlSubmitQueue.put(procName)
sqlLock.release() sql_lock.release()
class SqlBulkExecute(object): class SqlBulkExecute(object):
"""This is used when you have to execute the same statement in a cycle.""" """This is used when you have to execute the same statement in a cycle."""
def __enter__(self): def __enter__(self):
sqlLock.acquire() sql_lock.acquire()
return self return self
def __exit__(self, exc_type, value, traceback): def __exit__(self, exc_type, value, traceback):
sqlSubmitQueue.put('commit') sqlSubmitQueue.put('commit')
sqlLock.release() sql_lock.release()
@staticmethod @staticmethod
def execute(sqlStatement, *args): def execute(sql_statement, *args):
"""Used for statements that do not return results.""" """Used for statements that do not return results."""
sqlSubmitQueue.put(sqlStatement) assert sql_available
sqlSubmitQueue.put(sql_statement)
if args == (): if args == ():
sqlSubmitQueue.put('') sqlSubmitQueue.put('')

View File

@ -34,9 +34,6 @@ enableSTDIO = False
"""enable STDIO threads""" """enable STDIO threads"""
curses = False curses = False
sqlReady = False
"""set to true by `.threads.sqlThread` when ready for processing"""
maximumNumberOfHalfOpenConnections = 0 maximumNumberOfHalfOpenConnections = 0
maximumLengthOfTimeToBotherResendingMessages = 0 maximumLengthOfTimeToBotherResendingMessages = 0