diff --git a/src/api.py b/src/api.py index c93eb4d8..9c0c18a7 100644 --- a/src/api.py +++ b/src/api.py @@ -1112,7 +1112,7 @@ class BMRPCDispatcher(object): ackdata = helper_sent.insert( toAddress=toAddress, fromAddress=fromAddress, - subject=subject, message=message, encoding=encodingType) + subject=subject, message=message, encoding=encodingType, ttl=TTL) toLabel = '' queryreturn = sqlQuery( diff --git a/src/class_objectProcessor.py b/src/class_objectProcessor.py index 97242a7f..cd38b5cc 100644 --- a/src/class_objectProcessor.py +++ b/src/class_objectProcessor.py @@ -29,7 +29,7 @@ from addresses import ( ) from bmconfigparser import BMConfigParser from fallback import RIPEMD160Hash -from helper_sql import SqlBulkExecute, sqlExecute, sqlQuery +from helper_sql import sql_ready, SqlBulkExecute, sqlExecute, sqlQuery from network import bmproto, knownnodes from network.node import Peer # pylint: disable=too-many-locals, too-many-return-statements, too-many-branches, too-many-statements @@ -50,6 +50,7 @@ class objectProcessor(threading.Thread): # objectProcessorQueue. Assuming that Bitmessage wasn't closed # forcefully, it should have saved the data in the queue into the # objectprocessorqueue table. Let's pull it out. + sql_ready.wait() queryreturn = sqlQuery( '''SELECT objecttype, data FROM objectprocessorqueue''') for row in queryreturn: diff --git a/src/class_singleWorker.py b/src/class_singleWorker.py index 499cb82f..1e7552a8 100644 --- a/src/class_singleWorker.py +++ b/src/class_singleWorker.py @@ -16,6 +16,7 @@ import defaults import helper_inbox import helper_msgcoding import helper_random +import helper_sql import highlevelcrypto import l10n import proofofwork @@ -62,8 +63,8 @@ class singleWorker(StoppableThread): def run(self): # pylint: disable=attribute-defined-outside-init - while not state.sqlReady and state.shutdown == 0: - self.stop.wait(2) + while not helper_sql.sql_ready.wait(1.0) and state.shutdown == 0: + self.stop.wait(1.0) if state.shutdown > 0: return diff --git a/src/class_sqlThread.py b/src/class_sqlThread.py index 7e9eb6c5..563dc472 100644 --- a/src/class_sqlThread.py +++ b/src/class_sqlThread.py @@ -28,6 +28,7 @@ class sqlThread(threading.Thread): def run(self): # pylint: disable=too-many-locals, too-many-branches, too-many-statements """Process SQL queries from `.helper_sql.sqlSubmitQueue`""" + helper_sql.sql_available = True self.conn = sqlite3.connect(state.appdata + 'messages.dat') self.conn.text_factory = str self.cur = self.conn.cursor() @@ -464,7 +465,7 @@ class sqlThread(threading.Thread): parameters = (int(time.time()),) self.cur.execute(item, parameters) - state.sqlReady = True + helper_sql.sql_ready.set() while True: item = helper_sql.sqlSubmitQueue.get() diff --git a/src/helper_sql.py b/src/helper_sql.py index 9b5dc29d..043bccf2 100644 --- a/src/helper_sql.py +++ b/src/helper_sql.py @@ -23,19 +23,27 @@ sqlSubmitQueue = Queue.Queue() """the queue for SQL""" sqlReturnQueue = Queue.Queue() """the queue for results""" -sqlLock = threading.Lock() +sql_lock = threading.Lock() +""" lock to prevent queueing a new request until the previous response + is available """ +sql_available = False +"""set to True by `.threads.sqlThread` immediately upon start""" +sql_ready = threading.Event() +"""set by `.threads.sqlThread` when ready for processing (after + initialization is done)""" -def sqlQuery(sqlStatement, *args): +def sqlQuery(sql_statement, *args): """ Query sqlite and return results - :param str sqlStatement: SQL statement string + :param str sql_statement: SQL statement string :param list args: SQL query parameters :rtype: list """ - sqlLock.acquire() - sqlSubmitQueue.put(sqlStatement) + assert sql_available + sql_lock.acquire() + sqlSubmitQueue.put(sql_statement) if args == (): sqlSubmitQueue.put('') @@ -44,46 +52,48 @@ def sqlQuery(sqlStatement, *args): else: sqlSubmitQueue.put(args) queryreturn, _ = sqlReturnQueue.get() - sqlLock.release() + sql_lock.release() return queryreturn -def sqlExecuteChunked(sqlStatement, idCount, *args): +def sqlExecuteChunked(sql_statement, idCount, *args): """Execute chunked SQL statement to avoid argument limit""" # SQLITE_MAX_VARIABLE_NUMBER, # unfortunately getting/setting isn't exposed to python + assert sql_available sqlExecuteChunked.chunkSize = 999 if idCount == 0 or idCount > len(args): return 0 - totalRowCount = 0 - with sqlLock: + total_row_count = 0 + with sql_lock: for i in range( - len(args) - idCount, len(args), - sqlExecuteChunked.chunkSize - (len(args) - idCount) + len(args) - idCount, len(args), + sqlExecuteChunked.chunkSize - (len(args) - idCount) ): chunk_slice = args[ i:i + sqlExecuteChunked.chunkSize - (len(args) - idCount) ] sqlSubmitQueue.put( - sqlStatement.format(','.join('?' * len(chunk_slice))) + sql_statement.format(','.join('?' * len(chunk_slice))) ) # first static args, and then iterative chunk sqlSubmitQueue.put( args[0:len(args) - idCount] + chunk_slice ) - retVal = sqlReturnQueue.get() - totalRowCount += retVal[1] + ret_val = sqlReturnQueue.get() + total_row_count += ret_val[1] sqlSubmitQueue.put('commit') - return totalRowCount + return total_row_count -def sqlExecute(sqlStatement, *args): +def sqlExecute(sql_statement, *args): """Execute SQL statement (optionally with arguments)""" - sqlLock.acquire() - sqlSubmitQueue.put(sqlStatement) + assert sql_available + sql_lock.acquire() + sqlSubmitQueue.put(sql_statement) if args == (): sqlSubmitQueue.put('') @@ -91,32 +101,34 @@ def sqlExecute(sqlStatement, *args): sqlSubmitQueue.put(args) _, rowcount = sqlReturnQueue.get() sqlSubmitQueue.put('commit') - sqlLock.release() + sql_lock.release() return rowcount def sqlStoredProcedure(procName): """Schedule procName to be run""" - sqlLock.acquire() + assert sql_available + sql_lock.acquire() sqlSubmitQueue.put(procName) - sqlLock.release() + sql_lock.release() class SqlBulkExecute(object): """This is used when you have to execute the same statement in a cycle.""" def __enter__(self): - sqlLock.acquire() + sql_lock.acquire() return self def __exit__(self, exc_type, value, traceback): sqlSubmitQueue.put('commit') - sqlLock.release() + sql_lock.release() @staticmethod - def execute(sqlStatement, *args): + def execute(sql_statement, *args): """Used for statements that do not return results.""" - sqlSubmitQueue.put(sqlStatement) + assert sql_available + sqlSubmitQueue.put(sql_statement) if args == (): sqlSubmitQueue.put('') diff --git a/src/network/randomtrackingdict.py b/src/randomtrackingdict.py similarity index 83% rename from src/network/randomtrackingdict.py rename to src/randomtrackingdict.py index e87bf156..cb4e310d 100644 --- a/src/network/randomtrackingdict.py +++ b/src/randomtrackingdict.py @@ -1,7 +1,6 @@ """ Track randomize ordered dict """ -import random from threading import RLock from time import time @@ -128,41 +127,3 @@ class RandomTrackingDict(object): self.pendingLen += 1 self.lastPoll = time() return retval - - -if __name__ == '__main__': - - # pylint: disable=redefined-outer-name - def randString(): - """helper function for tests, generates a random string""" - retval = b'' - for _ in range(32): - retval += chr(random.randint(0, 255)) - return retval - - a = [] - k = RandomTrackingDict() - d = {} - - print "populating random tracking dict" - a.append(time()) - for i in range(50000): - k[randString()] = True - a.append(time()) - print "done" - - while k: - retval = k.randomKeys(1000) - if not retval: - print "error getting random keys" - try: - k.randomKeys(100) - print "bad" - except KeyError: - pass - for i in retval: - del k[i] - a.append(time()) - - for x in range(len(a) - 1): - print "%i: %.3f" % (x, a[x + 1] - a[x]) diff --git a/src/state.py b/src/state.py index 8bfaf7b9..aa703fae 100644 --- a/src/state.py +++ b/src/state.py @@ -34,9 +34,6 @@ enableSTDIO = False """enable STDIO threads""" curses = False -sqlReady = False -"""set to true by `.threads.sqlThread` when ready for processing""" - maximumNumberOfHalfOpenConnections = 0 maximumLengthOfTimeToBotherResendingMessages = 0 diff --git a/src/storage/filesystem.py b/src/storage/filesystem.py index b19d9272..150e8d9e 100644 --- a/src/storage/filesystem.py +++ b/src/storage/filesystem.py @@ -1,6 +1,7 @@ """ Module for using filesystem (directory with files) for inventory storage """ +import logging import string import time from binascii import hexlify, unhexlify @@ -10,6 +11,8 @@ from threading import RLock from paths import lookupAppdataFolder from storage import InventoryItem, InventoryStorage +logger = logging.getLogger('default') + class FilesystemInventory(InventoryStorage): """Filesystem for inventory storage""" @@ -162,7 +165,8 @@ class FilesystemInventory(InventoryStorage): newInventory[streamNumber][hashId] = InventoryItem( objectType, streamNumber, None, expiresTime, tag) except KeyError: - print "error loading %s" % (hexlify(hashId)) + logger.debug( + 'error loading %s', hexlify(hashId), exc_info=True) self._inventory = newInventory # for i, v in self._inventory.items(): # print "loaded stream: %s, %i items" % (i, len(v)) diff --git a/src/tests/test_randomtrackingdict.py b/src/tests/test_randomtrackingdict.py new file mode 100644 index 00000000..40d2e01b --- /dev/null +++ b/src/tests/test_randomtrackingdict.py @@ -0,0 +1,49 @@ +""" +Tests for RandomTrackingDict class +""" +import random +import unittest + +from time import time + + +class TestRandomTrackingDict(unittest.TestCase): + """ + Main protocol test case + """ + + @staticmethod + def randString(): + """helper function for tests, generates a random string""" + retval = b'' + for _ in range(32): + retval += chr(random.randint(0, 255)) + return retval + + def test_check_randomtrackingdict(self): + """Check the logic of RandomTrackingDict class""" + from pybitmessage.randomtrackingdict import RandomTrackingDict + a = [] + k = RandomTrackingDict() + + a.append(time()) + for i in range(50000): + k[self.randString()] = True + a.append(time()) + + while k: + retval = k.randomKeys(1000) + if not retval: + self.fail("error getting random keys") + + try: + k.randomKeys(100) + self.fail("bad") + except KeyError: + pass + for i in retval: + del k[i] + a.append(time()) + + for x in range(len(a) - 1): + self.assertLess(a[x + 1] - a[x], 10)