From ce8de60d962c10f7f5a8908d93f463deee3775d9 Mon Sep 17 00:00:00 2001 From: Jonathan Warren Date: Thu, 28 Mar 2013 17:56:20 -0400 Subject: [PATCH] Better deserialization --- bitmessagemain.py | 731 +++++++++++-------------------------------- defaultKnownNodes.py | 2 + 2 files changed, 183 insertions(+), 550 deletions(-) diff --git a/bitmessagemain.py b/bitmessagemain.py index bc64ce0a..978bfaf4 100755 --- a/bitmessagemain.py +++ b/bitmessagemain.py @@ -7,7 +7,7 @@ #Right now, PyBitmessage only support connecting to stream 1. It doesn't yet contain logic to expand into further streams. softwareVersion = '0.2.7' -verbose = 2 +verbose = 1 maximumAgeOfAnObjectThatIAmWillingToAccept = 216000 #Equals two days and 12 hours. lengthOfTimeToLeaveObjectsInInventory = 237600 #Equals two days and 18 hours. This should be longer than maximumAgeOfAnObjectThatIAmWillingToAccept so that we don't process messages twice. lengthOfTimeToHoldOnToAllPubkeys = 2419200 #Equals 4 weeks. You could make this longer if you want but making it shorter would not be advisable because there is a very small possibility that it could keep you from obtaining a needed pubkey for a period of time. @@ -40,8 +40,6 @@ from defaultKnownNodes import * import time import socket import threading -#import rsa -#from rsa.bigfile import * import hashlib from struct import * import pickle @@ -255,7 +253,7 @@ class receiveDataThread(QThread): while True: try: - self.data = self.data + self.sock.recv(65536) + self.data += self.sock.recv(65536) except socket.timeout: printLock.acquire() print 'Timeout occurred waiting for data. Closing receiveData thread.' @@ -305,14 +303,14 @@ class receiveDataThread(QThread): def processData(self): global verbose - #if verbose >= 2: + #if verbose >= 3: #printLock.acquire() #print 'self.data is currently ', repr(self.data) #printLock.release() if len(self.data) < 20: #if so little of the data has arrived that we can't even unpack the payload length pass elif self.data[0:4] != '\xe9\xbe\xb4\xd9': - if verbose >= 2: + if verbose >= 1: printLock.acquire() sys.stderr.write('The magic bytes were not correct. First 40 bytes of data: %s\n' % repr(self.data[0:40])) print 'self.data:', self.data.encode('hex') @@ -332,25 +330,25 @@ class receiveDataThread(QThread): print 'remoteCommand', repr(remoteCommand.replace('\x00','')), ' from', self.HOST printLock.release() if remoteCommand == 'version\x00\x00\x00\x00\x00': - self.recversion() + self.recversion(self.data[24:self.payloadLength+24]) elif remoteCommand == 'verack\x00\x00\x00\x00\x00\x00': self.recverack() elif remoteCommand == 'addr\x00\x00\x00\x00\x00\x00\x00\x00' and self.connectionIsOrWasFullyEstablished: - self.recaddr() + self.recaddr(self.data[24:self.payloadLength+24]) elif remoteCommand == 'getpubkey\x00\x00\x00' and self.connectionIsOrWasFullyEstablished: - self.recgetpubkey() + self.recgetpubkey(self.data[24:self.payloadLength+24]) elif remoteCommand == 'pubkey\x00\x00\x00\x00\x00\x00' and self.connectionIsOrWasFullyEstablished: - self.recpubkey() + self.recpubkey(self.data[24:self.payloadLength+24]) elif remoteCommand == 'inv\x00\x00\x00\x00\x00\x00\x00\x00\x00' and self.connectionIsOrWasFullyEstablished: - self.recinv() + self.recinv(self.data[24:self.payloadLength+24]) elif remoteCommand == 'getdata\x00\x00\x00\x00\x00' and self.connectionIsOrWasFullyEstablished: - self.recgetdata() + self.recgetdata(self.data[24:self.payloadLength+24]) elif remoteCommand == 'getbiginv\x00\x00\x00' and self.connectionIsOrWasFullyEstablished: self.sendBigInv() elif remoteCommand == 'msg\x00\x00\x00\x00\x00\x00\x00\x00\x00' and self.connectionIsOrWasFullyEstablished: - self.recmsg() + self.recmsg(self.data[24:self.payloadLength+24]) elif remoteCommand == 'broadcast\x00\x00\x00' and self.connectionIsOrWasFullyEstablished: - self.recbroadcast() + self.recbroadcast(self.data[24:self.payloadLength+24]) elif remoteCommand == 'getaddr\x00\x00\x00\x00\x00' and self.connectionIsOrWasFullyEstablished: self.sendaddr() elif remoteCommand == 'ping\x00\x00\x00\x00\x00\x00\x00\x00' and self.connectionIsOrWasFullyEstablished: @@ -371,9 +369,10 @@ class receiveDataThread(QThread): printLock.release() del self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave[objectHash] elif isInSqlInventory(objectHash): - printLock.acquire() - print 'Inventory (SQL on disk) already has object listed in inv message.' - printLock.release() + if verbose >= 2: + printLock.acquire() + print 'Inventory (SQL on disk) already has object listed in inv message.' + printLock.release() del self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave[objectHash] else: #print 'processData function making request for object:', objectHash.encode('hex') @@ -391,8 +390,8 @@ class receiveDataThread(QThread): print 'Checksum incorrect. Clearing this message.' self.data = self.data[self.payloadLength+24:] - def isProofOfWorkSufficient(self): - POW, = unpack('>Q',hashlib.sha512(hashlib.sha512(self.data[24:32]+ hashlib.sha512(self.data[32:24+self.payloadLength]).digest()).digest()).digest()[0:8]) + def isProofOfWorkSufficient(self,data): + POW, = unpack('>Q',hashlib.sha512(hashlib.sha512(data[:8]+ hashlib.sha512(data[8:]).digest()).digest()).digest()[0:8]) #print 'POW:', POW #Notice that I have divided the averageProofOfWorkNonceTrialsPerByte by two. This makes the POW requirement easier. This gives us wiggle-room: if we decide that we want to make the POW easier, the change won't obsolete old clients because they already expect a lower POW. If we decide that the current work done by clients feels approperate then we can remove this division by 2 and make the requirement match what is actually done by a sending node. If we want to raise the POW requirement then old nodes will HAVE to upgrade no matter what. return POW < 2**64 / ((self.payloadLength+payloadLengthExtraBytes) * (averageProofOfWorkNonceTrialsPerByte/2)) @@ -490,13 +489,13 @@ class receiveDataThread(QThread): self.sock.send(headerData + payload) #We have received a broadcast message - def recbroadcast(self): + def recbroadcast(self,data): self.messageProcessingStartTime = time.time() #First we must check to make sure the proof of work is sufficient. - if not self.isProofOfWorkSufficient(): + if not self.isProofOfWorkSufficient(data): print 'Proof of work in broadcast message insufficient.' return - embeddedTime, = unpack('>I',self.data[32:36]) + embeddedTime, = unpack('>I',data[8:12]) if embeddedTime > (int(time.time())+10800): #prevent funny business print 'The embedded time in this broadcast message is more than three hours in the future. That doesn\'t make sense. Ignoring message.' return @@ -507,7 +506,7 @@ class receiveDataThread(QThread): print 'The payload length of this broadcast packet is unreasonably low. Someone is probably trying funny business. Ignoring message.' return inventoryLock.acquire() - self.inventoryHash = calculateInventoryHash(self.data[24:self.payloadLength+24]) + self.inventoryHash = calculateInventoryHash(data) if self.inventoryHash in inventory: print 'We have already received this broadcast object. Ignoring.' inventoryLock.release() @@ -518,13 +517,13 @@ class receiveDataThread(QThread): return #It is valid so far. Let's let our peers know about it. objectType = 'broadcast' - inventory[self.inventoryHash] = (objectType, self.streamNumber, self.data[24:self.payloadLength+24], embeddedTime) + inventory[self.inventoryHash] = (objectType, self.streamNumber, data, embeddedTime) inventoryLock.release() self.broadcastinv(self.inventoryHash) self.emit(SIGNAL("incrementNumberOfBroadcastsProcessed()")) - self.processbroadcast()#When this function returns, we will have either successfully processed this broadcast because we are interested in it, ignored it because we aren't interested in it, or found problem with the broadcast that warranted ignoring it. + self.processbroadcast(data)#When this function returns, we will have either successfully processed this broadcast because we are interested in it, ignored it because we aren't interested in it, or found problem with the broadcast that warranted ignoring it. # Let us now set lengthOfTimeWeShouldUseToProcessThisMessage. If we haven't used the specified amount of time, we shall sleep. These values are mostly the same values used for msg messages although broadcast messages are processed faster. if self.payloadLength > 100000000: #Size is greater than 100 megabytes @@ -548,32 +547,32 @@ class receiveDataThread(QThread): printLock.release() #A broadcast message has a valid time and POW and requires processing. The recbroadcast function calls this one. - def processbroadcast(self): - readPosition = 36 - broadcastVersion, broadcastVersionLength = decodeVarint(self.data[readPosition:readPosition+9]) + def processbroadcast(self,data): + readPosition = 12 + broadcastVersion, broadcastVersionLength = decodeVarint(data[readPosition:readPosition+9]) if broadcastVersion <> 1: #Cannot decode incoming broadcast versions higher than 1. Assuming the sender isn\' being silly, you should upgrade Bitmessage because this message shall be ignored. return readPosition += broadcastVersionLength beginningOfPubkeyPosition = readPosition #used when we add the pubkey to our pubkey table - sendersAddressVersion, sendersAddressVersionLength = decodeVarint(self.data[readPosition:readPosition+9]) + sendersAddressVersion, sendersAddressVersionLength = decodeVarint(data[readPosition:readPosition+9]) if sendersAddressVersion <= 1 or sendersAddressVersion >=3: #Cannot decode senderAddressVersion higher than 2. Assuming the sender isn\' being silly, you should upgrade Bitmessage because this message shall be ignored. return readPosition += sendersAddressVersionLength if sendersAddressVersion == 2: - sendersStream, sendersStreamLength = decodeVarint(self.data[readPosition:readPosition+9]) + sendersStream, sendersStreamLength = decodeVarint(data[readPosition:readPosition+9]) if sendersStream <= 0 or sendersStream <> self.streamNumber: return readPosition += sendersStreamLength - behaviorBitfield = self.data[readPosition:readPosition+4] + behaviorBitfield = data[readPosition:readPosition+4] readPosition += 4 - sendersPubSigningKey = '\x04' + self.data[readPosition:readPosition+64] + sendersPubSigningKey = '\x04' + data[readPosition:readPosition+64] readPosition += 64 - sendersPubEncryptionKey = '\x04' + self.data[readPosition:readPosition+64] + sendersPubEncryptionKey = '\x04' + data[readPosition:readPosition+64] readPosition += 64 endOfPubkeyPosition = readPosition - sendersHash = self.data[readPosition:readPosition+20] + sendersHash = data[readPosition:readPosition+20] if sendersHash not in broadcastSendersForWhichImWatching: #Display timing data printLock.acquire() @@ -590,20 +589,20 @@ class receiveDataThread(QThread): if ripe.digest() != sendersHash: #The sender of this message lied. return - messageEncodingType, messageEncodingTypeLength = decodeVarint(self.data[readPosition:readPosition+9]) + messageEncodingType, messageEncodingTypeLength = decodeVarint(data[readPosition:readPosition+9]) if messageEncodingType == 0: return readPosition += messageEncodingTypeLength - messageLength, messageLengthLength = decodeVarint(self.data[readPosition:readPosition+9]) + messageLength, messageLengthLength = decodeVarint(data[readPosition:readPosition+9]) readPosition += messageLengthLength - message = self.data[readPosition:readPosition+messageLength] + message = data[readPosition:readPosition+messageLength] readPosition += messageLength readPositionAtBottomOfMessage = readPosition - signatureLength, signatureLengthLength = decodeVarint(self.data[readPosition:readPosition+9]) + signatureLength, signatureLengthLength = decodeVarint(data[readPosition:readPosition+9]) readPosition += signatureLengthLength - signature = self.data[readPosition:readPosition+signatureLength] + signature = data[readPosition:readPosition+signatureLength] try: - highlevelcrypto.verify(self.data[36:readPositionAtBottomOfMessage],signature,sendersPubSigningKey.encode('hex')) + highlevelcrypto.verify(data[12:readPositionAtBottomOfMessage],signature,sendersPubSigningKey.encode('hex')) print 'ECDSA verify passed' except Exception, err: print 'ECDSA verify failed', err @@ -612,7 +611,7 @@ class receiveDataThread(QThread): #Let's store the public key in case we want to reply to this person. #We don't have the correct nonce or time (which would let us send out a pubkey message) so we'll just fill it with 1's. We won't be able to send this pubkey to others (without doing the proof of work ourselves, which this program is programmed to not do.) - t = (ripe.digest(),False,'\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF'+'\xFF\xFF\xFF\xFF'+self.data[beginningOfPubkeyPosition:endOfPubkeyPosition],int(time.time()),'yes') + t = (ripe.digest(),False,'\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF'+'\xFF\xFF\xFF\xFF'+data[beginningOfPubkeyPosition:endOfPubkeyPosition],int(time.time()),'yes') sqlLock.acquire() sqlSubmitQueue.put('''INSERT INTO pubkeys VALUES (?,?,?,?,?)''') sqlSubmitQueue.put(t) @@ -663,97 +662,17 @@ class receiveDataThread(QThread): print 'Time spent processing this interesting broadcast:', time.time()- self.messageProcessingStartTime printLock.release() - """elif sendersAddressVersion == 1: - sendersStream, sendersStreamLength = decodeVarint(self.data[readPosition:readPosition+9]) - if sendersStream <= 0: - return - readPosition += sendersStreamLength - sendersHash = self.data[readPosition:readPosition+20] - if sendersHash not in broadcastSendersForWhichImWatching: - return - #At this point, this message claims to be from sendersHash and we are interested in it. We still have to hash the public key to make sure it is truly the key that matches the hash, and also check the signiture. - readPosition += 20 - nLength, nLengthLength = decodeVarint(self.data[readPosition:readPosition+9]) - if nLength < 1: - return - readPosition += nLengthLength - nString = self.data[readPosition:readPosition+nLength] - readPosition += nLength - eLength, eLengthLength = decodeVarint(self.data[readPosition:readPosition+9]) - if eLength < 1: - return - readPosition += eLengthLength - eString = self.data[readPosition:readPosition+eLength] - #We are now ready to hash the public key and verify that its hash matches the hash claimed in the message - readPosition += eLength - sha = hashlib.new('sha512') - sha.update(nString+eString) - ripe = hashlib.new('ripemd160') - ripe.update(sha.digest()) - if ripe.digest() != sendersHash: - #The sender of this message lied. - return - - readPositionAtBeginningOfMessageEncodingType = readPosition - messageEncodingType, messageEncodingTypeLength = decodeVarint(self.data[readPosition:readPosition+9]) - if messageEncodingType == 0: - return - readPosition += messageEncodingTypeLength - messageLength, messageLengthLength = decodeVarint(self.data[readPosition:readPosition+9]) - readPosition += messageLengthLength - message = self.data[readPosition:readPosition+messageLength] - readPosition += messageLength - signature = self.data[readPosition:readPosition+nLength] - sendersPubkey = rsa.PublicKey(convertStringToInt(nString),convertStringToInt(eString)) - #print 'senders Pubkey', sendersPubkey - try: - rsa.verify(self.data[readPositionAtBeginningOfMessageEncodingType:readPositionAtBeginningOfMessageEncodingType+messageEncodingTypeLength+messageLengthLength+messageLength],signature,sendersPubkey) - print 'verify passed' - except Exception, err: - print 'verify failed', err - return - #verify passed - fromAddress = encodeAddress(sendersAddressVersion,sendersStream,ripe.digest()) - print 'fromAddress:', fromAddress - - if messageEncodingType == 2: - bodyPositionIndex = string.find(message,'\nBody:') - if bodyPositionIndex > 1: - subject = message[8:bodyPositionIndex] - body = message[bodyPositionIndex+6:] - else: - subject = '' - body = message - elif messageEncodingType == 1: - body = message - subject = '' - elif messageEncodingType == 0: - print 'messageEncodingType == 0. Doing nothing with the message.' - else: - body = 'Unknown encoding type.\n\n' + repr(message) - subject = '' - - toAddress = '[Broadcast subscribers]' - if messageEncodingType <> 0: - sqlLock.acquire() - t = (self.inventoryHash,toAddress,fromAddress,subject,int(time.time()),body,'inbox') - sqlSubmitQueue.put('''INSERT INTO inbox VALUES (?,?,?,?,?,?,?)''') - sqlSubmitQueue.put(t) - sqlReturnQueue.get() - sqlLock.release() - self.emit(SIGNAL("displayNewInboxMessage(PyQt_PyObject,PyQt_PyObject,PyQt_PyObject,PyQt_PyObject,PyQt_PyObject)"),self.inventoryHash,toAddress,fromAddress,subject,body)""" - #We have received a msg message. - def recmsg(self): + def recmsg(self,data): self.messageProcessingStartTime = time.time() #First we must check to make sure the proof of work is sufficient. - if not self.isProofOfWorkSufficient(): + if not self.isProofOfWorkSufficient(data): print 'Proof of work in msg message insufficient.' return - readPosition = 32 - embeddedTime, = unpack('>I',self.data[readPosition:readPosition+4]) + readPosition = 8 + embeddedTime, = unpack('>I',data[readPosition:readPosition+4]) if embeddedTime > int(time.time())+10800: print 'The time in the msg message is too new. Ignoring it. Time:', embeddedTime return @@ -761,12 +680,12 @@ class receiveDataThread(QThread): print 'The time in the msg message is too old. Ignoring it. Time:', embeddedTime return readPosition += 4 - streamNumberAsClaimedByMsg, streamNumberAsClaimedByMsgLength = decodeVarint(self.data[readPosition:readPosition+9]) + streamNumberAsClaimedByMsg, streamNumberAsClaimedByMsgLength = decodeVarint(data[readPosition:readPosition+9]) if streamNumberAsClaimedByMsg != self.streamNumber: print 'The stream number encoded in this msg (' + str(streamNumberAsClaimedByMsg) + ') message does not match the stream number on which it was received. Ignoring it.' return readPosition += streamNumberAsClaimedByMsgLength - self.inventoryHash = calculateInventoryHash(self.data[24:self.payloadLength+24]) + self.inventoryHash = calculateInventoryHash(data) inventoryLock.acquire() if self.inventoryHash in inventory: print 'We have already received this msg message. Ignoring.' @@ -778,12 +697,12 @@ class receiveDataThread(QThread): return #This msg message is valid. Let's let our peers know about it. objectType = 'msg' - inventory[self.inventoryHash] = (objectType, self.streamNumber, self.data[24:self.payloadLength+24], embeddedTime) + inventory[self.inventoryHash] = (objectType, self.streamNumber, data, embeddedTime) inventoryLock.release() self.broadcastinv(self.inventoryHash) self.emit(SIGNAL("incrementNumberOfMessagesProcessed()")) - self.processmsg(readPosition) #When this function returns, we will have either successfully processed the message bound for us, ignored it because it isn't bound for us, or found problem with the message that warranted ignoring it. + self.processmsg(readPosition,data) #When this function returns, we will have either successfully processed the message bound for us, ignored it because it isn't bound for us, or found problem with the message that warranted ignoring it. # Let us now set lengthOfTimeWeShouldUseToProcessThisMessage. If we haven't used the specified amount of time, we shall sleep. These values are based on test timings and you may change them at-will. if self.payloadLength > 100000000: #Size is greater than 100 megabytes @@ -805,232 +724,35 @@ class receiveDataThread(QThread): printLock.acquire() print 'Total message processing time:', time.time()- self.messageProcessingStartTime, 'seconds.' printLock.release() - - - #This section is for my RSA keys (version 1 addresses). If we don't have any version 1 addresses it will never run. This code will soon be removed. - """initialDecryptionSuccessful = False - infile = cStringIO.StringIO(self.data[readPosition:self.payloadLength+24]) - outfile = cStringIO.StringIO() - #print 'len(myRSAAddressHashes.items()):', len(myRSAAddressHashes.items()) - for key, value in myRSAAddressHashes.items(): - try: - decrypt_bigfile(infile, outfile, value) - #The initial decryption passed though there is a small chance that the message isn't actually for me. We'll need to check that the 20 zeros are present. - #print 'initial decryption successful using key', repr(key) - initialDecryptionSuccessful = True - printLock.acquire() - print 'Initial decryption passed' - printLock.release() - break - except Exception, err: - infile.seek(0) - #print 'Exception:', err - #print 'outfile len is:', len(outfile.getvalue()),'data is:', repr(outfile.getvalue()) - #print 'Initial decryption failed using key', value - #decryption failed for this key. The message is for someone else (or for a different key of mine). - if initialDecryptionSuccessful and outfile.getvalue()[:20] == '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00': #this run of 0s allows the true message receiver to identify his message - #This is clearly a message bound for me. - outfile.seek(0) - data = outfile.getvalue() - readPosition = 20 #To start reading past the 20 zero bytes - messageVersion, messageVersionLength = decodeVarint(data[readPosition:readPosition+10]) - readPosition += messageVersionLength - if messageVersion == 1: - bitfieldBehavior = data[readPosition:readPosition+4] - readPosition += 4 - sendersAddressVersionNumber, sendersAddressVersionNumberLength = decodeVarint(data[readPosition:readPosition+10]) - if sendersAddressVersionNumber == 1: - readPosition += sendersAddressVersionNumberLength - sendersStreamNumber, sendersStreamNumberLength = decodeVarint(data[readPosition:readPosition+10]) - if sendersStreamNumber == 0: - print 'sendersStreamNumber = 0. Ignoring message' - else: - readPosition += sendersStreamNumberLength - - sendersNLength, sendersNLengthLength = decodeVarint(data[readPosition:readPosition+10]) - readPosition += sendersNLengthLength - sendersN = data[readPosition:readPosition+sendersNLength] - readPosition += sendersNLength - sendersELength, sendersELengthLength = decodeVarint(data[readPosition:readPosition+10]) - readPosition += sendersELengthLength - sendersE = data[readPosition:readPosition+sendersELength] - readPosition += sendersELength - endOfThePublicKeyPosition = readPosition - - messageEncodingType, messageEncodingTypeLength = decodeVarint(data[readPosition:readPosition+10]) - readPosition += messageEncodingTypeLength - print 'Message Encoding Type:', messageEncodingType - messageLength, messageLengthLength = decodeVarint(data[readPosition:readPosition+10]) - print 'message length:', messageLength - readPosition += messageLengthLength - message = data[readPosition:readPosition+messageLength] - #print 'First 150 characters of message:', repr(message[:150]) - readPosition += messageLength - ackLength, ackLengthLength = decodeVarint(data[readPosition:readPosition+10]) - #print 'ackLength:', ackLength - readPosition += ackLengthLength - ackData = data[readPosition:readPosition+ackLength] - readPosition += ackLength - payloadSigniture = data[readPosition:readPosition+sendersNLength] #We're using the length of the sender's n because it should match the signiture size. - sendersPubkey = rsa.PublicKey(convertStringToInt(sendersN),convertStringToInt(sendersE)) - print 'sender\'s Pubkey', sendersPubkey - - #Check the cryptographic signiture - verifyPassed = False - try: - rsa.verify(data[:-len(payloadSigniture)],payloadSigniture, sendersPubkey) - print 'verify passed' - verifyPassed = True - except Exception, err: - print 'verify failed', err - if verifyPassed: - #calculate the fromRipe. - sha = hashlib.new('sha512') - sha.update(sendersN+sendersE) - ripe = hashlib.new('ripemd160') - ripe.update(sha.digest()) - - #Let's store the public key in case we want to reply to this person. - #We don't have the correct nonce in order to send out a pubkey message so we'll just fill it with 1's. We won't be able to send this pubkey to others (without doing the proof of work ourselves, which this program is programmed to not do.) - t = (ripe.digest(),False,'\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF'+data[20+messageVersionLength:endOfThePublicKeyPosition],int(time.time()),'yes') - sqlLock.acquire() - sqlSubmitQueue.put('''INSERT INTO pubkeys VALUES (?,?,?,?,?)''') - sqlSubmitQueue.put(t) - sqlReturnQueue.get() - sqlLock.release() - - blockMessage = False #Gets set to True if the user shouldn't see the message according to black or white lists. - fromAddress = encodeAddress(sendersAddressVersionNumber,sendersStreamNumber,ripe.digest()) - if config.get('bitmessagesettings', 'blackwhitelist') == 'black': #If we are using a blacklist - t = (fromAddress,) - sqlLock.acquire() - sqlSubmitQueue.put('''SELECT label, enabled FROM blacklist where address=?''') - sqlSubmitQueue.put(t) - queryreturn = sqlReturnQueue.get() - sqlLock.release() - for row in queryreturn: - label, enabled = row - if enabled: - print 'Message ignored because address is in blacklist.' - blockMessage = True - else: #We're using a whitelist - t = (fromAddress,) - sqlLock.acquire() - sqlSubmitQueue.put('''SELECT label, enabled FROM whitelist where address=?''') - sqlSubmitQueue.put(t) - queryreturn = sqlReturnQueue.get() - sqlLock.release() - if queryreturn == []: - print 'Message ignored because address not in whitelist.' - blockMessage = True - for row in queryreturn: #It could be in the whitelist but disabled. Let's check. - label, enabled = row - if not enabled: - print 'Message ignored because address in whitelist but not enabled.' - blockMessage = True - - if not blockMessage: - print 'fromAddress:', fromAddress - print 'First 150 characters of message:', repr(message[:150]) - - #Look up the destination address (my address) based on the destination ripe hash. - #I realize that I could have a data structure devoted to this task, or maintain an indexed table - #in the sql database, but I would prefer to minimize the number of data structures this program - #uses. Searching linearly through the user's short list of addresses doesn't take very long anyway. - configSections = config.sections() - for addressInKeysFile in configSections: - if addressInKeysFile <> 'bitmessagesettings': - status,addressVersionNumber,streamNumber,hash = decodeAddress(addressInKeysFile) - if hash == key: - toAddress = addressInKeysFile - toLabel = config.get(addressInKeysFile, 'label') - if toLabel == '': - toLabel = addressInKeysFile - break - - if messageEncodingType == 2: - bodyPositionIndex = string.find(message,'\nBody:') - if bodyPositionIndex > 1: - subject = message[8:bodyPositionIndex] - body = message[bodyPositionIndex+6:] - else: - subject = '' - body = message - elif messageEncodingType == 1: - body = message - subject = '' - elif messageEncodingType == 0: - print 'messageEncodingType == 0. Doing nothing with the message. They probably just sent it so that we would store their public key or send their ack data for them.' - else: - body = 'Unknown encoding type.\n\n' + repr(message) - subject = '' - print 'within recmsg, self.inventoryHash is', repr(self.inventoryHash) - if messageEncodingType <> 0: - sqlLock.acquire() - t = (self.inventoryHash,toAddress,fromAddress,subject,int(time.time()),body,'inbox') - sqlSubmitQueue.put('''INSERT INTO inbox VALUES (?,?,?,?,?,?,?)''') - sqlSubmitQueue.put(t) - sqlReturnQueue.get() - sqlLock.release() - self.emit(SIGNAL("displayNewInboxMessage(PyQt_PyObject,PyQt_PyObject,PyQt_PyObject,PyQt_PyObject,PyQt_PyObject)"),self.inventoryHash,toAddress,fromAddress,subject,body) - #Now let us worry about the acknowledgement data - #We'll need to make sure that our client will properly process the ackData; if the packet is malformed, it might cause us to clear out self.data and an attacker could use that behavior to determine that we decoded this message. - ackDataValidThusFar = True - if len(ackData) < 24: - print 'The length of ackData is unreasonably short. Not sending ackData.' - ackDataValidThusFar = False - if ackData[0:4] != '\xe9\xbe\xb4\xd9': - print 'Ackdata magic bytes were wrong. Not sending ackData.' - ackDataValidThusFar = False - if ackDataValidThusFar: - ackDataPayloadLength, = unpack('>L',ackData[16:20]) - if len(ackData)-24 != ackDataPayloadLength: #This ackData includes the protocol header which is not counted in the payload length. - print 'ackData payload length doesn\'t match the payload length specified in the header. Not sending ackdata.' - ackDataValidThusFar = False - if ackDataValidThusFar: - print 'ackData is valid. Will process it.' - self.ackDataThatWeHaveYetToSend.append(ackData) #When we have processed all data, the processData function will pop the ackData out and process it as if it is a message received from our peer. - else: - print 'This program cannot decode messages from addresses with versions higher than 1. Ignoring.' - statusbar = 'This program cannot decode messages from addresses with versions higher than 1. Ignoring it.' - self.emit(SIGNAL("updateStatusBar(PyQt_PyObject)"),statusbar) - else: - statusbar = 'Error: Cannot decode incoming msg versions higher than 1. Assuming the sender isn\' being silly, you should upgrade Bitmessage. Ignoring message.' - self.emit(SIGNAL("updateStatusBar(PyQt_PyObject)"),statusbar) - else: - printLock.acquire() - print 'Could not decrypt with any RSA keys if you have any.' - printLock.release() - infile.close() - outfile.close()""" + #A msg message has a valid time and POW and requires processing. The recmsg function calls this one. - def processmsg(self,readPosition): + def processmsg(self,readPosition, encryptedData): initialDecryptionSuccessful = False #Let's check whether this is a message acknowledgement bound for us. - if self.data[readPosition:24+self.payloadLength] in ackdataForWhichImWatching: + if encryptedData[readPosition:] in ackdataForWhichImWatching: printLock.acquire() print 'This msg IS an acknowledgement bound for me.' printLock.release() - del ackdataForWhichImWatching[self.data[readPosition:24+self.payloadLength]] - t = ('ackreceived',self.data[readPosition:24+self.payloadLength]) + del ackdataForWhichImWatching[encryptedData[readPosition:]] + t = ('ackreceived',encryptedData[readPosition:]) sqlLock.acquire() sqlSubmitQueue.put('UPDATE sent SET status=? WHERE ackdata=?') sqlSubmitQueue.put(t) sqlReturnQueue.get() sqlLock.release() - self.emit(SIGNAL("updateSentItemStatusByAckdata(PyQt_PyObject,PyQt_PyObject)"),self.data[readPosition:24+self.payloadLength],'Acknowledgement of the message received just now.') + self.emit(SIGNAL("updateSentItemStatusByAckdata(PyQt_PyObject,PyQt_PyObject)"),encryptedData[readPosition:],'Acknowledgement of the message received just now.') return else: printLock.acquire() - print 'This was NOT an acknowledgement bound for me.' #Msg potential ack data:', repr(self.data[readPosition:24+self.payloadLength]) + print 'This was NOT an acknowledgement bound for me.' #print 'ackdataForWhichImWatching', ackdataForWhichImWatching printLock.release() #This is not an acknowledgement bound for me. See if it is a message bound for me by trying to decrypt it with my private keys. for key, cryptorObject in myECAddressHashes.items(): try: - data = cryptorObject.decrypt(self.data[readPosition:self.payloadLength+24]) + unencryptedData = cryptorObject.decrypt(encryptedData[readPosition:]) toRipe = key #This is the RIPE hash of my pubkeys. We need this below to compare to the destination_ripe included in the encrypted data. initialDecryptionSuccessful = True print 'EC decryption successful using key associated with ripe hash:', key.encode('hex') @@ -1046,12 +768,12 @@ class receiveDataThread(QThread): else: #This is a message bound for me. readPosition = 0 - messageVersion, messageVersionLength = decodeVarint(data[readPosition:readPosition+10]) + messageVersion, messageVersionLength = decodeVarint(unencryptedData[readPosition:readPosition+10]) readPosition += messageVersionLength if messageVersion != 1: print 'Cannot understand message versions other than one. Ignoring message.' return - sendersAddressVersionNumber, sendersAddressVersionNumberLength = decodeVarint(data[readPosition:readPosition+10]) + sendersAddressVersionNumber, sendersAddressVersionNumberLength = decodeVarint(unencryptedData[readPosition:readPosition+10]) readPosition += sendersAddressVersionNumberLength if sendersAddressVersionNumber == 0: print 'Cannot understand sendersAddressVersionNumber = 0. Ignoring message.' @@ -1059,47 +781,47 @@ class receiveDataThread(QThread): if sendersAddressVersionNumber >= 3: print 'Sender\'s address version number', sendersAddressVersionNumber, ' not yet supported. Ignoring message.' return - if len(data) < 170: + if len(unencryptedData) < 170: print 'Length of the unencrypted data is unreasonably short. Sanity check failed. Ignoring message.' return - sendersStreamNumber, sendersStreamNumberLength = decodeVarint(data[readPosition:readPosition+10]) + sendersStreamNumber, sendersStreamNumberLength = decodeVarint(unencryptedData[readPosition:readPosition+10]) if sendersStreamNumber == 0: print 'sender\'s stream number is 0. Ignoring message.' return readPosition += sendersStreamNumberLength - behaviorBitfield = data[readPosition:readPosition+4] + behaviorBitfield = unencryptedData[readPosition:readPosition+4] readPosition += 4 - pubSigningKey = '\x04' + data[readPosition:readPosition+64] + pubSigningKey = '\x04' + unencryptedData[readPosition:readPosition+64] readPosition += 64 - pubEncryptionKey = '\x04' + data[readPosition:readPosition+64] + pubEncryptionKey = '\x04' + unencryptedData[readPosition:readPosition+64] readPosition += 64 endOfThePublicKeyPosition = readPosition #needed for when we store the pubkey in our database of pubkeys for later use. - if toRipe != data[readPosition:readPosition+20]: + if toRipe != unencryptedData[readPosition:readPosition+20]: printLock.acquire() print 'The original sender of this message did not send it to you. Someone is attempting a Surreptitious Forwarding Attack.' - print 'See: http://tools.ietf.org/html/draft-ietf-smime-sender-auth-00' + print 'See: http://world.std.com/~dtd/sign_encrypt/sign_encrypt7.html' print 'your toRipe:', toRipe.encode('hex') - print 'embedded destination toRipe:', data[readPosition:readPosition+20].encode('hex') + print 'embedded destination toRipe:', unencryptedData[readPosition:readPosition+20].encode('hex') printLock.release() return readPosition += 20 - messageEncodingType, messageEncodingTypeLength = decodeVarint(data[readPosition:readPosition+10]) + messageEncodingType, messageEncodingTypeLength = decodeVarint(unencryptedData[readPosition:readPosition+10]) readPosition += messageEncodingTypeLength - messageLength, messageLengthLength = decodeVarint(data[readPosition:readPosition+10]) + messageLength, messageLengthLength = decodeVarint(unencryptedData[readPosition:readPosition+10]) readPosition += messageLengthLength - message = data[readPosition:readPosition+messageLength] + message = unencryptedData[readPosition:readPosition+messageLength] #print 'First 150 characters of message:', repr(message[:150]) readPosition += messageLength - ackLength, ackLengthLength = decodeVarint(data[readPosition:readPosition+10]) + ackLength, ackLengthLength = decodeVarint(unencryptedData[readPosition:readPosition+10]) readPosition += ackLengthLength - ackData = data[readPosition:readPosition+ackLength] + ackData = unencryptedData[readPosition:readPosition+ackLength] readPosition += ackLength positionOfBottomOfAckData = readPosition #needed to mark the end of what is covered by the signature - signatureLength, signatureLengthLength = decodeVarint(data[readPosition:readPosition+10]) + signatureLength, signatureLengthLength = decodeVarint(unencryptedData[readPosition:readPosition+10]) readPosition += signatureLengthLength - signature = data[readPosition:readPosition+signatureLength] + signature = unencryptedData[readPosition:readPosition+signatureLength] try: - highlevelcrypto.verify(data[:positionOfBottomOfAckData],signature,pubSigningKey.encode('hex')) + highlevelcrypto.verify(unencryptedData[:positionOfBottomOfAckData],signature,pubSigningKey.encode('hex')) print 'ECDSA verify passed' except Exception, err: print 'ECDSA verify failed', err @@ -1114,7 +836,7 @@ class receiveDataThread(QThread): ripe.update(sha.digest()) #Let's store the public key in case we want to reply to this person. #We don't have the correct nonce or time (which would let us send out a pubkey message) so we'll just fill it with 1's. We won't be able to send this pubkey to others (without doing the proof of work ourselves, which this program is programmed to not do.) - t = (ripe.digest(),False,'\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF'+'\xFF\xFF\xFF\xFF'+data[messageVersionLength:endOfThePublicKeyPosition],int(time.time()),'yes') + t = (ripe.digest(),False,'\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF'+'\xFF\xFF\xFF\xFF'+unencryptedData[messageVersionLength:endOfThePublicKeyPosition],int(time.time()),'yes') sqlLock.acquire() sqlSubmitQueue.put('''INSERT INTO pubkeys VALUES (?,?,?,?,?)''') sqlSubmitQueue.put(t) @@ -1265,18 +987,17 @@ class receiveDataThread(QThread): return '['+mailingListName+'] ' + subject #We have received a pubkey - def recpubkey(self): + def recpubkey(self,data): self.pubkeyProcessingStartTime = time.time() - if self.payloadLength < 146: #sanity check + if len(data) < 146 or len(data) >600: #sanity check return #We must check to make sure the proof of work is sufficient. - if not self.isProofOfWorkSufficient(): + if not self.isProofOfWorkSufficient(data): print 'Proof of work in pubkey message insufficient.' return - readPosition = 24 #for the message header - readPosition += 8 #for the nonce - embeddedTime, = unpack('>I',self.data[readPosition:readPosition+4]) + readPosition = 8 #for the nonce + embeddedTime, = unpack('>I',data[readPosition:readPosition+4]) if embeddedTime < int(time.time())-lengthOfTimeToHoldOnToAllPubkeys-86400: #If the pubkey is more than a month old then reject it. (the 86400 is included to give an extra day of wiggle-room. If the wiggle-room is actually of any use, everyone on the network will delete this pubkey from their database the next time the cleanerThread cleans anyway- except for the node that actually wants the pubkey.) printLock.acquire() print 'The embedded time in this pubkey message is too old. Ignoring. Embedded time is:', embeddedTime @@ -1288,15 +1009,15 @@ class receiveDataThread(QThread): printLock.release() return readPosition += 4 #for the time - addressVersion, varintLength = decodeVarint(self.data[readPosition:readPosition+10]) + addressVersion, varintLength = decodeVarint(data[readPosition:readPosition+10]) readPosition += varintLength - streamNumber, varintLength = decodeVarint(self.data[readPosition:readPosition+10]) + streamNumber, varintLength = decodeVarint(data[readPosition:readPosition+10]) readPosition += varintLength if self.streamNumber != streamNumber: print 'stream number embedded in this pubkey doesn\'t match our stream number. Ignoring.' return - inventoryHash = calculateInventoryHash(self.data[24:self.payloadLength+24]) + inventoryHash = calculateInventoryHash(data) inventoryLock.acquire() if inventoryHash in inventory: print 'We have already received this pubkey. Ignoring it.' @@ -1307,12 +1028,12 @@ class receiveDataThread(QThread): inventoryLock.release() return objectType = 'pubkey' - inventory[inventoryHash] = (objectType, self.streamNumber, self.data[24:self.payloadLength+24], int(time.time())) + inventory[inventoryHash] = (objectType, self.streamNumber, data, int(time.time())) inventoryLock.release() self.broadcastinv(inventoryHash) self.emit(SIGNAL("incrementNumberOfPubkeysProcessed()")) - self.processpubkey() + self.processpubkey(data) lengthOfTimeWeShouldUseToProcessThisMessage = .2 sleepTime = lengthOfTimeWeShouldUseToProcessThisMessage - (time.time()- self.pubkeyProcessingStartTime) @@ -1325,14 +1046,13 @@ class receiveDataThread(QThread): #print 'Total pubkey processing time:', time.time()- self.pubkeyProcessingStartTime, 'seconds.' #printLock.release() - def processpubkey(self): - readPosition = 24 #for the message header - readPosition += 8 #for the nonce - embeddedTime, = unpack('>I',self.data[readPosition:readPosition+4]) + def processpubkey(self,data): + readPosition = 8 #for the nonce + embeddedTime, = unpack('>I',data[readPosition:readPosition+4]) readPosition += 4 #for the time - addressVersion, varintLength = decodeVarint(self.data[readPosition:readPosition+10]) + addressVersion, varintLength = decodeVarint(data[readPosition:readPosition+10]) readPosition += varintLength - streamNumber, varintLength = decodeVarint(self.data[readPosition:readPosition+10]) + streamNumber, varintLength = decodeVarint(data[readPosition:readPosition+10]) readPosition += varintLength if addressVersion == 0: print '(Within processpubkey) addressVersion of 0 doesn\'t make sense.' @@ -1346,12 +1066,12 @@ class receiveDataThread(QThread): if self.payloadLength < 146: #sanity check. This is the minimum possible length. print 'payloadLength less than 146. Sanity check failed.' return - bitfieldBehaviors = self.data[readPosition:readPosition+4] + bitfieldBehaviors = data[readPosition:readPosition+4] readPosition += 4 - publicSigningKey = self.data[readPosition:readPosition+64] + publicSigningKey = data[readPosition:readPosition+64] #Is it possible for a public key to be invalid such that trying to encrypt or sign with it will cause an error? If it is, we should probably test these keys here. readPosition += 64 - publicEncryptionKey = self.data[readPosition:readPosition+64] + publicEncryptionKey = data[readPosition:readPosition+64] if len(publicEncryptionKey) < 64: print 'publicEncryptionKey length less than 64. Sanity check failed.' return @@ -1376,7 +1096,7 @@ class receiveDataThread(QThread): sqlLock.release() if queryreturn != []: #if this pubkey is already in our database and if we have used it personally: print 'We HAVE used this pubkey personally. Updating time.' - t = (ripe,True,self.data[24:24+self.payloadLength],embeddedTime,'yes') + t = (ripe,True,data,embeddedTime,'yes') sqlLock.acquire() sqlSubmitQueue.put('''INSERT INTO pubkeys VALUES (?,?,?,?,?)''') sqlSubmitQueue.put(t) @@ -1387,7 +1107,7 @@ class receiveDataThread(QThread): workerQueue.put(('newpubkey',(addressVersion,streamNumber,ripe))) else: print 'We have NOT used this pubkey personally. Inserting in database.' - t = (ripe,True,self.data[24:24+self.payloadLength],embeddedTime,'no') #This will also update the embeddedTime. + t = (ripe,True,data,embeddedTime,'no') #This will also update the embeddedTime. sqlLock.acquire() sqlSubmitQueue.put('''INSERT INTO pubkeys VALUES (?,?,?,?,?)''') sqlSubmitQueue.put(t) @@ -1397,66 +1117,16 @@ class receiveDataThread(QThread): printLock.release() workerQueue.put(('newpubkey',(addressVersion,streamNumber,ripe))) - #This code which deals with old RSA addresses will soon be removed. - """elif addressVersion == 1: - nLength, varintLength = decodeVarint(self.data[readPosition:readPosition+10]) - readPosition += varintLength - nString = self.data[readPosition:readPosition+nLength] - readPosition += nLength - eLength, varintLength = decodeVarint(self.data[readPosition:readPosition+10]) - readPosition += varintLength - eString = self.data[readPosition:readPosition+eLength] - readPosition += eLength - - sha = hashlib.new('sha512') - sha.update(nString+eString) - ripeHasher = hashlib.new('ripemd160') - ripeHasher.update(sha.digest()) - ripe = ripeHasher.digest() - - print 'within recpubkey, addressVersion', addressVersion - print 'streamNumber', streamNumber - print 'ripe', repr(ripe) - print 'n=', convertStringToInt(nString) - print 'e=', convertStringToInt(eString) - - t = (ripe,) - sqlLock.acquire() - sqlSubmitQueue.put('''SELECT usedpersonally FROM pubkeys WHERE hash=? AND usedpersonally='yes' ''') - sqlSubmitQueue.put(t) - queryreturn = sqlReturnQueue.get() - sqlLock.release() - if queryreturn != []: #if this pubkey is already in our database and if we have used it personally: - print 'We HAVE used this pubkey personally. Updating time.' - t = (ripe,True,self.data[24:24+self.payloadLength],int(time.time()),'yes') - sqlLock.acquire() - sqlSubmitQueue.put('''INSERT INTO pubkeys VALUES (?,?,?,?,?)''') - sqlSubmitQueue.put(t) - sqlReturnQueue.get() - sqlLock.release() - printLock.acquire() - print 'added foreign pubkey into our database' - printLock.release() - workerQueue.put(('newpubkey',(addressVersion,streamNumber,ripe))) - else: - print 'We have NOT used this pubkey personally. Inserting in database.' - t = (ripe,True,self.data[24:24+self.payloadLength],int(time.time()),'no') - sqlLock.acquire() - sqlSubmitQueue.put('''INSERT INTO pubkeys VALUES (?,?,?,?,?)''') - sqlSubmitQueue.put(t) - sqlReturnQueue.get() - sqlLock.release() - printLock.acquire() - print 'added foreign pubkey into our database' - printLock.release() - workerQueue.put(('newpubkey',(addressVersion,streamNumber,ripe)))""" #We have received a getpubkey message - def recgetpubkey(self): - if not self.isProofOfWorkSufficient(): + def recgetpubkey(self,data): + if not self.isProofOfWorkSufficient(data): print 'Proof of work in getpubkey message insufficient.' return - embeddedTime, = unpack('>I',self.data[32:36]) + if len(data) < 34: + print 'getpubkey message doesn\'t contain enough data. Ignoring.' + return + embeddedTime, = unpack('>I',data[8:12]) if embeddedTime > int(time.time())+10800: print 'The time in this getpubkey message is too new. Ignoring it. Time:', embeddedTime return @@ -1464,13 +1134,13 @@ class receiveDataThread(QThread): print 'The time in this getpubkey message is too old. Ignoring it. Time:', embeddedTime return - addressVersionNumber, addressVersionLength = decodeVarint(self.data[36:42]) - streamNumber, streamNumberLength = decodeVarint(self.data[36+addressVersionLength:42+addressVersionLength]) + addressVersionNumber, addressVersionLength = decodeVarint(data[12:22]) + streamNumber, streamNumberLength = decodeVarint(data[12+addressVersionLength:22+addressVersionLength]) if streamNumber <> self.streamNumber: print 'The streamNumber', streamNumber, 'doesn\'t match our stream number:', self.streamNumber return - inventoryHash = calculateInventoryHash(self.data[24:self.payloadLength+24]) + inventoryHash = calculateInventoryHash(data) inventoryLock.acquire() if inventoryHash in inventory: print 'We have already received this getpubkey request. Ignoring it.' @@ -1482,7 +1152,7 @@ class receiveDataThread(QThread): return self.objectsOfWhichThisRemoteNodeIsAlreadyAware[inventoryHash] = 0 objectType = 'getpubkey' - inventory[inventoryHash] = (objectType, self.streamNumber, self.data[24:self.payloadLength+24], embeddedTime) + inventory[inventoryHash] = (objectType, self.streamNumber, data, embeddedTime) inventoryLock.release() #This getpubkey request is valid so far. Forward to peers. self.broadcastinv(inventoryHash) @@ -1496,10 +1166,15 @@ class receiveDataThread(QThread): elif addressVersionNumber > 2: print 'The addressVersionNumber of the pubkey request is too high. Can\'t understand. Ignoring it.' return - print 'the hash requested in this getpubkey request is:', self.data[36+addressVersionLength+streamNumberLength:56+addressVersionLength+streamNumberLength].encode('hex') + + requestedHash = data[12+addressVersionLength+streamNumberLength:32+addressVersionLength+streamNumberLength] + if len(requestedHash) != 20: + print 'The length of the requested hash is not 20 bytes. Something is wrong. Ignoring.' + return + print 'the hash requested in this getpubkey request is:', requestedHash.encode('hex') sqlLock.acquire() - t = (self.data[36+addressVersionLength+streamNumberLength:56+addressVersionLength+streamNumberLength],int(time.time())-lengthOfTimeToHoldOnToAllPubkeys) #this prevents SQL injection + t = (requestedHash,int(time.time())-lengthOfTimeToHoldOnToAllPubkeys) #this prevents SQL injection sqlSubmitQueue.put('''SELECT hash, transmitdata, time FROM pubkeys WHERE hash=? AND havecorrectnonce=1 AND time>?''') sqlSubmitQueue.put(t) queryreturn = sqlReturnQueue.get() @@ -1515,48 +1190,11 @@ class receiveDataThread(QThread): inventory[inventoryHash] = (objectType, self.streamNumber, payload, timeEncodedInPubkey)#If the time embedded in this pubkey is more than 3 days old then this object isn't going to last very long in the inventory- the cleanerThread is going to come along and move it from the inventory in memory to the SQL inventory and then delete it from the SQL inventory. It should still find its way back to the original requestor if he is online however. self.broadcastinv(inventoryHash) else: #the pubkey is not in our database of pubkeys. Let's check if the requested key is ours (which would mean we should do the POW, put it in the pubkey table, and broadcast out the pubkey.) - if self.data[36+addressVersionLength+streamNumberLength:56+addressVersionLength+streamNumberLength] in myECAddressHashes: #if this address hash is one of mine + if requestedHash in myECAddressHashes: #if this address hash is one of mine printLock.acquire() print 'Found getpubkey-requested-hash in my list of EC hashes. Telling Worker thread to do the POW for a pubkey message and send it out.' printLock.release() - myAddress = encodeAddress(addressVersionNumber,streamNumber,self.data[36+addressVersionLength+streamNumberLength:56+addressVersionLength+streamNumberLength]) - workerQueue.put(('doPOWForMyV2Pubkey',myAddress)) - - #This code which deals with old RSA addresses will soon be removed. - """elif self.data[36+addressVersionLength+streamNumberLength:56+addressVersionLength+streamNumberLength] in myRSAAddressHashes: - print 'Found getpubkey requested hash in my list of RSA hashes.' - payload = '\x00\x00\x00\x01' #bitfield of features supported by me (see the wiki). - payload += self.data[36:36+addressVersionLength+streamNumberLength] - #print int(config.get(encodeAddress(addressVersionNumber,streamNumber,self.data[36+addressVersionLength+streamNumberLength:56+addressVersionLength+streamNumberLength]), 'n')) - nString = convertIntToString(int(config.get(encodeAddress(addressVersionNumber,streamNumber,self.data[36+addressVersionLength+streamNumberLength:56+addressVersionLength+streamNumberLength]), 'n'))) - eString = convertIntToString(config.getint(encodeAddress(addressVersionNumber,streamNumber,self.data[36+addressVersionLength+streamNumberLength:56+addressVersionLength+streamNumberLength]), 'e')) - payload += encodeVarint(len(nString)) - payload += nString - payload += encodeVarint(len(eString)) - payload += eString - - nonce = 0 - trialValue = 99999999999999999999 - target = 2**64 / ((len(payload)+payloadLengthExtraBytes+8) * averageProofOfWorkNonceTrialsPerByte) - print '(For pubkey message) Doing proof of work...' - initialHash = hashlib.sha512(payload).digest() - while trialValue > target: - nonce += 1 - trialValue, = unpack('>Q',hashlib.sha512(hashlib.sha512(pack('>Q',nonce) + initialHash).digest()).digest()[0:8]) - print '(For pubkey message) Found proof of work', trialValue, 'Nonce:', nonce - - payload = pack('>Q',nonce) + payload - t = (self.data[36+addressVersionLength+streamNumberLength:56+addressVersionLength+streamNumberLength],True,payload,int(time.time())+1209600) #after two weeks (1,209,600 seconds), we may remove our own pub key from our database. It will be regenerated and put back in the database if it is requested. - sqlLock.acquire() - #** pubkeys insert query not yet fixed! ** - sqlSubmitQueue.put('''INSERT INTO pubkeys VALUES (?,?,?,?)''') - sqlSubmitQueue.put(t) - queryreturn = sqlReturnQueue.get() - sqlLock.release() - inventoryHash = calculateInventoryHash(payload) - objectType = 'pubkey' - inventory[inventoryHash] = (objectType, self.streamNumber, payload, int(time.time())) - self.broadcastinv(inventoryHash) """ + workerQueue.put(('doPOWForMyV2Pubkey',requestedHash)) else: printLock.acquire() print 'This getpubkey request is not for any of my keys.' @@ -1564,26 +1202,26 @@ class receiveDataThread(QThread): #We have received an inv message - def recinv(self): - numberOfItemsInInv, lengthOfVarint = decodeVarint(self.data[24:34]) + def recinv(self,data): + numberOfItemsInInv, lengthOfVarint = decodeVarint(data[:10]) + if len(data) < lengthOfVarint + (numberOfItemsInInv * 32): + print 'inv message doesn\'t contain enough data. Ignoring.' + return if numberOfItemsInInv == 1: #we'll just request this data from the person who advertised the object. - for i in range(numberOfItemsInInv): - if len(self.data[24+lengthOfVarint+(32*i):56+lengthOfVarint+(32*i)]) == 32: #The length of an inventory hash should be 32. If it isn't 32 then the remote node is either badly programmed or behaving nefariously. - self.objectsOfWhichThisRemoteNodeIsAlreadyAware[self.data[24+lengthOfVarint+(32*i):56+lengthOfVarint+(32*i)]] = 0 - if self.data[24+lengthOfVarint+(32*i):56+lengthOfVarint+(32*i)] in inventory: - printLock.acquire() - print 'Inventory (in memory) has inventory item already.' - printLock.release() - elif isInSqlInventory(self.data[24+lengthOfVarint+(32*i):56+lengthOfVarint+(32*i)]): - print 'Inventory (SQL on disk) has inventory item already.' - else: - self.sendgetdata(self.data[24+lengthOfVarint+(32*i):56+lengthOfVarint+(32*i)]) + if data[lengthOfVarint:32+lengthOfVarint] in inventory: + printLock.acquire() + print 'Inventory (in memory) has inventory item already.' + printLock.release() + elif isInSqlInventory(data[lengthOfVarint:32+lengthOfVarint]): + print 'Inventory (SQL on disk) has inventory item already.' + else: + self.sendgetdata(data[lengthOfVarint:32+lengthOfVarint]) else: print 'inv message lists', numberOfItemsInInv, 'objects.' for i in range(numberOfItemsInInv): #upon finishing dealing with an incoming message, the receiveDataThread will request a random object from the peer. This way if we get multiple inv messages from multiple peers which list mostly the same objects, we will make getdata requests for different random objects from the various peers. - if len(self.data[24+lengthOfVarint+(32*i):56+lengthOfVarint+(32*i)]) == 32: #The length of an inventory hash should be 32. If it isn't 32 then the remote node is either badly programmed or behaving nefariously. - self.objectsOfWhichThisRemoteNodeIsAlreadyAware[self.data[24+lengthOfVarint+(32*i):56+lengthOfVarint+(32*i)]] = 0 - self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave[self.data[24+lengthOfVarint+(32*i):56+lengthOfVarint+(32*i)]] = 0 + if len(data[lengthOfVarint+(32*i):32+lengthOfVarint+(32*i)]) == 32: #The length of an inventory hash should be 32. If it isn't 32 then the remote node is either badly programmed or behaving nefariously. + self.objectsOfWhichThisRemoteNodeIsAlreadyAware[data[lengthOfVarint+(32*i):32+lengthOfVarint+(32*i)]] = 0 + self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave[data[lengthOfVarint+(32*i):32+lengthOfVarint+(32*i)]] = 0 #Send a getdata message to our peer to request the object with the given hash @@ -1603,12 +1241,12 @@ class receiveDataThread(QThread): printLock.release() #We have received a getdata request from our peer - def recgetdata(self): - value, lengthOfVarint = decodeVarint(self.data[24:34]) + def recgetdata(self, data): + value, lengthOfVarint = decodeVarint(data[:10]) #print 'Number of items in getdata request:', value try: for i in xrange(value): - hash = self.data[24+lengthOfVarint+(i*32):56+lengthOfVarint+(i*32)] + hash = data[lengthOfVarint+(i*32):32+lengthOfVarint+(i*32)] printLock.acquire() print 'received getdata request for item:', hash.encode('hex') printLock.release() @@ -1682,10 +1320,10 @@ class receiveDataThread(QThread): #We have received an addr message. - def recaddr(self): + def recaddr(self,data): listOfAddressDetailsToBroadcastToPeers = [] numberOfAddressesIncluded = 0 - numberOfAddressesIncluded, lengthOfNumberOfAddresses = decodeVarint(self.data[24:29]) + numberOfAddressesIncluded, lengthOfNumberOfAddresses = decodeVarint(data[:10]) if verbose >= 1: printLock.acquire() @@ -1702,61 +1340,56 @@ class receiveDataThread(QThread): needToWriteKnownNodesToDisk = False for i in range(0,numberOfAddressesIncluded): try: - if self.data[40+lengthOfNumberOfAddresses+(34*i):52+lengthOfNumberOfAddresses+(34*i)] != '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF': + if data[16+lengthOfNumberOfAddresses+(34*i):28+lengthOfNumberOfAddresses+(34*i)] != '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF': printLock.acquire() - print 'Skipping IPv6 address.', repr(self.data[40+lengthOfNumberOfAddresses+(34*i):56+lengthOfNumberOfAddresses+(34*i)]) + print 'Skipping IPv6 address.', repr(data[16+lengthOfNumberOfAddresses+(34*i):28+lengthOfNumberOfAddresses+(34*i)]) printLock.release() continue - #print repr(self.data[6+lengthOfNumberOfAddresses+(34*i):18+lengthOfNumberOfAddresses+(34*i)]) except Exception, err: - if verbose >= 2: - printLock.acquire() - sys.stderr.write('ERROR TRYING TO UNPACK recaddr (to test for an IPv6 address). Message: %s\n' % str(err)) - printLock.release() + printLock.acquire() + sys.stderr.write('ERROR TRYING TO UNPACK recaddr (to test for an IPv6 address). Message: %s\n' % str(err)) + printLock.release() break #giving up on unpacking any more. We should still be connected however. try: - recaddrStream, = unpack('>I',self.data[28+lengthOfNumberOfAddresses+(34*i):32+lengthOfNumberOfAddresses+(34*i)]) + recaddrStream, = unpack('>I',data[4+lengthOfNumberOfAddresses+(34*i):8+lengthOfNumberOfAddresses+(34*i)]) except Exception, err: - if verbose >= 2: - printLock.acquire() - sys.stderr.write('ERROR TRYING TO UNPACK recaddr (recaddrStream). Message: %s\n' % str(err)) - printLock.release() + printLock.acquire() + sys.stderr.write('ERROR TRYING TO UNPACK recaddr (recaddrStream). Message: %s\n' % str(err)) + printLock.release() break #giving up on unpacking any more. We should still be connected however. if recaddrStream == 0: continue if recaddrStream != self.streamNumber and recaddrStream != (self.streamNumber * 2) and recaddrStream != ((self.streamNumber * 2) + 1): #if the embedded stream number is not in my stream or either of my child streams then ignore it. Someone might be trying funny business. continue try: - recaddrServices, = unpack('>Q',self.data[32+lengthOfNumberOfAddresses+(34*i):40+lengthOfNumberOfAddresses+(34*i)]) + recaddrServices, = unpack('>Q',data[8+lengthOfNumberOfAddresses+(34*i):16+lengthOfNumberOfAddresses+(34*i)]) except Exception, err: - if verbose >= 2: - printLock.acquire() - sys.stderr.write('ERROR TRYING TO UNPACK recaddr (recaddrServices). Message: %s\n' % str(err)) - printLock.release() + printLock.acquire() + sys.stderr.write('ERROR TRYING TO UNPACK recaddr (recaddrServices). Message: %s\n' % str(err)) + printLock.release() break #giving up on unpacking any more. We should still be connected however. try: - recaddrPort, = unpack('>H',self.data[56+lengthOfNumberOfAddresses+(34*i):58+lengthOfNumberOfAddresses+(34*i)]) + recaddrPort, = unpack('>H',data[32+lengthOfNumberOfAddresses+(34*i):34+lengthOfNumberOfAddresses+(34*i)]) except Exception, err: - if verbose >= 2: - printLock.acquire() - sys.stderr.write('ERROR TRYING TO UNPACK recaddr (recaddrPort). Message: %s\n' % str(err)) - printLock.release() + printLock.acquire() + sys.stderr.write('ERROR TRYING TO UNPACK recaddr (recaddrPort). Message: %s\n' % str(err)) + printLock.release() break #giving up on unpacking any more. We should still be connected however. #print 'Within recaddr(): IP', recaddrIP, ', Port', recaddrPort, ', i', i - hostFromAddrMessage = socket.inet_ntoa(self.data[52+lengthOfNumberOfAddresses+(34*i):56+lengthOfNumberOfAddresses+(34*i)]) + hostFromAddrMessage = socket.inet_ntoa(data[28+lengthOfNumberOfAddresses+(34*i):32+lengthOfNumberOfAddresses+(34*i)]) #print 'hostFromAddrMessage', hostFromAddrMessage - if self.data[52+lengthOfNumberOfAddresses+(34*i)] == '\x7F': + if data[28+lengthOfNumberOfAddresses+(34*i)] == '\x7F': print 'Ignoring IP address in loopback range:', hostFromAddrMessage continue - if self.data[52+lengthOfNumberOfAddresses+(34*i)] == '\x0A': + if data[28+lengthOfNumberOfAddresses+(34*i)] == '\x0A': print 'Ignoring IP address in private range:', hostFromAddrMessage continue - if self.data[52+lengthOfNumberOfAddresses+(34*i):52+lengthOfNumberOfAddresses+(34*i)+2] == '\xC0A8': + if data[28+lengthOfNumberOfAddresses+(34*i):30+lengthOfNumberOfAddresses+(34*i)] == '\xC0A8': print 'Ignoring IP address in private range:', hostFromAddrMessage continue - timeSomeoneElseReceivedMessageFromThisNode, = unpack('>I',self.data[24+lengthOfNumberOfAddresses+(34*i):28+lengthOfNumberOfAddresses+(34*i)]) #This is the 'time' value in the received addr message. + timeSomeoneElseReceivedMessageFromThisNode, = unpack('>I',data[lengthOfNumberOfAddresses+(34*i):4+lengthOfNumberOfAddresses+(34*i)]) #This is the 'time' value in the received addr message. if recaddrStream not in knownNodes: #knownNodes is a dictionary of dictionaries with one outer dictionary for each stream. If the outer stream dictionary doesn't exist yet then we must make it. knownNodes[recaddrStream] = {} if hostFromAddrMessage not in knownNodes[recaddrStream]: @@ -1799,7 +1432,7 @@ class receiveDataThread(QThread): datatosend = datatosend + hashlib.sha512(payload).digest()[0:4] datatosend = datatosend + payload - if verbose >= 2: + if verbose >= 1: printLock.acquire() print 'Broadcasting addr with', numberOfAddressesInAddrMessage, 'entries.' printLock.release() @@ -1872,33 +1505,31 @@ class receiveDataThread(QThread): datatosend = datatosend + hashlib.sha512(payload).digest()[0:4] datatosend = datatosend + payload - if verbose >= 2: + if verbose >= 1: printLock.acquire() print 'Sending addr with', numberOfAddressesInAddrMessage, 'entries.' printLock.release() self.sock.send(datatosend) #We have received a version message - def recversion(self): + def recversion(self,data): if self.payloadLength < 83: #This version message is unreasonably short. Forget it. return - elif not self.verackSent: #There is a potential exploit if we don't check to make sure that we have not already received and accepted a version message: An attacker could connect directly to us, send a msg message with the ackdata set to an invalid version message which would cause us to close the connection to the attacker thus proving that we were able to decode the message. Checking the connectionIsOrWasFullyEstablished variable would also suffice. - self.remoteProtocolVersion, = unpack('>L',self.data[24:28]) + elif not self.verackSent: + self.remoteProtocolVersion, = unpack('>L',data[:4]) #print 'remoteProtocolVersion', self.remoteProtocolVersion - self.myExternalIP = socket.inet_ntoa(self.data[64:68]) + self.myExternalIP = socket.inet_ntoa(data[40:44]) #print 'myExternalIP', self.myExternalIP - self.remoteNodeIncomingPort, = unpack('>H',self.data[94:96]) + self.remoteNodeIncomingPort, = unpack('>H',data[70:72]) #print 'remoteNodeIncomingPort', self.remoteNodeIncomingPort - #print 'self.data[96:104]', repr(self.data[96:104]) - #print 'eightBytesOfRandomDataUsedToDetectConnectionsToSelf', repr(eightBytesOfRandomDataUsedToDetectConnectionsToSelf) - useragentLength, lengthOfUseragentVarint = decodeVarint(self.data[104:108]) - readPosition = 104 + lengthOfUseragentVarint - useragent = self.data[readPosition:readPosition+useragentLength] + useragentLength, lengthOfUseragentVarint = decodeVarint(data[80:84]) + readPosition = 80 + lengthOfUseragentVarint + useragent = data[readPosition:readPosition+useragentLength] readPosition += useragentLength - numberOfStreamsInVersionMessage, lengthOfNumberOfStreamsInVersionMessage = decodeVarint(self.data[readPosition:]) + numberOfStreamsInVersionMessage, lengthOfNumberOfStreamsInVersionMessage = decodeVarint(data[readPosition:]) readPosition += lengthOfNumberOfStreamsInVersionMessage - self.streamNumber, lengthOfRemoteStreamNumber = decodeVarint(self.data[readPosition:]) + self.streamNumber, lengthOfRemoteStreamNumber = decodeVarint(data[readPosition:]) printLock.acquire() print 'Remote node useragent:', useragent, ' stream number:', self.streamNumber printLock.release() @@ -1907,17 +1538,15 @@ class receiveDataThread(QThread): printLock.acquire() print 'Closed connection to', self.HOST, 'because they are interested in stream', self.streamNumber,'.' printLock.release() - self.data = '' return #If this was an incoming connection, then the sendData thread doesn't know the stream. We have to set it. if not self.initiatedConnection: broadcastToSendDataQueues((0,'setStreamNumber',(self.HOST,self.streamNumber))) - if self.data[96:104] == eightBytesOfRandomDataUsedToDetectConnectionsToSelf: + if data[72:80] == eightBytesOfRandomDataUsedToDetectConnectionsToSelf: self.sock.close() printLock.acquire() print 'Closing connection to myself: ', self.HOST printLock.release() - self.data = '' return knownNodes[self.streamNumber][self.HOST] = (self.remoteNodeIncomingPort, int(time.time())) @@ -1925,12 +1554,6 @@ class receiveDataThread(QThread): pickle.dump(knownNodes, output) output.close() - #I've commented out this code because it should be up to the newer node to decide whether their protocol version is incompatiable with the remote node's version. - '''if self.remoteProtocolVersion > 1: - print 'The remote node''s protocol version is too new for this program to understand. Disconnecting. It is:', self.remoteProtocolVersion - self.sock.close() - self.selfInitiatedConnectionList.remove(self) - else:''' self.sendverack() if self.initiatedConnection == False: self.sendversion() @@ -2520,11 +2143,19 @@ class singleWorker(QThread): workerQueue.task_done() - def doPOWForMyV2Pubkey(self,myAddress): #This function also broadcasts out the pubkey message once it is done with the POW - status,addressVersionNumber,streamNumber,hash = decodeAddress(myAddress) + def doPOWForMyV2Pubkey(self,hash): #This function also broadcasts out the pubkey message once it is done with the POW + #Look up my stream number based on my address hash + configSections = config.sections() + for addressInKeysFile in configSections: + if addressInKeysFile <> 'bitmessagesettings': + status,addressVersionNumber,streamNumber,hashFromThisParticularAddress = decodeAddress(addressInKeysFile) + if hash == hashFromThisParticularAddress: + myAddress = addressInKeysFile + break + embeddedTime = int(time.time())+random.randrange(-300, 300) #the current time plus or minus five minutes payload = pack('>I',(embeddedTime)) - payload += encodeVarint(2) #Address version number + payload += encodeVarint(addressVersionNumber) #Address version number payload += encodeVarint(streamNumber) payload += '\x00\x00\x00\x01' #bitfield of features supported by me (see the wiki). diff --git a/defaultKnownNodes.py b/defaultKnownNodes.py index 250877da..8c3330db 100644 --- a/defaultKnownNodes.py +++ b/defaultKnownNodes.py @@ -10,6 +10,8 @@ def createDefaultKnownNodes(appdata): ############## Stream 1 ################ stream1 = {} + stream1['80.69.173.220'] = (443,int(time.time())) + stream1['109.95.105.15'] = (8443,int(time.time())) stream1['66.65.120.151'] = (8080,int(time.time())) stream1['76.180.233.38'] = (8444,int(time.time())) stream1['84.48.88.42'] = (8444,int(time.time()))