Bitmessage Protocol Version Three

This commit is contained in:
Jonathan Warren 2014-08-27 03:14:32 -04:00
parent 111d60dde8
commit c306062282
20 changed files with 990 additions and 959 deletions

View File

@ -68,23 +68,47 @@ def encodeVarint(integer):
if integer >= 18446744073709551616:
print 'varint cannot be >= 18446744073709551616'
raise SystemExit
class varintDecodeError(Exception):
pass
def decodeVarint(data):
"""
Decodes an encoded varint to an integer and returns it.
Per protocol v3, the encoded value must be encoded with
the minimum amount of data possible or else it is malformed.
"""
if len(data) == 0:
return (0,0)
firstByte, = unpack('>B',data[0:1])
if firstByte < 253:
# encodes 0 to 252
return (firstByte,1) #the 1 is the length of the varint
if firstByte == 253:
a, = unpack('>H',data[1:3])
return (a,3)
# encodes 253 to 65535
if len(data) < 3:
raise varintDecodeError('The first byte of this varint as an integer is %s but the total length is only %s. It needs to be at least 3.' % (firstByte, len(data)))
encodedValue, = unpack('>H',data[1:3])
if encodedValue < 253:
raise varintDecodeError('This varint does not encode the value with the lowest possible number of bytes.')
return (encodedValue,3)
if firstByte == 254:
a, = unpack('>I',data[1:5])
return (a,5)
# encodes 65536 to 4294967295
if len(data) < 5:
raise varintDecodeError('The first byte of this varint as an integer is %s but the total length is only %s. It needs to be at least 5.' % (firstByte, len(data)))
encodedValue, = unpack('>I',data[1:5])
if encodedValue < 65536:
raise varintDecodeError('This varint does not encode the value with the lowest possible number of bytes.')
return (encodedValue,5)
if firstByte == 255:
a, = unpack('>Q',data[1:9])
return (a,9)
# encodes 4294967296 to 18446744073709551615
if len(data) < 9:
raise varintDecodeError('The first byte of this varint as an integer is %s but the total length is only %s. It needs to be at least 9.' % (firstByte, len(data)))
encodedValue, = unpack('>Q',data[1:9])
if encodedValue < 4294967296:
raise varintDecodeError('This varint does not encode the value with the lowest possible number of bytes.')
return (encodedValue,9)
def calculateInventoryHash(data):
@ -163,7 +187,12 @@ def decodeAddress(address):
#else:
# print 'checksum PASSED'
addressVersionNumber, bytesUsedByVersionNumber = decodeVarint(data[:9])
try:
addressVersionNumber, bytesUsedByVersionNumber = decodeVarint(data[:9])
except varintDecodeError as e:
print e
status = 'varintmalformed'
return status,0,0,""
#print 'addressVersionNumber', addressVersionNumber
#print 'bytesUsedByVersionNumber', bytesUsedByVersionNumber
@ -176,32 +205,42 @@ def decodeAddress(address):
status = 'versiontoohigh'
return status,0,0,""
streamNumber, bytesUsedByStreamNumber = decodeVarint(data[bytesUsedByVersionNumber:])
try:
streamNumber, bytesUsedByStreamNumber = decodeVarint(data[bytesUsedByVersionNumber:])
except varintDecodeError as e:
print e
status = 'varintmalformed'
return status,0,0,""
#print streamNumber
status = 'success'
if addressVersionNumber == 1:
return status,addressVersionNumber,streamNumber,data[-24:-4]
elif addressVersionNumber == 2 or addressVersionNumber == 3:
if len(data[bytesUsedByVersionNumber+bytesUsedByStreamNumber:-4]) == 19:
return status,addressVersionNumber,streamNumber,'\x00'+data[bytesUsedByVersionNumber+bytesUsedByStreamNumber:-4]
elif len(data[bytesUsedByVersionNumber+bytesUsedByStreamNumber:-4]) == 20:
return status,addressVersionNumber,streamNumber,data[bytesUsedByVersionNumber+bytesUsedByStreamNumber:-4]
elif len(data[bytesUsedByVersionNumber+bytesUsedByStreamNumber:-4]) == 18:
return status,addressVersionNumber,streamNumber,'\x00\x00'+data[bytesUsedByVersionNumber+bytesUsedByStreamNumber:-4]
elif len(data[bytesUsedByVersionNumber+bytesUsedByStreamNumber:-4]) < 18:
embeddedRipeData = data[bytesUsedByVersionNumber+bytesUsedByStreamNumber:-4]
if len(embeddedRipeData) == 19:
return status,addressVersionNumber,streamNumber,'\x00'+embeddedRipeData
elif len(embeddedRipeData) == 20:
return status,addressVersionNumber,streamNumber,embeddedRipeData
elif len(embeddedRipeData) == 18:
return status,addressVersionNumber,streamNumber,'\x00\x00'+embeddedRipeData
elif len(embeddedRipeData) < 18:
return 'ripetooshort',0,0,""
elif len(data[bytesUsedByVersionNumber+bytesUsedByStreamNumber:-4]) > 20:
elif len(embeddedRipeData) > 20:
return 'ripetoolong',0,0,""
else:
return 'otherproblem',0,0,""
elif addressVersionNumber == 4:
if len(data[bytesUsedByVersionNumber+bytesUsedByStreamNumber:-4]) > 20:
embeddedRipeData = data[bytesUsedByVersionNumber+bytesUsedByStreamNumber:-4]
if embeddedRipeData[0:1] == '\x00':
# In order to enforce address non-malleability, encoded RIPE data must have NULL bytes removed from the front
return 'encodingproblem',0,0,""
elif len(embeddedRipeData) > 20:
return 'ripetoolong',0,0,""
elif len(data[bytesUsedByVersionNumber+bytesUsedByStreamNumber:-4]) < 4:
elif len(embeddedRipeData) < 4:
return 'ripetooshort',0,0,""
else:
x00string = '\x00' * (20 - len(data[bytesUsedByVersionNumber+bytesUsedByStreamNumber:-4]))
return status,addressVersionNumber,streamNumber,x00string+data[bytesUsedByVersionNumber+bytesUsedByStreamNumber:-4]
x00string = '\x00' * (20 - len(embeddedRipeData))
return status,addressVersionNumber,streamNumber,x00string+embeddedRipeData
def addBMIfNotPresent(address):
address = str(address).strip()

View File

@ -17,7 +17,7 @@ import json
import shared
import time
from addresses import decodeAddress,addBMIfNotPresent,decodeVarint,calculateInventoryHash
from addresses import decodeAddress,addBMIfNotPresent,decodeVarint,calculateInventoryHash,varintDecodeError
import helper_inbox
import helper_sent
import hashlib
@ -139,6 +139,8 @@ class MySimpleXMLRPCRequestHandler(SimpleXMLRPCRequestHandler):
raise APIError(9, 'Invalid characters in address: ' + address)
if status == 'versiontoohigh':
raise APIError(10, 'Address version number too high (or zero) in address: ' + address)
if status == 'varintmalformed':
raise APIError(26, 'Malformed varint in address: ' + address)
raise APIError(7, 'Could not decode address: ' + address + ' : ' + status)
if addressVersionNumber < 2 or addressVersionNumber > 4:
raise APIError(11, 'The address version number currently must be 2, 3 or 4. Others aren\'t supported. Check the address.')
@ -777,9 +779,10 @@ class MySimpleXMLRPCRequestHandler(SimpleXMLRPCRequestHandler):
encryptedPayload = pack('>Q', nonce) + encryptedPayload
toStreamNumber = decodeVarint(encryptedPayload[16:26])[0]
inventoryHash = calculateInventoryHash(encryptedPayload)
objectType = 'msg'
objectType = 2
TTL = 2.5 * 24 * 60 * 60
shared.inventory[inventoryHash] = (
objectType, toStreamNumber, encryptedPayload, int(time.time()),'')
objectType, toStreamNumber, encryptedPayload, int(time.time()) + TTL,'')
shared.inventorySets[toStreamNumber].add(inventoryHash)
with shared.printLock:
print 'Broadcasting inv for msg(API disseminatePreEncryptedMsg command):', inventoryHash.encode('hex')
@ -814,10 +817,11 @@ class MySimpleXMLRPCRequestHandler(SimpleXMLRPCRequestHandler):
pubkeyReadPosition += addressVersionLength
pubkeyStreamNumber = decodeVarint(payload[pubkeyReadPosition:pubkeyReadPosition+10])[0]
inventoryHash = calculateInventoryHash(payload)
objectType = 'pubkey'
objectType = 1
#todo: support v4 pubkeys
TTL = 28 * 24 * 60 * 60
shared.inventory[inventoryHash] = (
objectType, pubkeyStreamNumber, payload, int(time.time()),'')
objectType, pubkeyStreamNumber, payload, int(time.time()) + TTL,'')
shared.inventorySets[pubkeyStreamNumber].add(inventoryHash)
with shared.printLock:
print 'broadcasting inv within API command disseminatePubkey with hash:', inventoryHash.encode('hex')
@ -839,7 +843,7 @@ class MySimpleXMLRPCRequestHandler(SimpleXMLRPCRequestHandler):
# use it we'll need to fill out a field in our inventory database
# which is blank by default (first20bytesofencryptedmessage).
queryreturn = sqlQuery(
'''SELECT hash, payload FROM inventory WHERE tag = '' and objecttype = 'msg' ; ''')
'''SELECT hash, payload FROM inventory WHERE tag = '' and objecttype = 2 ; ''')
with SqlBulkExecute() as sql:
for row in queryreturn:
hash01, payload = row
@ -906,6 +910,9 @@ class MySimpleXMLRPCRequestHandler(SimpleXMLRPCRequestHandler):
return self._handle_request(method, params)
except APIError as e:
return str(e)
except varintDecodeError as e:
logger.error(e)
return "Data contains a malformed varint. Some details: %s" % e
except Exception as e:
logger.exception(e)
return "API Error 0021: Unexpected API Failure - %s" % str(e)

View File

@ -742,6 +742,8 @@ def sendMessage(sender="", recv="", broadcast=None, subject="", body="", reply=F
err += "Some data encoded in the address is too short. There might be something wrong with the software of your acquaintance."
elif status == "ripetoolong":
err += "Some data encoded in the address is too long. There might be something wrong with the software of your acquaintance."
elif status == "varintmalformed":
err += "Some data encoded in the address is malformed. There might be something wrong with the software of your acquaintance."
else:
err += "It is unknown what is wrong with the address."
d.scrollbox(unicode(err), exit_label="Continue")

View File

@ -42,6 +42,8 @@ from api import MySimpleXMLRPCRequestHandler
from helper_startup import isOurOperatingSystemLimitedToHavingVeryFewHalfOpenConnections
import shared
import helper_startup
helper_startup.loadConfig()
from helper_sql import sqlQuery
import threading
@ -154,9 +156,9 @@ selfInitiatedConnections = {}
if shared.useVeryEasyProofOfWorkForTesting:
shared.networkDefaultProofOfWorkNonceTrialsPerByte = int(
shared.networkDefaultProofOfWorkNonceTrialsPerByte / 16)
shared.networkDefaultProofOfWorkNonceTrialsPerByte / 100)
shared.networkDefaultPayloadLengthExtraBytes = int(
shared.networkDefaultPayloadLengthExtraBytes / 7000)
shared.networkDefaultPayloadLengthExtraBytes / 100)
class Main:
def start(self, daemon=False):

View File

@ -1842,6 +1842,17 @@ class MyForm(QtGui.QMainWindow):
subject = str(self.ui.lineEditSubject.text().toUtf8())
message = str(
self.ui.textEditMessage.document().toPlainText().toUtf8())
"""
The whole network message must fit in 2^18 bytes. Let's assume 500
bytes of overhead. If someone wants to get that too an exact
number you are welcome to but I think that it would be a better
use of time to support message continuation so that users can
send messages of any length.
"""
if len(message) > (2 ** 18 - 500):
QMessageBox.about(self, _translate("MainWindow", "Message too long"), _translate(
"MainWindow", "The message that you are trying to send is too long by %1 bytes. (The maximum is 261644 bytes). Please cut it down before sending.").arg(len(message) - (2 ** 18 - 500)))
return
if self.ui.radioButtonSpecific.isChecked(): # To send a message to specific people (rather than broadcast)
toAddressesList = [s.strip()
for s in toAddresses.replace(',', ';').split(';')]
@ -1873,6 +1884,9 @@ class MyForm(QtGui.QMainWindow):
elif status == 'ripetoolong':
self.statusBar().showMessage(_translate(
"MainWindow", "Error: Some data encoded in the address %1 is too long. There might be something wrong with the software of your acquaintance.").arg(toAddress))
elif status == 'varintmalformed':
self.statusBar().showMessage(_translate(
"MainWindow", "Error: Some data encoded in the address %1 is malformed. There might be something wrong with the software of your acquaintance.").arg(toAddress))
else:
self.statusBar().showMessage(_translate(
"MainWindow", "Error: Something is wrong with the address %1.").arg(toAddress))
@ -2211,10 +2225,10 @@ class MyForm(QtGui.QMainWindow):
addressVersion) + encodeVarint(streamNumber) + ripe).digest()).digest()
tag = doubleHashOfAddressData[32:]
queryreturn = sqlQuery(
'''select payload from inventory where objecttype='broadcast' and tag=?''', tag)
'''select payload from inventory where objecttype=3 and tag=?''', tag)
for row in queryreturn:
payload, = row
objectType = 'broadcast'
objectType = 3
with shared.objectProcessorQueueSizeLock:
shared.objectProcessorQueueSize += len(payload)
shared.objectProcessorQueue.put((objectType,payload))
@ -3620,6 +3634,9 @@ class AddAddressDialog(QtGui.QDialog):
elif status == 'ripetoolong':
self.ui.labelAddressCheck.setText(_translate(
"MainWindow", "Some data encoded in the address is too long."))
elif status == 'varintmalformed':
self.ui.labelAddressCheck.setText(_translate(
"MainWindow", "Some data encoded in the address is malformed."))
elif status == 'success':
self.ui.labelAddressCheck.setText(
_translate("MainWindow", "Address is valid."))
@ -3658,6 +3675,9 @@ class NewSubscriptionDialog(QtGui.QDialog):
elif status == 'ripetoolong':
self.ui.labelAddressCheck.setText(_translate(
"MainWindow", "Some data encoded in the address is too long."))
elif status == 'varintmalformed':
self.ui.labelAddressCheck.setText(_translate(
"MainWindow", "Some data encoded in the address is malformed."))
elif status == 'success':
self.ui.labelAddressCheck.setText(
_translate("MainWindow", "Address is valid."))
@ -3670,7 +3690,7 @@ class NewSubscriptionDialog(QtGui.QDialog):
addressVersion) + encodeVarint(streamNumber) + ripe).digest()).digest()
tag = doubleHashOfAddressData[32:]
queryreturn = sqlQuery(
'''select hash from inventory where objecttype='broadcast' and tag=?''', tag)
'''select hash from inventory where objecttype=3 and tag=?''', tag)
if len(queryreturn) == 0:
self.ui.checkBoxDisplayMessagesAlreadyInInventory.setText(
_translate("MainWindow", "There are no recent broadcasts from this address to display."))

View File

@ -1,19 +1,19 @@
from setuptools import setup
name = "Bitmessage"
version = "0.4.2"
version = "0.4.4"
mainscript = ["bitmessagemain.py"]
setup(
name = name,
version = version,
app = mainscript,
setup_requires = ["py2app"],
options = dict(
py2app = dict(
resources = ["images", "translations"],
includes = ['sip', 'PyQt4._qt'],
iconfile = "images/bitmessage.icns"
)
)
name = name,
version = version,
app = mainscript,
setup_requires = ["py2app"],
options = dict(
py2app = dict(
resources = ["images", "translations"],
includes = ['sip', 'PyQt4._qt'],
iconfile = "images/bitmessage.icns"
)
)
)

View File

@ -7,8 +7,9 @@ from struct import unpack, pack
import sys
import string
from subprocess import call # used when the API must execute an outside program
from pyelliptic.openssl import OpenSSL
import traceback
from pyelliptic.openssl import OpenSSL
import highlevelcrypto
from addresses import *
import helper_generic
@ -51,18 +52,23 @@ class objectProcessor(threading.Thread):
while True:
objectType, data = shared.objectProcessorQueue.get()
if objectType == 'getpubkey':
self.processgetpubkey(data)
elif objectType == 'pubkey':
self.processpubkey(data)
elif objectType == 'msg':
self.processmsg(data)
elif objectType == 'broadcast':
self.processbroadcast(data)
elif objectType == 'checkShutdownVariable': # is more of a command, not an object type. Is used to get this thread past the queue.get() so that it will check the shutdown variable.
pass
else:
logger.critical('Error! Bug! The class_objectProcessor was passed an object type it doesn\'t recognize: %s' % str(objectType))
try:
if objectType == 0: # getpubkey
self.processgetpubkey(data)
elif objectType == 1: #pubkey
self.processpubkey(data)
elif objectType == 2: #msg
self.processmsg(data)
elif objectType == 3: #broadcast
self.processbroadcast(data)
elif objectType == 'checkShutdownVariable': # is more of a command, not an object type. Is used to get this thread past the queue.get() so that it will check the shutdown variable.
pass
else:
logger.critical('Error! Bug! The class_objectProcessor was passed an object type it doesn\'t recognize: %s' % str(objectType))
except varintDecodeError as e:
logger.debug("There was a problem with a varint while processing an object. Some details: %s" % e)
except Exception as e:
logger.critical("Critical error within objectProcessorThread: \n%s" % traceback.format_exc())
with shared.objectProcessorQueueSizeLock:
shared.objectProcessorQueueSize -= len(data) # We maintain objectProcessorQueueSize so that we will slow down requesting objects if too much data accumulates in the queue.
@ -83,17 +89,7 @@ class objectProcessor(threading.Thread):
break
def processgetpubkey(self, data):
readPosition = 8 # bypass the nonce
embeddedTime, = unpack('>I', data[readPosition:readPosition + 4])
# This section is used for the transition from 32 bit time to 64 bit
# time in the protocol.
if embeddedTime == 0:
embeddedTime, = unpack('>Q', data[readPosition:readPosition + 8])
readPosition += 8
else:
readPosition += 4
readPosition = 20 # bypass the nonce, time, and object type
requestedAddressVersionNumber, addressVersionLength = decodeVarint(
data[readPosition:readPosition + 10])
readPosition += addressVersionLength
@ -147,7 +143,7 @@ class objectProcessor(threading.Thread):
myAddress, 'lastpubkeysendtime'))
except:
lastPubkeySendTime = 0
if lastPubkeySendTime > time.time() - shared.lengthOfTimeToHoldOnToAllPubkeys: # If the last time we sent our pubkey was more recent than 28 days ago...
if lastPubkeySendTime > time.time() - 2419200: # If the last time we sent our pubkey was more recent than 28 days ago...
logger.info('Found getpubkey-requested-item in my list of EC hashes BUT we already sent it recently. Ignoring request. The lastPubkeySendTime is: %s' % lastPubkeySendTime)
return
logger.info('Found getpubkey-requested-hash in my list of EC hashes. Telling Worker thread to do the POW for a pubkey message and send it out.')
@ -166,17 +162,8 @@ class objectProcessor(threading.Thread):
shared.numberOfPubkeysProcessed += 1
shared.UISignalQueue.put((
'updateNumberOfPubkeysProcessed', 'no data'))
readPosition = 8 # bypass the nonce
embeddedTime, = unpack('>I', data[readPosition:readPosition + 4])
# This section is used for the transition from 32 bit time to 64 bit
# time in the protocol.
if embeddedTime == 0:
embeddedTime, = unpack('>Q', data[readPosition:readPosition + 8])
readPosition += 8
else:
readPosition += 4
embeddedTime, = unpack('>Q', data[8:16])
readPosition = 20 # bypass the nonce, time, and object type
addressVersion, varintLength = decodeVarint(
data[readPosition:readPosition + 10])
readPosition += varintLength
@ -225,15 +212,25 @@ class objectProcessor(threading.Thread):
queryreturn = sqlQuery(
'''SELECT usedpersonally FROM pubkeys WHERE hash=? AND addressversion=? AND usedpersonally='yes' ''', ripe, addressVersion)
"""
With the changes in protocol v3, we have to be careful to store pubkey data
in the database the same way we did before to maintain backwards compatibility
with what is in people's databases already. This means that for v2 keys, we
must store the nonce, the time, and then everything else starting with the
address version.
"""
dataToStore = '\x00' * 8 # fake nonce
dataToStore += data[8:16] # the time
dataToStore += data[20:] # everything else
if queryreturn != []: # if this pubkey is already in our database and if we have used it personally:
logger.info('We HAVE used this pubkey personally. Updating time.')
t = (ripe, addressVersion, data, embeddedTime, 'yes')
t = (ripe, addressVersion, dataToStore, int(time.time()), 'yes')
else:
logger.info('We have NOT used this pubkey personally. Inserting in database.')
t = (ripe, addressVersion, data, embeddedTime, 'no')
# This will also update the embeddedTime.
t = (ripe, addressVersion, dataToStore, int(time.time()), 'no')
sqlExecute('''INSERT INTO pubkeys VALUES (?,?,?,?,?)''', *t)
# shared.workerQueue.put(('newpubkey',(addressVersion,streamNumber,ripe)))
self.possibleNewPubkey(ripe = ripe)
if addressVersion == 3:
if len(data) < 170: # sanity check.
@ -242,9 +239,6 @@ class objectProcessor(threading.Thread):
bitfieldBehaviors = data[readPosition:readPosition + 4]
readPosition += 4
publicSigningKey = '\x04' + data[readPosition:readPosition + 64]
# Is it possible for a public key to be invalid such that trying to
# encrypt or sign with it will cause an error? If it is, we should
# probably test these keys here.
readPosition += 64
publicEncryptionKey = '\x04' + data[readPosition:readPosition + 64]
readPosition += 64
@ -259,14 +253,32 @@ class objectProcessor(threading.Thread):
data[readPosition:readPosition + 10])
readPosition += signatureLengthLength
signature = data[readPosition:readPosition + signatureLength]
try:
if not highlevelcrypto.verify(data[8:endOfSignedDataPosition], signature, publicSigningKey.encode('hex')):
logger.warning('ECDSA verify failed (within processpubkey)')
"""
With the changes in protocol v3, to maintain backwards compatibility, signatures will be sent
the 'old' way during an upgrade period and then a 'new' simpler way after that. We will therefore
check the sig both ways.
Old way:
signedData = timePubkeyWasSigned(4 bytes) + addressVersion through extra_bytes
New way:
signedData = all of the payload data, from the time down through the extra_bytes
The timePubkeyWasSigned will be calculated by subtracting 28 days form the embedded expiresTime.
"""
expiresTime, = unpack('>Q', data[8:16])
TTL = 28 * 24 * 60 * 60
signedData = pack('>I', (expiresTime - TTL)) # the time that the pubkey was signed. 4 bytes.
signedData += data[20:endOfSignedDataPosition] # the address version down through the payloadLengthExtraBytes
if highlevelcrypto.verify(signedData, signature, publicSigningKey.encode('hex')):
logger.info('ECDSA verify passed (within processpubkey, old method)')
else:
logger.warning('ECDSA verify failed (within processpubkey, old method)')
# let us try the newer signature method
if highlevelcrypto.verify(data[8:endOfSignedDataPosition], signature, publicSigningKey.encode('hex')):
logger.info('ECDSA verify passed (within processpubkey, new method)')
else:
logger.warning('ECDSA verify failed (within processpubkey, new method)')
return
logger.info('ECDSA verify passed (within processpubkey)')
except Exception as err:
logger.warning('ECDSA verify failed (within processpubkey) %s' % err)
return
sha = hashlib.new('sha512')
sha.update(publicSigningKey + publicEncryptionKey)
@ -286,109 +298,45 @@ class objectProcessor(threading.Thread):
)
)
"""
With the changes in protocol v3, we have to be careful to store pubkey data
in the database the same way we did before to maintain backwards compatibility
with what is in people's databases already. This means that for v3 keys, we
must store the nonce, the time, and then everything else starting with the
address version.
"""
dataToStore = '\x00' * 8 # fake nonce
dataToStore += data[8:16] # the time
dataToStore += data[20:] # everything else
queryreturn = sqlQuery('''SELECT usedpersonally FROM pubkeys WHERE hash=? AND addressversion=? AND usedpersonally='yes' ''', ripe, addressVersion)
if queryreturn != []: # if this pubkey is already in our database and if we have used it personally:
logger.info('We HAVE used this pubkey personally. Updating time.')
t = (ripe, addressVersion, data, embeddedTime, 'yes')
t = (ripe, addressVersion, dataToStore, int(time.time()), 'yes')
else:
logger.info('We have NOT used this pubkey personally. Inserting in database.')
t = (ripe, addressVersion, data, embeddedTime, 'no')
# This will also update the embeddedTime.
t = (ripe, addressVersion, dataToStore, int(time.time()), 'no')
sqlExecute('''INSERT INTO pubkeys VALUES (?,?,?,?,?)''', *t)
self.possibleNewPubkey(ripe = ripe)
if addressVersion == 4:
"""
There exist a function: shared.decryptAndCheckPubkeyPayload which does something almost
the same as this section of code. There are differences, however; one being that
decryptAndCheckPubkeyPayload requires that a cryptor object be created each time it is
run which is an expensive operation. This, on the other hand, keeps them saved in
the shared.neededPubkeys dictionary so that if an attacker sends us many
incorrectly-tagged pubkeys, which would force us to try to decrypt them, this code
would run and handle that event quite quickly.
"""
if len(data) < 350: # sanity check.
logger.debug('(within processpubkey) payloadLength less than 350. Sanity check failed.')
return
signedData = data[8:readPosition] # Some of the signed data is not encrypted so let's keep it for now.
tag = data[readPosition:readPosition + 32]
readPosition += 32
encryptedData = data[readPosition:]
if tag not in shared.neededPubkeys:
logger.info('We don\'t need this v4 pubkey. We didn\'t ask for it.')
return
# Let us try to decrypt the pubkey
cryptorObject = shared.neededPubkeys[tag]
try:
decryptedData = cryptorObject.decrypt(encryptedData)
except:
# Someone must have encrypted some data with a different key
# but tagged it with a tag for which we are watching.
logger.info('Pubkey decryption was unsuccessful.')
return
readPosition = 0
bitfieldBehaviors = decryptedData[readPosition:readPosition + 4]
readPosition += 4
publicSigningKey = '\x04' + decryptedData[readPosition:readPosition + 64]
# Is it possible for a public key to be invalid such that trying to
# encrypt or check a sig with it will cause an error? If it is, we
# should probably test these keys here.
readPosition += 64
publicEncryptionKey = '\x04' + decryptedData[readPosition:readPosition + 64]
readPosition += 64
specifiedNonceTrialsPerByte, specifiedNonceTrialsPerByteLength = decodeVarint(
decryptedData[readPosition:readPosition + 10])
readPosition += specifiedNonceTrialsPerByteLength
specifiedPayloadLengthExtraBytes, specifiedPayloadLengthExtraBytesLength = decodeVarint(
decryptedData[readPosition:readPosition + 10])
readPosition += specifiedPayloadLengthExtraBytesLength
signedData += decryptedData[:readPosition]
signatureLength, signatureLengthLength = decodeVarint(
decryptedData[readPosition:readPosition + 10])
readPosition += signatureLengthLength
signature = decryptedData[readPosition:readPosition + signatureLength]
try:
if not highlevelcrypto.verify(signedData, signature, publicSigningKey.encode('hex')):
logger.info('ECDSA verify failed (within processpubkey)')
return
logger.info('ECDSA verify passed (within processpubkey)')
except Exception as err:
logger.info('ECDSA verify failed (within processpubkey) %s' % err)
return
sha = hashlib.new('sha512')
sha.update(publicSigningKey + publicEncryptionKey)
ripeHasher = hashlib.new('ripemd160')
ripeHasher.update(sha.digest())
ripe = ripeHasher.digest()
# We need to make sure that the tag on the outside of the encryption
# is the one generated from hashing these particular keys.
if tag != hashlib.sha512(hashlib.sha512(encodeVarint(addressVersion) + encodeVarint(streamNumber) + ripe).digest()).digest()[32:]:
logger.info('Someone was trying to act malicious: tag doesn\'t match the keys in this pubkey message. Ignoring it.')
return
logger.info('within recpubkey, addressVersion: %s, streamNumber: %s \n\
ripe %s\n\
publicSigningKey in hex: %s\n\
publicEncryptionKey in hex: %s' % (addressVersion,
streamNumber,
ripe.encode('hex'),
publicSigningKey.encode('hex'),
publicEncryptionKey.encode('hex')
)
)
t = (ripe, addressVersion, signedData, embeddedTime, 'yes')
sqlExecute('''INSERT INTO pubkeys VALUES (?,?,?,?,?)''', *t)
fromAddress = encodeAddress(addressVersion, streamNumber, ripe)
# That this point we know that we have been waiting on this pubkey.
# This function will command the workerThread to start work on
# the messages that require it.
self.possibleNewPubkey(address = fromAddress)
# Let us try to decrypt the pubkey
toAddress, cryptorObject = shared.neededPubkeys[tag]
if shared.decryptAndCheckPubkeyPayload(data, toAddress) == 'successful':
# At this point we know that we have been waiting on this pubkey.
# This function will command the workerThread to start work on
# the messages that require it.
self.possibleNewPubkey(address=toAddress)
# Display timing data
timeRequiredToProcessPubkey = time.time(
@ -401,28 +349,28 @@ class objectProcessor(threading.Thread):
shared.numberOfMessagesProcessed += 1
shared.UISignalQueue.put((
'updateNumberOfMessagesProcessed', 'no data'))
readPosition = 8 # bypass the nonce
embeddedTime, = unpack('>I', data[readPosition:readPosition + 4])
# This section is used for the transition from 32 bit time to 64 bit
# time in the protocol.
if embeddedTime == 0:
embeddedTime, = unpack('>Q', data[readPosition:readPosition + 8])
readPosition += 8
else:
readPosition += 4
readPosition = 20 # bypass the nonce, time, and object type
"""
In protocol v2, the next byte(s) was the streamNumber. But starting after
the protocol v3 upgrade period, the next byte(s) will be a msg version
number followed by the streamNumber.
"""
#msgVersionOutsideEncryption, msgVersionOutsideEncryptionLength = decodeVarint(data[readPosition:readPosition + 9])
#readPosition += msgVersionOutsideEncryptionLength
streamNumberAsClaimedByMsg, streamNumberAsClaimedByMsgLength = decodeVarint(
data[readPosition:readPosition + 9])
readPosition += streamNumberAsClaimedByMsgLength
inventoryHash = calculateInventoryHash(data)
initialDecryptionSuccessful = False
# Let's check whether this is a message acknowledgement bound for us.
if data[readPosition:] in shared.ackdataForWhichImWatching:
if data[-32:] in shared.ackdataForWhichImWatching:
logger.info('This msg IS an acknowledgement bound for me.')
del shared.ackdataForWhichImWatching[data[readPosition:]]
del shared.ackdataForWhichImWatching[data[-32:]]
sqlExecute('UPDATE sent SET status=? WHERE ackdata=?',
'ackreceived', data[readPosition:])
shared.UISignalQueue.put(('updateSentItemStatusByAckdata', (data[readPosition:], tr.translateText("MainWindow",'Acknowledgement of the message received. %1').arg(l10n.formatTimestamp()))))
'ackreceived', data[-32:])
shared.UISignalQueue.put(('updateSentItemStatusByAckdata', (data[-32:], tr.translateText("MainWindow",'Acknowledgement of the message received. %1').arg(l10n.formatTimestamp()))))
return
else:
logger.info('This was NOT an acknowledgement bound for me.')
@ -430,17 +378,35 @@ class objectProcessor(threading.Thread):
# This is not an acknowledgement bound for me. See if it is a message
# bound for me by trying to decrypt it with my private keys.
# This can be simplified quite a bit after 1416175200: # Sun, 16 Nov 2014 22:00:00 GMT
for key, cryptorObject in shared.myECCryptorObjects.items():
try:
decryptedData = cryptorObject.decrypt(
data[readPosition:])
decryptedData = cryptorObject.decrypt(data[readPosition:])
toRipe = key # This is the RIPE hash of my pubkeys. We need this below to compare to the destination_ripe included in the encrypted data.
initialDecryptionSuccessful = True
logger.info('EC decryption successful using key associated with ripe hash: %s' % key.encode('hex'))
logger.info('EC decryption successful using key associated with ripe hash: %s. msg did NOT specify version.' % key.encode('hex'))
# We didn't bypass a msg version above as it is commented out.
# But the decryption was successful. Which means that there
# wasn't a msg version byte include in this msg.
msgObjectContainedVersion = False
break
except Exception as err:
pass
# print 'cryptorObject.decrypt Exception:', err
# What if a client sent us a msg with
# a msg version included? We didn't bypass it above. So
# let's try to decrypt the msg assuming that it is present.
try:
decryptedData = cryptorObject.decrypt(data[readPosition+1:]) # notice that we offset by 1 byte compared to the attempt above.
toRipe = key # This is the RIPE hash of my pubkeys. We need this below to compare to the destination_ripe included in the encrypted data.
initialDecryptionSuccessful = True
logger.info('EC decryption successful using key associated with ripe hash: %s. msg DID specifiy version.' % key.encode('hex'))
# There IS a msg version byte include in this msg.
msgObjectContainedVersion = True
break
except Exception as err:
pass
if not initialDecryptionSuccessful:
# This is not a message bound for me.
logger.info('Length of time program spent failing to decrypt this message: %s seconds.' % (time.time() - messageProcessingStartTime,))
@ -450,12 +416,15 @@ class objectProcessor(threading.Thread):
toAddress = shared.myAddressesByHash[
toRipe] # Look up my address based on the RIPE hash.
readPosition = 0
messageVersion, messageVersionLength = decodeVarint(
decryptedData[readPosition:readPosition + 10])
readPosition += messageVersionLength
if messageVersion != 1:
logger.info('Cannot understand message versions other than one. Ignoring message.')
return
if not msgObjectContainedVersion: # by which I mean "if the msg object didn't have the msg version outside of the encryption". This confusingness will be removed after the protocol v3 upgrade period.
messageVersionWithinEncryption, messageVersionWithinEncryptionLength = decodeVarint(
decryptedData[readPosition:readPosition + 10])
readPosition += messageVersionWithinEncryptionLength
if messageVersionWithinEncryption != 1:
logger.info('Cannot understand message versions other than one. Ignoring message.')
return
else:
messageVersionWithinEncryptionLength = 0 # This variable can disappear after the protocol v3 upgrade period is complete.
sendersAddressVersionNumber, sendersAddressVersionNumberLength = decodeVarint(
decryptedData[readPosition:readPosition + 10])
readPosition += sendersAddressVersionNumberLength
@ -520,14 +489,17 @@ class objectProcessor(threading.Thread):
readPosition += signatureLengthLength
signature = decryptedData[
readPosition:readPosition + signatureLength]
try:
if not highlevelcrypto.verify(decryptedData[:positionOfBottomOfAckData], signature, pubSigningKey.encode('hex')):
logger.debug('ECDSA verify failed')
return
logger.debug('ECDSA verify passed')
except Exception as err:
logger.debug('ECDSA verify failed %s' % err)
if not msgObjectContainedVersion:
# protocol v2. This can be removed after the end of the protocol v3 upgrade period.
signedData = decryptedData[:positionOfBottomOfAckData]
else:
# protocol v3
signedData = data[8:20] + encodeVarint(1) + encodeVarint(streamNumberAsClaimedByMsg) + decryptedData[:positionOfBottomOfAckData]
if not highlevelcrypto.verify(signedData, signature, pubSigningKey.encode('hex')):
logger.debug('ECDSA verify failed')
return
logger.debug('ECDSA verify passed')
logger.debug('As a matter of intellectual curiosity, here is the Bitcoin address associated with the keys owned by the other person: %s ..and here is the testnet address: %s. The other person must take their private signing key from Bitmessage and import it into Bitcoin (or a service like Blockchain.info) for it to be of any use. Do not use this unless you know what you are doing.' %
(helper_bitcoin.calculateBitcoinAddressFromPubkey(pubSigningKey), helper_bitcoin.calculateTestnetAddressFromPubkey(pubSigningKey))
)
@ -546,7 +518,7 @@ class objectProcessor(threading.Thread):
'''INSERT INTO pubkeys VALUES (?,?,?,?,?)''',
ripe.digest(),
sendersAddressVersionNumber,
'\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF' + '\xFF\xFF\xFF\xFF' + decryptedData[messageVersionLength:endOfThePublicKeyPosition],
'\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF' + '\xFF\xFF\xFF\xFF' + decryptedData[messageVersionWithinEncryptionLength:endOfThePublicKeyPosition],
int(time.time()),
'yes')
# This will check to see whether we happen to be awaiting this
@ -558,7 +530,7 @@ class objectProcessor(threading.Thread):
'''INSERT INTO pubkeys VALUES (?,?,?,?,?)''',
ripe.digest(),
sendersAddressVersionNumber,
'\x00\x00\x00\x00\x00\x00\x00\x01' + decryptedData[messageVersionLength:endOfThePublicKeyPosition],
'\x00\x00\x00\x00\x00\x00\x00\x01' + decryptedData[messageVersionWithinEncryptionLength:endOfThePublicKeyPosition],
int(time.time()),
'yes')
# This will check to see whether we happen to be awaiting this
@ -577,7 +549,7 @@ class objectProcessor(threading.Thread):
requiredPayloadLengthExtraBytes = shared.config.getint(
toAddress, 'payloadlengthextrabytes')
if not shared.isProofOfWorkSufficient(data, requiredNonceTrialsPerByte, requiredPayloadLengthExtraBytes):
print 'Proof of work in msg message insufficient only because it does not meet our higher requirement.'
logger.info('Proof of work in msg is insufficient only because it does not meet our higher requirement.')
return
blockMessage = False # Gets set to True if the user shouldn't see the message according to black or white lists.
if shared.config.get('bitmessagesettings', 'blackwhitelist') == 'black': # If we are using a blacklist
@ -667,14 +639,7 @@ class objectProcessor(threading.Thread):
shared.workerQueue.put(('sendbroadcast', ''))
if self.ackDataHasAVaildHeader(ackData):
if ackData[4:16] == addDataPadding('getpubkey'):
shared.checkAndSharegetpubkeyWithPeers(ackData[24:])
elif ackData[4:16] == addDataPadding('pubkey'):
shared.checkAndSharePubkeyWithPeers(ackData[24:])
elif ackData[4:16] == addDataPadding('msg'):
shared.checkAndShareMsgWithPeers(ackData[24:])
elif ackData[4:16] == addDataPadding('broadcast'):
shared.checkAndShareBroadcastWithPeers(ackData[24:])
shared.checkAndShareObjectWithPeers(ackData[24:])
# Display timing data
timeRequiredToAttemptToDecryptMessage = time.time(
@ -696,155 +661,26 @@ class objectProcessor(threading.Thread):
shared.UISignalQueue.put((
'updateNumberOfBroadcastsProcessed', 'no data'))
inventoryHash = calculateInventoryHash(data)
readPosition = 8 # bypass the nonce
embeddedTime, = unpack('>I', data[readPosition:readPosition + 4])
# This section is used for the transition from 32 bit time to 64 bit
# time in the protocol.
if embeddedTime == 0:
embeddedTime, = unpack('>Q', data[readPosition:readPosition + 8])
readPosition += 8
else:
readPosition += 4
readPosition = 20 # bypass the nonce, time, and object type
broadcastVersion, broadcastVersionLength = decodeVarint(
data[readPosition:readPosition + 9])
readPosition += broadcastVersionLength
if broadcastVersion < 1 or broadcastVersion > 3:
logger.debug('Cannot decode incoming broadcast versions higher than 3. Assuming the sender isn\'t being silly, you should upgrade Bitmessage because this message shall be ignored.')
if broadcastVersion < 1 or broadcastVersion > 5:
logger.info('Cannot decode incoming broadcast versions higher than 5. Assuming the sender isn\'t being silly, you should upgrade Bitmessage because this message shall be ignored.')
return
if broadcastVersion == 1:
beginningOfPubkeyPosition = readPosition # used when we add the pubkey to our pubkey table
sendersAddressVersion, sendersAddressVersionLength = decodeVarint(
data[readPosition:readPosition + 9])
if sendersAddressVersion <= 1 or sendersAddressVersion >= 3:
# Cannot decode senderAddressVersion higher than 2. Assuming
# the sender isn\'t being silly, you should upgrade Bitmessage
# because this message shall be ignored.
return
readPosition += sendersAddressVersionLength
if sendersAddressVersion == 2:
sendersStream, sendersStreamLength = decodeVarint(
data[readPosition:readPosition + 9])
readPosition += sendersStreamLength
behaviorBitfield = data[readPosition:readPosition + 4]
readPosition += 4
sendersPubSigningKey = '\x04' + \
data[readPosition:readPosition + 64]
readPosition += 64
sendersPubEncryptionKey = '\x04' + \
data[readPosition:readPosition + 64]
readPosition += 64
endOfPubkeyPosition = readPosition
sendersHash = data[readPosition:readPosition + 20]
if sendersHash not in shared.broadcastSendersForWhichImWatching:
# Display timing data
logger.debug('Time spent deciding that we are not interested in this v1 broadcast: %s' % (time.time() - messageProcessingStartTime,))
return
# At this point, this message claims to be from sendersHash and
# we are interested in it. We still have to hash the public key
# to make sure it is truly the key that matches the hash, and
# also check the signiture.
readPosition += 20
sha = hashlib.new('sha512')
sha.update(sendersPubSigningKey + sendersPubEncryptionKey)
ripe = hashlib.new('ripemd160')
ripe.update(sha.digest())
if ripe.digest() != sendersHash:
# The sender of this message lied.
return
messageEncodingType, messageEncodingTypeLength = decodeVarint(
data[readPosition:readPosition + 9])
if messageEncodingType == 0:
return
readPosition += messageEncodingTypeLength
messageLength, messageLengthLength = decodeVarint(
data[readPosition:readPosition + 9])
readPosition += messageLengthLength
message = data[readPosition:readPosition + messageLength]
readPosition += messageLength
readPositionAtBottomOfMessage = readPosition
signatureLength, signatureLengthLength = decodeVarint(
data[readPosition:readPosition + 9])
readPosition += signatureLengthLength
signature = data[readPosition:readPosition + signatureLength]
try:
if not highlevelcrypto.verify(data[12:readPositionAtBottomOfMessage], signature, sendersPubSigningKey.encode('hex')):
logger.debug('ECDSA verify failed')
return
logger.debug('ECDSA verify passed')
except Exception as err:
logger.debug('ECDSA verify failed %s' % err)
return
# verify passed
fromAddress = encodeAddress(
sendersAddressVersion, sendersStream, ripe.digest())
logger.debug('fromAddress: %s' % fromAddress)
# Let's store the public key in case we want to reply to this person.
# We don't have the correct nonce or time (which would let us
# send out a pubkey message) so we'll just fill it with 1's. We
# won't be able to send this pubkey to others (without doing
# the proof of work ourselves, which this program is programmed
# to not do.)
sqlExecute(
'''INSERT INTO pubkeys VALUES (?,?,?,?,?)''',
ripe.digest(),
sendersAddressVersion,
'\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF' + '\xFF\xFF\xFF\xFF' + data[beginningOfPubkeyPosition:endOfPubkeyPosition],
int(time.time()),
'yes')
# This will check to see whether we happen to be awaiting this
# pubkey in order to send a message. If we are, it will do the
# POW and send it.
self.possibleNewPubkey(ripe=ripe.digest())
if messageEncodingType == 2:
subject, body = decodeType2Message(message)
logger.info('Broadcast subject (first 100 characters): %s' % repr(subject)[:100])
elif messageEncodingType == 1:
body = message
subject = ''
elif messageEncodingType == 0:
logger.debug('messageEncodingType == 0. Doing nothing with the message.')
else:
body = 'Unknown encoding type.\n\n' + repr(message)
subject = ''
toAddress = '[Broadcast subscribers]'
if messageEncodingType != 0:
# Let us make sure that we haven't already received this message
if helper_inbox.isMessageAlreadyInInbox(toAddress, fromAddress, subject, body, messageEncodingType):
logger.info('This broadcast is already in our inbox. Ignoring it.')
else:
t = (inventoryHash, toAddress, fromAddress, subject, int(
time.time()), body, 'inbox', messageEncodingType, 0)
helper_inbox.insert(t)
shared.UISignalQueue.put(('displayNewInboxMessage', (
inventoryHash, toAddress, fromAddress, subject, body)))
# If we are behaving as an API then we might need to run an
# outside command to let some program know that a new
# message has arrived.
if shared.safeConfigGetBoolean('bitmessagesettings', 'apienabled'):
try:
apiNotifyPath = shared.config.get(
'bitmessagesettings', 'apinotifypath')
except:
apiNotifyPath = ''
if apiNotifyPath != '':
call([apiNotifyPath, "newBroadcast"])
# Display timing data
logger.debug('Time spent processing this interesting broadcast: %s' % (time.time() - messageProcessingStartTime,))
if broadcastVersion == 2:
logger.info('Version 1 broadcasts are no longer supported. Not processing it at all.')
if broadcastVersion in [2,4]:
"""
v2 or v4 broadcasts are encrypted the same way the msgs were encrypted. To see if we are interested in a
v2 broadcast, we try to decrypt it. This was replaced with v3 (and later v5) broadcasts which include a tag which
we check instead, just like we do with v4 pubkeys.
v2 and v3 broadcasts should be completely obsolete after the protocol v3 upgrade period and some code can be simplified.
"""
cleartextStreamNumber, cleartextStreamNumberLength = decodeVarint(
data[readPosition:readPosition + 10])
readPosition += cleartextStreamNumberLength
signedData = data[8:readPosition] # This doesn't end up being used if the broadcastVersion is 2
initialDecryptionSuccessful = False
for key, cryptorObject in shared.MyECSubscriptionCryptorObjects.items():
try:
@ -862,9 +698,13 @@ class objectProcessor(threading.Thread):
return
# At this point this is a broadcast I have decrypted and thus am
# interested in.
signedBroadcastVersion, readPosition = decodeVarint(
decryptedData[:10])
beginningOfPubkeyPosition = readPosition # used when we add the pubkey to our pubkey table
readPosition = 0
if broadcastVersion == 2:
signedBroadcastVersion, signedBroadcastVersionLength = decodeVarint(
decryptedData[:10])
readPosition += signedBroadcastVersionLength
beginningOfPubkeyPosition = readPosition # used when we add the pubkey to our pubkey table. This variable can be disposed of after the protocol v3 upgrade period because it will necessarily be at the beginning of the decryptedData; ie it will definitely equal 0
sendersAddressVersion, sendersAddressVersionLength = decodeVarint(
decryptedData[readPosition:readPosition + 9])
if sendersAddressVersion < 2 or sendersAddressVersion > 3:
@ -920,14 +760,14 @@ class objectProcessor(threading.Thread):
readPosition += signatureLengthLength
signature = decryptedData[
readPosition:readPosition + signatureLength]
try:
if not highlevelcrypto.verify(decryptedData[:readPositionAtBottomOfMessage], signature, sendersPubSigningKey.encode('hex')):
logger.debug('ECDSA verify failed')
return
logger.debug('ECDSA verify passed')
except Exception as err:
logger.debug('ECDSA verify failed %s' % err)
if broadcastVersion == 2: # this can be removed after the protocol v3 upgrade period
signedData = decryptedData[:readPositionAtBottomOfMessage]
else:
signedData += decryptedData[:readPositionAtBottomOfMessage]
if not highlevelcrypto.verify(signedData, signature, sendersPubSigningKey.encode('hex')):
logger.debug('ECDSA verify failed')
return
logger.debug('ECDSA verify passed')
# verify passed
# Let's store the public key in case we want to reply to this
@ -988,7 +828,8 @@ class objectProcessor(threading.Thread):
# Display timing data
logger.info('Time spent processing this interesting broadcast: %s' % (time.time() - messageProcessingStartTime,))
if broadcastVersion == 3:
if broadcastVersion in [3,5]:
# broadcast version 3 should be completely obsolete after the end of the protocol v3 upgrade period
cleartextStreamNumber, cleartextStreamNumberLength = decodeVarint(
data[readPosition:readPosition + 10])
readPosition += cleartextStreamNumberLength
@ -998,21 +839,28 @@ class objectProcessor(threading.Thread):
logger.debug('We\'re not interested in this broadcast.')
return
# We are interested in this broadcast because of its tag.
signedData = data[8:readPosition] # We're going to add some more data which is signed further down.
cryptorObject = shared.MyECSubscriptionCryptorObjects[embeddedTag]
try:
decryptedData = cryptorObject.decrypt(data[readPosition:])
logger.debug('EC decryption successful')
except Exception as err:
logger.debug('Broadcast version 3 decryption Unsuccessful.')
logger.debug('Broadcast version %s decryption Unsuccessful.' % broadcastVersion)
return
signedBroadcastVersion, readPosition = decodeVarint(
decryptedData[:10])
# broadcast version 3 includes the broadcast version at the beginning
# of the decryptedData. Broadcast version 4 doesn't.
readPosition = 0
if broadcastVersion == 3:
signedBroadcastVersion, signedBroadcastVersionLength = decodeVarint(
decryptedData[:10])
readPosition += signedBroadcastVersionLength
beginningOfPubkeyPosition = readPosition # used when we add the pubkey to our pubkey table
sendersAddressVersion, sendersAddressVersionLength = decodeVarint(
decryptedData[readPosition:readPosition + 9])
if sendersAddressVersion < 4:
logger.info('Cannot decode senderAddressVersion less than 4 for broadcast version number 3. Assuming the sender isn\'t being silly, you should upgrade Bitmessage because this message shall be ignored.')
logger.info('Cannot decode senderAddressVersion less than 4 for broadcast version number 3 or 4. Assuming the sender isn\'t being silly, you should upgrade Bitmessage because this message shall be ignored.')
return
readPosition += sendersAddressVersionLength
sendersStream, sendersStreamLength = decodeVarint(
@ -1067,14 +915,14 @@ class objectProcessor(threading.Thread):
readPosition += signatureLengthLength
signature = decryptedData[
readPosition:readPosition + signatureLength]
try:
if not highlevelcrypto.verify(decryptedData[:readPositionAtBottomOfMessage], signature, sendersPubSigningKey.encode('hex')):
logger.debug('ECDSA verify failed')
return
logger.debug('ECDSA verify passed')
except Exception as err:
logger.debug('ECDSA verify failed %s' % err)
if broadcastVersion == 3: # broadcastVersion 3 should be completely unused after the end of the protocol v3 upgrade period
signedData = decryptedData[:readPositionAtBottomOfMessage]
elif broadcastVersion == 5:
signedData += decryptedData[:readPositionAtBottomOfMessage]
if not highlevelcrypto.verify(signedData, signature, sendersPubSigningKey.encode('hex')):
logger.debug('ECDSA verify failed')
return
logger.debug('ECDSA verify passed')
# verify passed
fromAddress = encodeAddress(
@ -1168,23 +1016,21 @@ class objectProcessor(threading.Thread):
logger.info('The length of ackData is unreasonably short. Not sending ackData.')
return False
magic,command,payload_length,checksum = shared.Header.unpack(ackData[:shared.Header.size])
magic,command,payloadLength,checksum = shared.Header.unpack(ackData[:shared.Header.size])
if magic != 0xE9BEB4D9:
logger.info('Ackdata magic bytes were wrong. Not sending ackData.')
return False
payload = ackData[shared.Header.size:]
if len(payload) != payload_length:
if len(payload) != payloadLength:
logger.info('ackData payload length doesn\'t match the payload length specified in the header. Not sending ackdata.')
return False
if payload_length > 180000000: # If the size of the message is greater than 180MB, ignore it.
if payloadLength > 2 ** 18: # 256 KiB
return False
if checksum != hashlib.sha512(payload).digest()[0:4]: # test the checksum in the message.
logger.info('ackdata checksum wrong. Not sending ackdata.')
return False
if (command != addDataPadding('getpubkey') and
command != addDataPadding('pubkey') and
command != addDataPadding('msg') and
command != addDataPadding('broadcast')):
command = command.rstrip('\x00')
if command != 'object':
return False
return True

View File

@ -8,6 +8,7 @@ import socket
import random
from struct import unpack, pack
import sys
import traceback
#import string
#from subprocess import call # used when the API must execute an outside program
#from pyelliptic.openssl import OpenSSL
@ -21,7 +22,6 @@ from helper_generic import addDataPadding, isHostInPrivateIPRange
from helper_sql import *
#import tr
from debug import logger
#from bitmessagemain import shared.lengthOfTimeToLeaveObjectsInInventory, shared.lengthOfTimeToHoldOnToAllPubkeys, shared.maximumAgeOfAnObjectThatIAmWillingToAccept, shared.maximumAgeOfObjectsThatIAdvertiseToOthers, shared.maximumAgeOfNodesThatIAdvertiseToOthers, shared.numberOfObjectsThatWeHaveYetToGetPerPeer, shared.neededPubkeys
# This thread is created either by the synSenderThread(for outgoing
# connections) or the singleListenerThread(for incoming connections).
@ -117,7 +117,7 @@ class receiveDataThread(threading.Thread):
if magic != 0xE9BEB4D9:
self.data = ""
return
if payloadLength > 20000000:
if payloadLength > 2 ** 18: # 256 KiB
logger.info('The incoming message, which we have not yet download, is too large. Ignoring it. (unfortunately there is no way to tell the other node to stop sending it except to disconnect.) Message size: %s' % payloadLength)
self.data = self.data[payloadLength + shared.Header.size:]
del magic,command,payloadLength,checksum # we don't need these anymore and better to clean them now before the recursive call rather than after
@ -146,33 +146,30 @@ class receiveDataThread(threading.Thread):
with shared.printLock:
print 'remoteCommand', repr(command), ' from', self.peer
#TODO: Use a dispatcher here
if not self.connectionIsOrWasFullyEstablished:
if command == 'version':
self.recversion(payload)
elif command == 'verack':
self.recverack()
else:
if command == 'addr':
self.recaddr(payload)
elif command == 'getpubkey':
shared.checkAndSharegetpubkeyWithPeers(payload)
elif command == 'pubkey':
self.recpubkey(payload)
elif command == 'inv':
self.recinv(payload)
elif command == 'getdata':
self.recgetdata(payload)
elif command == 'msg':
self.recmsg(payload)
elif command == 'broadcast':
self.recbroadcast(payload)
elif command == 'ping':
self.sendpong(payload)
#elif command == 'pong':
# pass
#elif command == 'alert':
# pass
try:
#TODO: Use a dispatcher here
if not self.connectionIsOrWasFullyEstablished:
if command == 'version':
self.recversion(payload)
elif command == 'verack':
self.recverack()
else:
if command == 'addr':
self.recaddr(payload)
elif command == 'inv':
self.recinv(payload)
elif command == 'getdata':
self.recgetdata(payload)
elif command == 'object':
self.recobject(payload)
elif command == 'ping':
self.sendpong(payload)
#elif command == 'pong':
# pass
except varintDecodeError as e:
logger.debug("There was a problem with a varint while processing a message from the wire. Some details: %s" % e)
except Exception as e:
logger.critical("Critical error in a receiveDataThread: \n%s" % traceback.format_exc())
del payload
self.data = self.data[payloadLength + shared.Header.size:] # take this message out and then process the next message
@ -273,12 +270,10 @@ class receiveDataThread(threading.Thread):
self.sendBigInv()
def sendBigInv(self):
# Select all hashes which are younger than two days old and in this
# stream.
# Select all hashes for objects in this stream.
queryreturn = sqlQuery(
'''SELECT hash FROM inventory WHERE ((receivedtime>? and objecttype<>'pubkey') or (receivedtime>? and objecttype='pubkey')) and streamnumber=?''',
int(time.time()) - shared.maximumAgeOfObjectsThatIAdvertiseToOthers,
int(time.time()) - shared.lengthOfTimeToHoldOnToAllPubkeys,
'''SELECT hash FROM inventory WHERE expirestime>? and streamnumber=?''',
int(time.time()),
self.streamNumber)
bigInvList = {}
for row in queryreturn:
@ -290,8 +285,8 @@ class receiveDataThread(threading.Thread):
with shared.inventoryLock:
for hash, storedValue in shared.inventory.items():
if hash not in self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware:
objectType, streamNumber, payload, receivedTime, tag = storedValue
if streamNumber == self.streamNumber and receivedTime > int(time.time()) - shared.maximumAgeOfObjectsThatIAdvertiseToOthers:
objectType, streamNumber, payload, expiresTime, tag = storedValue
if streamNumber == self.streamNumber and expiresTime > int(time.time()):
bigInvList[hash] = 0
numberOfObjectsInInvMessage = 0
payload = ''
@ -327,78 +322,27 @@ class receiveDataThread(threading.Thread):
print 'Timing attack mitigation: Sleeping for', sleepTime, 'seconds.'
time.sleep(sleepTime)
# We have received a broadcast message
def recbroadcast(self, data):
def recobject(self, data):
self.messageProcessingStartTime = time.time()
shared.checkAndShareBroadcastWithPeers(data)
lengthOfTimeWeShouldUseToProcessThisMessage = shared.checkAndShareObjectWithPeers(data)