2013-11-14 03:45:10 +00:00
import time
import threading
import shared
import hashlib
import random
from struct import unpack , pack
import sys
import string
from subprocess import call # used when the API must execute an outside program
from pyelliptic . openssl import OpenSSL
import highlevelcrypto
from addresses import *
import helper_generic
2014-05-02 14:46:36 +00:00
from helper_generic import addDataPadding
2013-11-14 03:45:10 +00:00
import helper_bitcoin
import helper_inbox
import helper_sent
from helper_sql import *
import tr
from debug import logger
2014-08-06 02:01:01 +00:00
import l10n
2013-11-14 03:45:10 +00:00
class objectProcessor ( threading . Thread ) :
"""
The objectProcessor thread , of which there is only one , receives network
objects (msg, broadcast, pubkey, getpubkey) from the receiveDataThreads.
"""
def __init__(self):
    """
    Initialise the thread and reload any objects left over from last run.

    It may be the case that the last time Bitmessage was running, the user
    closed it before it finished processing everything in the
    objectProcessorQueue. Assuming that Bitmessage wasn't closed
    forcefully, it should have saved the data in the queue into the
    objectprocessorqueue table. Every saved row is put back on
    shared.objectProcessorQueue (with shared.objectProcessorQueueSize
    increased by the length of its data) and the table is then emptied.
    """
    threading.Thread.__init__(self)
    savedObjects = sqlQuery(
        '''SELECT objecttype, data FROM objectprocessorqueue''')
    # The size counter is shared with other threads, so hold its lock
    # for the whole refill.
    with shared.objectProcessorQueueSizeLock:
        for objectType, data in savedObjects:
            shared.objectProcessorQueueSize += len(data)
            shared.objectProcessorQueue.put((objectType, data))
    # The rows now live in memory; clear the table so they are not loaded
    # a second time on the next start.
    sqlExecute('''DELETE FROM objectprocessorqueue''')
    numberLoaded = str(len(savedObjects))
    logger.debug('Loaded %s objects from disk into the objectProcessorQueue.' % numberLoaded)
2013-11-14 03:45:10 +00:00
def run(self):
    """
    Main loop: take objects off shared.objectProcessorQueue and process them.

    Each queue entry is an (objectType, data) tuple. Known object types
    are dispatched to the matching process* method. The pseudo-type
    'checkShutdownVariable' carries no object; it only exists to get this
    thread past the blocking queue.get() so that it re-checks
    shared.shutdown.

    When shared.shutdown is set, everything still waiting in the queue is
    written to the objectprocessorqueue table, shared.shutdown is set to
    2 and the thread exits.
    """
    while True:
        objectType, data = shared.objectProcessorQueue.get()

        if objectType == 'getpubkey':
            self.processgetpubkey(data)
        elif objectType == 'pubkey':
            self.processpubkey(data)
        elif objectType == 'msg':
            self.processmsg(data)
        elif objectType == 'broadcast':
            self.processbroadcast(data)
        elif objectType == 'checkShutdownVariable':
            # More of a command than an object type: it only wakes this
            # thread up so that the shutdown check below runs.
            pass
        else:
            logger.critical('Error! Bug! The class_objectProcessor was passed an object type it doesn\'t recognize: %s' % str(objectType))

        # We maintain objectProcessorQueueSize so that we will slow down
        # requesting objects if too much data accumulates in the queue.
        with shared.objectProcessorQueueSizeLock:
            shared.objectProcessorQueueSize -= len(data)

        if shared.shutdown:
            time.sleep(.5)  # Wait just a moment for most of the connections to close
            numberOfObjectsThatWereInTheObjectProcessorQueue = 0
            with SqlBulkExecute() as sql:
                # Drain on the queue's real emptiness rather than on the
                # byte counter. The counter is only an approximation; if
                # it is too high the blocking get() below would hang the
                # shutdown forever, and if it is too low queued objects
                # would be silently dropped instead of saved.
                # NOTE(review): assumes objectProcessorQueue offers the
                # standard Queue.empty() method - confirm in shared.
                while not shared.objectProcessorQueue.empty():
                    objectType, data = shared.objectProcessorQueue.get()
                    sql.execute('''INSERT INTO objectprocessorqueue VALUES (?,?)''',
                                objectType, data)
                    with shared.objectProcessorQueueSizeLock:
                        shared.objectProcessorQueueSize -= len(data)
                    numberOfObjectsThatWereInTheObjectProcessorQueue += 1
            logger.debug('Saved %s objects from the objectProcessorQueue to disk. objectProcessorThread exiting.' % str(numberOfObjectsThatWereInTheObjectProcessorQueue))
            shared.shutdown = 2
            break
2013-11-20 06:29:37 +00:00
def processgetpubkey(self, data):
    """
    Handle a getpubkey object: if it asks for one of my keys, queue a reply.

    Layout read from ``data``: an 8-byte nonce (skipped), a time field
    (4 bytes, or 8 bytes when the first 4 are zero), the requested
    address version and stream number as varints, then either a 20-byte
    address hash (versions 2 and 3) or a 32-byte tag (version 4).

    The request is ignored when it is malformed, is for an unsupported
    address version, is not for one of my addresses, does not match that
    address' version or stream, is for a chan address, or when the pubkey
    was already sent recently. Otherwise a command is placed on
    shared.workerQueue so the worker thread sends the pubkey out.
    """
    readPosition = 8  # bypass the nonce
    # unpack() raises struct.error on a short buffer; an uncaught
    # exception here would terminate the processor thread, so reject
    # truncated payloads explicitly.
    if len(data) < readPosition + 4:
        logger.debug('The getpubkey object is too short to contain a time field. Ignoring it.')
        return
    embeddedTime, = unpack('>I', data[readPosition:readPosition + 4])
    # This section is used for the transition from 32 bit time to 64 bit
    # time in the protocol.
    if embeddedTime == 0:
        if len(data) < readPosition + 8:
            logger.debug('The getpubkey object is too short to contain a 64 bit time field. Ignoring it.')
            return
        embeddedTime, = unpack('>Q', data[readPosition:readPosition + 8])
        readPosition += 8
    else:
        readPosition += 4

    requestedAddressVersionNumber, addressVersionLength = decodeVarint(
        data[readPosition:readPosition + 10])
    readPosition += addressVersionLength
    streamNumber, streamNumberLength = decodeVarint(
        data[readPosition:readPosition + 10])
    readPosition += streamNumberLength

    if requestedAddressVersionNumber == 0:
        logger.debug('The requestedAddressVersionNumber of the pubkey request is zero. That doesn\'t make any sense. Ignoring it.')
        return
    elif requestedAddressVersionNumber == 1:
        logger.debug('The requestedAddressVersionNumber of the pubkey request is 1 which isn\'t supported anymore. Ignoring it.')
        return
    elif requestedAddressVersionNumber > 4:
        logger.debug('The requestedAddressVersionNumber of the pubkey request is too high. Can\'t understand. Ignoring it.')
        return

    # Only versions 2, 3 and 4 reach this point.
    myAddress = ''
    if requestedAddressVersionNumber <= 3:
        # Versions 2 and 3 identify the address by its 20-byte hash.
        requestedHash = data[readPosition:readPosition + 20]
        if len(requestedHash) != 20:
            logger.debug('The length of the requested hash is not 20 bytes. Something is wrong. Ignoring.')
            return
        logger.info('the hash requested in this getpubkey request is: %s' % requestedHash.encode('hex'))
        if requestedHash in shared.myAddressesByHash:  # if this address hash is one of mine
            myAddress = shared.myAddressesByHash[requestedHash]
    elif requestedAddressVersionNumber >= 4:
        # Version 4 identifies the address by a 32-byte tag instead.
        requestedTag = data[readPosition:readPosition + 32]
        if len(requestedTag) != 32:
            logger.debug('The length of the requested tag is not 32 bytes. Something is wrong. Ignoring.')
            return
        logger.debug('the tag requested in this getpubkey request is: %s' % requestedTag.encode('hex'))
        if requestedTag in shared.myAddressesByTag:
            myAddress = shared.myAddressesByTag[requestedTag]

    if myAddress == '':
        logger.info('This getpubkey request is not for any of my keys.')
        return

    # Decode once; index 1 is the address version, index 2 the stream.
    decodedAddress = decodeAddress(myAddress)
    if decodedAddress[1] != requestedAddressVersionNumber:
        logger.warning('(Within the processgetpubkey function) Someone requested one of my pubkeys but the requestedAddressVersionNumber doesn\'t match my actual address version number. Ignoring.')
        return
    if decodedAddress[2] != streamNumber:
        logger.warning('(Within the processgetpubkey function) Someone requested one of my pubkeys but the stream number on which we heard this getpubkey object doesn\'t match this address\' stream number. Ignoring.')
        return
    if shared.safeConfigGetBoolean(myAddress, 'chan'):
        logger.info('Ignoring getpubkey request because it is for one of my chan addresses. The other party should already have the pubkey.')
        return

    # A missing or unparsable 'lastpubkeysendtime' option means the
    # pubkey has never been sent. Catch Exception rather than everything
    # so SystemExit / KeyboardInterrupt are not swallowed.
    try:
        lastPubkeySendTime = int(shared.config.get(
            myAddress, 'lastpubkeysendtime'))
    except Exception:
        lastPubkeySendTime = 0
    # If the last time we sent our pubkey was more recent than
    # shared.lengthOfTimeToHoldOnToAllPubkeys ago (28 days according to
    # the original comment), do not send it again.
    if lastPubkeySendTime > time.time() - shared.lengthOfTimeToHoldOnToAllPubkeys:
        logger.info('Found getpubkey-requested-item in my list of EC hashes BUT we already sent it recently. Ignoring request. The lastPubkeySendTime is: %s' % lastPubkeySendTime)
        return

    logger.info('Found getpubkey-requested-hash in my list of EC hashes. Telling Worker thread to do the POW for a pubkey message and send it out.')
    if requestedAddressVersionNumber == 2:
        shared.workerQueue.put((
            'doPOWForMyV2Pubkey', requestedHash))
    elif requestedAddressVersionNumber == 3:
        shared.workerQueue.put((
            'sendOutOrStoreMyV3Pubkey', requestedHash))
    elif requestedAddressVersionNumber == 4:
        shared.workerQueue.put((
            'sendOutOrStoreMyV4Pubkey', myAddress))
def processpubkey(self, data):
    """
    Parse a pubkey object, check it, and store it in the pubkeys table.

    Layout read from ``data``: an 8-byte nonce (skipped), a time field
    (4 bytes, or 8 bytes when the first 4 are zero), then the address
    version and stream number as varints. What follows depends on the
    address version:

    * version 2: bitfield, signing key, encryption key (unsigned);
    * version 3: as version 2 plus the two proof-of-work parameters and
      an ECDSA signature, which is verified;
    * version 4: a 32-byte tag followed by encrypted data. It is only
      processed when the tag is in shared.neededPubkeys; the cryptor
      object saved there is used to decrypt it, after which the
      signature and the tag are both checked.

    This does almost the same as shared.decryptAndCheckPubkeyPayload for
    version 4 keys, but re-uses the cryptor objects kept in
    shared.neededPubkeys instead of creating one per call, so that many
    incorrectly-tagged pubkeys can be rejected quickly.

    Accepted keys are inserted into the pubkeys table and
    self.possibleNewPubkey is called so that messages waiting on this
    key can proceed. Malformed or unwanted objects are logged and
    dropped.
    """
    pubkeyProcessingStartTime = time.time()
    shared.numberOfPubkeysProcessed += 1
    shared.UISignalQueue.put((
        'updateNumberOfPubkeysProcessed', 'no data'))

    readPosition = 8  # bypass the nonce
    # unpack() raises struct.error on a short buffer; an uncaught
    # exception here would terminate the processor thread, so reject
    # truncated payloads explicitly.
    if len(data) < readPosition + 4:
        logger.debug('(within processpubkey) payload is too short to contain a time field. Sanity check failed.')
        return
    embeddedTime, = unpack('>I', data[readPosition:readPosition + 4])
    # This section is used for the transition from 32 bit time to 64 bit
    # time in the protocol.
    if embeddedTime == 0:
        if len(data) < readPosition + 8:
            logger.debug('(within processpubkey) payload is too short to contain a 64 bit time field. Sanity check failed.')
            return
        embeddedTime, = unpack('>Q', data[readPosition:readPosition + 8])
        readPosition += 8
    else:
        readPosition += 4

    addressVersion, varintLength = decodeVarint(
        data[readPosition:readPosition + 10])
    readPosition += varintLength
    streamNumber, varintLength = decodeVarint(
        data[readPosition:readPosition + 10])
    readPosition += varintLength

    if addressVersion == 0:
        logger.debug('(Within processpubkey) addressVersion of 0 doesn\'t make sense.')
        return
    if addressVersion > 4 or addressVersion == 1:
        logger.info('This version of Bitmessage cannot handle version %s addresses.' % addressVersion)
        return

    def ripeOfKeys(signingKey, encryptionKey):
        # RIPEMD-160 of the SHA-512 of the two (0x04-prefixed) keys.
        sha = hashlib.new('sha512')
        sha.update(signingKey + encryptionKey)
        ripeHasher = hashlib.new('ripemd160')
        ripeHasher.update(sha.digest())
        return ripeHasher.digest()

    def logKeys(ripe, signingKey, encryptionKey):
        # Log the parsed key material in hex.
        logger.info('within recpubkey, addressVersion: %s, streamNumber: %s\n'
                    'ripe %s\n'
                    'publicSigningKey in hex: %s\n'
                    'publicEncryptionKey in hex: %s' % (
                        addressVersion,
                        streamNumber,
                        ripe.encode('hex'),
                        signingKey.encode('hex'),
                        encryptionKey.encode('hex')))

    def usedPersonallyFlag(ripe):
        # Return 'yes' if the pubkeys table already holds this key
        # marked as used personally, otherwise 'no'.
        queryreturn = sqlQuery(
            '''SELECT usedpersonally FROM pubkeys WHERE hash=? AND addressversion=? AND usedpersonally='yes' ''', ripe, addressVersion)
        if queryreturn != []:
            logger.info('We HAVE used this pubkey personally. Updating time.')
            return 'yes'
        logger.info('We have NOT used this pubkey personally. Inserting in database.')
        return 'no'

    if addressVersion == 2:
        if len(data) < 146:  # sanity check. This is the minimum possible length.
            logger.debug('(within processpubkey) payloadLength less than 146. Sanity check failed.')
            return
        readPosition += 4  # skip the 4-byte behavior bitfield
        publicSigningKey = data[readPosition:readPosition + 64]
        # Is it possible for a public key to be invalid such that trying to
        # encrypt or sign with it will cause an error? If it is, we should
        # probably test these keys here.
        readPosition += 64
        publicEncryptionKey = data[readPosition:readPosition + 64]
        if len(publicEncryptionKey) < 64:
            logger.debug('publicEncryptionKey length less than 64. Sanity check failed.')
            return
        # The keys are kept without their 0x04 prefix in this branch (and
        # logged that way); the prefix is only added for hashing.
        ripe = ripeOfKeys('\x04' + publicSigningKey,
                          '\x04' + publicEncryptionKey)
        logKeys(ripe, publicSigningKey, publicEncryptionKey)
        usedPersonally = usedPersonallyFlag(ripe)
        # This will also update the embeddedTime.
        sqlExecute('''INSERT INTO pubkeys VALUES (?,?,?,?,?)''',
                   ripe, addressVersion, data, embeddedTime, usedPersonally)
        self.possibleNewPubkey(ripe=ripe)

    elif addressVersion == 3:
        if len(data) < 170:  # sanity check.
            logger.warning('(within processpubkey) payloadLength less than 170. Sanity check failed.')
            return
        readPosition += 4  # skip the 4-byte behavior bitfield
        publicSigningKey = '\x04' + data[readPosition:readPosition + 64]
        # Is it possible for a public key to be invalid such that trying to
        # encrypt or sign with it will cause an error? If it is, we should
        # probably test these keys here.
        readPosition += 64
        publicEncryptionKey = '\x04' + data[readPosition:readPosition + 64]
        readPosition += 64
        # The two proof-of-work parameters are not used here; they are
        # decoded only to find where the signed data ends.
        specifiedNonceTrialsPerByte, specifiedNonceTrialsPerByteLength = decodeVarint(
            data[readPosition:readPosition + 10])
        readPosition += specifiedNonceTrialsPerByteLength
        specifiedPayloadLengthExtraBytes, specifiedPayloadLengthExtraBytesLength = decodeVarint(
            data[readPosition:readPosition + 10])
        readPosition += specifiedPayloadLengthExtraBytesLength
        endOfSignedDataPosition = readPosition
        signatureLength, signatureLengthLength = decodeVarint(
            data[readPosition:readPosition + 10])
        readPosition += signatureLengthLength
        signature = data[readPosition:readPosition + signatureLength]
        # The signature covers everything after the nonce up to (not
        # including) the signature length field.
        try:
            if not highlevelcrypto.verify(data[8:endOfSignedDataPosition], signature, publicSigningKey.encode('hex')):
                logger.warning('ECDSA verify failed (within processpubkey)')
                return
            logger.info('ECDSA verify passed (within processpubkey)')
        except Exception as err:
            logger.warning('ECDSA verify failed (within processpubkey) %s' % err)
            return

        ripe = ripeOfKeys(publicSigningKey, publicEncryptionKey)
        logKeys(ripe, publicSigningKey, publicEncryptionKey)
        usedPersonally = usedPersonallyFlag(ripe)
        # This will also update the embeddedTime.
        sqlExecute('''INSERT INTO pubkeys VALUES (?,?,?,?,?)''',
                   ripe, addressVersion, data, embeddedTime, usedPersonally)
        self.possibleNewPubkey(ripe=ripe)

    elif addressVersion == 4:
        if len(data) < 350:  # sanity check.
            logger.debug('(within processpubkey) payloadLength less than 350. Sanity check failed.')
            return
        # Some of the signed data is not encrypted so let's keep it for now.
        signedData = data[8:readPosition]
        tag = data[readPosition:readPosition + 32]
        readPosition += 32
        encryptedData = data[readPosition:]
        if tag not in shared.neededPubkeys:
            logger.info('We don\'t need this v4 pubkey. We didn\'t ask for it.')
            return

        # Let us try to decrypt the pubkey
        cryptorObject = shared.neededPubkeys[tag]
        try:
            decryptedData = cryptorObject.decrypt(encryptedData)
        except Exception:
            # Someone must have encrypted some data with a different key
            # but tagged it with a tag for which we are watching.
            logger.info('Pubkey decryption was unsuccessful.')
            return

        # From here on readPosition indexes into the decrypted data.
        readPosition = 4  # skip the 4-byte behavior bitfield
        publicSigningKey = '\x04' + decryptedData[readPosition:readPosition + 64]
        # Is it possible for a public key to be invalid such that trying to
        # encrypt or check a sig with it will cause an error? If it is, we
        # should probably test these keys here.
        readPosition += 64
        publicEncryptionKey = '\x04' + decryptedData[readPosition:readPosition + 64]
        readPosition += 64
        specifiedNonceTrialsPerByte, specifiedNonceTrialsPerByteLength = decodeVarint(
            decryptedData[readPosition:readPosition + 10])
        readPosition += specifiedNonceTrialsPerByteLength
        specifiedPayloadLengthExtraBytes, specifiedPayloadLengthExtraBytesLength = decodeVarint(
            decryptedData[readPosition:readPosition + 10])
        readPosition += specifiedPayloadLengthExtraBytesLength
        # The signature covers the unencrypted header kept above plus the
        # decrypted fields up to the signature length field.
        signedData += decryptedData[:readPosition]
        signatureLength, signatureLengthLength = decodeVarint(
            decryptedData[readPosition:readPosition + 10])
        readPosition += signatureLengthLength
        signature = decryptedData[readPosition:readPosition + signatureLength]
        try:
            if not highlevelcrypto.verify(signedData, signature, publicSigningKey.encode('hex')):
                logger.info('ECDSA verify failed (within processpubkey)')
                return
            logger.info('ECDSA verify passed (within processpubkey)')
        except Exception as err:
            logger.info('ECDSA verify failed (within processpubkey) %s' % err)
            return

        ripe = ripeOfKeys(publicSigningKey, publicEncryptionKey)

        # We need to make sure that the tag on the outside of the encryption
        # is the one generated from hashing these particular keys.
        expectedTag = hashlib.sha512(hashlib.sha512(
            encodeVarint(addressVersion) + encodeVarint(streamNumber) + ripe
        ).digest()).digest()[32:]
        if tag != expectedTag:
            logger.info('Someone was trying to act malicious: tag doesn\'t match the keys in this pubkey message. Ignoring it.')
            return

        logKeys(ripe, publicSigningKey, publicEncryptionKey)

        sqlExecute('''INSERT INTO pubkeys VALUES (?,?,?,?,?)''',
                   ripe, addressVersion, signedData, embeddedTime, 'yes')

        fromAddress = encodeAddress(addressVersion, streamNumber, ripe)
        # At this point we know that we have been waiting on this pubkey.
        # This function will command the workerThread to start work on
        # the messages that require it.
        self.possibleNewPubkey(address=fromAddress)

    # Display timing data
    timeRequiredToProcessPubkey = time.time() - pubkeyProcessingStartTime
    logger.debug('Time required to process this pubkey: %s' % timeRequiredToProcessPubkey)
2013-11-20 06:29:37 +00:00
2013-11-14 03:45:10 +00:00
def processmsg ( self , data ) :
2013-11-20 06:29:37 +00:00
messageProcessingStartTime = time . time ( )
shared . numberOfMessagesProcessed + = 1
shared . UISignalQueue . put ( (
' updateNumberOfMessagesProcessed ' , ' no data ' ) )
readPosition = 8 # bypass the nonce
2013-11-14 03:45:10 +00:00
embeddedTime , = unpack ( ' >I ' , data [ readPosition : readPosition + 4 ] )
# This section is used for the transition from 32 bit time to 64 bit
# time in the protocol.
if embeddedTime == 0 :
embeddedTime , = unpack ( ' >Q ' , data [ readPosition : readPosition + 8 ] )
readPosition + = 8
else :
readPosition + = 4
streamNumberAsClaimedByMsg , streamNumberAsClaimedByMsgLength = decodeVarint (
data [ readPosition : readPosition + 9 ] )
readPosition + = streamNumberAsClaimedByMsgLength
inventoryHash = calculateInventoryHash ( data )
initialDecryptionSuccessful = False
# Let's check whether this is a message acknowledgement bound for us.
if data [ readPosition : ] in shared . ackdataForWhichImWatching :
2014-01-17 01:10:04 +00:00
logger . info ( ' This msg IS an acknowledgement bound for me. ' )
2013-11-14 03:45:10 +00:00
del shared . ackdataForWhichImWatching [ data [ readPosition : ] ]
sqlExecute ( ' UPDATE sent SET status=? WHERE ackdata=? ' ,
' ackreceived ' , data [ readPosition : ] )
2014-08-06 02:01:01 +00:00
shared . UISignalQueue . put ( ( ' updateSentItemStatusByAckdata ' , ( data [ readPosition : ] , tr . translateText ( " MainWindow " , ' Acknowledgement of the message received. % 1 ' ) . arg ( l10n . formatTimestamp ( ) ) ) ) )
2013-11-14 03:45:10 +00:00
return
else :
2014-01-17 01:10:04 +00:00
logger . info ( ' This was NOT an acknowledgement bound for me. ' )
2013-11-14 03:45:10 +00:00
# This is not an acknowledgement bound for me. See if it is a message
# bound for me by trying to decrypt it with my private keys.
for key , cryptorObject in shared . myECCryptorObjects . items ( ) :
try :
decryptedData = cryptorObject . decrypt (
data [ readPosition : ] )
toRipe = key # This is the RIPE hash of my pubkeys. We need this below to compare to the destination_ripe included in the encrypted data.
initialDecryptionSuccessful = True
2014-01-17 01:10:04 +00:00
logger . info ( ' EC decryption successful using key associated with ripe hash: %s ' % key . encode ( ' hex ' ) )
2013-11-14 03:45:10 +00:00
break
except Exception as err :
pass
# print 'cryptorObject.decrypt Exception:', err
if not initialDecryptionSuccessful :
# This is not a message bound for me.
2014-01-17 01:10:04 +00:00
logger . info ( ' Length of time program spent failing to decrypt this message: %s seconds. ' % ( time . time ( ) - messageProcessingStartTime , ) )
2013-11-20 06:29:37 +00:00
return
2013-11-14 03:45:10 +00:00
2013-11-20 06:29:37 +00:00
# This is a message bound for me.
toAddress = shared . myAddressesByHash [
toRipe ] # Look up my address based on the RIPE hash.
readPosition = 0
messageVersion , messageVersionLength = decodeVarint (
decryptedData [ readPosition : readPosition + 10 ] )
readPosition + = messageVersionLength
if messageVersion != 1 :
2014-01-17 01:10:04 +00:00
logger . info ( ' Cannot understand message versions other than one. Ignoring message. ' )
2013-11-20 06:29:37 +00:00
return
sendersAddressVersionNumber , sendersAddressVersionNumberLength = decodeVarint (
decryptedData [ readPosition : readPosition + 10 ] )
readPosition + = sendersAddressVersionNumberLength
if sendersAddressVersionNumber == 0 :
2014-01-17 01:10:04 +00:00
logger . info ( ' Cannot understand sendersAddressVersionNumber = 0. Ignoring message. ' )
2013-11-20 06:29:37 +00:00
return
if sendersAddressVersionNumber > 4 :
2014-01-17 01:10:04 +00:00
logger . info ( ' Sender \' s address version number %s not yet supported. Ignoring message. ' % sendersAddressVersionNumber )
2013-11-20 06:29:37 +00:00
return
if len ( decryptedData ) < 170 :
2014-01-17 01:10:04 +00:00
logger . info ( ' Length of the unencrypted data is unreasonably short. Sanity check failed. Ignoring message. ' )
2013-11-20 06:29:37 +00:00
return
sendersStreamNumber , sendersStreamNumberLength = decodeVarint (
decryptedData [ readPosition : readPosition + 10 ] )
if sendersStreamNumber == 0 :
2014-01-17 01:10:04 +00:00
logger . info ( ' sender \' s stream number is 0. Ignoring message. ' )
2013-11-20 06:29:37 +00:00
return
readPosition + = sendersStreamNumberLength
behaviorBitfield = decryptedData [ readPosition : readPosition + 4 ]
readPosition + = 4
pubSigningKey = ' \x04 ' + decryptedData [
readPosition : readPosition + 64 ]
readPosition + = 64
pubEncryptionKey = ' \x04 ' + decryptedData [
readPosition : readPosition + 64 ]
readPosition + = 64
if sendersAddressVersionNumber > = 3 :
requiredAverageProofOfWorkNonceTrialsPerByte , varintLength = decodeVarint (
2013-11-14 03:45:10 +00:00
decryptedData [ readPosition : readPosition + 10 ] )
2013-11-20 06:29:37 +00:00
readPosition + = varintLength
2014-01-17 01:10:04 +00:00
logger . info ( ' sender \' s requiredAverageProofOfWorkNonceTrialsPerByte is %s ' % requiredAverageProofOfWorkNonceTrialsPerByte )
2013-11-20 06:29:37 +00:00
requiredPayloadLengthExtraBytes , varintLength = decodeVarint (
2013-11-14 03:45:10 +00:00
decryptedData [ readPosition : readPosition + 10 ] )
2013-11-20 06:29:37 +00:00
readPosition + = varintLength
2014-01-17 01:10:04 +00:00
logger . info ( ' sender \' s requiredPayloadLengthExtraBytes is %s ' % requiredPayloadLengthExtraBytes )
2013-11-20 06:29:37 +00:00
endOfThePublicKeyPosition = readPosition # needed for when we store the pubkey in our database of pubkeys for later use.
if toRipe != decryptedData [ readPosition : readPosition + 20 ] :
2014-01-17 01:10:04 +00:00
logger . info ( ' The original sender of this message did not send it to you. Someone is attempting a Surreptitious Forwarding Attack. \n \
See : http : / / world . std . com / ~ dtd / sign_encrypt / sign_encrypt7 . html \n \
your toRipe : % s \n \
embedded destination toRipe : % s ' % (toRipe.encode( ' hex ' ), decryptedData[readPosition:readPosition + 20].encode( ' hex ' ))
)
2013-11-20 06:29:37 +00:00
return
readPosition + = 20
messageEncodingType , messageEncodingTypeLength = decodeVarint (
decryptedData [ readPosition : readPosition + 10 ] )
readPosition + = messageEncodingTypeLength
messageLength , messageLengthLength = decodeVarint (
decryptedData [ readPosition : readPosition + 10 ] )
readPosition + = messageLengthLength
message = decryptedData [ readPosition : readPosition + messageLength ]
# print 'First 150 characters of message:', repr(message[:150])
readPosition + = messageLength
ackLength , ackLengthLength = decodeVarint (
decryptedData [ readPosition : readPosition + 10 ] )
readPosition + = ackLengthLength
ackData = decryptedData [ readPosition : readPosition + ackLength ]
readPosition + = ackLength
positionOfBottomOfAckData = readPosition # needed to mark the end of what is covered by the signature
signatureLength , signatureLengthLength = decodeVarint (
decryptedData [ readPosition : readPosition + 10 ] )
readPosition + = signatureLengthLength
signature = decryptedData [
readPosition : readPosition + signatureLength ]
try :
if not highlevelcrypto . verify ( decryptedData [ : positionOfBottomOfAckData ] , signature , pubSigningKey . encode ( ' hex ' ) ) :
2014-01-17 01:10:04 +00:00
logger . debug ( ' ECDSA verify failed ' )
2013-11-14 03:45:10 +00:00
return
2014-01-17 01:10:04 +00:00
logger . debug ( ' ECDSA verify passed ' )
2013-11-20 06:29:37 +00:00
except Exception as err :
2014-01-17 01:10:04 +00:00
logger . debug ( ' ECDSA verify failed %s ' % err )
2013-11-20 06:29:37 +00:00
return
2014-01-17 01:10:04 +00:00
logger . debug ( ' As a matter of intellectual curiosity, here is the Bitcoin address associated with the keys owned by the other person: %s ..and here is the testnet address: %s . The other person must take their private signing key from Bitmessage and import it into Bitcoin (or a service like Blockchain.info) for it to be of any use. Do not use this unless you know what you are doing. ' %
( helper_bitcoin . calculateBitcoinAddressFromPubkey ( pubSigningKey ) , helper_bitcoin . calculateTestnetAddressFromPubkey ( pubSigningKey ) )
)
2013-11-20 06:29:37 +00:00
# calculate the fromRipe.
sha = hashlib . new ( ' sha512 ' )
sha . update ( pubSigningKey + pubEncryptionKey )
ripe = hashlib . new ( ' ripemd160 ' )
ripe . update ( sha . digest ( ) )
fromAddress = encodeAddress (
sendersAddressVersionNumber , sendersStreamNumber , ripe . digest ( ) )
# Let's store the public key in case we want to reply to this
# person.
if sendersAddressVersionNumber < = 3 :
sqlExecute (
''' INSERT INTO pubkeys VALUES (?,?,?,?,?) ''' ,
ripe . digest ( ) ,
sendersAddressVersionNumber ,
' \xFF \xFF \xFF \xFF \xFF \xFF \xFF \xFF ' + ' \xFF \xFF \xFF \xFF ' + decryptedData [ messageVersionLength : endOfThePublicKeyPosition ] ,
int ( time . time ( ) ) ,
' yes ' )
# This will check to see whether we happen to be awaiting this
# pubkey in order to send a message. If we are, it will do the POW
# and send it.
self . possibleNewPubkey ( ripe = ripe . digest ( ) )
elif sendersAddressVersionNumber > = 4 :
sqlExecute (
''' INSERT INTO pubkeys VALUES (?,?,?,?,?) ''' ,
ripe . digest ( ) ,
sendersAddressVersionNumber ,
' \x00 \x00 \x00 \x00 \x00 \x00 \x00 \x01 ' + decryptedData [ messageVersionLength : endOfThePublicKeyPosition ] ,
int ( time . time ( ) ) ,
' yes ' )
# This will check to see whether we happen to be awaiting this
# pubkey in order to send a message. If we are, it will do the POW
# and send it.
self . possibleNewPubkey ( address = fromAddress )
# If this message is bound for one of my version 3 addresses (or
# higher), then we must check to make sure it meets our demanded
2014-01-17 01:10:04 +00:00
# proof of work requirement. If this is bound for one of my chan
# addresses then we skip this check; the minimum network POW is
# fine.
if decodeAddress ( toAddress ) [ 1 ] > = 3 and not shared . safeConfigGetBoolean ( toAddress , ' chan ' ) : # If the toAddress version number is 3 or higher and not one of my chan addresses:
2013-11-20 06:29:37 +00:00
if not shared . isAddressInMyAddressBookSubscriptionsListOrWhitelist ( fromAddress ) : # If I'm not friendly with this person:
requiredNonceTrialsPerByte = shared . config . getint (
toAddress , ' noncetrialsperbyte ' )
requiredPayloadLengthExtraBytes = shared . config . getint (
toAddress , ' payloadlengthextrabytes ' )
if not shared . isProofOfWorkSufficient ( data , requiredNonceTrialsPerByte , requiredPayloadLengthExtraBytes ) :
print ' Proof of work in msg message insufficient only because it does not meet our higher requirement. '
return
blockMessage = False # Gets set to True if the user shouldn't see the message according to black or white lists.
if shared . config . get ( ' bitmessagesettings ' , ' blackwhitelist ' ) == ' black ' : # If we are using a blacklist
queryreturn = sqlQuery (
''' SELECT label FROM blacklist where address=? and enabled= ' 1 ' ''' ,
fromAddress )
if queryreturn != [ ] :
2014-01-17 01:10:04 +00:00
logger . info ( ' Message ignored because address is in blacklist. ' )
2013-11-20 06:29:37 +00:00
blockMessage = True
else : # We're using a whitelist
queryreturn = sqlQuery (
''' SELECT label FROM whitelist where address=? and enabled= ' 1 ' ''' ,
fromAddress )
if queryreturn == [ ] :
2014-01-17 01:10:04 +00:00
logger . info ( ' Message ignored because address not in whitelist. ' )
2013-11-20 06:29:37 +00:00
blockMessage = True
2014-07-26 17:15:28 +00:00
toLabel = shared . config . get ( toAddress , ' label ' )
if toLabel == ' ' :
toLabel = toAddress
if messageEncodingType == 2 :
subject , body = self . decodeType2Message ( message )
logger . info ( ' Message subject (first 100 characters): %s ' % repr ( subject ) [ : 100 ] )
elif messageEncodingType == 1 :
body = message
subject = ' '
elif messageEncodingType == 0 :
logger . info ( ' messageEncodingType == 0. Doing nothing with the message. They probably just sent it so that we would store their public key or send their ack data for them. ' )
subject = ' '
body = ' '
else :
body = ' Unknown encoding type. \n \n ' + repr ( message )
subject = ' '
# Let us make sure that we haven't already received this message
if helper_inbox . isMessageAlreadyInInbox ( toAddress , fromAddress , subject , body , messageEncodingType ) :
logger . info ( ' This msg is already in our inbox. Ignoring it. ' )
blockMessage = True
2013-11-20 06:29:37 +00:00
if not blockMessage :
if messageEncodingType != 0 :
t = ( inventoryHash , toAddress , fromAddress , subject , int (
time . time ( ) ) , body , ' inbox ' , messageEncodingType , 0 )
helper_inbox . insert ( t )
shared . UISignalQueue . put ( ( ' displayNewInboxMessage ' , (
inventoryHash , toAddress , fromAddress , subject , body ) ) )
# If we are behaving as an API then we might need to run an
# outside command to let some program know that a new message
# has arrived.
if shared . safeConfigGetBoolean ( ' bitmessagesettings ' , ' apienabled ' ) :
try :
apiNotifyPath = shared . config . get (
' bitmessagesettings ' , ' apinotifypath ' )
except :
apiNotifyPath = ' '
if apiNotifyPath != ' ' :
call ( [ apiNotifyPath , " newMessage " ] )
# Let us now check and see whether our receiving address is
# behaving as a mailing list
if shared . safeConfigGetBoolean ( toAddress , ' mailinglist ' ) :
try :
mailingListName = shared . config . get (
toAddress , ' mailinglistname ' )
except :
mailingListName = ' '
# Add the mailing list name to the subject
subject = self . addMailingListNameToSubject (
subject , mailingListName )
# Let us now send this message out as a broadcast
message = time . strftime ( " %a , % Y- % m- %d % H: % M: % S UTC " , time . gmtime (
) ) + ' Message ostensibly from ' + fromAddress + ' : \n \n ' + body
fromAddress = toAddress # The fromAddress for the broadcast that we are about to send is the toAddress (my address) for the msg message we are currently processing.
ackdataForBroadcast = OpenSSL . rand (
32 ) # We don't actually need the ackdataForBroadcast for acknowledgement since this is a broadcast message but we can use it to update the user interface when the POW is done generating.
toAddress = ' [Broadcast subscribers] '
ripe = ' '
t = ( ' ' , toAddress , ripe , fromAddress , subject , message , ackdataForBroadcast , int (
time . time ( ) ) , ' broadcastqueued ' , 1 , 1 , ' sent ' , 2 )
helper_sent . insert ( t )
shared . UISignalQueue . put ( ( ' displayNewSentMessage ' , (
toAddress , ' [Broadcast subscribers] ' , fromAddress , subject , message , ackdataForBroadcast ) ) )
shared . workerQueue . put ( ( ' sendbroadcast ' , ' ' ) )
if self . ackDataHasAVaildHeader ( ackData ) :
2014-05-02 14:46:36 +00:00
if ackData [ 4 : 16 ] == addDataPadding ( ' getpubkey ' ) :
2013-11-20 06:29:37 +00:00
shared . checkAndSharegetpubkeyWithPeers ( ackData [ 24 : ] )
2014-05-02 14:46:36 +00:00
elif ackData [ 4 : 16 ] == addDataPadding ( ' pubkey ' ) :
2013-11-20 06:29:37 +00:00
shared . checkAndSharePubkeyWithPeers ( ackData [ 24 : ] )
2014-05-02 14:46:36 +00:00
elif ackData [ 4 : 16 ] == addDataPadding ( ' msg ' ) :
2013-11-20 06:29:37 +00:00
shared . checkAndShareMsgWithPeers ( ackData [ 24 : ] )
2014-05-02 14:46:36 +00:00
elif ackData [ 4 : 16 ] == addDataPadding ( ' broadcast ' ) :
2013-11-20 06:29:37 +00:00
shared . checkAndShareBroadcastWithPeers ( ackData [ 24 : ] )
# Display timing data
timeRequiredToAttemptToDecryptMessage = time . time (
) - messageProcessingStartTime
shared . successfullyDecryptMessageTimings . append (
timeRequiredToAttemptToDecryptMessage )
sum = 0
for item in shared . successfullyDecryptMessageTimings :
sum + = item
2014-01-17 01:10:04 +00:00
logger . debug ( ' Time to decrypt this message successfully: %s \n \
Average time for all message decryption successes since startup : % s . ' %
( timeRequiredToAttemptToDecryptMessage , sum / len ( shared . successfullyDecryptMessageTimings ) )
)
2013-11-20 06:29:37 +00:00
def processbroadcast ( self , data ) :
messageProcessingStartTime = time . time ( )
shared . numberOfBroadcastsProcessed + = 1
shared . UISignalQueue . put ( (
' updateNumberOfBroadcastsProcessed ' , ' no data ' ) )
inventoryHash = calculateInventoryHash ( data )
readPosition = 8 # bypass the nonce
embeddedTime , = unpack ( ' >I ' , data [ readPosition : readPosition + 4 ] )
# This section is used for the transition from 32 bit time to 64 bit
# time in the protocol.
if embeddedTime == 0 :
embeddedTime , = unpack ( ' >Q ' , data [ readPosition : readPosition + 8 ] )
readPosition + = 8
else :
readPosition + = 4
broadcastVersion , broadcastVersionLength = decodeVarint (
data [ readPosition : readPosition + 9 ] )
readPosition + = broadcastVersionLength
if broadcastVersion < 1 or broadcastVersion > 3 :
2014-01-17 01:10:04 +00:00
logger . debug ( ' Cannot decode incoming broadcast versions higher than 3. Assuming the sender isn \' t being silly, you should upgrade Bitmessage because this message shall be ignored. ' )
2013-11-20 06:29:37 +00:00
return
if broadcastVersion == 1 :
beginningOfPubkeyPosition = readPosition # used when we add the pubkey to our pubkey table
sendersAddressVersion , sendersAddressVersionLength = decodeVarint (
data [ readPosition : readPosition + 9 ] )
if sendersAddressVersion < = 1 or sendersAddressVersion > = 3 :
# Cannot decode senderAddressVersion higher than 2. Assuming
# the sender isn't being silly, you should upgrade Bitmessage
# because this message shall be ignored.
2013-11-14 03:45:10 +00:00
return
2013-11-20 06:29:37 +00:00
readPosition + = sendersAddressVersionLength
if sendersAddressVersion == 2 :
sendersStream , sendersStreamLength = decodeVarint (
data [ readPosition : readPosition + 9 ] )
readPosition + = sendersStreamLength
behaviorBitfield = data [ readPosition : readPosition + 4 ]
readPosition + = 4
sendersPubSigningKey = ' \x04 ' + \
data [ readPosition : readPosition + 64 ]
readPosition + = 64
sendersPubEncryptionKey = ' \x04 ' + \
data [ readPosition : readPosition + 64 ]
readPosition + = 64
endOfPubkeyPosition = readPosition
sendersHash = data [ readPosition : readPosition + 20 ]
if sendersHash not in shared . broadcastSendersForWhichImWatching :
# Display timing data
2014-01-17 01:10:04 +00:00
logger . debug ( ' Time spent deciding that we are not interested in this v1 broadcast: %s ' % ( time . time ( ) - messageProcessingStartTime , ) )
2013-11-20 06:29:37 +00:00
return
# At this point, this message claims to be from sendersHash and
# we are interested in it. We still have to hash the public key
# to make sure it is truly the key that matches the hash, and
# also check the signature.
readPosition + = 20
sha = hashlib . new ( ' sha512 ' )
sha . update ( sendersPubSigningKey + sendersPubEncryptionKey )
ripe = hashlib . new ( ' ripemd160 ' )
ripe . update ( sha . digest ( ) )
if ripe . digest ( ) != sendersHash :
# The sender of this message lied.
return
messageEncodingType , messageEncodingTypeLength = decodeVarint (
data [ readPosition : readPosition + 9 ] )
if messageEncodingType == 0 :
return
readPosition + = messageEncodingTypeLength
messageLength , messageLengthLength = decodeVarint (
data [ readPosition : readPosition + 9 ] )
readPosition + = messageLengthLength
message = data [ readPosition : readPosition + messageLength ]
readPosition + = messageLength
readPositionAtBottomOfMessage = readPosition
signatureLength , signatureLengthLength = decodeVarint (
data [ readPosition : readPosition + 9 ] )
readPosition + = signatureLengthLength
signature = data [ readPosition : readPosition + signatureLength ]
try :
if not highlevelcrypto . verify ( data [ 12 : readPositionAtBottomOfMessage ] , signature , sendersPubSigningKey . encode ( ' hex ' ) ) :
2014-01-17 01:10:04 +00:00
logger . debug ( ' ECDSA verify failed ' )
2013-11-20 06:29:37 +00:00
return
2014-01-17 01:10:04 +00:00
logger . debug ( ' ECDSA verify passed ' )
2013-11-20 06:29:37 +00:00
except Exception as err :
2014-01-17 01:10:04 +00:00
logger . debug ( ' ECDSA verify failed %s ' % err )
2013-11-20 06:29:37 +00:00
return
# verify passed
fromAddress = encodeAddress (
sendersAddressVersion , sendersStream , ripe . digest ( ) )
2014-01-17 01:10:04 +00:00
logger . debug ( ' fromAddress: %s ' % fromAddress )
2013-11-20 06:29:37 +00:00
# Let's store the public key in case we want to reply to this person.
# We don't have the correct nonce or time (which would let us
# send out a pubkey message) so we'll just fill it with 1's. We
# won't be able to send this pubkey to others (without doing
# the proof of work ourselves, which this program is programmed
# to not do.)
sqlExecute (
''' INSERT INTO pubkeys VALUES (?,?,?,?,?) ''' ,
ripe . digest ( ) ,
sendersAddressVersion ,
' \xFF \xFF \xFF \xFF \xFF \xFF \xFF \xFF ' + ' \xFF \xFF \xFF \xFF ' + data [ beginningOfPubkeyPosition : endOfPubkeyPosition ] ,
int ( time . time ( ) ) ,
' yes ' )
# This will check to see whether we happen to be awaiting this
# pubkey in order to send a message. If we are, it will do the
# POW and send it.
self . possibleNewPubkey ( ripe = ripe . digest ( ) )
if messageEncodingType == 2 :
subject , body = decodeType2Message ( message )
2014-01-17 01:10:04 +00:00
logger . info ( ' Broadcast subject (first 100 characters): %s ' % repr ( subject ) [ : 100 ] )
2013-11-20 06:29:37 +00:00
elif messageEncodingType == 1 :
body = message
subject = ' '
elif messageEncodingType == 0 :
2014-01-17 01:10:04 +00:00
logger . debug ( ' messageEncodingType == 0. Doing nothing with the message. ' )
2013-11-20 06:29:37 +00:00
else :
body = ' Unknown encoding type. \n \n ' + repr ( message )
subject = ' '
toAddress = ' [Broadcast subscribers] '
if messageEncodingType != 0 :
2014-07-26 17:15:28 +00:00
# Let us make sure that we haven't already received this message
if helper_inbox . isMessageAlreadyInInbox ( toAddress , fromAddress , subject , body , messageEncodingType ) :
logger . info ( ' This broadcast is already in our inbox. Ignoring it. ' )
else :
t = ( inventoryHash , toAddress , fromAddress , subject , int (
time . time ( ) ) , body , ' inbox ' , messageEncodingType , 0 )
helper_inbox . insert ( t )
shared . UISignalQueue . put ( ( ' displayNewInboxMessage ' , (
inventoryHash , toAddress , fromAddress , subject , body ) ) )
# If we are behaving as an API then we might need to run an