use helper_sql in class_singleWorker

This commit is contained in:
Grant T. Olson 2013-08-29 07:27:30 -04:00
parent d879e35e26
commit 1fb11495a6
1 changed files with 51 additions and 135 deletions

View File

@ -10,6 +10,7 @@ import sys
from class_addressGenerator import pointMult
import tr
from debug import logger
from helper_sql import *
# This thread, of which there is only one, does the heavy lifting:
# calculating POWs.
@ -22,35 +23,23 @@ class singleWorker(threading.Thread):
threading.Thread.__init__(self)
def run(self):
shared.sqlLock.acquire()
shared.sqlSubmitQueue.put(
queryreturn = sqlQuery(
'''SELECT toripe FROM sent WHERE ((status='awaitingpubkey' OR status='doingpubkeypow') AND folder='sent')''')
shared.sqlSubmitQueue.put('')
queryreturn = shared.sqlReturnQueue.get()
shared.sqlLock.release()
for row in queryreturn:
toripe, = row
shared.neededPubkeys[toripe] = 0
# Initialize the shared.ackdataForWhichImWatching data structure using data
# from the sql database.
shared.sqlLock.acquire()
shared.sqlSubmitQueue.put(
queryreturn = sqlQuery(
'''SELECT ackdata FROM sent where (status='msgsent' OR status='doingmsgpow')''')
shared.sqlSubmitQueue.put('')
queryreturn = shared.sqlReturnQueue.get()
shared.sqlLock.release()
for row in queryreturn:
ackdata, = row
print 'Watching for ackdata', ackdata.encode('hex')
shared.ackdataForWhichImWatching[ackdata] = 0
shared.sqlLock.acquire()
shared.sqlSubmitQueue.put(
queryreturn = sqlQuery(
'''SELECT DISTINCT toaddress FROM sent WHERE (status='doingpubkeypow' AND folder='sent')''')
shared.sqlSubmitQueue.put('')
queryreturn = shared.sqlReturnQueue.get()
shared.sqlLock.release()
for row in queryreturn:
toaddress, = row
self.requestPubKey(toaddress)
@ -248,26 +237,19 @@ class singleWorker(threading.Thread):
if shared.safeConfigGetBoolean(myAddress, 'chan'):
payload = '\x00' * 8 + payload # Attach a fake nonce on the front
# just so that it is in the correct format.
t = (hash,payload,embeddedTime,'yes')
shared.sqlLock.acquire()
shared.sqlSubmitQueue.put('''INSERT INTO pubkeys VALUES (?,?,?,?)''')
shared.sqlSubmitQueue.put(t)
shared.sqlReturnQueue.get()
shared.sqlSubmitQueue.put('commit')
shared.sqlLock.release()
sqlExecute('''INSERT INTO pubkeys VALUES (?,?,?,?)''',
hash,
payload,
embeddedTime,
'yes')
shared.config.set(
myAddress, 'lastpubkeysendtime', str(int(time.time())))
with open(shared.appdata + 'keys.dat', 'wb') as configfile:
shared.config.write(configfile)
def sendBroadcast(self):
shared.sqlLock.acquire()
t = ('broadcastqueued',)
shared.sqlSubmitQueue.put(
'''SELECT fromaddress, subject, message, ackdata FROM sent WHERE status=? and folder='sent' ''')
shared.sqlSubmitQueue.put(t)
queryreturn = shared.sqlReturnQueue.get()
shared.sqlLock.release()
queryreturn = sqlQuery(
'''SELECT fromaddress, subject, message, ackdata FROM sent WHERE status=? and folder='sent' ''', 'broadcastqueued')
for row in queryreturn:
fromaddress, subject, body, ackdata = row
status, addressVersionNumber, streamNumber, ripe = decodeAddress(
@ -355,79 +337,49 @@ class singleWorker(threading.Thread):
# Update the status of the message in the 'sent' table to have
# a 'broadcastsent' status
shared.sqlLock.acquire()
t = (inventoryHash,'broadcastsent', int(
time.time()), ackdata)
shared.sqlSubmitQueue.put(
'UPDATE sent SET msgid=?, status=?, lastactiontime=? WHERE ackdata=?')
shared.sqlSubmitQueue.put(t)
queryreturn = shared.sqlReturnQueue.get()
shared.sqlSubmitQueue.put('commit')
shared.sqlLock.release()
sqlExecute(
'UPDATE sent SET msgid=?, status=?, lastactiontime=? WHERE ackdata=?',
inventoryHash,
'broadcastsent',
int(time.time()),
ackdata)
def sendMsg(self):
# Check to see if there are any messages queued to be sent
shared.sqlLock.acquire()
shared.sqlSubmitQueue.put(
queryreturn = sqlQuery(
'''SELECT DISTINCT toaddress FROM sent WHERE (status='msgqueued' AND folder='sent')''')
shared.sqlSubmitQueue.put('')
queryreturn = shared.sqlReturnQueue.get()
shared.sqlLock.release()
for row in queryreturn: # For each address to which we need to send a message, check to see if we have its pubkey already.
toaddress, = row
toripe = decodeAddress(toaddress)[3]
shared.sqlLock.acquire()
shared.sqlSubmitQueue.put(
'''SELECT hash FROM pubkeys WHERE hash=? ''')
shared.sqlSubmitQueue.put((toripe,))
queryreturn = shared.sqlReturnQueue.get()
shared.sqlLock.release()
queryreturn = sqlQuery(
'''SELECT hash FROM pubkeys WHERE hash=? ''', toripe)
if queryreturn != []: # If we have the needed pubkey, set the status to doingmsgpow (we'll do it further down)
t = (toaddress,)
shared.sqlLock.acquire()
shared.sqlSubmitQueue.put(
'''UPDATE sent SET status='doingmsgpow' WHERE toaddress=? AND status='msgqueued' ''')
shared.sqlSubmitQueue.put(t)
shared.sqlReturnQueue.get()
shared.sqlSubmitQueue.put('commit')
shared.sqlLock.release()
sqlExecute(
'''UPDATE sent SET status='doingmsgpow' WHERE toaddress=? AND status='msgqueued' ''',
toaddress)
else: # We don't have the needed pubkey. Set the status to 'awaitingpubkey' and request it if we haven't already
if toripe in shared.neededPubkeys:
# We already sent a request for the pubkey
t = (toaddress,)
shared.sqlLock.acquire()
shared.sqlSubmitQueue.put(
'''UPDATE sent SET status='awaitingpubkey' WHERE toaddress=? AND status='msgqueued' ''')
shared.sqlSubmitQueue.put(t)
shared.sqlReturnQueue.get()
shared.sqlSubmitQueue.put('commit')
shared.sqlLock.release()
sqlExecute(
'''UPDATE sent SET status='awaitingpubkey' WHERE toaddress=? AND status='msgqueued' ''', toaddress)
shared.UISignalQueue.put(('updateSentItemStatusByHash', (
toripe, tr.translateText("MainWindow",'Encryption key was requested earlier.'))))
else:
# We have not yet sent a request for the pubkey
t = (toaddress,)
shared.sqlLock.acquire()
shared.sqlSubmitQueue.put(
'''UPDATE sent SET status='doingpubkeypow' WHERE toaddress=? AND status='msgqueued' ''')
shared.sqlSubmitQueue.put(t)
shared.sqlReturnQueue.get()
shared.sqlSubmitQueue.put('commit')
shared.sqlLock.release()
sqlExecute(
'''UPDATE sent SET status='doingpubkeypow' WHERE toaddress=? AND status='msgqueued' ''',
toaddress)
shared.UISignalQueue.put(('updateSentItemStatusByHash', (
toripe, tr.translateText("MainWindow",'Sending a request for the recipient\'s encryption key.'))))
self.requestPubKey(toaddress)
shared.sqlLock.acquire()
# Get all messages that are ready to be sent, and also all messages
# which we have sent in the last 28 days which were previously marked
# as 'toodifficult'. If the user as raised the maximum acceptable
# difficulty then those messages may now be sendable.
shared.sqlSubmitQueue.put(
'''SELECT toaddress, toripe, fromaddress, subject, message, ackdata, status FROM sent WHERE (status='doingmsgpow' or status='forcepow' or (status='toodifficult' and lastactiontime>?)) and folder='sent' ''')
shared.sqlSubmitQueue.put((int(time.time()) - 2419200,))
queryreturn = shared.sqlReturnQueue.get()
shared.sqlLock.release()
queryreturn = sqlQuery(
'''SELECT toaddress, toripe, fromaddress, subject, message, ackdata, status FROM sent WHERE (status='doingmsgpow' or status='forcepow' or (status='toodifficult' and lastactiontime>?)) and folder='sent' ''',
int(time.time()) - 2419200)
for row in queryreturn: # For each message we need to send..
toaddress, toripe, fromaddress, subject, message, ackdata, status = row
# There is a remote possibility that we may no longer have the
@ -436,12 +388,9 @@ class singleWorker(threading.Thread):
# user sends a message but doesn't let the POW function finish,
# then leaves their client off for a long time which could cause
# the needed pubkey to expire and be deleted.
shared.sqlLock.acquire()
shared.sqlSubmitQueue.put(
'''SELECT hash FROM pubkeys WHERE hash=? ''')
shared.sqlSubmitQueue.put((toripe,))
queryreturn = shared.sqlReturnQueue.get()
shared.sqlLock.release()
queryreturn = sqlQuery(
'''SELECT hash FROM pubkeys WHERE hash=? ''',
toripe)
if queryreturn == [] and toripe not in shared.neededPubkeys:
# We no longer have the needed pubkey and we haven't requested
# it.
@ -449,14 +398,8 @@ class singleWorker(threading.Thread):
sys.stderr.write(
'For some reason, the status of a message in our outbox is \'doingmsgpow\' even though we lack the pubkey. Here is the RIPE hash of the needed pubkey: %s\n' % toripe.encode('hex'))
t = (toaddress,)
shared.sqlLock.acquire()
shared.sqlSubmitQueue.put(
'''UPDATE sent SET status='msgqueued' WHERE toaddress=? AND status='doingmsgpow' ''')
shared.sqlSubmitQueue.put(t)
shared.sqlReturnQueue.get()
shared.sqlSubmitQueue.put('commit')
shared.sqlLock.release()
sqlExecute(
'''UPDATE sent SET status='msgqueued' WHERE toaddress=? AND status='doingmsgpow' ''', toaddress)
shared.UISignalQueue.put(('updateSentItemStatusByHash', (
toripe, tr.translateText("MainWindow",'Sending a request for the recipient\'s encryption key.'))))
self.requestPubKey(toaddress)
@ -475,21 +418,15 @@ class singleWorker(threading.Thread):
# mark the pubkey as 'usedpersonally' so that we don't ever delete
# it.
shared.sqlLock.acquire()
t = (toripe,)
shared.sqlSubmitQueue.put(
'''UPDATE pubkeys SET usedpersonally='yes' WHERE hash=?''')
shared.sqlSubmitQueue.put(t)
shared.sqlReturnQueue.get()
shared.sqlSubmitQueue.put('commit')
sqlExecute(
'''UPDATE pubkeys SET usedpersonally='yes' WHERE hash=?''',
toripe)
# Let us fetch the recipient's public key out of our database. If
# the required proof of work difficulty is too hard then we'll
# abort.
shared.sqlSubmitQueue.put(
'SELECT transmitdata FROM pubkeys WHERE hash=?')
shared.sqlSubmitQueue.put((toripe,))
queryreturn = shared.sqlReturnQueue.get()
shared.sqlLock.release()
queryreturn = sqlQuery(
'SELECT transmitdata FROM pubkeys WHERE hash=?',
toripe)
if queryreturn == []:
with shared.printLock:
sys.stderr.write(
@ -559,14 +496,9 @@ class singleWorker(threading.Thread):
if (requiredAverageProofOfWorkNonceTrialsPerByte > shared.config.getint('bitmessagesettings', 'maxacceptablenoncetrialsperbyte') and shared.config.getint('bitmessagesettings', 'maxacceptablenoncetrialsperbyte') != 0) or (requiredPayloadLengthExtraBytes > shared.config.getint('bitmessagesettings', 'maxacceptablepayloadlengthextrabytes') and shared.config.getint('bitmessagesettings', 'maxacceptablepayloadlengthextrabytes') != 0):
# The demanded difficulty is more than we are willing
# to do.
shared.sqlLock.acquire()
t = (ackdata,)
shared.sqlSubmitQueue.put(
'''UPDATE sent SET status='toodifficult' WHERE ackdata=? ''')
shared.sqlSubmitQueue.put(t)
shared.sqlReturnQueue.get()
shared.sqlSubmitQueue.put('commit')
shared.sqlLock.release()
sqlExecute(
'''UPDATE sent SET status='toodifficult' WHERE ackdata=? ''',
ackdata)
shared.UISignalQueue.put(('updateSentItemStatusByAckdata', (ackdata, tr.translateText("MainWindow", "Problem: The work demanded by the recipient (%1 and %2) is more difficult than you are willing to do.").arg(str(float(requiredAverageProofOfWorkNonceTrialsPerByte) / shared.networkDefaultProofOfWorkNonceTrialsPerByte)).arg(str(float(
requiredPayloadLengthExtraBytes) / shared.networkDefaultPayloadLengthExtraBytes)).arg(unicode(strftime(shared.config.get('bitmessagesettings', 'timeformat'), localtime(int(time.time()))), 'utf-8')))))
continue
@ -694,13 +626,7 @@ class singleWorker(threading.Thread):
try:
encrypted = highlevelcrypto.encrypt(payload,"04"+pubEncryptionKeyBase256.encode('hex'))
except:
shared.sqlLock.acquire()
t = (ackdata,)
shared.sqlSubmitQueue.put('''UPDATE sent SET status='badkey' WHERE ackdata=?''')
shared.sqlSubmitQueue.put(t)
queryreturn = shared.sqlReturnQueue.get()
shared.sqlSubmitQueue.put('commit')
shared.sqlLock.release()
sqlExecute('''UPDATE sent SET status='badkey' WHERE ackdata=?''', ackdata)
shared.UISignalQueue.put(('updateSentItemStatusByAckdata',(ackdata,tr.translateText("MainWindow",'Problem: The recipient\'s encryption key is no good. Could not encrypt message. %1').arg(unicode(strftime(shared.config.get('bitmessagesettings', 'timeformat'),localtime(int(time.time()))),'utf-8')))))
continue
encryptedPayload = embeddedTime + encodeVarint(toStreamNumber) + encrypted
@ -741,13 +667,8 @@ class singleWorker(threading.Thread):
newStatus = 'msgsentnoackexpected'
else:
newStatus = 'msgsent'
shared.sqlLock.acquire()
t = (inventoryHash,newStatus,ackdata,)
shared.sqlSubmitQueue.put('''UPDATE sent SET msgid=?, status=? WHERE ackdata=?''')
shared.sqlSubmitQueue.put(t)
queryreturn = shared.sqlReturnQueue.get()
shared.sqlSubmitQueue.put('commit')
shared.sqlLock.release()
sqlExecute('''UPDATE sent SET msgid=?, status=? WHERE ackdata=?''',
inventoryHash,newStatus,ackdata)
def requestPubKey(self, toAddress):
toStatus, addressVersionNumber, streamNumber, ripe = decodeAddress(
@ -789,14 +710,9 @@ class singleWorker(threading.Thread):
shared.broadcastToSendDataQueues((
streamNumber, 'sendinv', inventoryHash))
t = (toAddress,)
shared.sqlLock.acquire()
shared.sqlSubmitQueue.put(
'''UPDATE sent SET status='awaitingpubkey' WHERE toaddress=? AND status='doingpubkeypow' ''')
shared.sqlSubmitQueue.put(t)
shared.sqlReturnQueue.get()
shared.sqlSubmitQueue.put('commit')
shared.sqlLock.release()
sqlExecute(
'''UPDATE sent SET status='awaitingpubkey' WHERE toaddress=? AND status='doingpubkeypow' ''',
toAddress)
shared.UISignalQueue.put((
'updateStatusBar', tr.translateText("MainWindow",'Broacasting the public key request. This program will auto-retry if they are offline.')))