This commit is contained in:
surbhicis 2021-01-12 13:19:25 +05:30
commit 61b7da8828
Signed by untrusted user: surbhicis
GPG Key ID: 48A8C2D218DE7B0B
9 changed files with 99 additions and 73 deletions

View File

@ -1112,7 +1112,7 @@ class BMRPCDispatcher(object):
ackdata = helper_sent.insert( ackdata = helper_sent.insert(
toAddress=toAddress, fromAddress=fromAddress, toAddress=toAddress, fromAddress=fromAddress,
subject=subject, message=message, encoding=encodingType) subject=subject, message=message, encoding=encodingType, ttl=TTL)
toLabel = '' toLabel = ''
queryreturn = sqlQuery( queryreturn = sqlQuery(

View File

@ -29,7 +29,7 @@ from addresses import (
) )
from bmconfigparser import BMConfigParser from bmconfigparser import BMConfigParser
from fallback import RIPEMD160Hash from fallback import RIPEMD160Hash
from helper_sql import SqlBulkExecute, sqlExecute, sqlQuery from helper_sql import sql_ready, SqlBulkExecute, sqlExecute, sqlQuery
from network import bmproto, knownnodes from network import bmproto, knownnodes
from network.node import Peer from network.node import Peer
# pylint: disable=too-many-locals, too-many-return-statements, too-many-branches, too-many-statements # pylint: disable=too-many-locals, too-many-return-statements, too-many-branches, too-many-statements
@ -50,6 +50,7 @@ class objectProcessor(threading.Thread):
# objectProcessorQueue. Assuming that Bitmessage wasn't closed # objectProcessorQueue. Assuming that Bitmessage wasn't closed
# forcefully, it should have saved the data in the queue into the # forcefully, it should have saved the data in the queue into the
# objectprocessorqueue table. Let's pull it out. # objectprocessorqueue table. Let's pull it out.
sql_ready.wait()
queryreturn = sqlQuery( queryreturn = sqlQuery(
'''SELECT objecttype, data FROM objectprocessorqueue''') '''SELECT objecttype, data FROM objectprocessorqueue''')
for row in queryreturn: for row in queryreturn:

View File

@ -16,6 +16,7 @@ import defaults
import helper_inbox import helper_inbox
import helper_msgcoding import helper_msgcoding
import helper_random import helper_random
import helper_sql
import highlevelcrypto import highlevelcrypto
import l10n import l10n
import proofofwork import proofofwork
@ -62,8 +63,8 @@ class singleWorker(StoppableThread):
def run(self): def run(self):
# pylint: disable=attribute-defined-outside-init # pylint: disable=attribute-defined-outside-init
while not state.sqlReady and state.shutdown == 0: while not helper_sql.sql_ready.wait(1.0) and state.shutdown == 0:
self.stop.wait(2) self.stop.wait(1.0)
if state.shutdown > 0: if state.shutdown > 0:
return return

View File

@ -28,6 +28,7 @@ class sqlThread(threading.Thread):
def run(self): # pylint: disable=too-many-locals, too-many-branches, too-many-statements def run(self): # pylint: disable=too-many-locals, too-many-branches, too-many-statements
"""Process SQL queries from `.helper_sql.sqlSubmitQueue`""" """Process SQL queries from `.helper_sql.sqlSubmitQueue`"""
helper_sql.sql_available = True
self.conn = sqlite3.connect(state.appdata + 'messages.dat') self.conn = sqlite3.connect(state.appdata + 'messages.dat')
self.conn.text_factory = str self.conn.text_factory = str
self.cur = self.conn.cursor() self.cur = self.conn.cursor()
@ -464,7 +465,7 @@ class sqlThread(threading.Thread):
parameters = (int(time.time()),) parameters = (int(time.time()),)
self.cur.execute(item, parameters) self.cur.execute(item, parameters)
state.sqlReady = True helper_sql.sql_ready.set()
while True: while True:
item = helper_sql.sqlSubmitQueue.get() item = helper_sql.sqlSubmitQueue.get()

View File

@ -23,19 +23,27 @@ sqlSubmitQueue = Queue.Queue()
"""the queue for SQL""" """the queue for SQL"""
sqlReturnQueue = Queue.Queue() sqlReturnQueue = Queue.Queue()
"""the queue for results""" """the queue for results"""
sqlLock = threading.Lock() sql_lock = threading.Lock()
""" lock to prevent queueing a new request until the previous response
is available """
sql_available = False
"""set to True by `.threads.sqlThread` immediately upon start"""
sql_ready = threading.Event()
"""set by `.threads.sqlThread` when ready for processing (after
initialization is done)"""
def sqlQuery(sqlStatement, *args): def sqlQuery(sql_statement, *args):
""" """
Query sqlite and return results Query sqlite and return results
:param str sqlStatement: SQL statement string :param str sql_statement: SQL statement string
:param list args: SQL query parameters :param list args: SQL query parameters
:rtype: list :rtype: list
""" """
sqlLock.acquire() assert sql_available
sqlSubmitQueue.put(sqlStatement) sql_lock.acquire()
sqlSubmitQueue.put(sql_statement)
if args == (): if args == ():
sqlSubmitQueue.put('') sqlSubmitQueue.put('')
@ -44,22 +52,23 @@ def sqlQuery(sqlStatement, *args):
else: else:
sqlSubmitQueue.put(args) sqlSubmitQueue.put(args)
queryreturn, _ = sqlReturnQueue.get() queryreturn, _ = sqlReturnQueue.get()
sqlLock.release() sql_lock.release()
return queryreturn return queryreturn
def sqlExecuteChunked(sqlStatement, idCount, *args): def sqlExecuteChunked(sql_statement, idCount, *args):
"""Execute chunked SQL statement to avoid argument limit""" """Execute chunked SQL statement to avoid argument limit"""
# SQLITE_MAX_VARIABLE_NUMBER, # SQLITE_MAX_VARIABLE_NUMBER,
# unfortunately getting/setting isn't exposed to python # unfortunately getting/setting isn't exposed to python
assert sql_available
sqlExecuteChunked.chunkSize = 999 sqlExecuteChunked.chunkSize = 999
if idCount == 0 or idCount > len(args): if idCount == 0 or idCount > len(args):
return 0 return 0
totalRowCount = 0 total_row_count = 0
with sqlLock: with sql_lock:
for i in range( for i in range(
len(args) - idCount, len(args), len(args) - idCount, len(args),
sqlExecuteChunked.chunkSize - (len(args) - idCount) sqlExecuteChunked.chunkSize - (len(args) - idCount)
@ -68,22 +77,23 @@ def sqlExecuteChunked(sqlStatement, idCount, *args):
i:i + sqlExecuteChunked.chunkSize - (len(args) - idCount) i:i + sqlExecuteChunked.chunkSize - (len(args) - idCount)
] ]
sqlSubmitQueue.put( sqlSubmitQueue.put(
sqlStatement.format(','.join('?' * len(chunk_slice))) sql_statement.format(','.join('?' * len(chunk_slice)))
) )
# first static args, and then iterative chunk # first static args, and then iterative chunk
sqlSubmitQueue.put( sqlSubmitQueue.put(
args[0:len(args) - idCount] + chunk_slice args[0:len(args) - idCount] + chunk_slice
) )
retVal = sqlReturnQueue.get() ret_val = sqlReturnQueue.get()
totalRowCount += retVal[1] total_row_count += ret_val[1]
sqlSubmitQueue.put('commit') sqlSubmitQueue.put('commit')
return totalRowCount return total_row_count
def sqlExecute(sqlStatement, *args): def sqlExecute(sql_statement, *args):
"""Execute SQL statement (optionally with arguments)""" """Execute SQL statement (optionally with arguments)"""
sqlLock.acquire() assert sql_available
sqlSubmitQueue.put(sqlStatement) sql_lock.acquire()
sqlSubmitQueue.put(sql_statement)
if args == (): if args == ():
sqlSubmitQueue.put('') sqlSubmitQueue.put('')
@ -91,32 +101,34 @@ def sqlExecute(sqlStatement, *args):
sqlSubmitQueue.put(args) sqlSubmitQueue.put(args)
_, rowcount = sqlReturnQueue.get() _, rowcount = sqlReturnQueue.get()
sqlSubmitQueue.put('commit') sqlSubmitQueue.put('commit')
sqlLock.release() sql_lock.release()
return rowcount return rowcount
def sqlStoredProcedure(procName): def sqlStoredProcedure(procName):
"""Schedule procName to be run""" """Schedule procName to be run"""
sqlLock.acquire() assert sql_available
sql_lock.acquire()
sqlSubmitQueue.put(procName) sqlSubmitQueue.put(procName)
sqlLock.release() sql_lock.release()
class SqlBulkExecute(object): class SqlBulkExecute(object):
"""This is used when you have to execute the same statement in a cycle.""" """This is used when you have to execute the same statement in a cycle."""
def __enter__(self): def __enter__(self):
sqlLock.acquire() sql_lock.acquire()
return self return self
def __exit__(self, exc_type, value, traceback): def __exit__(self, exc_type, value, traceback):
sqlSubmitQueue.put('commit') sqlSubmitQueue.put('commit')
sqlLock.release() sql_lock.release()
@staticmethod @staticmethod
def execute(sqlStatement, *args): def execute(sql_statement, *args):
"""Used for statements that do not return results.""" """Used for statements that do not return results."""
sqlSubmitQueue.put(sqlStatement) assert sql_available
sqlSubmitQueue.put(sql_statement)
if args == (): if args == ():
sqlSubmitQueue.put('') sqlSubmitQueue.put('')

View File

@ -1,7 +1,6 @@
""" """
Track randomize ordered dict Track randomize ordered dict
""" """
import random
from threading import RLock from threading import RLock
from time import time from time import time
@ -128,41 +127,3 @@ class RandomTrackingDict(object):
self.pendingLen += 1 self.pendingLen += 1
self.lastPoll = time() self.lastPoll = time()
return retval return retval
if __name__ == '__main__':
# pylint: disable=redefined-outer-name
def randString():
"""helper function for tests, generates a random string"""
retval = b''
for _ in range(32):
retval += chr(random.randint(0, 255))
return retval
a = []
k = RandomTrackingDict()
d = {}
print "populating random tracking dict"
a.append(time())
for i in range(50000):
k[randString()] = True
a.append(time())
print "done"
while k:
retval = k.randomKeys(1000)
if not retval:
print "error getting random keys"
try:
k.randomKeys(100)
print "bad"
except KeyError:
pass
for i in retval:
del k[i]
a.append(time())
for x in range(len(a) - 1):
print "%i: %.3f" % (x, a[x + 1] - a[x])

View File

@ -34,9 +34,6 @@ enableSTDIO = False
"""enable STDIO threads""" """enable STDIO threads"""
curses = False curses = False
sqlReady = False
"""set to true by `.threads.sqlThread` when ready for processing"""
maximumNumberOfHalfOpenConnections = 0 maximumNumberOfHalfOpenConnections = 0
maximumLengthOfTimeToBotherResendingMessages = 0 maximumLengthOfTimeToBotherResendingMessages = 0

View File

@ -1,6 +1,7 @@
""" """
Module for using filesystem (directory with files) for inventory storage Module for using filesystem (directory with files) for inventory storage
""" """
import logging
import string import string
import time import time
from binascii import hexlify, unhexlify from binascii import hexlify, unhexlify
@ -10,6 +11,8 @@ from threading import RLock
from paths import lookupAppdataFolder from paths import lookupAppdataFolder
from storage import InventoryItem, InventoryStorage from storage import InventoryItem, InventoryStorage
logger = logging.getLogger('default')
class FilesystemInventory(InventoryStorage): class FilesystemInventory(InventoryStorage):
"""Filesystem for inventory storage""" """Filesystem for inventory storage"""
@ -162,7 +165,8 @@ class FilesystemInventory(InventoryStorage):
newInventory[streamNumber][hashId] = InventoryItem( newInventory[streamNumber][hashId] = InventoryItem(
objectType, streamNumber, None, expiresTime, tag) objectType, streamNumber, None, expiresTime, tag)
except KeyError: except KeyError:
print "error loading %s" % (hexlify(hashId)) logger.debug(
'error loading %s', hexlify(hashId), exc_info=True)
self._inventory = newInventory self._inventory = newInventory
# for i, v in self._inventory.items(): # for i, v in self._inventory.items():
# print "loaded stream: %s, %i items" % (i, len(v)) # print "loaded stream: %s, %i items" % (i, len(v))

View File

@ -0,0 +1,49 @@
"""
Tests for RandomTrackingDict class
"""
import random
import unittest
from time import time
class TestRandomTrackingDict(unittest.TestCase):
"""
Main protocol test case
"""
@staticmethod
def randString():
"""helper function for tests, generates a random string"""
retval = b''
for _ in range(32):
retval += chr(random.randint(0, 255))
return retval
def test_check_randomtrackingdict(self):
"""Check the logic of RandomTrackingDict class"""
from pybitmessage.randomtrackingdict import RandomTrackingDict
a = []
k = RandomTrackingDict()
a.append(time())
for i in range(50000):
k[self.randString()] = True
a.append(time())
while k:
retval = k.randomKeys(1000)
if not retval:
self.fail("error getting random keys")
try:
k.randomKeys(100)
self.fail("bad")
except KeyError:
pass
for i in retval:
del k[i]
a.append(time())
for x in range(len(a) - 1):
self.assertLess(a[x + 1] - a[x], 10)