From 1fb11495a603aad93a63f0a9c3fed52663341679 Mon Sep 17 00:00:00 2001 From: "Grant T. Olson" Date: Thu, 29 Aug 2013 07:27:30 -0400 Subject: [PATCH] use helper_sql in class_singleWorker --- src/class_singleWorker.py | 186 +++++++++++--------------------------- 1 file changed, 51 insertions(+), 135 deletions(-) diff --git a/src/class_singleWorker.py b/src/class_singleWorker.py index a3d0a0c5..d3c0c784 100644 --- a/src/class_singleWorker.py +++ b/src/class_singleWorker.py @@ -10,6 +10,7 @@ import sys from class_addressGenerator import pointMult import tr from debug import logger +from helper_sql import * # This thread, of which there is only one, does the heavy lifting: # calculating POWs. @@ -22,35 +23,23 @@ class singleWorker(threading.Thread): threading.Thread.__init__(self) def run(self): - shared.sqlLock.acquire() - shared.sqlSubmitQueue.put( + queryreturn = sqlQuery( '''SELECT toripe FROM sent WHERE ((status='awaitingpubkey' OR status='doingpubkeypow') AND folder='sent')''') - shared.sqlSubmitQueue.put('') - queryreturn = shared.sqlReturnQueue.get() - shared.sqlLock.release() for row in queryreturn: toripe, = row shared.neededPubkeys[toripe] = 0 # Initialize the shared.ackdataForWhichImWatching data structure using data # from the sql database. - shared.sqlLock.acquire() - shared.sqlSubmitQueue.put( + queryreturn = sqlQuery( '''SELECT ackdata FROM sent where (status='msgsent' OR status='doingmsgpow')''') - shared.sqlSubmitQueue.put('') - queryreturn = shared.sqlReturnQueue.get() - shared.sqlLock.release() for row in queryreturn: ackdata, = row print 'Watching for ackdata', ackdata.encode('hex') shared.ackdataForWhichImWatching[ackdata] = 0 - shared.sqlLock.acquire() - shared.sqlSubmitQueue.put( + queryreturn = sqlQuery( '''SELECT DISTINCT toaddress FROM sent WHERE (status='doingpubkeypow' AND folder='sent')''') - shared.sqlSubmitQueue.put('') - queryreturn = shared.sqlReturnQueue.get() - shared.sqlLock.release() for row in queryreturn: toaddress, = row self.requestPubKey(toaddress) @@ -248,26 +237,19 @@ class singleWorker(threading.Thread): if shared.safeConfigGetBoolean(myAddress, 'chan'): payload = '\x00' * 8 + payload # Attach a fake nonce on the front # just so that it is in the correct format. - t = (hash,payload,embeddedTime,'yes') - shared.sqlLock.acquire() - shared.sqlSubmitQueue.put('''INSERT INTO pubkeys VALUES (?,?,?,?)''') - shared.sqlSubmitQueue.put(t) - shared.sqlReturnQueue.get() - shared.sqlSubmitQueue.put('commit') - shared.sqlLock.release() + sqlExecute('''INSERT INTO pubkeys VALUES (?,?,?,?)''', + hash, + payload, + embeddedTime, + 'yes') shared.config.set( myAddress, 'lastpubkeysendtime', str(int(time.time()))) with open(shared.appdata + 'keys.dat', 'wb') as configfile: shared.config.write(configfile) def sendBroadcast(self): - shared.sqlLock.acquire() - t = ('broadcastqueued',) - shared.sqlSubmitQueue.put( - '''SELECT fromaddress, subject, message, ackdata FROM sent WHERE status=? and folder='sent' ''') - shared.sqlSubmitQueue.put(t) - queryreturn = shared.sqlReturnQueue.get() - shared.sqlLock.release() + queryreturn = sqlQuery( + '''SELECT fromaddress, subject, message, ackdata FROM sent WHERE status=? and folder='sent' ''', 'broadcastqueued') for row in queryreturn: fromaddress, subject, body, ackdata = row status, addressVersionNumber, streamNumber, ripe = decodeAddress( @@ -355,79 +337,49 @@ class singleWorker(threading.Thread): # Update the status of the message in the 'sent' table to have # a 'broadcastsent' status - shared.sqlLock.acquire() - t = (inventoryHash,'broadcastsent', int( - time.time()), ackdata) - shared.sqlSubmitQueue.put( - 'UPDATE sent SET msgid=?, status=?, lastactiontime=? WHERE ackdata=?') - shared.sqlSubmitQueue.put(t) - queryreturn = shared.sqlReturnQueue.get() - shared.sqlSubmitQueue.put('commit') - shared.sqlLock.release() + sqlExecute( + 'UPDATE sent SET msgid=?, status=?, lastactiontime=? WHERE ackdata=?', + inventoryHash, + 'broadcastsent', + int(time.time()), + ackdata) def sendMsg(self): # Check to see if there are any messages queued to be sent - shared.sqlLock.acquire() - shared.sqlSubmitQueue.put( + queryreturn = sqlQuery( '''SELECT DISTINCT toaddress FROM sent WHERE (status='msgqueued' AND folder='sent')''') - shared.sqlSubmitQueue.put('') - queryreturn = shared.sqlReturnQueue.get() - shared.sqlLock.release() for row in queryreturn: # For each address to which we need to send a message, check to see if we have its pubkey already. toaddress, = row toripe = decodeAddress(toaddress)[3] - shared.sqlLock.acquire() - shared.sqlSubmitQueue.put( - '''SELECT hash FROM pubkeys WHERE hash=? ''') - shared.sqlSubmitQueue.put((toripe,)) - queryreturn = shared.sqlReturnQueue.get() - shared.sqlLock.release() + queryreturn = sqlQuery( + '''SELECT hash FROM pubkeys WHERE hash=? ''', toripe) if queryreturn != []: # If we have the needed pubkey, set the status to doingmsgpow (we'll do it further down) - t = (toaddress,) - shared.sqlLock.acquire() - shared.sqlSubmitQueue.put( - '''UPDATE sent SET status='doingmsgpow' WHERE toaddress=? AND status='msgqueued' ''') - shared.sqlSubmitQueue.put(t) - shared.sqlReturnQueue.get() - shared.sqlSubmitQueue.put('commit') - shared.sqlLock.release() + sqlExecute( + '''UPDATE sent SET status='doingmsgpow' WHERE toaddress=? AND status='msgqueued' ''', + toaddress) else: # We don't have the needed pubkey. Set the status to 'awaitingpubkey' and request it if we haven't already if toripe in shared.neededPubkeys: # We already sent a request for the pubkey - t = (toaddress,) - shared.sqlLock.acquire() - shared.sqlSubmitQueue.put( - '''UPDATE sent SET status='awaitingpubkey' WHERE toaddress=? AND status='msgqueued' ''') - shared.sqlSubmitQueue.put(t) - shared.sqlReturnQueue.get() - shared.sqlSubmitQueue.put('commit') - shared.sqlLock.release() + sqlExecute( + '''UPDATE sent SET status='awaitingpubkey' WHERE toaddress=? AND status='msgqueued' ''', toaddress) shared.UISignalQueue.put(('updateSentItemStatusByHash', ( toripe, tr.translateText("MainWindow",'Encryption key was requested earlier.')))) else: # We have not yet sent a request for the pubkey - t = (toaddress,) - shared.sqlLock.acquire() - shared.sqlSubmitQueue.put( - '''UPDATE sent SET status='doingpubkeypow' WHERE toaddress=? AND status='msgqueued' ''') - shared.sqlSubmitQueue.put(t) - shared.sqlReturnQueue.get() - shared.sqlSubmitQueue.put('commit') - shared.sqlLock.release() + sqlExecute( + '''UPDATE sent SET status='doingpubkeypow' WHERE toaddress=? AND status='msgqueued' ''', + toaddress) shared.UISignalQueue.put(('updateSentItemStatusByHash', ( toripe, tr.translateText("MainWindow",'Sending a request for the recipient\'s encryption key.')))) self.requestPubKey(toaddress) - shared.sqlLock.acquire() # Get all messages that are ready to be sent, and also all messages # which we have sent in the last 28 days which were previously marked # as 'toodifficult'. If the user as raised the maximum acceptable # difficulty then those messages may now be sendable. - shared.sqlSubmitQueue.put( - '''SELECT toaddress, toripe, fromaddress, subject, message, ackdata, status FROM sent WHERE (status='doingmsgpow' or status='forcepow' or (status='toodifficult' and lastactiontime>?)) and folder='sent' ''') - shared.sqlSubmitQueue.put((int(time.time()) - 2419200,)) - queryreturn = shared.sqlReturnQueue.get() - shared.sqlLock.release() + queryreturn = sqlQuery( + '''SELECT toaddress, toripe, fromaddress, subject, message, ackdata, status FROM sent WHERE (status='doingmsgpow' or status='forcepow' or (status='toodifficult' and lastactiontime>?)) and folder='sent' ''', + int(time.time()) - 2419200) for row in queryreturn: # For each message we need to send.. toaddress, toripe, fromaddress, subject, message, ackdata, status = row # There is a remote possibility that we may no longer have the @@ -436,12 +388,9 @@ class singleWorker(threading.Thread): # user sends a message but doesn't let the POW function finish, # then leaves their client off for a long time which could cause # the needed pubkey to expire and be deleted. - shared.sqlLock.acquire() - shared.sqlSubmitQueue.put( - '''SELECT hash FROM pubkeys WHERE hash=? ''') - shared.sqlSubmitQueue.put((toripe,)) - queryreturn = shared.sqlReturnQueue.get() - shared.sqlLock.release() + queryreturn = sqlQuery( + '''SELECT hash FROM pubkeys WHERE hash=? ''', + toripe) if queryreturn == [] and toripe not in shared.neededPubkeys: # We no longer have the needed pubkey and we haven't requested # it. @@ -449,14 +398,8 @@ class singleWorker(threading.Thread): sys.stderr.write( 'For some reason, the status of a message in our outbox is \'doingmsgpow\' even though we lack the pubkey. Here is the RIPE hash of the needed pubkey: %s\n' % toripe.encode('hex')) - t = (toaddress,) - shared.sqlLock.acquire() - shared.sqlSubmitQueue.put( - '''UPDATE sent SET status='msgqueued' WHERE toaddress=? AND status='doingmsgpow' ''') - shared.sqlSubmitQueue.put(t) - shared.sqlReturnQueue.get() - shared.sqlSubmitQueue.put('commit') - shared.sqlLock.release() + sqlExecute( + '''UPDATE sent SET status='msgqueued' WHERE toaddress=? AND status='doingmsgpow' ''', toaddress) shared.UISignalQueue.put(('updateSentItemStatusByHash', ( toripe, tr.translateText("MainWindow",'Sending a request for the recipient\'s encryption key.')))) self.requestPubKey(toaddress) @@ -475,21 +418,15 @@ class singleWorker(threading.Thread): # mark the pubkey as 'usedpersonally' so that we don't ever delete # it. - shared.sqlLock.acquire() - t = (toripe,) - shared.sqlSubmitQueue.put( - '''UPDATE pubkeys SET usedpersonally='yes' WHERE hash=?''') - shared.sqlSubmitQueue.put(t) - shared.sqlReturnQueue.get() - shared.sqlSubmitQueue.put('commit') + sqlExecute( + '''UPDATE pubkeys SET usedpersonally='yes' WHERE hash=?''', + toripe) # Let us fetch the recipient's public key out of our database. If # the required proof of work difficulty is too hard then we'll # abort. - shared.sqlSubmitQueue.put( - 'SELECT transmitdata FROM pubkeys WHERE hash=?') - shared.sqlSubmitQueue.put((toripe,)) - queryreturn = shared.sqlReturnQueue.get() - shared.sqlLock.release() + queryreturn = sqlQuery( + 'SELECT transmitdata FROM pubkeys WHERE hash=?', + toripe) if queryreturn == []: with shared.printLock: sys.stderr.write( @@ -559,14 +496,9 @@ class singleWorker(threading.Thread): if (requiredAverageProofOfWorkNonceTrialsPerByte > shared.config.getint('bitmessagesettings', 'maxacceptablenoncetrialsperbyte') and shared.config.getint('bitmessagesettings', 'maxacceptablenoncetrialsperbyte') != 0) or (requiredPayloadLengthExtraBytes > shared.config.getint('bitmessagesettings', 'maxacceptablepayloadlengthextrabytes') and shared.config.getint('bitmessagesettings', 'maxacceptablepayloadlengthextrabytes') != 0): # The demanded difficulty is more than we are willing # to do. - shared.sqlLock.acquire() - t = (ackdata,) - shared.sqlSubmitQueue.put( - '''UPDATE sent SET status='toodifficult' WHERE ackdata=? ''') - shared.sqlSubmitQueue.put(t) - shared.sqlReturnQueue.get() - shared.sqlSubmitQueue.put('commit') - shared.sqlLock.release() + sqlExecute( + '''UPDATE sent SET status='toodifficult' WHERE ackdata=? ''', + ackdata) shared.UISignalQueue.put(('updateSentItemStatusByAckdata', (ackdata, tr.translateText("MainWindow", "Problem: The work demanded by the recipient (%1 and %2) is more difficult than you are willing to do.").arg(str(float(requiredAverageProofOfWorkNonceTrialsPerByte) / shared.networkDefaultProofOfWorkNonceTrialsPerByte)).arg(str(float( requiredPayloadLengthExtraBytes) / shared.networkDefaultPayloadLengthExtraBytes)).arg(unicode(strftime(shared.config.get('bitmessagesettings', 'timeformat'), localtime(int(time.time()))), 'utf-8'))))) continue @@ -694,13 +626,7 @@ class singleWorker(threading.Thread): try: encrypted = highlevelcrypto.encrypt(payload,"04"+pubEncryptionKeyBase256.encode('hex')) except: - shared.sqlLock.acquire() - t = (ackdata,) - shared.sqlSubmitQueue.put('''UPDATE sent SET status='badkey' WHERE ackdata=?''') - shared.sqlSubmitQueue.put(t) - queryreturn = shared.sqlReturnQueue.get() - shared.sqlSubmitQueue.put('commit') - shared.sqlLock.release() + sqlExecute('''UPDATE sent SET status='badkey' WHERE ackdata=?''', ackdata) shared.UISignalQueue.put(('updateSentItemStatusByAckdata',(ackdata,tr.translateText("MainWindow",'Problem: The recipient\'s encryption key is no good. Could not encrypt message. %1').arg(unicode(strftime(shared.config.get('bitmessagesettings', 'timeformat'),localtime(int(time.time()))),'utf-8'))))) continue encryptedPayload = embeddedTime + encodeVarint(toStreamNumber) + encrypted @@ -741,13 +667,8 @@ class singleWorker(threading.Thread): newStatus = 'msgsentnoackexpected' else: newStatus = 'msgsent' - shared.sqlLock.acquire() - t = (inventoryHash,newStatus,ackdata,) - shared.sqlSubmitQueue.put('''UPDATE sent SET msgid=?, status=? WHERE ackdata=?''') - shared.sqlSubmitQueue.put(t) - queryreturn = shared.sqlReturnQueue.get() - shared.sqlSubmitQueue.put('commit') - shared.sqlLock.release() + sqlExecute('''UPDATE sent SET msgid=?, status=? WHERE ackdata=?''', + inventoryHash,newStatus,ackdata) def requestPubKey(self, toAddress): toStatus, addressVersionNumber, streamNumber, ripe = decodeAddress( @@ -789,14 +710,9 @@ class singleWorker(threading.Thread): shared.broadcastToSendDataQueues(( streamNumber, 'sendinv', inventoryHash)) - t = (toAddress,) - shared.sqlLock.acquire() - shared.sqlSubmitQueue.put( - '''UPDATE sent SET status='awaitingpubkey' WHERE toaddress=? AND status='doingpubkeypow' ''') - shared.sqlSubmitQueue.put(t) - shared.sqlReturnQueue.get() - shared.sqlSubmitQueue.put('commit') - shared.sqlLock.release() + sqlExecute( + '''UPDATE sent SET status='awaitingpubkey' WHERE toaddress=? AND status='doingpubkeypow' ''', + toAddress) shared.UISignalQueue.put(( 'updateStatusBar', tr.translateText("MainWindow",'Broacasting the public key request. This program will auto-retry if they are offline.')))