import time
import threading
import shared
import hashlib
import random
from struct import unpack, pack
import sys
import string
from subprocess import call  # used when the API must execute an outside program
import traceback
from binascii import hexlify

from pyelliptic.openssl import OpenSSL

import highlevelcrypto
from addresses import *
from bmconfigparser import BMConfigParser
import helper_generic
from helper_generic import addDataPadding
import helper_bitcoin
import helper_inbox
import helper_msgcoding
import helper_sent
from helper_sql import *
from helper_ackPayload import genAckPayload
import protocol
import queues
import state
import tr
from debug import logger
import l10n
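
# Object layout reminder (a sketch per the Bitmessage protocol, not something
# defined in this file): every object received off the wire begins with an
# 8-byte proof-of-work nonce, an 8-byte embedded/expiration time and a 4-byte
# object type, followed by a varint object version and a varint stream number,
# roughly:
#     nonce, embeddedTime, objectType = unpack('>QQI', data[:20])
# That is why the process* methods below start reading at offset 20 (or 16
# where the object type is still needed).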

class objectProcessor(threading.Thread):
    """
    The objectProcessor thread, of which there is only one, receives network
    objects (msg, broadcast, pubkey, getpubkey) from the receiveDataThreads.
    """
    def __init__(self):
        threading.Thread.__init__(self, name="objectProcessor")
        """
        It may be the case that the last time Bitmessage was running, the user
        closed it before it finished processing everything in the
        objectProcessorQueue. Assuming that Bitmessage wasn't closed forcefully,
        it should have saved the data in the queue into the objectprocessorqueue
        table. Let's pull it out.
        """
        queryreturn = sqlQuery(
            '''SELECT objecttype, data FROM objectprocessorqueue''')
        for row in queryreturn:
            objectType, data = row
            queues.objectProcessorQueue.put((objectType, data))
        sqlExecute('''DELETE FROM objectprocessorqueue''')
        logger.debug('Loaded %s objects from disk into the objectProcessorQueue.' % str(len(queryreturn)))

    def run(self):
        while True:
            objectType, data = queues.objectProcessorQueue.get()

            self.checkackdata(data)

            try:
                if objectType == 0:  # getpubkey
                    self.processgetpubkey(data)
                elif objectType == 1:  # pubkey
                    self.processpubkey(data)
                elif objectType == 2:  # msg
                    self.processmsg(data)
                elif objectType == 3:  # broadcast
                    self.processbroadcast(data)
                elif objectType == 'checkShutdownVariable':
                    # This is more of a command than an object type. It is used
                    # to get this thread past the queue.get() so that it will
                    # check the shutdown variable.
                    pass
                else:
                    if isinstance(objectType, int):
                        logger.info('Don\'t know how to handle object type 0x%08X', objectType)
                    else:
                        logger.info('Don\'t know how to handle object type %s', objectType)
            except helper_msgcoding.DecompressionSizeException as e:
                logger.error("The object is too big after decompression (stopped decompressing at %i bytes, your configured limit is %i bytes). Ignoring it.", e.size, BMConfigParser().safeGetInt("zlib", "maxsize"))
            except varintDecodeError as e:
                logger.debug("There was a problem with a varint while processing an object. Some details: %s" % e)
            except Exception as e:
                logger.critical("Critical error within objectProcessorThread: \n%s" % traceback.format_exc())

            if state.shutdown:
                # Wait just a moment for most of the connections to close
                time.sleep(.5)
                numberOfObjectsThatWereInTheObjectProcessorQueue = 0
                with SqlBulkExecute() as sql:
                    while queues.objectProcessorQueue.curSize > 0:
                        objectType, data = queues.objectProcessorQueue.get()
                        sql.execute('''INSERT INTO objectprocessorqueue VALUES (?,?)''',
                                    objectType, data)
                        numberOfObjectsThatWereInTheObjectProcessorQueue += 1
                logger.debug('Saved %s objects from the objectProcessorQueue to disk. objectProcessorThread exiting.' % str(numberOfObjectsThatWereInTheObjectProcessorQueue))
                state.shutdown = 2
                break
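
    # The ackdata a sender embeds in a msg is itself a complete network object.
    # checkackdata() skips its first 16 bytes (proof-of-work nonce and time)
    # and matches the remainder (object type/version/stream plus body) against
    # shared.ackdataForWhichImWatching, which is keyed the same way (see
    # genAckPayload in helper_ackPayload).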
    def checkackdata(self, data):
        # Let's check whether this is a message acknowledgement bound for us.
        if len(data) < 32:
            return

        # bypass nonce and time, retain object type/version/stream + body
        readPosition = 16

        if data[readPosition:] in shared.ackdataForWhichImWatching:
            logger.info('This object is an acknowledgement bound for me.')
            del shared.ackdataForWhichImWatching[data[readPosition:]]
            sqlExecute('UPDATE sent SET status=?, lastactiontime=? WHERE ackdata=?',
                       'ackreceived',
                       int(time.time()),
                       data[readPosition:])
            queues.UISignalQueue.put(('updateSentItemStatusByAckdata', (data[readPosition:], tr._translate("MainWindow", 'Acknowledgement of the message received %1').arg(l10n.formatTimestamp()))))
        else:
            logger.debug('This object is not an acknowledgement bound for me.')
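
    # A getpubkey request identifies the wanted key either by the 20-byte ripe
    # hash of the address (version 2 and 3 addresses) or by a 32-byte tag
    # derived from the address (version 4), as parsed below.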
    def processgetpubkey(self, data):
        if len(data) > 200:
            logger.info('getpubkey is abnormally long. Sanity check failed. Ignoring object.')
            return
        readPosition = 20  # bypass the nonce, time, and object type
        requestedAddressVersionNumber, addressVersionLength = decodeVarint(
            data[readPosition:readPosition + 10])
        readPosition += addressVersionLength
        streamNumber, streamNumberLength = decodeVarint(
            data[readPosition:readPosition + 10])
        readPosition += streamNumberLength
        if requestedAddressVersionNumber == 0:
            logger.debug('The requestedAddressVersionNumber of the pubkey request is zero. That doesn\'t make any sense. Ignoring it.')
            return
        elif requestedAddressVersionNumber == 1:
            logger.debug('The requestedAddressVersionNumber of the pubkey request is 1 which isn\'t supported anymore. Ignoring it.')
            return
        elif requestedAddressVersionNumber > 4:
            logger.debug('The requestedAddressVersionNumber of the pubkey request is too high. Can\'t understand. Ignoring it.')
            return
        myAddress = ''
        if requestedAddressVersionNumber <= 3:
            requestedHash = data[readPosition:readPosition + 20]
            if len(requestedHash) != 20:
                logger.debug('The length of the requested hash is not 20 bytes. Something is wrong. Ignoring.')
                return
            logger.info('the hash requested in this getpubkey request is: %s' % hexlify(requestedHash))
            if requestedHash in shared.myAddressesByHash:  # if this address hash is one of mine
                myAddress = shared.myAddressesByHash[requestedHash]
        elif requestedAddressVersionNumber >= 4:
            requestedTag = data[readPosition:readPosition + 32]
            if len(requestedTag) != 32:
                logger.debug('The length of the requested tag is not 32 bytes. Something is wrong. Ignoring.')
                return
            logger.debug('the tag requested in this getpubkey request is: %s' % hexlify(requestedTag))
            if requestedTag in shared.myAddressesByTag:
                myAddress = shared.myAddressesByTag[requestedTag]
        if myAddress == '':
            logger.info('This getpubkey request is not for any of my keys.')
            return

        if decodeAddress(myAddress)[1] != requestedAddressVersionNumber:
            logger.warning('(Within the processgetpubkey function) Someone requested one of my pubkeys but the requestedAddressVersionNumber doesn\'t match my actual address version number. Ignoring.')
            return
        if decodeAddress(myAddress)[2] != streamNumber:
            logger.warning('(Within the processgetpubkey function) Someone requested one of my pubkeys but the stream number on which we heard this getpubkey object doesn\'t match this address\' stream number. Ignoring.')
            return
        if BMConfigParser().safeGetBoolean(myAddress, 'chan'):
            logger.info('Ignoring getpubkey request because it is for one of my chan addresses. The other party should already have the pubkey.')
            return
        try:
            lastPubkeySendTime = int(BMConfigParser().get(
                myAddress, 'lastpubkeysendtime'))
        except:
            lastPubkeySendTime = 0
        if lastPubkeySendTime > time.time() - 2419200:  # If the last time we sent our pubkey was more recent than 28 days ago...
            logger.info('Found getpubkey-requested-item in my list of EC hashes BUT we already sent it recently. Ignoring request. The lastPubkeySendTime is: %s' % lastPubkeySendTime)
            return
        logger.info('Found getpubkey-requested-hash in my list of EC hashes. Telling Worker thread to do the POW for a pubkey message and send it out.')
        if requestedAddressVersionNumber == 2:
            queues.workerQueue.put((
                'doPOWForMyV2Pubkey', requestedHash))
        elif requestedAddressVersionNumber == 3:
            queues.workerQueue.put((
                'sendOutOrStoreMyV3Pubkey', requestedHash))
        elif requestedAddressVersionNumber == 4:
            queues.workerQueue.put((
                'sendOutOrStoreMyV4Pubkey', myAddress))
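
    # Incoming pubkey objects come in three flavours handled below: version 2
    # (plaintext, unsigned), version 3 (plaintext, signed, carrying the
    # owner's proof-of-work requirements), and version 4 (encrypted; we can
    # only make use of ones we have been waiting for).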
    def processpubkey(self, data):
        pubkeyProcessingStartTime = time.time()
        shared.numberOfPubkeysProcessed += 1
        queues.UISignalQueue.put((
            'updateNumberOfPubkeysProcessed', 'no data'))
        embeddedTime, = unpack('>Q', data[8:16])
        readPosition = 20  # bypass the nonce, time, and object type
        addressVersion, varintLength = decodeVarint(
            data[readPosition:readPosition + 10])
        readPosition += varintLength
        streamNumber, varintLength = decodeVarint(
            data[readPosition:readPosition + 10])
        readPosition += varintLength
        if addressVersion == 0:
            logger.debug('(Within processpubkey) addressVersion of 0 doesn\'t make sense.')
            return
        if addressVersion > 4 or addressVersion == 1:
            logger.info('This version of Bitmessage cannot handle version %s addresses.' % addressVersion)
            return
        if addressVersion == 2:
            if len(data) < 146:  # sanity check. This is the minimum possible length.
                logger.debug('(within processpubkey) payloadLength less than 146. Sanity check failed.')
                return
            bitfieldBehaviors = data[readPosition:readPosition + 4]
            readPosition += 4
            publicSigningKey = data[readPosition:readPosition + 64]
            # Is it possible for a public key to be invalid such that trying to
            # encrypt or sign with it will cause an error? If it is, it would
            # be easiest to test them here.
            readPosition += 64
            publicEncryptionKey = data[readPosition:readPosition + 64]
            if len(publicEncryptionKey) < 64:
                logger.debug('publicEncryptionKey length less than 64. Sanity check failed.')
                return
            readPosition += 64
            dataToStore = data[20:readPosition]  # The data we'll store in the pubkeys table.
            sha = hashlib.new('sha512')
            sha.update(
                '\x04' + publicSigningKey + '\x04' + publicEncryptionKey)
            ripeHasher = hashlib.new('ripemd160')
            ripeHasher.update(sha.digest())
            ripe = ripeHasher.digest()

            logger.debug('within recpubkey, addressVersion: %s, streamNumber: %s\n\
ripe %s\n\
publicSigningKey in hex: %s\n\
publicEncryptionKey in hex: %s' % (addressVersion,
                                   streamNumber,
                                   hexlify(ripe),
                                   hexlify(publicSigningKey),
                                   hexlify(publicEncryptionKey)
                                   )
                         )

            address = encodeAddress(addressVersion, streamNumber, ripe)
            queryreturn = sqlQuery(
                '''SELECT usedpersonally FROM pubkeys WHERE address=? AND usedpersonally='yes' ''', address)
            if queryreturn != []:  # if this pubkey is already in our database and if we have used it personally:
                logger.info('We HAVE used this pubkey personally. Updating time.')
                t = (address, addressVersion, dataToStore, int(time.time()), 'yes')
            else:
                logger.info('We have NOT used this pubkey personally. Inserting in database.')
                t = (address, addressVersion, dataToStore, int(time.time()), 'no')
            sqlExecute('''INSERT INTO pubkeys VALUES (?,?,?,?,?)''', *t)
            self.possibleNewPubkey(address)
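
        # Version 3 pubkeys additionally carry the owner's demanded
        # proof-of-work parameters (nonce trials per byte and payload length
        # extra bytes) and a signature, which must verify before we store
        # anything.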
        if addressVersion == 3:
            if len(data) < 170:  # sanity check.
                logger.warning('(within processpubkey) payloadLength less than 170. Sanity check failed.')
                return
            bitfieldBehaviors = data[readPosition:readPosition + 4]
            readPosition += 4
            publicSigningKey = '\x04' + data[readPosition:readPosition + 64]
            readPosition += 64
            publicEncryptionKey = '\x04' + data[readPosition:readPosition + 64]
            readPosition += 64
            specifiedNonceTrialsPerByte, specifiedNonceTrialsPerByteLength = decodeVarint(
                data[readPosition:readPosition + 10])
            readPosition += specifiedNonceTrialsPerByteLength
            specifiedPayloadLengthExtraBytes, specifiedPayloadLengthExtraBytesLength = decodeVarint(
                data[readPosition:readPosition + 10])
            readPosition += specifiedPayloadLengthExtraBytesLength
            endOfSignedDataPosition = readPosition
            dataToStore = data[20:readPosition]  # The data we'll store in the pubkeys table.
            signatureLength, signatureLengthLength = decodeVarint(
                data[readPosition:readPosition + 10])
            readPosition += signatureLengthLength
            signature = data[readPosition:readPosition + signatureLength]
            if highlevelcrypto.verify(data[8:endOfSignedDataPosition], signature, hexlify(publicSigningKey)):
                logger.debug('ECDSA verify passed (within processpubkey)')
            else:
                logger.warning('ECDSA verify failed (within processpubkey)')
                return
            sha = hashlib.new('sha512')
            sha.update(publicSigningKey + publicEncryptionKey)
            ripeHasher = hashlib.new('ripemd160')
            ripeHasher.update(sha.digest())
            ripe = ripeHasher.digest()

            logger.debug('within recpubkey, addressVersion: %s, streamNumber: %s\n\
ripe %s\n\
publicSigningKey in hex: %s\n\
publicEncryptionKey in hex: %s' % (addressVersion,
                                   streamNumber,
                                   hexlify(ripe),
                                   hexlify(publicSigningKey),
                                   hexlify(publicEncryptionKey)
                                   )
                         )

            address = encodeAddress(addressVersion, streamNumber, ripe)
            queryreturn = sqlQuery('''SELECT usedpersonally FROM pubkeys WHERE address=? AND usedpersonally='yes' ''', address)
            if queryreturn != []:  # if this pubkey is already in our database and if we have used it personally:
                logger.info('We HAVE used this pubkey personally. Updating time.')
                t = (address, addressVersion, dataToStore, int(time.time()), 'yes')
            else:
                logger.info('We have NOT used this pubkey personally. Inserting in database.')
                t = (address, addressVersion, dataToStore, int(time.time()), 'no')
            sqlExecute('''INSERT INTO pubkeys VALUES (?,?,?,?,?)''', *t)
            self.possibleNewPubkey(address)
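
        # Version 4 pubkeys are encrypted (with a key derived from the address
        # itself), so we can only decrypt ones we explicitly asked for; the
        # 32-byte tag tells us whether this object is one of those.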
        if addressVersion == 4:
            if len(data) < 350:  # sanity check.
                logger.debug('(within processpubkey) payloadLength less than 350. Sanity check failed.')
                return

            tag = data[readPosition:readPosition + 32]
            if tag not in state.neededPubkeys:
                logger.info('We don\'t need this v4 pubkey. We didn\'t ask for it.')
                return

            # Let us try to decrypt the pubkey
            toAddress, cryptorObject = state.neededPubkeys[tag]
            if shared.decryptAndCheckPubkeyPayload(data, toAddress) == 'successful':
                # At this point we know that we have been waiting on this pubkey.
                # This function will command the workerThread to start work on
                # the messages that require it.
                self.possibleNewPubkey(toAddress)

        # Display timing data
        timeRequiredToProcessPubkey = time.time(
        ) - pubkeyProcessingStartTime
        logger.debug('Time required to process this pubkey: %s' % timeRequiredToProcessPubkey)
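
    # A decrypted msg contains, in order: the sender's address version and
    # stream number, a behavior bitfield, the sender's public signing and
    # encryption keys, the sender's demanded proof-of-work parameters (address
    # version 3 and up), the destination ripe, the message encoding and body,
    # the ackdata, and a signature covering everything up to and including
    # the ackdata.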
    def processmsg(self, data):
        messageProcessingStartTime = time.time()
        shared.numberOfMessagesProcessed += 1
        queues.UISignalQueue.put((
            'updateNumberOfMessagesProcessed', 'no data'))
        readPosition = 20  # bypass the nonce, time, and object type
        msgVersion, msgVersionLength = decodeVarint(data[readPosition:readPosition + 9])
        if msgVersion != 1:
            logger.info('Cannot understand message versions other than one. Ignoring message.')
            return
        readPosition += msgVersionLength

        streamNumberAsClaimedByMsg, streamNumberAsClaimedByMsgLength = decodeVarint(
            data[readPosition:readPosition + 9])
        readPosition += streamNumberAsClaimedByMsgLength
        inventoryHash = calculateInventoryHash(data)
        initialDecryptionSuccessful = False
        # This is not an acknowledgement bound for me. See if it is a message
        # bound for me by trying to decrypt it with my private keys.

        for key, cryptorObject in sorted(shared.myECCryptorObjects.items(), key=lambda x: random.random()):
            try:
                if initialDecryptionSuccessful:  # continue decryption attempts to avoid timing attacks
                    cryptorObject.decrypt(data[readPosition:])
                else:
                    decryptedData = cryptorObject.decrypt(data[readPosition:])
                    toRipe = key  # This is the RIPE hash of my pubkeys. We need this below to compare to the destination_ripe included in the encrypted data.
                    initialDecryptionSuccessful = True
                    logger.info('EC decryption successful using key associated with ripe hash: %s.' % hexlify(key))
            except Exception as err:
                pass
        if not initialDecryptionSuccessful:
            # This is not a message bound for me.
            logger.info('Length of time program spent failing to decrypt this message: %s seconds.' % (time.time() - messageProcessingStartTime,))
            return

        # This is a message bound for me.
        toAddress = shared.myAddressesByHash[
            toRipe]  # Look up my address based on the RIPE hash.
        readPosition = 0
        sendersAddressVersionNumber, sendersAddressVersionNumberLength = decodeVarint(
            decryptedData[readPosition:readPosition + 10])
        readPosition += sendersAddressVersionNumberLength
        if sendersAddressVersionNumber == 0:
            logger.info('Cannot understand sendersAddressVersionNumber = 0. Ignoring message.')
            return
        if sendersAddressVersionNumber > 4:
            logger.info('Sender\'s address version number %s not yet supported. Ignoring message.' % sendersAddressVersionNumber)
            return
        if len(decryptedData) < 170:
            logger.info('Length of the unencrypted data is unreasonably short. Sanity check failed. Ignoring message.')
            return
        sendersStreamNumber, sendersStreamNumberLength = decodeVarint(
            decryptedData[readPosition:readPosition + 10])
        if sendersStreamNumber == 0:
            logger.info('sender\'s stream number is 0. Ignoring message.')
            return
        readPosition += sendersStreamNumberLength
        behaviorBitfield = decryptedData[readPosition:readPosition + 4]
        readPosition += 4
        pubSigningKey = '\x04' + decryptedData[
            readPosition:readPosition + 64]
        readPosition += 64
        pubEncryptionKey = '\x04' + decryptedData[
            readPosition:readPosition + 64]
        readPosition += 64
        if sendersAddressVersionNumber >= 3:
            requiredAverageProofOfWorkNonceTrialsPerByte, varintLength = decodeVarint(
                decryptedData[readPosition:readPosition + 10])
            readPosition += varintLength
            logger.info('sender\'s requiredAverageProofOfWorkNonceTrialsPerByte is %s' % requiredAverageProofOfWorkNonceTrialsPerByte)
            requiredPayloadLengthExtraBytes, varintLength = decodeVarint(
                decryptedData[readPosition:readPosition + 10])
            readPosition += varintLength
            logger.info('sender\'s requiredPayloadLengthExtraBytes is %s' % requiredPayloadLengthExtraBytes)
        endOfThePublicKeyPosition = readPosition  # needed for when we store the pubkey in our database of pubkeys for later use.
        if toRipe != decryptedData[readPosition:readPosition + 20]:
            logger.info('The original sender of this message did not send it to you. Someone is attempting a Surreptitious Forwarding Attack.\n\
See: http://world.std.com/~dtd/sign_encrypt/sign_encrypt7.html \n\
your toRipe: %s\n\
embedded destination toRipe: %s' % (hexlify(toRipe), hexlify(decryptedData[readPosition:readPosition + 20]))
                        )
            return
        readPosition += 20
        messageEncodingType, messageEncodingTypeLength = decodeVarint(
            decryptedData[readPosition:readPosition + 10])
        readPosition += messageEncodingTypeLength
        messageLength, messageLengthLength = decodeVarint(
            decryptedData[readPosition:readPosition + 10])
        readPosition += messageLengthLength
        message = decryptedData[readPosition:readPosition + messageLength]
        # print 'First 150 characters of message:', repr(message[:150])
        readPosition += messageLength
        ackLength, ackLengthLength = decodeVarint(
            decryptedData[readPosition:readPosition + 10])
        readPosition += ackLengthLength
        ackData = decryptedData[readPosition:readPosition + ackLength]
        readPosition += ackLength
        positionOfBottomOfAckData = readPosition  # needed to mark the end of what is covered by the signature
        signatureLength, signatureLengthLength = decodeVarint(
            decryptedData[readPosition:readPosition + 10])
        readPosition += signatureLengthLength
        signature = decryptedData[
            readPosition:readPosition + signatureLength]
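
        # The signed data consists of the expiration time and object type from
        # the plaintext header (data[8:20]), the msg version (1) and claimed
        # stream number, plus the decrypted data up through the ackdata.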
        signedData = data[8:20] + encodeVarint(1) + encodeVarint(streamNumberAsClaimedByMsg) + decryptedData[:positionOfBottomOfAckData]

        if not highlevelcrypto.verify(signedData, signature, hexlify(pubSigningKey)):
            logger.debug('ECDSA verify failed')
            return
        logger.debug('ECDSA verify passed')
        logger.debug('As a matter of intellectual curiosity, here is the Bitcoin address associated with the keys owned by the other person: %s ..and here is the testnet address: %s. The other person must take their private signing key from Bitmessage and import it into Bitcoin (or a service like Blockchain.info) for it to be of any use. Do not use this unless you know what you are doing.' %
                     (helper_bitcoin.calculateBitcoinAddressFromPubkey(pubSigningKey), helper_bitcoin.calculateTestnetAddressFromPubkey(pubSigningKey))
                     )
        # Used to detect and ignore duplicate messages in our inbox
        sigHash = hashlib.sha512(hashlib.sha512(signature).digest()).digest()[32:]

        # calculate the fromRipe.
        sha = hashlib.new('sha512')
        sha.update(pubSigningKey + pubEncryptionKey)
        ripe = hashlib.new('ripemd160')
        ripe.update(sha.digest())
        fromAddress = encodeAddress(
            sendersAddressVersionNumber, sendersStreamNumber, ripe.digest())

        # Let's store the public key in case we want to reply to this
        # person.
        sqlExecute(
            '''INSERT INTO pubkeys VALUES (?,?,?,?,?)''',
            fromAddress,
            sendersAddressVersionNumber,
            decryptedData[:endOfThePublicKeyPosition],
            int(time.time()),
            'yes')
        # Check to see whether we happen to be awaiting this
        # pubkey in order to send a message. If we are, it will do the POW
        # and send it.
        self.possibleNewPubkey(fromAddress)

        # If this message is bound for one of my version 3 addresses (or
        # higher), then we must check to make sure it meets our demanded
        # proof of work requirement. If this is bound for one of my chan
        # addresses then we skip this check; the minimum network POW is
        # fine.
        if decodeAddress(toAddress)[1] >= 3 and not BMConfigParser().safeGetBoolean(toAddress, 'chan'):  # If the toAddress version number is 3 or higher and not one of my chan addresses:
            if not shared.isAddressInMyAddressBookSubscriptionsListOrWhitelist(fromAddress):  # If I'm not friendly with this person:
                requiredNonceTrialsPerByte = BMConfigParser().getint(
                    toAddress, 'noncetrialsperbyte')
                requiredPayloadLengthExtraBytes = BMConfigParser().getint(
                    toAddress, 'payloadlengthextrabytes')
                if not protocol.isProofOfWorkSufficient(data, requiredNonceTrialsPerByte, requiredPayloadLengthExtraBytes):
                    logger.info('Proof of work in msg is insufficient only because it does not meet our higher requirement.')
                    return
        blockMessage = False  # Gets set to True if the user shouldn't see the message according to black or white lists.
        if BMConfigParser().get('bitmessagesettings', 'blackwhitelist') == 'black':  # If we are using a blacklist
            queryreturn = sqlQuery(
                '''SELECT label FROM blacklist where address=? and enabled='1' ''',
                fromAddress)
            if queryreturn != []:
                logger.info('Message ignored because address is in blacklist.')
                blockMessage = True
        else:  # We're using a whitelist
            queryreturn = sqlQuery(
                '''SELECT label FROM whitelist where address=? and enabled='1' ''',
                fromAddress)
            if queryreturn == []:
                logger.info('Message ignored because address not in whitelist.')
                blockMessage = True

        toLabel = BMConfigParser().get(toAddress, 'label')
        if toLabel == '':
            toLabel = toAddress
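
        # Encoding type 0 ("IGNORE") marks a dummy message with no visible
        # content; the other encodings carry a subject and body which
        # MsgDecode extracts below. A MsgDecodeException means the payload
        # could not be parsed, so the message is dropped.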
        try:
            decodedMessage = helper_msgcoding.MsgDecode(messageEncodingType, message)
        except helper_msgcoding.MsgDecodeException:
            return
        subject = decodedMessage.subject
        body = decodedMessage.body

        # Let us make sure that we haven't already received this message
        if helper_inbox.isMessageAlreadyInInbox(sigHash):
            logger.info('This msg is already in our inbox. Ignoring it.')
            blockMessage = True
        if not blockMessage:
            if messageEncodingType != 0:
                t = (inventoryHash, toAddress, fromAddress, subject, int(
                    time.time()), body, 'inbox', messageEncodingType, 0, sigHash)
                helper_inbox.insert(t)

                queues.UISignalQueue.put(('displayNewInboxMessage', (
                    inventoryHash, toAddress, fromAddress, subject, body)))

            # If we are behaving as an API then we might need to run an
            # outside command to let some program know that a new message
            # has arrived.
            if BMConfigParser().safeGetBoolean('bitmessagesettings', 'apienabled'):
                try:
                    apiNotifyPath = BMConfigParser().get(
                        'bitmessagesettings', 'apinotifypath')
                except:
                    apiNotifyPath = ''
                if apiNotifyPath != '':
                    call([apiNotifyPath, "newMessage"])

            # Let us now check and see whether our receiving address is
            # behaving as a mailing list
            if BMConfigParser().safeGetBoolean(toAddress, 'mailinglist') and messageEncodingType != 0:
                try:
                    mailingListName = BMConfigParser().get(
                        toAddress, 'mailinglistname')
                except:
                    mailingListName = ''
                # Let us send out this message as a broadcast
                subject = self.addMailingListNameToSubject(
                    subject, mailingListName)
                # Let us now send this message out as a broadcast
                message = time.strftime("%a, %Y-%m-%d %H:%M:%S UTC", time.gmtime(
                )) + ' Message ostensibly from ' + fromAddress + ':\n\n' + body
                fromAddress = toAddress  # The fromAddress for the broadcast that we are about to send is the toAddress (my address) for the msg message we are currently processing.
                # We don't actually need the ackdata for acknowledgement since
                # this is a broadcast message, but we can use it to update the
                # user interface when the POW is done generating.
                streamNumber = decodeAddress(fromAddress)[2]
                ackdata = genAckPayload(streamNumber, 0)
                toAddress = '[Broadcast subscribers]'
                ripe = ''

                # We really should have a discussion about how to
                # set the TTL for mailing list broadcasts. This is obviously
                # hard-coded.
                TTL = 2 * 7 * 24 * 60 * 60  # 2 weeks
                t = ('',
                     toAddress,
                     ripe,
                     fromAddress,
                     subject,
                     message,
                     ackdata,
                     int(time.time()),  # sentTime (this doesn't change)
                     int(time.time()),  # lastActionTime
                     0,
                     'broadcastqueued',
                     0,
                     'sent',
                     messageEncodingType,
                     TTL)
                helper_sent.insert(t)

                queues.UISignalQueue.put(('displayNewSentMessage', (
                    toAddress, '[Broadcast subscribers]', fromAddress, subject, message, ackdata)))
                queues.workerQueue.put(('sendbroadcast', ''))

        # Don't send ACK if invalid, blacklisted senders, invisible messages,
        # disabled or chan
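        # The embedded ackData is a full protocol message (24-byte message
        # header followed by the object payload); ackDataHasAValidHeader()
        # vets the header and only the payload (ackData[24:]) is shared with
        # peers.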
        if self.ackDataHasAValidHeader(ackData) and \
                not blockMessage and \
                messageEncodingType != 0 and \
                not BMConfigParser().safeGetBoolean(toAddress, 'dontsendack') and \
                not BMConfigParser().safeGetBoolean(toAddress, 'chan'):
            shared.checkAndShareObjectWithPeers(ackData[24:])

        # Display timing data
        timeRequiredToAttemptToDecryptMessage = time.time(
        ) - messageProcessingStartTime
        shared.successfullyDecryptMessageTimings.append(
            timeRequiredToAttemptToDecryptMessage)
        sum = 0
        for item in shared.successfullyDecryptMessageTimings:
            sum += item
        logger.debug('Time to decrypt this message successfully: %s\n\
Average time for all message decryption successes since startup: %s.' %
                     (timeRequiredToAttemptToDecryptMessage, sum / len(shared.successfullyDecryptMessageTimings))
                     )
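
    # Broadcasts we may be interested in are found by trial decryption with
    # each of our subscription keys (shared.MyECSubscriptionCryptorObjects);
    # v5 broadcasts also carry a tag so that uninteresting ones can be skipped
    # without attempting decryption.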
    def processbroadcast(self, data):
        messageProcessingStartTime = time.time()
        shared.numberOfBroadcastsProcessed += 1
        queues.UISignalQueue.put((
            'updateNumberOfBroadcastsProcessed', 'no data'))
        inventoryHash = calculateInventoryHash(data)
        readPosition = 20  # bypass the nonce, time, and object type
        broadcastVersion, broadcastVersionLength = decodeVarint(
            data[readPosition:readPosition + 9])
        readPosition += broadcastVersionLength
        if broadcastVersion < 4 or broadcastVersion > 5:
            logger.info('Cannot decode incoming broadcast versions less than 4 or higher than 5. Assuming the sender isn\'t being silly, you should upgrade Bitmessage because this message shall be ignored.')
            return
        cleartextStreamNumber, cleartextStreamNumberLength = decodeVarint(
            data[readPosition:readPosition + 10])
        readPosition += cleartextStreamNumberLength
        if broadcastVersion == 4:
            """
            v4 broadcasts are encrypted the same way the msgs are encrypted. To see if we are interested in a
            v4 broadcast, we try to decrypt it. This was replaced with v5 broadcasts which include a tag which
            we check instead, just like we do with v4 pubkeys.
            """
            signedData = data[8:readPosition]
            initialDecryptionSuccessful = False
            for key, cryptorObject in sorted(shared.MyECSubscriptionCryptorObjects.items(), key=lambda x: random.random()):