From 7f19ac82d09a55a286299c0dd736c51daba7ef55 Mon Sep 17 00:00:00 2001 From: Jonathan Warren Date: Mon, 10 Jun 2013 23:43:06 -0400 Subject: [PATCH] Check to see whether we are awaiting a new pubkey within the receiveData thread not the workerThread --- src/bitmessagemain.py | 103 +++++++++++++++++++++++------------------- 1 file changed, 56 insertions(+), 47 deletions(-) diff --git a/src/bitmessagemain.py b/src/bitmessagemain.py index 0c85f339..dbeac1cd 100755 --- a/src/bitmessagemain.py +++ b/src/bitmessagemain.py @@ -668,7 +668,8 @@ class receiveDataThread(threading.Thread): shared.sqlReturnQueue.get() shared.sqlSubmitQueue.put('commit') shared.sqlLock.release() - shared.workerQueue.put(('newpubkey',(sendersAddressVersion,sendersStream,ripe.digest()))) #This will check to see whether we happen to be awaiting this pubkey in order to send a message. If we are, it will do the POW and send it. + #shared.workerQueue.put(('newpubkey',(sendersAddressVersion,sendersStream,ripe.digest()))) #This will check to see whether we happen to be awaiting this pubkey in order to send a message. If we are, it will do the POW and send it. + self.possibleNewPubkey(ripe.digest()) fromAddress = encodeAddress(sendersAddressVersion,sendersStream,ripe.digest()) shared.printLock.acquire() @@ -802,7 +803,8 @@ class receiveDataThread(threading.Thread): shared.sqlReturnQueue.get() shared.sqlSubmitQueue.put('commit') shared.sqlLock.release() - shared.workerQueue.put(('newpubkey',(sendersAddressVersion,sendersStream,ripe.digest()))) #This will check to see whether we happen to be awaiting this pubkey in order to send a message. If we are, it will do the POW and send it. + #shared.workerQueue.put(('newpubkey',(sendersAddressVersion,sendersStream,ripe.digest()))) #This will check to see whether we happen to be awaiting this pubkey in order to send a message. If we are, it will do the POW and send it. + self.possibleNewPubkey(ripe.digest()) fromAddress = encodeAddress(sendersAddressVersion,sendersStream,ripe.digest()) shared.printLock.acquire() @@ -1051,7 +1053,8 @@ class receiveDataThread(threading.Thread): shared.sqlReturnQueue.get() shared.sqlSubmitQueue.put('commit') shared.sqlLock.release() - shared.workerQueue.put(('newpubkey',(sendersAddressVersionNumber,sendersStreamNumber,ripe.digest()))) #This will check to see whether we happen to be awaiting this pubkey in order to send a message. If we are, it will do the POW and send it. + #shared.workerQueue.put(('newpubkey',(sendersAddressVersionNumber,sendersStreamNumber,ripe.digest()))) #This will check to see whether we happen to be awaiting this pubkey in order to send a message. If we are, it will do the POW and send it. + self.possibleNewPubkey(ripe.digest()) fromAddress = encodeAddress(sendersAddressVersionNumber,sendersStreamNumber,ripe.digest()) #If this message is bound for one of my version 3 addresses (or higher), then we must check to make sure it meets our demanded proof of work requirement. if decodeAddress(toAddress)[1] >= 3:#If the toAddress version number is 3 or higher: @@ -1192,6 +1195,23 @@ class receiveDataThread(threading.Thread): else: return '['+mailingListName+'] ' + subject + def possiblyNewPubkey(self,toRipe): + if toRipe in neededPubkeys: + print 'We have been awaiting the arrival of this pubkey.' + del neededPubkeys[toRipe] + t = (toRipe,) + shared.sqlLock.acquire() + shared.sqlSubmitQueue.put('''UPDATE sent SET status='doingmsgpow' WHERE toripe=? AND status='awaitingpubkey' and folder='sent' ''') + shared.sqlSubmitQueue.put(t) + shared.sqlReturnQueue.get() + shared.sqlSubmitQueue.put('commit') + shared.sqlLock.release() + shared.workerQueue.put(('sendmessage','')) + else: + shared.printLock.acquire() + print 'We don\'t need this pub key. We didn\'t ask for it. Pubkey hash:', toRipe.encode('hex') + shared.printLock.release() + #We have received a pubkey def recpubkey(self,data): self.pubkeyProcessingStartTime = time.time() @@ -1318,23 +1338,17 @@ class receiveDataThread(threading.Thread): if queryreturn != []: #if this pubkey is already in our database and if we have used it personally: print 'We HAVE used this pubkey personally. Updating time.' t = (ripe,data,embeddedTime,'yes') - shared.sqlLock.acquire() - shared.sqlSubmitQueue.put('''INSERT INTO pubkeys VALUES (?,?,?,?)''') - shared.sqlSubmitQueue.put(t) - shared.sqlReturnQueue.get() - shared.sqlSubmitQueue.put('commit') - shared.sqlLock.release() - shared.workerQueue.put(('newpubkey',(addressVersion,streamNumber,ripe))) else: print 'We have NOT used this pubkey personally. Inserting in database.' t = (ripe,data,embeddedTime,'no') #This will also update the embeddedTime. - shared.sqlLock.acquire() - shared.sqlSubmitQueue.put('''INSERT INTO pubkeys VALUES (?,?,?,?)''') - shared.sqlSubmitQueue.put(t) - shared.sqlReturnQueue.get() - shared.sqlSubmitQueue.put('commit') - shared.sqlLock.release() - shared.workerQueue.put(('newpubkey',(addressVersion,streamNumber,ripe))) + shared.sqlLock.acquire() + shared.sqlSubmitQueue.put('''INSERT INTO pubkeys VALUES (?,?,?,?)''') + shared.sqlSubmitQueue.put(t) + shared.sqlReturnQueue.get() + shared.sqlSubmitQueue.put('commit') + shared.sqlLock.release() + #shared.workerQueue.put(('newpubkey',(addressVersion,streamNumber,ripe))) + self.possibleNewPubkey(ripe) if addressVersion == 3: if len(data) < 170: #sanity check. print '(within processpubkey) payloadLength less than 170. Sanity check failed.' @@ -1385,22 +1399,17 @@ class receiveDataThread(threading.Thread): if queryreturn != []: #if this pubkey is already in our database and if we have used it personally: print 'We HAVE used this pubkey personally. Updating time.' t = (ripe,data,embeddedTime,'yes') - shared.sqlLock.acquire() - shared.sqlSubmitQueue.put('''INSERT INTO pubkeys VALUES (?,?,?,?)''') - shared.sqlSubmitQueue.put(t) - shared.sqlReturnQueue.get() - shared.sqlSubmitQueue.put('commit') - shared.sqlLock.release() else: print 'We have NOT used this pubkey personally. Inserting in database.' t = (ripe,data,embeddedTime,'no') #This will also update the embeddedTime. - shared.sqlLock.acquire() - shared.sqlSubmitQueue.put('''INSERT INTO pubkeys VALUES (?,?,?,?)''') - shared.sqlSubmitQueue.put(t) - shared.sqlReturnQueue.get() - shared.sqlSubmitQueue.put('commit') - shared.sqlLock.release() - shared.workerQueue.put(('newpubkey',(addressVersion,streamNumber,ripe))) + shared.sqlLock.acquire() + shared.sqlSubmitQueue.put('''INSERT INTO pubkeys VALUES (?,?,?,?)''') + shared.sqlSubmitQueue.put(t) + shared.sqlReturnQueue.get() + shared.sqlSubmitQueue.put('commit') + shared.sqlLock.release() + #shared.workerQueue.put(('newpubkey',(addressVersion,streamNumber,ripe))) + self.possibleNewPubkey(ripe) #We have received a getpubkey message @@ -2666,23 +2675,23 @@ class singleWorker(threading.Thread): self.doPOWForMyV2Pubkey(data) elif command == 'doPOWForMyV3Pubkey': self.doPOWForMyV3Pubkey(data) - elif command == 'newpubkey': - toAddressVersion,toStreamNumber,toRipe = data - if toRipe in neededPubkeys: - print 'We have been awaiting the arrival of this pubkey.' - del neededPubkeys[toRipe] - t = (toRipe,) - shared.sqlLock.acquire() - shared.sqlSubmitQueue.put('''UPDATE sent SET status='doingmsgpow' WHERE toripe=? AND status='awaitingpubkey' and folder='sent' ''') - shared.sqlSubmitQueue.put(t) - shared.sqlReturnQueue.get() - shared.sqlSubmitQueue.put('commit') - shared.sqlLock.release() - self.sendMsg() - else: - shared.printLock.acquire() - print 'We don\'t need this pub key. We didn\'t ask for it. Pubkey hash:', toRipe.encode('hex') - shared.printLock.release() + """elif command == 'newpubkey': + toAddressVersion,toStreamNumber,toRipe = data + if toRipe in neededPubkeys: + print 'We have been awaiting the arrival of this pubkey.' + del neededPubkeys[toRipe] + t = (toRipe,) + shared.sqlLock.acquire() + shared.sqlSubmitQueue.put('''UPDATE sent SET status='doingmsgpow' WHERE toripe=? AND status='awaitingpubkey' and folder='sent' ''') + shared.sqlSubmitQueue.put(t) + shared.sqlReturnQueue.get() + shared.sqlSubmitQueue.put('commit') + shared.sqlLock.release() + self.sendMsg() + else: + shared.printLock.acquire() + print 'We don\'t need this pub key. We didn\'t ask for it. Pubkey hash:', toRipe.encode('hex') + shared.printLock.release()""" else: shared.printLock.acquire() sys.stderr.write('Probable programming error: The command sent to the workerThread is weird. It is: %s\n' % command)