SQL operations asserts and code quality #1701
|
@ -16,6 +16,7 @@ import defaults
|
|||
import helper_inbox
|
||||
import helper_msgcoding
|
||||
import helper_random
|
||||
import helper_sql
|
||||
import highlevelcrypto
|
||||
import l10n
|
||||
import proofofwork
|
||||
|
@ -62,8 +63,8 @@ class singleWorker(StoppableThread):
|
|||
def run(self):
|
||||
# pylint: disable=attribute-defined-outside-init
|
||||
|
||||
while not state.sqlReady and state.shutdown == 0:
|
||||
self.stop.wait(2)
|
||||
while not helper_sql.sql_ready.wait(1.0) and state.shutdown == 0:
|
||||
self.stop.wait(1.0)
|
||||
if state.shutdown > 0:
|
||||
return
|
||||
|
||||
|
|
|
@ -28,6 +28,7 @@ class sqlThread(threading.Thread):
|
|||
|
||||
def run(self): # pylint: disable=too-many-locals, too-many-branches, too-many-statements
|
||||
"""Process SQL queries from `.helper_sql.sqlSubmitQueue`"""
|
||||
helper_sql.sql_available = True
|
||||
self.conn = sqlite3.connect(state.appdata + 'messages.dat')
|
||||
self.conn.text_factory = str
|
||||
self.cur = self.conn.cursor()
|
||||
|
@ -464,7 +465,7 @@ class sqlThread(threading.Thread):
|
|||
parameters = (int(time.time()),)
|
||||
self.cur.execute(item, parameters)
|
||||
|
||||
state.sqlReady = True
|
||||
helper_sql.sql_ready.set()
|
||||
|
||||
while True:
|
||||
item = helper_sql.sqlSubmitQueue.get()
|
||||
|
|
|
@ -23,19 +23,27 @@ sqlSubmitQueue = Queue.Queue()
|
|||
"""the queue for SQL"""
|
||||
sqlReturnQueue = Queue.Queue()
|
||||
"""the queue for results"""
|
||||
sqlLock = threading.Lock()
|
||||
sql_lock = threading.Lock()
|
||||
""" lock to prevent queueing a new request until the previous response
|
||||
is available """
|
||||
sql_available = False
|
||||
"""set to True by `.threads.sqlThread` immediately upon start"""
|
||||
sql_ready = threading.Event()
|
||||
"""set by `.threads.sqlThread` when ready for processing (after
|
||||
initialization is done)"""
|
||||
|
||||
|
||||
def sqlQuery(sqlStatement, *args):
|
||||
def sqlQuery(sql_statement, *args):
|
||||
"""
|
||||
Query sqlite and return results
|
||||
|
||||
:param str sqlStatement: SQL statement string
|
||||
:param str sql_statement: SQL statement string
|
||||
:param list args: SQL query parameters
|
||||
:rtype: list
|
||||
"""
|
||||
sqlLock.acquire()
|
||||
sqlSubmitQueue.put(sqlStatement)
|
||||
assert sql_available
|
||||
sql_lock.acquire()
|
||||
sqlSubmitQueue.put(sql_statement)
|
||||
|
||||
if args == ():
|
||||
sqlSubmitQueue.put('')
|
||||
|
@ -44,22 +52,23 @@ def sqlQuery(sqlStatement, *args):
|
|||
else:
|
||||
sqlSubmitQueue.put(args)
|
||||
queryreturn, _ = sqlReturnQueue.get()
|
||||
sqlLock.release()
|
||||
sql_lock.release()
|
||||
|
||||
return queryreturn
|
||||
|
||||
|
||||
def sqlExecuteChunked(sqlStatement, idCount, *args):
|
||||
def sqlExecuteChunked(sql_statement, idCount, *args):
|
||||
"""Execute chunked SQL statement to avoid argument limit"""
|
||||
# SQLITE_MAX_VARIABLE_NUMBER,
|
||||
# unfortunately getting/setting isn't exposed to python
|
||||
assert sql_available
|
||||
sqlExecuteChunked.chunkSize = 999
|
||||
|
||||
if idCount == 0 or idCount > len(args):
|
||||
return 0
|
||||
|
||||
totalRowCount = 0
|
||||
with sqlLock:
|
||||
total_row_count = 0
|
||||
with sql_lock:
|
||||
for i in range(
|
||||
len(args) - idCount, len(args),
|
||||
sqlExecuteChunked.chunkSize - (len(args) - idCount)
|
||||
|
@ -68,22 +77,23 @@ def sqlExecuteChunked(sqlStatement, idCount, *args):
|
|||
i:i + sqlExecuteChunked.chunkSize - (len(args) - idCount)
|
||||
]
|
||||
sqlSubmitQueue.put(
|
||||
sqlStatement.format(','.join('?' * len(chunk_slice)))
|
||||
sql_statement.format(','.join('?' * len(chunk_slice)))
|
||||
)
|
||||
# first static args, and then iterative chunk
|
||||
sqlSubmitQueue.put(
|
||||
args[0:len(args) - idCount] + chunk_slice
|
||||
)
|
||||
retVal = sqlReturnQueue.get()
|
||||
totalRowCount += retVal[1]
|
||||
ret_val = sqlReturnQueue.get()
|
||||
total_row_count += ret_val[1]
|
||||
sqlSubmitQueue.put('commit')
|
||||
return totalRowCount
|
||||
return total_row_count
|
||||
|
||||
|
||||
def sqlExecute(sqlStatement, *args):
|
||||
def sqlExecute(sql_statement, *args):
|
||||
"""Execute SQL statement (optionally with arguments)"""
|
||||
sqlLock.acquire()
|
||||
sqlSubmitQueue.put(sqlStatement)
|
||||
assert sql_available
|
||||
sql_lock.acquire()
|
||||
sqlSubmitQueue.put(sql_statement)
|
||||
|
||||
if args == ():
|
||||
sqlSubmitQueue.put('')
|
||||
|
@ -91,32 +101,34 @@ def sqlExecute(sqlStatement, *args):
|
|||
sqlSubmitQueue.put(args)
|
||||
_, rowcount = sqlReturnQueue.get()
|
||||
sqlSubmitQueue.put('commit')
|
||||
sqlLock.release()
|
||||
sql_lock.release()
|
||||
return rowcount
|
||||
|
||||
|
||||
def sqlStoredProcedure(procName):
|
||||
"""Schedule procName to be run"""
|
||||
sqlLock.acquire()
|
||||
assert sql_available
|
||||
sql_lock.acquire()
|
||||
sqlSubmitQueue.put(procName)
|
||||
sqlLock.release()
|
||||
sql_lock.release()
|
||||
|
||||
|
||||
class SqlBulkExecute(object):
|
||||
"""This is used when you have to execute the same statement in a cycle."""
|
||||
|
||||
def __enter__(self):
|
||||
sqlLock.acquire()
|
||||
sql_lock.acquire()
|
||||
return self
|
||||
|
||||
def __exit__(self, exc_type, value, traceback):
|
||||
sqlSubmitQueue.put('commit')
|
||||
sqlLock.release()
|
||||
sql_lock.release()
|
||||
|
||||
@staticmethod
|
||||
def execute(sqlStatement, *args):
|
||||
def execute(sql_statement, *args):
|
||||
"""Used for statements that do not return results."""
|
||||
sqlSubmitQueue.put(sqlStatement)
|
||||
assert sql_available
|
||||
sqlSubmitQueue.put(sql_statement)
|
||||
|
||||
if args == ():
|
||||
sqlSubmitQueue.put('')
|
||||
|
|
|
@ -34,9 +34,6 @@ enableSTDIO = False
|
|||
"""enable STDIO threads"""
|
||||
curses = False
|
||||
|
||||
sqlReady = False
|
||||
"""set to true by `.threads.sqlThread` when ready for processing"""
|
||||
|
||||
maximumNumberOfHalfOpenConnections = 0
|
||||
|
||||
maximumLengthOfTimeToBotherResendingMessages = 0
|
||||
|
|
Reference in New Issue
Block a user