diff --git a/src/helper_sql.py b/src/helper_sql.py index f32a31d4..706fce0c 100644 --- a/src/helper_sql.py +++ b/src/helper_sql.py @@ -1,33 +1,38 @@ -import shared +import threading +import Queue + +sqlSubmitQueue = Queue.Queue() #SQLITE3 is so thread-unsafe that they won't even let you call it from different threads using your own locks. SQL objects can only be called from one thread. +sqlReturnQueue = Queue.Queue() +sqlLock = threading.Lock() def sqlQuery(sqlStatement, *args): - shared.sqlLock.acquire() - shared.sqlSubmitQueue.put(sqlStatement) + sqlLock.acquire() + sqlSubmitQueue.put(sqlStatement) if args == (): - shared.sqlSubmitQueue.put('') + sqlSubmitQueue.put('') else: - shared.sqlSubmitQueue.put(args) + sqlSubmitQueue.put(args) - queryreturn = shared.sqlReturnQueue.get() - shared.sqlLock.release() + queryreturn = sqlReturnQueue.get() + sqlLock.release() return queryreturn def sqlExecute(sqlStatement, *args): - shared.sqlLock.acquire() - shared.sqlSubmitQueue.put(sqlStatement) + sqlLock.acquire() + sqlSubmitQueue.put(sqlStatement) if args == (): - shared.sqlSubmitQueue.put('') + sqlSubmitQueue.put('') else: - shared.sqlSubmitQueue.put(args) + sqlSubmitQueue.put(args) - shared.sqlReturnQueue.get() - shared.sqlSubmitQueue.put('commit') - shared.sqlLock.release() + sqlReturnQueue.get() + sqlSubmitQueue.put('commit') + sqlLock.release() def sqlStoredProcedure(procName): - shared.sqlLock.acquire() - shared.sqlSubmitQueue.put(procName) - shared.sqlLock.release() + sqlLock.acquire() + sqlSubmitQueue.put(procName) + sqlLock.release() diff --git a/src/shared.py b/src/shared.py index f20c49e2..0aa69558 100644 --- a/src/shared.py +++ b/src/shared.py @@ -27,7 +27,7 @@ from addresses import * import highlevelcrypto import shared import helper_startup - +from helper_sql import * config = ConfigParser.SafeConfigParser() @@ -36,9 +36,6 @@ MyECSubscriptionCryptorObjects = {} myAddressesByHash = {} #The key in this dictionary is the RIPE hash which is encoded in an address and value is the address itself. broadcastSendersForWhichImWatching = {} workerQueue = Queue.Queue() -sqlSubmitQueue = Queue.Queue() #SQLITE3 is so thread-unsafe that they won't even let you call it from different threads using your own locks. SQL objects can only be called from one thread. -sqlReturnQueue = Queue.Queue() -sqlLock = threading.Lock() UISignalQueue = Queue.Queue() addressGeneratorQueue = Queue.Queue() knownNodesLock = threading.Lock() @@ -80,12 +77,7 @@ networkDefaultPayloadLengthExtraBytes = 14000 #To make sending short messages a namecoinDefaultRpcPort = "8336" def isInSqlInventory(hash): - t = (hash,) - shared.sqlLock.acquire() - shared.sqlSubmitQueue.put('''select hash from inventory where hash=?''') - shared.sqlSubmitQueue.put(t) - queryreturn = shared.sqlReturnQueue.get() - shared.sqlLock.release() + queryreturn = sqlQuery('''select hash from inventory where hash=?''', hash) if queryreturn == []: return False else: @@ -161,41 +153,29 @@ def lookupAppdataFolder(): return dataFolder def isAddressInMyAddressBook(address): - t = (address,) - sqlLock.acquire() - sqlSubmitQueue.put('''select address from addressbook where address=?''') - sqlSubmitQueue.put(t) - queryreturn = sqlReturnQueue.get() - sqlLock.release() + queryreturn = sqlQuery( + '''select address from addressbook where address=?''', + address) return queryreturn != [] #At this point we should really just have a isAddressInMy(book, address)... def isAddressInMySubscriptionsList(address): - t = (str(address),) # As opposed to Qt str - sqlLock.acquire() - sqlSubmitQueue.put('''select * from subscriptions where address=?''') - sqlSubmitQueue.put(t) - queryreturn = sqlReturnQueue.get() - sqlLock.release() + queryreturn = ( + '''select * from subscriptions where address=?''', + str(address)) return queryreturn != [] def isAddressInMyAddressBookSubscriptionsListOrWhitelist(address): if isAddressInMyAddressBook(address): return True - sqlLock.acquire() - sqlSubmitQueue.put('''SELECT address FROM whitelist where address=? and enabled = '1' ''') - sqlSubmitQueue.put((address,)) - queryreturn = sqlReturnQueue.get() - sqlLock.release() + queryreturn = sqlQuery('''SELECT address FROM whitelist where address=? and enabled = '1' ''', address) if queryreturn <> []: return True - sqlLock.acquire() - sqlSubmitQueue.put('''select address from subscriptions where address=? and enabled = '1' ''') - sqlSubmitQueue.put((address,)) - queryreturn = sqlReturnQueue.get() - sqlLock.release() + queryreturn = ( + '''select address from subscriptions where address=? and enabled = '1' ''', + address) if queryreturn <> []: return True return False @@ -259,11 +239,7 @@ def reloadBroadcastSendersForWhichImWatching(): logger.debug('reloading subscriptions...') broadcastSendersForWhichImWatching.clear() MyECSubscriptionCryptorObjects.clear() - sqlLock.acquire() - sqlSubmitQueue.put('SELECT address FROM subscriptions where enabled=1') - sqlSubmitQueue.put('') - queryreturn = sqlReturnQueue.get() - sqlLock.release() + queryreturn = sqlQuery('SELECT address FROM subscriptions where enabled=1') for row in queryreturn: address, = row status,addressVersionNumber,streamNumber,hash = decodeAddress(address) @@ -297,12 +273,8 @@ def doCleanShutdown(): # This one last useless query will guarantee that the previous flush committed before we close # the program. - sqlLock.acquire() - sqlSubmitQueue.put('SELECT address FROM subscriptions') - sqlSubmitQueue.put('') - sqlReturnQueue.get() - sqlSubmitQueue.put('exit') - sqlLock.release() + sqlQuery('SELECT address FROM subscriptions') + sqlStoredProcedure('exit') logger.info('Finished flushing inventory.') # Wait long enough to guarantee that any running proof of work worker threads will check the @@ -323,16 +295,12 @@ def broadcastToSendDataQueues(data): def flushInventory(): #Note that the singleCleanerThread clears out the inventory dictionary from time to time, although it only clears things that have been in the dictionary for a long time. This clears the inventory dictionary Now. - sqlLock.acquire() for hash, storedValue in inventory.items(): objectType, streamNumber, payload, receivedTime = storedValue - t = (hash,objectType,streamNumber,payload,receivedTime,'') - sqlSubmitQueue.put('''INSERT INTO inventory VALUES (?,?,?,?,?,?)''') - sqlSubmitQueue.put(t) - sqlReturnQueue.get() + t = () + sqlExecute('''INSERT INTO inventory VALUES (?,?,?,?,?,?)''', + hash,objectType,streamNumber,payload,receivedTime,'') del inventory[hash] - sqlSubmitQueue.put('commit') - sqlLock.release() def fixPotentiallyInvalidUTF8Data(text): try: