diff --git a/src/bitmessagemain.py b/src/bitmessagemain.py index e18a15d3..cb44b7ad 100755 --- a/src/bitmessagemain.py +++ b/src/bitmessagemain.py @@ -25,6 +25,7 @@ if sys.platform == 'darwin': sys.exit(0) # Classes +from helper_sql import * from class_sqlThread import * from class_singleCleaner import * from class_singleWorker import * @@ -315,12 +316,8 @@ class MySimpleXMLRPCRequestHandler(SimpleXMLRPCRequestHandler): streamNumber, 'unused API address', numberOfAddresses, passphrase, eighteenByteRipe)) return shared.apiAddressGeneratorReturnQueue.get() elif method == 'getAllInboxMessages': - shared.sqlLock.acquire() - shared.sqlSubmitQueue.put( + queryreturn = sqlQuery( '''SELECT msgid, toaddress, fromaddress, subject, received, message, encodingtype, read FROM inbox where folder='inbox' ORDER BY received''') - shared.sqlSubmitQueue.put('') - queryreturn = shared.sqlReturnQueue.get() - shared.sqlLock.release() data = '{"inboxMessages":[' for row in queryreturn: msgid, toAddress, fromAddress, subject, received, message, encodingtype, read = row @@ -333,12 +330,8 @@ class MySimpleXMLRPCRequestHandler(SimpleXMLRPCRequestHandler): data += ']}' return data elif method == 'getAllInboxMessageIds' or method == 'getAllInboxMessageIDs': - shared.sqlLock.acquire() - shared.sqlSubmitQueue.put( + queryreturn = sqlQuery( '''SELECT msgid FROM inbox where folder='inbox' ORDER BY received''') - shared.sqlSubmitQueue.put('') - queryreturn = shared.sqlReturnQueue.get() - shared.sqlLock.release() data = '{"inboxMessageIds":[' for row in queryreturn: msgid = row[0] @@ -351,12 +344,7 @@ class MySimpleXMLRPCRequestHandler(SimpleXMLRPCRequestHandler): if len(params) == 0: raise APIError(0, 'I need parameters!') msgid = self._decode(params[0], "hex") - v = (msgid,) - shared.sqlLock.acquire() - shared.sqlSubmitQueue.put('''SELECT msgid, toaddress, fromaddress, subject, received, message, encodingtype, read FROM inbox WHERE msgid=?''') - shared.sqlSubmitQueue.put(v) - queryreturn = shared.sqlReturnQueue.get() - shared.sqlLock.release() + queryreturn = sqlQuery('''SELECT msgid, toaddress, fromaddress, subject, received, message, encodingtype, read FROM inbox WHERE msgid=?''', msgid) data = '{"inboxMessage":[' for row in queryreturn: msgid, toAddress, fromAddress, subject, received, message, encodingtype, read = row @@ -366,11 +354,7 @@ class MySimpleXMLRPCRequestHandler(SimpleXMLRPCRequestHandler): data += ']}' return data elif method == 'getAllSentMessages': - shared.sqlLock.acquire() - shared.sqlSubmitQueue.put('''SELECT msgid, toaddress, fromaddress, subject, lastactiontime, message, encodingtype, status, ackdata FROM sent where folder='sent' ORDER BY lastactiontime''') - shared.sqlSubmitQueue.put('') - queryreturn = shared.sqlReturnQueue.get() - shared.sqlLock.release() + queryreturn = sqlQuery('''SELECT msgid, toaddress, fromaddress, subject, lastactiontime, message, encodingtype, status, ackdata FROM sent where folder='sent' ORDER BY lastactiontime''') data = '{"sentMessages":[' for row in queryreturn: msgid, toAddress, fromAddress, subject, lastactiontime, message, encodingtype, status, ackdata = row @@ -382,11 +366,7 @@ class MySimpleXMLRPCRequestHandler(SimpleXMLRPCRequestHandler): data += ']}' return data elif method == 'getAllSentMessageIds' or method == 'getAllSentMessageIDs': - shared.sqlLock.acquire() - shared.sqlSubmitQueue.put('''SELECT msgid FROM sent where folder='sent' ORDER BY lastactiontime''') - shared.sqlSubmitQueue.put('') - queryreturn = shared.sqlReturnQueue.get() - shared.sqlLock.release() + queryreturn = sqlQuery('''SELECT msgid FROM sent where folder='sent' ORDER BY lastactiontime''') data = '{"sentMessageIds":[' for row in queryreturn: msgid = row[0] @@ -399,12 +379,7 @@ class MySimpleXMLRPCRequestHandler(SimpleXMLRPCRequestHandler): if len(params) == 0: raise APIError(0, 'I need parameters!') toAddress = params[0] - v = (toAddress,) - shared.sqlLock.acquire() - shared.sqlSubmitQueue.put('''SELECT msgid, toaddress, fromaddress, subject, received, message, encodingtype FROM inbox WHERE folder='inbox' AND toAddress=?''') - shared.sqlSubmitQueue.put(v) - queryreturn = shared.sqlReturnQueue.get() - shared.sqlLock.release() + queryReturn = sqlQuery('''SELECT msgid, toaddress, fromaddress, subject, received, message, encodingtype FROM inbox WHERE folder='inbox' AND toAddress=?''', toAddress) data = '{"inboxMessages":[' for row in queryreturn: msgid, toAddress, fromAddress, subject, received, message, encodingtype = row @@ -419,12 +394,7 @@ class MySimpleXMLRPCRequestHandler(SimpleXMLRPCRequestHandler): if len(params) == 0: raise APIError(0, 'I need parameters!') msgid = self._decode(params[0], "hex") - v = (msgid,) - shared.sqlLock.acquire() - shared.sqlSubmitQueue.put('''SELECT msgid, toaddress, fromaddress, subject, lastactiontime, message, encodingtype, status, ackdata FROM sent WHERE msgid=?''') - shared.sqlSubmitQueue.put(v) - queryreturn = shared.sqlReturnQueue.get() - shared.sqlLock.release() + queryreturn = sqlQuery('''SELECT msgid, toaddress, fromaddress, subject, lastactiontime, message, encodingtype, status, ackdata FROM sent WHERE msgid=?''', msgid) data = '{"sentMessage":[' for row in queryreturn: msgid, toAddress, fromAddress, subject, lastactiontime, message, encodingtype, status, ackdata = row @@ -437,12 +407,8 @@ class MySimpleXMLRPCRequestHandler(SimpleXMLRPCRequestHandler): if len(params) == 0: raise APIError(0, 'I need parameters!') fromAddress = params[0] - v = (fromAddress,) - shared.sqlLock.acquire() - shared.sqlSubmitQueue.put('''SELECT msgid, toaddress, fromaddress, subject, lastactiontime, message, encodingtype, status, ackdata FROM sent WHERE folder='sent' AND fromAddress=? ORDER BY lastactiontime''') - shared.sqlSubmitQueue.put(v) - queryreturn = shared.sqlReturnQueue.get() - shared.sqlLock.release() + queryreturn = sqlQuery('''SELECT msgid, toaddress, fromaddress, subject, lastactiontime, message, encodingtype, status, ackdata FROM sent WHERE folder='sent' AND fromAddress=? ORDER BY lastactiontime''', + fromAddress) data = '{"sentMessages":[' for row in queryreturn: msgid, toAddress, fromAddress, subject, lastactiontime, message, encodingtype, status, ackdata = row @@ -457,12 +423,8 @@ class MySimpleXMLRPCRequestHandler(SimpleXMLRPCRequestHandler): if len(params) == 0: raise APIError(0, 'I need parameters!') ackData = self._decode(params[0], "hex") - v = (ackData,) - shared.sqlLock.acquire() - shared.sqlSubmitQueue.put('''SELECT msgid, toaddress, fromaddress, subject, lastactiontime, message, encodingtype, status, ackdata FROM sent WHERE ackdata=?''') - shared.sqlSubmitQueue.put(v) - queryreturn = shared.sqlReturnQueue.get() - shared.sqlLock.release() + queryreturn = sqlQuery('''SELECT msgid, toaddress, fromaddress, subject, lastactiontime, message, encodingtype, status, ackdata FROM sent WHERE ackdata=?''', + ackData) data = '{"sentMessage":[' for row in queryreturn: msgid, toAddress, fromAddress, subject, lastactiontime, message, encodingtype, status, ackdata = row @@ -479,14 +441,7 @@ class MySimpleXMLRPCRequestHandler(SimpleXMLRPCRequestHandler): # Trash if in inbox table helper_inbox.trash(msgid) # Trash if in sent table - t = (msgid,) - shared.sqlLock.acquire() - shared.sqlSubmitQueue.put('''UPDATE sent SET folder='trash' WHERE msgid=?''') - shared.sqlSubmitQueue.put(t) - shared.sqlReturnQueue.get() - shared.sqlSubmitQueue.put('commit') - shared.sqlLock.release() - # shared.UISignalQueue.put(('removeSentRowByMsgid',msgid)) This function doesn't exist yet. + sqlExecute('''UPDATE sent SET folder='trash' WHERE msgid=?''', msgid) return 'Trashed message (assuming message existed).' elif method == 'trashInboxMessage': if len(params) == 0: @@ -498,27 +453,15 @@ class MySimpleXMLRPCRequestHandler(SimpleXMLRPCRequestHandler): if len(params) == 0: raise APIError(0, 'I need parameters!') msgid = self._decode(params[0], "hex") - t = (msgid,) - shared.sqlLock.acquire() - shared.sqlSubmitQueue.put('''UPDATE sent SET folder='trash' WHERE msgid=?''') - shared.sqlSubmitQueue.put(t) - shared.sqlReturnQueue.get() - shared.sqlSubmitQueue.put('commit') - shared.sqlLock.release() - # shared.UISignalQueue.put(('removeSentRowByMsgid',msgid)) This function doesn't exist yet. + sqlExecute('''UPDATE sent SET folder='trash' WHERE msgid=?''', msgid) return 'Trashed sent message (assuming message existed).' elif method == 'trashSentMessageByAckData': # This API method should only be used when msgid is not available if len(params) == 0: raise APIError(0, 'I need parameters!') ackdata = self._decode(params[0], "hex") - t = (ackdata,) - shared.sqlLock.acquire() - shared.sqlSubmitQueue.put('''UPDATE sent SET folder='trash' WHERE ackdata=?''') - shared.sqlSubmitQueue.put(t) - shared.sqlReturnQueue.get() - shared.sqlSubmitQueue.put('commit') - shared.sqlLock.release() + sqlExecute('''UPDATE sent SET folder='trash' WHERE ackdata=?''', + ackdata) return 'Trashed sent message (assuming message existed).' elif method == 'sendMessage': if len(params) == 0: @@ -581,13 +524,7 @@ class MySimpleXMLRPCRequestHandler(SimpleXMLRPCRequestHandler): helper_sent.insert(t) toLabel = '' - t = (toAddress,) - shared.sqlLock.acquire() - shared.sqlSubmitQueue.put( - '''select label from addressbook where address=?''') - shared.sqlSubmitQueue.put(t) - queryreturn = shared.sqlReturnQueue.get() - shared.sqlLock.release() + queryreturn = sqlQuery('''select label from addressbook where address=?''', toAddress) if queryreturn != []: for row in queryreturn: toLabel, = row @@ -656,12 +593,9 @@ class MySimpleXMLRPCRequestHandler(SimpleXMLRPCRequestHandler): if len(ackdata) != 64: raise APIError(15, 'The length of ackData should be 32 bytes (encoded in hex thus 64 characters).') ackdata = self._decode(ackdata, "hex") - shared.sqlLock.acquire() - shared.sqlSubmitQueue.put( - '''SELECT status FROM sent where ackdata=?''') - shared.sqlSubmitQueue.put((ackdata,)) - queryreturn = shared.sqlReturnQueue.get() - shared.sqlLock.release() + queryreturn = sqlQuery( + '''SELECT status FROM sent where ackdata=?''', + ackdata) if queryreturn == []: return 'notfound' for row in queryreturn: @@ -701,23 +635,10 @@ class MySimpleXMLRPCRequestHandler(SimpleXMLRPCRequestHandler): raise APIError(12, 'The stream number must be 1. Others aren\'t supported.') # First we must check to see if the address is already in the # subscriptions list. - shared.sqlLock.acquire() - t = (address,) - shared.sqlSubmitQueue.put( - '''select * from subscriptions where address=?''') - shared.sqlSubmitQueue.put(t) - queryreturn = shared.sqlReturnQueue.get() - shared.sqlLock.release() + queryreturn = sqlQuery('''select * from subscriptions where address=?''', address) if queryreturn != []: raise APIError(16, 'You are already subscribed to that address.') - t = (label, address, True) - shared.sqlLock.acquire() - shared.sqlSubmitQueue.put( - '''INSERT INTO subscriptions VALUES (?,?,?)''') - shared.sqlSubmitQueue.put(t) - queryreturn = shared.sqlReturnQueue.get() - shared.sqlSubmitQueue.put('commit') - shared.sqlLock.release() + sqlExecute('''INSERT INTO subscriptions VALUES (?,?,?)''',label, address, True) shared.reloadBroadcastSendersForWhichImWatching() shared.UISignalQueue.put(('rerenderInboxFromLabels', '')) shared.UISignalQueue.put(('rerenderSubscriptions', '')) @@ -728,24 +649,13 @@ class MySimpleXMLRPCRequestHandler(SimpleXMLRPCRequestHandler): raise APIError(0, 'I need 1 parameter!') address, = params address = addBMIfNotPresent(address) - t = (address,) - shared.sqlLock.acquire() - shared.sqlSubmitQueue.put( - '''DELETE FROM subscriptions WHERE address=?''') - shared.sqlSubmitQueue.put(t) - shared.sqlReturnQueue.get() - shared.sqlSubmitQueue.put('commit') - shared.sqlLock.release() + sqlExecute('''DELETE FROM subscriptions WHERE address=?''', address) shared.reloadBroadcastSendersForWhichImWatching() shared.UISignalQueue.put(('rerenderInboxFromLabels', '')) shared.UISignalQueue.put(('rerenderSubscriptions', '')) return 'Deleted subscription if it existed.' elif method == 'listSubscriptions': - shared.sqlLock.acquire() - shared.sqlSubmitQueue.put('''SELECT label, address, enabled FROM subscriptions''') - shared.sqlSubmitQueue.put('') - queryreturn = shared.sqlReturnQueue.get() - shared.sqlLock.release() + queryreturn = sqlQuery('''SELECT label, address, enabled FROM subscriptions''') data = '{"subscriptions":[' for row in queryreturn: label, address, enabled = row @@ -815,27 +725,18 @@ class MySimpleXMLRPCRequestHandler(SimpleXMLRPCRequestHandler): # This is not a particularly commonly used API function. Before we # use it we'll need to fill out a field in our inventory database # which is blank by default (first20bytesofencryptedmessage). - parameters = '' - with shared.sqlLock: - shared.sqlSubmitQueue.put('''SELECT hash, payload FROM inventory WHERE first20bytesofencryptedmessage = '' and objecttype = 'msg' ; ''') - shared.sqlSubmitQueue.put(parameters) - queryreturn = shared.sqlReturnQueue.get() - + queryreturn = sqlQuery( + '''SELECT hash, payload FROM inventory WHERE first20bytesofencryptedmessage = '' and objecttype = 'msg' ; ''') + with SqlBulkExecute() as sql: for row in queryreturn: hash, payload = row readPosition = 16 # Nonce length + time length readPosition += decodeVarint(payload[readPosition:readPosition+10])[1] # Stream Number length t = (payload[readPosition:readPosition+20],hash) - shared.sqlSubmitQueue.put('''UPDATE inventory SET first20bytesofencryptedmessage=? WHERE hash=?; ''') - shared.sqlSubmitQueue.put(t) - shared.sqlReturnQueue.get() + sql.execute('''UPDATE inventory SET first20bytesofencryptedmessage=? WHERE hash=?; ''', *t) - parameters = (requestedHash,) - with shared.sqlLock: - shared.sqlSubmitQueue.put('commit') - shared.sqlSubmitQueue.put('''SELECT payload FROM inventory WHERE first20bytesofencryptedmessage = ?''') - shared.sqlSubmitQueue.put(parameters) - queryreturn = shared.sqlReturnQueue.get() + queryreturn = sqlQuery('''SELECT payload FROM inventory WHERE first20bytesofencryptedmessage = ?''', + requestedHash) data = '{"receivedMessageDatas":[' for row in queryreturn: payload, = row @@ -853,11 +754,7 @@ class MySimpleXMLRPCRequestHandler(SimpleXMLRPCRequestHandler): if len(requestedHash) != 40: raise APIError(19, 'The length of hash should be 20 bytes (encoded in hex thus 40 characters).') requestedHash = self._decode(requestedHash, "hex") - parameters = (requestedHash,) - with shared.sqlLock: - shared.sqlSubmitQueue.put('''SELECT transmitdata FROM pubkeys WHERE hash = ? ; ''') - shared.sqlSubmitQueue.put(parameters) - queryreturn = shared.sqlReturnQueue.get() + queryreturn = sqlQuery('''SELECT transmitdata FROM pubkeys WHERE hash = ? ; ''', requestedHash) data = '{"pubkey":[' for row in queryreturn: transmitdata, = row diff --git a/src/bitmessageqt/__init__.py b/src/bitmessageqt/__init__.py index 3ad41c5e..2cdb2c97 100644 --- a/src/bitmessageqt/__init__.py +++ b/src/bitmessageqt/__init__.py @@ -36,6 +36,7 @@ import debug from debug import logger import subprocess import datetime +from helper_sql import * try: from PyQt4 import QtCore, QtGui @@ -346,11 +347,7 @@ class MyForm(QtGui.QMainWindow): self.loadSent() # Initialize the address book - shared.sqlLock.acquire() - shared.sqlSubmitQueue.put('SELECT * FROM addressbook') - shared.sqlSubmitQueue.put('') - queryreturn = shared.sqlReturnQueue.get() - shared.sqlLock.release() + queryreturn = sqlQuery('SELECT * FROM addressbook') for row in queryreturn: label, address = row self.ui.tableWidgetAddressBook.insertRow(0) @@ -561,7 +558,7 @@ class MyForm(QtGui.QMainWindow): else: where = "toaddress || fromaddress || subject || message" - sqlQuery = ''' + sqlStatement = ''' SELECT toaddress, fromaddress, subject, message, status, ackdata, lastactiontime FROM sent WHERE folder="sent" AND %s LIKE ? ORDER BY lastactiontime @@ -570,12 +567,7 @@ class MyForm(QtGui.QMainWindow): while self.ui.tableWidgetSent.rowCount() > 0: self.ui.tableWidgetSent.removeRow(0) - t = (what,) - shared.sqlLock.acquire() - shared.sqlSubmitQueue.put(sqlQuery) - shared.sqlSubmitQueue.put(t) - queryreturn = shared.sqlReturnQueue.get() - shared.sqlLock.release() + queryreturn = sqlQuery(sqlStatement, what) for row in queryreturn: toAddress, fromAddress, subject, message, status, ackdata, lastactiontime = row subject = shared.fixPotentiallyInvalidUTF8Data(subject) @@ -588,13 +580,8 @@ class MyForm(QtGui.QMainWindow): fromLabel = fromAddress toLabel = '' - t = (toAddress,) - shared.sqlLock.acquire() - shared.sqlSubmitQueue.put( - '''select label from addressbook where address=?''') - shared.sqlSubmitQueue.put(t) - queryreturn = shared.sqlReturnQueue.get() - shared.sqlLock.release() + queryreturn = sqlQuery( + '''select label from addressbook where address=?''', toAddress) if queryreturn != []: for row in queryreturn: @@ -691,7 +678,7 @@ class MyForm(QtGui.QMainWindow): else: where = "toaddress || fromaddress || subject || message" - sqlQuery = ''' + sqlStatement = ''' SELECT msgid, toaddress, fromaddress, subject, received, message, read FROM inbox WHERE folder="inbox" AND %s LIKE ? ORDER BY received @@ -702,12 +689,7 @@ class MyForm(QtGui.QMainWindow): font = QFont() font.setBold(True) - t = (what,) - shared.sqlLock.acquire() - shared.sqlSubmitQueue.put(sqlQuery) - shared.sqlSubmitQueue.put(t) - queryreturn = shared.sqlReturnQueue.get() - shared.sqlLock.release() + queryreturn = sqlQuery(sqlStatement, what) for row in queryreturn: msgid, toAddress, fromAddress, subject, received, message, read = row subject = shared.fixPotentiallyInvalidUTF8Data(subject) @@ -723,26 +705,16 @@ class MyForm(QtGui.QMainWindow): toLabel = toAddress fromLabel = '' - t = (fromAddress,) - shared.sqlLock.acquire() - shared.sqlSubmitQueue.put( - '''select label from addressbook where address=?''') - shared.sqlSubmitQueue.put(t) - queryreturn = shared.sqlReturnQueue.get() - shared.sqlLock.release() + queryreturn = sqlQuery( + '''select label from addressbook where address=?''', fromAddress) if queryreturn != []: for row in queryreturn: fromLabel, = row if fromLabel == '': # If this address wasn't in our address book... - t = (fromAddress,) - shared.sqlLock.acquire() - shared.sqlSubmitQueue.put( - '''select label from subscriptions where address=?''') - shared.sqlSubmitQueue.put(t) - queryreturn = shared.sqlReturnQueue.get() - shared.sqlLock.release() + queryReturn = sqlQuery( + '''select label from subscriptions where address=?''', fromAddress) if queryreturn != []: for row in queryreturn: @@ -883,12 +855,8 @@ class MyForm(QtGui.QMainWindow): if not (self.mmapp.has_source("Subscriptions") or self.mmapp.has_source("Messages")): return - shared.sqlLock.acquire() - shared.sqlSubmitQueue.put( - '''SELECT toaddress, read FROM inbox WHERE msgid=?''') - shared.sqlSubmitQueue.put(inventoryHash) - queryreturn = shared.sqlReturnQueue.get() - shared.sqlLock.release() + queryreturn = sqlQuery( + '''SELECT toaddress, read FROM inbox WHERE msgid=?''', inventoryHash) for row in queryreturn: toAddress, read = row if not read: @@ -904,12 +872,8 @@ class MyForm(QtGui.QMainWindow): unreadMessages = 0 unreadSubscriptions = 0 - shared.sqlLock.acquire() - shared.sqlSubmitQueue.put( + queryreturn = sqlQuery( '''SELECT msgid, toaddress, read FROM inbox where folder='inbox' ''') - shared.sqlSubmitQueue.put('') - queryreturn = shared.sqlReturnQueue.get() - shared.sqlLock.release() for row in queryreturn: msgid, toAddress, read = row @@ -1156,9 +1120,7 @@ class MyForm(QtGui.QMainWindow): def click_actionDeleteAllTrashedMessages(self): if QtGui.QMessageBox.question(self, _translate("MainWindow", "Delete trash?"), _translate("MainWindow", "Are you sure you want to delete all trashed messages?"), QtGui.QMessageBox.Yes, QtGui.QMessageBox.No) == QtGui.QMessageBox.No: return - shared.sqlLock.acquire() - shared.sqlSubmitQueue.put('deleteandvacuume') - shared.sqlLock.release() + sqlStoredProcedure('deleteandvacuume') def click_actionRegenerateDeterministicAddresses(self): self.regenerateAddressesDialogInstance = regenerateAddressesDialog( @@ -1439,13 +1401,8 @@ class MyForm(QtGui.QMainWindow): addressToLookup = str(self.ui.tableWidgetInbox.item( i, 1).data(Qt.UserRole).toPyObject()) fromLabel = '' - t = (addressToLookup,) - shared.sqlLock.acquire() - shared.sqlSubmitQueue.put( - '''select label from addressbook where address=?''') - shared.sqlSubmitQueue.put(t) - queryreturn = shared.sqlReturnQueue.get() - shared.sqlLock.release() + queryreturn = sqlQuery( + '''select label from addressbook where address=?''', addressToLookup) if queryreturn != []: for row in queryreturn: @@ -1455,12 +1412,8 @@ class MyForm(QtGui.QMainWindow): else: # It might be a broadcast message. We should check for that # label. - shared.sqlLock.acquire() - shared.sqlSubmitQueue.put( - '''select label from subscriptions where address=?''') - shared.sqlSubmitQueue.put(t) - queryreturn = shared.sqlReturnQueue.get() - shared.sqlLock.release() + queryreturn = sqlQuery( + '''select label from subscriptions where address=?''', addressToLookup) if queryreturn != []: for row in queryreturn: @@ -1506,13 +1459,8 @@ class MyForm(QtGui.QMainWindow): addressToLookup = str(self.ui.tableWidgetSent.item( i, 0).data(Qt.UserRole).toPyObject()) toLabel = '' - t = (addressToLookup,) - shared.sqlLock.acquire() - shared.sqlSubmitQueue.put( - '''select label from addressbook where address=?''') - shared.sqlSubmitQueue.put(t) - queryreturn = shared.sqlReturnQueue.get() - shared.sqlLock.release() + queryreturn = sqlQuery( + '''select label from addressbook where address=?''', addressToLookup) if queryreturn != []: for row in queryreturn: @@ -1522,12 +1470,7 @@ class MyForm(QtGui.QMainWindow): def rerenderSubscriptions(self): self.ui.tableWidgetSubscriptions.setRowCount(0) - shared.sqlLock.acquire() - shared.sqlSubmitQueue.put( - 'SELECT label, address, enabled FROM subscriptions') - shared.sqlSubmitQueue.put('') - queryreturn = shared.sqlReturnQueue.get() - shared.sqlLock.release() + queryreturn = sqlQuery('SELECT label, address, enabled FROM subscriptions') for row in queryreturn: label, address, enabled = row self.ui.tableWidgetSubscriptions.insertRow(0) @@ -1610,24 +1553,26 @@ class MyForm(QtGui.QMainWindow): self.statusBar().showMessage(_translate( "MainWindow", "Warning: You are currently not connected. Bitmessage will do the work necessary to send the message but it won\'t send until you connect.")) ackdata = OpenSSL.rand(32) - shared.sqlLock.acquire() - t = ('', toAddress, ripe, fromAddress, subject, message, ackdata, int( - time.time()), 'msgqueued', 1, 1, 'sent', 2) - shared.sqlSubmitQueue.put( - '''INSERT INTO sent VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?)''') - shared.sqlSubmitQueue.put(t) - shared.sqlReturnQueue.get() - shared.sqlSubmitQueue.put('commit') - shared.sqlLock.release() + t = () + sqlExecute( + '''INSERT INTO sent VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?)''', + '', + toAddress, + ripe, + fromAddress, + subject, + message, + ackdata, + int(time.time()), + 'msgqueued', + 1, + 1, + 'sent', + 2) toLabel = '' - t = (toAddress,) - shared.sqlLock.acquire() - shared.sqlSubmitQueue.put( - '''select label from addressbook where address=?''') - shared.sqlSubmitQueue.put(t) - queryreturn = shared.sqlReturnQueue.get() - shared.sqlLock.release() + queryreturn = sqlQuery('''select label from addressbook where address=?''', + toAddress) if queryreturn != []: for row in queryreturn: toLabel, = row @@ -1658,15 +1603,10 @@ class MyForm(QtGui.QMainWindow): ackdata = OpenSSL.rand(32) toAddress = self.str_broadcast_subscribers ripe = '' - shared.sqlLock.acquire() t = ('', toAddress, ripe, fromAddress, subject, message, ackdata, int( time.time()), 'broadcastqueued', 1, 1, 'sent', 2) - shared.sqlSubmitQueue.put( - '''INSERT INTO sent VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?)''') - shared.sqlSubmitQueue.put(t) - shared.sqlReturnQueue.get() - shared.sqlSubmitQueue.put('commit') - shared.sqlLock.release() + sqlExecute( + '''INSERT INTO sent VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?)''', *t) shared.workerQueue.put(('sendbroadcast', '')) @@ -1822,25 +1762,15 @@ class MyForm(QtGui.QMainWindow): subject = shared.fixPotentiallyInvalidUTF8Data(subject) message = shared.fixPotentiallyInvalidUTF8Data(message) fromLabel = '' - shared.sqlLock.acquire() - t = (fromAddress,) - shared.sqlSubmitQueue.put( - '''select label from addressbook where address=?''') - shared.sqlSubmitQueue.put(t) - queryreturn = shared.sqlReturnQueue.get() - shared.sqlLock.release() + queryreturn = sqlQuery( + '''select label from addressbook where address=?''', fromAddress) if queryreturn != []: for row in queryreturn: fromLabel, = row else: # There might be a label in the subscriptions table - shared.sqlLock.acquire() - t = (fromAddress,) - shared.sqlSubmitQueue.put( - '''select label from subscriptions where address=?''') - shared.sqlSubmitQueue.put(t) - queryreturn = shared.sqlReturnQueue.get() - shared.sqlLock.release() + queryreturn = sqlQuery( + '''select label from subscriptions where address=?''', fromAddress) if queryreturn != []: for row in queryreturn: fromLabel, = row @@ -1914,13 +1844,7 @@ class MyForm(QtGui.QMainWindow): "MainWindow", "The address you entered was invalid. Ignoring it.")) def addEntryToAddressBook(self,address,label): - shared.sqlLock.acquire() - t = (address,) - shared.sqlSubmitQueue.put( - '''select * from addressbook where address=?''') - shared.sqlSubmitQueue.put(t) - queryreturn = shared.sqlReturnQueue.get() - shared.sqlLock.release() + queryreturn = sqlQuery('''select * from addressbook where address=?''', address) if queryreturn == []: self.ui.tableWidgetAddressBook.setSortingEnabled(False) self.ui.tableWidgetAddressBook.insertRow(0) @@ -1931,14 +1855,7 @@ class MyForm(QtGui.QMainWindow): QtCore.Qt.ItemIsSelectable | QtCore.Qt.ItemIsEnabled) self.ui.tableWidgetAddressBook.setItem(0, 1, newItem) self.ui.tableWidgetAddressBook.setSortingEnabled(True) - t = (str(label), address) - shared.sqlLock.acquire() - shared.sqlSubmitQueue.put( - '''INSERT INTO addressbook VALUES (?,?)''') - shared.sqlSubmitQueue.put(t) - queryreturn = shared.sqlReturnQueue.get() - shared.sqlSubmitQueue.put('commit') - shared.sqlLock.release() + sqlExecute('''INSERT INTO addressbook VALUES (?,?)''', str(label), address) self.rerenderInboxFromLabels() self.rerenderSentToLabels() else: @@ -1960,13 +1877,7 @@ class MyForm(QtGui.QMainWindow): self.ui.tableWidgetSubscriptions.setItem(0,1,newItem) self.ui.tableWidgetSubscriptions.setSortingEnabled(True) #Add to database (perhaps this should be separated from the MyForm class) - t = (str(label),address,True) - shared.sqlLock.acquire() - shared.sqlSubmitQueue.put('''INSERT INTO subscriptions VALUES (?,?,?)''') - shared.sqlSubmitQueue.put(t) - queryreturn = shared.sqlReturnQueue.get() - shared.sqlSubmitQueue.put('commit') - shared.sqlLock.release() + sqlExecute('''INSERT INTO subscriptions VALUES (?,?,?)''',str(label),address,True) self.rerenderInboxFromLabels() shared.reloadBroadcastSendersForWhichImWatching() @@ -1987,16 +1898,10 @@ class MyForm(QtGui.QMainWindow): def loadBlackWhiteList(self): # Initialize the Blacklist or Whitelist table listType = shared.config.get('bitmessagesettings', 'blackwhitelist') - shared.sqlLock.acquire() if listType == 'black': - shared.sqlSubmitQueue.put( - '''SELECT label, address, enabled FROM blacklist''') + queryreturn = sqlQuery('''SELECT label, address, enabled FROM blacklist''') else: - shared.sqlSubmitQueue.put( - '''SELECT label, address, enabled FROM whitelist''') - shared.sqlSubmitQueue.put('') - queryreturn = shared.sqlReturnQueue.get() - shared.sqlLock.release() + queryreturn = sqlQuery('''SELECT label, address, enabled FROM whitelist''') for row in queryreturn: label, address, enabled = row self.ui.tableWidgetBlacklist.insertRow(0) @@ -2125,9 +2030,7 @@ class MyForm(QtGui.QMainWindow): if shared.appdata != '' and self.settingsDialogInstance.ui.checkBoxPortableMode.isChecked(): # If we are NOT using portable mode now but the user selected that we should... # Write the keys.dat file to disk in the new location - shared.sqlLock.acquire() - shared.sqlSubmitQueue.put('movemessagstoprog') - shared.sqlLock.release() + sqlStoredProcedure('movemessagstoprog') with open('keys.dat', 'wb') as configfile: shared.config.write(configfile) # Write the knownnodes.dat file to disk in the new location @@ -2151,9 +2054,7 @@ class MyForm(QtGui.QMainWindow): shared.appdata = shared.lookupAppdataFolder() if not os.path.exists(shared.appdata): os.makedirs(shared.appdata) - shared.sqlLock.acquire() - shared.sqlSubmitQueue.put('movemessagstoappdata') - shared.sqlLock.release() + sqlStoredProcedure('movemessagstoappdata') # Write the keys.dat file to disk in the new location with open(shared.appdata + 'keys.dat', 'wb') as configfile: shared.config.write(configfile) @@ -2199,18 +2100,13 @@ class MyForm(QtGui.QMainWindow): # First we must check to see if the address is already in the # address book. The user cannot add it again or else it will # cause problems when updating and deleting the entry. - shared.sqlLock.acquire() t = (addBMIfNotPresent(str( self.NewBlacklistDialogInstance.ui.lineEditSubscriptionAddress.text())),) if shared.config.get('bitmessagesettings', 'blackwhitelist') == 'black': - shared.sqlSubmitQueue.put( - '''select * from blacklist where address=?''') + sql = '''select * from blacklist where address=?''' else: - shared.sqlSubmitQueue.put( - '''select * from whitelist where address=?''') - shared.sqlSubmitQueue.put(t) - queryreturn = shared.sqlReturnQueue.get() - shared.sqlLock.release() + sql = '''select * from whitelist where address=?''' + queryreturn = sqlQuery(sql,*t) if queryreturn == []: self.ui.tableWidgetBlacklist.setSortingEnabled(False) self.ui.tableWidgetBlacklist.insertRow(0) @@ -2225,17 +2121,11 @@ class MyForm(QtGui.QMainWindow): self.ui.tableWidgetBlacklist.setSortingEnabled(True) t = (str(self.NewBlacklistDialogInstance.ui.newsubscriptionlabel.text().toUtf8()), addBMIfNotPresent( str(self.NewBlacklistDialogInstance.ui.lineEditSubscriptionAddress.text())), True) - shared.sqlLock.acquire() if shared.config.get('bitmessagesettings', 'blackwhitelist') == 'black': - shared.sqlSubmitQueue.put( - '''INSERT INTO blacklist VALUES (?,?,?)''') + sql = '''INSERT INTO blacklist VALUES (?,?,?)''' else: - shared.sqlSubmitQueue.put( - '''INSERT INTO whitelist VALUES (?,?,?)''') - shared.sqlSubmitQueue.put(t) - queryreturn = shared.sqlReturnQueue.get() - shared.sqlSubmitQueue.put('commit') - shared.sqlLock.release() + sql = '''INSERT INTO whitelist VALUES (?,?,?)''' + sqlExecute(sql, *t) else: self.statusBar().showMessage(_translate( "MainWindow", "Error: You cannot add the same address to your list twice. Perhaps rename the existing one if you want.")) @@ -2362,20 +2252,11 @@ class MyForm(QtGui.QMainWindow): currentRow = row.row() inventoryHashToMarkUnread = str(self.ui.tableWidgetInbox.item( currentRow, 3).data(Qt.UserRole).toPyObject()) - t = (inventoryHashToMarkUnread,) - shared.sqlLock.acquire() - shared.sqlSubmitQueue.put( - '''UPDATE inbox SET read=0 WHERE msgid=?''') - shared.sqlSubmitQueue.put(t) - shared.sqlReturnQueue.get() - shared.sqlLock.release() + sqlExecute('''UPDATE inbox SET read=0 WHERE msgid=?''', inventoryHashToMarkUnread) self.ui.tableWidgetInbox.item(currentRow, 0).setFont(font) self.ui.tableWidgetInbox.item(currentRow, 1).setFont(font) self.ui.tableWidgetInbox.item(currentRow, 2).setFont(font) self.ui.tableWidgetInbox.item(currentRow, 3).setFont(font) - shared.sqlLock.acquire() - shared.sqlSubmitQueue.put('commit') - shared.sqlLock.release() # self.ui.tableWidgetInbox.selectRow(currentRow + 1) # This doesn't de-select the last message if you try to mark it unread, but that doesn't interfere. Might not be necessary. # We could also select upwards, but then our problem would be with the topmost message. @@ -2420,13 +2301,8 @@ class MyForm(QtGui.QMainWindow): addressAtCurrentInboxRow = str(self.ui.tableWidgetInbox.item( currentInboxRow, 1).data(Qt.UserRole).toPyObject()) # Let's make sure that it isn't already in the address book - shared.sqlLock.acquire() - t = (addressAtCurrentInboxRow,) - shared.sqlSubmitQueue.put( - '''select * from addressbook where address=?''') - shared.sqlSubmitQueue.put(t) - queryreturn = shared.sqlReturnQueue.get() - shared.sqlLock.release() + queryreturn = sqlQuery('''select * from addressbook where address=?''', + addressAtCurrentInboxRow) if queryreturn == []: self.ui.tableWidgetAddressBook.insertRow(0) newItem = QtGui.QTableWidgetItem( @@ -2436,15 +2312,9 @@ class MyForm(QtGui.QMainWindow): newItem.setFlags( QtCore.Qt.ItemIsSelectable | QtCore.Qt.ItemIsEnabled) self.ui.tableWidgetAddressBook.setItem(0, 1, newItem) - t = ('--New entry. Change label in Address Book.--', - addressAtCurrentInboxRow) - shared.sqlLock.acquire() - shared.sqlSubmitQueue.put( - '''INSERT INTO addressbook VALUES (?,?)''') - shared.sqlSubmitQueue.put(t) - queryreturn = shared.sqlReturnQueue.get() - shared.sqlSubmitQueue.put('commit') - shared.sqlLock.release() + sqlExecute('''INSERT INTO addressbook VALUES (?,?)''', + '--New entry. Change label in Address Book.--', + addressAtCurrentInboxRow) self.ui.tabWidget.setCurrentIndex(5) self.ui.tableWidgetAddressBook.setCurrentCell(0, 0) self.statusBar().showMessage(_translate( @@ -2459,20 +2329,11 @@ class MyForm(QtGui.QMainWindow): currentRow = self.ui.tableWidgetInbox.selectedIndexes()[0].row() inventoryHashToTrash = str(self.ui.tableWidgetInbox.item( currentRow, 3).data(Qt.UserRole).toPyObject()) - t = (inventoryHashToTrash,) - shared.sqlLock.acquire() - shared.sqlSubmitQueue.put( - '''UPDATE inbox SET folder='trash' WHERE msgid=?''') - shared.sqlSubmitQueue.put(t) - shared.sqlReturnQueue.get() - shared.sqlLock.release() + sqlExecute('''UPDATE inbox SET folder='trash' WHERE msgid=?''', inventoryHashToTrash) self.ui.textEditInboxMessage.setText("") self.ui.tableWidgetInbox.removeRow(currentRow) self.statusBar().showMessage(_translate( "MainWindow", "Moved items to trash. There is no user interface to view your trash, but it is still on disk if you are desperate to get it back.")) - shared.sqlLock.acquire() - shared.sqlSubmitQueue.put('commit') - shared.sqlLock.release() if currentRow == 0: self.ui.tableWidgetInbox.selectRow(currentRow) else: @@ -2503,20 +2364,11 @@ class MyForm(QtGui.QMainWindow): currentRow = self.ui.tableWidgetSent.selectedIndexes()[0].row() ackdataToTrash = str(self.ui.tableWidgetSent.item( currentRow, 3).data(Qt.UserRole).toPyObject()) - t = (ackdataToTrash,) - shared.sqlLock.acquire() - shared.sqlSubmitQueue.put( - '''UPDATE sent SET folder='trash' WHERE ackdata=?''') - shared.sqlSubmitQueue.put(t) - shared.sqlReturnQueue.get() - shared.sqlLock.release() + sqlExecute('''UPDATE sent SET folder='trash' WHERE ackdata=?''', ackdataToTrash) self.ui.textEditSentMessage.setPlainText("") self.ui.tableWidgetSent.removeRow(currentRow) self.statusBar().showMessage(_translate( "MainWindow", "Moved items to trash. There is no user interface to view your trash, but it is still on disk if you are desperate to get it back.")) - shared.sqlLock.acquire() - shared.sqlSubmitQueue.put('commit') - shared.sqlLock.release() if currentRow == 0: self.ui.tableWidgetSent.selectRow(currentRow) else: @@ -2527,18 +2379,10 @@ class MyForm(QtGui.QMainWindow): addressAtCurrentRow = str(self.ui.tableWidgetSent.item( currentRow, 0).data(Qt.UserRole).toPyObject()) toRipe = decodeAddress(addressAtCurrentRow)[3] - t = (toRipe,) - shared.sqlLock.acquire() - shared.sqlSubmitQueue.put( - '''UPDATE sent SET status='forcepow' WHERE toripe=? AND status='toodifficult' and folder='sent' ''') - shared.sqlSubmitQueue.put(t) - shared.sqlReturnQueue.get() - shared.sqlSubmitQueue.put('commit') - shared.sqlSubmitQueue.put( - '''select ackdata FROM sent WHERE status='forcepow' ''') - shared.sqlSubmitQueue.put('') - queryreturn = shared.sqlReturnQueue.get() - shared.sqlLock.release() + sqlExecute( + '''UPDATE sent SET status='forcepow' WHERE toripe=? AND status='toodifficult' and folder='sent' ''', + toRipe) + queryreturn = sqlQuery('''select ackdata FROM sent WHERE status='forcepow' ''') for row in queryreturn: ackdata, = row shared.UISignalQueue.put(('updateSentItemStatusByAckdata', ( @@ -2564,14 +2408,8 @@ class MyForm(QtGui.QMainWindow): currentRow, 0).text().toUtf8() addressAtCurrentRow = self.ui.tableWidgetAddressBook.item( currentRow, 1).text() - t = (str(labelAtCurrentRow), str(addressAtCurrentRow)) - shared.sqlLock.acquire() - shared.sqlSubmitQueue.put( - '''DELETE FROM addressbook WHERE label=? AND address=?''') - shared.sqlSubmitQueue.put(t) - queryreturn = shared.sqlReturnQueue.get() - shared.sqlSubmitQueue.put('commit') - shared.sqlLock.release() + sqlExecute('''DELETE FROM addressbook WHERE label=? AND address=?''', + str(labelAtCurrentRow), str(addressAtCurrentRow)) self.ui.tableWidgetAddressBook.removeRow(currentRow) self.rerenderInboxFromLabels() self.rerenderSentToLabels() @@ -2641,14 +2479,8 @@ class MyForm(QtGui.QMainWindow): currentRow, 0).text().toUtf8() addressAtCurrentRow = self.ui.tableWidgetSubscriptions.item( currentRow, 1).text() - t = (str(labelAtCurrentRow), str(addressAtCurrentRow)) - shared.sqlLock.acquire() - shared.sqlSubmitQueue.put( - '''DELETE FROM subscriptions WHERE label=? AND address=?''') - shared.sqlSubmitQueue.put(t) - shared.sqlReturnQueue.get() - shared.sqlSubmitQueue.put('commit') - shared.sqlLock.release() + sqlExecute('''DELETE FROM subscriptions WHERE label=? AND address=?''', + str(labelAtCurrentRow), str(addressAtCurrentRow)) self.ui.tableWidgetSubscriptions.removeRow(currentRow) self.rerenderInboxFromLabels() shared.reloadBroadcastSendersForWhichImWatching() @@ -2666,14 +2498,9 @@ class MyForm(QtGui.QMainWindow): currentRow, 0).text().toUtf8() addressAtCurrentRow = self.ui.tableWidgetSubscriptions.item( currentRow, 1).text() - t = (str(labelAtCurrentRow), str(addressAtCurrentRow)) - shared.sqlLock.acquire() - shared.sqlSubmitQueue.put( - '''update subscriptions set enabled=1 WHERE label=? AND address=?''') - shared.sqlSubmitQueue.put(t) - shared.sqlReturnQueue.get() - shared.sqlSubmitQueue.put('commit') - shared.sqlLock.release() + sqlExecute( + '''update subscriptions set enabled=1 WHERE label=? AND address=?''', + str(labelAtCurrentRow), str(addressAtCurrentRow)) self.ui.tableWidgetSubscriptions.item( currentRow, 0).setTextColor(QApplication.palette().text().color()) self.ui.tableWidgetSubscriptions.item( @@ -2686,14 +2513,9 @@ class MyForm(QtGui.QMainWindow): currentRow, 0).text().toUtf8() addressAtCurrentRow = self.ui.tableWidgetSubscriptions.item( currentRow, 1).text() - t = (str(labelAtCurrentRow), str(addressAtCurrentRow)) - shared.sqlLock.acquire() - shared.sqlSubmitQueue.put( - '''update subscriptions set enabled=0 WHERE label=? AND address=?''') - shared.sqlSubmitQueue.put(t) - shared.sqlReturnQueue.get() - shared.sqlSubmitQueue.put('commit') - shared.sqlLock.release() + sqlExecute( + '''update subscriptions set enabled=0 WHERE label=? AND address=?''', + str(labelAtCurrentRow), str(addressAtCurrentRow)) self.ui.tableWidgetSubscriptions.item( currentRow, 0).setTextColor(QtGui.QColor(128, 128, 128)) self.ui.tableWidgetSubscriptions.item( @@ -2714,20 +2536,14 @@ class MyForm(QtGui.QMainWindow): currentRow, 0).text().toUtf8() addressAtCurrentRow = self.ui.tableWidgetBlacklist.item( currentRow, 1).text() - t = (str(labelAtCurrentRow), str(addressAtCurrentRow)) - shared.sqlLock.acquire() if shared.config.get('bitmessagesettings', 'blackwhitelist') == 'black': - shared.sqlSubmitQueue.put( - '''DELETE FROM blacklist WHERE label=? AND address=?''') - shared.sqlSubmitQueue.put(t) - shared.sqlReturnQueue.get() + sqlExecute( + '''DELETE FROM blacklist WHERE label=? AND address=?''', + str(labelAtCurrentRow), str(addressAtCurrentRow)) else: - shared.sqlSubmitQueue.put( - '''DELETE FROM whitelist WHERE label=? AND address=?''') - shared.sqlSubmitQueue.put(t) - shared.sqlReturnQueue.get() - shared.sqlSubmitQueue.put('commit') - shared.sqlLock.release() + sqlExecute( + '''DELETE FROM whitelist WHERE label=? AND address=?''', + str(labelAtCurrentRow), str(addressAtCurrentRow)) self.ui.tableWidgetBlacklist.removeRow(currentRow) def on_action_BlacklistClipboard(self): @@ -2749,20 +2565,14 @@ class MyForm(QtGui.QMainWindow): currentRow, 0).setTextColor(QApplication.palette().text().color()) self.ui.tableWidgetBlacklist.item( currentRow, 1).setTextColor(QApplication.palette().text().color()) - t = (str(addressAtCurrentRow),) - shared.sqlLock.acquire() if shared.config.get('bitmessagesettings', 'blackwhitelist') == 'black': - shared.sqlSubmitQueue.put( - '''UPDATE blacklist SET enabled=1 WHERE address=?''') - shared.sqlSubmitQueue.put(t) - shared.sqlReturnQueue.get() + sqlExecute( + '''UPDATE blacklist SET enabled=1 WHERE address=?''', + str(addressAtCurrentRow)) else: - shared.sqlSubmitQueue.put( - '''UPDATE whitelist SET enabled=1 WHERE address=?''') - shared.sqlSubmitQueue.put(t) - shared.sqlReturnQueue.get() - shared.sqlSubmitQueue.put('commit') - shared.sqlLock.release() + sqlExecute( + '''UPDATE whitelist SET enabled=1 WHERE address=?''', + str(addressAtCurrentRow)) def on_action_BlacklistDisable(self): currentRow = self.ui.tableWidgetBlacklist.currentRow() @@ -2772,20 +2582,12 @@ class MyForm(QtGui.QMainWindow): currentRow, 0).setTextColor(QtGui.QColor(128, 128, 128)) self.ui.tableWidgetBlacklist.item( currentRow, 1).setTextColor(QtGui.QColor(128, 128, 128)) - t = (str(addressAtCurrentRow),) - shared.sqlLock.acquire() if shared.config.get('bitmessagesettings', 'blackwhitelist') == 'black': - shared.sqlSubmitQueue.put( - '''UPDATE blacklist SET enabled=0 WHERE address=?''') - shared.sqlSubmitQueue.put(t) - shared.sqlReturnQueue.get() + sqlExecute( + '''UPDATE blacklist SET enabled=0 WHERE address=?''', str(addressAtCurrentRow)) else: - shared.sqlSubmitQueue.put( - '''UPDATE whitelist SET enabled=0 WHERE address=?''') - shared.sqlSubmitQueue.put(t) - shared.sqlReturnQueue.get() - shared.sqlSubmitQueue.put('commit') - shared.sqlLock.release() + sqlExecute( + '''UPDATE whitelist SET enabled=0 WHERE address=?''', str(addressAtCurrentRow)) # Group of functions for the Your Identities dialog box def on_action_YourIdentitiesNew(self): @@ -2851,12 +2653,7 @@ class MyForm(QtGui.QMainWindow): currentRow = self.ui.tableWidgetSent.currentRow() ackData = str(self.ui.tableWidgetSent.item( currentRow, 3).data(Qt.UserRole).toPyObject()) - shared.sqlLock.acquire() - shared.sqlSubmitQueue.put( - '''SELECT status FROM sent where ackdata=?''') - shared.sqlSubmitQueue.put((ackData,)) - queryreturn = shared.sqlReturnQueue.get() - shared.sqlLock.release() + queryreturn = sqlQuery('''SELECT status FROM sent where ackdata=?''', ackData) for row in queryreturn: status, = row if status == 'toodifficult': @@ -2913,13 +2710,7 @@ class MyForm(QtGui.QMainWindow): currentRow, 3).data(Qt.UserRole).toPyObject()) t = (inventoryHash,) self.ubuntuMessagingMenuClear(t) - shared.sqlLock.acquire() - shared.sqlSubmitQueue.put( - '''update inbox set read=1 WHERE msgid=?''') - shared.sqlSubmitQueue.put(t) - shared.sqlReturnQueue.get() - shared.sqlSubmitQueue.put('commit') - shared.sqlLock.release() + sqlExecute('''update inbox set read=1 WHERE msgid=?''', *t) def tableWidgetSentItemClicked(self): currentRow = self.ui.tableWidgetSent.currentRow() @@ -2944,35 +2735,23 @@ class MyForm(QtGui.QMainWindow): def tableWidgetAddressBookItemChanged(self): currentRow = self.ui.tableWidgetAddressBook.currentRow() - shared.sqlLock.acquire() if currentRow >= 0: addressAtCurrentRow = self.ui.tableWidgetAddressBook.item( currentRow, 1).text() - t = (str(self.ui.tableWidgetAddressBook.item( - currentRow, 0).text().toUtf8()), str(addressAtCurrentRow)) - shared.sqlSubmitQueue.put( - '''UPDATE addressbook set label=? WHERE address=?''') - shared.sqlSubmitQueue.put(t) - shared.sqlReturnQueue.get() - shared.sqlSubmitQueue.put('commit') - shared.sqlLock.release() + sqlExecute('''UPDATE addressbook set label=? WHERE address=?''', + str(self.ui.tableWidgetAddressBook.item(currentRow, 0).text().toUtf8()), + str(addressAtCurrentRow)) self.rerenderInboxFromLabels() self.rerenderSentToLabels() def tableWidgetSubscriptionsItemChanged(self): currentRow = self.ui.tableWidgetSubscriptions.currentRow() - shared.sqlLock.acquire() if currentRow >= 0: addressAtCurrentRow = self.ui.tableWidgetSubscriptions.item( currentRow, 1).text() - t = (str(self.ui.tableWidgetSubscriptions.item( - currentRow, 0).text().toUtf8()), str(addressAtCurrentRow)) - shared.sqlSubmitQueue.put( - '''UPDATE subscriptions set label=? WHERE address=?''') - shared.sqlSubmitQueue.put(t) - shared.sqlReturnQueue.get() - shared.sqlSubmitQueue.put('commit') - shared.sqlLock.release() + sqlExecute('''UPDATE subscriptions set label=? WHERE address=?''', + str(self.ui.tableWidgetSubscriptions.item(currentRow, 0).text().toUtf8()), + str(addressAtCurrentRow)) self.rerenderInboxFromLabels() self.rerenderSentToLabels() diff --git a/src/class_receiveDataThread.py b/src/class_receiveDataThread.py index b5aa5893..d04da7a6 100644 --- a/src/class_receiveDataThread.py +++ b/src/class_receiveDataThread.py @@ -19,6 +19,7 @@ import helper_generic import helper_bitcoin import helper_inbox import helper_sent +from helper_sql import * import tr #from bitmessagemain import shared.lengthOfTimeToLeaveObjectsInInventory, shared.lengthOfTimeToHoldOnToAllPubkeys, shared.maximumAgeOfAnObjectThatIAmWillingToAccept, shared.maximumAgeOfObjectsThatIAdvertiseToOthers, shared.maximumAgeOfNodesThatIAdvertiseToOthers, shared.numberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHavePerPeer, shared.neededPubkeys @@ -281,16 +282,13 @@ class receiveDataThread(threading.Thread): self.sendBigInv() def sendBigInv(self): - shared.sqlLock.acquire() # Select all hashes which are younger than two days old and in this # stream. - t = (int(time.time()) - shared.maximumAgeOfObjectsThatIAdvertiseToOthers, int( - time.time()) - shared.lengthOfTimeToHoldOnToAllPubkeys, self.streamNumber) - shared.sqlSubmitQueue.put( - '''SELECT hash FROM inventory WHERE ((receivedtime>? and objecttype<>'pubkey') or (receivedtime>? and objecttype='pubkey')) and streamnumber=?''') - shared.sqlSubmitQueue.put(t) - queryreturn = shared.sqlReturnQueue.get() - shared.sqlLock.release() + queryreturn = sqlQuery( + '''SELECT hash FROM inventory WHERE ((receivedtime>? and objecttype<>'pubkey') or (receivedtime>? and objecttype='pubkey')) and streamnumber=?''', + int(time.time()) - shared.maximumAgeOfObjectsThatIAdvertiseToOthers, + int(time.time()) - shared.lengthOfTimeToHoldOnToAllPubkeys, + self.streamNumber) bigInvList = {} for row in queryreturn: hash, = row @@ -507,15 +505,12 @@ class receiveDataThread(threading.Thread): # won't be able to send this pubkey to others (without doing # the proof of work ourselves, which this program is programmed # to not do.) - t = (ripe.digest(), '\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF' + '\xFF\xFF\xFF\xFF' + data[ - beginningOfPubkeyPosition:endOfPubkeyPosition], int(time.time()), 'yes') - shared.sqlLock.acquire() - shared.sqlSubmitQueue.put( - '''INSERT INTO pubkeys VALUES (?,?,?,?)''') - shared.sqlSubmitQueue.put(t) - shared.sqlReturnQueue.get() - shared.sqlSubmitQueue.put('commit') - shared.sqlLock.release() + sqlExecute( + '''INSERT INTO pubkeys VALUES (?,?,?,?)''', + ripe.digest(), + '\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF' + '\xFF\xFF\xFF\xFF' + data[beginningOfPubkeyPosition:endOfPubkeyPosition], + int(time.time()), + 'yes') # shared.workerQueue.put(('newpubkey',(sendersAddressVersion,sendersStream,ripe.digest()))) # This will check to see whether we happen to be awaiting this # pubkey in order to send a message. If we are, it will do the @@ -657,15 +652,11 @@ class receiveDataThread(threading.Thread): # Let's store the public key in case we want to reply to this # person. - t = (ripe.digest(), '\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF' + '\xFF\xFF\xFF\xFF' + decryptedData[ - beginningOfPubkeyPosition:endOfPubkeyPosition], int(time.time()), 'yes') - shared.sqlLock.acquire() - shared.sqlSubmitQueue.put( - '''INSERT INTO pubkeys VALUES (?,?,?,?)''') - shared.sqlSubmitQueue.put(t) - shared.sqlReturnQueue.get() - shared.sqlSubmitQueue.put('commit') - shared.sqlLock.release() + sqlExecute('''INSERT INTO pubkeys VALUES (?,?,?,?)''', + ripe.digest(), + '\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF' + '\xFF\xFF\xFF\xFF' + decryptedData[beginningOfPubkeyPosition:endOfPubkeyPosition], + int(time.time()), + 'yes') # shared.workerQueue.put(('newpubkey',(sendersAddressVersion,sendersStream,ripe.digest()))) # This will check to see whether we happen to be awaiting this # pubkey in order to send a message. If we are, it will do the POW @@ -802,14 +793,8 @@ class receiveDataThread(threading.Thread): print 'This msg IS an acknowledgement bound for me.' del shared.ackdataForWhichImWatching[encryptedData[readPosition:]] - t = ('ackreceived', encryptedData[readPosition:]) - shared.sqlLock.acquire() - shared.sqlSubmitQueue.put( - 'UPDATE sent SET status=? WHERE ackdata=?') - shared.sqlSubmitQueue.put(t) - shared.sqlReturnQueue.get() - shared.sqlSubmitQueue.put('commit') - shared.sqlLock.release() + sqlExecute('UPDATE sent SET status=? WHERE ackdata=?', + 'ackreceived', encryptedData[readPosition:]) shared.UISignalQueue.put(('updateSentItemStatusByAckdata', (encryptedData[readPosition:], tr.translateText("MainWindow",'Acknowledgement of the message received. %1').arg(unicode( time.strftime(shared.config.get('bitmessagesettings', 'timeformat'), time.localtime(int(time.time()))), 'utf-8'))))) return @@ -932,15 +917,12 @@ class receiveDataThread(threading.Thread): ripe.update(sha.digest()) # Let's store the public key in case we want to reply to this # person. - t = (ripe.digest(), '\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF' + '\xFF\xFF\xFF\xFF' + decryptedData[ - messageVersionLength:endOfThePublicKeyPosition], int(time.time()), 'yes') - shared.sqlLock.acquire() - shared.sqlSubmitQueue.put( - '''INSERT INTO pubkeys VALUES (?,?,?,?)''') - shared.sqlSubmitQueue.put(t) - shared.sqlReturnQueue.get() - shared.sqlSubmitQueue.put('commit') - shared.sqlLock.release() + sqlExecute( + '''INSERT INTO pubkeys VALUES (?,?,?,?)''', + ripe.digest(), + '\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF' + '\xFF\xFF\xFF\xFF' + decryptedData[messageVersionLength:endOfThePublicKeyPosition], + int(time.time()), + 'yes') # shared.workerQueue.put(('newpubkey',(sendersAddressVersionNumber,sendersStreamNumber,ripe.digest()))) # This will check to see whether we happen to be awaiting this # pubkey in order to send a message. If we are, it will do the POW @@ -962,26 +944,18 @@ class receiveDataThread(threading.Thread): return blockMessage = False # Gets set to True if the user shouldn't see the message according to black or white lists. if shared.config.get('bitmessagesettings', 'blackwhitelist') == 'black': # If we are using a blacklist - t = (fromAddress,) - shared.sqlLock.acquire() - shared.sqlSubmitQueue.put( - '''SELECT label FROM blacklist where address=? and enabled='1' ''') - shared.sqlSubmitQueue.put(t) - queryreturn = shared.sqlReturnQueue.get() - shared.sqlLock.release() + queryreturn = sqlQuery( + '''SELECT label FROM blacklist where address=? and enabled='1' ''', + fromAddress) if queryreturn != []: with shared.printLock: print 'Message ignored because address is in blacklist.' blockMessage = True else: # We're using a whitelist - t = (fromAddress,) - shared.sqlLock.acquire() - shared.sqlSubmitQueue.put( - '''SELECT label FROM whitelist where address=? and enabled='1' ''') - shared.sqlSubmitQueue.put(t) - queryreturn = shared.sqlReturnQueue.get() - shared.sqlLock.release() + queryreturn = sqlQuery( + '''SELECT label FROM whitelist where address=? and enabled='1' ''', + toAddress) if queryreturn == []: print 'Message ignored because address not in whitelist.' blockMessage = True @@ -1111,14 +1085,9 @@ class receiveDataThread(threading.Thread): if toRipe in shared.neededPubkeys: print 'We have been awaiting the arrival of this pubkey.' del shared.neededPubkeys[toRipe] - t = (toRipe,) - shared.sqlLock.acquire() - shared.sqlSubmitQueue.put( - '''UPDATE sent SET status='doingmsgpow' WHERE toripe=? AND (status='awaitingpubkey' or status='doingpubkeypow') and folder='sent' ''') - shared.sqlSubmitQueue.put(t) - shared.sqlReturnQueue.get() - shared.sqlSubmitQueue.put('commit') - shared.sqlLock.release() + sqlExecute( + '''UPDATE sent SET status='doingmsgpow' WHERE toripe=? AND (status='awaitingpubkey' or status='doingpubkeypow') and folder='sent' ''', + toRipe) shared.workerQueue.put(('sendmessage', '')) else: with shared.printLock: @@ -1254,13 +1223,8 @@ class receiveDataThread(threading.Thread): print 'publicEncryptionKey in hex:', publicEncryptionKey.encode('hex') - t = (ripe,) - shared.sqlLock.acquire() - shared.sqlSubmitQueue.put( - '''SELECT usedpersonally FROM pubkeys WHERE hash=? AND usedpersonally='yes' ''') - shared.sqlSubmitQueue.put(t) - queryreturn = shared.sqlReturnQueue.get() - shared.sqlLock.release() + queryreturn = sqlQuery( + '''SELECT usedpersonally FROM pubkeys WHERE hash=? AND usedpersonally='yes' ''', ripe) if queryreturn != []: # if this pubkey is already in our database and if we have used it personally: print 'We HAVE used this pubkey personally. Updating time.' t = (ripe, data, embeddedTime, 'yes') @@ -1268,13 +1232,7 @@ class receiveDataThread(threading.Thread): print 'We have NOT used this pubkey personally. Inserting in database.' t = (ripe, data, embeddedTime, 'no') # This will also update the embeddedTime. - shared.sqlLock.acquire() - shared.sqlSubmitQueue.put( - '''INSERT INTO pubkeys VALUES (?,?,?,?)''') - shared.sqlSubmitQueue.put(t) - shared.sqlReturnQueue.get() - shared.sqlSubmitQueue.put('commit') - shared.sqlLock.release() + sqlExecute('''INSERT INTO pubkeys VALUES (?,?,?,?)''', *t) # shared.workerQueue.put(('newpubkey',(addressVersion,streamNumber,ripe))) self.possibleNewPubkey(ripe) if addressVersion == 3: @@ -1323,13 +1281,7 @@ class receiveDataThread(threading.Thread): print 'publicEncryptionKey in hex:', publicEncryptionKey.encode('hex') - t = (ripe,) - shared.sqlLock.acquire() - shared.sqlSubmitQueue.put( - '''SELECT usedpersonally FROM pubkeys WHERE hash=? AND usedpersonally='yes' ''') - shared.sqlSubmitQueue.put(t) - queryreturn = shared.sqlReturnQueue.get() - shared.sqlLock.release() + queryreturn = sqlQuery('''SELECT usedpersonally FROM pubkeys WHERE hash=? AND usedpersonally='yes' ''', ripe) if queryreturn != []: # if this pubkey is already in our database and if we have used it personally: print 'We HAVE used this pubkey personally. Updating time.' t = (ripe, data, embeddedTime, 'yes') @@ -1337,13 +1289,7 @@ class receiveDataThread(threading.Thread): print 'We have NOT used this pubkey personally. Inserting in database.' t = (ripe, data, embeddedTime, 'no') # This will also update the embeddedTime. - shared.sqlLock.acquire() - shared.sqlSubmitQueue.put( - '''INSERT INTO pubkeys VALUES (?,?,?,?)''') - shared.sqlSubmitQueue.put(t) - shared.sqlReturnQueue.get() - shared.sqlSubmitQueue.put('commit') - shared.sqlLock.release() + sqlExecute('''INSERT INTO pubkeys VALUES (?,?,?,?)''', *t) # shared.workerQueue.put(('newpubkey',(addressVersion,streamNumber,ripe))) self.possibleNewPubkey(ripe) @@ -1540,13 +1486,9 @@ class receiveDataThread(threading.Thread): hash] self.sendData(objectType, payload) else: - t = (hash,) - shared.sqlLock.acquire() - shared.sqlSubmitQueue.put( - '''select objecttype, payload from inventory where hash=?''') - shared.sqlSubmitQueue.put(t) - queryreturn = shared.sqlReturnQueue.get() - shared.sqlLock.release() + queryreturn = sqlQuery( + '''select objecttype, payload from inventory where hash=?''', + hash) if queryreturn != []: for row in queryreturn: objectType, payload = row diff --git a/src/class_singleCleaner.py b/src/class_singleCleaner.py index d92a37ec..3cc80868 100644 --- a/src/class_singleCleaner.py +++ b/src/class_singleCleaner.py @@ -2,6 +2,7 @@ import threading import shared import time import sys +from helper_sql import * '''The singleCleaner class is a timer-driven thread that cleans data structures to free memory, resends messages when a remote node doesn't respond, and sends pong messages to keep connections alive if the network isn't busy. It cleans these data structures in memory: @@ -27,21 +28,23 @@ class singleCleaner(threading.Thread): timeWeLastClearedInventoryAndPubkeysTables = 0 while True: - shared.sqlLock.acquire() shared.UISignalQueue.put(( 'updateStatusBar', 'Doing housekeeping (Flushing inventory in memory to disk...)')) - for hash, storedValue in shared.inventory.items(): - objectType, streamNumber, payload, receivedTime = storedValue - if int(time.time()) - 3600 > receivedTime: - t = (hash, objectType, streamNumber, payload, receivedTime,'') - shared.sqlSubmitQueue.put( - '''INSERT INTO inventory VALUES (?,?,?,?,?,?)''') - shared.sqlSubmitQueue.put(t) - shared.sqlReturnQueue.get() - del shared.inventory[hash] - shared.sqlSubmitQueue.put('commit') + + with SqlBulkExecute() as sql: + for hash, storedValue in shared.inventory.items(): + objectType, streamNumber, payload, receivedTime = storedValue + if int(time.time()) - 3600 > receivedTime: + sql.execute( + '''INSERT INTO inventory VALUES (?,?,?,?,?,?)''', + hash, + objectType, + streamNumber, + payload, + receivedTime, + '') + del shared.inventory[hash] shared.UISignalQueue.put(('updateStatusBar', '')) - shared.sqlLock.release() shared.broadcastToSendDataQueues(( 0, 'pong', 'no data')) # commands the sendData threads to send out a pong message if they haven't sent anything else in the last five minutes. The socket timeout-time is 10 minutes. # If we are running as a daemon then we are going to fill up the UI @@ -53,29 +56,20 @@ class singleCleaner(threading.Thread): timeWeLastClearedInventoryAndPubkeysTables = int(time.time()) # inventory (moves data from the inventory data structure to # the on-disk sql database) - shared.sqlLock.acquire() # inventory (clears pubkeys after 28 days and everything else # after 2 days and 12 hours) - t = (int(time.time()) - shared.lengthOfTimeToLeaveObjectsInInventory, int( - time.time()) - shared.lengthOfTimeToHoldOnToAllPubkeys) - shared.sqlSubmitQueue.put( - '''DELETE FROM inventory WHERE (receivedtime'pubkey') OR (receivedtime'pubkey') OR (receivedtime (shared.maximumAgeOfAnObjectThatIAmWillingToAccept * (2 ** (msgretrynumber))): print 'It has been a long time and we haven\'t heard an acknowledgement to our msg. Sending again.' - t = (int( - time.time()), msgretrynumber + 1, 'msgqueued', ackdata) - shared.sqlSubmitQueue.put( - '''UPDATE sent SET lastactiontime=?, msgretrynumber=?, status=? WHERE ackdata=?''') - shared.sqlSubmitQueue.put(t) - shared.sqlReturnQueue.get() - shared.sqlSubmitQueue.put('commit') + sqlExecute( + '''UPDATE sent SET lastactiontime=?, msgretrynumber=?, status=? WHERE ackdata=?''', + int(time.time()), + msgretrynumber + 1, + 'msgqueued', + ackdata) shared.workerQueue.put(('sendmessage', '')) shared.UISignalQueue.put(( 'updateStatusBar', 'Doing work necessary to again attempt to deliver a message...')) - shared.sqlSubmitQueue.put('commit') - shared.sqlLock.release() time.sleep(300) diff --git a/src/class_singleWorker.py b/src/class_singleWorker.py index a3d0a0c5..d3c0c784 100644 --- a/src/class_singleWorker.py +++ b/src/class_singleWorker.py @@ -10,6 +10,7 @@ import sys from class_addressGenerator import pointMult import tr from debug import logger +from helper_sql import * # This thread, of which there is only one, does the heavy lifting: # calculating POWs. @@ -22,35 +23,23 @@ class singleWorker(threading.Thread): threading.Thread.__init__(self) def run(self): - shared.sqlLock.acquire() - shared.sqlSubmitQueue.put( + queryreturn = sqlQuery( '''SELECT toripe FROM sent WHERE ((status='awaitingpubkey' OR status='doingpubkeypow') AND folder='sent')''') - shared.sqlSubmitQueue.put('') - queryreturn = shared.sqlReturnQueue.get() - shared.sqlLock.release() for row in queryreturn: toripe, = row shared.neededPubkeys[toripe] = 0 # Initialize the shared.ackdataForWhichImWatching data structure using data # from the sql database. - shared.sqlLock.acquire() - shared.sqlSubmitQueue.put( + queryreturn = sqlQuery( '''SELECT ackdata FROM sent where (status='msgsent' OR status='doingmsgpow')''') - shared.sqlSubmitQueue.put('') - queryreturn = shared.sqlReturnQueue.get() - shared.sqlLock.release() for row in queryreturn: ackdata, = row print 'Watching for ackdata', ackdata.encode('hex') shared.ackdataForWhichImWatching[ackdata] = 0 - shared.sqlLock.acquire() - shared.sqlSubmitQueue.put( + queryreturn = sqlQuery( '''SELECT DISTINCT toaddress FROM sent WHERE (status='doingpubkeypow' AND folder='sent')''') - shared.sqlSubmitQueue.put('') - queryreturn = shared.sqlReturnQueue.get() - shared.sqlLock.release() for row in queryreturn: toaddress, = row self.requestPubKey(toaddress) @@ -248,26 +237,19 @@ class singleWorker(threading.Thread): if shared.safeConfigGetBoolean(myAddress, 'chan'): payload = '\x00' * 8 + payload # Attach a fake nonce on the front # just so that it is in the correct format. - t = (hash,payload,embeddedTime,'yes') - shared.sqlLock.acquire() - shared.sqlSubmitQueue.put('''INSERT INTO pubkeys VALUES (?,?,?,?)''') - shared.sqlSubmitQueue.put(t) - shared.sqlReturnQueue.get() - shared.sqlSubmitQueue.put('commit') - shared.sqlLock.release() + sqlExecute('''INSERT INTO pubkeys VALUES (?,?,?,?)''', + hash, + payload, + embeddedTime, + 'yes') shared.config.set( myAddress, 'lastpubkeysendtime', str(int(time.time()))) with open(shared.appdata + 'keys.dat', 'wb') as configfile: shared.config.write(configfile) def sendBroadcast(self): - shared.sqlLock.acquire() - t = ('broadcastqueued',) - shared.sqlSubmitQueue.put( - '''SELECT fromaddress, subject, message, ackdata FROM sent WHERE status=? and folder='sent' ''') - shared.sqlSubmitQueue.put(t) - queryreturn = shared.sqlReturnQueue.get() - shared.sqlLock.release() + queryreturn = sqlQuery( + '''SELECT fromaddress, subject, message, ackdata FROM sent WHERE status=? and folder='sent' ''', 'broadcastqueued') for row in queryreturn: fromaddress, subject, body, ackdata = row status, addressVersionNumber, streamNumber, ripe = decodeAddress( @@ -355,79 +337,49 @@ class singleWorker(threading.Thread): # Update the status of the message in the 'sent' table to have # a 'broadcastsent' status - shared.sqlLock.acquire() - t = (inventoryHash,'broadcastsent', int( - time.time()), ackdata) - shared.sqlSubmitQueue.put( - 'UPDATE sent SET msgid=?, status=?, lastactiontime=? WHERE ackdata=?') - shared.sqlSubmitQueue.put(t) - queryreturn = shared.sqlReturnQueue.get() - shared.sqlSubmitQueue.put('commit') - shared.sqlLock.release() + sqlExecute( + 'UPDATE sent SET msgid=?, status=?, lastactiontime=? WHERE ackdata=?', + inventoryHash, + 'broadcastsent', + int(time.time()), + ackdata) def sendMsg(self): # Check to see if there are any messages queued to be sent - shared.sqlLock.acquire() - shared.sqlSubmitQueue.put( + queryreturn = sqlQuery( '''SELECT DISTINCT toaddress FROM sent WHERE (status='msgqueued' AND folder='sent')''') - shared.sqlSubmitQueue.put('') - queryreturn = shared.sqlReturnQueue.get() - shared.sqlLock.release() for row in queryreturn: # For each address to which we need to send a message, check to see if we have its pubkey already. toaddress, = row toripe = decodeAddress(toaddress)[3] - shared.sqlLock.acquire() - shared.sqlSubmitQueue.put( - '''SELECT hash FROM pubkeys WHERE hash=? ''') - shared.sqlSubmitQueue.put((toripe,)) - queryreturn = shared.sqlReturnQueue.get() - shared.sqlLock.release() + queryreturn = sqlQuery( + '''SELECT hash FROM pubkeys WHERE hash=? ''', toripe) if queryreturn != []: # If we have the needed pubkey, set the status to doingmsgpow (we'll do it further down) - t = (toaddress,) - shared.sqlLock.acquire() - shared.sqlSubmitQueue.put( - '''UPDATE sent SET status='doingmsgpow' WHERE toaddress=? AND status='msgqueued' ''') - shared.sqlSubmitQueue.put(t) - shared.sqlReturnQueue.get() - shared.sqlSubmitQueue.put('commit') - shared.sqlLock.release() + sqlExecute( + '''UPDATE sent SET status='doingmsgpow' WHERE toaddress=? AND status='msgqueued' ''', + toaddress) else: # We don't have the needed pubkey. Set the status to 'awaitingpubkey' and request it if we haven't already if toripe in shared.neededPubkeys: # We already sent a request for the pubkey - t = (toaddress,) - shared.sqlLock.acquire() - shared.sqlSubmitQueue.put( - '''UPDATE sent SET status='awaitingpubkey' WHERE toaddress=? AND status='msgqueued' ''') - shared.sqlSubmitQueue.put(t) - shared.sqlReturnQueue.get() - shared.sqlSubmitQueue.put('commit') - shared.sqlLock.release() + sqlExecute( + '''UPDATE sent SET status='awaitingpubkey' WHERE toaddress=? AND status='msgqueued' ''', toaddress) shared.UISignalQueue.put(('updateSentItemStatusByHash', ( toripe, tr.translateText("MainWindow",'Encryption key was requested earlier.')))) else: # We have not yet sent a request for the pubkey - t = (toaddress,) - shared.sqlLock.acquire() - shared.sqlSubmitQueue.put( - '''UPDATE sent SET status='doingpubkeypow' WHERE toaddress=? AND status='msgqueued' ''') - shared.sqlSubmitQueue.put(t) - shared.sqlReturnQueue.get() - shared.sqlSubmitQueue.put('commit') - shared.sqlLock.release() + sqlExecute( + '''UPDATE sent SET status='doingpubkeypow' WHERE toaddress=? AND status='msgqueued' ''', + toaddress) shared.UISignalQueue.put(('updateSentItemStatusByHash', ( toripe, tr.translateText("MainWindow",'Sending a request for the recipient\'s encryption key.')))) self.requestPubKey(toaddress) - shared.sqlLock.acquire() # Get all messages that are ready to be sent, and also all messages # which we have sent in the last 28 days which were previously marked # as 'toodifficult'. If the user as raised the maximum acceptable # difficulty then those messages may now be sendable. - shared.sqlSubmitQueue.put( - '''SELECT toaddress, toripe, fromaddress, subject, message, ackdata, status FROM sent WHERE (status='doingmsgpow' or status='forcepow' or (status='toodifficult' and lastactiontime>?)) and folder='sent' ''') - shared.sqlSubmitQueue.put((int(time.time()) - 2419200,)) - queryreturn = shared.sqlReturnQueue.get() - shared.sqlLock.release() + queryreturn = sqlQuery( + '''SELECT toaddress, toripe, fromaddress, subject, message, ackdata, status FROM sent WHERE (status='doingmsgpow' or status='forcepow' or (status='toodifficult' and lastactiontime>?)) and folder='sent' ''', + int(time.time()) - 2419200) for row in queryreturn: # For each message we need to send.. toaddress, toripe, fromaddress, subject, message, ackdata, status = row # There is a remote possibility that we may no longer have the @@ -436,12 +388,9 @@ class singleWorker(threading.Thread): # user sends a message but doesn't let the POW function finish, # then leaves their client off for a long time which could cause # the needed pubkey to expire and be deleted. - shared.sqlLock.acquire() - shared.sqlSubmitQueue.put( - '''SELECT hash FROM pubkeys WHERE hash=? ''') - shared.sqlSubmitQueue.put((toripe,)) - queryreturn = shared.sqlReturnQueue.get() - shared.sqlLock.release() + queryreturn = sqlQuery( + '''SELECT hash FROM pubkeys WHERE hash=? ''', + toripe) if queryreturn == [] and toripe not in shared.neededPubkeys: # We no longer have the needed pubkey and we haven't requested # it. @@ -449,14 +398,8 @@ class singleWorker(threading.Thread): sys.stderr.write( 'For some reason, the status of a message in our outbox is \'doingmsgpow\' even though we lack the pubkey. Here is the RIPE hash of the needed pubkey: %s\n' % toripe.encode('hex')) - t = (toaddress,) - shared.sqlLock.acquire() - shared.sqlSubmitQueue.put( - '''UPDATE sent SET status='msgqueued' WHERE toaddress=? AND status='doingmsgpow' ''') - shared.sqlSubmitQueue.put(t) - shared.sqlReturnQueue.get() - shared.sqlSubmitQueue.put('commit') - shared.sqlLock.release() + sqlExecute( + '''UPDATE sent SET status='msgqueued' WHERE toaddress=? AND status='doingmsgpow' ''', toaddress) shared.UISignalQueue.put(('updateSentItemStatusByHash', ( toripe, tr.translateText("MainWindow",'Sending a request for the recipient\'s encryption key.')))) self.requestPubKey(toaddress) @@ -475,21 +418,15 @@ class singleWorker(threading.Thread): # mark the pubkey as 'usedpersonally' so that we don't ever delete # it. - shared.sqlLock.acquire() - t = (toripe,) - shared.sqlSubmitQueue.put( - '''UPDATE pubkeys SET usedpersonally='yes' WHERE hash=?''') - shared.sqlSubmitQueue.put(t) - shared.sqlReturnQueue.get() - shared.sqlSubmitQueue.put('commit') + sqlExecute( + '''UPDATE pubkeys SET usedpersonally='yes' WHERE hash=?''', + toripe) # Let us fetch the recipient's public key out of our database. If # the required proof of work difficulty is too hard then we'll # abort. - shared.sqlSubmitQueue.put( - 'SELECT transmitdata FROM pubkeys WHERE hash=?') - shared.sqlSubmitQueue.put((toripe,)) - queryreturn = shared.sqlReturnQueue.get() - shared.sqlLock.release() + queryreturn = sqlQuery( + 'SELECT transmitdata FROM pubkeys WHERE hash=?', + toripe) if queryreturn == []: with shared.printLock: sys.stderr.write( @@ -559,14 +496,9 @@ class singleWorker(threading.Thread): if (requiredAverageProofOfWorkNonceTrialsPerByte > shared.config.getint('bitmessagesettings', 'maxacceptablenoncetrialsperbyte') and shared.config.getint('bitmessagesettings', 'maxacceptablenoncetrialsperbyte') != 0) or (requiredPayloadLengthExtraBytes > shared.config.getint('bitmessagesettings', 'maxacceptablepayloadlengthextrabytes') and shared.config.getint('bitmessagesettings', 'maxacceptablepayloadlengthextrabytes') != 0): # The demanded difficulty is more than we are willing # to do. - shared.sqlLock.acquire() - t = (ackdata,) - shared.sqlSubmitQueue.put( - '''UPDATE sent SET status='toodifficult' WHERE ackdata=? ''') - shared.sqlSubmitQueue.put(t) - shared.sqlReturnQueue.get() - shared.sqlSubmitQueue.put('commit') - shared.sqlLock.release() + sqlExecute( + '''UPDATE sent SET status='toodifficult' WHERE ackdata=? ''', + ackdata) shared.UISignalQueue.put(('updateSentItemStatusByAckdata', (ackdata, tr.translateText("MainWindow", "Problem: The work demanded by the recipient (%1 and %2) is more difficult than you are willing to do.").arg(str(float(requiredAverageProofOfWorkNonceTrialsPerByte) / shared.networkDefaultProofOfWorkNonceTrialsPerByte)).arg(str(float( requiredPayloadLengthExtraBytes) / shared.networkDefaultPayloadLengthExtraBytes)).arg(unicode(strftime(shared.config.get('bitmessagesettings', 'timeformat'), localtime(int(time.time()))), 'utf-8'))))) continue @@ -694,13 +626,7 @@ class singleWorker(threading.Thread): try: encrypted = highlevelcrypto.encrypt(payload,"04"+pubEncryptionKeyBase256.encode('hex')) except: - shared.sqlLock.acquire() - t = (ackdata,) - shared.sqlSubmitQueue.put('''UPDATE sent SET status='badkey' WHERE ackdata=?''') - shared.sqlSubmitQueue.put(t) - queryreturn = shared.sqlReturnQueue.get() - shared.sqlSubmitQueue.put('commit') - shared.sqlLock.release() + sqlExecute('''UPDATE sent SET status='badkey' WHERE ackdata=?''', ackdata) shared.UISignalQueue.put(('updateSentItemStatusByAckdata',(ackdata,tr.translateText("MainWindow",'Problem: The recipient\'s encryption key is no good. Could not encrypt message. %1').arg(unicode(strftime(shared.config.get('bitmessagesettings', 'timeformat'),localtime(int(time.time()))),'utf-8'))))) continue encryptedPayload = embeddedTime + encodeVarint(toStreamNumber) + encrypted @@ -741,13 +667,8 @@ class singleWorker(threading.Thread): newStatus = 'msgsentnoackexpected' else: newStatus = 'msgsent' - shared.sqlLock.acquire() - t = (inventoryHash,newStatus,ackdata,) - shared.sqlSubmitQueue.put('''UPDATE sent SET msgid=?, status=? WHERE ackdata=?''') - shared.sqlSubmitQueue.put(t) - queryreturn = shared.sqlReturnQueue.get() - shared.sqlSubmitQueue.put('commit') - shared.sqlLock.release() + sqlExecute('''UPDATE sent SET msgid=?, status=? WHERE ackdata=?''', + inventoryHash,newStatus,ackdata) def requestPubKey(self, toAddress): toStatus, addressVersionNumber, streamNumber, ripe = decodeAddress( @@ -789,14 +710,9 @@ class singleWorker(threading.Thread): shared.broadcastToSendDataQueues(( streamNumber, 'sendinv', inventoryHash)) - t = (toAddress,) - shared.sqlLock.acquire() - shared.sqlSubmitQueue.put( - '''UPDATE sent SET status='awaitingpubkey' WHERE toaddress=? AND status='doingpubkeypow' ''') - shared.sqlSubmitQueue.put(t) - shared.sqlReturnQueue.get() - shared.sqlSubmitQueue.put('commit') - shared.sqlLock.release() + sqlExecute( + '''UPDATE sent SET status='awaitingpubkey' WHERE toaddress=? AND status='doingpubkeypow' ''', + toAddress) shared.UISignalQueue.put(( 'updateStatusBar', tr.translateText("MainWindow",'Broacasting the public key request. This program will auto-retry if they are offline.'))) diff --git a/src/helper_inbox.py b/src/helper_inbox.py index fc420825..5d746010 100644 --- a/src/helper_inbox.py +++ b/src/helper_inbox.py @@ -1,22 +1,9 @@ -import shared +from helper_sql import * def insert(t): - shared.sqlLock.acquire() - shared.sqlSubmitQueue.put( - '''INSERT INTO inbox VALUES (?,?,?,?,?,?,?,?,?)''') - shared.sqlSubmitQueue.put(t) - shared.sqlReturnQueue.get() - shared.sqlSubmitQueue.put('commit') - shared.sqlLock.release() + sqlExecute('''INSERT INTO inbox VALUES (?,?,?,?,?,?,?,?,?)''', *t) def trash(msgid): - t = (msgid,) - shared.sqlLock.acquire() - shared.sqlSubmitQueue.put( - '''UPDATE inbox SET folder='trash' WHERE msgid=?''') - shared.sqlSubmitQueue.put(t) - shared.sqlReturnQueue.get() - shared.sqlSubmitQueue.put('commit') - shared.sqlLock.release() + sqlExecute('''UPDATE inbox SET folder='trash' WHERE msgid=?''', msgid) shared.UISignalQueue.put(('removeInboxRowByMsgid',msgid)) diff --git a/src/helper_sent.py b/src/helper_sent.py index dcfe844e..75634b49 100644 --- a/src/helper_sent.py +++ b/src/helper_sent.py @@ -1,11 +1,4 @@ -import shared +from helper_sql import * def insert(t): - shared.sqlLock.acquire() - shared.sqlSubmitQueue.put( - '''INSERT INTO sent VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?)''') - shared.sqlSubmitQueue.put(t) - shared.sqlReturnQueue.get() - shared.sqlSubmitQueue.put('commit') - shared.sqlLock.release() - + sqlExecute('''INSERT INTO sent VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?)''', *t) diff --git a/src/helper_sql.py b/src/helper_sql.py new file mode 100644 index 00000000..0353f9ae --- /dev/null +++ b/src/helper_sql.py @@ -0,0 +1,66 @@ +import threading +import Queue + +sqlSubmitQueue = Queue.Queue() #SQLITE3 is so thread-unsafe that they won't even let you call it from different threads using your own locks. SQL objects can only be called from one thread. +sqlReturnQueue = Queue.Queue() +sqlLock = threading.Lock() + +def sqlQuery(sqlStatement, *args): + sqlLock.acquire() + sqlSubmitQueue.put(sqlStatement) + + if args == (): + sqlSubmitQueue.put('') + else: + sqlSubmitQueue.put(args) + + queryreturn = sqlReturnQueue.get() + sqlLock.release() + + return queryreturn + +def sqlExecute(sqlStatement, *args): + sqlLock.acquire() + sqlSubmitQueue.put(sqlStatement) + + if args == (): + sqlSubmitQueue.put('') + else: + sqlSubmitQueue.put(args) + + sqlReturnQueue.get() + sqlSubmitQueue.put('commit') + sqlLock.release() + +def sqlStoredProcedure(procName): + sqlLock.acquire() + sqlSubmitQueue.put(procName) + sqlLock.release() + +class SqlBulkExecute: + def __enter__(self): + sqlLock.acquire() + return self + + def __exit__(self, type, value, traceback): + sqlSubmitQueue.put('commit') + sqlLock.release() + + def execute(self, sqlStatement, *args): + sqlSubmitQueue.put(sqlStatement) + + if args == (): + sqlSubmitQueue.put('') + else: + sqlSubmitQueue.put(args) + sqlReturnQueue.get() + + def query(self, sqlStatement, *args): + sqlSubmitQueue.put(sqlStatement) + + if args == (): + sqlSubmitQueue.put('') + else: + sqlSubmitQueue.put(args) + return sqlReturnQueue.get() + diff --git a/src/shared.py b/src/shared.py index 6f06c3ac..0ff80978 100644 --- a/src/shared.py +++ b/src/shared.py @@ -27,7 +27,7 @@ from addresses import * import highlevelcrypto import shared import helper_startup - +from helper_sql import * config = ConfigParser.SafeConfigParser() @@ -36,9 +36,6 @@ MyECSubscriptionCryptorObjects = {} myAddressesByHash = {} #The key in this dictionary is the RIPE hash which is encoded in an address and value is the address itself. broadcastSendersForWhichImWatching = {} workerQueue = Queue.Queue() -sqlSubmitQueue = Queue.Queue() #SQLITE3 is so thread-unsafe that they won't even let you call it from different threads using your own locks. SQL objects can only be called from one thread. -sqlReturnQueue = Queue.Queue() -sqlLock = threading.Lock() UISignalQueue = Queue.Queue() addressGeneratorQueue = Queue.Queue() knownNodesLock = threading.Lock() @@ -85,12 +82,7 @@ namecoinDefaultRpcPort = "8336" frozen = getattr(sys,'frozen', None) def isInSqlInventory(hash): - t = (hash,) - shared.sqlLock.acquire() - shared.sqlSubmitQueue.put('''select hash from inventory where hash=?''') - shared.sqlSubmitQueue.put(t) - queryreturn = shared.sqlReturnQueue.get() - shared.sqlLock.release() + queryreturn = sqlQuery('''select hash from inventory where hash=?''', hash) if queryreturn == []: return False else: @@ -171,41 +163,29 @@ def lookupAppdataFolder(): return dataFolder def isAddressInMyAddressBook(address): - t = (address,) - sqlLock.acquire() - sqlSubmitQueue.put('''select address from addressbook where address=?''') - sqlSubmitQueue.put(t) - queryreturn = sqlReturnQueue.get() - sqlLock.release() + queryreturn = sqlQuery( + '''select address from addressbook where address=?''', + address) return queryreturn != [] #At this point we should really just have a isAddressInMy(book, address)... def isAddressInMySubscriptionsList(address): - t = (str(address),) # As opposed to Qt str - sqlLock.acquire() - sqlSubmitQueue.put('''select * from subscriptions where address=?''') - sqlSubmitQueue.put(t) - queryreturn = sqlReturnQueue.get() - sqlLock.release() + queryreturn = sqlQuery( + '''select * from subscriptions where address=?''', + str(address)) return queryreturn != [] def isAddressInMyAddressBookSubscriptionsListOrWhitelist(address): if isAddressInMyAddressBook(address): return True - sqlLock.acquire() - sqlSubmitQueue.put('''SELECT address FROM whitelist where address=? and enabled = '1' ''') - sqlSubmitQueue.put((address,)) - queryreturn = sqlReturnQueue.get() - sqlLock.release() + queryreturn = sqlQuery('''SELECT address FROM whitelist where address=? and enabled = '1' ''', address) if queryreturn <> []: return True - sqlLock.acquire() - sqlSubmitQueue.put('''select address from subscriptions where address=? and enabled = '1' ''') - sqlSubmitQueue.put((address,)) - queryreturn = sqlReturnQueue.get() - sqlLock.release() + queryreturn = sqlQuery( + '''select address from subscriptions where address=? and enabled = '1' ''', + address) if queryreturn <> []: return True return False @@ -269,11 +249,7 @@ def reloadBroadcastSendersForWhichImWatching(): logger.debug('reloading subscriptions...') broadcastSendersForWhichImWatching.clear() MyECSubscriptionCryptorObjects.clear() - sqlLock.acquire() - sqlSubmitQueue.put('SELECT address FROM subscriptions where enabled=1') - sqlSubmitQueue.put('') - queryreturn = sqlReturnQueue.get() - sqlLock.release() + queryreturn = sqlQuery('SELECT address FROM subscriptions where enabled=1') for row in queryreturn: address, = row status,addressVersionNumber,streamNumber,hash = decodeAddress(address) @@ -307,12 +283,8 @@ def doCleanShutdown(): # This one last useless query will guarantee that the previous flush committed before we close # the program. - sqlLock.acquire() - sqlSubmitQueue.put('SELECT address FROM subscriptions') - sqlSubmitQueue.put('') - sqlReturnQueue.get() - sqlSubmitQueue.put('exit') - sqlLock.release() + sqlQuery('SELECT address FROM subscriptions') + sqlStoredProcedure('exit') logger.info('Finished flushing inventory.') # Wait long enough to guarantee that any running proof of work worker threads will check the @@ -333,16 +305,12 @@ def broadcastToSendDataQueues(data): def flushInventory(): #Note that the singleCleanerThread clears out the inventory dictionary from time to time, although it only clears things that have been in the dictionary for a long time. This clears the inventory dictionary Now. - sqlLock.acquire() - for hash, storedValue in inventory.items(): - objectType, streamNumber, payload, receivedTime = storedValue - t = (hash,objectType,streamNumber,payload,receivedTime,'') - sqlSubmitQueue.put('''INSERT INTO inventory VALUES (?,?,?,?,?,?)''') - sqlSubmitQueue.put(t) - sqlReturnQueue.get() - del inventory[hash] - sqlSubmitQueue.put('commit') - sqlLock.release() + with SqlBulkExecute() as sql: + for hash, storedValue in inventory.items(): + objectType, streamNumber, payload, receivedTime = storedValue + sql.execute('''INSERT INTO inventory VALUES (?,?,?,?,?,?)''', + hash,objectType,streamNumber,payload,receivedTime,'') + del inventory[hash] def fixPotentiallyInvalidUTF8Data(text): try: