@ -321,101 +320,104 @@ class receiveDataThread(threading.Thread):
#print 'self.data is currently ', repr(self.data)
#shared.printLock.release()
iflen(self.data)<20:#if so little of the data has arrived that we can't even unpack the payload length
pass
elif self.data[0:4]!='\xe9\xbe\xb4\xd9':
return
if self.data[0:4]!='\xe9\xbe\xb4\xd9':
ifverbose>=1:
shared.printLock.acquire()
sys.stderr.write('The magic bytes were not correct. First 40 bytes of data: %s\n'%repr(self.data[0:40]))
print'self.data:',self.data.encode('hex')
shared.printLock.release()
self.data=""
else:
self.payloadLength,=unpack('>L',self.data[16:20])
iflen(self.data)>=self.payloadLength+24:#check if the whole message has arrived yet. If it has,...
ifself.data[20:24]==hashlib.sha512(self.data[24:self.payloadLength+24]).digest()[0:4]:#test the checksum in the message. If it is correct...
#print 'message checksum is correct'
#The time we've last seen this node is obviously right now since we just received valid data from it. So update the knownNodes list so that other peers can be made aware of its existance.
ifself.initiatedConnectionandself.connectionIsOrWasFullyEstablished:#The remote port is only something we should share with others if it is the remote node's incoming port (rather than some random operating-system-assigned outgoing port).
ifself.payloadLength<=180000000:#If the size of the message is greater than 180MB, ignore it. (I get memory errors when processing messages much larger than this though it is concievable that this value will have to be lowered if some systems are less tolarant of large messages.)
remoteCommand=self.data[4:16]
return
self.payloadLength,=unpack('>L',self.data[16:20])
iflen(self.data)<self.payloadLength+24:#check if the whole message has arrived yet.
return
ifself.data[20:24]!=hashlib.sha512(self.data[24:self.payloadLength+24]).digest()[0:4]:#test the checksum in the message. If it is correct...
print'Checksum incorrect. Clearing this message.'
self.data=self.data[self.payloadLength+24:]
self.processData()
return
#The time we've last seen this node is obviously right now since we just received valid data from it. So update the knownNodes list so that other peers can be made aware of its existance.
ifself.initiatedConnectionandself.connectionIsOrWasFullyEstablished:#The remote port is only something we should share with others if it is the remote node's incoming port (rather than some random operating-system-assigned outgoing port).
ifself.payloadLength<=180000000:#If the size of the message is greater than 180MB, ignore it. (I get memory errors when processing messages much larger than this though it is concievable that this value will have to be lowered if some systems are less tolarant of large messages.)
delself.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave[objectHash]#It is possible that the remote node doesn't respond with the object. In that case, we'll very likely get it from someone else anyway.
print'(concerning',self.HOST+')','number of objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave is now',len(self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave)
shared.printLock.release()
try:
delnumberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHavePerPeer[self.HOST]#this data structure is maintained so that we can keep track of how many total objects, across all connections, are currently outstanding. If it goes too high it can indicate that we are under attack by multiple nodes working together.
print'(concerning',self.HOST+')','number of objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave is now',len(self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave)
shared.printLock.release()
try:
delnumberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHavePerPeer[self.HOST]#this data structure is maintained so that we can keep track of how many total objects, across all connections, are currently outstanding. If it goes too high it can indicate that we are under attack by multiple nodes working together.
print'(concerning',self.HOST+')','number of objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave is now',len(self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave)
shared.printLock.release()
numberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHavePerPeer[self.HOST]=len(self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave)#this data structure is maintained so that we can keep track of how many total objects, across all connections, are currently outstanding. If it goes too high it can indicate that we are under attack by multiple nodes working together.
iflen(self.ackDataThatWeHaveYetToSend)>0:
self.data=self.ackDataThatWeHaveYetToSend.pop()
self.processData()
self.data=self.data[self.payloadLength+24:]#take this message out and then process the next message
delself.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave[objectHash]#It is possible that the remote node doesn't respond with the object. In that case, we'll very likely get it from someone else anyway.
print'(concerning',self.HOST+')','number of objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave is now',len(self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave)
shared.printLock.release()
try:
delnumberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHavePerPeer[self.HOST]#this data structure is maintained so that we can keep track of how many total objects, across all connections, are currently outstanding. If it goes too high it can indicate that we are under attack by multiple nodes working together.
print'(concerning',self.HOST+')','number of objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave is now',len(self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave)
shared.printLock.release()
try:
delnumberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHavePerPeer[self.HOST]#this data structure is maintained so that we can keep track of how many total objects, across all connections, are currently outstanding. If it goes too high it can indicate that we are under attack by multiple nodes working together.
print'(concerning',self.HOST+')','number of objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave is now',len(self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave)
shared.printLock.release()
numberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHavePerPeer[self.HOST]=len(self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave)#this data structure is maintained so that we can keep track of how many total objects, across all connections, are currently outstanding. If it goes too high it can indicate that we are under attack by multiple nodes working together.
@ -941,8 +943,7 @@ class receiveDataThread(threading.Thread):
shared.sqlReturnQueue.get()
shared.sqlSubmitQueue.put('commit')
shared.sqlLock.release()
#self.emit(SIGNAL("updateSentItemStatusByAckdata(PyQt_PyObject,PyQt_PyObject)"),encryptedData[readPosition:],'Acknowledgement of the message received just now.')
shared.UISignalQueue.put(('updateSentItemStatusByAckdata',(encryptedData[readPosition:],'Acknowledgement of the message received just now.')))
shared.UISignalQueue.put(('updateSentItemStatusByAckdata',(encryptedData[readPosition:],'Acknowledgement of the message received just now. '+unicode(strftime(shared.config.get('bitmessagesettings','timeformat'),localtime(int(time.time()))),'utf-8'))))
return
else:
shared.printLock.acquire()
@ -1152,7 +1153,7 @@ class receiveDataThread(threading.Thread):
print'number of keys(hosts) in numberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHavePerPeer:',len(numberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHavePerPeer)
print'number of keys(hosts) in numberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHavePerPeer:',len(numberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHavePerPeer)
@ -2562,7 +2569,7 @@ class singleCleaner(threading.Thread):
shared.sqlSubmitQueue.put('commit')
t=()
shared.sqlSubmitQueue.put('''select toaddress, toripe, fromaddress, subject, message, ackdata, lastactiontime, status, pubkeyretrynumber, msgretrynumber FROM sent WHERE ((status='findingpubkey' OR status='sentmessage') AND folder='sent') ''')#If the message's folder='trash' then we'll ignore it.
shared.sqlSubmitQueue.put('''select toaddress, toripe, fromaddress, subject, message, ackdata, lastactiontime, status, pubkeyretrynumber, msgretrynumber FROM sent WHERE ((status='awaitingpubkey' OR status='msgsent') AND folder='sent') ''')#If the message's folder='trash' then we'll ignore it.
shared.sqlSubmitQueue.put(t)
queryreturn=shared.sqlReturnQueue.get()
forrowinqueryreturn:
@ -2573,28 +2580,31 @@ class singleCleaner(threading.Thread):
print'It has been a long time and we haven\'t heard a response to our getpubkey request. Sending again.'
try:
delneededPubkeys[toripe]#We need to take this entry out of the neededPubkeys structure because the shared.workerQueue checks to see whether the entry is already present and will not do the POW and send the message because it assumes that it has already done it recently.
except:
pass
shared.workerQueue.put(('sendmessage',toaddress))
#self.emit(SIGNAL("updateStatusBar(PyQt_PyObject)"),"Doing work necessary to again attempt to request a public key...")
shared.UISignalQueue.put(('updateStatusBar','Doing work necessary to again attempt to request a public key...'))
t=(int(time.time()),pubkeyretrynumber+1,toripe)
shared.sqlSubmitQueue.put('''UPDATE sent SET lastactiontime=?, pubkeyretrynumber=? WHERE toripe=?''')
shared.sqlSubmitQueue.put('''UPDATE sent SET lastactiontime=?, pubkeyretrynumber=?, status='msgqueued' WHERE toripe=?''')
shared.sqlSubmitQueue.put('''UPDATE sent SET lastactiontime=?, msgretrynumber=?, status=? WHERE ackdata=?''')
shared.sqlSubmitQueue.put(t)
shared.sqlReturnQueue.get()
shared.workerQueue.put(('sendmessage',toaddress))
shared.sqlSubmitQueue.put('commit')
shared.workerQueue.put(('sendmessage',''))
#self.emit(SIGNAL("updateStatusBar(PyQt_PyObject)"),"Doing work necessary to again attempt to deliver a message...")
shared.UISignalQueue.put(('updateStatusBar','Doing work necessary to again attempt to deliver a message...'))
shared.sqlSubmitQueue.put('commit')
@ -2608,85 +2618,34 @@ class singleWorker(threading.Thread):
threading.Thread.__init__(self)
defrun(self):
time.sleep(10)
shared.sqlLock.acquire()
shared.sqlSubmitQueue.put('''SELECT toripe FROM sent WHERE (status=? AND folder='sent')''')
shared.sqlSubmitQueue.put(('findingpubkey',))
shared.sqlSubmitQueue.put('''SELECT toripe FROM sent WHERE (status='awaitingpubkey' AND folder='sent')''')
shared.sqlSubmitQueue.put('')
queryreturn=shared.sqlReturnQueue.get()
shared.sqlLock.release()
forrowinqueryreturn:
toripe,=row
#It is possible for the status of a message in our sent folder (which is also our 'outbox' folder) to have a status of 'findingpubkey' even if we have the pubkey. This can
#happen if the worker thread is working on the POW for an earlier message and does not get to the message in question before the user closes Bitmessage. In this case, the
#status will still be 'findingpubkey' but Bitmessage will never have checked to see whether it actually already has the pubkey. We should therefore check here.
shared.sqlLock.acquire()
shared.sqlSubmitQueue.put('''SELECT hash FROM pubkeys WHERE hash=? ''')
shared.sqlSubmitQueue.put((toripe,))
queryreturn=shared.sqlReturnQueue.get()
shared.sqlLock.release()
ifqueryreturn!=[]:#If we have the pubkey then send the message otherwise put the hash in the neededPubkeys data structure so that we will pay attention to it if it comes over the wire.
self.sendMsg(toripe)
else:
neededPubkeys[toripe]=0
neededPubkeys[toripe]=0
self.sendBroadcast()#just in case there are any proof of work tasks for Broadcasts that have yet to be sent.
#Now let us see if there are any proofs of work for msg messages that we have yet to complete..
shared.sqlLock.acquire()
t=('doingpow',)
shared.sqlSubmitQueue.put('''SELECT toripe FROM sent WHERE status=? and folder='sent'''')
shared.sqlSubmitQueue.put(t)
shared.sqlSubmitQueue.put('''SELECT DISTINCT toaddress FROM sent WHERE (status='doingpubkeypow' AND folder='sent')''')
shared.sqlSubmitQueue.put('')
queryreturn=shared.sqlReturnQueue.get()
shared.sqlLock.release()
forrowinqueryreturn:
toripe,=row
#Evidentially there is a remote possibility that we may, for some reason, no longer have the recipient's pubkey. Let us make sure we still have it or else the sendMsg function will appear to freeze.
shared.sqlLock.acquire()
shared.sqlSubmitQueue.put('''SELECT hash FROM pubkeys WHERE hash=? ''')
shared.sqlSubmitQueue.put((toripe,))
queryreturn=shared.sqlReturnQueue.get()
shared.sqlLock.release()
ifqueryreturn!=[]:
#We have the needed pubkey
self.sendMsg(toripe)
else:
shared.printLock.acquire()
sys.stderr.write('For some reason, the status of a message in our outbox is \'doingpow\' even though we lack the pubkey. Here is the RIPE hash of the needed pubkey: %s\n'%toripe.encode('hex'))
shared.printLock.release()
toaddress,=row
self.requestPubKey(toaddress)
time.sleep(10)#give some time for the GUI to start before we start on any existing POW tasks.
self.sendMsg()#just in case there are any pending tasks for msg messages that have yet to be sent.
self.sendBroadcast()#just in case there are any tasks for Broadcasts that have yet to be sent.
whileTrue:
command,data=shared.workerQueue.get()
#statusbar = 'The singleWorker thread is working on work.'
print'We have already requested this pubkey (the ripe hash is in neededPubkeys). We will re-request again soon.'
#self.emit(SIGNAL("updateSentItemStatusByHash(PyQt_PyObject,PyQt_PyObject)"),toRipe,'Public key was requested earlier. Receiver must be offline. Will retry.')
shared.UISignalQueue.put(('updateSentItemStatusByHash',(toRipe,'Public key was requested earlier. Receiver must be offline. Will retry.')))
else:
print'We already have the necessary public key.'
self.sendMsg(toRipe)#by calling this function, we are asserting that we already have the pubkey for toRipe
#self.emit(SIGNAL("updateSentItemStatusByAckdata(PyQt_PyObject,PyQt_PyObject)"),ackdata,'Broadcast sent on '+unicode(strftime(shared.config.get('bitmessagesettings', 'timeformat'),localtime(int(time.time()))),'utf-8'))
@ -2992,25 +2957,83 @@ class singleWorker(threading.Thread):
sys.stderr.write('Error: In the singleWorker thread, the sendBroadcast function doesn\'t understand the address version.\n')
shared.printLock.release()
defsendMsg(self,toRipe):
defsendMsg(self):
#Check to see if there are any messages queued to be sent
shared.sqlLock.acquire()
t=('doingpow','findingpubkey',toRipe)
shared.sqlSubmitQueue.put('''UPDATE sent SET status=? WHERE status=? AND toripe=? and folder='sent'''')
shared.sqlSubmitQueue.put(t)
shared.sqlSubmitQueue.put('''SELECT toaddress FROM sent WHERE (status='msgqueued' AND folder='sent')''')
shared.sqlSubmitQueue.put('')
queryreturn=shared.sqlReturnQueue.get()
shared.sqlSubmitQueue.put('commit')
t=('doingpow',toRipe)
shared.sqlSubmitQueue.put('''SELECT toaddress, fromaddress, subject, message, ackdata FROM sent WHERE status=? AND toripe=? and folder='sent'''')
shared.sqlSubmitQueue.put(t)
shared.sqlLock.release()
forrowinqueryreturn:#For each address to which we need to send a message, check to see if we have its pubkey already.
toaddress,=row
toripe=decodeAddress(toaddress)[3]
shared.sqlLock.acquire()
shared.sqlSubmitQueue.put('''SELECT hash FROM pubkeys WHERE hash=? ''')
shared.sqlSubmitQueue.put((toripe,))
queryreturn=shared.sqlReturnQueue.get()
shared.sqlLock.release()
ifqueryreturn!=[]:#If we have the needed pubkey, set the status to doingmsgpow (we'll do it further down)
t=(toaddress,)
shared.sqlLock.acquire()
shared.sqlSubmitQueue.put('''UPDATE sent SET status='doingmsgpow' WHERE toaddress=? AND status='msgqueued'''')
shared.sqlSubmitQueue.put(t)
shared.sqlReturnQueue.get()
shared.sqlSubmitQueue.put('commit')
shared.sqlLock.release()
else:#We don't have the needed pubkey. Set the status to 'awaitingpubkey' and request it if we haven't already
iftoripeinneededPubkeys:
#We already sent a request for the pubkey
t=(toaddress,)
shared.sqlLock.acquire()
shared.sqlSubmitQueue.put('''UPDATE sent SET status='awaitingpubkey' WHERE toaddress=? AND status='msgqueued'''')
shared.sqlSubmitQueue.put(t)
shared.sqlReturnQueue.get()
shared.sqlSubmitQueue.put('commit')
shared.sqlLock.release()
shared.UISignalQueue.put(('updateSentItemStatusByHash',(toripe,'Encryption key was requested earlier.')))
else:
#We have not yet sent a request for the pubkey
t=(toaddress,)
shared.sqlLock.acquire()
shared.sqlSubmitQueue.put('''UPDATE sent SET status='doingpubkeypow' WHERE toaddress=? AND status='msgqueued'''')
shared.sqlSubmitQueue.put(t)
shared.sqlReturnQueue.get()
shared.sqlSubmitQueue.put('commit')
shared.sqlLock.release()
shared.UISignalQueue.put(('updateSentItemStatusByHash',(toripe,'Sending a request for the recipient\'s encryption key.')))
self.requestPubKey(toaddress)
shared.sqlLock.acquire()
shared.sqlSubmitQueue.put('''SELECT toaddress, toripe, fromaddress, subject, message, ackdata FROM sent WHERE status='doingmsgpow' and folder='sent'''')
#Evidently there is a remote possibility that we may no longer have the recipient's pubkey. Let us make sure we still have it or else the sendMsg function will appear to freeze.
shared.sqlLock.acquire()
shared.sqlSubmitQueue.put('''SELECT hash FROM pubkeys WHERE hash=? ''')
shared.sqlSubmitQueue.put((toripe,))
queryreturn=shared.sqlReturnQueue.get()
shared.sqlLock.release()
ifqueryreturn==[]:
#We no longer have the needed pubkey
shared.printLock.acquire()
sys.stderr.write('For some reason, the status of a message in our outbox is \'doingmsgpow\' even though we lack the pubkey. Here is the RIPE hash of the needed pubkey: %s\n'%toripe.encode('hex'))
shared.printLock.release()
t=(toaddress,)
shared.sqlLock.acquire()
shared.sqlSubmitQueue.put('''UPDATE sent SET status='msgqueued' WHERE toaddress=? AND status='doingmsgpow'''')
shared.sqlSubmitQueue.put(t)
shared.sqlReturnQueue.get()
shared.sqlSubmitQueue.put('commit')
shared.sqlLock.release()
shared.UISignalQueue.put(('updateSentItemStatusByHash',(toripe,'Sending a request for the recipient\'s encryption key.')))
#self.emit(SIGNAL("updateSentItemStatusByAckdata(PyQt_PyObject,PyQt_PyObject)"),ackdata,'Message sent. Waiting on acknowledgement. Sent on ' + unicode(strftime(shared.config.get('bitmessagesettings', 'timeformat'),localtime(int(time.time()))),'utf-8'))
shared.UISignalQueue.put(('updateSentItemStatusByAckdata',(ackdata,'Message sent. Waiting on acknowledgement. Sent on '+unicode(strftime(shared.config.get('bitmessagesettings','timeformat'),localtime(int(time.time()))),'utf-8'))))
print'sending inv (within sendmsg function)'
print'Broadcasting inv for my msg(within sendmsg function):',inventoryHash.encode('hex')
#self.emit(SIGNAL("updateStatusBar(PyQt_PyObject)"),'Broacasting the public key request. This program will auto-retry if they are offline.')
t=(toAddress,)
shared.sqlLock.acquire()
shared.sqlSubmitQueue.put('''UPDATE sent SET status='awaitingpubkey' WHERE toaddress=? AND status='doingpubkeypow'''')
shared.sqlSubmitQueue.put(t)
shared.sqlReturnQueue.get()
shared.sqlSubmitQueue.put('commit')
shared.sqlLock.release()
shared.UISignalQueue.put(('updateStatusBar','Broacasting the public key request. This program will auto-retry if they are offline.'))
#self.emit(SIGNAL("updateSentItemStatusByHash(PyQt_PyObject,PyQt_PyObject)"),ripe,'Sending public key request. Waiting for reply. Requested at ' + unicode(strftime(shared.config.get('bitmessagesettings', 'timeformat'),localtime(int(time.time()))),'utf-8'))
shared.UISignalQueue.put(('updateSentItemStatusByHash',(ripe,'Sending public key request. Waiting for reply. Requested at '+unicode(strftime(shared.config.get('bitmessagesettings','timeformat'),localtime(int(time.time()))),'utf-8'))))
newItem=myTableWidgetItem('Message sent. Waiting on acknowledgement. Sent at '+unicode(strftime(shared.config.get('bitmessagesettings','timeformat'),localtime(lastactiontime)),'utf-8'))
elifstatus=='doingpow':
elifstatus=='doingmsgpow':
newItem=myTableWidgetItem('Need to do work to send message. Work is queued.')
elifstatus=='ackreceived':
newItem=myTableWidgetItem('Acknowledgement of the message received '+unicode(strftime(shared.config.get('bitmessagesettings','timeformat'),localtime(int(lastactiontime))),'utf-8'))
elifstatus=='broadcastpending':
newItem=myTableWidgetItem('Doing the work necessary to send broadcast...')
elifstatus=='broadcastqueued':
newItem=myTableWidgetItem('Broadcast queued.')
elifstatus=='broadcastsent':
newItem=myTableWidgetItem('Broadcast on '+unicode(strftime(shared.config.get('bitmessagesettings','timeformat'),localtime(int(lastactiontime))),'utf-8'))
else:
@ -770,6 +774,7 @@ class MyForm(QtGui.QMainWindow):
defclick_actionDeleteAllTrashedMessages(self):
ifQtGui.QMessageBox.question(self,'Delete trash?',"Are you sure you want to delete all trashed messages?",QtGui.QMessageBox.Yes,QtGui.QMessageBox.No)==QtGui.QMessageBox.No:
return
self.statusBar().showMessage('Deleting messages and freeing empty space...')
shared.sqlLock.acquire()
shared.sqlSubmitQueue.put('''delete from inbox where folder='trash'''')
shared.sqlSubmitQueue.put('')
@ -781,6 +786,7 @@ class MyForm(QtGui.QMainWindow):
@ -1104,7 +1110,7 @@ class MyForm(QtGui.QMainWindow):
self.statusBar().showMessage('Warning: You are currently not connected. Bitmessage will do the work necessary to send the message but it won\'t send until you connect.')