Check to see whether we are awaiting a new pubkey within the receiveData thread not the workerThread

This commit is contained in:
Jonathan Warren 2013-06-10 23:43:06 -04:00
parent 7b508884e3
commit 7f19ac82d0

View File

@ -668,7 +668,8 @@ class receiveDataThread(threading.Thread):
shared.sqlReturnQueue.get() shared.sqlReturnQueue.get()
shared.sqlSubmitQueue.put('commit') shared.sqlSubmitQueue.put('commit')
shared.sqlLock.release() shared.sqlLock.release()
shared.workerQueue.put(('newpubkey',(sendersAddressVersion,sendersStream,ripe.digest()))) #This will check to see whether we happen to be awaiting this pubkey in order to send a message. If we are, it will do the POW and send it. #shared.workerQueue.put(('newpubkey',(sendersAddressVersion,sendersStream,ripe.digest()))) #This will check to see whether we happen to be awaiting this pubkey in order to send a message. If we are, it will do the POW and send it.
self.possibleNewPubkey(ripe.digest())
fromAddress = encodeAddress(sendersAddressVersion,sendersStream,ripe.digest()) fromAddress = encodeAddress(sendersAddressVersion,sendersStream,ripe.digest())
shared.printLock.acquire() shared.printLock.acquire()
@ -802,7 +803,8 @@ class receiveDataThread(threading.Thread):
shared.sqlReturnQueue.get() shared.sqlReturnQueue.get()
shared.sqlSubmitQueue.put('commit') shared.sqlSubmitQueue.put('commit')
shared.sqlLock.release() shared.sqlLock.release()
shared.workerQueue.put(('newpubkey',(sendersAddressVersion,sendersStream,ripe.digest()))) #This will check to see whether we happen to be awaiting this pubkey in order to send a message. If we are, it will do the POW and send it. #shared.workerQueue.put(('newpubkey',(sendersAddressVersion,sendersStream,ripe.digest()))) #This will check to see whether we happen to be awaiting this pubkey in order to send a message. If we are, it will do the POW and send it.
self.possibleNewPubkey(ripe.digest())
fromAddress = encodeAddress(sendersAddressVersion,sendersStream,ripe.digest()) fromAddress = encodeAddress(sendersAddressVersion,sendersStream,ripe.digest())
shared.printLock.acquire() shared.printLock.acquire()
@ -1051,7 +1053,8 @@ class receiveDataThread(threading.Thread):
shared.sqlReturnQueue.get() shared.sqlReturnQueue.get()
shared.sqlSubmitQueue.put('commit') shared.sqlSubmitQueue.put('commit')
shared.sqlLock.release() shared.sqlLock.release()
shared.workerQueue.put(('newpubkey',(sendersAddressVersionNumber,sendersStreamNumber,ripe.digest()))) #This will check to see whether we happen to be awaiting this pubkey in order to send a message. If we are, it will do the POW and send it. #shared.workerQueue.put(('newpubkey',(sendersAddressVersionNumber,sendersStreamNumber,ripe.digest()))) #This will check to see whether we happen to be awaiting this pubkey in order to send a message. If we are, it will do the POW and send it.
self.possibleNewPubkey(ripe.digest())
fromAddress = encodeAddress(sendersAddressVersionNumber,sendersStreamNumber,ripe.digest()) fromAddress = encodeAddress(sendersAddressVersionNumber,sendersStreamNumber,ripe.digest())
#If this message is bound for one of my version 3 addresses (or higher), then we must check to make sure it meets our demanded proof of work requirement. #If this message is bound for one of my version 3 addresses (or higher), then we must check to make sure it meets our demanded proof of work requirement.
if decodeAddress(toAddress)[1] >= 3:#If the toAddress version number is 3 or higher: if decodeAddress(toAddress)[1] >= 3:#If the toAddress version number is 3 or higher:
@ -1192,6 +1195,23 @@ class receiveDataThread(threading.Thread):
else: else:
return '['+mailingListName+'] ' + subject return '['+mailingListName+'] ' + subject
def possiblyNewPubkey(self,toRipe):
if toRipe in neededPubkeys:
print 'We have been awaiting the arrival of this pubkey.'
del neededPubkeys[toRipe]
t = (toRipe,)
shared.sqlLock.acquire()
shared.sqlSubmitQueue.put('''UPDATE sent SET status='doingmsgpow' WHERE toripe=? AND status='awaitingpubkey' and folder='sent' ''')
shared.sqlSubmitQueue.put(t)
shared.sqlReturnQueue.get()
shared.sqlSubmitQueue.put('commit')
shared.sqlLock.release()
shared.workerQueue.put(('sendmessage',''))
else:
shared.printLock.acquire()
print 'We don\'t need this pub key. We didn\'t ask for it. Pubkey hash:', toRipe.encode('hex')
shared.printLock.release()
#We have received a pubkey #We have received a pubkey
def recpubkey(self,data): def recpubkey(self,data):
self.pubkeyProcessingStartTime = time.time() self.pubkeyProcessingStartTime = time.time()
@ -1318,23 +1338,17 @@ class receiveDataThread(threading.Thread):
if queryreturn != []: #if this pubkey is already in our database and if we have used it personally: if queryreturn != []: #if this pubkey is already in our database and if we have used it personally:
print 'We HAVE used this pubkey personally. Updating time.' print 'We HAVE used this pubkey personally. Updating time.'
t = (ripe,data,embeddedTime,'yes') t = (ripe,data,embeddedTime,'yes')
shared.sqlLock.acquire()
shared.sqlSubmitQueue.put('''INSERT INTO pubkeys VALUES (?,?,?,?)''')
shared.sqlSubmitQueue.put(t)
shared.sqlReturnQueue.get()
shared.sqlSubmitQueue.put('commit')
shared.sqlLock.release()
shared.workerQueue.put(('newpubkey',(addressVersion,streamNumber,ripe)))
else: else:
print 'We have NOT used this pubkey personally. Inserting in database.' print 'We have NOT used this pubkey personally. Inserting in database.'
t = (ripe,data,embeddedTime,'no') #This will also update the embeddedTime. t = (ripe,data,embeddedTime,'no') #This will also update the embeddedTime.
shared.sqlLock.acquire() shared.sqlLock.acquire()
shared.sqlSubmitQueue.put('''INSERT INTO pubkeys VALUES (?,?,?,?)''') shared.sqlSubmitQueue.put('''INSERT INTO pubkeys VALUES (?,?,?,?)''')
shared.sqlSubmitQueue.put(t) shared.sqlSubmitQueue.put(t)
shared.sqlReturnQueue.get() shared.sqlReturnQueue.get()
shared.sqlSubmitQueue.put('commit') shared.sqlSubmitQueue.put('commit')
shared.sqlLock.release() shared.sqlLock.release()
shared.workerQueue.put(('newpubkey',(addressVersion,streamNumber,ripe))) #shared.workerQueue.put(('newpubkey',(addressVersion,streamNumber,ripe)))
self.possibleNewPubkey(ripe)
if addressVersion == 3: if addressVersion == 3:
if len(data) < 170: #sanity check. if len(data) < 170: #sanity check.
print '(within processpubkey) payloadLength less than 170. Sanity check failed.' print '(within processpubkey) payloadLength less than 170. Sanity check failed.'
@ -1385,22 +1399,17 @@ class receiveDataThread(threading.Thread):
if queryreturn != []: #if this pubkey is already in our database and if we have used it personally: if queryreturn != []: #if this pubkey is already in our database and if we have used it personally:
print 'We HAVE used this pubkey personally. Updating time.' print 'We HAVE used this pubkey personally. Updating time.'
t = (ripe,data,embeddedTime,'yes') t = (ripe,data,embeddedTime,'yes')
shared.sqlLock.acquire()
shared.sqlSubmitQueue.put('''INSERT INTO pubkeys VALUES (?,?,?,?)''')
shared.sqlSubmitQueue.put(t)
shared.sqlReturnQueue.get()
shared.sqlSubmitQueue.put('commit')
shared.sqlLock.release()
else: else:
print 'We have NOT used this pubkey personally. Inserting in database.' print 'We have NOT used this pubkey personally. Inserting in database.'
t = (ripe,data,embeddedTime,'no') #This will also update the embeddedTime. t = (ripe,data,embeddedTime,'no') #This will also update the embeddedTime.
shared.sqlLock.acquire() shared.sqlLock.acquire()
shared.sqlSubmitQueue.put('''INSERT INTO pubkeys VALUES (?,?,?,?)''') shared.sqlSubmitQueue.put('''INSERT INTO pubkeys VALUES (?,?,?,?)''')
shared.sqlSubmitQueue.put(t) shared.sqlSubmitQueue.put(t)
shared.sqlReturnQueue.get() shared.sqlReturnQueue.get()
shared.sqlSubmitQueue.put('commit') shared.sqlSubmitQueue.put('commit')
shared.sqlLock.release() shared.sqlLock.release()
shared.workerQueue.put(('newpubkey',(addressVersion,streamNumber,ripe))) #shared.workerQueue.put(('newpubkey',(addressVersion,streamNumber,ripe)))
self.possibleNewPubkey(ripe)
#We have received a getpubkey message #We have received a getpubkey message
@ -2666,23 +2675,23 @@ class singleWorker(threading.Thread):
self.doPOWForMyV2Pubkey(data) self.doPOWForMyV2Pubkey(data)
elif command == 'doPOWForMyV3Pubkey': elif command == 'doPOWForMyV3Pubkey':
self.doPOWForMyV3Pubkey(data) self.doPOWForMyV3Pubkey(data)
elif command == 'newpubkey': """elif command == 'newpubkey':
toAddressVersion,toStreamNumber,toRipe = data toAddressVersion,toStreamNumber,toRipe = data
if toRipe in neededPubkeys: if toRipe in neededPubkeys:
print 'We have been awaiting the arrival of this pubkey.' print 'We have been awaiting the arrival of this pubkey.'
del neededPubkeys[toRipe] del neededPubkeys[toRipe]
t = (toRipe,) t = (toRipe,)
shared.sqlLock.acquire() shared.sqlLock.acquire()
shared.sqlSubmitQueue.put('''UPDATE sent SET status='doingmsgpow' WHERE toripe=? AND status='awaitingpubkey' and folder='sent' ''') shared.sqlSubmitQueue.put('''UPDATE sent SET status='doingmsgpow' WHERE toripe=? AND status='awaitingpubkey' and folder='sent' ''')
shared.sqlSubmitQueue.put(t) shared.sqlSubmitQueue.put(t)
shared.sqlReturnQueue.get() shared.sqlReturnQueue.get()
shared.sqlSubmitQueue.put('commit') shared.sqlSubmitQueue.put('commit')
shared.sqlLock.release() shared.sqlLock.release()
self.sendMsg() self.sendMsg()
else: else:
shared.printLock.acquire() shared.printLock.acquire()
print 'We don\'t need this pub key. We didn\'t ask for it. Pubkey hash:', toRipe.encode('hex') print 'We don\'t need this pub key. We didn\'t ask for it. Pubkey hash:', toRipe.encode('hex')
shared.printLock.release() shared.printLock.release()"""
else: else:
shared.printLock.acquire() shared.printLock.acquire()
sys.stderr.write('Probable programming error: The command sent to the workerThread is weird. It is: %s\n' % command) sys.stderr.write('Probable programming error: The command sent to the workerThread is weird. It is: %s\n' % command)