have shared.py use helper_sql and move the sql queues and locks to helper_sql

This commit is contained in:
Grant T. Olson 2013-08-29 08:03:45 -04:00
parent 92c1368691
commit 7499de4e13
2 changed files with 40 additions and 67 deletions

View File

@ -1,33 +1,38 @@
import shared import threading
import Queue
sqlSubmitQueue = Queue.Queue() #SQLITE3 is so thread-unsafe that they won't even let you call it from different threads using your own locks. SQL objects can only be called from one thread.
sqlReturnQueue = Queue.Queue()
sqlLock = threading.Lock()
def sqlQuery(sqlStatement, *args): def sqlQuery(sqlStatement, *args):
shared.sqlLock.acquire() sqlLock.acquire()
shared.sqlSubmitQueue.put(sqlStatement) sqlSubmitQueue.put(sqlStatement)
if args == (): if args == ():
shared.sqlSubmitQueue.put('') sqlSubmitQueue.put('')
else: else:
shared.sqlSubmitQueue.put(args) sqlSubmitQueue.put(args)
queryreturn = shared.sqlReturnQueue.get() queryreturn = sqlReturnQueue.get()
shared.sqlLock.release() sqlLock.release()
return queryreturn return queryreturn
def sqlExecute(sqlStatement, *args): def sqlExecute(sqlStatement, *args):
shared.sqlLock.acquire() sqlLock.acquire()
shared.sqlSubmitQueue.put(sqlStatement) sqlSubmitQueue.put(sqlStatement)
if args == (): if args == ():
shared.sqlSubmitQueue.put('') sqlSubmitQueue.put('')
else: else:
shared.sqlSubmitQueue.put(args) sqlSubmitQueue.put(args)
shared.sqlReturnQueue.get() sqlReturnQueue.get()
shared.sqlSubmitQueue.put('commit') sqlSubmitQueue.put('commit')
shared.sqlLock.release() sqlLock.release()
def sqlStoredProcedure(procName): def sqlStoredProcedure(procName):
shared.sqlLock.acquire() sqlLock.acquire()
shared.sqlSubmitQueue.put(procName) sqlSubmitQueue.put(procName)
shared.sqlLock.release() sqlLock.release()

View File

@ -27,7 +27,7 @@ from addresses import *
import highlevelcrypto import highlevelcrypto
import shared import shared
import helper_startup import helper_startup
from helper_sql import *
config = ConfigParser.SafeConfigParser() config = ConfigParser.SafeConfigParser()
@ -36,9 +36,6 @@ MyECSubscriptionCryptorObjects = {}
myAddressesByHash = {} #The key in this dictionary is the RIPE hash which is encoded in an address and value is the address itself. myAddressesByHash = {} #The key in this dictionary is the RIPE hash which is encoded in an address and value is the address itself.
broadcastSendersForWhichImWatching = {} broadcastSendersForWhichImWatching = {}
workerQueue = Queue.Queue() workerQueue = Queue.Queue()
sqlSubmitQueue = Queue.Queue() #SQLITE3 is so thread-unsafe that they won't even let you call it from different threads using your own locks. SQL objects can only be called from one thread.
sqlReturnQueue = Queue.Queue()
sqlLock = threading.Lock()
UISignalQueue = Queue.Queue() UISignalQueue = Queue.Queue()
addressGeneratorQueue = Queue.Queue() addressGeneratorQueue = Queue.Queue()
knownNodesLock = threading.Lock() knownNodesLock = threading.Lock()
@ -80,12 +77,7 @@ networkDefaultPayloadLengthExtraBytes = 14000 #To make sending short messages a
namecoinDefaultRpcPort = "8336" namecoinDefaultRpcPort = "8336"
def isInSqlInventory(hash): def isInSqlInventory(hash):
t = (hash,) queryreturn = sqlQuery('''select hash from inventory where hash=?''', hash)
shared.sqlLock.acquire()
shared.sqlSubmitQueue.put('''select hash from inventory where hash=?''')
shared.sqlSubmitQueue.put(t)
queryreturn = shared.sqlReturnQueue.get()
shared.sqlLock.release()
if queryreturn == []: if queryreturn == []:
return False return False
else: else:
@ -161,41 +153,29 @@ def lookupAppdataFolder():
return dataFolder return dataFolder
def isAddressInMyAddressBook(address): def isAddressInMyAddressBook(address):
t = (address,) queryreturn = sqlQuery(
sqlLock.acquire() '''select address from addressbook where address=?''',
sqlSubmitQueue.put('''select address from addressbook where address=?''') address)
sqlSubmitQueue.put(t)
queryreturn = sqlReturnQueue.get()
sqlLock.release()
return queryreturn != [] return queryreturn != []
#At this point we should really just have a isAddressInMy(book, address)... #At this point we should really just have a isAddressInMy(book, address)...
def isAddressInMySubscriptionsList(address): def isAddressInMySubscriptionsList(address):
t = (str(address),) # As opposed to Qt str queryreturn = (
sqlLock.acquire() '''select * from subscriptions where address=?''',
sqlSubmitQueue.put('''select * from subscriptions where address=?''') str(address))
sqlSubmitQueue.put(t)
queryreturn = sqlReturnQueue.get()
sqlLock.release()
return queryreturn != [] return queryreturn != []
def isAddressInMyAddressBookSubscriptionsListOrWhitelist(address): def isAddressInMyAddressBookSubscriptionsListOrWhitelist(address):
if isAddressInMyAddressBook(address): if isAddressInMyAddressBook(address):
return True return True
sqlLock.acquire() queryreturn = sqlQuery('''SELECT address FROM whitelist where address=? and enabled = '1' ''', address)
sqlSubmitQueue.put('''SELECT address FROM whitelist where address=? and enabled = '1' ''')
sqlSubmitQueue.put((address,))
queryreturn = sqlReturnQueue.get()
sqlLock.release()
if queryreturn <> []: if queryreturn <> []:
return True return True
sqlLock.acquire() queryreturn = (
sqlSubmitQueue.put('''select address from subscriptions where address=? and enabled = '1' ''') '''select address from subscriptions where address=? and enabled = '1' ''',
sqlSubmitQueue.put((address,)) address)
queryreturn = sqlReturnQueue.get()
sqlLock.release()
if queryreturn <> []: if queryreturn <> []:
return True return True
return False return False
@ -259,11 +239,7 @@ def reloadBroadcastSendersForWhichImWatching():
logger.debug('reloading subscriptions...') logger.debug('reloading subscriptions...')
broadcastSendersForWhichImWatching.clear() broadcastSendersForWhichImWatching.clear()
MyECSubscriptionCryptorObjects.clear() MyECSubscriptionCryptorObjects.clear()
sqlLock.acquire() queryreturn = sqlQuery('SELECT address FROM subscriptions where enabled=1')
sqlSubmitQueue.put('SELECT address FROM subscriptions where enabled=1')
sqlSubmitQueue.put('')
queryreturn = sqlReturnQueue.get()
sqlLock.release()
for row in queryreturn: for row in queryreturn:
address, = row address, = row
status,addressVersionNumber,streamNumber,hash = decodeAddress(address) status,addressVersionNumber,streamNumber,hash = decodeAddress(address)
@ -297,12 +273,8 @@ def doCleanShutdown():
# This one last useless query will guarantee that the previous flush committed before we close # This one last useless query will guarantee that the previous flush committed before we close
# the program. # the program.
sqlLock.acquire() sqlQuery('SELECT address FROM subscriptions')
sqlSubmitQueue.put('SELECT address FROM subscriptions') sqlStoredProcedure('exit')
sqlSubmitQueue.put('')
sqlReturnQueue.get()
sqlSubmitQueue.put('exit')
sqlLock.release()
logger.info('Finished flushing inventory.') logger.info('Finished flushing inventory.')
# Wait long enough to guarantee that any running proof of work worker threads will check the # Wait long enough to guarantee that any running proof of work worker threads will check the
@ -323,16 +295,12 @@ def broadcastToSendDataQueues(data):
def flushInventory(): def flushInventory():
#Note that the singleCleanerThread clears out the inventory dictionary from time to time, although it only clears things that have been in the dictionary for a long time. This clears the inventory dictionary Now. #Note that the singleCleanerThread clears out the inventory dictionary from time to time, although it only clears things that have been in the dictionary for a long time. This clears the inventory dictionary Now.
sqlLock.acquire()
for hash, storedValue in inventory.items(): for hash, storedValue in inventory.items():
objectType, streamNumber, payload, receivedTime = storedValue objectType, streamNumber, payload, receivedTime = storedValue
t = (hash,objectType,streamNumber,payload,receivedTime,'') t = ()
sqlSubmitQueue.put('''INSERT INTO inventory VALUES (?,?,?,?,?,?)''') sqlExecute('''INSERT INTO inventory VALUES (?,?,?,?,?,?)''',
sqlSubmitQueue.put(t) hash,objectType,streamNumber,payload,receivedTime,'')
sqlReturnQueue.get()
del inventory[hash] del inventory[hash]
sqlSubmitQueue.put('commit')
sqlLock.release()
def fixPotentiallyInvalidUTF8Data(text): def fixPotentiallyInvalidUTF8Data(text):
try: try: