From a2ab0a02dc0b1ea4fa09481b92c0bc1e79b4b189 Mon Sep 17 00:00:00 2001 From: navjot Date: Mon, 28 Dec 2020 16:01:23 +0530 Subject: [PATCH 1/6] written test case randomtrackingdict dict module --- src/network/randomtrackingdict.py | 39 --------------------- src/storage/filesystem.py | 3 +- src/tests/test_randomtrackingdict.py | 52 ++++++++++++++++++++++++++++ 3 files changed, 54 insertions(+), 40 deletions(-) create mode 100644 src/tests/test_randomtrackingdict.py diff --git a/src/network/randomtrackingdict.py b/src/network/randomtrackingdict.py index e87bf156..cb4e310d 100644 --- a/src/network/randomtrackingdict.py +++ b/src/network/randomtrackingdict.py @@ -1,7 +1,6 @@ """ Track randomize ordered dict """ -import random from threading import RLock from time import time @@ -128,41 +127,3 @@ class RandomTrackingDict(object): self.pendingLen += 1 self.lastPoll = time() return retval - - -if __name__ == '__main__': - - # pylint: disable=redefined-outer-name - def randString(): - """helper function for tests, generates a random string""" - retval = b'' - for _ in range(32): - retval += chr(random.randint(0, 255)) - return retval - - a = [] - k = RandomTrackingDict() - d = {} - - print "populating random tracking dict" - a.append(time()) - for i in range(50000): - k[randString()] = True - a.append(time()) - print "done" - - while k: - retval = k.randomKeys(1000) - if not retval: - print "error getting random keys" - try: - k.randomKeys(100) - print "bad" - except KeyError: - pass - for i in retval: - del k[i] - a.append(time()) - - for x in range(len(a) - 1): - print "%i: %.3f" % (x, a[x + 1] - a[x]) diff --git a/src/storage/filesystem.py b/src/storage/filesystem.py index b19d9272..82ec9feb 100644 --- a/src/storage/filesystem.py +++ b/src/storage/filesystem.py @@ -7,6 +7,7 @@ from binascii import hexlify, unhexlify from os import listdir, makedirs, path, remove, rmdir from threading import RLock +from debug import logger from paths import lookupAppdataFolder from storage import InventoryItem, InventoryStorage @@ -162,7 +163,7 @@ class FilesystemInventory(InventoryStorage): newInventory[streamNumber][hashId] = InventoryItem( objectType, streamNumber, None, expiresTime, tag) except KeyError: - print "error loading %s" % (hexlify(hashId)) + logger.warning('error loading %s', hexlify(hashId)) self._inventory = newInventory # for i, v in self._inventory.items(): # print "loaded stream: %s, %i items" % (i, len(v)) diff --git a/src/tests/test_randomtrackingdict.py b/src/tests/test_randomtrackingdict.py new file mode 100644 index 00000000..09f8efad --- /dev/null +++ b/src/tests/test_randomtrackingdict.py @@ -0,0 +1,52 @@ +""" +Tests for RandomTrackingDict class +""" +import random +import unittest + +from time import time + + +class TestRandomTrackingDict(unittest.TestCase): + """ + Main protocol test case + """ + + @staticmethod + def randString(): + """helper function for tests, generates a random string""" + retval = b'' + for _ in range(32): + retval += chr(random.randint(0, 255)) + return retval + + def test_check_randomtrackingdict(self): + """Check the logic of RandomTrackingDict class""" + # from network import RandomTrackingDict + # from pybitmessage.network.randomtrackingdict import RandomTrackingDict + # from network.randomtrackingdict import RandomTrackingDict + from pybitmessage import network + a = [] + k = network.randomtrackingdict.RandomTrackingDict() + + a.append(time()) + for i in range(50000): + k[self.randString()] = True + a.append(time()) + + while k: + retval = k.randomKeys(1000) + if not retval: + self.fail("error getting random keys") + + try: + k.randomKeys(100) + self.fail("bad") + except KeyError: + pass + for i in retval: + del k[i] + a.append(time()) + + for x in range(len(a) - 1): + self.assertLess(a[x + 1] - a[x], 10) \ No newline at end of file From 46e2f04488b225b2fa06c645116e34fe32d8689f Mon Sep 17 00:00:00 2001 From: navjot Date: Mon, 28 Dec 2020 16:10:56 +0530 Subject: [PATCH 2/6] move randomtrackingdict.py out side the network dircetory --- src/{network => }/randomtrackingdict.py | 2 +- src/tests/test_randomtrackingdict.py | 7 ++----- 2 files changed, 3 insertions(+), 6 deletions(-) rename src/{network => }/randomtrackingdict.py (99%) diff --git a/src/network/randomtrackingdict.py b/src/randomtrackingdict.py similarity index 99% rename from src/network/randomtrackingdict.py rename to src/randomtrackingdict.py index cb4e310d..4b265f82 100644 --- a/src/network/randomtrackingdict.py +++ b/src/randomtrackingdict.py @@ -4,7 +4,7 @@ Track randomize ordered dict from threading import RLock from time import time -import helper_random +from pybitmessage import helper_random class RandomTrackingDict(object): diff --git a/src/tests/test_randomtrackingdict.py b/src/tests/test_randomtrackingdict.py index 09f8efad..9cad05b3 100644 --- a/src/tests/test_randomtrackingdict.py +++ b/src/tests/test_randomtrackingdict.py @@ -22,12 +22,9 @@ class TestRandomTrackingDict(unittest.TestCase): def test_check_randomtrackingdict(self): """Check the logic of RandomTrackingDict class""" - # from network import RandomTrackingDict - # from pybitmessage.network.randomtrackingdict import RandomTrackingDict - # from network.randomtrackingdict import RandomTrackingDict - from pybitmessage import network + from pybitmessage.randomtrackingdict import RandomTrackingDict a = [] - k = network.randomtrackingdict.RandomTrackingDict() + k = RandomTrackingDict() a.append(time()) for i in range(50000): From 7a010441c3bffd60b217c15cc807877d40dd691d Mon Sep 17 00:00:00 2001 From: navjot Date: Tue, 29 Dec 2020 13:58:48 +0530 Subject: [PATCH 3/6] Fixed flake8 CQ issues --- src/randomtrackingdict.py | 2 +- src/storage/filesystem.py | 7 +++++-- src/tests/test_randomtrackingdict.py | 2 +- 3 files changed, 7 insertions(+), 4 deletions(-) diff --git a/src/randomtrackingdict.py b/src/randomtrackingdict.py index 4b265f82..cb4e310d 100644 --- a/src/randomtrackingdict.py +++ b/src/randomtrackingdict.py @@ -4,7 +4,7 @@ Track randomize ordered dict from threading import RLock from time import time -from pybitmessage import helper_random +import helper_random class RandomTrackingDict(object): diff --git a/src/storage/filesystem.py b/src/storage/filesystem.py index 82ec9feb..150e8d9e 100644 --- a/src/storage/filesystem.py +++ b/src/storage/filesystem.py @@ -1,16 +1,18 @@ """ Module for using filesystem (directory with files) for inventory storage """ +import logging import string import time from binascii import hexlify, unhexlify from os import listdir, makedirs, path, remove, rmdir from threading import RLock -from debug import logger from paths import lookupAppdataFolder from storage import InventoryItem, InventoryStorage +logger = logging.getLogger('default') + class FilesystemInventory(InventoryStorage): """Filesystem for inventory storage""" @@ -163,7 +165,8 @@ class FilesystemInventory(InventoryStorage): newInventory[streamNumber][hashId] = InventoryItem( objectType, streamNumber, None, expiresTime, tag) except KeyError: - logger.warning('error loading %s', hexlify(hashId)) + logger.debug( + 'error loading %s', hexlify(hashId), exc_info=True) self._inventory = newInventory # for i, v in self._inventory.items(): # print "loaded stream: %s, %i items" % (i, len(v)) diff --git a/src/tests/test_randomtrackingdict.py b/src/tests/test_randomtrackingdict.py index 9cad05b3..40d2e01b 100644 --- a/src/tests/test_randomtrackingdict.py +++ b/src/tests/test_randomtrackingdict.py @@ -46,4 +46,4 @@ class TestRandomTrackingDict(unittest.TestCase): a.append(time()) for x in range(len(a) - 1): - self.assertLess(a[x + 1] - a[x], 10) \ No newline at end of file + self.assertLess(a[x + 1] - a[x], 10) From 184664d7583096e6cd7b828e08d112ff1727d81b Mon Sep 17 00:00:00 2001 From: Peter Surda Date: Fri, 1 Jan 2021 13:51:35 +0100 Subject: [PATCH 4/6] SQL operations asserts and code quality - complain if trying to execute SQL statements without a running `.threads.sqlThread`. This is to give better test feedback if used incorrectly - refactor `.helper_sql.sql_ready` as a `threading.Event` - code quality --- src/class_singleWorker.py | 5 ++-- src/class_sqlThread.py | 3 +- src/helper_sql.py | 62 +++++++++++++++++++++++---------------- src/state.py | 3 -- 4 files changed, 42 insertions(+), 31 deletions(-) diff --git a/src/class_singleWorker.py b/src/class_singleWorker.py index 499cb82f..1e7552a8 100644 --- a/src/class_singleWorker.py +++ b/src/class_singleWorker.py @@ -16,6 +16,7 @@ import defaults import helper_inbox import helper_msgcoding import helper_random +import helper_sql import highlevelcrypto import l10n import proofofwork @@ -62,8 +63,8 @@ class singleWorker(StoppableThread): def run(self): # pylint: disable=attribute-defined-outside-init - while not state.sqlReady and state.shutdown == 0: - self.stop.wait(2) + while not helper_sql.sql_ready.wait(1.0) and state.shutdown == 0: + self.stop.wait(1.0) if state.shutdown > 0: return diff --git a/src/class_sqlThread.py b/src/class_sqlThread.py index 7e9eb6c5..563dc472 100644 --- a/src/class_sqlThread.py +++ b/src/class_sqlThread.py @@ -28,6 +28,7 @@ class sqlThread(threading.Thread): def run(self): # pylint: disable=too-many-locals, too-many-branches, too-many-statements """Process SQL queries from `.helper_sql.sqlSubmitQueue`""" + helper_sql.sql_available = True self.conn = sqlite3.connect(state.appdata + 'messages.dat') self.conn.text_factory = str self.cur = self.conn.cursor() @@ -464,7 +465,7 @@ class sqlThread(threading.Thread): parameters = (int(time.time()),) self.cur.execute(item, parameters) - state.sqlReady = True + helper_sql.sql_ready.set() while True: item = helper_sql.sqlSubmitQueue.get() diff --git a/src/helper_sql.py b/src/helper_sql.py index 9b5dc29d..043bccf2 100644 --- a/src/helper_sql.py +++ b/src/helper_sql.py @@ -23,19 +23,27 @@ sqlSubmitQueue = Queue.Queue() """the queue for SQL""" sqlReturnQueue = Queue.Queue() """the queue for results""" -sqlLock = threading.Lock() +sql_lock = threading.Lock() +""" lock to prevent queueing a new request until the previous response + is available """ +sql_available = False +"""set to True by `.threads.sqlThread` immediately upon start""" +sql_ready = threading.Event() +"""set by `.threads.sqlThread` when ready for processing (after + initialization is done)""" -def sqlQuery(sqlStatement, *args): +def sqlQuery(sql_statement, *args): """ Query sqlite and return results - :param str sqlStatement: SQL statement string + :param str sql_statement: SQL statement string :param list args: SQL query parameters :rtype: list """ - sqlLock.acquire() - sqlSubmitQueue.put(sqlStatement) + assert sql_available + sql_lock.acquire() + sqlSubmitQueue.put(sql_statement) if args == (): sqlSubmitQueue.put('') @@ -44,46 +52,48 @@ def sqlQuery(sqlStatement, *args): else: sqlSubmitQueue.put(args) queryreturn, _ = sqlReturnQueue.get() - sqlLock.release() + sql_lock.release() return queryreturn -def sqlExecuteChunked(sqlStatement, idCount, *args): +def sqlExecuteChunked(sql_statement, idCount, *args): """Execute chunked SQL statement to avoid argument limit""" # SQLITE_MAX_VARIABLE_NUMBER, # unfortunately getting/setting isn't exposed to python + assert sql_available sqlExecuteChunked.chunkSize = 999 if idCount == 0 or idCount > len(args): return 0 - totalRowCount = 0 - with sqlLock: + total_row_count = 0 + with sql_lock: for i in range( - len(args) - idCount, len(args), - sqlExecuteChunked.chunkSize - (len(args) - idCount) + len(args) - idCount, len(args), + sqlExecuteChunked.chunkSize - (len(args) - idCount) ): chunk_slice = args[ i:i + sqlExecuteChunked.chunkSize - (len(args) - idCount) ] sqlSubmitQueue.put( - sqlStatement.format(','.join('?' * len(chunk_slice))) + sql_statement.format(','.join('?' * len(chunk_slice))) ) # first static args, and then iterative chunk sqlSubmitQueue.put( args[0:len(args) - idCount] + chunk_slice ) - retVal = sqlReturnQueue.get() - totalRowCount += retVal[1] + ret_val = sqlReturnQueue.get() + total_row_count += ret_val[1] sqlSubmitQueue.put('commit') - return totalRowCount + return total_row_count -def sqlExecute(sqlStatement, *args): +def sqlExecute(sql_statement, *args): """Execute SQL statement (optionally with arguments)""" - sqlLock.acquire() - sqlSubmitQueue.put(sqlStatement) + assert sql_available + sql_lock.acquire() + sqlSubmitQueue.put(sql_statement) if args == (): sqlSubmitQueue.put('') @@ -91,32 +101,34 @@ def sqlExecute(sqlStatement, *args): sqlSubmitQueue.put(args) _, rowcount = sqlReturnQueue.get() sqlSubmitQueue.put('commit') - sqlLock.release() + sql_lock.release() return rowcount def sqlStoredProcedure(procName): """Schedule procName to be run""" - sqlLock.acquire() + assert sql_available + sql_lock.acquire() sqlSubmitQueue.put(procName) - sqlLock.release() + sql_lock.release() class SqlBulkExecute(object): """This is used when you have to execute the same statement in a cycle.""" def __enter__(self): - sqlLock.acquire() + sql_lock.acquire() return self def __exit__(self, exc_type, value, traceback): sqlSubmitQueue.put('commit') - sqlLock.release() + sql_lock.release() @staticmethod - def execute(sqlStatement, *args): + def execute(sql_statement, *args): """Used for statements that do not return results.""" - sqlSubmitQueue.put(sqlStatement) + assert sql_available + sqlSubmitQueue.put(sql_statement) if args == (): sqlSubmitQueue.put('') diff --git a/src/state.py b/src/state.py index 8bfaf7b9..aa703fae 100644 --- a/src/state.py +++ b/src/state.py @@ -34,9 +34,6 @@ enableSTDIO = False """enable STDIO threads""" curses = False -sqlReady = False -"""set to true by `.threads.sqlThread` when ready for processing""" - maximumNumberOfHalfOpenConnections = 0 maximumLengthOfTimeToBotherResendingMessages = 0 From e084d7f53c895695db08e4a592d17eb29df50527 Mon Sep 17 00:00:00 2001 From: Peter Surda Date: Sun, 3 Jan 2021 11:14:27 +0100 Subject: [PATCH 5/6] `objectProcessor` waits for `sqlThread` ready - fixes #1702 --- src/class_objectProcessor.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/class_objectProcessor.py b/src/class_objectProcessor.py index 97242a7f..cd38b5cc 100644 --- a/src/class_objectProcessor.py +++ b/src/class_objectProcessor.py @@ -29,7 +29,7 @@ from addresses import ( ) from bmconfigparser import BMConfigParser from fallback import RIPEMD160Hash -from helper_sql import SqlBulkExecute, sqlExecute, sqlQuery +from helper_sql import sql_ready, SqlBulkExecute, sqlExecute, sqlQuery from network import bmproto, knownnodes from network.node import Peer # pylint: disable=too-many-locals, too-many-return-statements, too-many-branches, too-many-statements @@ -50,6 +50,7 @@ class objectProcessor(threading.Thread): # objectProcessorQueue. Assuming that Bitmessage wasn't closed # forcefully, it should have saved the data in the queue into the # objectprocessorqueue table. Let's pull it out. + sql_ready.wait() queryreturn = sqlQuery( '''SELECT objecttype, data FROM objectprocessorqueue''') for row in queryreturn: From a0e1c0041f154dd2be8e28c0363e3afd7a944d33 Mon Sep 17 00:00:00 2001 From: 813492291816 Date: Thu, 7 Jan 2021 19:51:12 -0500 Subject: [PATCH 6/6] Add missing TTL to API sendMessage --- src/api.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/api.py b/src/api.py index c93eb4d8..9c0c18a7 100644 --- a/src/api.py +++ b/src/api.py @@ -1112,7 +1112,7 @@ class BMRPCDispatcher(object): ackdata = helper_sent.insert( toAddress=toAddress, fromAddress=fromAddress, - subject=subject, message=message, encoding=encodingType) + subject=subject, message=message, encoding=encodingType, ttl=TTL) toLabel = '' queryreturn = sqlQuery(