Better deserialization

This commit is contained in:
Jonathan Warren 2013-03-28 17:56:20 -04:00
parent dda83acfe1
commit ce8de60d96
2 changed files with 183 additions and 550 deletions

View File

@ -7,7 +7,7 @@
#Right now, PyBitmessage only support connecting to stream 1. It doesn't yet contain logic to expand into further streams.
softwareVersion = '0.2.7'
verbose = 2
verbose = 1
maximumAgeOfAnObjectThatIAmWillingToAccept = 216000 #Equals two days and 12 hours.
lengthOfTimeToLeaveObjectsInInventory = 237600 #Equals two days and 18 hours. This should be longer than maximumAgeOfAnObjectThatIAmWillingToAccept so that we don't process messages twice.
lengthOfTimeToHoldOnToAllPubkeys = 2419200 #Equals 4 weeks. You could make this longer if you want but making it shorter would not be advisable because there is a very small possibility that it could keep you from obtaining a needed pubkey for a period of time.
@ -40,8 +40,6 @@ from defaultKnownNodes import *
import time
import socket
import threading
#import rsa
#from rsa.bigfile import *
import hashlib
from struct import *
import pickle
@ -255,7 +253,7 @@ class receiveDataThread(QThread):
while True:
try:
self.data = self.data + self.sock.recv(65536)
self.data += self.sock.recv(65536)
except socket.timeout:
printLock.acquire()
print 'Timeout occurred waiting for data. Closing receiveData thread.'
@ -305,14 +303,14 @@ class receiveDataThread(QThread):
def processData(self):
global verbose
#if verbose >= 2:
#if verbose >= 3:
#printLock.acquire()
#print 'self.data is currently ', repr(self.data)
#printLock.release()
if len(self.data) < 20: #if so little of the data has arrived that we can't even unpack the payload length
pass
elif self.data[0:4] != '\xe9\xbe\xb4\xd9':
if verbose >= 2:
if verbose >= 1:
printLock.acquire()
sys.stderr.write('The magic bytes were not correct. First 40 bytes of data: %s\n' % repr(self.data[0:40]))
print 'self.data:', self.data.encode('hex')
@ -332,25 +330,25 @@ class receiveDataThread(QThread):
print 'remoteCommand', repr(remoteCommand.replace('\x00','')), ' from', self.HOST
printLock.release()
if remoteCommand == 'version\x00\x00\x00\x00\x00':
self.recversion()
self.recversion(self.data[24:self.payloadLength+24])
elif remoteCommand == 'verack\x00\x00\x00\x00\x00\x00':
self.recverack()
elif remoteCommand == 'addr\x00\x00\x00\x00\x00\x00\x00\x00' and self.connectionIsOrWasFullyEstablished:
self.recaddr()
self.recaddr(self.data[24:self.payloadLength+24])
elif remoteCommand == 'getpubkey\x00\x00\x00' and self.connectionIsOrWasFullyEstablished:
self.recgetpubkey()
self.recgetpubkey(self.data[24:self.payloadLength+24])
elif remoteCommand == 'pubkey\x00\x00\x00\x00\x00\x00' and self.connectionIsOrWasFullyEstablished:
self.recpubkey()
self.recpubkey(self.data[24:self.payloadLength+24])
elif remoteCommand == 'inv\x00\x00\x00\x00\x00\x00\x00\x00\x00' and self.connectionIsOrWasFullyEstablished:
self.recinv()
self.recinv(self.data[24:self.payloadLength+24])
elif remoteCommand == 'getdata\x00\x00\x00\x00\x00' and self.connectionIsOrWasFullyEstablished:
self.recgetdata()
self.recgetdata(self.data[24:self.payloadLength+24])
elif remoteCommand == 'getbiginv\x00\x00\x00' and self.connectionIsOrWasFullyEstablished:
self.sendBigInv()
elif remoteCommand == 'msg\x00\x00\x00\x00\x00\x00\x00\x00\x00' and self.connectionIsOrWasFullyEstablished:
self.recmsg()
self.recmsg(self.data[24:self.payloadLength+24])
elif remoteCommand == 'broadcast\x00\x00\x00' and self.connectionIsOrWasFullyEstablished:
self.recbroadcast()
self.recbroadcast(self.data[24:self.payloadLength+24])
elif remoteCommand == 'getaddr\x00\x00\x00\x00\x00' and self.connectionIsOrWasFullyEstablished:
self.sendaddr()
elif remoteCommand == 'ping\x00\x00\x00\x00\x00\x00\x00\x00' and self.connectionIsOrWasFullyEstablished:
@ -371,9 +369,10 @@ class receiveDataThread(QThread):
printLock.release()
del self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave[objectHash]
elif isInSqlInventory(objectHash):
printLock.acquire()
print 'Inventory (SQL on disk) already has object listed in inv message.'
printLock.release()
if verbose >= 2:
printLock.acquire()
print 'Inventory (SQL on disk) already has object listed in inv message.'
printLock.release()
del self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave[objectHash]
else:
#print 'processData function making request for object:', objectHash.encode('hex')
@ -391,8 +390,8 @@ class receiveDataThread(QThread):
print 'Checksum incorrect. Clearing this message.'
self.data = self.data[self.payloadLength+24:]
def isProofOfWorkSufficient(self):
POW, = unpack('>Q',hashlib.sha512(hashlib.sha512(self.data[24:32]+ hashlib.sha512(self.data[32:24+self.payloadLength]).digest()).digest()).digest()[0:8])
def isProofOfWorkSufficient(self,data):
POW, = unpack('>Q',hashlib.sha512(hashlib.sha512(data[:8]+ hashlib.sha512(data[8:]).digest()).digest()).digest()[0:8])
#print 'POW:', POW
#Notice that I have divided the averageProofOfWorkNonceTrialsPerByte by two. This makes the POW requirement easier. This gives us wiggle-room: if we decide that we want to make the POW easier, the change won't obsolete old clients because they already expect a lower POW. If we decide that the current work done by clients feels approperate then we can remove this division by 2 and make the requirement match what is actually done by a sending node. If we want to raise the POW requirement then old nodes will HAVE to upgrade no matter what.
return POW < 2**64 / ((self.payloadLength+payloadLengthExtraBytes) * (averageProofOfWorkNonceTrialsPerByte/2))
@ -490,13 +489,13 @@ class receiveDataThread(QThread):
self.sock.send(headerData + payload)
#We have received a broadcast message
def recbroadcast(self):
def recbroadcast(self,data):
self.messageProcessingStartTime = time.time()
#First we must check to make sure the proof of work is sufficient.
if not self.isProofOfWorkSufficient():
if not self.isProofOfWorkSufficient(data):
print 'Proof of work in broadcast message insufficient.'
return
embeddedTime, = unpack('>I',self.data[32:36])
embeddedTime, = unpack('>I',data[8:12])
if embeddedTime > (int(time.time())+10800): #prevent funny business
print 'The embedded time in this broadcast message is more than three hours in the future. That doesn\'t make sense. Ignoring message.'
return
@ -507,7 +506,7 @@ class receiveDataThread(QThread):
print 'The payload length of this broadcast packet is unreasonably low. Someone is probably trying funny business. Ignoring message.'
return
inventoryLock.acquire()
self.inventoryHash = calculateInventoryHash(self.data[24:self.payloadLength+24])
self.inventoryHash = calculateInventoryHash(data)
if self.inventoryHash in inventory:
print 'We have already received this broadcast object. Ignoring.'
inventoryLock.release()
@ -518,13 +517,13 @@ class receiveDataThread(QThread):
return
#It is valid so far. Let's let our peers know about it.
objectType = 'broadcast'
inventory[self.inventoryHash] = (objectType, self.streamNumber, self.data[24:self.payloadLength+24], embeddedTime)
inventory[self.inventoryHash] = (objectType, self.streamNumber, data, embeddedTime)
inventoryLock.release()
self.broadcastinv(self.inventoryHash)
self.emit(SIGNAL("incrementNumberOfBroadcastsProcessed()"))
self.processbroadcast()#When this function returns, we will have either successfully processed this broadcast because we are interested in it, ignored it because we aren't interested in it, or found problem with the broadcast that warranted ignoring it.
self.processbroadcast(data)#When this function returns, we will have either successfully processed this broadcast because we are interested in it, ignored it because we aren't interested in it, or found problem with the broadcast that warranted ignoring it.
# Let us now set lengthOfTimeWeShouldUseToProcessThisMessage. If we haven't used the specified amount of time, we shall sleep. These values are mostly the same values used for msg messages although broadcast messages are processed faster.
if self.payloadLength > 100000000: #Size is greater than 100 megabytes
@ -548,32 +547,32 @@ class receiveDataThread(QThread):
printLock.release()
#A broadcast message has a valid time and POW and requires processing. The recbroadcast function calls this one.
def processbroadcast(self):
readPosition = 36
broadcastVersion, broadcastVersionLength = decodeVarint(self.data[readPosition:readPosition+9])
def processbroadcast(self,data):
readPosition = 12
broadcastVersion, broadcastVersionLength = decodeVarint(data[readPosition:readPosition+9])
if broadcastVersion <> 1:
#Cannot decode incoming broadcast versions higher than 1. Assuming the sender isn\' being silly, you should upgrade Bitmessage because this message shall be ignored.
return
readPosition += broadcastVersionLength
beginningOfPubkeyPosition = readPosition #used when we add the pubkey to our pubkey table
sendersAddressVersion, sendersAddressVersionLength = decodeVarint(self.data[readPosition:readPosition+9])
sendersAddressVersion, sendersAddressVersionLength = decodeVarint(data[readPosition:readPosition+9])
if sendersAddressVersion <= 1 or sendersAddressVersion >=3:
#Cannot decode senderAddressVersion higher than 2. Assuming the sender isn\' being silly, you should upgrade Bitmessage because this message shall be ignored.
return
readPosition += sendersAddressVersionLength
if sendersAddressVersion == 2:
sendersStream, sendersStreamLength = decodeVarint(self.data[readPosition:readPosition+9])
sendersStream, sendersStreamLength = decodeVarint(data[readPosition:readPosition+9])
if sendersStream <= 0 or sendersStream <> self.streamNumber:
return
readPosition += sendersStreamLength
behaviorBitfield = self.data[readPosition:readPosition+4]
behaviorBitfield = data[readPosition:readPosition+4]
readPosition += 4
sendersPubSigningKey = '\x04' + self.data[readPosition:readPosition+64]
sendersPubSigningKey = '\x04' + data[readPosition:readPosition+64]
readPosition += 64
sendersPubEncryptionKey = '\x04' + self.data[readPosition:readPosition+64]
sendersPubEncryptionKey = '\x04' + data[readPosition:readPosition+64]
readPosition += 64
endOfPubkeyPosition = readPosition
sendersHash = self.data[readPosition:readPosition+20]
sendersHash = data[readPosition:readPosition+20]
if sendersHash not in broadcastSendersForWhichImWatching:
#Display timing data
printLock.acquire()
@ -590,20 +589,20 @@ class receiveDataThread(QThread):
if ripe.digest() != sendersHash:
#The sender of this message lied.
return
messageEncodingType, messageEncodingTypeLength = decodeVarint(self.data[readPosition:readPosition+9])
messageEncodingType, messageEncodingTypeLength = decodeVarint(data[readPosition:readPosition+9])
if messageEncodingType == 0:
return
readPosition += messageEncodingTypeLength
messageLength, messageLengthLength = decodeVarint(self.data[readPosition:readPosition+9])
messageLength, messageLengthLength = decodeVarint(data[readPosition:readPosition+9])
readPosition += messageLengthLength
message = self.data[readPosition:readPosition+messageLength]
message = data[readPosition:readPosition+messageLength]
readPosition += messageLength
readPositionAtBottomOfMessage = readPosition
signatureLength, signatureLengthLength = decodeVarint(self.data[readPosition:readPosition+9])
signatureLength, signatureLengthLength = decodeVarint(data[readPosition:readPosition+9])
readPosition += signatureLengthLength
signature = self.data[readPosition:readPosition+signatureLength]
signature = data[readPosition:readPosition+signatureLength]
try:
highlevelcrypto.verify(self.data[36:readPositionAtBottomOfMessage],signature,sendersPubSigningKey.encode('hex'))
highlevelcrypto.verify(data[12:readPositionAtBottomOfMessage],signature,sendersPubSigningKey.encode('hex'))
print 'ECDSA verify passed'
except Exception, err:
print 'ECDSA verify failed', err
@ -612,7 +611,7 @@ class receiveDataThread(QThread):
#Let's store the public key in case we want to reply to this person.
#We don't have the correct nonce or time (which would let us send out a pubkey message) so we'll just fill it with 1's. We won't be able to send this pubkey to others (without doing the proof of work ourselves, which this program is programmed to not do.)
t = (ripe.digest(),False,'\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF'+'\xFF\xFF\xFF\xFF'+self.data[beginningOfPubkeyPosition:endOfPubkeyPosition],int(time.time()),'yes')
t = (ripe.digest(),False,'\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF'+'\xFF\xFF\xFF\xFF'+data[beginningOfPubkeyPosition:endOfPubkeyPosition],int(time.time()),'yes')
sqlLock.acquire()
sqlSubmitQueue.put('''INSERT INTO pubkeys VALUES (?,?,?,?,?)''')
sqlSubmitQueue.put(t)
@ -663,97 +662,17 @@ class receiveDataThread(QThread):
print 'Time spent processing this interesting broadcast:', time.time()- self.messageProcessingStartTime
printLock.release()
"""elif sendersAddressVersion == 1:
sendersStream, sendersStreamLength = decodeVarint(self.data[readPosition:readPosition+9])
if sendersStream <= 0:
return
readPosition += sendersStreamLength
sendersHash = self.data[readPosition:readPosition+20]
if sendersHash not in broadcastSendersForWhichImWatching:
return
#At this point, this message claims to be from sendersHash and we are interested in it. We still have to hash the public key to make sure it is truly the key that matches the hash, and also check the signiture.
readPosition += 20
nLength, nLengthLength = decodeVarint(self.data[readPosition:readPosition+9])
if nLength < 1:
return
readPosition += nLengthLength
nString = self.data[readPosition:readPosition+nLength]
readPosition += nLength
eLength, eLengthLength = decodeVarint(self.data[readPosition:readPosition+9])
if eLength < 1:
return
readPosition += eLengthLength
eString = self.data[readPosition:readPosition+eLength]
#We are now ready to hash the public key and verify that its hash matches the hash claimed in the message
readPosition += eLength
sha = hashlib.new('sha512')
sha.update(nString+eString)
ripe = hashlib.new('ripemd160')
ripe.update(sha.digest())
if ripe.digest() != sendersHash:
#The sender of this message lied.
return
readPositionAtBeginningOfMessageEncodingType = readPosition
messageEncodingType, messageEncodingTypeLength = decodeVarint(self.data[readPosition:readPosition+9])
if messageEncodingType == 0:
return
readPosition += messageEncodingTypeLength
messageLength, messageLengthLength = decodeVarint(self.data[readPosition:readPosition+9])
readPosition += messageLengthLength
message = self.data[readPosition:readPosition+messageLength]
readPosition += messageLength
signature = self.data[readPosition:readPosition+nLength]
sendersPubkey = rsa.PublicKey(convertStringToInt(nString),convertStringToInt(eString))
#print 'senders Pubkey', sendersPubkey
try:
rsa.verify(self.data[readPositionAtBeginningOfMessageEncodingType:readPositionAtBeginningOfMessageEncodingType+messageEncodingTypeLength+messageLengthLength+messageLength],signature,sendersPubkey)
print 'verify passed'
except Exception, err:
print 'verify failed', err
return
#verify passed
fromAddress = encodeAddress(sendersAddressVersion,sendersStream,ripe.digest())
print 'fromAddress:', fromAddress
if messageEncodingType == 2:
bodyPositionIndex = string.find(message,'\nBody:')
if bodyPositionIndex > 1:
subject = message[8:bodyPositionIndex]
body = message[bodyPositionIndex+6:]
else:
subject = ''
body = message
elif messageEncodingType == 1:
body = message
subject = ''
elif messageEncodingType == 0:
print 'messageEncodingType == 0. Doing nothing with the message.'
else:
body = 'Unknown encoding type.\n\n' + repr(message)
subject = ''
toAddress = '[Broadcast subscribers]'
if messageEncodingType <> 0:
sqlLock.acquire()
t = (self.inventoryHash,toAddress,fromAddress,subject,int(time.time()),body,'inbox')
sqlSubmitQueue.put('''INSERT INTO inbox VALUES (?,?,?,?,?,?,?)''')
sqlSubmitQueue.put(t)
sqlReturnQueue.get()
sqlLock.release()
self.emit(SIGNAL("displayNewInboxMessage(PyQt_PyObject,PyQt_PyObject,PyQt_PyObject,PyQt_PyObject,PyQt_PyObject)"),self.inventoryHash,toAddress,fromAddress,subject,body)"""
#We have received a msg message.
def recmsg(self):
def recmsg(self,data):
self.messageProcessingStartTime = time.time()
#First we must check to make sure the proof of work is sufficient.
if not self.isProofOfWorkSufficient():
if not self.isProofOfWorkSufficient(data):
print 'Proof of work in msg message insufficient.'
return
readPosition = 32
embeddedTime, = unpack('>I',self.data[readPosition:readPosition+4])
readPosition = 8
embeddedTime, = unpack('>I',data[readPosition:readPosition+4])
if embeddedTime > int(time.time())+10800:
print 'The time in the msg message is too new. Ignoring it. Time:', embeddedTime
return
@ -761,12 +680,12 @@ class receiveDataThread(QThread):
print 'The time in the msg message is too old. Ignoring it. Time:', embeddedTime
return
readPosition += 4
streamNumberAsClaimedByMsg, streamNumberAsClaimedByMsgLength = decodeVarint(self.data[readPosition:readPosition+9])
streamNumberAsClaimedByMsg, streamNumberAsClaimedByMsgLength = decodeVarint(data[readPosition:readPosition+9])
if streamNumberAsClaimedByMsg != self.streamNumber:
print 'The stream number encoded in this msg (' + str(streamNumberAsClaimedByMsg) + ') message does not match the stream number on which it was received. Ignoring it.'
return
readPosition += streamNumberAsClaimedByMsgLength
self.inventoryHash = calculateInventoryHash(self.data[24:self.payloadLength+24])
self.inventoryHash = calculateInventoryHash(data)
inventoryLock.acquire()
if self.inventoryHash in inventory:
print 'We have already received this msg message. Ignoring.'
@ -778,12 +697,12 @@ class receiveDataThread(QThread):
return
#This msg message is valid. Let's let our peers know about it.
objectType = 'msg'
inventory[self.inventoryHash] = (objectType, self.streamNumber, self.data[24:self.payloadLength+24], embeddedTime)
inventory[self.inventoryHash] = (objectType, self.streamNumber, data, embeddedTime)
inventoryLock.release()
self.broadcastinv(self.inventoryHash)
self.emit(SIGNAL("incrementNumberOfMessagesProcessed()"))
self.processmsg(readPosition) #When this function returns, we will have either successfully processed the message bound for us, ignored it because it isn't bound for us, or found problem with the message that warranted ignoring it.
self.processmsg(readPosition,data) #When this function returns, we will have either successfully processed the message bound for us, ignored it because it isn't bound for us, or found problem with the message that warranted ignoring it.
# Let us now set lengthOfTimeWeShouldUseToProcessThisMessage. If we haven't used the specified amount of time, we shall sleep. These values are based on test timings and you may change them at-will.
if self.payloadLength > 100000000: #Size is greater than 100 megabytes
@ -805,232 +724,35 @@ class receiveDataThread(QThread):
printLock.acquire()
print 'Total message processing time:', time.time()- self.messageProcessingStartTime, 'seconds.'
printLock.release()
#This section is for my RSA keys (version 1 addresses). If we don't have any version 1 addresses it will never run. This code will soon be removed.
"""initialDecryptionSuccessful = False
infile = cStringIO.StringIO(self.data[readPosition:self.payloadLength+24])
outfile = cStringIO.StringIO()
#print 'len(myRSAAddressHashes.items()):', len(myRSAAddressHashes.items())
for key, value in myRSAAddressHashes.items():
try:
decrypt_bigfile(infile, outfile, value)
#The initial decryption passed though there is a small chance that the message isn't actually for me. We'll need to check that the 20 zeros are present.
#print 'initial decryption successful using key', repr(key)
initialDecryptionSuccessful = True
printLock.acquire()
print 'Initial decryption passed'
printLock.release()
break
except Exception, err:
infile.seek(0)
#print 'Exception:', err
#print 'outfile len is:', len(outfile.getvalue()),'data is:', repr(outfile.getvalue())
#print 'Initial decryption failed using key', value
#decryption failed for this key. The message is for someone else (or for a different key of mine).
if initialDecryptionSuccessful and outfile.getvalue()[:20] == '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00': #this run of 0s allows the true message receiver to identify his message
#This is clearly a message bound for me.
outfile.seek(0)
data = outfile.getvalue()
readPosition = 20 #To start reading past the 20 zero bytes
messageVersion, messageVersionLength = decodeVarint(data[readPosition:readPosition+10])
readPosition += messageVersionLength
if messageVersion == 1:
bitfieldBehavior = data[readPosition:readPosition+4]
readPosition += 4
sendersAddressVersionNumber, sendersAddressVersionNumberLength = decodeVarint(data[readPosition:readPosition+10])
if sendersAddressVersionNumber == 1:
readPosition += sendersAddressVersionNumberLength
sendersStreamNumber, sendersStreamNumberLength = decodeVarint(data[readPosition:readPosition+10])
if sendersStreamNumber == 0:
print 'sendersStreamNumber = 0. Ignoring message'
else:
readPosition += sendersStreamNumberLength
sendersNLength, sendersNLengthLength = decodeVarint(data[readPosition:readPosition+10])
readPosition += sendersNLengthLength
sendersN = data[readPosition:readPosition+sendersNLength]
readPosition += sendersNLength
sendersELength, sendersELengthLength = decodeVarint(data[readPosition:readPosition+10])
readPosition += sendersELengthLength
sendersE = data[readPosition:readPosition+sendersELength]
readPosition += sendersELength
endOfThePublicKeyPosition = readPosition
messageEncodingType, messageEncodingTypeLength = decodeVarint(data[readPosition:readPosition+10])
readPosition += messageEncodingTypeLength
print 'Message Encoding Type:', messageEncodingType
messageLength, messageLengthLength = decodeVarint(data[readPosition:readPosition+10])
print 'message length:', messageLength
readPosition += messageLengthLength
message = data[readPosition:readPosition+messageLength]
#print 'First 150 characters of message:', repr(message[:150])
readPosition += messageLength
ackLength, ackLengthLength = decodeVarint(data[readPosition:readPosition+10])
#print 'ackLength:', ackLength
readPosition += ackLengthLength
ackData = data[readPosition:readPosition+ackLength]
readPosition += ackLength
payloadSigniture = data[readPosition:readPosition+sendersNLength] #We're using the length of the sender's n because it should match the signiture size.
sendersPubkey = rsa.PublicKey(convertStringToInt(sendersN),convertStringToInt(sendersE))
print 'sender\'s Pubkey', sendersPubkey
#Check the cryptographic signiture
verifyPassed = False
try:
rsa.verify(data[:-len(payloadSigniture)],payloadSigniture, sendersPubkey)
print 'verify passed'
verifyPassed = True
except Exception, err:
print 'verify failed', err
if verifyPassed:
#calculate the fromRipe.
sha = hashlib.new('sha512')
sha.update(sendersN+sendersE)
ripe = hashlib.new('ripemd160')
ripe.update(sha.digest())
#Let's store the public key in case we want to reply to this person.
#We don't have the correct nonce in order to send out a pubkey message so we'll just fill it with 1's. We won't be able to send this pubkey to others (without doing the proof of work ourselves, which this program is programmed to not do.)
t = (ripe.digest(),False,'\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF'+data[20+messageVersionLength:endOfThePublicKeyPosition],int(time.time()),'yes')
sqlLock.acquire()
sqlSubmitQueue.put('''INSERT INTO pubkeys VALUES (?,?,?,?,?)''')
sqlSubmitQueue.put(t)
sqlReturnQueue.get()
sqlLock.release()
blockMessage = False #Gets set to True if the user shouldn't see the message according to black or white lists.
fromAddress = encodeAddress(sendersAddressVersionNumber,sendersStreamNumber,ripe.digest())
if config.get('bitmessagesettings', 'blackwhitelist') == 'black': #If we are using a blacklist
t = (fromAddress,)
sqlLock.acquire()
sqlSubmitQueue.put('''SELECT label, enabled FROM blacklist where address=?''')
sqlSubmitQueue.put(t)
queryreturn = sqlReturnQueue.get()
sqlLock.release()
for row in queryreturn:
label, enabled = row
if enabled:
print 'Message ignored because address is in blacklist.'
blockMessage = True
else: #We're using a whitelist
t = (fromAddress,)
sqlLock.acquire()
sqlSubmitQueue.put('''SELECT label, enabled FROM whitelist where address=?''')
sqlSubmitQueue.put(t)
queryreturn = sqlReturnQueue.get()
sqlLock.release()
if queryreturn == []:
print 'Message ignored because address not in whitelist.'
blockMessage = True
for row in queryreturn: #It could be in the whitelist but disabled. Let's check.
label, enabled = row
if not enabled:
print 'Message ignored because address in whitelist but not enabled.'
blockMessage = True
if not blockMessage:
print 'fromAddress:', fromAddress
print 'First 150 characters of message:', repr(message[:150])
#Look up the destination address (my address) based on the destination ripe hash.
#I realize that I could have a data structure devoted to this task, or maintain an indexed table
#in the sql database, but I would prefer to minimize the number of data structures this program
#uses. Searching linearly through the user's short list of addresses doesn't take very long anyway.
configSections = config.sections()
for addressInKeysFile in configSections:
if addressInKeysFile <> 'bitmessagesettings':
status,addressVersionNumber,streamNumber,hash = decodeAddress(addressInKeysFile)
if hash == key:
toAddress = addressInKeysFile
toLabel = config.get(addressInKeysFile, 'label')
if toLabel == '':
toLabel = addressInKeysFile
break
if messageEncodingType == 2:
bodyPositionIndex = string.find(message,'\nBody:')
if bodyPositionIndex > 1:
subject = message[8:bodyPositionIndex]
body = message[bodyPositionIndex+6:]
else:
subject = ''
body = message
elif messageEncodingType == 1:
body = message
subject = ''
elif messageEncodingType == 0:
print 'messageEncodingType == 0. Doing nothing with the message. They probably just sent it so that we would store their public key or send their ack data for them.'
else:
body = 'Unknown encoding type.\n\n' + repr(message)
subject = ''
print 'within recmsg, self.inventoryHash is', repr(self.inventoryHash)
if messageEncodingType <> 0:
sqlLock.acquire()
t = (self.inventoryHash,toAddress,fromAddress,subject,int(time.time()),body,'inbox')
sqlSubmitQueue.put('''INSERT INTO inbox VALUES (?,?,?,?,?,?,?)''')
sqlSubmitQueue.put(t)
sqlReturnQueue.get()
sqlLock.release()
self.emit(SIGNAL("displayNewInboxMessage(PyQt_PyObject,PyQt_PyObject,PyQt_PyObject,PyQt_PyObject,PyQt_PyObject)"),self.inventoryHash,toAddress,fromAddress,subject,body)
#Now let us worry about the acknowledgement data
#We'll need to make sure that our client will properly process the ackData; if the packet is malformed, it might cause us to clear out self.data and an attacker could use that behavior to determine that we decoded this message.
ackDataValidThusFar = True
if len(ackData) < 24:
print 'The length of ackData is unreasonably short. Not sending ackData.'
ackDataValidThusFar = False
if ackData[0:4] != '\xe9\xbe\xb4\xd9':
print 'Ackdata magic bytes were wrong. Not sending ackData.'
ackDataValidThusFar = False
if ackDataValidThusFar:
ackDataPayloadLength, = unpack('>L',ackData[16:20])
if len(ackData)-24 != ackDataPayloadLength: #This ackData includes the protocol header which is not counted in the payload length.
print 'ackData payload length doesn\'t match the payload length specified in the header. Not sending ackdata.'
ackDataValidThusFar = False
if ackDataValidThusFar:
print 'ackData is valid. Will process it.'
self.ackDataThatWeHaveYetToSend.append(ackData) #When we have processed all data, the processData function will pop the ackData out and process it as if it is a message received from our peer.
else:
print 'This program cannot decode messages from addresses with versions higher than 1. Ignoring.'
statusbar = 'This program cannot decode messages from addresses with versions higher than 1. Ignoring it.'
self.emit(SIGNAL("updateStatusBar(PyQt_PyObject)"),statusbar)
else:
statusbar = 'Error: Cannot decode incoming msg versions higher than 1. Assuming the sender isn\' being silly, you should upgrade Bitmessage. Ignoring message.'
self.emit(SIGNAL("updateStatusBar(PyQt_PyObject)"),statusbar)
else:
printLock.acquire()
print 'Could not decrypt with any RSA keys if you have any.'
printLock.release()
infile.close()
outfile.close()"""
#A msg message has a valid time and POW and requires processing. The recmsg function calls this one.
def processmsg(self,readPosition):
def processmsg(self,readPosition, encryptedData):
initialDecryptionSuccessful = False
#Let's check whether this is a message acknowledgement bound for us.
if self.data[readPosition:24+self.payloadLength] in ackdataForWhichImWatching:
if encryptedData[readPosition:] in ackdataForWhichImWatching:
printLock.acquire()
print 'This msg IS an acknowledgement bound for me.'
printLock.release()
del ackdataForWhichImWatching[self.data[readPosition:24+self.payloadLength]]
t = ('ackreceived',self.data[readPosition:24+self.payloadLength])
del ackdataForWhichImWatching[encryptedData[readPosition:]]
t = ('ackreceived',encryptedData[readPosition:])
sqlLock.acquire()
sqlSubmitQueue.put('UPDATE sent SET status=? WHERE ackdata=?')
sqlSubmitQueue.put(t)
sqlReturnQueue.get()
sqlLock.release()
self.emit(SIGNAL("updateSentItemStatusByAckdata(PyQt_PyObject,PyQt_PyObject)"),self.data[readPosition:24+self.payloadLength],'Acknowledgement of the message received just now.')
self.emit(SIGNAL("updateSentItemStatusByAckdata(PyQt_PyObject,PyQt_PyObject)"),encryptedData[readPosition:],'Acknowledgement of the message received just now.')
return
else:
printLock.acquire()
print 'This was NOT an acknowledgement bound for me.' #Msg potential ack data:', repr(self.data[readPosition:24+self.payloadLength])
print 'This was NOT an acknowledgement bound for me.'
#print 'ackdataForWhichImWatching', ackdataForWhichImWatching
printLock.release()
#This is not an acknowledgement bound for me. See if it is a message bound for me by trying to decrypt it with my private keys.
for key, cryptorObject in myECAddressHashes.items():
try:
data = cryptorObject.decrypt(self.data[readPosition:self.payloadLength+24])
unencryptedData = cryptorObject.decrypt(encryptedData[readPosition:])
toRipe = key #This is the RIPE hash of my pubkeys. We need this below to compare to the destination_ripe included in the encrypted data.
initialDecryptionSuccessful = True
print 'EC decryption successful using key associated with ripe hash:', key.encode('hex')
@ -1046,12 +768,12 @@ class receiveDataThread(QThread):
else:
#This is a message bound for me.
readPosition = 0
messageVersion, messageVersionLength = decodeVarint(data[readPosition:readPosition+10])
messageVersion, messageVersionLength = decodeVarint(unencryptedData[readPosition:readPosition+10])
readPosition += messageVersionLength
if messageVersion != 1:
print 'Cannot understand message versions other than one. Ignoring message.'
return
sendersAddressVersionNumber, sendersAddressVersionNumberLength = decodeVarint(data[readPosition:readPosition+10])
sendersAddressVersionNumber, sendersAddressVersionNumberLength = decodeVarint(unencryptedData[readPosition:readPosition+10])
readPosition += sendersAddressVersionNumberLength
if sendersAddressVersionNumber == 0:
print 'Cannot understand sendersAddressVersionNumber = 0. Ignoring message.'
@ -1059,47 +781,47 @@ class receiveDataThread(QThread):
if sendersAddressVersionNumber >= 3:
print 'Sender\'s address version number', sendersAddressVersionNumber, ' not yet supported. Ignoring message.'
return
if len(data) < 170:
if len(unencryptedData) < 170:
print 'Length of the unencrypted data is unreasonably short. Sanity check failed. Ignoring message.'
return
sendersStreamNumber, sendersStreamNumberLength = decodeVarint(data[readPosition:readPosition+10])
sendersStreamNumber, sendersStreamNumberLength = decodeVarint(unencryptedData[readPosition:readPosition+10])
if sendersStreamNumber == 0:
print 'sender\'s stream number is 0. Ignoring message.'
return
readPosition += sendersStreamNumberLength
behaviorBitfield = data[readPosition:readPosition+4]
behaviorBitfield = unencryptedData[readPosition:readPosition+4]
readPosition += 4
pubSigningKey = '\x04' + data[readPosition:readPosition+64]
pubSigningKey = '\x04' + unencryptedData[readPosition:readPosition+64]
readPosition += 64
pubEncryptionKey = '\x04' + data[readPosition:readPosition+64]
pubEncryptionKey = '\x04' + unencryptedData[readPosition:readPosition+64]
readPosition += 64
endOfThePublicKeyPosition = readPosition #needed for when we store the pubkey in our database of pubkeys for later use.
if toRipe != data[readPosition:readPosition+20]:
if toRipe != unencryptedData[readPosition:readPosition+20]:
printLock.acquire()
print 'The original sender of this message did not send it to you. Someone is attempting a Surreptitious Forwarding Attack.'
print 'See: http://tools.ietf.org/html/draft-ietf-smime-sender-auth-00'
print 'See: http://world.std.com/~dtd/sign_encrypt/sign_encrypt7.html'
print 'your toRipe:', toRipe.encode('hex')
print 'embedded destination toRipe:', data[readPosition:readPosition+20].encode('hex')
print 'embedded destination toRipe:', unencryptedData[readPosition:readPosition+20].encode('hex')
printLock.release()
return
readPosition += 20
messageEncodingType, messageEncodingTypeLength = decodeVarint(data[readPosition:readPosition+10])
messageEncodingType, messageEncodingTypeLength = decodeVarint(unencryptedData[readPosition:readPosition+10])
readPosition += messageEncodingTypeLength
messageLength, messageLengthLength = decodeVarint(data[readPosition:readPosition+10])
messageLength, messageLengthLength = decodeVarint(unencryptedData[readPosition:readPosition+10])
readPosition += messageLengthLength
message = data[readPosition:readPosition+messageLength]
message = unencryptedData[readPosition:readPosition+messageLength]
#print 'First 150 characters of message:', repr(message[:150])
readPosition += messageLength
ackLength, ackLengthLength = decodeVarint(data[readPosition:readPosition+10])
ackLength, ackLengthLength = decodeVarint(unencryptedData[readPosition:readPosition+10])
readPosition += ackLengthLength
ackData = data[readPosition:readPosition+ackLength]
ackData = unencryptedData[readPosition:readPosition+ackLength]
readPosition += ackLength
positionOfBottomOfAckData = readPosition #needed to mark the end of what is covered by the signature
signatureLength, signatureLengthLength = decodeVarint(data[readPosition:readPosition+10])
signatureLength, signatureLengthLength = decodeVarint(unencryptedData[readPosition:readPosition+10])
readPosition += signatureLengthLength
signature = data[readPosition:readPosition+signatureLength]
signature = unencryptedData[readPosition:readPosition+signatureLength]
try:
highlevelcrypto.verify(data[:positionOfBottomOfAckData],signature,pubSigningKey.encode('hex'))
highlevelcrypto.verify(unencryptedData[:positionOfBottomOfAckData],signature,pubSigningKey.encode('hex'))
print 'ECDSA verify passed'
except Exception, err:
print 'ECDSA verify failed', err
@ -1114,7 +836,7 @@ class receiveDataThread(QThread):
ripe.update(sha.digest())
#Let's store the public key in case we want to reply to this person.
#We don't have the correct nonce or time (which would let us send out a pubkey message) so we'll just fill it with 1's. We won't be able to send this pubkey to others (without doing the proof of work ourselves, which this program is programmed to not do.)
t = (ripe.digest(),False,'\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF'+'\xFF\xFF\xFF\xFF'+data[messageVersionLength:endOfThePublicKeyPosition],int(time.time()),'yes')
t = (ripe.digest(),False,'\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF'+'\xFF\xFF\xFF\xFF'+unencryptedData[messageVersionLength:endOfThePublicKeyPosition],int(time.time()),'yes')
sqlLock.acquire()
sqlSubmitQueue.put('''INSERT INTO pubkeys VALUES (?,?,?,?,?)''')
sqlSubmitQueue.put(t)
@ -1265,18 +987,17 @@ class receiveDataThread(QThread):
return '['+mailingListName+'] ' + subject
#We have received a pubkey
def recpubkey(self):
def recpubkey(self,data):
self.pubkeyProcessingStartTime = time.time()
if self.payloadLength < 146: #sanity check
if len(data) < 146 or len(data) >600: #sanity check
return
#We must check to make sure the proof of work is sufficient.
if not self.isProofOfWorkSufficient():
if not self.isProofOfWorkSufficient(data):
print 'Proof of work in pubkey message insufficient.'
return
readPosition = 24 #for the message header
readPosition += 8 #for the nonce
embeddedTime, = unpack('>I',self.data[readPosition:readPosition+4])
readPosition = 8 #for the nonce
embeddedTime, = unpack('>I',data[readPosition:readPosition+4])
if embeddedTime < int(time.time())-lengthOfTimeToHoldOnToAllPubkeys-86400: #If the pubkey is more than a month old then reject it. (the 86400 is included to give an extra day of wiggle-room. If the wiggle-room is actually of any use, everyone on the network will delete this pubkey from their database the next time the cleanerThread cleans anyway- except for the node that actually wants the pubkey.)
printLock.acquire()
print 'The embedded time in this pubkey message is too old. Ignoring. Embedded time is:', embeddedTime
@ -1288,15 +1009,15 @@ class receiveDataThread(QThread):
printLock.release()
return
readPosition += 4 #for the time
addressVersion, varintLength = decodeVarint(self.data[readPosition:readPosition+10])
addressVersion, varintLength = decodeVarint(data[readPosition:readPosition+10])
readPosition += varintLength
streamNumber, varintLength = decodeVarint(self.data[readPosition:readPosition+10])
streamNumber, varintLength = decodeVarint(data[readPosition:readPosition+10])
readPosition += varintLength
if self.streamNumber != streamNumber:
print 'stream number embedded in this pubkey doesn\'t match our stream number. Ignoring.'
return
inventoryHash = calculateInventoryHash(self.data[24:self.payloadLength+24])
inventoryHash = calculateInventoryHash(data)
inventoryLock.acquire()
if inventoryHash in inventory:
print 'We have already received this pubkey. Ignoring it.'
@ -1307,12 +1028,12 @@ class receiveDataThread(QThread):
inventoryLock.release()
return
objectType = 'pubkey'
inventory[inventoryHash] = (objectType, self.streamNumber, self.data[24:self.payloadLength+24], int(time.time()))
inventory[inventoryHash] = (objectType, self.streamNumber, data, int(time.time()))
inventoryLock.release()
self.broadcastinv(inventoryHash)
self.emit(SIGNAL("incrementNumberOfPubkeysProcessed()"))
self.processpubkey()
self.processpubkey(data)
lengthOfTimeWeShouldUseToProcessThisMessage = .2
sleepTime = lengthOfTimeWeShouldUseToProcessThisMessage - (time.time()- self.pubkeyProcessingStartTime)
@ -1325,14 +1046,13 @@ class receiveDataThread(QThread):
#print 'Total pubkey processing time:', time.time()- self.pubkeyProcessingStartTime, 'seconds.'
#printLock.release()
def processpubkey(self):
readPosition = 24 #for the message header
readPosition += 8 #for the nonce
embeddedTime, = unpack('>I',self.data[readPosition:readPosition+4])
def processpubkey(self,data):
readPosition = 8 #for the nonce
embeddedTime, = unpack('>I',data[readPosition:readPosition+4])
readPosition += 4 #for the time
addressVersion, varintLength = decodeVarint(self.data[readPosition:readPosition+10])
addressVersion, varintLength = decodeVarint(data[readPosition:readPosition+10])
readPosition += varintLength
streamNumber, varintLength = decodeVarint(self.data[readPosition:readPosition+10])
streamNumber, varintLength = decodeVarint(data[readPosition:readPosition+10])
readPosition += varintLength
if addressVersion == 0:
print '(Within processpubkey) addressVersion of 0 doesn\'t make sense.'
@ -1346,12 +1066,12 @@ class receiveDataThread(QThread):
if self.payloadLength < 146: #sanity check. This is the minimum possible length.
print 'payloadLength less than 146. Sanity check failed.'
return
bitfieldBehaviors = self.data[readPosition:readPosition+4]
bitfieldBehaviors = data[readPosition:readPosition+4]
readPosition += 4
publicSigningKey = self.data[readPosition:readPosition+64]
publicSigningKey = data[readPosition:readPosition+64]
#Is it possible for a public key to be invalid such that trying to encrypt or sign with it will cause an error? If it is, we should probably test these keys here.
readPosition += 64
publicEncryptionKey = self.data[readPosition:readPosition+64]
publicEncryptionKey = data[readPosition:readPosition+64]
if len(publicEncryptionKey) < 64:
print 'publicEncryptionKey length less than 64. Sanity check failed.'
return
@ -1376,7 +1096,7 @@ class receiveDataThread(QThread):
sqlLock.release()
if queryreturn != []: #if this pubkey is already in our database and if we have used it personally:
print 'We HAVE used this pubkey personally. Updating time.'
t = (ripe,True,self.data[24:24+self.payloadLength],embeddedTime,'yes')
t = (ripe,True,data,embeddedTime,'yes')
sqlLock.acquire()
sqlSubmitQueue.put('''INSERT INTO pubkeys VALUES (?,?,?,?,?)''')
sqlSubmitQueue.put(t)
@ -1387,7 +1107,7 @@ class receiveDataThread(QThread):
workerQueue.put(('newpubkey',(addressVersion,streamNumber,ripe)))
else:
print 'We have NOT used this pubkey personally. Inserting in database.'
t = (ripe,True,self.data[24:24+self.payloadLength],embeddedTime,'no') #This will also update the embeddedTime.
t = (ripe,True,data,embeddedTime,'no') #This will also update the embeddedTime.
sqlLock.acquire()
sqlSubmitQueue.put('''INSERT INTO pubkeys VALUES (?,?,?,?,?)''')
sqlSubmitQueue.put(t)
@ -1397,66 +1117,16 @@ class receiveDataThread(QThread):
printLock.release()
workerQueue.put(('newpubkey',(addressVersion,streamNumber,ripe)))
#This code which deals with old RSA addresses will soon be removed.
"""elif addressVersion == 1:
nLength, varintLength = decodeVarint(self.data[readPosition:readPosition+10])
readPosition += varintLength
nString = self.data[readPosition:readPosition+nLength]
readPosition += nLength
eLength, varintLength = decodeVarint(self.data[readPosition:readPosition+10])
readPosition += varintLength
eString = self.data[readPosition:readPosition+eLength]
readPosition += eLength
sha = hashlib.new('sha512')
sha.update(nString+eString)
ripeHasher = hashlib.new('ripemd160')
ripeHasher.update(sha.digest())
ripe = ripeHasher.digest()
print 'within recpubkey, addressVersion', addressVersion
print 'streamNumber', streamNumber
print 'ripe', repr(ripe)
print 'n=', convertStringToInt(nString)
print 'e=', convertStringToInt(eString)
t = (ripe,)
sqlLock.acquire()
sqlSubmitQueue.put('''SELECT usedpersonally FROM pubkeys WHERE hash=? AND usedpersonally='yes' ''')
sqlSubmitQueue.put(t)
queryreturn = sqlReturnQueue.get()
sqlLock.release()
if queryreturn != []: #if this pubkey is already in our database and if we have used it personally:
print 'We HAVE used this pubkey personally. Updating time.'
t = (ripe,True,self.data[24:24+self.payloadLength],int(time.time()),'yes')
sqlLock.acquire()
sqlSubmitQueue.put('''INSERT INTO pubkeys VALUES (?,?,?,?,?)''')
sqlSubmitQueue.put(t)
sqlReturnQueue.get()
sqlLock.release()
printLock.acquire()
print 'added foreign pubkey into our database'
printLock.release()
workerQueue.put(('newpubkey',(addressVersion,streamNumber,ripe)))
else:
print 'We have NOT used this pubkey personally. Inserting in database.'
t = (ripe,True,self.data[24:24+self.payloadLength],int(time.time()),'no')
sqlLock.acquire()
sqlSubmitQueue.put('''INSERT INTO pubkeys VALUES (?,?,?,?,?)''')
sqlSubmitQueue.put(t)
sqlReturnQueue.get()
sqlLock.release()
printLock.acquire()
print 'added foreign pubkey into our database'
printLock.release()
workerQueue.put(('newpubkey',(addressVersion,streamNumber,ripe)))"""
#We have received a getpubkey message
def recgetpubkey(self):
if not self.isProofOfWorkSufficient():
def recgetpubkey(self,data):
if not self.isProofOfWorkSufficient(data):
print 'Proof of work in getpubkey message insufficient.'
return
embeddedTime, = unpack('>I',self.data[32:36])
if len(data) < 34:
print 'getpubkey message doesn\'t contain enough data. Ignoring.'
return
embeddedTime, = unpack('>I',data[8:12])
if embeddedTime > int(time.time())+10800:
print 'The time in this getpubkey message is too new. Ignoring it. Time:', embeddedTime
return
@ -1464,13 +1134,13 @@ class receiveDataThread(QThread):
print 'The time in this getpubkey message is too old. Ignoring it. Time:', embeddedTime
return
addressVersionNumber, addressVersionLength = decodeVarint(self.data[36:42])
streamNumber, streamNumberLength = decodeVarint(self.data[36+addressVersionLength:42+addressVersionLength])
addressVersionNumber, addressVersionLength = decodeVarint(data[12:22])
streamNumber, streamNumberLength = decodeVarint(data[12+addressVersionLength:22+addressVersionLength])
if streamNumber <> self.streamNumber:
print 'The streamNumber', streamNumber, 'doesn\'t match our stream number:', self.streamNumber
return
inventoryHash = calculateInventoryHash(self.data[24:self.payloadLength+24])
inventoryHash = calculateInventoryHash(data)
inventoryLock.acquire()
if inventoryHash in inventory:
print 'We have already received this getpubkey request. Ignoring it.'
@ -1482,7 +1152,7 @@ class receiveDataThread(QThread):
return
self.objectsOfWhichThisRemoteNodeIsAlreadyAware[inventoryHash] = 0
objectType = 'getpubkey'
inventory[inventoryHash] = (objectType, self.streamNumber, self.data[24:self.payloadLength+24], embeddedTime)
inventory[inventoryHash] = (objectType, self.streamNumber, data, embeddedTime)
inventoryLock.release()
#This getpubkey request is valid so far. Forward to peers.
self.broadcastinv(inventoryHash)
@ -1496,10 +1166,15 @@ class receiveDataThread(QThread):
elif addressVersionNumber > 2:
print 'The addressVersionNumber of the pubkey request is too high. Can\'t understand. Ignoring it.'
return
print 'the hash requested in this getpubkey request is:', self.data[36+addressVersionLength+streamNumberLength:56+addressVersionLength+streamNumberLength].encode('hex')
requestedHash = data[12+addressVersionLength+streamNumberLength:32+addressVersionLength+streamNumberLength]
if len(requestedHash) != 20:
print 'The length of the requested hash is not 20 bytes. Something is wrong. Ignoring.'
return
print 'the hash requested in this getpubkey request is:', requestedHash.encode('hex')
sqlLock.acquire()
t = (self.data[36+addressVersionLength+streamNumberLength:56+addressVersionLength+streamNumberLength],int(time.time())-lengthOfTimeToHoldOnToAllPubkeys) #this prevents SQL injection
t = (requestedHash,int(time.time())-lengthOfTimeToHoldOnToAllPubkeys) #this prevents SQL injection
sqlSubmitQueue.put('''SELECT hash, transmitdata, time FROM pubkeys WHERE hash=? AND havecorrectnonce=1 AND time>?''')
sqlSubmitQueue.put(t)
queryreturn = sqlReturnQueue.get()
@ -1515,48 +1190,11 @@ class receiveDataThread(QThread):
inventory[inventoryHash] = (objectType, self.streamNumber, payload, timeEncodedInPubkey)#If the time embedded in this pubkey is more than 3 days old then this object isn't going to last very long in the inventory- the cleanerThread is going to come along and move it from the inventory in memory to the SQL inventory and then delete it from the SQL inventory. It should still find its way back to the original requestor if he is online however.
self.broadcastinv(inventoryHash)
else: #the pubkey is not in our database of pubkeys. Let's check if the requested key is ours (which would mean we should do the POW, put it in the pubkey table, and broadcast out the pubkey.)
if self.data[36+addressVersionLength+streamNumberLength:56+addressVersionLength+streamNumberLength] in myECAddressHashes: #if this address hash is one of mine
if requestedHash in myECAddressHashes: #if this address hash is one of mine
printLock.acquire()
print 'Found getpubkey-requested-hash in my list of EC hashes. Telling Worker thread to do the POW for a pubkey message and send it out.'
printLock.release()
myAddress = encodeAddress(addressVersionNumber,streamNumber,self.data[36+addressVersionLength+streamNumberLength:56+addressVersionLength+streamNumberLength])
workerQueue.put(('doPOWForMyV2Pubkey',myAddress))
#This code which deals with old RSA addresses will soon be removed.
"""elif self.data[36+addressVersionLength+streamNumberLength:56+addressVersionLength+streamNumberLength] in myRSAAddressHashes:
print 'Found getpubkey requested hash in my list of RSA hashes.'
payload = '\x00\x00\x00\x01' #bitfield of features supported by me (see the wiki).
payload += self.data[36:36+addressVersionLength+streamNumberLength]
#print int(config.get(encodeAddress(addressVersionNumber,streamNumber,self.data[36+addressVersionLength+streamNumberLength:56+addressVersionLength+streamNumberLength]), 'n'))
nString = convertIntToString(int(config.get(encodeAddress(addressVersionNumber,streamNumber,self.data[36+addressVersionLength+streamNumberLength:56+addressVersionLength+streamNumberLength]), 'n')))
eString = convertIntToString(config.getint(encodeAddress(addressVersionNumber,streamNumber,self.data[36+addressVersionLength+streamNumberLength:56+addressVersionLength+streamNumberLength]), 'e'))
payload += encodeVarint(len(nString))
payload += nString
payload += encodeVarint(len(eString))
payload += eString
nonce = 0
trialValue = 99999999999999999999
target = 2**64 / ((len(payload)+payloadLengthExtraBytes+8) * averageProofOfWorkNonceTrialsPerByte)
print '(For pubkey message) Doing proof of work...'
initialHash = hashlib.sha512(payload).digest()
while trialValue > target:
nonce += 1
trialValue, = unpack('>Q',hashlib.sha512(hashlib.sha512(pack('>Q',nonce) + initialHash).digest()).digest()[0:8])
print '(For pubkey message) Found proof of work', trialValue, 'Nonce:', nonce
payload = pack('>Q',nonce) + payload
t = (self.data[36+addressVersionLength+streamNumberLength:56+addressVersionLength+streamNumberLength],True,payload,int(time.time())+1209600) #after two weeks (1,209,600 seconds), we may remove our own pub key from our database. It will be regenerated and put back in the database if it is requested.
sqlLock.acquire()
#** pubkeys insert query not yet fixed! **
sqlSubmitQueue.put('''INSERT INTO pubkeys VALUES (?,?,?,?)''')
sqlSubmitQueue.put(t)
queryreturn = sqlReturnQueue.get()
sqlLock.release()
inventoryHash = calculateInventoryHash(payload)
objectType = 'pubkey'
inventory[inventoryHash] = (objectType, self.streamNumber, payload, int(time.time()))
self.broadcastinv(inventoryHash) """
workerQueue.put(('doPOWForMyV2Pubkey',requestedHash))
else:
printLock.acquire()
print 'This getpubkey request is not for any of my keys.'
@ -1564,26 +1202,26 @@ class receiveDataThread(QThread):
#We have received an inv message
def recinv(self):
numberOfItemsInInv, lengthOfVarint = decodeVarint(self.data[24:34])
def recinv(self,data):
numberOfItemsInInv, lengthOfVarint = decodeVarint(data[:10])
if len(data) < lengthOfVarint + (numberOfItemsInInv * 32):
print 'inv message doesn\'t contain enough data. Ignoring.'
return
if numberOfItemsInInv == 1: #we'll just request this data from the person who advertised the object.
for i in range(numberOfItemsInInv):
if len(self.data[24+lengthOfVarint+(32*i):56+lengthOfVarint+(32*i)]) == 32: #The length of an inventory hash should be 32. If it isn't 32 then the remote node is either badly programmed or behaving nefariously.
self.objectsOfWhichThisRemoteNodeIsAlreadyAware[self.data[24+lengthOfVarint+(32*i):56+lengthOfVarint+(32*i)]] = 0
if self.data[24+lengthOfVarint+(32*i):56+lengthOfVarint+(32*i)] in inventory:
printLock.acquire()
print 'Inventory (in memory) has inventory item already.'
printLock.release()
elif isInSqlInventory(self.data[24+lengthOfVarint+(32*i):56+lengthOfVarint+(32*i)]):
print 'Inventory (SQL on disk) has inventory item already.'
else:
self.sendgetdata(self.data[24+lengthOfVarint+(32*i):56+lengthOfVarint+(32*i)])
if data[lengthOfVarint:32+lengthOfVarint] in inventory:
printLock.acquire()
print 'Inventory (in memory) has inventory item already.'
printLock.release()
elif isInSqlInventory(data[lengthOfVarint:32+lengthOfVarint]):
print 'Inventory (SQL on disk) has inventory item already.'
else:
self.sendgetdata(data[lengthOfVarint:32+lengthOfVarint])
else:
print 'inv message lists', numberOfItemsInInv, 'objects.'
for i in range(numberOfItemsInInv): #upon finishing dealing with an incoming message, the receiveDataThread will request a random object from the peer. This way if we get multiple inv messages from multiple peers which list mostly the same objects, we will make getdata requests for different random objects from the various peers.
if len(self.data[24+lengthOfVarint+(32*i):56+lengthOfVarint+(32*i)]) == 32: #The length of an inventory hash should be 32. If it isn't 32 then the remote node is either badly programmed or behaving nefariously.
self.objectsOfWhichThisRemoteNodeIsAlreadyAware[self.data[24+lengthOfVarint+(32*i):56+lengthOfVarint+(32*i)]] = 0
self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave[self.data[24+lengthOfVarint+(32*i):56+lengthOfVarint+(32*i)]] = 0
if len(data[lengthOfVarint+(32*i):32+lengthOfVarint+(32*i)]) == 32: #The length of an inventory hash should be 32. If it isn't 32 then the remote node is either badly programmed or behaving nefariously.
self.objectsOfWhichThisRemoteNodeIsAlreadyAware[data[lengthOfVarint+(32*i):32+lengthOfVarint+(32*i)]] = 0
self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave[data[lengthOfVarint+(32*i):32+lengthOfVarint+(32*i)]] = 0
#Send a getdata message to our peer to request the object with the given hash
@ -1603,12 +1241,12 @@ class receiveDataThread(QThread):
printLock.release()
#We have received a getdata request from our peer
def recgetdata(self):
value, lengthOfVarint = decodeVarint(self.data[24:34])
def recgetdata(self, data):
value, lengthOfVarint = decodeVarint(data[:10])
#print 'Number of items in getdata request:', value
try:
for i in xrange(value):
hash = self.data[24+lengthOfVarint+(i*32):56+lengthOfVarint+(i*32)]
hash = data[lengthOfVarint+(i*32):32+lengthOfVarint+(i*32)]
printLock.acquire()
print 'received getdata request for item:', hash.encode('hex')
printLock.release()
@ -1682,10 +1320,10 @@ class receiveDataThread(QThread):
#We have received an addr message.
def recaddr(self):
def recaddr(self,data):
listOfAddressDetailsToBroadcastToPeers = []
numberOfAddressesIncluded = 0
numberOfAddressesIncluded, lengthOfNumberOfAddresses = decodeVarint(self.data[24:29])
numberOfAddressesIncluded, lengthOfNumberOfAddresses = decodeVarint(data[:10])
if verbose >= 1:
printLock.acquire()
@ -1702,61 +1340,56 @@ class receiveDataThread(QThread):
needToWriteKnownNodesToDisk = False
for i in range(0,numberOfAddressesIncluded):
try:
if self.data[40+lengthOfNumberOfAddresses+(34*i):52+lengthOfNumberOfAddresses+(34*i)] != '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF':
if data[16+lengthOfNumberOfAddresses+(34*i):28+lengthOfNumberOfAddresses+(34*i)] != '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF':
printLock.acquire()
print 'Skipping IPv6 address.', repr(self.data[40+lengthOfNumberOfAddresses+(34*i):56+lengthOfNumberOfAddresses+(34*i)])
print 'Skipping IPv6 address.', repr(data[16+lengthOfNumberOfAddresses+(34*i):28+lengthOfNumberOfAddresses+(34*i)])
printLock.release()
continue
#print repr(self.data[6+lengthOfNumberOfAddresses+(34*i):18+lengthOfNumberOfAddresses+(34*i)])
except Exception, err:
if verbose >= 2:
printLock.acquire()
sys.stderr.write('ERROR TRYING TO UNPACK recaddr (to test for an IPv6 address). Message: %s\n' % str(err))
printLock.release()
printLock.acquire()
sys.stderr.write('ERROR TRYING TO UNPACK recaddr (to test for an IPv6 address). Message: %s\n' % str(err))
printLock.release()
break #giving up on unpacking any more. We should still be connected however.
try:
recaddrStream, = unpack('>I',self.data[28+lengthOfNumberOfAddresses+(34*i):32+lengthOfNumberOfAddresses+(34*i)])
recaddrStream, = unpack('>I',data[4+lengthOfNumberOfAddresses+(34*i):8+lengthOfNumberOfAddresses+(34*i)])
except Exception, err:
if verbose >= 2:
printLock.acquire()
sys.stderr.write('ERROR TRYING TO UNPACK recaddr (recaddrStream). Message: %s\n' % str(err))
printLock.release()
printLock.acquire()
sys.stderr.write('ERROR TRYING TO UNPACK recaddr (recaddrStream). Message: %s\n' % str(err))
printLock.release()
break #giving up on unpacking any more. We should still be connected however.
if recaddrStream == 0:
continue
if recaddrStream != self.streamNumber and recaddrStream != (self.streamNumber * 2) and recaddrStream != ((self.streamNumber * 2) + 1): #if the embedded stream number is not in my stream or either of my child streams then ignore it. Someone might be trying funny business.
continue
try:
recaddrServices, = unpack('>Q',self.data[32+lengthOfNumberOfAddresses+(34*i):40+lengthOfNumberOfAddresses+(34*i)])
recaddrServices, = unpack('>Q',data[8+lengthOfNumberOfAddresses+(34*i):16+lengthOfNumberOfAddresses+(34*i)])
except Exception, err:
if verbose >= 2:
printLock.acquire()
sys.stderr.write('ERROR TRYING TO UNPACK recaddr (recaddrServices). Message: %s\n' % str(err))
printLock.release()
printLock.acquire()
sys.stderr.write('ERROR TRYING TO UNPACK recaddr (recaddrServices). Message: %s\n' % str(err))
printLock.release()
break #giving up on unpacking any more. We should still be connected however.
try:
recaddrPort, = unpack('>H',self.data[56+lengthOfNumberOfAddresses+(34*i):58+lengthOfNumberOfAddresses+(34*i)])
recaddrPort, = unpack('>H',data[32+lengthOfNumberOfAddresses+(34*i):34+lengthOfNumberOfAddresses+(34*i)])
except Exception, err:
if verbose >= 2:
printLock.acquire()
sys.stderr.write('ERROR TRYING TO UNPACK recaddr (recaddrPort). Message: %s\n' % str(err))
printLock.release()
printLock.acquire()
sys.stderr.write('ERROR TRYING TO UNPACK recaddr (recaddrPort). Message: %s\n' % str(err))
printLock.release()
break #giving up on unpacking any more. We should still be connected however.
#print 'Within recaddr(): IP', recaddrIP, ', Port', recaddrPort, ', i', i
hostFromAddrMessage = socket.inet_ntoa(self.data[52+lengthOfNumberOfAddresses+(34*i):56+lengthOfNumberOfAddresses+(34*i)])
hostFromAddrMessage = socket.inet_ntoa(data[28+lengthOfNumberOfAddresses+(34*i):32+lengthOfNumberOfAddresses+(34*i)])
#print 'hostFromAddrMessage', hostFromAddrMessage
if self.data[52+lengthOfNumberOfAddresses+(34*i)] == '\x7F':
if data[28+lengthOfNumberOfAddresses+(34*i)] == '\x7F':
print 'Ignoring IP address in loopback range:', hostFromAddrMessage
continue
if self.data[52+lengthOfNumberOfAddresses+(34*i)] == '\x0A':
if data[28+lengthOfNumberOfAddresses+(34*i)] == '\x0A':
print 'Ignoring IP address in private range:', hostFromAddrMessage
continue
if self.data[52+lengthOfNumberOfAddresses+(34*i):52+lengthOfNumberOfAddresses+(34*i)+2] == '\xC0A8':
if data[28+lengthOfNumberOfAddresses+(34*i):30+lengthOfNumberOfAddresses+(34*i)] == '\xC0A8':
print 'Ignoring IP address in private range:', hostFromAddrMessage
continue
timeSomeoneElseReceivedMessageFromThisNode, = unpack('>I',self.data[24+lengthOfNumberOfAddresses+(34*i):28+lengthOfNumberOfAddresses+(34*i)]) #This is the 'time' value in the received addr message.
timeSomeoneElseReceivedMessageFromThisNode, = unpack('>I',data[lengthOfNumberOfAddresses+(34*i):4+lengthOfNumberOfAddresses+(34*i)]) #This is the 'time' value in the received addr message.
if recaddrStream not in knownNodes: #knownNodes is a dictionary of dictionaries with one outer dictionary for each stream. If the outer stream dictionary doesn't exist yet then we must make it.
knownNodes[recaddrStream] = {}
if hostFromAddrMessage not in knownNodes[recaddrStream]:
@ -1799,7 +1432,7 @@ class receiveDataThread(QThread):
datatosend = datatosend + hashlib.sha512(payload).digest()[0:4]
datatosend = datatosend + payload
if verbose >= 2:
if verbose >= 1:
printLock.acquire()
print 'Broadcasting addr with', numberOfAddressesInAddrMessage, 'entries.'
printLock.release()
@ -1872,33 +1505,31 @@ class receiveDataThread(QThread):
datatosend = datatosend + hashlib.sha512(payload).digest()[0:4]
datatosend = datatosend + payload
if verbose >= 2:
if verbose >= 1:
printLock.acquire()
print 'Sending addr with', numberOfAddressesInAddrMessage, 'entries.'
printLock.release()
self.sock.send(datatosend)
#We have received a version message
def recversion(self):
def recversion(self,data):
if self.payloadLength < 83:
#This version message is unreasonably short. Forget it.
return
elif not self.verackSent: #There is a potential exploit if we don't check to make sure that we have not already received and accepted a version message: An attacker could connect directly to us, send a msg message with the ackdata set to an invalid version message which would cause us to close the connection to the attacker thus proving that we were able to decode the message. Checking the connectionIsOrWasFullyEstablished variable would also suffice.
self.remoteProtocolVersion, = unpack('>L',self.data[24:28])
elif not self.verackSent:
self.remoteProtocolVersion, = unpack('>L',data[:4])
#print 'remoteProtocolVersion', self.remoteProtocolVersion
self.myExternalIP = socket.inet_ntoa(self.data[64:68])
self.myExternalIP = socket.inet_ntoa(data[40:44])
#print 'myExternalIP', self.myExternalIP
self.remoteNodeIncomingPort, = unpack('>H',self.data[94:96])
self.remoteNodeIncomingPort, = unpack('>H',data[70:72])
#print 'remoteNodeIncomingPort', self.remoteNodeIncomingPort
#print 'self.data[96:104]', repr(self.data[96:104])
#print 'eightBytesOfRandomDataUsedToDetectConnectionsToSelf', repr(eightBytesOfRandomDataUsedToDetectConnectionsToSelf)
useragentLength, lengthOfUseragentVarint = decodeVarint(self.data[104:108])
readPosition = 104 + lengthOfUseragentVarint
useragent = self.data[readPosition:readPosition+useragentLength]
useragentLength, lengthOfUseragentVarint = decodeVarint(data[80:84])
readPosition = 80 + lengthOfUseragentVarint
useragent = data[readPosition:readPosition+useragentLength]
readPosition += useragentLength
numberOfStreamsInVersionMessage, lengthOfNumberOfStreamsInVersionMessage = decodeVarint(self.data[readPosition:])
numberOfStreamsInVersionMessage, lengthOfNumberOfStreamsInVersionMessage = decodeVarint(data[readPosition:])
readPosition += lengthOfNumberOfStreamsInVersionMessage
self.streamNumber, lengthOfRemoteStreamNumber = decodeVarint(self.data[readPosition:])
self.streamNumber, lengthOfRemoteStreamNumber = decodeVarint(data[readPosition:])
printLock.acquire()
print 'Remote node useragent:', useragent, ' stream number:', self.streamNumber
printLock.release()
@ -1907,17 +1538,15 @@ class receiveDataThread(QThread):
printLock.acquire()
print 'Closed connection to', self.HOST, 'because they are interested in stream', self.streamNumber,'.'
printLock.release()
self.data = ''
return
#If this was an incoming connection, then the sendData thread doesn't know the stream. We have to set it.
if not self.initiatedConnection:
broadcastToSendDataQueues((0,'setStreamNumber',(self.HOST,self.streamNumber)))
if self.data[96:104] == eightBytesOfRandomDataUsedToDetectConnectionsToSelf:
if data[72:80] == eightBytesOfRandomDataUsedToDetectConnectionsToSelf:
self.sock.close()
printLock.acquire()
print 'Closing connection to myself: ', self.HOST
printLock.release()
self.data = ''
return
knownNodes[self.streamNumber][self.HOST] = (self.remoteNodeIncomingPort, int(time.time()))
@ -1925,12 +1554,6 @@ class receiveDataThread(QThread):
pickle.dump(knownNodes, output)
output.close()
#I've commented out this code because it should be up to the newer node to decide whether their protocol version is incompatiable with the remote node's version.
'''if self.remoteProtocolVersion > 1:
print 'The remote node''s protocol version is too new for this program to understand. Disconnecting. It is:', self.remoteProtocolVersion
self.sock.close()
self.selfInitiatedConnectionList.remove(self)
else:'''
self.sendverack()
if self.initiatedConnection == False:
self.sendversion()
@ -2520,11 +2143,19 @@ class singleWorker(QThread):
workerQueue.task_done()
def doPOWForMyV2Pubkey(self,myAddress): #This function also broadcasts out the pubkey message once it is done with the POW
status,addressVersionNumber,streamNumber,hash = decodeAddress(myAddress)
def doPOWForMyV2Pubkey(self,hash): #This function also broadcasts out the pubkey message once it is done with the POW
#Look up my stream number based on my address hash
configSections = config.sections()
for addressInKeysFile in configSections:
if addressInKeysFile <> 'bitmessagesettings':
status,addressVersionNumber,streamNumber,hashFromThisParticularAddress = decodeAddress(addressInKeysFile)
if hash == hashFromThisParticularAddress:
myAddress = addressInKeysFile
break
embeddedTime = int(time.time())+random.randrange(-300, 300) #the current time plus or minus five minutes
payload = pack('>I',(embeddedTime))
payload += encodeVarint(2) #Address version number
payload += encodeVarint(addressVersionNumber) #Address version number
payload += encodeVarint(streamNumber)
payload += '\x00\x00\x00\x01'