Merge branch 'sql_refactor' of git://github.com/grant-olson/PyBitmessage into grant-olson-sql_refactor

This commit is contained in:
Jonathan Warren 2013-09-02 17:30:06 -04:00
commit 95c1dbda5a
9 changed files with 362 additions and 821 deletions

View File

@ -25,6 +25,7 @@ if sys.platform == 'darwin':
sys.exit(0) sys.exit(0)
# Classes # Classes
from helper_sql import *
from class_sqlThread import * from class_sqlThread import *
from class_singleCleaner import * from class_singleCleaner import *
from class_singleWorker import * from class_singleWorker import *
@ -315,12 +316,8 @@ class MySimpleXMLRPCRequestHandler(SimpleXMLRPCRequestHandler):
streamNumber, 'unused API address', numberOfAddresses, passphrase, eighteenByteRipe)) streamNumber, 'unused API address', numberOfAddresses, passphrase, eighteenByteRipe))
return shared.apiAddressGeneratorReturnQueue.get() return shared.apiAddressGeneratorReturnQueue.get()
elif method == 'getAllInboxMessages': elif method == 'getAllInboxMessages':
shared.sqlLock.acquire() queryreturn = sqlQuery(
shared.sqlSubmitQueue.put(
'''SELECT msgid, toaddress, fromaddress, subject, received, message, encodingtype, read FROM inbox where folder='inbox' ORDER BY received''') '''SELECT msgid, toaddress, fromaddress, subject, received, message, encodingtype, read FROM inbox where folder='inbox' ORDER BY received''')
shared.sqlSubmitQueue.put('')
queryreturn = shared.sqlReturnQueue.get()
shared.sqlLock.release()
data = '{"inboxMessages":[' data = '{"inboxMessages":['
for row in queryreturn: for row in queryreturn:
msgid, toAddress, fromAddress, subject, received, message, encodingtype, read = row msgid, toAddress, fromAddress, subject, received, message, encodingtype, read = row
@ -333,12 +330,8 @@ class MySimpleXMLRPCRequestHandler(SimpleXMLRPCRequestHandler):
data += ']}' data += ']}'
return data return data
elif method == 'getAllInboxMessageIds' or method == 'getAllInboxMessageIDs': elif method == 'getAllInboxMessageIds' or method == 'getAllInboxMessageIDs':
shared.sqlLock.acquire() queryreturn = sqlQuery(
shared.sqlSubmitQueue.put(
'''SELECT msgid FROM inbox where folder='inbox' ORDER BY received''') '''SELECT msgid FROM inbox where folder='inbox' ORDER BY received''')
shared.sqlSubmitQueue.put('')
queryreturn = shared.sqlReturnQueue.get()
shared.sqlLock.release()
data = '{"inboxMessageIds":[' data = '{"inboxMessageIds":['
for row in queryreturn: for row in queryreturn:
msgid = row[0] msgid = row[0]
@ -351,12 +344,7 @@ class MySimpleXMLRPCRequestHandler(SimpleXMLRPCRequestHandler):
if len(params) == 0: if len(params) == 0:
raise APIError(0, 'I need parameters!') raise APIError(0, 'I need parameters!')
msgid = self._decode(params[0], "hex") msgid = self._decode(params[0], "hex")
v = (msgid,) queryreturn = sqlQuery('''SELECT msgid, toaddress, fromaddress, subject, received, message, encodingtype, read FROM inbox WHERE msgid=?''', msgid)
shared.sqlLock.acquire()
shared.sqlSubmitQueue.put('''SELECT msgid, toaddress, fromaddress, subject, received, message, encodingtype, read FROM inbox WHERE msgid=?''')
shared.sqlSubmitQueue.put(v)
queryreturn = shared.sqlReturnQueue.get()
shared.sqlLock.release()
data = '{"inboxMessage":[' data = '{"inboxMessage":['
for row in queryreturn: for row in queryreturn:
msgid, toAddress, fromAddress, subject, received, message, encodingtype, read = row msgid, toAddress, fromAddress, subject, received, message, encodingtype, read = row
@ -366,11 +354,7 @@ class MySimpleXMLRPCRequestHandler(SimpleXMLRPCRequestHandler):
data += ']}' data += ']}'
return data return data
elif method == 'getAllSentMessages': elif method == 'getAllSentMessages':
shared.sqlLock.acquire() queryreturn = sqlQuery('''SELECT msgid, toaddress, fromaddress, subject, lastactiontime, message, encodingtype, status, ackdata FROM sent where folder='sent' ORDER BY lastactiontime''')
shared.sqlSubmitQueue.put('''SELECT msgid, toaddress, fromaddress, subject, lastactiontime, message, encodingtype, status, ackdata FROM sent where folder='sent' ORDER BY lastactiontime''')
shared.sqlSubmitQueue.put('')
queryreturn = shared.sqlReturnQueue.get()
shared.sqlLock.release()
data = '{"sentMessages":[' data = '{"sentMessages":['
for row in queryreturn: for row in queryreturn:
msgid, toAddress, fromAddress, subject, lastactiontime, message, encodingtype, status, ackdata = row msgid, toAddress, fromAddress, subject, lastactiontime, message, encodingtype, status, ackdata = row
@ -382,11 +366,7 @@ class MySimpleXMLRPCRequestHandler(SimpleXMLRPCRequestHandler):
data += ']}' data += ']}'
return data return data
elif method == 'getAllSentMessageIds' or method == 'getAllSentMessageIDs': elif method == 'getAllSentMessageIds' or method == 'getAllSentMessageIDs':
shared.sqlLock.acquire() queryreturn = sqlQuery('''SELECT msgid FROM sent where folder='sent' ORDER BY lastactiontime''')
shared.sqlSubmitQueue.put('''SELECT msgid FROM sent where folder='sent' ORDER BY lastactiontime''')
shared.sqlSubmitQueue.put('')
queryreturn = shared.sqlReturnQueue.get()
shared.sqlLock.release()
data = '{"sentMessageIds":[' data = '{"sentMessageIds":['
for row in queryreturn: for row in queryreturn:
msgid = row[0] msgid = row[0]
@ -399,12 +379,7 @@ class MySimpleXMLRPCRequestHandler(SimpleXMLRPCRequestHandler):
if len(params) == 0: if len(params) == 0:
raise APIError(0, 'I need parameters!') raise APIError(0, 'I need parameters!')
toAddress = params[0] toAddress = params[0]
v = (toAddress,) queryReturn = sqlQuery('''SELECT msgid, toaddress, fromaddress, subject, received, message, encodingtype FROM inbox WHERE folder='inbox' AND toAddress=?''', toAddress)
shared.sqlLock.acquire()
shared.sqlSubmitQueue.put('''SELECT msgid, toaddress, fromaddress, subject, received, message, encodingtype FROM inbox WHERE folder='inbox' AND toAddress=?''')
shared.sqlSubmitQueue.put(v)
queryreturn = shared.sqlReturnQueue.get()
shared.sqlLock.release()
data = '{"inboxMessages":[' data = '{"inboxMessages":['
for row in queryreturn: for row in queryreturn:
msgid, toAddress, fromAddress, subject, received, message, encodingtype = row msgid, toAddress, fromAddress, subject, received, message, encodingtype = row
@ -419,12 +394,7 @@ class MySimpleXMLRPCRequestHandler(SimpleXMLRPCRequestHandler):
if len(params) == 0: if len(params) == 0:
raise APIError(0, 'I need parameters!') raise APIError(0, 'I need parameters!')
msgid = self._decode(params[0], "hex") msgid = self._decode(params[0], "hex")
v = (msgid,) queryreturn = sqlQuery('''SELECT msgid, toaddress, fromaddress, subject, lastactiontime, message, encodingtype, status, ackdata FROM sent WHERE msgid=?''', msgid)
shared.sqlLock.acquire()
shared.sqlSubmitQueue.put('''SELECT msgid, toaddress, fromaddress, subject, lastactiontime, message, encodingtype, status, ackdata FROM sent WHERE msgid=?''')
shared.sqlSubmitQueue.put(v)
queryreturn = shared.sqlReturnQueue.get()
shared.sqlLock.release()
data = '{"sentMessage":[' data = '{"sentMessage":['
for row in queryreturn: for row in queryreturn:
msgid, toAddress, fromAddress, subject, lastactiontime, message, encodingtype, status, ackdata = row msgid, toAddress, fromAddress, subject, lastactiontime, message, encodingtype, status, ackdata = row
@ -437,12 +407,8 @@ class MySimpleXMLRPCRequestHandler(SimpleXMLRPCRequestHandler):
if len(params) == 0: if len(params) == 0:
raise APIError(0, 'I need parameters!') raise APIError(0, 'I need parameters!')
fromAddress = params[0] fromAddress = params[0]
v = (fromAddress,) queryreturn = sqlQuery('''SELECT msgid, toaddress, fromaddress, subject, lastactiontime, message, encodingtype, status, ackdata FROM sent WHERE folder='sent' AND fromAddress=? ORDER BY lastactiontime''',
shared.sqlLock.acquire() fromAddress)
shared.sqlSubmitQueue.put('''SELECT msgid, toaddress, fromaddress, subject, lastactiontime, message, encodingtype, status, ackdata FROM sent WHERE folder='sent' AND fromAddress=? ORDER BY lastactiontime''')
shared.sqlSubmitQueue.put(v)
queryreturn = shared.sqlReturnQueue.get()
shared.sqlLock.release()
data = '{"sentMessages":[' data = '{"sentMessages":['
for row in queryreturn: for row in queryreturn:
msgid, toAddress, fromAddress, subject, lastactiontime, message, encodingtype, status, ackdata = row msgid, toAddress, fromAddress, subject, lastactiontime, message, encodingtype, status, ackdata = row
@ -457,12 +423,8 @@ class MySimpleXMLRPCRequestHandler(SimpleXMLRPCRequestHandler):
if len(params) == 0: if len(params) == 0:
raise APIError(0, 'I need parameters!') raise APIError(0, 'I need parameters!')
ackData = self._decode(params[0], "hex") ackData = self._decode(params[0], "hex")
v = (ackData,) queryreturn = sqlQuery('''SELECT msgid, toaddress, fromaddress, subject, lastactiontime, message, encodingtype, status, ackdata FROM sent WHERE ackdata=?''',
shared.sqlLock.acquire() ackData)
shared.sqlSubmitQueue.put('''SELECT msgid, toaddress, fromaddress, subject, lastactiontime, message, encodingtype, status, ackdata FROM sent WHERE ackdata=?''')
shared.sqlSubmitQueue.put(v)
queryreturn = shared.sqlReturnQueue.get()
shared.sqlLock.release()
data = '{"sentMessage":[' data = '{"sentMessage":['
for row in queryreturn: for row in queryreturn:
msgid, toAddress, fromAddress, subject, lastactiontime, message, encodingtype, status, ackdata = row msgid, toAddress, fromAddress, subject, lastactiontime, message, encodingtype, status, ackdata = row
@ -480,13 +442,7 @@ class MySimpleXMLRPCRequestHandler(SimpleXMLRPCRequestHandler):
helper_inbox.trash(msgid) helper_inbox.trash(msgid)
# Trash if in sent table # Trash if in sent table
t = (msgid,) t = (msgid,)
shared.sqlLock.acquire() sqlExecute('''UPDATE sent SET folder='trash' WHERE msgid=?''', t)
shared.sqlSubmitQueue.put('''UPDATE sent SET folder='trash' WHERE msgid=?''')
shared.sqlSubmitQueue.put(t)
shared.sqlReturnQueue.get()
shared.sqlSubmitQueue.put('commit')
shared.sqlLock.release()
# shared.UISignalQueue.put(('removeSentRowByMsgid',msgid)) This function doesn't exist yet.
return 'Trashed message (assuming message existed).' return 'Trashed message (assuming message existed).'
elif method == 'trashInboxMessage': elif method == 'trashInboxMessage':
if len(params) == 0: if len(params) == 0:
@ -499,26 +455,15 @@ class MySimpleXMLRPCRequestHandler(SimpleXMLRPCRequestHandler):
raise APIError(0, 'I need parameters!') raise APIError(0, 'I need parameters!')
msgid = self._decode(params[0], "hex") msgid = self._decode(params[0], "hex")
t = (msgid,) t = (msgid,)
shared.sqlLock.acquire() sqlExecute('''UPDATE sent SET folder='trash' WHERE msgid=?''', t)
shared.sqlSubmitQueue.put('''UPDATE sent SET folder='trash' WHERE msgid=?''')
shared.sqlSubmitQueue.put(t)
shared.sqlReturnQueue.get()
shared.sqlSubmitQueue.put('commit')
shared.sqlLock.release()
# shared.UISignalQueue.put(('removeSentRowByMsgid',msgid)) This function doesn't exist yet.
return 'Trashed sent message (assuming message existed).' return 'Trashed sent message (assuming message existed).'
elif method == 'trashSentMessageByAckData': elif method == 'trashSentMessageByAckData':
# This API method should only be used when msgid is not available # This API method should only be used when msgid is not available
if len(params) == 0: if len(params) == 0:
raise APIError(0, 'I need parameters!') raise APIError(0, 'I need parameters!')
ackdata = self._decode(params[0], "hex") ackdata = self._decode(params[0], "hex")
t = (ackdata,) sqlExecute('''UPDATE sent SET folder='trash' WHERE ackdata=?''',
shared.sqlLock.acquire() ackdata)
shared.sqlSubmitQueue.put('''UPDATE sent SET folder='trash' WHERE ackdata=?''')
shared.sqlSubmitQueue.put(t)
shared.sqlReturnQueue.get()
shared.sqlSubmitQueue.put('commit')
shared.sqlLock.release()
return 'Trashed sent message (assuming message existed).' return 'Trashed sent message (assuming message existed).'
elif method == 'sendMessage': elif method == 'sendMessage':
if len(params) == 0: if len(params) == 0:
@ -581,13 +526,7 @@ class MySimpleXMLRPCRequestHandler(SimpleXMLRPCRequestHandler):
helper_sent.insert(t) helper_sent.insert(t)
toLabel = '' toLabel = ''
t = (toAddress,) queryreturn = sqlQuery('''select label from addressbook where address=?''', toAddress)
shared.sqlLock.acquire()
shared.sqlSubmitQueue.put(
'''select label from addressbook where address=?''')
shared.sqlSubmitQueue.put(t)
queryreturn = shared.sqlReturnQueue.get()
shared.sqlLock.release()
if queryreturn != []: if queryreturn != []:
for row in queryreturn: for row in queryreturn:
toLabel, = row toLabel, = row
@ -656,12 +595,9 @@ class MySimpleXMLRPCRequestHandler(SimpleXMLRPCRequestHandler):
if len(ackdata) != 64: if len(ackdata) != 64:
raise APIError(15, 'The length of ackData should be 32 bytes (encoded in hex thus 64 characters).') raise APIError(15, 'The length of ackData should be 32 bytes (encoded in hex thus 64 characters).')
ackdata = self._decode(ackdata, "hex") ackdata = self._decode(ackdata, "hex")
shared.sqlLock.acquire() queryreturn = sqlQuery(
shared.sqlSubmitQueue.put( '''SELECT status FROM sent where ackdata=?''',
'''SELECT status FROM sent where ackdata=?''') ackdata)
shared.sqlSubmitQueue.put((ackdata,))
queryreturn = shared.sqlReturnQueue.get()
shared.sqlLock.release()
if queryreturn == []: if queryreturn == []:
return 'notfound' return 'notfound'
for row in queryreturn: for row in queryreturn:
@ -701,23 +637,10 @@ class MySimpleXMLRPCRequestHandler(SimpleXMLRPCRequestHandler):
raise APIError(12, 'The stream number must be 1. Others aren\'t supported.') raise APIError(12, 'The stream number must be 1. Others aren\'t supported.')
# First we must check to see if the address is already in the # First we must check to see if the address is already in the
# subscriptions list. # subscriptions list.
shared.sqlLock.acquire() queryreturn = sqlQuery('''select * from subscriptions where address=?''', address)
t = (address,)
shared.sqlSubmitQueue.put(
'''select * from subscriptions where address=?''')
shared.sqlSubmitQueue.put(t)
queryreturn = shared.sqlReturnQueue.get()
shared.sqlLock.release()
if queryreturn != []: if queryreturn != []:
raise APIError(16, 'You are already subscribed to that address.') raise APIError(16, 'You are already subscribed to that address.')
t = (label, address, True) sqlExecute('''INSERT INTO subscriptions VALUES (?,?,?)''',label, address, True)
shared.sqlLock.acquire()
shared.sqlSubmitQueue.put(
'''INSERT INTO subscriptions VALUES (?,?,?)''')
shared.sqlSubmitQueue.put(t)
queryreturn = shared.sqlReturnQueue.get()
shared.sqlSubmitQueue.put('commit')
shared.sqlLock.release()
shared.reloadBroadcastSendersForWhichImWatching() shared.reloadBroadcastSendersForWhichImWatching()
shared.UISignalQueue.put(('rerenderInboxFromLabels', '')) shared.UISignalQueue.put(('rerenderInboxFromLabels', ''))
shared.UISignalQueue.put(('rerenderSubscriptions', '')) shared.UISignalQueue.put(('rerenderSubscriptions', ''))
@ -728,24 +651,13 @@ class MySimpleXMLRPCRequestHandler(SimpleXMLRPCRequestHandler):
raise APIError(0, 'I need 1 parameter!') raise APIError(0, 'I need 1 parameter!')
address, = params address, = params
address = addBMIfNotPresent(address) address = addBMIfNotPresent(address)
t = (address,) sqlExecute('''DELETE FROM subscriptions WHERE address=?''', address)
shared.sqlLock.acquire()
shared.sqlSubmitQueue.put(
'''DELETE FROM subscriptions WHERE address=?''')
shared.sqlSubmitQueue.put(t)
shared.sqlReturnQueue.get()
shared.sqlSubmitQueue.put('commit')
shared.sqlLock.release()
shared.reloadBroadcastSendersForWhichImWatching() shared.reloadBroadcastSendersForWhichImWatching()
shared.UISignalQueue.put(('rerenderInboxFromLabels', '')) shared.UISignalQueue.put(('rerenderInboxFromLabels', ''))
shared.UISignalQueue.put(('rerenderSubscriptions', '')) shared.UISignalQueue.put(('rerenderSubscriptions', ''))
return 'Deleted subscription if it existed.' return 'Deleted subscription if it existed.'
elif method == 'listSubscriptions': elif method == 'listSubscriptions':
shared.sqlLock.acquire() queryreturn = sqlQuery('''SELECT label, address, enabled FROM subscriptions''')
shared.sqlSubmitQueue.put('''SELECT label, address, enabled FROM subscriptions''')
shared.sqlSubmitQueue.put('')
queryreturn = shared.sqlReturnQueue.get()
shared.sqlLock.release()
data = '{"subscriptions":[' data = '{"subscriptions":['
for row in queryreturn: for row in queryreturn:
label, address, enabled = row label, address, enabled = row
@ -816,26 +728,18 @@ class MySimpleXMLRPCRequestHandler(SimpleXMLRPCRequestHandler):
# use it we'll need to fill out a field in our inventory database # use it we'll need to fill out a field in our inventory database
# which is blank by default (first20bytesofencryptedmessage). # which is blank by default (first20bytesofencryptedmessage).
parameters = '' parameters = ''
with shared.sqlLock: queryreturn = sqlQuery(
shared.sqlSubmitQueue.put('''SELECT hash, payload FROM inventory WHERE first20bytesofencryptedmessage = '' and objecttype = 'msg' ; ''') '''SELECT hash, payload FROM inventory WHERE first20bytesofencryptedmessage = '' and objecttype = 'msg' ; ''')
shared.sqlSubmitQueue.put(parameters)
queryreturn = shared.sqlReturnQueue.get()
for row in queryreturn: for row in queryreturn:
hash, payload = row hash, payload = row
readPosition = 16 # Nonce length + time length readPosition = 16 # Nonce length + time length
readPosition += decodeVarint(payload[readPosition:readPosition+10])[1] # Stream Number length readPosition += decodeVarint(payload[readPosition:readPosition+10])[1] # Stream Number length
t = (payload[readPosition:readPosition+20],hash) t = (payload[readPosition:readPosition+20],hash)
shared.sqlSubmitQueue.put('''UPDATE inventory SET first20bytesofencryptedmessage=? WHERE hash=?; ''') sqlExecute('''UPDATE inventory SET first20bytesofencryptedmessage=? WHERE hash=?; ''', t)
shared.sqlSubmitQueue.put(t)
shared.sqlReturnQueue.get()
parameters = (requestedHash,) queryreturn = sqlQuery('''SELECT payload FROM inventory WHERE first20bytesofencryptedmessage = ?''',
with shared.sqlLock: requestedHash)
shared.sqlSubmitQueue.put('commit')
shared.sqlSubmitQueue.put('''SELECT payload FROM inventory WHERE first20bytesofencryptedmessage = ?''')
shared.sqlSubmitQueue.put(parameters)
queryreturn = shared.sqlReturnQueue.get()
data = '{"receivedMessageDatas":[' data = '{"receivedMessageDatas":['
for row in queryreturn: for row in queryreturn:
payload, = row payload, = row
@ -853,11 +757,7 @@ class MySimpleXMLRPCRequestHandler(SimpleXMLRPCRequestHandler):
if len(requestedHash) != 40: if len(requestedHash) != 40:
raise APIError(19, 'The length of hash should be 20 bytes (encoded in hex thus 40 characters).') raise APIError(19, 'The length of hash should be 20 bytes (encoded in hex thus 40 characters).')
requestedHash = self._decode(requestedHash, "hex") requestedHash = self._decode(requestedHash, "hex")
parameters = (requestedHash,) queryreturn = sqlQuery('''SELECT transmitdata FROM pubkeys WHERE hash = ? ; ''', requestedHash)
with shared.sqlLock:
shared.sqlSubmitQueue.put('''SELECT transmitdata FROM pubkeys WHERE hash = ? ; ''')
shared.sqlSubmitQueue.put(parameters)
queryreturn = shared.sqlReturnQueue.get()
data = '{"pubkey":[' data = '{"pubkey":['
for row in queryreturn: for row in queryreturn:
transmitdata, = row transmitdata, = row

View File

@ -36,6 +36,7 @@ import debug
from debug import logger from debug import logger
import subprocess import subprocess
import datetime import datetime
from helper_sql import *
try: try:
from PyQt4 import QtCore, QtGui from PyQt4 import QtCore, QtGui
@ -346,11 +347,7 @@ class MyForm(QtGui.QMainWindow):
self.loadSent() self.loadSent()
# Initialize the address book # Initialize the address book
shared.sqlLock.acquire() queryreturn = sqlQuery('SELECT * FROM addressbook')
shared.sqlSubmitQueue.put('SELECT * FROM addressbook')
shared.sqlSubmitQueue.put('')
queryreturn = shared.sqlReturnQueue.get()
shared.sqlLock.release()
for row in queryreturn: for row in queryreturn:
label, address = row label, address = row
self.ui.tableWidgetAddressBook.insertRow(0) self.ui.tableWidgetAddressBook.insertRow(0)
@ -561,7 +558,7 @@ class MyForm(QtGui.QMainWindow):
else: else:
where = "toaddress || fromaddress || subject || message" where = "toaddress || fromaddress || subject || message"
sqlQuery = ''' sql = '''
SELECT toaddress, fromaddress, subject, message, status, ackdata, lastactiontime SELECT toaddress, fromaddress, subject, message, status, ackdata, lastactiontime
FROM sent WHERE folder="sent" AND %s LIKE ? FROM sent WHERE folder="sent" AND %s LIKE ?
ORDER BY lastactiontime ORDER BY lastactiontime
@ -570,12 +567,7 @@ class MyForm(QtGui.QMainWindow):
while self.ui.tableWidgetSent.rowCount() > 0: while self.ui.tableWidgetSent.rowCount() > 0:
self.ui.tableWidgetSent.removeRow(0) self.ui.tableWidgetSent.removeRow(0)
t = (what,) queryreturn = sqlQuery(sql, what)
shared.sqlLock.acquire()
shared.sqlSubmitQueue.put(sqlQuery)
shared.sqlSubmitQueue.put(t)
queryreturn = shared.sqlReturnQueue.get()
shared.sqlLock.release()
for row in queryreturn: for row in queryreturn:
toAddress, fromAddress, subject, message, status, ackdata, lastactiontime = row toAddress, fromAddress, subject, message, status, ackdata, lastactiontime = row
subject = shared.fixPotentiallyInvalidUTF8Data(subject) subject = shared.fixPotentiallyInvalidUTF8Data(subject)
@ -588,13 +580,8 @@ class MyForm(QtGui.QMainWindow):
fromLabel = fromAddress fromLabel = fromAddress
toLabel = '' toLabel = ''
t = (toAddress,) queryreturn = sqlQuery(
shared.sqlLock.acquire() '''select label from addressbook where address=?''', toAddress)
shared.sqlSubmitQueue.put(
'''select label from addressbook where address=?''')
shared.sqlSubmitQueue.put(t)
queryreturn = shared.sqlReturnQueue.get()
shared.sqlLock.release()
if queryreturn != []: if queryreturn != []:
for row in queryreturn: for row in queryreturn:
@ -691,7 +678,7 @@ class MyForm(QtGui.QMainWindow):
else: else:
where = "toaddress || fromaddress || subject || message" where = "toaddress || fromaddress || subject || message"
sqlQuery = ''' sql = '''
SELECT msgid, toaddress, fromaddress, subject, received, message, read SELECT msgid, toaddress, fromaddress, subject, received, message, read
FROM inbox WHERE folder="inbox" AND %s LIKE ? FROM inbox WHERE folder="inbox" AND %s LIKE ?
ORDER BY received ORDER BY received
@ -702,12 +689,7 @@ class MyForm(QtGui.QMainWindow):
font = QFont() font = QFont()
font.setBold(True) font.setBold(True)
t = (what,) queryreturn = sqlQuery(sql, what)
shared.sqlLock.acquire()
shared.sqlSubmitQueue.put(sqlQuery)
shared.sqlSubmitQueue.put(t)
queryreturn = shared.sqlReturnQueue.get()
shared.sqlLock.release()
for row in queryreturn: for row in queryreturn:
msgid, toAddress, fromAddress, subject, received, message, read = row msgid, toAddress, fromAddress, subject, received, message, read = row
subject = shared.fixPotentiallyInvalidUTF8Data(subject) subject = shared.fixPotentiallyInvalidUTF8Data(subject)
@ -723,26 +705,16 @@ class MyForm(QtGui.QMainWindow):
toLabel = toAddress toLabel = toAddress
fromLabel = '' fromLabel = ''
t = (fromAddress,) queryreturn = sqlQuery(
shared.sqlLock.acquire() '''select label from addressbook where address=?''', fromAddress)
shared.sqlSubmitQueue.put(
'''select label from addressbook where address=?''')
shared.sqlSubmitQueue.put(t)
queryreturn = shared.sqlReturnQueue.get()
shared.sqlLock.release()
if queryreturn != []: if queryreturn != []:
for row in queryreturn: for row in queryreturn:
fromLabel, = row fromLabel, = row
if fromLabel == '': # If this address wasn't in our address book... if fromLabel == '': # If this address wasn't in our address book...
t = (fromAddress,) queryReturn = sqlQuery(
shared.sqlLock.acquire() '''select label from subscriptions where address=?''', fromAddress)
shared.sqlSubmitQueue.put(
'''select label from subscriptions where address=?''')
shared.sqlSubmitQueue.put(t)
queryreturn = shared.sqlReturnQueue.get()
shared.sqlLock.release()
if queryreturn != []: if queryreturn != []:
for row in queryreturn: for row in queryreturn:
@ -883,12 +855,8 @@ class MyForm(QtGui.QMainWindow):
if not (self.mmapp.has_source("Subscriptions") or self.mmapp.has_source("Messages")): if not (self.mmapp.has_source("Subscriptions") or self.mmapp.has_source("Messages")):
return return
shared.sqlLock.acquire() queryreturn = sqlQuery(
shared.sqlSubmitQueue.put( '''SELECT toaddress, read FROM inbox WHERE msgid=?''', inventoryHash)
'''SELECT toaddress, read FROM inbox WHERE msgid=?''')
shared.sqlSubmitQueue.put(inventoryHash)
queryreturn = shared.sqlReturnQueue.get()
shared.sqlLock.release()
for row in queryreturn: for row in queryreturn:
toAddress, read = row toAddress, read = row
if not read: if not read:
@ -904,12 +872,8 @@ class MyForm(QtGui.QMainWindow):
unreadMessages = 0 unreadMessages = 0
unreadSubscriptions = 0 unreadSubscriptions = 0
shared.sqlLock.acquire() queryreturn = sqlQuery(
shared.sqlSubmitQueue.put(
'''SELECT msgid, toaddress, read FROM inbox where folder='inbox' ''') '''SELECT msgid, toaddress, read FROM inbox where folder='inbox' ''')
shared.sqlSubmitQueue.put('')
queryreturn = shared.sqlReturnQueue.get()
shared.sqlLock.release()
for row in queryreturn: for row in queryreturn:
msgid, toAddress, read = row msgid, toAddress, read = row
@ -1156,9 +1120,7 @@ class MyForm(QtGui.QMainWindow):
def click_actionDeleteAllTrashedMessages(self): def click_actionDeleteAllTrashedMessages(self):
if QtGui.QMessageBox.question(self, _translate("MainWindow", "Delete trash?"), _translate("MainWindow", "Are you sure you want to delete all trashed messages?"), QtGui.QMessageBox.Yes, QtGui.QMessageBox.No) == QtGui.QMessageBox.No: if QtGui.QMessageBox.question(self, _translate("MainWindow", "Delete trash?"), _translate("MainWindow", "Are you sure you want to delete all trashed messages?"), QtGui.QMessageBox.Yes, QtGui.QMessageBox.No) == QtGui.QMessageBox.No:
return return
shared.sqlLock.acquire() sqlStoredProcedure('deleteandvacuume')
shared.sqlSubmitQueue.put('deleteandvacuume')
shared.sqlLock.release()
def click_actionRegenerateDeterministicAddresses(self): def click_actionRegenerateDeterministicAddresses(self):
self.regenerateAddressesDialogInstance = regenerateAddressesDialog( self.regenerateAddressesDialogInstance = regenerateAddressesDialog(
@ -1439,13 +1401,8 @@ class MyForm(QtGui.QMainWindow):
addressToLookup = str(self.ui.tableWidgetInbox.item( addressToLookup = str(self.ui.tableWidgetInbox.item(
i, 1).data(Qt.UserRole).toPyObject()) i, 1).data(Qt.UserRole).toPyObject())
fromLabel = '' fromLabel = ''
t = (addressToLookup,) queryreturn = sqlQuery(
shared.sqlLock.acquire() '''select label from addressbook where address=?''', addressToLookup)
shared.sqlSubmitQueue.put(
'''select label from addressbook where address=?''')
shared.sqlSubmitQueue.put(t)
queryreturn = shared.sqlReturnQueue.get()
shared.sqlLock.release()
if queryreturn != []: if queryreturn != []:
for row in queryreturn: for row in queryreturn:
@ -1455,12 +1412,8 @@ class MyForm(QtGui.QMainWindow):
else: else:
# It might be a broadcast message. We should check for that # It might be a broadcast message. We should check for that
# label. # label.
shared.sqlLock.acquire() queryreturn = sqlQuery(
shared.sqlSubmitQueue.put( '''select label from subscriptions where address=?''', addressToLookup)
'''select label from subscriptions where address=?''')
shared.sqlSubmitQueue.put(t)
queryreturn = shared.sqlReturnQueue.get()
shared.sqlLock.release()
if queryreturn != []: if queryreturn != []:
for row in queryreturn: for row in queryreturn:
@ -1506,13 +1459,8 @@ class MyForm(QtGui.QMainWindow):
addressToLookup = str(self.ui.tableWidgetSent.item( addressToLookup = str(self.ui.tableWidgetSent.item(
i, 0).data(Qt.UserRole).toPyObject()) i, 0).data(Qt.UserRole).toPyObject())
toLabel = '' toLabel = ''
t = (addressToLookup,) queryreturn = sqlQuery(
shared.sqlLock.acquire() '''select label from addressbook where address=?''', addressToLookup)
shared.sqlSubmitQueue.put(
'''select label from addressbook where address=?''')
shared.sqlSubmitQueue.put(t)
queryreturn = shared.sqlReturnQueue.get()
shared.sqlLock.release()
if queryreturn != []: if queryreturn != []:
for row in queryreturn: for row in queryreturn:
@ -1522,12 +1470,7 @@ class MyForm(QtGui.QMainWindow):
def rerenderSubscriptions(self): def rerenderSubscriptions(self):
self.ui.tableWidgetSubscriptions.setRowCount(0) self.ui.tableWidgetSubscriptions.setRowCount(0)
shared.sqlLock.acquire() queryreturn = sqlQuery('SELECT label, address, enabled FROM subscriptions')
shared.sqlSubmitQueue.put(
'SELECT label, address, enabled FROM subscriptions')
shared.sqlSubmitQueue.put('')
queryreturn = shared.sqlReturnQueue.get()
shared.sqlLock.release()
for row in queryreturn: for row in queryreturn:
label, address, enabled = row label, address, enabled = row
self.ui.tableWidgetSubscriptions.insertRow(0) self.ui.tableWidgetSubscriptions.insertRow(0)
@ -1610,24 +1553,26 @@ class MyForm(QtGui.QMainWindow):
self.statusBar().showMessage(_translate( self.statusBar().showMessage(_translate(
"MainWindow", "Warning: You are currently not connected. Bitmessage will do the work necessary to send the message but it won\'t send until you connect.")) "MainWindow", "Warning: You are currently not connected. Bitmessage will do the work necessary to send the message but it won\'t send until you connect."))
ackdata = OpenSSL.rand(32) ackdata = OpenSSL.rand(32)
shared.sqlLock.acquire() t = ()
t = ('', toAddress, ripe, fromAddress, subject, message, ackdata, int( sqlExecute(
time.time()), 'msgqueued', 1, 1, 'sent', 2) '''INSERT INTO sent VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?)''',
shared.sqlSubmitQueue.put( '',
'''INSERT INTO sent VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?)''') toAddress,
shared.sqlSubmitQueue.put(t) ripe,
shared.sqlReturnQueue.get() fromAddress,
shared.sqlSubmitQueue.put('commit') subject,
shared.sqlLock.release() message,
ackdata,
int(time.time()),
'msgqueued',
1,
1,
'sent',
2)
toLabel = '' toLabel = ''
t = (toAddress,) queryreturn = sqlQuery('''select label from addressbook where address=?''',
shared.sqlLock.acquire() toAddress)
shared.sqlSubmitQueue.put(
'''select label from addressbook where address=?''')
shared.sqlSubmitQueue.put(t)
queryreturn = shared.sqlReturnQueue.get()
shared.sqlLock.release()
if queryreturn != []: if queryreturn != []:
for row in queryreturn: for row in queryreturn:
toLabel, = row toLabel, = row
@ -1658,15 +1603,10 @@ class MyForm(QtGui.QMainWindow):
ackdata = OpenSSL.rand(32) ackdata = OpenSSL.rand(32)
toAddress = self.str_broadcast_subscribers toAddress = self.str_broadcast_subscribers
ripe = '' ripe = ''
shared.sqlLock.acquire()
t = ('', toAddress, ripe, fromAddress, subject, message, ackdata, int( t = ('', toAddress, ripe, fromAddress, subject, message, ackdata, int(
time.time()), 'broadcastqueued', 1, 1, 'sent', 2) time.time()), 'broadcastqueued', 1, 1, 'sent', 2)
shared.sqlSubmitQueue.put( sqlExecute(
'''INSERT INTO sent VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?)''') '''INSERT INTO sent VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?)''', t)
shared.sqlSubmitQueue.put(t)
shared.sqlReturnQueue.get()
shared.sqlSubmitQueue.put('commit')
shared.sqlLock.release()
shared.workerQueue.put(('sendbroadcast', '')) shared.workerQueue.put(('sendbroadcast', ''))
@ -1822,25 +1762,15 @@ class MyForm(QtGui.QMainWindow):
subject = shared.fixPotentiallyInvalidUTF8Data(subject) subject = shared.fixPotentiallyInvalidUTF8Data(subject)
message = shared.fixPotentiallyInvalidUTF8Data(message) message = shared.fixPotentiallyInvalidUTF8Data(message)
fromLabel = '' fromLabel = ''
shared.sqlLock.acquire() queryreturn = sqlQuery(
t = (fromAddress,) '''select label from addressbook where address=?''', fromAddress)
shared.sqlSubmitQueue.put(
'''select label from addressbook where address=?''')
shared.sqlSubmitQueue.put(t)
queryreturn = shared.sqlReturnQueue.get()
shared.sqlLock.release()
if queryreturn != []: if queryreturn != []:
for row in queryreturn: for row in queryreturn:
fromLabel, = row fromLabel, = row
else: else:
# There might be a label in the subscriptions table # There might be a label in the subscriptions table
shared.sqlLock.acquire() queryreturn = sqlQuery(
t = (fromAddress,) '''select label from subscriptions where address=?''', fromAddress)
shared.sqlSubmitQueue.put(
'''select label from subscriptions where address=?''')
shared.sqlSubmitQueue.put(t)
queryreturn = shared.sqlReturnQueue.get()
shared.sqlLock.release()
if queryreturn != []: if queryreturn != []:
for row in queryreturn: for row in queryreturn:
fromLabel, = row fromLabel, = row
@ -1914,13 +1844,7 @@ class MyForm(QtGui.QMainWindow):
"MainWindow", "The address you entered was invalid. Ignoring it.")) "MainWindow", "The address you entered was invalid. Ignoring it."))
def addEntryToAddressBook(self,address,label): def addEntryToAddressBook(self,address,label):
shared.sqlLock.acquire() queryreturn = sqlQuery('''select * from addressbook where address=?''', address)
t = (address,)
shared.sqlSubmitQueue.put(
'''select * from addressbook where address=?''')
shared.sqlSubmitQueue.put(t)
queryreturn = shared.sqlReturnQueue.get()
shared.sqlLock.release()
if queryreturn == []: if queryreturn == []:
self.ui.tableWidgetAddressBook.setSortingEnabled(False) self.ui.tableWidgetAddressBook.setSortingEnabled(False)
self.ui.tableWidgetAddressBook.insertRow(0) self.ui.tableWidgetAddressBook.insertRow(0)
@ -1931,14 +1855,7 @@ class MyForm(QtGui.QMainWindow):
QtCore.Qt.ItemIsSelectable | QtCore.Qt.ItemIsEnabled) QtCore.Qt.ItemIsSelectable | QtCore.Qt.ItemIsEnabled)
self.ui.tableWidgetAddressBook.setItem(0, 1, newItem) self.ui.tableWidgetAddressBook.setItem(0, 1, newItem)
self.ui.tableWidgetAddressBook.setSortingEnabled(True) self.ui.tableWidgetAddressBook.setSortingEnabled(True)
t = (str(label), address) sqlExecute('''INSERT INTO addressbook VALUES (?,?)''', str(label), address)
shared.sqlLock.acquire()
shared.sqlSubmitQueue.put(
'''INSERT INTO addressbook VALUES (?,?)''')
shared.sqlSubmitQueue.put(t)
queryreturn = shared.sqlReturnQueue.get()
shared.sqlSubmitQueue.put('commit')
shared.sqlLock.release()
self.rerenderInboxFromLabels() self.rerenderInboxFromLabels()
self.rerenderSentToLabels() self.rerenderSentToLabels()
else: else:
@ -1960,13 +1877,7 @@ class MyForm(QtGui.QMainWindow):
self.ui.tableWidgetSubscriptions.setItem(0,1,newItem) self.ui.tableWidgetSubscriptions.setItem(0,1,newItem)
self.ui.tableWidgetSubscriptions.setSortingEnabled(True) self.ui.tableWidgetSubscriptions.setSortingEnabled(True)
#Add to database (perhaps this should be separated from the MyForm class) #Add to database (perhaps this should be separated from the MyForm class)
t = (str(label),address,True) sqlExecute('''INSERT INTO subscriptions VALUES (?,?,?)''',str(label),address,True)
shared.sqlLock.acquire()
shared.sqlSubmitQueue.put('''INSERT INTO subscriptions VALUES (?,?,?)''')
shared.sqlSubmitQueue.put(t)
queryreturn = shared.sqlReturnQueue.get()
shared.sqlSubmitQueue.put('commit')
shared.sqlLock.release()
self.rerenderInboxFromLabels() self.rerenderInboxFromLabels()
shared.reloadBroadcastSendersForWhichImWatching() shared.reloadBroadcastSendersForWhichImWatching()
@ -1987,16 +1898,10 @@ class MyForm(QtGui.QMainWindow):
def loadBlackWhiteList(self): def loadBlackWhiteList(self):
# Initialize the Blacklist or Whitelist table # Initialize the Blacklist or Whitelist table
listType = shared.config.get('bitmessagesettings', 'blackwhitelist') listType = shared.config.get('bitmessagesettings', 'blackwhitelist')
shared.sqlLock.acquire()
if listType == 'black': if listType == 'black':
shared.sqlSubmitQueue.put( queryreturn = sqlQuery('''SELECT label, address, enabled FROM blacklist''')
'''SELECT label, address, enabled FROM blacklist''')
else: else:
shared.sqlSubmitQueue.put( queryreturn = sqlQuery('''SELECT label, address, enabled FROM whitelist''')
'''SELECT label, address, enabled FROM whitelist''')
shared.sqlSubmitQueue.put('')
queryreturn = shared.sqlReturnQueue.get()
shared.sqlLock.release()
for row in queryreturn: for row in queryreturn:
label, address, enabled = row label, address, enabled = row
self.ui.tableWidgetBlacklist.insertRow(0) self.ui.tableWidgetBlacklist.insertRow(0)
@ -2125,9 +2030,7 @@ class MyForm(QtGui.QMainWindow):
if shared.appdata != '' and self.settingsDialogInstance.ui.checkBoxPortableMode.isChecked(): # If we are NOT using portable mode now but the user selected that we should... if shared.appdata != '' and self.settingsDialogInstance.ui.checkBoxPortableMode.isChecked(): # If we are NOT using portable mode now but the user selected that we should...
# Write the keys.dat file to disk in the new location # Write the keys.dat file to disk in the new location
shared.sqlLock.acquire() sqlStoredProcedure('movemessagstoprog')
shared.sqlSubmitQueue.put('movemessagstoprog')
shared.sqlLock.release()
with open('keys.dat', 'wb') as configfile: with open('keys.dat', 'wb') as configfile:
shared.config.write(configfile) shared.config.write(configfile)
# Write the knownnodes.dat file to disk in the new location # Write the knownnodes.dat file to disk in the new location
@ -2151,9 +2054,7 @@ class MyForm(QtGui.QMainWindow):
shared.appdata = shared.lookupAppdataFolder() shared.appdata = shared.lookupAppdataFolder()
if not os.path.exists(shared.appdata): if not os.path.exists(shared.appdata):
os.makedirs(shared.appdata) os.makedirs(shared.appdata)
shared.sqlLock.acquire() sqlStoredProcedure('movemessagstoappdata')
shared.sqlSubmitQueue.put('movemessagstoappdata')
shared.sqlLock.release()
# Write the keys.dat file to disk in the new location # Write the keys.dat file to disk in the new location
with open(shared.appdata + 'keys.dat', 'wb') as configfile: with open(shared.appdata + 'keys.dat', 'wb') as configfile:
shared.config.write(configfile) shared.config.write(configfile)
@ -2199,18 +2100,13 @@ class MyForm(QtGui.QMainWindow):
# First we must check to see if the address is already in the # First we must check to see if the address is already in the
# address book. The user cannot add it again or else it will # address book. The user cannot add it again or else it will
# cause problems when updating and deleting the entry. # cause problems when updating and deleting the entry.
shared.sqlLock.acquire()
t = (addBMIfNotPresent(str( t = (addBMIfNotPresent(str(
self.NewBlacklistDialogInstance.ui.lineEditSubscriptionAddress.text())),) self.NewBlacklistDialogInstance.ui.lineEditSubscriptionAddress.text())),)
if shared.config.get('bitmessagesettings', 'blackwhitelist') == 'black': if shared.config.get('bitmessagesettings', 'blackwhitelist') == 'black':
shared.sqlSubmitQueue.put( sql = '''select * from blacklist where address=?'''
'''select * from blacklist where address=?''')
else: else:
shared.sqlSubmitQueue.put( sql = '''select * from whitelist where address=?'''
'''select * from whitelist where address=?''') queryreturn = sqlQuery(sql,*t)
shared.sqlSubmitQueue.put(t)
queryreturn = shared.sqlReturnQueue.get()
shared.sqlLock.release()
if queryreturn == []: if queryreturn == []:
self.ui.tableWidgetBlacklist.setSortingEnabled(False) self.ui.tableWidgetBlacklist.setSortingEnabled(False)
self.ui.tableWidgetBlacklist.insertRow(0) self.ui.tableWidgetBlacklist.insertRow(0)
@ -2225,17 +2121,11 @@ class MyForm(QtGui.QMainWindow):
self.ui.tableWidgetBlacklist.setSortingEnabled(True) self.ui.tableWidgetBlacklist.setSortingEnabled(True)
t = (str(self.NewBlacklistDialogInstance.ui.newsubscriptionlabel.text().toUtf8()), addBMIfNotPresent( t = (str(self.NewBlacklistDialogInstance.ui.newsubscriptionlabel.text().toUtf8()), addBMIfNotPresent(
str(self.NewBlacklistDialogInstance.ui.lineEditSubscriptionAddress.text())), True) str(self.NewBlacklistDialogInstance.ui.lineEditSubscriptionAddress.text())), True)
shared.sqlLock.acquire()
if shared.config.get('bitmessagesettings', 'blackwhitelist') == 'black': if shared.config.get('bitmessagesettings', 'blackwhitelist') == 'black':
shared.sqlSubmitQueue.put( sql = '''INSERT INTO blacklist VALUES (?,?,?)'''
'''INSERT INTO blacklist VALUES (?,?,?)''')
else: else:
shared.sqlSubmitQueue.put( sql = '''INSERT INTO whitelist VALUES (?,?,?)'''
'''INSERT INTO whitelist VALUES (?,?,?)''') sqlExecute(sql, *t)
shared.sqlSubmitQueue.put(t)
queryreturn = shared.sqlReturnQueue.get()
shared.sqlSubmitQueue.put('commit')
shared.sqlLock.release()
else: else:
self.statusBar().showMessage(_translate( self.statusBar().showMessage(_translate(
"MainWindow", "Error: You cannot add the same address to your list twice. Perhaps rename the existing one if you want.")) "MainWindow", "Error: You cannot add the same address to your list twice. Perhaps rename the existing one if you want."))
@ -2362,20 +2252,11 @@ class MyForm(QtGui.QMainWindow):
currentRow = row.row() currentRow = row.row()
inventoryHashToMarkUnread = str(self.ui.tableWidgetInbox.item( inventoryHashToMarkUnread = str(self.ui.tableWidgetInbox.item(
currentRow, 3).data(Qt.UserRole).toPyObject()) currentRow, 3).data(Qt.UserRole).toPyObject())
t = (inventoryHashToMarkUnread,) sqlExecute('''UPDATE inbox SET read=0 WHERE msgid=?''', inventoryHashToMarkUnread)
shared.sqlLock.acquire()
shared.sqlSubmitQueue.put(
'''UPDATE inbox SET read=0 WHERE msgid=?''')
shared.sqlSubmitQueue.put(t)
shared.sqlReturnQueue.get()
shared.sqlLock.release()
self.ui.tableWidgetInbox.item(currentRow, 0).setFont(font) self.ui.tableWidgetInbox.item(currentRow, 0).setFont(font)
self.ui.tableWidgetInbox.item(currentRow, 1).setFont(font) self.ui.tableWidgetInbox.item(currentRow, 1).setFont(font)
self.ui.tableWidgetInbox.item(currentRow, 2).setFont(font) self.ui.tableWidgetInbox.item(currentRow, 2).setFont(font)
self.ui.tableWidgetInbox.item(currentRow, 3).setFont(font) self.ui.tableWidgetInbox.item(currentRow, 3).setFont(font)
shared.sqlLock.acquire()
shared.sqlSubmitQueue.put('commit')
shared.sqlLock.release()
# self.ui.tableWidgetInbox.selectRow(currentRow + 1) # self.ui.tableWidgetInbox.selectRow(currentRow + 1)
# This doesn't de-select the last message if you try to mark it unread, but that doesn't interfere. Might not be necessary. # This doesn't de-select the last message if you try to mark it unread, but that doesn't interfere. Might not be necessary.
# We could also select upwards, but then our problem would be with the topmost message. # We could also select upwards, but then our problem would be with the topmost message.
@ -2420,13 +2301,8 @@ class MyForm(QtGui.QMainWindow):
addressAtCurrentInboxRow = str(self.ui.tableWidgetInbox.item( addressAtCurrentInboxRow = str(self.ui.tableWidgetInbox.item(
currentInboxRow, 1).data(Qt.UserRole).toPyObject()) currentInboxRow, 1).data(Qt.UserRole).toPyObject())
# Let's make sure that it isn't already in the address book # Let's make sure that it isn't already in the address book
shared.sqlLock.acquire() queryreturn = sqlQuery('''select * from addressbook where address=?''',
t = (addressAtCurrentInboxRow,) addressAtCurrentInboxRow)
shared.sqlSubmitQueue.put(
'''select * from addressbook where address=?''')
shared.sqlSubmitQueue.put(t)
queryreturn = shared.sqlReturnQueue.get()
shared.sqlLock.release()
if queryreturn == []: if queryreturn == []:
self.ui.tableWidgetAddressBook.insertRow(0) self.ui.tableWidgetAddressBook.insertRow(0)
newItem = QtGui.QTableWidgetItem( newItem = QtGui.QTableWidgetItem(
@ -2436,15 +2312,9 @@ class MyForm(QtGui.QMainWindow):
newItem.setFlags( newItem.setFlags(
QtCore.Qt.ItemIsSelectable | QtCore.Qt.ItemIsEnabled) QtCore.Qt.ItemIsSelectable | QtCore.Qt.ItemIsEnabled)
self.ui.tableWidgetAddressBook.setItem(0, 1, newItem) self.ui.tableWidgetAddressBook.setItem(0, 1, newItem)
t = ('--New entry. Change label in Address Book.--', sqlExecute('''INSERT INTO addressbook VALUES (?,?)''',
'--New entry. Change label in Address Book.--',
addressAtCurrentInboxRow) addressAtCurrentInboxRow)
shared.sqlLock.acquire()
shared.sqlSubmitQueue.put(
'''INSERT INTO addressbook VALUES (?,?)''')
shared.sqlSubmitQueue.put(t)
queryreturn = shared.sqlReturnQueue.get()
shared.sqlSubmitQueue.put('commit')
shared.sqlLock.release()
self.ui.tabWidget.setCurrentIndex(5) self.ui.tabWidget.setCurrentIndex(5)
self.ui.tableWidgetAddressBook.setCurrentCell(0, 0) self.ui.tableWidgetAddressBook.setCurrentCell(0, 0)
self.statusBar().showMessage(_translate( self.statusBar().showMessage(_translate(
@ -2459,20 +2329,11 @@ class MyForm(QtGui.QMainWindow):
currentRow = self.ui.tableWidgetInbox.selectedIndexes()[0].row() currentRow = self.ui.tableWidgetInbox.selectedIndexes()[0].row()
inventoryHashToTrash = str(self.ui.tableWidgetInbox.item( inventoryHashToTrash = str(self.ui.tableWidgetInbox.item(
currentRow, 3).data(Qt.UserRole).toPyObject()) currentRow, 3).data(Qt.UserRole).toPyObject())
t = (inventoryHashToTrash,) sqlExecute('''UPDATE inbox SET folder='trash' WHERE msgid=?''', inventoryHashToTrash)
shared.sqlLock.acquire()
shared.sqlSubmitQueue.put(
'''UPDATE inbox SET folder='trash' WHERE msgid=?''')
shared.sqlSubmitQueue.put(t)
shared.sqlReturnQueue.get()
shared.sqlLock.release()
self.ui.textEditInboxMessage.setText("") self.ui.textEditInboxMessage.setText("")
self.ui.tableWidgetInbox.removeRow(currentRow) self.ui.tableWidgetInbox.removeRow(currentRow)
self.statusBar().showMessage(_translate( self.statusBar().showMessage(_translate(
"MainWindow", "Moved items to trash. There is no user interface to view your trash, but it is still on disk if you are desperate to get it back.")) "MainWindow", "Moved items to trash. There is no user interface to view your trash, but it is still on disk if you are desperate to get it back."))
shared.sqlLock.acquire()
shared.sqlSubmitQueue.put('commit')
shared.sqlLock.release()
if currentRow == 0: if currentRow == 0:
self.ui.tableWidgetInbox.selectRow(currentRow) self.ui.tableWidgetInbox.selectRow(currentRow)
else: else:
@ -2503,20 +2364,11 @@ class MyForm(QtGui.QMainWindow):
currentRow = self.ui.tableWidgetSent.selectedIndexes()[0].row() currentRow = self.ui.tableWidgetSent.selectedIndexes()[0].row()
ackdataToTrash = str(self.ui.tableWidgetSent.item( ackdataToTrash = str(self.ui.tableWidgetSent.item(
currentRow, 3).data(Qt.UserRole).toPyObject()) currentRow, 3).data(Qt.UserRole).toPyObject())
t = (ackdataToTrash,) sqlExecute('''UPDATE sent SET folder='trash' WHERE ackdata=?''', ackdataToTrash)
shared.sqlLock.acquire()
shared.sqlSubmitQueue.put(
'''UPDATE sent SET folder='trash' WHERE ackdata=?''')
shared.sqlSubmitQueue.put(t)
shared.sqlReturnQueue.get()
shared.sqlLock.release()
self.ui.textEditSentMessage.setPlainText("") self.ui.textEditSentMessage.setPlainText("")
self.ui.tableWidgetSent.removeRow(currentRow) self.ui.tableWidgetSent.removeRow(currentRow)
self.statusBar().showMessage(_translate( self.statusBar().showMessage(_translate(
"MainWindow", "Moved items to trash. There is no user interface to view your trash, but it is still on disk if you are desperate to get it back.")) "MainWindow", "Moved items to trash. There is no user interface to view your trash, but it is still on disk if you are desperate to get it back."))
shared.sqlLock.acquire()
shared.sqlSubmitQueue.put('commit')
shared.sqlLock.release()
if currentRow == 0: if currentRow == 0:
self.ui.tableWidgetSent.selectRow(currentRow) self.ui.tableWidgetSent.selectRow(currentRow)
else: else:
@ -2527,18 +2379,10 @@ class MyForm(QtGui.QMainWindow):
addressAtCurrentRow = str(self.ui.tableWidgetSent.item( addressAtCurrentRow = str(self.ui.tableWidgetSent.item(
currentRow, 0).data(Qt.UserRole).toPyObject()) currentRow, 0).data(Qt.UserRole).toPyObject())
toRipe = decodeAddress(addressAtCurrentRow)[3] toRipe = decodeAddress(addressAtCurrentRow)[3]
t = (toRipe,) sqlExecute(
shared.sqlLock.acquire() '''UPDATE sent SET status='forcepow' WHERE toripe=? AND status='toodifficult' and folder='sent' ''',
shared.sqlSubmitQueue.put( toRipe)
'''UPDATE sent SET status='forcepow' WHERE toripe=? AND status='toodifficult' and folder='sent' ''') queryreturn = sqlQuery('''select ackdata FROM sent WHERE status='forcepow' ''')
shared.sqlSubmitQueue.put(t)
shared.sqlReturnQueue.get()
shared.sqlSubmitQueue.put('commit')
shared.sqlSubmitQueue.put(
'''select ackdata FROM sent WHERE status='forcepow' ''')
shared.sqlSubmitQueue.put('')
queryreturn = shared.sqlReturnQueue.get()
shared.sqlLock.release()
for row in queryreturn: for row in queryreturn:
ackdata, = row ackdata, = row
shared.UISignalQueue.put(('updateSentItemStatusByAckdata', ( shared.UISignalQueue.put(('updateSentItemStatusByAckdata', (
@ -2564,14 +2408,8 @@ class MyForm(QtGui.QMainWindow):
currentRow, 0).text().toUtf8() currentRow, 0).text().toUtf8()
addressAtCurrentRow = self.ui.tableWidgetAddressBook.item( addressAtCurrentRow = self.ui.tableWidgetAddressBook.item(
currentRow, 1).text() currentRow, 1).text()
t = (str(labelAtCurrentRow), str(addressAtCurrentRow)) sqlExecute('''DELETE FROM addressbook WHERE label=? AND address=?''',
shared.sqlLock.acquire() str(labelAtCurrentRow), str(addressAtCurrentRow))
shared.sqlSubmitQueue.put(
'''DELETE FROM addressbook WHERE label=? AND address=?''')
shared.sqlSubmitQueue.put(t)
queryreturn = shared.sqlReturnQueue.get()
shared.sqlSubmitQueue.put('commit')
shared.sqlLock.release()
self.ui.tableWidgetAddressBook.removeRow(currentRow) self.ui.tableWidgetAddressBook.removeRow(currentRow)
self.rerenderInboxFromLabels() self.rerenderInboxFromLabels()
self.rerenderSentToLabels() self.rerenderSentToLabels()
@ -2641,14 +2479,8 @@ class MyForm(QtGui.QMainWindow):
currentRow, 0).text().toUtf8() currentRow, 0).text().toUtf8()
addressAtCurrentRow = self.ui.tableWidgetSubscriptions.item( addressAtCurrentRow = self.ui.tableWidgetSubscriptions.item(
currentRow, 1).text() currentRow, 1).text()
t = (str(labelAtCurrentRow), str(addressAtCurrentRow)) sqlExecute('''DELETE FROM subscriptions WHERE label=? AND address=?''',
shared.sqlLock.acquire() str(labelAtCurrentRow), str(addressAtCurrentRow))
shared.sqlSubmitQueue.put(
'''DELETE FROM subscriptions WHERE label=? AND address=?''')
shared.sqlSubmitQueue.put(t)
shared.sqlReturnQueue.get()
shared.sqlSubmitQueue.put('commit')
shared.sqlLock.release()
self.ui.tableWidgetSubscriptions.removeRow(currentRow) self.ui.tableWidgetSubscriptions.removeRow(currentRow)
self.rerenderInboxFromLabels() self.rerenderInboxFromLabels()
shared.reloadBroadcastSendersForWhichImWatching() shared.reloadBroadcastSendersForWhichImWatching()
@ -2666,14 +2498,9 @@ class MyForm(QtGui.QMainWindow):
currentRow, 0).text().toUtf8() currentRow, 0).text().toUtf8()
addressAtCurrentRow = self.ui.tableWidgetSubscriptions.item( addressAtCurrentRow = self.ui.tableWidgetSubscriptions.item(
currentRow, 1).text() currentRow, 1).text()
t = (str(labelAtCurrentRow), str(addressAtCurrentRow)) sqlExecute(
shared.sqlLock.acquire() '''update subscriptions set enabled=1 WHERE label=? AND address=?''',
shared.sqlSubmitQueue.put( str(labelAtCurrentRow), str(addressAtCurrentRow))
'''update subscriptions set enabled=1 WHERE label=? AND address=?''')
shared.sqlSubmitQueue.put(t)
shared.sqlReturnQueue.get()
shared.sqlSubmitQueue.put('commit')
shared.sqlLock.release()
self.ui.tableWidgetSubscriptions.item( self.ui.tableWidgetSubscriptions.item(
currentRow, 0).setTextColor(QApplication.palette().text().color()) currentRow, 0).setTextColor(QApplication.palette().text().color())
self.ui.tableWidgetSubscriptions.item( self.ui.tableWidgetSubscriptions.item(
@ -2686,14 +2513,9 @@ class MyForm(QtGui.QMainWindow):
currentRow, 0).text().toUtf8() currentRow, 0).text().toUtf8()
addressAtCurrentRow = self.ui.tableWidgetSubscriptions.item( addressAtCurrentRow = self.ui.tableWidgetSubscriptions.item(
currentRow, 1).text() currentRow, 1).text()
t = (str(labelAtCurrentRow), str(addressAtCurrentRow)) sqlExecute(
shared.sqlLock.acquire() '''update subscriptions set enabled=0 WHERE label=? AND address=?''',
shared.sqlSubmitQueue.put( str(labelAtCurrentRow), str(addressAtCurrentRow))
'''update subscriptions set enabled=0 WHERE label=? AND address=?''')
shared.sqlSubmitQueue.put(t)
shared.sqlReturnQueue.get()
shared.sqlSubmitQueue.put('commit')
shared.sqlLock.release()
self.ui.tableWidgetSubscriptions.item( self.ui.tableWidgetSubscriptions.item(
currentRow, 0).setTextColor(QtGui.QColor(128, 128, 128)) currentRow, 0).setTextColor(QtGui.QColor(128, 128, 128))
self.ui.tableWidgetSubscriptions.item( self.ui.tableWidgetSubscriptions.item(
@ -2714,20 +2536,14 @@ class MyForm(QtGui.QMainWindow):
currentRow, 0).text().toUtf8() currentRow, 0).text().toUtf8()
addressAtCurrentRow = self.ui.tableWidgetBlacklist.item( addressAtCurrentRow = self.ui.tableWidgetBlacklist.item(
currentRow, 1).text() currentRow, 1).text()
t = (str(labelAtCurrentRow), str(addressAtCurrentRow))
shared.sqlLock.acquire()
if shared.config.get('bitmessagesettings', 'blackwhitelist') == 'black': if shared.config.get('bitmessagesettings', 'blackwhitelist') == 'black':
shared.sqlSubmitQueue.put( sqlExecute(
'''DELETE FROM blacklist WHERE label=? AND address=?''') '''DELETE FROM blacklist WHERE label=? AND address=?''',
shared.sqlSubmitQueue.put(t) str(labelAtCurrentRow), str(addressAtCurrentRow))
shared.sqlReturnQueue.get()
else: else:
shared.sqlSubmitQueue.put( sqlExecute(
'''DELETE FROM whitelist WHERE label=? AND address=?''') '''DELETE FROM whitelist WHERE label=? AND address=?''',
shared.sqlSubmitQueue.put(t) str(labelAtCurrentRow), str(addressAtCurrentRow))
shared.sqlReturnQueue.get()
shared.sqlSubmitQueue.put('commit')
shared.sqlLock.release()
self.ui.tableWidgetBlacklist.removeRow(currentRow) self.ui.tableWidgetBlacklist.removeRow(currentRow)
def on_action_BlacklistClipboard(self): def on_action_BlacklistClipboard(self):
@ -2749,20 +2565,14 @@ class MyForm(QtGui.QMainWindow):
currentRow, 0).setTextColor(QApplication.palette().text().color()) currentRow, 0).setTextColor(QApplication.palette().text().color())
self.ui.tableWidgetBlacklist.item( self.ui.tableWidgetBlacklist.item(
currentRow, 1).setTextColor(QApplication.palette().text().color()) currentRow, 1).setTextColor(QApplication.palette().text().color())
t = (str(addressAtCurrentRow),)
shared.sqlLock.acquire()
if shared.config.get('bitmessagesettings', 'blackwhitelist') == 'black': if shared.config.get('bitmessagesettings', 'blackwhitelist') == 'black':
shared.sqlSubmitQueue.put( sqlExecute(
'''UPDATE blacklist SET enabled=1 WHERE address=?''') '''UPDATE blacklist SET enabled=1 WHERE address=?''',
shared.sqlSubmitQueue.put(t) str(addressAtCurrentRow))
shared.sqlReturnQueue.get()
else: else:
shared.sqlSubmitQueue.put( sqlExecute(
'''UPDATE whitelist SET enabled=1 WHERE address=?''') '''UPDATE whitelist SET enabled=1 WHERE address=?''',
shared.sqlSubmitQueue.put(t) str(addressAtCurrentRow))
shared.sqlReturnQueue.get()
shared.sqlSubmitQueue.put('commit')
shared.sqlLock.release()
def on_action_BlacklistDisable(self): def on_action_BlacklistDisable(self):
currentRow = self.ui.tableWidgetBlacklist.currentRow() currentRow = self.ui.tableWidgetBlacklist.currentRow()
@ -2772,20 +2582,12 @@ class MyForm(QtGui.QMainWindow):
currentRow, 0).setTextColor(QtGui.QColor(128, 128, 128)) currentRow, 0).setTextColor(QtGui.QColor(128, 128, 128))
self.ui.tableWidgetBlacklist.item( self.ui.tableWidgetBlacklist.item(
currentRow, 1).setTextColor(QtGui.QColor(128, 128, 128)) currentRow, 1).setTextColor(QtGui.QColor(128, 128, 128))
t = (str(addressAtCurrentRow),)
shared.sqlLock.acquire()
if shared.config.get('bitmessagesettings', 'blackwhitelist') == 'black': if shared.config.get('bitmessagesettings', 'blackwhitelist') == 'black':
shared.sqlSubmitQueue.put( sqlExecute(
'''UPDATE blacklist SET enabled=0 WHERE address=?''') '''UPDATE blacklist SET enabled=0 WHERE address=?''', str(addressAtCurrentRow))
shared.sqlSubmitQueue.put(t)
shared.sqlReturnQueue.get()
else: else:
shared.sqlSubmitQueue.put( sqlExecute(
'''UPDATE whitelist SET enabled=0 WHERE address=?''') '''UPDATE whitelist SET enabled=0 WHERE address=?''', str(addressAtCurrentRow))
shared.sqlSubmitQueue.put(t)
shared.sqlReturnQueue.get()
shared.sqlSubmitQueue.put('commit')
shared.sqlLock.release()
# Group of functions for the Your Identities dialog box # Group of functions for the Your Identities dialog box
def on_action_YourIdentitiesNew(self): def on_action_YourIdentitiesNew(self):
@ -2851,12 +2653,7 @@ class MyForm(QtGui.QMainWindow):
currentRow = self.ui.tableWidgetSent.currentRow() currentRow = self.ui.tableWidgetSent.currentRow()
ackData = str(self.ui.tableWidgetSent.item( ackData = str(self.ui.tableWidgetSent.item(
currentRow, 3).data(Qt.UserRole).toPyObject()) currentRow, 3).data(Qt.UserRole).toPyObject())
shared.sqlLock.acquire() queryreturn = sqlQuery('''SELECT status FROM sent where ackdata=?''', ackData)
shared.sqlSubmitQueue.put(
'''SELECT status FROM sent where ackdata=?''')
shared.sqlSubmitQueue.put((ackData,))
queryreturn = shared.sqlReturnQueue.get()
shared.sqlLock.release()
for row in queryreturn: for row in queryreturn:
status, = row status, = row
if status == 'toodifficult': if status == 'toodifficult':
@ -2913,13 +2710,7 @@ class MyForm(QtGui.QMainWindow):
currentRow, 3).data(Qt.UserRole).toPyObject()) currentRow, 3).data(Qt.UserRole).toPyObject())
t = (inventoryHash,) t = (inventoryHash,)
self.ubuntuMessagingMenuClear(t) self.ubuntuMessagingMenuClear(t)
shared.sqlLock.acquire() sqlExecute('''update inbox set read=1 WHERE msgid=?''', *t)
shared.sqlSubmitQueue.put(
'''update inbox set read=1 WHERE msgid=?''')
shared.sqlSubmitQueue.put(t)
shared.sqlReturnQueue.get()
shared.sqlSubmitQueue.put('commit')
shared.sqlLock.release()
def tableWidgetSentItemClicked(self): def tableWidgetSentItemClicked(self):
currentRow = self.ui.tableWidgetSent.currentRow() currentRow = self.ui.tableWidgetSent.currentRow()
@ -2944,35 +2735,23 @@ class MyForm(QtGui.QMainWindow):
def tableWidgetAddressBookItemChanged(self): def tableWidgetAddressBookItemChanged(self):
currentRow = self.ui.tableWidgetAddressBook.currentRow() currentRow = self.ui.tableWidgetAddressBook.currentRow()
shared.sqlLock.acquire()
if currentRow >= 0: if currentRow >= 0:
addressAtCurrentRow = self.ui.tableWidgetAddressBook.item( addressAtCurrentRow = self.ui.tableWidgetAddressBook.item(
currentRow, 1).text() currentRow, 1).text()
t = (str(self.ui.tableWidgetAddressBook.item( sqlExecute('''UPDATE addressbook set label=? WHERE address=?''',
currentRow, 0).text().toUtf8()), str(addressAtCurrentRow)) str(self.ui.tableWidgetAddressBook.item(currentRow, 0).text().toUtf8()),
shared.sqlSubmitQueue.put( str(addressAtCurrentRow))
'''UPDATE addressbook set label=? WHERE address=?''')
shared.sqlSubmitQueue.put(t)
shared.sqlReturnQueue.get()
shared.sqlSubmitQueue.put('commit')
shared.sqlLock.release()
self.rerenderInboxFromLabels() self.rerenderInboxFromLabels()
self.rerenderSentToLabels() self.rerenderSentToLabels()
def tableWidgetSubscriptionsItemChanged(self): def tableWidgetSubscriptionsItemChanged(self):
currentRow = self.ui.tableWidgetSubscriptions.currentRow() currentRow = self.ui.tableWidgetSubscriptions.currentRow()
shared.sqlLock.acquire()
if currentRow >= 0: if currentRow >= 0:
addressAtCurrentRow = self.ui.tableWidgetSubscriptions.item( addressAtCurrentRow = self.ui.tableWidgetSubscriptions.item(
currentRow, 1).text() currentRow, 1).text()
t = (str(self.ui.tableWidgetSubscriptions.item( sqlExecute('''UPDATE subscriptions set label=? WHERE address=?''',
currentRow, 0).text().toUtf8()), str(addressAtCurrentRow)) str(self.ui.tableWidgetSubscriptions.item(currentRow, 0).text().toUtf8()),
shared.sqlSubmitQueue.put( str(addressAtCurrentRow))
'''UPDATE subscriptions set label=? WHERE address=?''')
shared.sqlSubmitQueue.put(t)
shared.sqlReturnQueue.get()
shared.sqlSubmitQueue.put('commit')
shared.sqlLock.release()
self.rerenderInboxFromLabels() self.rerenderInboxFromLabels()
self.rerenderSentToLabels() self.rerenderSentToLabels()

View File

@ -19,6 +19,7 @@ import helper_generic
import helper_bitcoin import helper_bitcoin
import helper_inbox import helper_inbox
import helper_sent import helper_sent
from helper_sql import *
import tr import tr
#from bitmessagemain import shared.lengthOfTimeToLeaveObjectsInInventory, shared.lengthOfTimeToHoldOnToAllPubkeys, shared.maximumAgeOfAnObjectThatIAmWillingToAccept, shared.maximumAgeOfObjectsThatIAdvertiseToOthers, shared.maximumAgeOfNodesThatIAdvertiseToOthers, shared.numberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHavePerPeer, shared.neededPubkeys #from bitmessagemain import shared.lengthOfTimeToLeaveObjectsInInventory, shared.lengthOfTimeToHoldOnToAllPubkeys, shared.maximumAgeOfAnObjectThatIAmWillingToAccept, shared.maximumAgeOfObjectsThatIAdvertiseToOthers, shared.maximumAgeOfNodesThatIAdvertiseToOthers, shared.numberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHavePerPeer, shared.neededPubkeys
@ -281,16 +282,13 @@ class receiveDataThread(threading.Thread):
self.sendBigInv() self.sendBigInv()
def sendBigInv(self): def sendBigInv(self):
shared.sqlLock.acquire()
# Select all hashes which are younger than two days old and in this # Select all hashes which are younger than two days old and in this
# stream. # stream.
t = (int(time.time()) - shared.maximumAgeOfObjectsThatIAdvertiseToOthers, int( queryreturn = sqlQuery(
time.time()) - shared.lengthOfTimeToHoldOnToAllPubkeys, self.streamNumber) '''SELECT hash FROM inventory WHERE ((receivedtime>? and objecttype<>'pubkey') or (receivedtime>? and objecttype='pubkey')) and streamnumber=?''',
shared.sqlSubmitQueue.put( int(time.time()) - shared.maximumAgeOfObjectsThatIAdvertiseToOthers,
'''SELECT hash FROM inventory WHERE ((receivedtime>? and objecttype<>'pubkey') or (receivedtime>? and objecttype='pubkey')) and streamnumber=?''') int(time.time()) - shared.lengthOfTimeToHoldOnToAllPubkeys,
shared.sqlSubmitQueue.put(t) self.streamNumber)
queryreturn = shared.sqlReturnQueue.get()
shared.sqlLock.release()
bigInvList = {} bigInvList = {}
for row in queryreturn: for row in queryreturn:
hash, = row hash, = row
@ -507,15 +505,12 @@ class receiveDataThread(threading.Thread):
# won't be able to send this pubkey to others (without doing # won't be able to send this pubkey to others (without doing
# the proof of work ourselves, which this program is programmed # the proof of work ourselves, which this program is programmed
# to not do.) # to not do.)
t = (ripe.digest(), '\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF' + '\xFF\xFF\xFF\xFF' + data[ sqlExecute(
beginningOfPubkeyPosition:endOfPubkeyPosition], int(time.time()), 'yes') '''INSERT INTO pubkeys VALUES (?,?,?,?)''',
shared.sqlLock.acquire() ripe.digest(),
shared.sqlSubmitQueue.put( '\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF' + '\xFF\xFF\xFF\xFF' + data[beginningOfPubkeyPosition:endOfPubkeyPosition],
'''INSERT INTO pubkeys VALUES (?,?,?,?)''') int(time.time()),
shared.sqlSubmitQueue.put(t) 'yes')
shared.sqlReturnQueue.get()
shared.sqlSubmitQueue.put('commit')
shared.sqlLock.release()
# shared.workerQueue.put(('newpubkey',(sendersAddressVersion,sendersStream,ripe.digest()))) # shared.workerQueue.put(('newpubkey',(sendersAddressVersion,sendersStream,ripe.digest())))
# This will check to see whether we happen to be awaiting this # This will check to see whether we happen to be awaiting this
# pubkey in order to send a message. If we are, it will do the # pubkey in order to send a message. If we are, it will do the
@ -657,15 +652,11 @@ class receiveDataThread(threading.Thread):
# Let's store the public key in case we want to reply to this # Let's store the public key in case we want to reply to this
# person. # person.
t = (ripe.digest(), '\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF' + '\xFF\xFF\xFF\xFF' + decryptedData[ sqlExecute('''INSERT INTO pubkeys VALUES (?,?,?,?)''',
beginningOfPubkeyPosition:endOfPubkeyPosition], int(time.time()), 'yes') ripe.digest(),
shared.sqlLock.acquire() '\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF' + '\xFF\xFF\xFF\xFF' + decryptedData[beginningOfPubkeyPosition:endOfPubkeyPosition],
shared.sqlSubmitQueue.put( int(time.time()),
'''INSERT INTO pubkeys VALUES (?,?,?,?)''') 'yes')
shared.sqlSubmitQueue.put(t)
shared.sqlReturnQueue.get()
shared.sqlSubmitQueue.put('commit')
shared.sqlLock.release()
# shared.workerQueue.put(('newpubkey',(sendersAddressVersion,sendersStream,ripe.digest()))) # shared.workerQueue.put(('newpubkey',(sendersAddressVersion,sendersStream,ripe.digest())))
# This will check to see whether we happen to be awaiting this # This will check to see whether we happen to be awaiting this
# pubkey in order to send a message. If we are, it will do the POW # pubkey in order to send a message. If we are, it will do the POW
@ -802,14 +793,8 @@ class receiveDataThread(threading.Thread):
print 'This msg IS an acknowledgement bound for me.' print 'This msg IS an acknowledgement bound for me.'
del shared.ackdataForWhichImWatching[encryptedData[readPosition:]] del shared.ackdataForWhichImWatching[encryptedData[readPosition:]]
t = ('ackreceived', encryptedData[readPosition:]) sqlExecute('UPDATE sent SET status=? WHERE ackdata=?',
shared.sqlLock.acquire() 'ackreceived', encryptedData[readPosition:])
shared.sqlSubmitQueue.put(
'UPDATE sent SET status=? WHERE ackdata=?')
shared.sqlSubmitQueue.put(t)
shared.sqlReturnQueue.get()
shared.sqlSubmitQueue.put('commit')
shared.sqlLock.release()
shared.UISignalQueue.put(('updateSentItemStatusByAckdata', (encryptedData[readPosition:], tr.translateText("MainWindow",'Acknowledgement of the message received. %1').arg(unicode( shared.UISignalQueue.put(('updateSentItemStatusByAckdata', (encryptedData[readPosition:], tr.translateText("MainWindow",'Acknowledgement of the message received. %1').arg(unicode(
time.strftime(shared.config.get('bitmessagesettings', 'timeformat'), time.localtime(int(time.time()))), 'utf-8'))))) time.strftime(shared.config.get('bitmessagesettings', 'timeformat'), time.localtime(int(time.time()))), 'utf-8')))))
return return
@ -932,15 +917,12 @@ class receiveDataThread(threading.Thread):
ripe.update(sha.digest()) ripe.update(sha.digest())
# Let's store the public key in case we want to reply to this # Let's store the public key in case we want to reply to this
# person. # person.
t = (ripe.digest(), '\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF' + '\xFF\xFF\xFF\xFF' + decryptedData[ sqlExecute(
messageVersionLength:endOfThePublicKeyPosition], int(time.time()), 'yes') '''INSERT INTO pubkeys VALUES (?,?,?,?)''',
shared.sqlLock.acquire() ripe.digest(),
shared.sqlSubmitQueue.put( '\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF' + '\xFF\xFF\xFF\xFF' + decryptedData[messageVersionLength:endOfThePublicKeyPosition],
'''INSERT INTO pubkeys VALUES (?,?,?,?)''') int(time.time()),
shared.sqlSubmitQueue.put(t) 'yes')
shared.sqlReturnQueue.get()
shared.sqlSubmitQueue.put('commit')
shared.sqlLock.release()
# shared.workerQueue.put(('newpubkey',(sendersAddressVersionNumber,sendersStreamNumber,ripe.digest()))) # shared.workerQueue.put(('newpubkey',(sendersAddressVersionNumber,sendersStreamNumber,ripe.digest())))
# This will check to see whether we happen to be awaiting this # This will check to see whether we happen to be awaiting this
# pubkey in order to send a message. If we are, it will do the POW # pubkey in order to send a message. If we are, it will do the POW
@ -962,26 +944,18 @@ class receiveDataThread(threading.Thread):
return return
blockMessage = False # Gets set to True if the user shouldn't see the message according to black or white lists. blockMessage = False # Gets set to True if the user shouldn't see the message according to black or white lists.
if shared.config.get('bitmessagesettings', 'blackwhitelist') == 'black': # If we are using a blacklist if shared.config.get('bitmessagesettings', 'blackwhitelist') == 'black': # If we are using a blacklist
t = (fromAddress,) queryreturn = sqlQuery(
shared.sqlLock.acquire() '''SELECT label FROM blacklist where address=? and enabled='1' ''',
shared.sqlSubmitQueue.put( fromAddress)
'''SELECT label FROM blacklist where address=? and enabled='1' ''')
shared.sqlSubmitQueue.put(t)
queryreturn = shared.sqlReturnQueue.get()
shared.sqlLock.release()
if queryreturn != []: if queryreturn != []:
with shared.printLock: with shared.printLock:
print 'Message ignored because address is in blacklist.' print 'Message ignored because address is in blacklist.'
blockMessage = True blockMessage = True
else: # We're using a whitelist else: # We're using a whitelist
t = (fromAddress,) queryreturn = sqlQuery(
shared.sqlLock.acquire() '''SELECT label FROM whitelist where address=? and enabled='1' ''',
shared.sqlSubmitQueue.put( toAddress)
'''SELECT label FROM whitelist where address=? and enabled='1' ''')
shared.sqlSubmitQueue.put(t)
queryreturn = shared.sqlReturnQueue.get()
shared.sqlLock.release()
if queryreturn == []: if queryreturn == []:
print 'Message ignored because address not in whitelist.' print 'Message ignored because address not in whitelist.'
blockMessage = True blockMessage = True
@ -1111,14 +1085,9 @@ class receiveDataThread(threading.Thread):
if toRipe in shared.neededPubkeys: if toRipe in shared.neededPubkeys:
print 'We have been awaiting the arrival of this pubkey.' print 'We have been awaiting the arrival of this pubkey.'
del shared.neededPubkeys[toRipe] del shared.neededPubkeys[toRipe]
t = (toRipe,) sqlExecute(
shared.sqlLock.acquire() '''UPDATE sent SET status='doingmsgpow' WHERE toripe=? AND (status='awaitingpubkey' or status='doingpubkeypow') and folder='sent' ''',
shared.sqlSubmitQueue.put( toRipe)
'''UPDATE sent SET status='doingmsgpow' WHERE toripe=? AND (status='awaitingpubkey' or status='doingpubkeypow') and folder='sent' ''')
shared.sqlSubmitQueue.put(t)
shared.sqlReturnQueue.get()
shared.sqlSubmitQueue.put('commit')
shared.sqlLock.release()
shared.workerQueue.put(('sendmessage', '')) shared.workerQueue.put(('sendmessage', ''))
else: else:
with shared.printLock: with shared.printLock:
@ -1254,13 +1223,8 @@ class receiveDataThread(threading.Thread):
print 'publicEncryptionKey in hex:', publicEncryptionKey.encode('hex') print 'publicEncryptionKey in hex:', publicEncryptionKey.encode('hex')
t = (ripe,) queryreturn = sqlQuery(
shared.sqlLock.acquire() '''SELECT usedpersonally FROM pubkeys WHERE hash=? AND usedpersonally='yes' ''', ripe)
shared.sqlSubmitQueue.put(
'''SELECT usedpersonally FROM pubkeys WHERE hash=? AND usedpersonally='yes' ''')
shared.sqlSubmitQueue.put(t)
queryreturn = shared.sqlReturnQueue.get()
shared.sqlLock.release()
if queryreturn != []: # if this pubkey is already in our database and if we have used it personally: if queryreturn != []: # if this pubkey is already in our database and if we have used it personally:
print 'We HAVE used this pubkey personally. Updating time.' print 'We HAVE used this pubkey personally. Updating time.'
t = (ripe, data, embeddedTime, 'yes') t = (ripe, data, embeddedTime, 'yes')
@ -1268,13 +1232,7 @@ class receiveDataThread(threading.Thread):
print 'We have NOT used this pubkey personally. Inserting in database.' print 'We have NOT used this pubkey personally. Inserting in database.'
t = (ripe, data, embeddedTime, 'no') t = (ripe, data, embeddedTime, 'no')
# This will also update the embeddedTime. # This will also update the embeddedTime.
shared.sqlLock.acquire() sqlExecute('''INSERT INTO pubkeys VALUES (?,?,?,?)''', *t)
shared.sqlSubmitQueue.put(
'''INSERT INTO pubkeys VALUES (?,?,?,?)''')
shared.sqlSubmitQueue.put(t)
shared.sqlReturnQueue.get()
shared.sqlSubmitQueue.put('commit')
shared.sqlLock.release()
# shared.workerQueue.put(('newpubkey',(addressVersion,streamNumber,ripe))) # shared.workerQueue.put(('newpubkey',(addressVersion,streamNumber,ripe)))
self.possibleNewPubkey(ripe) self.possibleNewPubkey(ripe)
if addressVersion == 3: if addressVersion == 3:
@ -1323,13 +1281,7 @@ class receiveDataThread(threading.Thread):
print 'publicEncryptionKey in hex:', publicEncryptionKey.encode('hex') print 'publicEncryptionKey in hex:', publicEncryptionKey.encode('hex')
t = (ripe,) queryreturn = sqlQuery('''SELECT usedpersonally FROM pubkeys WHERE hash=? AND usedpersonally='yes' ''', ripe)
shared.sqlLock.acquire()
shared.sqlSubmitQueue.put(
'''SELECT usedpersonally FROM pubkeys WHERE hash=? AND usedpersonally='yes' ''')
shared.sqlSubmitQueue.put(t)
queryreturn = shared.sqlReturnQueue.get()
shared.sqlLock.release()
if queryreturn != []: # if this pubkey is already in our database and if we have used it personally: if queryreturn != []: # if this pubkey is already in our database and if we have used it personally:
print 'We HAVE used this pubkey personally. Updating time.' print 'We HAVE used this pubkey personally. Updating time.'
t = (ripe, data, embeddedTime, 'yes') t = (ripe, data, embeddedTime, 'yes')
@ -1337,13 +1289,7 @@ class receiveDataThread(threading.Thread):
print 'We have NOT used this pubkey personally. Inserting in database.' print 'We have NOT used this pubkey personally. Inserting in database.'
t = (ripe, data, embeddedTime, 'no') t = (ripe, data, embeddedTime, 'no')
# This will also update the embeddedTime. # This will also update the embeddedTime.
shared.sqlLock.acquire() sqlExecute('''INSERT INTO pubkeys VALUES (?,?,?,?)''', *t)
shared.sqlSubmitQueue.put(
'''INSERT INTO pubkeys VALUES (?,?,?,?)''')
shared.sqlSubmitQueue.put(t)
shared.sqlReturnQueue.get()
shared.sqlSubmitQueue.put('commit')
shared.sqlLock.release()
# shared.workerQueue.put(('newpubkey',(addressVersion,streamNumber,ripe))) # shared.workerQueue.put(('newpubkey',(addressVersion,streamNumber,ripe)))
self.possibleNewPubkey(ripe) self.possibleNewPubkey(ripe)
@ -1540,13 +1486,9 @@ class receiveDataThread(threading.Thread):
hash] hash]
self.sendData(objectType, payload) self.sendData(objectType, payload)
else: else:
t = (hash,) queryreturn = sqlQuery(
shared.sqlLock.acquire() '''select objecttype, payload from inventory where hash=?''',
shared.sqlSubmitQueue.put( hash)
'''select objecttype, payload from inventory where hash=?''')
shared.sqlSubmitQueue.put(t)
queryreturn = shared.sqlReturnQueue.get()
shared.sqlLock.release()
if queryreturn != []: if queryreturn != []:
for row in queryreturn: for row in queryreturn:
objectType, payload = row objectType, payload = row

View File

@ -2,6 +2,7 @@ import threading
import shared import shared
import time import time
import sys import sys
from helper_sql import *
'''The singleCleaner class is a timer-driven thread that cleans data structures to free memory, resends messages when a remote node doesn't respond, and sends pong messages to keep connections alive if the network isn't busy. '''The singleCleaner class is a timer-driven thread that cleans data structures to free memory, resends messages when a remote node doesn't respond, and sends pong messages to keep connections alive if the network isn't busy.
It cleans these data structures in memory: It cleans these data structures in memory:
@ -27,21 +28,23 @@ class singleCleaner(threading.Thread):
timeWeLastClearedInventoryAndPubkeysTables = 0 timeWeLastClearedInventoryAndPubkeysTables = 0
while True: while True:
shared.sqlLock.acquire()
shared.UISignalQueue.put(( shared.UISignalQueue.put((
'updateStatusBar', 'Doing housekeeping (Flushing inventory in memory to disk...)')) 'updateStatusBar', 'Doing housekeeping (Flushing inventory in memory to disk...)'))
with SqlBulkExecute() as sql:
for hash, storedValue in shared.inventory.items(): for hash, storedValue in shared.inventory.items():
objectType, streamNumber, payload, receivedTime = storedValue objectType, streamNumber, payload, receivedTime = storedValue
if int(time.time()) - 3600 > receivedTime: if int(time.time()) - 3600 > receivedTime:
t = (hash, objectType, streamNumber, payload, receivedTime,'') sql.execute(
shared.sqlSubmitQueue.put( '''INSERT INTO inventory VALUES (?,?,?,?,?,?)''',
'''INSERT INTO inventory VALUES (?,?,?,?,?,?)''') hash,
shared.sqlSubmitQueue.put(t) objectType,
shared.sqlReturnQueue.get() streamNumber,
payload,
receivedTime,
'')
del shared.inventory[hash] del shared.inventory[hash]
shared.sqlSubmitQueue.put('commit')
shared.UISignalQueue.put(('updateStatusBar', '')) shared.UISignalQueue.put(('updateStatusBar', ''))
shared.sqlLock.release()
shared.broadcastToSendDataQueues(( shared.broadcastToSendDataQueues((
0, 'pong', 'no data')) # commands the sendData threads to send out a pong message if they haven't sent anything else in the last five minutes. The socket timeout-time is 10 minutes. 0, 'pong', 'no data')) # commands the sendData threads to send out a pong message if they haven't sent anything else in the last five minutes. The socket timeout-time is 10 minutes.
# If we are running as a daemon then we are going to fill up the UI # If we are running as a daemon then we are going to fill up the UI
@ -53,29 +56,20 @@ class singleCleaner(threading.Thread):
timeWeLastClearedInventoryAndPubkeysTables = int(time.time()) timeWeLastClearedInventoryAndPubkeysTables = int(time.time())
# inventory (moves data from the inventory data structure to # inventory (moves data from the inventory data structure to
# the on-disk sql database) # the on-disk sql database)
shared.sqlLock.acquire()
# inventory (clears pubkeys after 28 days and everything else # inventory (clears pubkeys after 28 days and everything else
# after 2 days and 12 hours) # after 2 days and 12 hours)
t = (int(time.time()) - shared.lengthOfTimeToLeaveObjectsInInventory, int( sqlExecute(
time.time()) - shared.lengthOfTimeToHoldOnToAllPubkeys) '''DELETE FROM inventory WHERE (receivedtime<? AND objecttype<>'pubkey') OR (receivedtime<? AND objecttype='pubkey') ''',
shared.sqlSubmitQueue.put( int(time.time()) - shared.lengthOfTimeToLeaveObjectsInInventory,
'''DELETE FROM inventory WHERE (receivedtime<? AND objecttype<>'pubkey') OR (receivedtime<? AND objecttype='pubkey') ''') int(time.time()) - shared.lengthOfTimeToHoldOnToAllPubkeys)
shared.sqlSubmitQueue.put(t)
shared.sqlReturnQueue.get()
# pubkeys # pubkeys
t = (int(time.time()) - shared.lengthOfTimeToHoldOnToAllPubkeys,) sqlExecute(
shared.sqlSubmitQueue.put( '''DELETE FROM pubkeys WHERE time<? AND usedpersonally='no' ''',
'''DELETE FROM pubkeys WHERE time<? AND usedpersonally='no' ''') int(time.time()) - shared.lengthOfTimeToHoldOnToAllPubkeys)
shared.sqlSubmitQueue.put(t)
shared.sqlReturnQueue.get()
shared.sqlSubmitQueue.put('commit')
t = () queryreturn = sqlQuery(
shared.sqlSubmitQueue.put(
'''select toaddress, toripe, fromaddress, subject, message, ackdata, lastactiontime, status, pubkeyretrynumber, msgretrynumber FROM sent WHERE ((status='awaitingpubkey' OR status='msgsent') AND folder='sent') ''') # If the message's folder='trash' then we'll ignore it. '''select toaddress, toripe, fromaddress, subject, message, ackdata, lastactiontime, status, pubkeyretrynumber, msgretrynumber FROM sent WHERE ((status='awaitingpubkey' OR status='msgsent') AND folder='sent') ''') # If the message's folder='trash' then we'll ignore it.
shared.sqlSubmitQueue.put(t)
queryreturn = shared.sqlReturnQueue.get()
for row in queryreturn: for row in queryreturn:
if len(row) < 5: if len(row) < 5:
with shared.printLock: with shared.printLock:
@ -96,27 +90,23 @@ class singleCleaner(threading.Thread):
shared.UISignalQueue.put(( shared.UISignalQueue.put((
'updateStatusBar', 'Doing work necessary to again attempt to request a public key...')) 'updateStatusBar', 'Doing work necessary to again attempt to request a public key...'))
t = (int( t = ()
time.time()), pubkeyretrynumber + 1, toripe) sqlExecute(
shared.sqlSubmitQueue.put( '''UPDATE sent SET lastactiontime=?, pubkeyretrynumber=?, status='msgqueued' WHERE toripe=?''',
'''UPDATE sent SET lastactiontime=?, pubkeyretrynumber=?, status='msgqueued' WHERE toripe=?''') int(time.time()),
shared.sqlSubmitQueue.put(t) pubkeyretrynumber + 1,
shared.sqlReturnQueue.get() toripe)
shared.sqlSubmitQueue.put('commit')
shared.workerQueue.put(('sendmessage', '')) shared.workerQueue.put(('sendmessage', ''))
else: # status == msgsent else: # status == msgsent
if int(time.time()) - lastactiontime > (shared.maximumAgeOfAnObjectThatIAmWillingToAccept * (2 ** (msgretrynumber))): if int(time.time()) - lastactiontime > (shared.maximumAgeOfAnObjectThatIAmWillingToAccept * (2 ** (msgretrynumber))):
print 'It has been a long time and we haven\'t heard an acknowledgement to our msg. Sending again.' print 'It has been a long time and we haven\'t heard an acknowledgement to our msg. Sending again.'
t = (int( sqlExecute(
time.time()), msgretrynumber + 1, 'msgqueued', ackdata) '''UPDATE sent SET lastactiontime=?, msgretrynumber=?, status=? WHERE ackdata=?''',
shared.sqlSubmitQueue.put( int(time.time()),
'''UPDATE sent SET lastactiontime=?, msgretrynumber=?, status=? WHERE ackdata=?''') msgretrynumber + 1,
shared.sqlSubmitQueue.put(t) 'msgqueued',
shared.sqlReturnQueue.get() ackdata)
shared.sqlSubmitQueue.put('commit')
shared.workerQueue.put(('sendmessage', '')) shared.workerQueue.put(('sendmessage', ''))
shared.UISignalQueue.put(( shared.UISignalQueue.put((
'updateStatusBar', 'Doing work necessary to again attempt to deliver a message...')) 'updateStatusBar', 'Doing work necessary to again attempt to deliver a message...'))
shared.sqlSubmitQueue.put('commit')
shared.sqlLock.release()
time.sleep(300) time.sleep(300)

View File

@ -10,6 +10,7 @@ import sys
from class_addressGenerator import pointMult from class_addressGenerator import pointMult
import tr import tr
from debug import logger from debug import logger
from helper_sql import *
# This thread, of which there is only one, does the heavy lifting: # This thread, of which there is only one, does the heavy lifting:
# calculating POWs. # calculating POWs.
@ -22,35 +23,23 @@ class singleWorker(threading.Thread):
threading.Thread.__init__(self) threading.Thread.__init__(self)
def run(self): def run(self):
shared.sqlLock.acquire() queryreturn = sqlQuery(
shared.sqlSubmitQueue.put(
'''SELECT toripe FROM sent WHERE ((status='awaitingpubkey' OR status='doingpubkeypow') AND folder='sent')''') '''SELECT toripe FROM sent WHERE ((status='awaitingpubkey' OR status='doingpubkeypow') AND folder='sent')''')
shared.sqlSubmitQueue.put('')
queryreturn = shared.sqlReturnQueue.get()
shared.sqlLock.release()
for row in queryreturn: for row in queryreturn:
toripe, = row toripe, = row
shared.neededPubkeys[toripe] = 0 shared.neededPubkeys[toripe] = 0
# Initialize the shared.ackdataForWhichImWatching data structure using data # Initialize the shared.ackdataForWhichImWatching data structure using data
# from the sql database. # from the sql database.
shared.sqlLock.acquire() queryreturn = sqlQuery(
shared.sqlSubmitQueue.put(
'''SELECT ackdata FROM sent where (status='msgsent' OR status='doingmsgpow')''') '''SELECT ackdata FROM sent where (status='msgsent' OR status='doingmsgpow')''')
shared.sqlSubmitQueue.put('')
queryreturn = shared.sqlReturnQueue.get()
shared.sqlLock.release()
for row in queryreturn: for row in queryreturn:
ackdata, = row ackdata, = row
print 'Watching for ackdata', ackdata.encode('hex') print 'Watching for ackdata', ackdata.encode('hex')
shared.ackdataForWhichImWatching[ackdata] = 0 shared.ackdataForWhichImWatching[ackdata] = 0
shared.sqlLock.acquire() queryreturn = sqlQuery(
shared.sqlSubmitQueue.put(
'''SELECT DISTINCT toaddress FROM sent WHERE (status='doingpubkeypow' AND folder='sent')''') '''SELECT DISTINCT toaddress FROM sent WHERE (status='doingpubkeypow' AND folder='sent')''')
shared.sqlSubmitQueue.put('')
queryreturn = shared.sqlReturnQueue.get()
shared.sqlLock.release()
for row in queryreturn: for row in queryreturn:
toaddress, = row toaddress, = row
self.requestPubKey(toaddress) self.requestPubKey(toaddress)
@ -248,26 +237,19 @@ class singleWorker(threading.Thread):
if shared.safeConfigGetBoolean(myAddress, 'chan'): if shared.safeConfigGetBoolean(myAddress, 'chan'):
payload = '\x00' * 8 + payload # Attach a fake nonce on the front payload = '\x00' * 8 + payload # Attach a fake nonce on the front
# just so that it is in the correct format. # just so that it is in the correct format.
t = (hash,payload,embeddedTime,'yes') sqlExecute('''INSERT INTO pubkeys VALUES (?,?,?,?)''',
shared.sqlLock.acquire() hash,
shared.sqlSubmitQueue.put('''INSERT INTO pubkeys VALUES (?,?,?,?)''') payload,
shared.sqlSubmitQueue.put(t) embeddedTime,
shared.sqlReturnQueue.get() 'yes')
shared.sqlSubmitQueue.put('commit')
shared.sqlLock.release()
shared.config.set( shared.config.set(
myAddress, 'lastpubkeysendtime', str(int(time.time()))) myAddress, 'lastpubkeysendtime', str(int(time.time())))
with open(shared.appdata + 'keys.dat', 'wb') as configfile: with open(shared.appdata + 'keys.dat', 'wb') as configfile:
shared.config.write(configfile) shared.config.write(configfile)
def sendBroadcast(self): def sendBroadcast(self):
shared.sqlLock.acquire() queryreturn = sqlQuery(
t = ('broadcastqueued',) '''SELECT fromaddress, subject, message, ackdata FROM sent WHERE status=? and folder='sent' ''', 'broadcastqueued')
shared.sqlSubmitQueue.put(
'''SELECT fromaddress, subject, message, ackdata FROM sent WHERE status=? and folder='sent' ''')
shared.sqlSubmitQueue.put(t)
queryreturn = shared.sqlReturnQueue.get()
shared.sqlLock.release()
for row in queryreturn: for row in queryreturn:
fromaddress, subject, body, ackdata = row fromaddress, subject, body, ackdata = row
status, addressVersionNumber, streamNumber, ripe = decodeAddress( status, addressVersionNumber, streamNumber, ripe = decodeAddress(
@ -355,79 +337,49 @@ class singleWorker(threading.Thread):
# Update the status of the message in the 'sent' table to have # Update the status of the message in the 'sent' table to have
# a 'broadcastsent' status # a 'broadcastsent' status
shared.sqlLock.acquire() sqlExecute(
t = (inventoryHash,'broadcastsent', int( 'UPDATE sent SET msgid=?, status=?, lastactiontime=? WHERE ackdata=?',
time.time()), ackdata) inventoryHash,
shared.sqlSubmitQueue.put( 'broadcastsent',
'UPDATE sent SET msgid=?, status=?, lastactiontime=? WHERE ackdata=?') int(time.time()),
shared.sqlSubmitQueue.put(t) ackdata)
queryreturn = shared.sqlReturnQueue.get()
shared.sqlSubmitQueue.put('commit')
shared.sqlLock.release()
def sendMsg(self): def sendMsg(self):
# Check to see if there are any messages queued to be sent # Check to see if there are any messages queued to be sent
shared.sqlLock.acquire() queryreturn = sqlQuery(
shared.sqlSubmitQueue.put(
'''SELECT DISTINCT toaddress FROM sent WHERE (status='msgqueued' AND folder='sent')''') '''SELECT DISTINCT toaddress FROM sent WHERE (status='msgqueued' AND folder='sent')''')
shared.sqlSubmitQueue.put('')
queryreturn = shared.sqlReturnQueue.get()
shared.sqlLock.release()
for row in queryreturn: # For each address to which we need to send a message, check to see if we have its pubkey already. for row in queryreturn: # For each address to which we need to send a message, check to see if we have its pubkey already.
toaddress, = row toaddress, = row
toripe = decodeAddress(toaddress)[3] toripe = decodeAddress(toaddress)[3]
shared.sqlLock.acquire() queryreturn = sqlQuery(
shared.sqlSubmitQueue.put( '''SELECT hash FROM pubkeys WHERE hash=? ''', toripe)
'''SELECT hash FROM pubkeys WHERE hash=? ''')
shared.sqlSubmitQueue.put((toripe,))
queryreturn = shared.sqlReturnQueue.get()
shared.sqlLock.release()
if queryreturn != []: # If we have the needed pubkey, set the status to doingmsgpow (we'll do it further down) if queryreturn != []: # If we have the needed pubkey, set the status to doingmsgpow (we'll do it further down)
t = (toaddress,) sqlExecute(
shared.sqlLock.acquire() '''UPDATE sent SET status='doingmsgpow' WHERE toaddress=? AND status='msgqueued' ''',
shared.sqlSubmitQueue.put( toaddress)
'''UPDATE sent SET status='doingmsgpow' WHERE toaddress=? AND status='msgqueued' ''')
shared.sqlSubmitQueue.put(t)
shared.sqlReturnQueue.get()
shared.sqlSubmitQueue.put('commit')
shared.sqlLock.release()
else: # We don't have the needed pubkey. Set the status to 'awaitingpubkey' and request it if we haven't already else: # We don't have the needed pubkey. Set the status to 'awaitingpubkey' and request it if we haven't already
if toripe in shared.neededPubkeys: if toripe in shared.neededPubkeys:
# We already sent a request for the pubkey # We already sent a request for the pubkey
t = (toaddress,) sqlExecute(
shared.sqlLock.acquire() '''UPDATE sent SET status='awaitingpubkey' WHERE toaddress=? AND status='msgqueued' ''', toaddress)
shared.sqlSubmitQueue.put(
'''UPDATE sent SET status='awaitingpubkey' WHERE toaddress=? AND status='msgqueued' ''')
shared.sqlSubmitQueue.put(t)
shared.sqlReturnQueue.get()
shared.sqlSubmitQueue.put('commit')
shared.sqlLock.release()
shared.UISignalQueue.put(('updateSentItemStatusByHash', ( shared.UISignalQueue.put(('updateSentItemStatusByHash', (
toripe, tr.translateText("MainWindow",'Encryption key was requested earlier.')))) toripe, tr.translateText("MainWindow",'Encryption key was requested earlier.'))))
else: else:
# We have not yet sent a request for the pubkey # We have not yet sent a request for the pubkey
t = (toaddress,) sqlExecute(
shared.sqlLock.acquire() '''UPDATE sent SET status='doingpubkeypow' WHERE toaddress=? AND status='msgqueued' ''',
shared.sqlSubmitQueue.put( toaddress)
'''UPDATE sent SET status='doingpubkeypow' WHERE toaddress=? AND status='msgqueued' ''')
shared.sqlSubmitQueue.put(t)
shared.sqlReturnQueue.get()
shared.sqlSubmitQueue.put('commit')
shared.sqlLock.release()
shared.UISignalQueue.put(('updateSentItemStatusByHash', ( shared.UISignalQueue.put(('updateSentItemStatusByHash', (
toripe, tr.translateText("MainWindow",'Sending a request for the recipient\'s encryption key.')))) toripe, tr.translateText("MainWindow",'Sending a request for the recipient\'s encryption key.'))))
self.requestPubKey(toaddress) self.requestPubKey(toaddress)
shared.sqlLock.acquire()
# Get all messages that are ready to be sent, and also all messages # Get all messages that are ready to be sent, and also all messages
# which we have sent in the last 28 days which were previously marked # which we have sent in the last 28 days which were previously marked
# as 'toodifficult'. If the user as raised the maximum acceptable # as 'toodifficult'. If the user as raised the maximum acceptable
# difficulty then those messages may now be sendable. # difficulty then those messages may now be sendable.
shared.sqlSubmitQueue.put( queryreturn = sqlQuery(
'''SELECT toaddress, toripe, fromaddress, subject, message, ackdata, status FROM sent WHERE (status='doingmsgpow' or status='forcepow' or (status='toodifficult' and lastactiontime>?)) and folder='sent' ''') '''SELECT toaddress, toripe, fromaddress, subject, message, ackdata, status FROM sent WHERE (status='doingmsgpow' or status='forcepow' or (status='toodifficult' and lastactiontime>?)) and folder='sent' ''',
shared.sqlSubmitQueue.put((int(time.time()) - 2419200,)) int(time.time()) - 2419200)
queryreturn = shared.sqlReturnQueue.get()
shared.sqlLock.release()
for row in queryreturn: # For each message we need to send.. for row in queryreturn: # For each message we need to send..
toaddress, toripe, fromaddress, subject, message, ackdata, status = row toaddress, toripe, fromaddress, subject, message, ackdata, status = row
# There is a remote possibility that we may no longer have the # There is a remote possibility that we may no longer have the
@ -436,12 +388,9 @@ class singleWorker(threading.Thread):
# user sends a message but doesn't let the POW function finish, # user sends a message but doesn't let the POW function finish,
# then leaves their client off for a long time which could cause # then leaves their client off for a long time which could cause
# the needed pubkey to expire and be deleted. # the needed pubkey to expire and be deleted.
shared.sqlLock.acquire() queryreturn = sqlQuery(
shared.sqlSubmitQueue.put( '''SELECT hash FROM pubkeys WHERE hash=? ''',
'''SELECT hash FROM pubkeys WHERE hash=? ''') toripe)
shared.sqlSubmitQueue.put((toripe,))
queryreturn = shared.sqlReturnQueue.get()
shared.sqlLock.release()
if queryreturn == [] and toripe not in shared.neededPubkeys: if queryreturn == [] and toripe not in shared.neededPubkeys:
# We no longer have the needed pubkey and we haven't requested # We no longer have the needed pubkey and we haven't requested
# it. # it.
@ -449,14 +398,8 @@ class singleWorker(threading.Thread):
sys.stderr.write( sys.stderr.write(
'For some reason, the status of a message in our outbox is \'doingmsgpow\' even though we lack the pubkey. Here is the RIPE hash of the needed pubkey: %s\n' % toripe.encode('hex')) 'For some reason, the status of a message in our outbox is \'doingmsgpow\' even though we lack the pubkey. Here is the RIPE hash of the needed pubkey: %s\n' % toripe.encode('hex'))
t = (toaddress,) sqlExecute(
shared.sqlLock.acquire() '''UPDATE sent SET status='msgqueued' WHERE toaddress=? AND status='doingmsgpow' ''', toaddress)
shared.sqlSubmitQueue.put(
'''UPDATE sent SET status='msgqueued' WHERE toaddress=? AND status='doingmsgpow' ''')
shared.sqlSubmitQueue.put(t)
shared.sqlReturnQueue.get()
shared.sqlSubmitQueue.put('commit')
shared.sqlLock.release()
shared.UISignalQueue.put(('updateSentItemStatusByHash', ( shared.UISignalQueue.put(('updateSentItemStatusByHash', (
toripe, tr.translateText("MainWindow",'Sending a request for the recipient\'s encryption key.')))) toripe, tr.translateText("MainWindow",'Sending a request for the recipient\'s encryption key.'))))
self.requestPubKey(toaddress) self.requestPubKey(toaddress)
@ -475,21 +418,15 @@ class singleWorker(threading.Thread):
# mark the pubkey as 'usedpersonally' so that we don't ever delete # mark the pubkey as 'usedpersonally' so that we don't ever delete
# it. # it.
shared.sqlLock.acquire() sqlExecute(
t = (toripe,) '''UPDATE pubkeys SET usedpersonally='yes' WHERE hash=?''',
shared.sqlSubmitQueue.put( toripe)
'''UPDATE pubkeys SET usedpersonally='yes' WHERE hash=?''')
shared.sqlSubmitQueue.put(t)
shared.sqlReturnQueue.get()
shared.sqlSubmitQueue.put('commit')
# Let us fetch the recipient's public key out of our database. If # Let us fetch the recipient's public key out of our database. If
# the required proof of work difficulty is too hard then we'll # the required proof of work difficulty is too hard then we'll
# abort. # abort.
shared.sqlSubmitQueue.put( queryreturn = sqlQuery(
'SELECT transmitdata FROM pubkeys WHERE hash=?') 'SELECT transmitdata FROM pubkeys WHERE hash=?',
shared.sqlSubmitQueue.put((toripe,)) toripe)
queryreturn = shared.sqlReturnQueue.get()
shared.sqlLock.release()
if queryreturn == []: if queryreturn == []:
with shared.printLock: with shared.printLock:
sys.stderr.write( sys.stderr.write(
@ -559,14 +496,9 @@ class singleWorker(threading.Thread):
if (requiredAverageProofOfWorkNonceTrialsPerByte > shared.config.getint('bitmessagesettings', 'maxacceptablenoncetrialsperbyte') and shared.config.getint('bitmessagesettings', 'maxacceptablenoncetrialsperbyte') != 0) or (requiredPayloadLengthExtraBytes > shared.config.getint('bitmessagesettings', 'maxacceptablepayloadlengthextrabytes') and shared.config.getint('bitmessagesettings', 'maxacceptablepayloadlengthextrabytes') != 0): if (requiredAverageProofOfWorkNonceTrialsPerByte > shared.config.getint('bitmessagesettings', 'maxacceptablenoncetrialsperbyte') and shared.config.getint('bitmessagesettings', 'maxacceptablenoncetrialsperbyte') != 0) or (requiredPayloadLengthExtraBytes > shared.config.getint('bitmessagesettings', 'maxacceptablepayloadlengthextrabytes') and shared.config.getint('bitmessagesettings', 'maxacceptablepayloadlengthextrabytes') != 0):
# The demanded difficulty is more than we are willing # The demanded difficulty is more than we are willing
# to do. # to do.
shared.sqlLock.acquire() sqlExecute(
t = (ackdata,) '''UPDATE sent SET status='toodifficult' WHERE ackdata=? ''',
shared.sqlSubmitQueue.put( ackdata)
'''UPDATE sent SET status='toodifficult' WHERE ackdata=? ''')
shared.sqlSubmitQueue.put(t)
shared.sqlReturnQueue.get()
shared.sqlSubmitQueue.put('commit')
shared.sqlLock.release()
shared.UISignalQueue.put(('updateSentItemStatusByAckdata', (ackdata, tr.translateText("MainWindow", "Problem: The work demanded by the recipient (%1 and %2) is more difficult than you are willing to do.").arg(str(float(requiredAverageProofOfWorkNonceTrialsPerByte) / shared.networkDefaultProofOfWorkNonceTrialsPerByte)).arg(str(float( shared.UISignalQueue.put(('updateSentItemStatusByAckdata', (ackdata, tr.translateText("MainWindow", "Problem: The work demanded by the recipient (%1 and %2) is more difficult than you are willing to do.").arg(str(float(requiredAverageProofOfWorkNonceTrialsPerByte) / shared.networkDefaultProofOfWorkNonceTrialsPerByte)).arg(str(float(
requiredPayloadLengthExtraBytes) / shared.networkDefaultPayloadLengthExtraBytes)).arg(unicode(strftime(shared.config.get('bitmessagesettings', 'timeformat'), localtime(int(time.time()))), 'utf-8'))))) requiredPayloadLengthExtraBytes) / shared.networkDefaultPayloadLengthExtraBytes)).arg(unicode(strftime(shared.config.get('bitmessagesettings', 'timeformat'), localtime(int(time.time()))), 'utf-8')))))
continue continue
@ -694,13 +626,7 @@ class singleWorker(threading.Thread):
try: try:
encrypted = highlevelcrypto.encrypt(payload,"04"+pubEncryptionKeyBase256.encode('hex')) encrypted = highlevelcrypto.encrypt(payload,"04"+pubEncryptionKeyBase256.encode('hex'))
except: except:
shared.sqlLock.acquire() sqlExecute('''UPDATE sent SET status='badkey' WHERE ackdata=?''', ackdata)
t = (ackdata,)
shared.sqlSubmitQueue.put('''UPDATE sent SET status='badkey' WHERE ackdata=?''')
shared.sqlSubmitQueue.put(t)
queryreturn = shared.sqlReturnQueue.get()
shared.sqlSubmitQueue.put('commit')
shared.sqlLock.release()
shared.UISignalQueue.put(('updateSentItemStatusByAckdata',(ackdata,tr.translateText("MainWindow",'Problem: The recipient\'s encryption key is no good. Could not encrypt message. %1').arg(unicode(strftime(shared.config.get('bitmessagesettings', 'timeformat'),localtime(int(time.time()))),'utf-8'))))) shared.UISignalQueue.put(('updateSentItemStatusByAckdata',(ackdata,tr.translateText("MainWindow",'Problem: The recipient\'s encryption key is no good. Could not encrypt message. %1').arg(unicode(strftime(shared.config.get('bitmessagesettings', 'timeformat'),localtime(int(time.time()))),'utf-8')))))
continue continue
encryptedPayload = embeddedTime + encodeVarint(toStreamNumber) + encrypted encryptedPayload = embeddedTime + encodeVarint(toStreamNumber) + encrypted
@ -741,13 +667,8 @@ class singleWorker(threading.Thread):
newStatus = 'msgsentnoackexpected' newStatus = 'msgsentnoackexpected'
else: else:
newStatus = 'msgsent' newStatus = 'msgsent'
shared.sqlLock.acquire() sqlExecute('''UPDATE sent SET msgid=?, status=? WHERE ackdata=?''',
t = (inventoryHash,newStatus,ackdata,) inventoryHash,newStatus,ackdata)
shared.sqlSubmitQueue.put('''UPDATE sent SET msgid=?, status=? WHERE ackdata=?''')
shared.sqlSubmitQueue.put(t)
queryreturn = shared.sqlReturnQueue.get()
shared.sqlSubmitQueue.put('commit')
shared.sqlLock.release()
def requestPubKey(self, toAddress): def requestPubKey(self, toAddress):
toStatus, addressVersionNumber, streamNumber, ripe = decodeAddress( toStatus, addressVersionNumber, streamNumber, ripe = decodeAddress(
@ -789,14 +710,9 @@ class singleWorker(threading.Thread):
shared.broadcastToSendDataQueues(( shared.broadcastToSendDataQueues((
streamNumber, 'sendinv', inventoryHash)) streamNumber, 'sendinv', inventoryHash))
t = (toAddress,) sqlExecute(
shared.sqlLock.acquire() '''UPDATE sent SET status='awaitingpubkey' WHERE toaddress=? AND status='doingpubkeypow' ''',
shared.sqlSubmitQueue.put( toAddress)
'''UPDATE sent SET status='awaitingpubkey' WHERE toaddress=? AND status='doingpubkeypow' ''')
shared.sqlSubmitQueue.put(t)
shared.sqlReturnQueue.get()
shared.sqlSubmitQueue.put('commit')
shared.sqlLock.release()
shared.UISignalQueue.put(( shared.UISignalQueue.put((
'updateStatusBar', tr.translateText("MainWindow",'Broacasting the public key request. This program will auto-retry if they are offline.'))) 'updateStatusBar', tr.translateText("MainWindow",'Broacasting the public key request. This program will auto-retry if they are offline.')))

View File

@ -1,22 +1,9 @@
import shared from helper_sql import *
def insert(t): def insert(t):
shared.sqlLock.acquire() sqlExecute('''INSERT INTO inbox VALUES (?,?,?,?,?,?,?,?,?)''', *t)
shared.sqlSubmitQueue.put(
'''INSERT INTO inbox VALUES (?,?,?,?,?,?,?,?,?)''')
shared.sqlSubmitQueue.put(t)
shared.sqlReturnQueue.get()
shared.sqlSubmitQueue.put('commit')
shared.sqlLock.release()
def trash(msgid): def trash(msgid):
t = (msgid,) sqlExecute('''UPDATE inbox SET folder='trash' WHERE msgid=?''', msgid)
shared.sqlLock.acquire()
shared.sqlSubmitQueue.put(
'''UPDATE inbox SET folder='trash' WHERE msgid=?''')
shared.sqlSubmitQueue.put(t)
shared.sqlReturnQueue.get()
shared.sqlSubmitQueue.put('commit')
shared.sqlLock.release()
shared.UISignalQueue.put(('removeInboxRowByMsgid',msgid)) shared.UISignalQueue.put(('removeInboxRowByMsgid',msgid))

View File

@ -1,11 +1,4 @@
import shared from helper_sql import *
def insert(t): def insert(t):
shared.sqlLock.acquire() sqlExecute('''INSERT INTO sent VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?)''', *t)
shared.sqlSubmitQueue.put(
'''INSERT INTO sent VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?)''')
shared.sqlSubmitQueue.put(t)
shared.sqlReturnQueue.get()
shared.sqlSubmitQueue.put('commit')
shared.sqlLock.release()

66
src/helper_sql.py Normal file
View File

@ -0,0 +1,66 @@
import threading
import Queue
sqlSubmitQueue = Queue.Queue() #SQLITE3 is so thread-unsafe that they won't even let you call it from different threads using your own locks. SQL objects can only be called from one thread.
sqlReturnQueue = Queue.Queue()
sqlLock = threading.Lock()
def sqlQuery(sqlStatement, *args):
sqlLock.acquire()
sqlSubmitQueue.put(sqlStatement)
if args == ():
sqlSubmitQueue.put('')
else:
sqlSubmitQueue.put(args)
queryreturn = sqlReturnQueue.get()
sqlLock.release()
return queryreturn
def sqlExecute(sqlStatement, *args):
sqlLock.acquire()
sqlSubmitQueue.put(sqlStatement)
if args == ():
sqlSubmitQueue.put('')
else:
sqlSubmitQueue.put(args)
sqlReturnQueue.get()
sqlSubmitQueue.put('commit')
sqlLock.release()
def sqlStoredProcedure(procName):
sqlLock.acquire()
sqlSubmitQueue.put(procName)
sqlLock.release()
class SqlBulkExecute:
def __enter__(self):
sqlLock.acquire()
return self
def __exit__(self, type, value, traceback):
sqlSubmitQueue.put('commit')
sqlLock.release()
def execute(self, sqlStatement, *args):
sqlSubmitQueue.put(sqlStatement)
if args == ():
sqlSubmitQueue.put('')
else:
sqlSubmitQueue.put(args)
sqlReturnQueue.get()
def query(self, sqlStatement, *args):
sqlSubmitQueue.put(sqlStatement)
if args == ():
sqlSubmitQueue.put('')
else:
sqlSubmitQueue.put(args)
return sqlReturnQueue.get()

View File

@ -27,7 +27,7 @@ from addresses import *
import highlevelcrypto import highlevelcrypto
import shared import shared
import helper_startup import helper_startup
from helper_sql import *
config = ConfigParser.SafeConfigParser() config = ConfigParser.SafeConfigParser()
@ -36,9 +36,6 @@ MyECSubscriptionCryptorObjects = {}
myAddressesByHash = {} #The key in this dictionary is the RIPE hash which is encoded in an address and value is the address itself. myAddressesByHash = {} #The key in this dictionary is the RIPE hash which is encoded in an address and value is the address itself.
broadcastSendersForWhichImWatching = {} broadcastSendersForWhichImWatching = {}
workerQueue = Queue.Queue() workerQueue = Queue.Queue()
sqlSubmitQueue = Queue.Queue() #SQLITE3 is so thread-unsafe that they won't even let you call it from different threads using your own locks. SQL objects can only be called from one thread.
sqlReturnQueue = Queue.Queue()
sqlLock = threading.Lock()
UISignalQueue = Queue.Queue() UISignalQueue = Queue.Queue()
addressGeneratorQueue = Queue.Queue() addressGeneratorQueue = Queue.Queue()
knownNodesLock = threading.Lock() knownNodesLock = threading.Lock()
@ -85,12 +82,7 @@ namecoinDefaultRpcPort = "8336"
frozen = getattr(sys,'frozen', None) frozen = getattr(sys,'frozen', None)
def isInSqlInventory(hash): def isInSqlInventory(hash):
t = (hash,) queryreturn = sqlQuery('''select hash from inventory where hash=?''', hash)
shared.sqlLock.acquire()
shared.sqlSubmitQueue.put('''select hash from inventory where hash=?''')
shared.sqlSubmitQueue.put(t)
queryreturn = shared.sqlReturnQueue.get()
shared.sqlLock.release()
if queryreturn == []: if queryreturn == []:
return False return False
else: else:
@ -171,41 +163,29 @@ def lookupAppdataFolder():
return dataFolder return dataFolder
def isAddressInMyAddressBook(address): def isAddressInMyAddressBook(address):
t = (address,) queryreturn = sqlQuery(
sqlLock.acquire() '''select address from addressbook where address=?''',
sqlSubmitQueue.put('''select address from addressbook where address=?''') address)
sqlSubmitQueue.put(t)
queryreturn = sqlReturnQueue.get()
sqlLock.release()
return queryreturn != [] return queryreturn != []
#At this point we should really just have a isAddressInMy(book, address)... #At this point we should really just have a isAddressInMy(book, address)...
def isAddressInMySubscriptionsList(address): def isAddressInMySubscriptionsList(address):
t = (str(address),) # As opposed to Qt str queryreturn = sqlQuery(
sqlLock.acquire() '''select * from subscriptions where address=?''',
sqlSubmitQueue.put('''select * from subscriptions where address=?''') str(address))
sqlSubmitQueue.put(t)
queryreturn = sqlReturnQueue.get()
sqlLock.release()
return queryreturn != [] return queryreturn != []
def isAddressInMyAddressBookSubscriptionsListOrWhitelist(address): def isAddressInMyAddressBookSubscriptionsListOrWhitelist(address):
if isAddressInMyAddressBook(address): if isAddressInMyAddressBook(address):
return True return True
sqlLock.acquire() queryreturn = sqlQuery('''SELECT address FROM whitelist where address=? and enabled = '1' ''', address)
sqlSubmitQueue.put('''SELECT address FROM whitelist where address=? and enabled = '1' ''')
sqlSubmitQueue.put((address,))
queryreturn = sqlReturnQueue.get()
sqlLock.release()
if queryreturn <> []: if queryreturn <> []:
return True return True
sqlLock.acquire() queryreturn = sqlQuery(
sqlSubmitQueue.put('''select address from subscriptions where address=? and enabled = '1' ''') '''select address from subscriptions where address=? and enabled = '1' ''',
sqlSubmitQueue.put((address,)) address)
queryreturn = sqlReturnQueue.get()
sqlLock.release()
if queryreturn <> []: if queryreturn <> []:
return True return True
return False return False
@ -269,11 +249,7 @@ def reloadBroadcastSendersForWhichImWatching():
logger.debug('reloading subscriptions...') logger.debug('reloading subscriptions...')
broadcastSendersForWhichImWatching.clear() broadcastSendersForWhichImWatching.clear()
MyECSubscriptionCryptorObjects.clear() MyECSubscriptionCryptorObjects.clear()
sqlLock.acquire() queryreturn = sqlQuery('SELECT address FROM subscriptions where enabled=1')
sqlSubmitQueue.put('SELECT address FROM subscriptions where enabled=1')
sqlSubmitQueue.put('')
queryreturn = sqlReturnQueue.get()
sqlLock.release()
for row in queryreturn: for row in queryreturn:
address, = row address, = row
status,addressVersionNumber,streamNumber,hash = decodeAddress(address) status,addressVersionNumber,streamNumber,hash = decodeAddress(address)
@ -307,12 +283,8 @@ def doCleanShutdown():
# This one last useless query will guarantee that the previous flush committed before we close # This one last useless query will guarantee that the previous flush committed before we close
# the program. # the program.
sqlLock.acquire() sqlQuery('SELECT address FROM subscriptions')
sqlSubmitQueue.put('SELECT address FROM subscriptions') sqlStoredProcedure('exit')
sqlSubmitQueue.put('')
sqlReturnQueue.get()
sqlSubmitQueue.put('exit')
sqlLock.release()
logger.info('Finished flushing inventory.') logger.info('Finished flushing inventory.')
# Wait long enough to guarantee that any running proof of work worker threads will check the # Wait long enough to guarantee that any running proof of work worker threads will check the
@ -333,16 +305,12 @@ def broadcastToSendDataQueues(data):
def flushInventory(): def flushInventory():
#Note that the singleCleanerThread clears out the inventory dictionary from time to time, although it only clears things that have been in the dictionary for a long time. This clears the inventory dictionary Now. #Note that the singleCleanerThread clears out the inventory dictionary from time to time, although it only clears things that have been in the dictionary for a long time. This clears the inventory dictionary Now.
sqlLock.acquire() with SqlBulkExecute() as sql:
for hash, storedValue in inventory.items(): for hash, storedValue in inventory.items():
objectType, streamNumber, payload, receivedTime = storedValue objectType, streamNumber, payload, receivedTime = storedValue
t = (hash,objectType,streamNumber,payload,receivedTime,'') sql.execute('''INSERT INTO inventory VALUES (?,?,?,?,?,?)''',
sqlSubmitQueue.put('''INSERT INTO inventory VALUES (?,?,?,?,?,?)''') hash,objectType,streamNumber,payload,receivedTime,'')
sqlSubmitQueue.put(t)
sqlReturnQueue.get()
del inventory[hash] del inventory[hash]
sqlSubmitQueue.put('commit')
sqlLock.release()
def fixPotentiallyInvalidUTF8Data(text): def fixPotentiallyInvalidUTF8Data(text):
try: try: