diff --git a/src/class_receiveDataThread.py b/src/class_receiveDataThread.py index b5aa5893..d04da7a6 100644 --- a/src/class_receiveDataThread.py +++ b/src/class_receiveDataThread.py @@ -19,6 +19,7 @@ import helper_generic import helper_bitcoin import helper_inbox import helper_sent +from helper_sql import * import tr #from bitmessagemain import shared.lengthOfTimeToLeaveObjectsInInventory, shared.lengthOfTimeToHoldOnToAllPubkeys, shared.maximumAgeOfAnObjectThatIAmWillingToAccept, shared.maximumAgeOfObjectsThatIAdvertiseToOthers, shared.maximumAgeOfNodesThatIAdvertiseToOthers, shared.numberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHavePerPeer, shared.neededPubkeys @@ -281,16 +282,13 @@ class receiveDataThread(threading.Thread): self.sendBigInv() def sendBigInv(self): - shared.sqlLock.acquire() # Select all hashes which are younger than two days old and in this # stream. - t = (int(time.time()) - shared.maximumAgeOfObjectsThatIAdvertiseToOthers, int( - time.time()) - shared.lengthOfTimeToHoldOnToAllPubkeys, self.streamNumber) - shared.sqlSubmitQueue.put( - '''SELECT hash FROM inventory WHERE ((receivedtime>? and objecttype<>'pubkey') or (receivedtime>? and objecttype='pubkey')) and streamnumber=?''') - shared.sqlSubmitQueue.put(t) - queryreturn = shared.sqlReturnQueue.get() - shared.sqlLock.release() + queryreturn = sqlQuery( + '''SELECT hash FROM inventory WHERE ((receivedtime>? and objecttype<>'pubkey') or (receivedtime>? and objecttype='pubkey')) and streamnumber=?''', + int(time.time()) - shared.maximumAgeOfObjectsThatIAdvertiseToOthers, + int(time.time()) - shared.lengthOfTimeToHoldOnToAllPubkeys, + self.streamNumber) bigInvList = {} for row in queryreturn: hash, = row @@ -507,15 +505,12 @@ class receiveDataThread(threading.Thread): # won't be able to send this pubkey to others (without doing # the proof of work ourselves, which this program is programmed # to not do.) - t = (ripe.digest(), '\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF' + '\xFF\xFF\xFF\xFF' + data[ - beginningOfPubkeyPosition:endOfPubkeyPosition], int(time.time()), 'yes') - shared.sqlLock.acquire() - shared.sqlSubmitQueue.put( - '''INSERT INTO pubkeys VALUES (?,?,?,?)''') - shared.sqlSubmitQueue.put(t) - shared.sqlReturnQueue.get() - shared.sqlSubmitQueue.put('commit') - shared.sqlLock.release() + sqlExecute( + '''INSERT INTO pubkeys VALUES (?,?,?,?)''', + ripe.digest(), + '\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF' + '\xFF\xFF\xFF\xFF' + data[beginningOfPubkeyPosition:endOfPubkeyPosition], + int(time.time()), + 'yes') # shared.workerQueue.put(('newpubkey',(sendersAddressVersion,sendersStream,ripe.digest()))) # This will check to see whether we happen to be awaiting this # pubkey in order to send a message. If we are, it will do the @@ -657,15 +652,11 @@ class receiveDataThread(threading.Thread): # Let's store the public key in case we want to reply to this # person. - t = (ripe.digest(), '\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF' + '\xFF\xFF\xFF\xFF' + decryptedData[ - beginningOfPubkeyPosition:endOfPubkeyPosition], int(time.time()), 'yes') - shared.sqlLock.acquire() - shared.sqlSubmitQueue.put( - '''INSERT INTO pubkeys VALUES (?,?,?,?)''') - shared.sqlSubmitQueue.put(t) - shared.sqlReturnQueue.get() - shared.sqlSubmitQueue.put('commit') - shared.sqlLock.release() + sqlExecute('''INSERT INTO pubkeys VALUES (?,?,?,?)''', + ripe.digest(), + '\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF' + '\xFF\xFF\xFF\xFF' + decryptedData[beginningOfPubkeyPosition:endOfPubkeyPosition], + int(time.time()), + 'yes') # shared.workerQueue.put(('newpubkey',(sendersAddressVersion,sendersStream,ripe.digest()))) # This will check to see whether we happen to be awaiting this # pubkey in order to send a message. If we are, it will do the POW @@ -802,14 +793,8 @@ class receiveDataThread(threading.Thread): print 'This msg IS an acknowledgement bound for me.' del shared.ackdataForWhichImWatching[encryptedData[readPosition:]] - t = ('ackreceived', encryptedData[readPosition:]) - shared.sqlLock.acquire() - shared.sqlSubmitQueue.put( - 'UPDATE sent SET status=? WHERE ackdata=?') - shared.sqlSubmitQueue.put(t) - shared.sqlReturnQueue.get() - shared.sqlSubmitQueue.put('commit') - shared.sqlLock.release() + sqlExecute('UPDATE sent SET status=? WHERE ackdata=?', + 'ackreceived', encryptedData[readPosition:]) shared.UISignalQueue.put(('updateSentItemStatusByAckdata', (encryptedData[readPosition:], tr.translateText("MainWindow",'Acknowledgement of the message received. %1').arg(unicode( time.strftime(shared.config.get('bitmessagesettings', 'timeformat'), time.localtime(int(time.time()))), 'utf-8'))))) return @@ -932,15 +917,12 @@ class receiveDataThread(threading.Thread): ripe.update(sha.digest()) # Let's store the public key in case we want to reply to this # person. - t = (ripe.digest(), '\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF' + '\xFF\xFF\xFF\xFF' + decryptedData[ - messageVersionLength:endOfThePublicKeyPosition], int(time.time()), 'yes') - shared.sqlLock.acquire() - shared.sqlSubmitQueue.put( - '''INSERT INTO pubkeys VALUES (?,?,?,?)''') - shared.sqlSubmitQueue.put(t) - shared.sqlReturnQueue.get() - shared.sqlSubmitQueue.put('commit') - shared.sqlLock.release() + sqlExecute( + '''INSERT INTO pubkeys VALUES (?,?,?,?)''', + ripe.digest(), + '\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF' + '\xFF\xFF\xFF\xFF' + decryptedData[messageVersionLength:endOfThePublicKeyPosition], + int(time.time()), + 'yes') # shared.workerQueue.put(('newpubkey',(sendersAddressVersionNumber,sendersStreamNumber,ripe.digest()))) # This will check to see whether we happen to be awaiting this # pubkey in order to send a message. If we are, it will do the POW @@ -962,26 +944,18 @@ class receiveDataThread(threading.Thread): return blockMessage = False # Gets set to True if the user shouldn't see the message according to black or white lists. if shared.config.get('bitmessagesettings', 'blackwhitelist') == 'black': # If we are using a blacklist - t = (fromAddress,) - shared.sqlLock.acquire() - shared.sqlSubmitQueue.put( - '''SELECT label FROM blacklist where address=? and enabled='1' ''') - shared.sqlSubmitQueue.put(t) - queryreturn = shared.sqlReturnQueue.get() - shared.sqlLock.release() + queryreturn = sqlQuery( + '''SELECT label FROM blacklist where address=? and enabled='1' ''', + fromAddress) if queryreturn != []: with shared.printLock: print 'Message ignored because address is in blacklist.' blockMessage = True else: # We're using a whitelist - t = (fromAddress,) - shared.sqlLock.acquire() - shared.sqlSubmitQueue.put( - '''SELECT label FROM whitelist where address=? and enabled='1' ''') - shared.sqlSubmitQueue.put(t) - queryreturn = shared.sqlReturnQueue.get() - shared.sqlLock.release() + queryreturn = sqlQuery( + '''SELECT label FROM whitelist where address=? and enabled='1' ''', + toAddress) if queryreturn == []: print 'Message ignored because address not in whitelist.' blockMessage = True @@ -1111,14 +1085,9 @@ class receiveDataThread(threading.Thread): if toRipe in shared.neededPubkeys: print 'We have been awaiting the arrival of this pubkey.' del shared.neededPubkeys[toRipe] - t = (toRipe,) - shared.sqlLock.acquire() - shared.sqlSubmitQueue.put( - '''UPDATE sent SET status='doingmsgpow' WHERE toripe=? AND (status='awaitingpubkey' or status='doingpubkeypow') and folder='sent' ''') - shared.sqlSubmitQueue.put(t) - shared.sqlReturnQueue.get() - shared.sqlSubmitQueue.put('commit') - shared.sqlLock.release() + sqlExecute( + '''UPDATE sent SET status='doingmsgpow' WHERE toripe=? AND (status='awaitingpubkey' or status='doingpubkeypow') and folder='sent' ''', + toRipe) shared.workerQueue.put(('sendmessage', '')) else: with shared.printLock: @@ -1254,13 +1223,8 @@ class receiveDataThread(threading.Thread): print 'publicEncryptionKey in hex:', publicEncryptionKey.encode('hex') - t = (ripe,) - shared.sqlLock.acquire() - shared.sqlSubmitQueue.put( - '''SELECT usedpersonally FROM pubkeys WHERE hash=? AND usedpersonally='yes' ''') - shared.sqlSubmitQueue.put(t) - queryreturn = shared.sqlReturnQueue.get() - shared.sqlLock.release() + queryreturn = sqlQuery( + '''SELECT usedpersonally FROM pubkeys WHERE hash=? AND usedpersonally='yes' ''', ripe) if queryreturn != []: # if this pubkey is already in our database and if we have used it personally: print 'We HAVE used this pubkey personally. Updating time.' t = (ripe, data, embeddedTime, 'yes') @@ -1268,13 +1232,7 @@ class receiveDataThread(threading.Thread): print 'We have NOT used this pubkey personally. Inserting in database.' t = (ripe, data, embeddedTime, 'no') # This will also update the embeddedTime. - shared.sqlLock.acquire() - shared.sqlSubmitQueue.put( - '''INSERT INTO pubkeys VALUES (?,?,?,?)''') - shared.sqlSubmitQueue.put(t) - shared.sqlReturnQueue.get() - shared.sqlSubmitQueue.put('commit') - shared.sqlLock.release() + sqlExecute('''INSERT INTO pubkeys VALUES (?,?,?,?)''', *t) # shared.workerQueue.put(('newpubkey',(addressVersion,streamNumber,ripe))) self.possibleNewPubkey(ripe) if addressVersion == 3: @@ -1323,13 +1281,7 @@ class receiveDataThread(threading.Thread): print 'publicEncryptionKey in hex:', publicEncryptionKey.encode('hex') - t = (ripe,) - shared.sqlLock.acquire() - shared.sqlSubmitQueue.put( - '''SELECT usedpersonally FROM pubkeys WHERE hash=? AND usedpersonally='yes' ''') - shared.sqlSubmitQueue.put(t) - queryreturn = shared.sqlReturnQueue.get() - shared.sqlLock.release() + queryreturn = sqlQuery('''SELECT usedpersonally FROM pubkeys WHERE hash=? AND usedpersonally='yes' ''', ripe) if queryreturn != []: # if this pubkey is already in our database and if we have used it personally: print 'We HAVE used this pubkey personally. Updating time.' t = (ripe, data, embeddedTime, 'yes') @@ -1337,13 +1289,7 @@ class receiveDataThread(threading.Thread): print 'We have NOT used this pubkey personally. Inserting in database.' t = (ripe, data, embeddedTime, 'no') # This will also update the embeddedTime. - shared.sqlLock.acquire() - shared.sqlSubmitQueue.put( - '''INSERT INTO pubkeys VALUES (?,?,?,?)''') - shared.sqlSubmitQueue.put(t) - shared.sqlReturnQueue.get() - shared.sqlSubmitQueue.put('commit') - shared.sqlLock.release() + sqlExecute('''INSERT INTO pubkeys VALUES (?,?,?,?)''', *t) # shared.workerQueue.put(('newpubkey',(addressVersion,streamNumber,ripe))) self.possibleNewPubkey(ripe) @@ -1540,13 +1486,9 @@ class receiveDataThread(threading.Thread): hash] self.sendData(objectType, payload) else: - t = (hash,) - shared.sqlLock.acquire() - shared.sqlSubmitQueue.put( - '''select objecttype, payload from inventory where hash=?''') - shared.sqlSubmitQueue.put(t) - queryreturn = shared.sqlReturnQueue.get() - shared.sqlLock.release() + queryreturn = sqlQuery( + '''select objecttype, payload from inventory where hash=?''', + hash) if queryreturn != []: for row in queryreturn: objectType, payload = row