From c306062282d2041a6121e9891fabf3195e601eac Mon Sep 17 00:00:00 2001 From: Jonathan Warren Date: Wed, 27 Aug 2014 03:14:32 -0400 Subject: [PATCH] Bitmessage Protocol Version Three --- src/addresses.py | 81 +++-- src/api.py | 19 +- src/bitmessagecurses/__init__.py | 2 + src/bitmessagemain.py | 6 +- src/bitmessageqt/__init__.py | 26 +- src/build_osx.py | 24 +- src/class_objectProcessor.py | 544 +++++++++++-------------------- src/class_receiveDataThread.py | 173 ++++------ src/class_singleCleaner.py | 50 ++- src/class_singleListener.py | 10 +- src/class_singleWorker.py | 465 +++++++++++++++----------- src/class_sqlThread.py | 67 +++- src/defaultKnownNodes.py | 11 +- src/helper_bootstrap.py | 2 +- src/helper_startup.py | 4 +- src/highlevelcrypto.py | 5 +- src/message_data_reader.py | 6 +- src/proofofwork.py | 2 +- src/pyelliptic/ecc.py | 13 +- src/shared.py | 439 ++++++++++++++----------- 20 files changed, 990 insertions(+), 959 deletions(-) diff --git a/src/addresses.py b/src/addresses.py index b9135d04..7307c58e 100644 --- a/src/addresses.py +++ b/src/addresses.py @@ -68,23 +68,47 @@ def encodeVarint(integer): if integer >= 18446744073709551616: print 'varint cannot be >= 18446744073709551616' raise SystemExit + +class varintDecodeError(Exception): + pass def decodeVarint(data): + """ + Decodes an encoded varint to an integer and returns it. + Per protocol v3, the encoded value must be encoded with + the minimum amount of data possible or else it is malformed. + """ + if len(data) == 0: return (0,0) firstByte, = unpack('>B',data[0:1]) if firstByte < 253: + # encodes 0 to 252 return (firstByte,1) #the 1 is the length of the varint if firstByte == 253: - a, = unpack('>H',data[1:3]) - return (a,3) + # encodes 253 to 65535 + if len(data) < 3: + raise varintDecodeError('The first byte of this varint as an integer is %s but the total length is only %s. It needs to be at least 3.' % (firstByte, len(data))) + encodedValue, = unpack('>H',data[1:3]) + if encodedValue < 253: + raise varintDecodeError('This varint does not encode the value with the lowest possible number of bytes.') + return (encodedValue,3) if firstByte == 254: - a, = unpack('>I',data[1:5]) - return (a,5) + # encodes 65536 to 4294967295 + if len(data) < 5: + raise varintDecodeError('The first byte of this varint as an integer is %s but the total length is only %s. It needs to be at least 5.' % (firstByte, len(data))) + encodedValue, = unpack('>I',data[1:5]) + if encodedValue < 65536: + raise varintDecodeError('This varint does not encode the value with the lowest possible number of bytes.') + return (encodedValue,5) if firstByte == 255: - a, = unpack('>Q',data[1:9]) - return (a,9) - + # encodes 4294967296 to 18446744073709551615 + if len(data) < 9: + raise varintDecodeError('The first byte of this varint as an integer is %s but the total length is only %s. It needs to be at least 9.' % (firstByte, len(data))) + encodedValue, = unpack('>Q',data[1:9]) + if encodedValue < 4294967296: + raise varintDecodeError('This varint does not encode the value with the lowest possible number of bytes.') + return (encodedValue,9) def calculateInventoryHash(data): @@ -163,7 +187,12 @@ def decodeAddress(address): #else: # print 'checksum PASSED' - addressVersionNumber, bytesUsedByVersionNumber = decodeVarint(data[:9]) + try: + addressVersionNumber, bytesUsedByVersionNumber = decodeVarint(data[:9]) + except varintDecodeError as e: + print e + status = 'varintmalformed' + return status,0,0,"" #print 'addressVersionNumber', addressVersionNumber #print 'bytesUsedByVersionNumber', bytesUsedByVersionNumber @@ -176,32 +205,42 @@ def decodeAddress(address): status = 'versiontoohigh' return status,0,0,"" - streamNumber, bytesUsedByStreamNumber = decodeVarint(data[bytesUsedByVersionNumber:]) + try: + streamNumber, bytesUsedByStreamNumber = decodeVarint(data[bytesUsedByVersionNumber:]) + except varintDecodeError as e: + print e + status = 'varintmalformed' + return status,0,0,"" #print streamNumber status = 'success' if addressVersionNumber == 1: return status,addressVersionNumber,streamNumber,data[-24:-4] elif addressVersionNumber == 2 or addressVersionNumber == 3: - if len(data[bytesUsedByVersionNumber+bytesUsedByStreamNumber:-4]) == 19: - return status,addressVersionNumber,streamNumber,'\x00'+data[bytesUsedByVersionNumber+bytesUsedByStreamNumber:-4] - elif len(data[bytesUsedByVersionNumber+bytesUsedByStreamNumber:-4]) == 20: - return status,addressVersionNumber,streamNumber,data[bytesUsedByVersionNumber+bytesUsedByStreamNumber:-4] - elif len(data[bytesUsedByVersionNumber+bytesUsedByStreamNumber:-4]) == 18: - return status,addressVersionNumber,streamNumber,'\x00\x00'+data[bytesUsedByVersionNumber+bytesUsedByStreamNumber:-4] - elif len(data[bytesUsedByVersionNumber+bytesUsedByStreamNumber:-4]) < 18: + embeddedRipeData = data[bytesUsedByVersionNumber+bytesUsedByStreamNumber:-4] + if len(embeddedRipeData) == 19: + return status,addressVersionNumber,streamNumber,'\x00'+embeddedRipeData + elif len(embeddedRipeData) == 20: + return status,addressVersionNumber,streamNumber,embeddedRipeData + elif len(embeddedRipeData) == 18: + return status,addressVersionNumber,streamNumber,'\x00\x00'+embeddedRipeData + elif len(embeddedRipeData) < 18: return 'ripetooshort',0,0,"" - elif len(data[bytesUsedByVersionNumber+bytesUsedByStreamNumber:-4]) > 20: + elif len(embeddedRipeData) > 20: return 'ripetoolong',0,0,"" else: return 'otherproblem',0,0,"" elif addressVersionNumber == 4: - if len(data[bytesUsedByVersionNumber+bytesUsedByStreamNumber:-4]) > 20: + embeddedRipeData = data[bytesUsedByVersionNumber+bytesUsedByStreamNumber:-4] + if embeddedRipeData[0:1] == '\x00': + # In order to enforce address non-malleability, encoded RIPE data must have NULL bytes removed from the front + return 'encodingproblem',0,0,"" + elif len(embeddedRipeData) > 20: return 'ripetoolong',0,0,"" - elif len(data[bytesUsedByVersionNumber+bytesUsedByStreamNumber:-4]) < 4: + elif len(embeddedRipeData) < 4: return 'ripetooshort',0,0,"" else: - x00string = '\x00' * (20 - len(data[bytesUsedByVersionNumber+bytesUsedByStreamNumber:-4])) - return status,addressVersionNumber,streamNumber,x00string+data[bytesUsedByVersionNumber+bytesUsedByStreamNumber:-4] + x00string = '\x00' * (20 - len(embeddedRipeData)) + return status,addressVersionNumber,streamNumber,x00string+embeddedRipeData def addBMIfNotPresent(address): address = str(address).strip() diff --git a/src/api.py b/src/api.py index bd28d923..417eba35 100644 --- a/src/api.py +++ b/src/api.py @@ -17,7 +17,7 @@ import json import shared import time -from addresses import decodeAddress,addBMIfNotPresent,decodeVarint,calculateInventoryHash +from addresses import decodeAddress,addBMIfNotPresent,decodeVarint,calculateInventoryHash,varintDecodeError import helper_inbox import helper_sent import hashlib @@ -139,6 +139,8 @@ class MySimpleXMLRPCRequestHandler(SimpleXMLRPCRequestHandler): raise APIError(9, 'Invalid characters in address: ' + address) if status == 'versiontoohigh': raise APIError(10, 'Address version number too high (or zero) in address: ' + address) + if status == 'varintmalformed': + raise APIError(26, 'Malformed varint in address: ' + address) raise APIError(7, 'Could not decode address: ' + address + ' : ' + status) if addressVersionNumber < 2 or addressVersionNumber > 4: raise APIError(11, 'The address version number currently must be 2, 3 or 4. Others aren\'t supported. Check the address.') @@ -777,9 +779,10 @@ class MySimpleXMLRPCRequestHandler(SimpleXMLRPCRequestHandler): encryptedPayload = pack('>Q', nonce) + encryptedPayload toStreamNumber = decodeVarint(encryptedPayload[16:26])[0] inventoryHash = calculateInventoryHash(encryptedPayload) - objectType = 'msg' + objectType = 2 + TTL = 2.5 * 24 * 60 * 60 shared.inventory[inventoryHash] = ( - objectType, toStreamNumber, encryptedPayload, int(time.time()),'') + objectType, toStreamNumber, encryptedPayload, int(time.time()) + TTL,'') shared.inventorySets[toStreamNumber].add(inventoryHash) with shared.printLock: print 'Broadcasting inv for msg(API disseminatePreEncryptedMsg command):', inventoryHash.encode('hex') @@ -814,10 +817,11 @@ class MySimpleXMLRPCRequestHandler(SimpleXMLRPCRequestHandler): pubkeyReadPosition += addressVersionLength pubkeyStreamNumber = decodeVarint(payload[pubkeyReadPosition:pubkeyReadPosition+10])[0] inventoryHash = calculateInventoryHash(payload) - objectType = 'pubkey' + objectType = 1 #todo: support v4 pubkeys + TTL = 28 * 24 * 60 * 60 shared.inventory[inventoryHash] = ( - objectType, pubkeyStreamNumber, payload, int(time.time()),'') + objectType, pubkeyStreamNumber, payload, int(time.time()) + TTL,'') shared.inventorySets[pubkeyStreamNumber].add(inventoryHash) with shared.printLock: print 'broadcasting inv within API command disseminatePubkey with hash:', inventoryHash.encode('hex') @@ -839,7 +843,7 @@ class MySimpleXMLRPCRequestHandler(SimpleXMLRPCRequestHandler): # use it we'll need to fill out a field in our inventory database # which is blank by default (first20bytesofencryptedmessage). queryreturn = sqlQuery( - '''SELECT hash, payload FROM inventory WHERE tag = '' and objecttype = 'msg' ; ''') + '''SELECT hash, payload FROM inventory WHERE tag = '' and objecttype = 2 ; ''') with SqlBulkExecute() as sql: for row in queryreturn: hash01, payload = row @@ -906,6 +910,9 @@ class MySimpleXMLRPCRequestHandler(SimpleXMLRPCRequestHandler): return self._handle_request(method, params) except APIError as e: return str(e) + except varintDecodeError as e: + logger.error(e) + return "Data contains a malformed varint. Some details: %s" % e except Exception as e: logger.exception(e) return "API Error 0021: Unexpected API Failure - %s" % str(e) diff --git a/src/bitmessagecurses/__init__.py b/src/bitmessagecurses/__init__.py index 1c88d222..91e72355 100644 --- a/src/bitmessagecurses/__init__.py +++ b/src/bitmessagecurses/__init__.py @@ -742,6 +742,8 @@ def sendMessage(sender="", recv="", broadcast=None, subject="", body="", reply=F err += "Some data encoded in the address is too short. There might be something wrong with the software of your acquaintance." elif status == "ripetoolong": err += "Some data encoded in the address is too long. There might be something wrong with the software of your acquaintance." + elif status == "varintmalformed": + err += "Some data encoded in the address is malformed. There might be something wrong with the software of your acquaintance." else: err += "It is unknown what is wrong with the address." d.scrollbox(unicode(err), exit_label="Continue") diff --git a/src/bitmessagemain.py b/src/bitmessagemain.py index 491b82f0..2cbd308b 100755 --- a/src/bitmessagemain.py +++ b/src/bitmessagemain.py @@ -42,6 +42,8 @@ from api import MySimpleXMLRPCRequestHandler from helper_startup import isOurOperatingSystemLimitedToHavingVeryFewHalfOpenConnections import shared +import helper_startup +helper_startup.loadConfig() from helper_sql import sqlQuery import threading @@ -154,9 +156,9 @@ selfInitiatedConnections = {} if shared.useVeryEasyProofOfWorkForTesting: shared.networkDefaultProofOfWorkNonceTrialsPerByte = int( - shared.networkDefaultProofOfWorkNonceTrialsPerByte / 16) + shared.networkDefaultProofOfWorkNonceTrialsPerByte / 100) shared.networkDefaultPayloadLengthExtraBytes = int( - shared.networkDefaultPayloadLengthExtraBytes / 7000) + shared.networkDefaultPayloadLengthExtraBytes / 100) class Main: def start(self, daemon=False): diff --git a/src/bitmessageqt/__init__.py b/src/bitmessageqt/__init__.py index e00011a9..2dcc04ca 100644 --- a/src/bitmessageqt/__init__.py +++ b/src/bitmessageqt/__init__.py @@ -1842,6 +1842,17 @@ class MyForm(QtGui.QMainWindow): subject = str(self.ui.lineEditSubject.text().toUtf8()) message = str( self.ui.textEditMessage.document().toPlainText().toUtf8()) + """ + The whole network message must fit in 2^18 bytes. Let's assume 500 + bytes of overhead. If someone wants to get that too an exact + number you are welcome to but I think that it would be a better + use of time to support message continuation so that users can + send messages of any length. + """ + if len(message) > (2 ** 18 - 500): + QMessageBox.about(self, _translate("MainWindow", "Message too long"), _translate( + "MainWindow", "The message that you are trying to send is too long by %1 bytes. (The maximum is 261644 bytes). Please cut it down before sending.").arg(len(message) - (2 ** 18 - 500))) + return if self.ui.radioButtonSpecific.isChecked(): # To send a message to specific people (rather than broadcast) toAddressesList = [s.strip() for s in toAddresses.replace(',', ';').split(';')] @@ -1873,6 +1884,9 @@ class MyForm(QtGui.QMainWindow): elif status == 'ripetoolong': self.statusBar().showMessage(_translate( "MainWindow", "Error: Some data encoded in the address %1 is too long. There might be something wrong with the software of your acquaintance.").arg(toAddress)) + elif status == 'varintmalformed': + self.statusBar().showMessage(_translate( + "MainWindow", "Error: Some data encoded in the address %1 is malformed. There might be something wrong with the software of your acquaintance.").arg(toAddress)) else: self.statusBar().showMessage(_translate( "MainWindow", "Error: Something is wrong with the address %1.").arg(toAddress)) @@ -2211,10 +2225,10 @@ class MyForm(QtGui.QMainWindow): addressVersion) + encodeVarint(streamNumber) + ripe).digest()).digest() tag = doubleHashOfAddressData[32:] queryreturn = sqlQuery( - '''select payload from inventory where objecttype='broadcast' and tag=?''', tag) + '''select payload from inventory where objecttype=3 and tag=?''', tag) for row in queryreturn: payload, = row - objectType = 'broadcast' + objectType = 3 with shared.objectProcessorQueueSizeLock: shared.objectProcessorQueueSize += len(payload) shared.objectProcessorQueue.put((objectType,payload)) @@ -3620,6 +3634,9 @@ class AddAddressDialog(QtGui.QDialog): elif status == 'ripetoolong': self.ui.labelAddressCheck.setText(_translate( "MainWindow", "Some data encoded in the address is too long.")) + elif status == 'varintmalformed': + self.ui.labelAddressCheck.setText(_translate( + "MainWindow", "Some data encoded in the address is malformed.")) elif status == 'success': self.ui.labelAddressCheck.setText( _translate("MainWindow", "Address is valid.")) @@ -3658,6 +3675,9 @@ class NewSubscriptionDialog(QtGui.QDialog): elif status == 'ripetoolong': self.ui.labelAddressCheck.setText(_translate( "MainWindow", "Some data encoded in the address is too long.")) + elif status == 'varintmalformed': + self.ui.labelAddressCheck.setText(_translate( + "MainWindow", "Some data encoded in the address is malformed.")) elif status == 'success': self.ui.labelAddressCheck.setText( _translate("MainWindow", "Address is valid.")) @@ -3670,7 +3690,7 @@ class NewSubscriptionDialog(QtGui.QDialog): addressVersion) + encodeVarint(streamNumber) + ripe).digest()).digest() tag = doubleHashOfAddressData[32:] queryreturn = sqlQuery( - '''select hash from inventory where objecttype='broadcast' and tag=?''', tag) + '''select hash from inventory where objecttype=3 and tag=?''', tag) if len(queryreturn) == 0: self.ui.checkBoxDisplayMessagesAlreadyInInventory.setText( _translate("MainWindow", "There are no recent broadcasts from this address to display.")) diff --git a/src/build_osx.py b/src/build_osx.py index f2bf8378..ce70cd54 100644 --- a/src/build_osx.py +++ b/src/build_osx.py @@ -1,19 +1,19 @@ from setuptools import setup name = "Bitmessage" -version = "0.4.2" +version = "0.4.4" mainscript = ["bitmessagemain.py"] setup( - name = name, - version = version, - app = mainscript, - setup_requires = ["py2app"], - options = dict( - py2app = dict( - resources = ["images", "translations"], - includes = ['sip', 'PyQt4._qt'], - iconfile = "images/bitmessage.icns" - ) - ) + name = name, + version = version, + app = mainscript, + setup_requires = ["py2app"], + options = dict( + py2app = dict( + resources = ["images", "translations"], + includes = ['sip', 'PyQt4._qt'], + iconfile = "images/bitmessage.icns" + ) + ) ) diff --git a/src/class_objectProcessor.py b/src/class_objectProcessor.py index 326ba98a..d37a9836 100644 --- a/src/class_objectProcessor.py +++ b/src/class_objectProcessor.py @@ -7,8 +7,9 @@ from struct import unpack, pack import sys import string from subprocess import call # used when the API must execute an outside program -from pyelliptic.openssl import OpenSSL +import traceback +from pyelliptic.openssl import OpenSSL import highlevelcrypto from addresses import * import helper_generic @@ -51,18 +52,23 @@ class objectProcessor(threading.Thread): while True: objectType, data = shared.objectProcessorQueue.get() - if objectType == 'getpubkey': - self.processgetpubkey(data) - elif objectType == 'pubkey': - self.processpubkey(data) - elif objectType == 'msg': - self.processmsg(data) - elif objectType == 'broadcast': - self.processbroadcast(data) - elif objectType == 'checkShutdownVariable': # is more of a command, not an object type. Is used to get this thread past the queue.get() so that it will check the shutdown variable. - pass - else: - logger.critical('Error! Bug! The class_objectProcessor was passed an object type it doesn\'t recognize: %s' % str(objectType)) + try: + if objectType == 0: # getpubkey + self.processgetpubkey(data) + elif objectType == 1: #pubkey + self.processpubkey(data) + elif objectType == 2: #msg + self.processmsg(data) + elif objectType == 3: #broadcast + self.processbroadcast(data) + elif objectType == 'checkShutdownVariable': # is more of a command, not an object type. Is used to get this thread past the queue.get() so that it will check the shutdown variable. + pass + else: + logger.critical('Error! Bug! The class_objectProcessor was passed an object type it doesn\'t recognize: %s' % str(objectType)) + except varintDecodeError as e: + logger.debug("There was a problem with a varint while processing an object. Some details: %s" % e) + except Exception as e: + logger.critical("Critical error within objectProcessorThread: \n%s" % traceback.format_exc()) with shared.objectProcessorQueueSizeLock: shared.objectProcessorQueueSize -= len(data) # We maintain objectProcessorQueueSize so that we will slow down requesting objects if too much data accumulates in the queue. @@ -83,17 +89,7 @@ class objectProcessor(threading.Thread): break def processgetpubkey(self, data): - readPosition = 8 # bypass the nonce - embeddedTime, = unpack('>I', data[readPosition:readPosition + 4]) - - # This section is used for the transition from 32 bit time to 64 bit - # time in the protocol. - if embeddedTime == 0: - embeddedTime, = unpack('>Q', data[readPosition:readPosition + 8]) - readPosition += 8 - else: - readPosition += 4 - + readPosition = 20 # bypass the nonce, time, and object type requestedAddressVersionNumber, addressVersionLength = decodeVarint( data[readPosition:readPosition + 10]) readPosition += addressVersionLength @@ -147,7 +143,7 @@ class objectProcessor(threading.Thread): myAddress, 'lastpubkeysendtime')) except: lastPubkeySendTime = 0 - if lastPubkeySendTime > time.time() - shared.lengthOfTimeToHoldOnToAllPubkeys: # If the last time we sent our pubkey was more recent than 28 days ago... + if lastPubkeySendTime > time.time() - 2419200: # If the last time we sent our pubkey was more recent than 28 days ago... logger.info('Found getpubkey-requested-item in my list of EC hashes BUT we already sent it recently. Ignoring request. The lastPubkeySendTime is: %s' % lastPubkeySendTime) return logger.info('Found getpubkey-requested-hash in my list of EC hashes. Telling Worker thread to do the POW for a pubkey message and send it out.') @@ -166,17 +162,8 @@ class objectProcessor(threading.Thread): shared.numberOfPubkeysProcessed += 1 shared.UISignalQueue.put(( 'updateNumberOfPubkeysProcessed', 'no data')) - readPosition = 8 # bypass the nonce - embeddedTime, = unpack('>I', data[readPosition:readPosition + 4]) - - # This section is used for the transition from 32 bit time to 64 bit - # time in the protocol. - if embeddedTime == 0: - embeddedTime, = unpack('>Q', data[readPosition:readPosition + 8]) - readPosition += 8 - else: - readPosition += 4 - + embeddedTime, = unpack('>Q', data[8:16]) + readPosition = 20 # bypass the nonce, time, and object type addressVersion, varintLength = decodeVarint( data[readPosition:readPosition + 10]) readPosition += varintLength @@ -225,15 +212,25 @@ class objectProcessor(threading.Thread): queryreturn = sqlQuery( '''SELECT usedpersonally FROM pubkeys WHERE hash=? AND addressversion=? AND usedpersonally='yes' ''', ripe, addressVersion) + + """ + With the changes in protocol v3, we have to be careful to store pubkey data + in the database the same way we did before to maintain backwards compatibility + with what is in people's databases already. This means that for v2 keys, we + must store the nonce, the time, and then everything else starting with the + address version. + """ + dataToStore = '\x00' * 8 # fake nonce + dataToStore += data[8:16] # the time + dataToStore += data[20:] # everything else + if queryreturn != []: # if this pubkey is already in our database and if we have used it personally: logger.info('We HAVE used this pubkey personally. Updating time.') - t = (ripe, addressVersion, data, embeddedTime, 'yes') + t = (ripe, addressVersion, dataToStore, int(time.time()), 'yes') else: logger.info('We have NOT used this pubkey personally. Inserting in database.') - t = (ripe, addressVersion, data, embeddedTime, 'no') - # This will also update the embeddedTime. + t = (ripe, addressVersion, dataToStore, int(time.time()), 'no') sqlExecute('''INSERT INTO pubkeys VALUES (?,?,?,?,?)''', *t) - # shared.workerQueue.put(('newpubkey',(addressVersion,streamNumber,ripe))) self.possibleNewPubkey(ripe = ripe) if addressVersion == 3: if len(data) < 170: # sanity check. @@ -242,9 +239,6 @@ class objectProcessor(threading.Thread): bitfieldBehaviors = data[readPosition:readPosition + 4] readPosition += 4 publicSigningKey = '\x04' + data[readPosition:readPosition + 64] - # Is it possible for a public key to be invalid such that trying to - # encrypt or sign with it will cause an error? If it is, we should - # probably test these keys here. readPosition += 64 publicEncryptionKey = '\x04' + data[readPosition:readPosition + 64] readPosition += 64 @@ -259,14 +253,32 @@ class objectProcessor(threading.Thread): data[readPosition:readPosition + 10]) readPosition += signatureLengthLength signature = data[readPosition:readPosition + signatureLength] - try: - if not highlevelcrypto.verify(data[8:endOfSignedDataPosition], signature, publicSigningKey.encode('hex')): - logger.warning('ECDSA verify failed (within processpubkey)') + """ + With the changes in protocol v3, to maintain backwards compatibility, signatures will be sent + the 'old' way during an upgrade period and then a 'new' simpler way after that. We will therefore + check the sig both ways. + Old way: + signedData = timePubkeyWasSigned(4 bytes) + addressVersion through extra_bytes + New way: + signedData = all of the payload data, from the time down through the extra_bytes + + The timePubkeyWasSigned will be calculated by subtracting 28 days form the embedded expiresTime. + """ + expiresTime, = unpack('>Q', data[8:16]) + TTL = 28 * 24 * 60 * 60 + signedData = pack('>I', (expiresTime - TTL)) # the time that the pubkey was signed. 4 bytes. + signedData += data[20:endOfSignedDataPosition] # the address version down through the payloadLengthExtraBytes + + if highlevelcrypto.verify(signedData, signature, publicSigningKey.encode('hex')): + logger.info('ECDSA verify passed (within processpubkey, old method)') + else: + logger.warning('ECDSA verify failed (within processpubkey, old method)') + # let us try the newer signature method + if highlevelcrypto.verify(data[8:endOfSignedDataPosition], signature, publicSigningKey.encode('hex')): + logger.info('ECDSA verify passed (within processpubkey, new method)') + else: + logger.warning('ECDSA verify failed (within processpubkey, new method)') return - logger.info('ECDSA verify passed (within processpubkey)') - except Exception as err: - logger.warning('ECDSA verify failed (within processpubkey) %s' % err) - return sha = hashlib.new('sha512') sha.update(publicSigningKey + publicEncryptionKey) @@ -286,109 +298,45 @@ class objectProcessor(threading.Thread): ) ) + + """ + With the changes in protocol v3, we have to be careful to store pubkey data + in the database the same way we did before to maintain backwards compatibility + with what is in people's databases already. This means that for v3 keys, we + must store the nonce, the time, and then everything else starting with the + address version. + """ + dataToStore = '\x00' * 8 # fake nonce + dataToStore += data[8:16] # the time + dataToStore += data[20:] # everything else + queryreturn = sqlQuery('''SELECT usedpersonally FROM pubkeys WHERE hash=? AND addressversion=? AND usedpersonally='yes' ''', ripe, addressVersion) if queryreturn != []: # if this pubkey is already in our database and if we have used it personally: logger.info('We HAVE used this pubkey personally. Updating time.') - t = (ripe, addressVersion, data, embeddedTime, 'yes') + t = (ripe, addressVersion, dataToStore, int(time.time()), 'yes') else: logger.info('We have NOT used this pubkey personally. Inserting in database.') - t = (ripe, addressVersion, data, embeddedTime, 'no') - # This will also update the embeddedTime. + t = (ripe, addressVersion, dataToStore, int(time.time()), 'no') sqlExecute('''INSERT INTO pubkeys VALUES (?,?,?,?,?)''', *t) self.possibleNewPubkey(ripe = ripe) if addressVersion == 4: - """ - There exist a function: shared.decryptAndCheckPubkeyPayload which does something almost - the same as this section of code. There are differences, however; one being that - decryptAndCheckPubkeyPayload requires that a cryptor object be created each time it is - run which is an expensive operation. This, on the other hand, keeps them saved in - the shared.neededPubkeys dictionary so that if an attacker sends us many - incorrectly-tagged pubkeys, which would force us to try to decrypt them, this code - would run and handle that event quite quickly. - """ if len(data) < 350: # sanity check. logger.debug('(within processpubkey) payloadLength less than 350. Sanity check failed.') return - signedData = data[8:readPosition] # Some of the signed data is not encrypted so let's keep it for now. + tag = data[readPosition:readPosition + 32] - readPosition += 32 - encryptedData = data[readPosition:] if tag not in shared.neededPubkeys: logger.info('We don\'t need this v4 pubkey. We didn\'t ask for it.') return - - # Let us try to decrypt the pubkey - cryptorObject = shared.neededPubkeys[tag] - try: - decryptedData = cryptorObject.decrypt(encryptedData) - except: - # Someone must have encrypted some data with a different key - # but tagged it with a tag for which we are watching. - logger.info('Pubkey decryption was unsuccessful.') - return - - readPosition = 0 - bitfieldBehaviors = decryptedData[readPosition:readPosition + 4] - readPosition += 4 - publicSigningKey = '\x04' + decryptedData[readPosition:readPosition + 64] - # Is it possible for a public key to be invalid such that trying to - # encrypt or check a sig with it will cause an error? If it is, we - # should probably test these keys here. - readPosition += 64 - publicEncryptionKey = '\x04' + decryptedData[readPosition:readPosition + 64] - readPosition += 64 - specifiedNonceTrialsPerByte, specifiedNonceTrialsPerByteLength = decodeVarint( - decryptedData[readPosition:readPosition + 10]) - readPosition += specifiedNonceTrialsPerByteLength - specifiedPayloadLengthExtraBytes, specifiedPayloadLengthExtraBytesLength = decodeVarint( - decryptedData[readPosition:readPosition + 10]) - readPosition += specifiedPayloadLengthExtraBytesLength - signedData += decryptedData[:readPosition] - signatureLength, signatureLengthLength = decodeVarint( - decryptedData[readPosition:readPosition + 10]) - readPosition += signatureLengthLength - signature = decryptedData[readPosition:readPosition + signatureLength] - try: - if not highlevelcrypto.verify(signedData, signature, publicSigningKey.encode('hex')): - logger.info('ECDSA verify failed (within processpubkey)') - return - logger.info('ECDSA verify passed (within processpubkey)') - except Exception as err: - logger.info('ECDSA verify failed (within processpubkey) %s' % err) - return - - sha = hashlib.new('sha512') - sha.update(publicSigningKey + publicEncryptionKey) - ripeHasher = hashlib.new('ripemd160') - ripeHasher.update(sha.digest()) - ripe = ripeHasher.digest() - - # We need to make sure that the tag on the outside of the encryption - # is the one generated from hashing these particular keys. - if tag != hashlib.sha512(hashlib.sha512(encodeVarint(addressVersion) + encodeVarint(streamNumber) + ripe).digest()).digest()[32:]: - logger.info('Someone was trying to act malicious: tag doesn\'t match the keys in this pubkey message. Ignoring it.') - return - logger.info('within recpubkey, addressVersion: %s, streamNumber: %s \n\ - ripe %s\n\ - publicSigningKey in hex: %s\n\ - publicEncryptionKey in hex: %s' % (addressVersion, - streamNumber, - ripe.encode('hex'), - publicSigningKey.encode('hex'), - publicEncryptionKey.encode('hex') - ) - ) - - t = (ripe, addressVersion, signedData, embeddedTime, 'yes') - sqlExecute('''INSERT INTO pubkeys VALUES (?,?,?,?,?)''', *t) - - fromAddress = encodeAddress(addressVersion, streamNumber, ripe) - # That this point we know that we have been waiting on this pubkey. - # This function will command the workerThread to start work on - # the messages that require it. - self.possibleNewPubkey(address = fromAddress) + # Let us try to decrypt the pubkey + toAddress, cryptorObject = shared.neededPubkeys[tag] + if shared.decryptAndCheckPubkeyPayload(data, toAddress) == 'successful': + # At this point we know that we have been waiting on this pubkey. + # This function will command the workerThread to start work on + # the messages that require it. + self.possibleNewPubkey(address=toAddress) # Display timing data timeRequiredToProcessPubkey = time.time( @@ -401,28 +349,28 @@ class objectProcessor(threading.Thread): shared.numberOfMessagesProcessed += 1 shared.UISignalQueue.put(( 'updateNumberOfMessagesProcessed', 'no data')) - readPosition = 8 # bypass the nonce - embeddedTime, = unpack('>I', data[readPosition:readPosition + 4]) - - # This section is used for the transition from 32 bit time to 64 bit - # time in the protocol. - if embeddedTime == 0: - embeddedTime, = unpack('>Q', data[readPosition:readPosition + 8]) - readPosition += 8 - else: - readPosition += 4 + readPosition = 20 # bypass the nonce, time, and object type + + """ + In protocol v2, the next byte(s) was the streamNumber. But starting after + the protocol v3 upgrade period, the next byte(s) will be a msg version + number followed by the streamNumber. + """ + #msgVersionOutsideEncryption, msgVersionOutsideEncryptionLength = decodeVarint(data[readPosition:readPosition + 9]) + #readPosition += msgVersionOutsideEncryptionLength + streamNumberAsClaimedByMsg, streamNumberAsClaimedByMsgLength = decodeVarint( data[readPosition:readPosition + 9]) readPosition += streamNumberAsClaimedByMsgLength inventoryHash = calculateInventoryHash(data) initialDecryptionSuccessful = False # Let's check whether this is a message acknowledgement bound for us. - if data[readPosition:] in shared.ackdataForWhichImWatching: + if data[-32:] in shared.ackdataForWhichImWatching: logger.info('This msg IS an acknowledgement bound for me.') - del shared.ackdataForWhichImWatching[data[readPosition:]] + del shared.ackdataForWhichImWatching[data[-32:]] sqlExecute('UPDATE sent SET status=? WHERE ackdata=?', - 'ackreceived', data[readPosition:]) - shared.UISignalQueue.put(('updateSentItemStatusByAckdata', (data[readPosition:], tr.translateText("MainWindow",'Acknowledgement of the message received. %1').arg(l10n.formatTimestamp())))) + 'ackreceived', data[-32:]) + shared.UISignalQueue.put(('updateSentItemStatusByAckdata', (data[-32:], tr.translateText("MainWindow",'Acknowledgement of the message received. %1').arg(l10n.formatTimestamp())))) return else: logger.info('This was NOT an acknowledgement bound for me.') @@ -430,17 +378,35 @@ class objectProcessor(threading.Thread): # This is not an acknowledgement bound for me. See if it is a message # bound for me by trying to decrypt it with my private keys. + + # This can be simplified quite a bit after 1416175200: # Sun, 16 Nov 2014 22:00:00 GMT for key, cryptorObject in shared.myECCryptorObjects.items(): try: - decryptedData = cryptorObject.decrypt( - data[readPosition:]) + decryptedData = cryptorObject.decrypt(data[readPosition:]) toRipe = key # This is the RIPE hash of my pubkeys. We need this below to compare to the destination_ripe included in the encrypted data. initialDecryptionSuccessful = True - logger.info('EC decryption successful using key associated with ripe hash: %s' % key.encode('hex')) + logger.info('EC decryption successful using key associated with ripe hash: %s. msg did NOT specify version.' % key.encode('hex')) + + # We didn't bypass a msg version above as it is commented out. + # But the decryption was successful. Which means that there + # wasn't a msg version byte include in this msg. + msgObjectContainedVersion = False break except Exception as err: - pass - # print 'cryptorObject.decrypt Exception:', err + # What if a client sent us a msg with + # a msg version included? We didn't bypass it above. So + # let's try to decrypt the msg assuming that it is present. + try: + decryptedData = cryptorObject.decrypt(data[readPosition+1:]) # notice that we offset by 1 byte compared to the attempt above. + toRipe = key # This is the RIPE hash of my pubkeys. We need this below to compare to the destination_ripe included in the encrypted data. + initialDecryptionSuccessful = True + logger.info('EC decryption successful using key associated with ripe hash: %s. msg DID specifiy version.' % key.encode('hex')) + + # There IS a msg version byte include in this msg. + msgObjectContainedVersion = True + break + except Exception as err: + pass if not initialDecryptionSuccessful: # This is not a message bound for me. logger.info('Length of time program spent failing to decrypt this message: %s seconds.' % (time.time() - messageProcessingStartTime,)) @@ -450,12 +416,15 @@ class objectProcessor(threading.Thread): toAddress = shared.myAddressesByHash[ toRipe] # Look up my address based on the RIPE hash. readPosition = 0 - messageVersion, messageVersionLength = decodeVarint( - decryptedData[readPosition:readPosition + 10]) - readPosition += messageVersionLength - if messageVersion != 1: - logger.info('Cannot understand message versions other than one. Ignoring message.') - return + if not msgObjectContainedVersion: # by which I mean "if the msg object didn't have the msg version outside of the encryption". This confusingness will be removed after the protocol v3 upgrade period. + messageVersionWithinEncryption, messageVersionWithinEncryptionLength = decodeVarint( + decryptedData[readPosition:readPosition + 10]) + readPosition += messageVersionWithinEncryptionLength + if messageVersionWithinEncryption != 1: + logger.info('Cannot understand message versions other than one. Ignoring message.') + return + else: + messageVersionWithinEncryptionLength = 0 # This variable can disappear after the protocol v3 upgrade period is complete. sendersAddressVersionNumber, sendersAddressVersionNumberLength = decodeVarint( decryptedData[readPosition:readPosition + 10]) readPosition += sendersAddressVersionNumberLength @@ -520,14 +489,17 @@ class objectProcessor(threading.Thread): readPosition += signatureLengthLength signature = decryptedData[ readPosition:readPosition + signatureLength] - try: - if not highlevelcrypto.verify(decryptedData[:positionOfBottomOfAckData], signature, pubSigningKey.encode('hex')): - logger.debug('ECDSA verify failed') - return - logger.debug('ECDSA verify passed') - except Exception as err: - logger.debug('ECDSA verify failed %s' % err) + if not msgObjectContainedVersion: + # protocol v2. This can be removed after the end of the protocol v3 upgrade period. + signedData = decryptedData[:positionOfBottomOfAckData] + else: + # protocol v3 + signedData = data[8:20] + encodeVarint(1) + encodeVarint(streamNumberAsClaimedByMsg) + decryptedData[:positionOfBottomOfAckData] + + if not highlevelcrypto.verify(signedData, signature, pubSigningKey.encode('hex')): + logger.debug('ECDSA verify failed') return + logger.debug('ECDSA verify passed') logger.debug('As a matter of intellectual curiosity, here is the Bitcoin address associated with the keys owned by the other person: %s ..and here is the testnet address: %s. The other person must take their private signing key from Bitmessage and import it into Bitcoin (or a service like Blockchain.info) for it to be of any use. Do not use this unless you know what you are doing.' % (helper_bitcoin.calculateBitcoinAddressFromPubkey(pubSigningKey), helper_bitcoin.calculateTestnetAddressFromPubkey(pubSigningKey)) ) @@ -546,7 +518,7 @@ class objectProcessor(threading.Thread): '''INSERT INTO pubkeys VALUES (?,?,?,?,?)''', ripe.digest(), sendersAddressVersionNumber, - '\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF' + '\xFF\xFF\xFF\xFF' + decryptedData[messageVersionLength:endOfThePublicKeyPosition], + '\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF' + '\xFF\xFF\xFF\xFF' + decryptedData[messageVersionWithinEncryptionLength:endOfThePublicKeyPosition], int(time.time()), 'yes') # This will check to see whether we happen to be awaiting this @@ -558,7 +530,7 @@ class objectProcessor(threading.Thread): '''INSERT INTO pubkeys VALUES (?,?,?,?,?)''', ripe.digest(), sendersAddressVersionNumber, - '\x00\x00\x00\x00\x00\x00\x00\x01' + decryptedData[messageVersionLength:endOfThePublicKeyPosition], + '\x00\x00\x00\x00\x00\x00\x00\x01' + decryptedData[messageVersionWithinEncryptionLength:endOfThePublicKeyPosition], int(time.time()), 'yes') # This will check to see whether we happen to be awaiting this @@ -577,7 +549,7 @@ class objectProcessor(threading.Thread): requiredPayloadLengthExtraBytes = shared.config.getint( toAddress, 'payloadlengthextrabytes') if not shared.isProofOfWorkSufficient(data, requiredNonceTrialsPerByte, requiredPayloadLengthExtraBytes): - print 'Proof of work in msg message insufficient only because it does not meet our higher requirement.' + logger.info('Proof of work in msg is insufficient only because it does not meet our higher requirement.') return blockMessage = False # Gets set to True if the user shouldn't see the message according to black or white lists. if shared.config.get('bitmessagesettings', 'blackwhitelist') == 'black': # If we are using a blacklist @@ -667,14 +639,7 @@ class objectProcessor(threading.Thread): shared.workerQueue.put(('sendbroadcast', '')) if self.ackDataHasAVaildHeader(ackData): - if ackData[4:16] == addDataPadding('getpubkey'): - shared.checkAndSharegetpubkeyWithPeers(ackData[24:]) - elif ackData[4:16] == addDataPadding('pubkey'): - shared.checkAndSharePubkeyWithPeers(ackData[24:]) - elif ackData[4:16] == addDataPadding('msg'): - shared.checkAndShareMsgWithPeers(ackData[24:]) - elif ackData[4:16] == addDataPadding('broadcast'): - shared.checkAndShareBroadcastWithPeers(ackData[24:]) + shared.checkAndShareObjectWithPeers(ackData[24:]) # Display timing data timeRequiredToAttemptToDecryptMessage = time.time( @@ -696,155 +661,26 @@ class objectProcessor(threading.Thread): shared.UISignalQueue.put(( 'updateNumberOfBroadcastsProcessed', 'no data')) inventoryHash = calculateInventoryHash(data) - readPosition = 8 # bypass the nonce - embeddedTime, = unpack('>I', data[readPosition:readPosition + 4]) - - # This section is used for the transition from 32 bit time to 64 bit - # time in the protocol. - if embeddedTime == 0: - embeddedTime, = unpack('>Q', data[readPosition:readPosition + 8]) - readPosition += 8 - else: - readPosition += 4 - + readPosition = 20 # bypass the nonce, time, and object type broadcastVersion, broadcastVersionLength = decodeVarint( data[readPosition:readPosition + 9]) readPosition += broadcastVersionLength - if broadcastVersion < 1 or broadcastVersion > 3: - logger.debug('Cannot decode incoming broadcast versions higher than 3. Assuming the sender isn\'t being silly, you should upgrade Bitmessage because this message shall be ignored.') + if broadcastVersion < 1 or broadcastVersion > 5: + logger.info('Cannot decode incoming broadcast versions higher than 5. Assuming the sender isn\'t being silly, you should upgrade Bitmessage because this message shall be ignored.') return if broadcastVersion == 1: - beginningOfPubkeyPosition = readPosition # used when we add the pubkey to our pubkey table - sendersAddressVersion, sendersAddressVersionLength = decodeVarint( - data[readPosition:readPosition + 9]) - if sendersAddressVersion <= 1 or sendersAddressVersion >= 3: - # Cannot decode senderAddressVersion higher than 2. Assuming - # the sender isn\'t being silly, you should upgrade Bitmessage - # because this message shall be ignored. - return - readPosition += sendersAddressVersionLength - if sendersAddressVersion == 2: - sendersStream, sendersStreamLength = decodeVarint( - data[readPosition:readPosition + 9]) - readPosition += sendersStreamLength - behaviorBitfield = data[readPosition:readPosition + 4] - readPosition += 4 - sendersPubSigningKey = '\x04' + \ - data[readPosition:readPosition + 64] - readPosition += 64 - sendersPubEncryptionKey = '\x04' + \ - data[readPosition:readPosition + 64] - readPosition += 64 - endOfPubkeyPosition = readPosition - sendersHash = data[readPosition:readPosition + 20] - if sendersHash not in shared.broadcastSendersForWhichImWatching: - # Display timing data - logger.debug('Time spent deciding that we are not interested in this v1 broadcast: %s' % (time.time() - messageProcessingStartTime,)) - return - # At this point, this message claims to be from sendersHash and - # we are interested in it. We still have to hash the public key - # to make sure it is truly the key that matches the hash, and - # also check the signiture. - readPosition += 20 - - sha = hashlib.new('sha512') - sha.update(sendersPubSigningKey + sendersPubEncryptionKey) - ripe = hashlib.new('ripemd160') - ripe.update(sha.digest()) - if ripe.digest() != sendersHash: - # The sender of this message lied. - return - messageEncodingType, messageEncodingTypeLength = decodeVarint( - data[readPosition:readPosition + 9]) - if messageEncodingType == 0: - return - readPosition += messageEncodingTypeLength - messageLength, messageLengthLength = decodeVarint( - data[readPosition:readPosition + 9]) - readPosition += messageLengthLength - message = data[readPosition:readPosition + messageLength] - readPosition += messageLength - readPositionAtBottomOfMessage = readPosition - signatureLength, signatureLengthLength = decodeVarint( - data[readPosition:readPosition + 9]) - readPosition += signatureLengthLength - signature = data[readPosition:readPosition + signatureLength] - try: - if not highlevelcrypto.verify(data[12:readPositionAtBottomOfMessage], signature, sendersPubSigningKey.encode('hex')): - logger.debug('ECDSA verify failed') - return - logger.debug('ECDSA verify passed') - except Exception as err: - logger.debug('ECDSA verify failed %s' % err) - return - # verify passed - fromAddress = encodeAddress( - sendersAddressVersion, sendersStream, ripe.digest()) - logger.debug('fromAddress: %s' % fromAddress) - - # Let's store the public key in case we want to reply to this person. - # We don't have the correct nonce or time (which would let us - # send out a pubkey message) so we'll just fill it with 1's. We - # won't be able to send this pubkey to others (without doing - # the proof of work ourselves, which this program is programmed - # to not do.) - sqlExecute( - '''INSERT INTO pubkeys VALUES (?,?,?,?,?)''', - ripe.digest(), - sendersAddressVersion, - '\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF' + '\xFF\xFF\xFF\xFF' + data[beginningOfPubkeyPosition:endOfPubkeyPosition], - int(time.time()), - 'yes') - # This will check to see whether we happen to be awaiting this - # pubkey in order to send a message. If we are, it will do the - # POW and send it. - self.possibleNewPubkey(ripe=ripe.digest()) - - - if messageEncodingType == 2: - subject, body = decodeType2Message(message) - logger.info('Broadcast subject (first 100 characters): %s' % repr(subject)[:100]) - elif messageEncodingType == 1: - body = message - subject = '' - elif messageEncodingType == 0: - logger.debug('messageEncodingType == 0. Doing nothing with the message.') - else: - body = 'Unknown encoding type.\n\n' + repr(message) - subject = '' - - toAddress = '[Broadcast subscribers]' - if messageEncodingType != 0: - # Let us make sure that we haven't already received this message - if helper_inbox.isMessageAlreadyInInbox(toAddress, fromAddress, subject, body, messageEncodingType): - logger.info('This broadcast is already in our inbox. Ignoring it.') - else: - t = (inventoryHash, toAddress, fromAddress, subject, int( - time.time()), body, 'inbox', messageEncodingType, 0) - helper_inbox.insert(t) - - shared.UISignalQueue.put(('displayNewInboxMessage', ( - inventoryHash, toAddress, fromAddress, subject, body))) - - # If we are behaving as an API then we might need to run an - # outside command to let some program know that a new - # message has arrived. - if shared.safeConfigGetBoolean('bitmessagesettings', 'apienabled'): - try: - apiNotifyPath = shared.config.get( - 'bitmessagesettings', 'apinotifypath') - except: - apiNotifyPath = '' - if apiNotifyPath != '': - call([apiNotifyPath, "newBroadcast"]) - - # Display timing data - logger.debug('Time spent processing this interesting broadcast: %s' % (time.time() - messageProcessingStartTime,)) - - if broadcastVersion == 2: + logger.info('Version 1 broadcasts are no longer supported. Not processing it at all.') + if broadcastVersion in [2,4]: + """ + v2 or v4 broadcasts are encrypted the same way the msgs were encrypted. To see if we are interested in a + v2 broadcast, we try to decrypt it. This was replaced with v3 (and later v5) broadcasts which include a tag which + we check instead, just like we do with v4 pubkeys. + v2 and v3 broadcasts should be completely obsolete after the protocol v3 upgrade period and some code can be simplified. + """ cleartextStreamNumber, cleartextStreamNumberLength = decodeVarint( data[readPosition:readPosition + 10]) readPosition += cleartextStreamNumberLength + signedData = data[8:readPosition] # This doesn't end up being used if the broadcastVersion is 2 initialDecryptionSuccessful = False for key, cryptorObject in shared.MyECSubscriptionCryptorObjects.items(): try: @@ -862,9 +698,13 @@ class objectProcessor(threading.Thread): return # At this point this is a broadcast I have decrypted and thus am # interested in. - signedBroadcastVersion, readPosition = decodeVarint( - decryptedData[:10]) - beginningOfPubkeyPosition = readPosition # used when we add the pubkey to our pubkey table + readPosition = 0 + if broadcastVersion == 2: + signedBroadcastVersion, signedBroadcastVersionLength = decodeVarint( + decryptedData[:10]) + readPosition += signedBroadcastVersionLength + + beginningOfPubkeyPosition = readPosition # used when we add the pubkey to our pubkey table. This variable can be disposed of after the protocol v3 upgrade period because it will necessarily be at the beginning of the decryptedData; ie it will definitely equal 0 sendersAddressVersion, sendersAddressVersionLength = decodeVarint( decryptedData[readPosition:readPosition + 9]) if sendersAddressVersion < 2 or sendersAddressVersion > 3: @@ -920,14 +760,14 @@ class objectProcessor(threading.Thread): readPosition += signatureLengthLength signature = decryptedData[ readPosition:readPosition + signatureLength] - try: - if not highlevelcrypto.verify(decryptedData[:readPositionAtBottomOfMessage], signature, sendersPubSigningKey.encode('hex')): - logger.debug('ECDSA verify failed') - return - logger.debug('ECDSA verify passed') - except Exception as err: - logger.debug('ECDSA verify failed %s' % err) + if broadcastVersion == 2: # this can be removed after the protocol v3 upgrade period + signedData = decryptedData[:readPositionAtBottomOfMessage] + else: + signedData += decryptedData[:readPositionAtBottomOfMessage] + if not highlevelcrypto.verify(signedData, signature, sendersPubSigningKey.encode('hex')): + logger.debug('ECDSA verify failed') return + logger.debug('ECDSA verify passed') # verify passed # Let's store the public key in case we want to reply to this @@ -988,7 +828,8 @@ class objectProcessor(threading.Thread): # Display timing data logger.info('Time spent processing this interesting broadcast: %s' % (time.time() - messageProcessingStartTime,)) - if broadcastVersion == 3: + if broadcastVersion in [3,5]: + # broadcast version 3 should be completely obsolete after the end of the protocol v3 upgrade period cleartextStreamNumber, cleartextStreamNumberLength = decodeVarint( data[readPosition:readPosition + 10]) readPosition += cleartextStreamNumberLength @@ -998,21 +839,28 @@ class objectProcessor(threading.Thread): logger.debug('We\'re not interested in this broadcast.') return # We are interested in this broadcast because of its tag. + signedData = data[8:readPosition] # We're going to add some more data which is signed further down. cryptorObject = shared.MyECSubscriptionCryptorObjects[embeddedTag] try: decryptedData = cryptorObject.decrypt(data[readPosition:]) logger.debug('EC decryption successful') except Exception as err: - logger.debug('Broadcast version 3 decryption Unsuccessful.') + logger.debug('Broadcast version %s decryption Unsuccessful.' % broadcastVersion) return - signedBroadcastVersion, readPosition = decodeVarint( - decryptedData[:10]) + # broadcast version 3 includes the broadcast version at the beginning + # of the decryptedData. Broadcast version 4 doesn't. + readPosition = 0 + if broadcastVersion == 3: + signedBroadcastVersion, signedBroadcastVersionLength = decodeVarint( + decryptedData[:10]) + readPosition += signedBroadcastVersionLength + beginningOfPubkeyPosition = readPosition # used when we add the pubkey to our pubkey table sendersAddressVersion, sendersAddressVersionLength = decodeVarint( decryptedData[readPosition:readPosition + 9]) if sendersAddressVersion < 4: - logger.info('Cannot decode senderAddressVersion less than 4 for broadcast version number 3. Assuming the sender isn\'t being silly, you should upgrade Bitmessage because this message shall be ignored.') + logger.info('Cannot decode senderAddressVersion less than 4 for broadcast version number 3 or 4. Assuming the sender isn\'t being silly, you should upgrade Bitmessage because this message shall be ignored.') return readPosition += sendersAddressVersionLength sendersStream, sendersStreamLength = decodeVarint( @@ -1067,14 +915,14 @@ class objectProcessor(threading.Thread): readPosition += signatureLengthLength signature = decryptedData[ readPosition:readPosition + signatureLength] - try: - if not highlevelcrypto.verify(decryptedData[:readPositionAtBottomOfMessage], signature, sendersPubSigningKey.encode('hex')): - logger.debug('ECDSA verify failed') - return - logger.debug('ECDSA verify passed') - except Exception as err: - logger.debug('ECDSA verify failed %s' % err) + if broadcastVersion == 3: # broadcastVersion 3 should be completely unused after the end of the protocol v3 upgrade period + signedData = decryptedData[:readPositionAtBottomOfMessage] + elif broadcastVersion == 5: + signedData += decryptedData[:readPositionAtBottomOfMessage] + if not highlevelcrypto.verify(signedData, signature, sendersPubSigningKey.encode('hex')): + logger.debug('ECDSA verify failed') return + logger.debug('ECDSA verify passed') # verify passed fromAddress = encodeAddress( @@ -1168,23 +1016,21 @@ class objectProcessor(threading.Thread): logger.info('The length of ackData is unreasonably short. Not sending ackData.') return False - magic,command,payload_length,checksum = shared.Header.unpack(ackData[:shared.Header.size]) + magic,command,payloadLength,checksum = shared.Header.unpack(ackData[:shared.Header.size]) if magic != 0xE9BEB4D9: logger.info('Ackdata magic bytes were wrong. Not sending ackData.') return False payload = ackData[shared.Header.size:] - if len(payload) != payload_length: + if len(payload) != payloadLength: logger.info('ackData payload length doesn\'t match the payload length specified in the header. Not sending ackdata.') return False - if payload_length > 180000000: # If the size of the message is greater than 180MB, ignore it. + if payloadLength > 2 ** 18: # 256 KiB return False if checksum != hashlib.sha512(payload).digest()[0:4]: # test the checksum in the message. logger.info('ackdata checksum wrong. Not sending ackdata.') return False - if (command != addDataPadding('getpubkey') and - command != addDataPadding('pubkey') and - command != addDataPadding('msg') and - command != addDataPadding('broadcast')): + command = command.rstrip('\x00') + if command != 'object': return False return True diff --git a/src/class_receiveDataThread.py b/src/class_receiveDataThread.py index 879faff6..2dfc8135 100644 --- a/src/class_receiveDataThread.py +++ b/src/class_receiveDataThread.py @@ -8,6 +8,7 @@ import socket import random from struct import unpack, pack import sys +import traceback #import string #from subprocess import call # used when the API must execute an outside program #from pyelliptic.openssl import OpenSSL @@ -21,7 +22,6 @@ from helper_generic import addDataPadding, isHostInPrivateIPRange from helper_sql import * #import tr from debug import logger -#from bitmessagemain import shared.lengthOfTimeToLeaveObjectsInInventory, shared.lengthOfTimeToHoldOnToAllPubkeys, shared.maximumAgeOfAnObjectThatIAmWillingToAccept, shared.maximumAgeOfObjectsThatIAdvertiseToOthers, shared.maximumAgeOfNodesThatIAdvertiseToOthers, shared.numberOfObjectsThatWeHaveYetToGetPerPeer, shared.neededPubkeys # This thread is created either by the synSenderThread(for outgoing # connections) or the singleListenerThread(for incoming connections). @@ -117,7 +117,7 @@ class receiveDataThread(threading.Thread): if magic != 0xE9BEB4D9: self.data = "" return - if payloadLength > 20000000: + if payloadLength > 2 ** 18: # 256 KiB logger.info('The incoming message, which we have not yet download, is too large. Ignoring it. (unfortunately there is no way to tell the other node to stop sending it except to disconnect.) Message size: %s' % payloadLength) self.data = self.data[payloadLength + shared.Header.size:] del magic,command,payloadLength,checksum # we don't need these anymore and better to clean them now before the recursive call rather than after @@ -146,33 +146,30 @@ class receiveDataThread(threading.Thread): with shared.printLock: print 'remoteCommand', repr(command), ' from', self.peer - #TODO: Use a dispatcher here - if not self.connectionIsOrWasFullyEstablished: - if command == 'version': - self.recversion(payload) - elif command == 'verack': - self.recverack() - else: - if command == 'addr': - self.recaddr(payload) - elif command == 'getpubkey': - shared.checkAndSharegetpubkeyWithPeers(payload) - elif command == 'pubkey': - self.recpubkey(payload) - elif command == 'inv': - self.recinv(payload) - elif command == 'getdata': - self.recgetdata(payload) - elif command == 'msg': - self.recmsg(payload) - elif command == 'broadcast': - self.recbroadcast(payload) - elif command == 'ping': - self.sendpong(payload) - #elif command == 'pong': - # pass - #elif command == 'alert': - # pass + try: + #TODO: Use a dispatcher here + if not self.connectionIsOrWasFullyEstablished: + if command == 'version': + self.recversion(payload) + elif command == 'verack': + self.recverack() + else: + if command == 'addr': + self.recaddr(payload) + elif command == 'inv': + self.recinv(payload) + elif command == 'getdata': + self.recgetdata(payload) + elif command == 'object': + self.recobject(payload) + elif command == 'ping': + self.sendpong(payload) + #elif command == 'pong': + # pass + except varintDecodeError as e: + logger.debug("There was a problem with a varint while processing a message from the wire. Some details: %s" % e) + except Exception as e: + logger.critical("Critical error in a receiveDataThread: \n%s" % traceback.format_exc()) del payload self.data = self.data[payloadLength + shared.Header.size:] # take this message out and then process the next message @@ -273,12 +270,10 @@ class receiveDataThread(threading.Thread): self.sendBigInv() def sendBigInv(self): - # Select all hashes which are younger than two days old and in this - # stream. + # Select all hashes for objects in this stream. queryreturn = sqlQuery( - '''SELECT hash FROM inventory WHERE ((receivedtime>? and objecttype<>'pubkey') or (receivedtime>? and objecttype='pubkey')) and streamnumber=?''', - int(time.time()) - shared.maximumAgeOfObjectsThatIAdvertiseToOthers, - int(time.time()) - shared.lengthOfTimeToHoldOnToAllPubkeys, + '''SELECT hash FROM inventory WHERE expirestime>? and streamnumber=?''', + int(time.time()), self.streamNumber) bigInvList = {} for row in queryreturn: @@ -290,8 +285,8 @@ class receiveDataThread(threading.Thread): with shared.inventoryLock: for hash, storedValue in shared.inventory.items(): if hash not in self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware: - objectType, streamNumber, payload, receivedTime, tag = storedValue - if streamNumber == self.streamNumber and receivedTime > int(time.time()) - shared.maximumAgeOfObjectsThatIAdvertiseToOthers: + objectType, streamNumber, payload, expiresTime, tag = storedValue + if streamNumber == self.streamNumber and expiresTime > int(time.time()): bigInvList[hash] = 0 numberOfObjectsInInvMessage = 0 payload = '' @@ -327,78 +322,27 @@ class receiveDataThread(threading.Thread): print 'Timing attack mitigation: Sleeping for', sleepTime, 'seconds.' time.sleep(sleepTime) - # We have received a broadcast message - def recbroadcast(self, data): + + def recobject(self, data): self.messageProcessingStartTime = time.time() - - shared.checkAndShareBroadcastWithPeers(data) - + lengthOfTimeWeShouldUseToProcessThisMessage = shared.checkAndShareObjectWithPeers(data) + """ - Let us now set lengthOfTimeWeShouldUseToProcessThisMessage. Sleeping - will help guarantee that we can process messages faster than a remote - node can send them. If we fall behind, the attacker could observe that - we are are slowing down the rate at which we request objects from the + Sleeping will help guarantee that we can process messages faster than a + remote node can send them. If we fall behind, the attacker could observe + that we are are slowing down the rate at which we request objects from the network which would indicate that we own a particular address (whichever one to which they are sending all of their attack messages). Note that if an attacker connects to a target with many connections, this mitigation mechanism might not be sufficient. """ - if len(data) > 100000000: # Size is greater than 100 megabytes - lengthOfTimeWeShouldUseToProcessThisMessage = 100 # seconds. - elif len(data) > 10000000: # Between 100 and 10 megabytes - lengthOfTimeWeShouldUseToProcessThisMessage = 20 # seconds. - elif len(data) > 1000000: # Between 10 and 1 megabyte - lengthOfTimeWeShouldUseToProcessThisMessage = 3 # seconds. - else: # Less than 1 megabyte - lengthOfTimeWeShouldUseToProcessThisMessage = .6 # seconds. - - sleepTime = lengthOfTimeWeShouldUseToProcessThisMessage - \ - (time.time() - self.messageProcessingStartTime) - self._sleepForTimingAttackMitigation(sleepTime) - - # We have received a msg message. - def recmsg(self, data): - self.messageProcessingStartTime = time.time() - - shared.checkAndShareMsgWithPeers(data) - - """ - Let us now set lengthOfTimeWeShouldUseToProcessThisMessage. Sleeping - will help guarantee that we can process messages faster than a remote - node can send them. If we fall behind, the attacker could observe that - we are are slowing down the rate at which we request objects from the - network which would indicate that we own a particular address (whichever - one to which they are sending all of their attack messages). Note - that if an attacker connects to a target with many connections, this - mitigation mechanism might not be sufficient. - """ - if len(data) > 100000000: # Size is greater than 100 megabytes - lengthOfTimeWeShouldUseToProcessThisMessage = 100 # seconds. Actual length of time it took my computer to decrypt and verify the signature of a 100 MB message: 3.7 seconds. - elif len(data) > 10000000: # Between 100 and 10 megabytes - lengthOfTimeWeShouldUseToProcessThisMessage = 20 # seconds. Actual length of time it took my computer to decrypt and verify the signature of a 10 MB message: 0.53 seconds. Actual length of time it takes in practice when processing a real message: 1.44 seconds. - elif len(data) > 1000000: # Between 10 and 1 megabyte - lengthOfTimeWeShouldUseToProcessThisMessage = 3 # seconds. Actual length of time it took my computer to decrypt and verify the signature of a 1 MB message: 0.18 seconds. Actual length of time it takes in practice when processing a real message: 0.30 seconds. - else: # Less than 1 megabyte - lengthOfTimeWeShouldUseToProcessThisMessage = .6 # seconds. Actual length of time it took my computer to decrypt and verify the signature of a 100 KB message: 0.15 seconds. Actual length of time it takes in practice when processing a real message: 0.25 seconds. - - sleepTime = lengthOfTimeWeShouldUseToProcessThisMessage - \ - (time.time() - self.messageProcessingStartTime) - self._sleepForTimingAttackMitigation(sleepTime) - - # We have received a pubkey - def recpubkey(self, data): - self.pubkeyProcessingStartTime = time.time() - - shared.checkAndSharePubkeyWithPeers(data) - - lengthOfTimeWeShouldUseToProcessThisMessage = .1 - sleepTime = lengthOfTimeWeShouldUseToProcessThisMessage - \ - (time.time() - self.pubkeyProcessingStartTime) + sleepTime = lengthOfTimeWeShouldUseToProcessThisMessage - (time.time() - self.messageProcessingStartTime) self._sleepForTimingAttackMitigation(sleepTime) + # We have received an inv message def recinv(self, data): - totalNumberOfobjectsThatWeHaveYetToGetFromAllPeers = 0 # this counts duplicates seperately because they take up memory + totalNumberOfobjectsThatWeHaveYetToGetFromAllPeers = 0 # this counts duplicates separately because they take up memory if len(shared.numberOfObjectsThatWeHaveYetToGetPerPeer) > 0: for key, value in shared.numberOfObjectsThatWeHaveYetToGetPerPeer.items(): totalNumberOfobjectsThatWeHaveYetToGetFromAllPeers += value @@ -474,34 +418,27 @@ class receiveDataThread(threading.Thread): shared.numberOfInventoryLookupsPerformed += 1 shared.inventoryLock.acquire() if hash in shared.inventory: - objectType, streamNumber, payload, receivedTime, tag = shared.inventory[ - hash] + objectType, streamNumber, payload, expiresTime, tag = shared.inventory[hash] shared.inventoryLock.release() - self.sendData(objectType, payload) + self.sendObject(payload) else: shared.inventoryLock.release() queryreturn = sqlQuery( - '''select objecttype, payload from inventory where hash=?''', - hash) + '''select payload from inventory where hash=? and expirestime>=?''', + hash, + int(time.time())) if queryreturn != []: for row in queryreturn: - objectType, payload = row - self.sendData(objectType, payload) + payload, = row + self.sendObject(payload) else: - print 'Someone asked for an object with a getdata which is not in either our memory inventory or our SQL inventory. That shouldn\'t have happened.' + logger.warning('%s asked for an object with a getdata which is not in either our memory inventory or our SQL inventory. We probably cleaned it out after advertising it but before they got around to asking for it.' % self.peer) # Our peer has requested (in a getdata message) that we send an object. - def sendData(self, objectType, payload): - if (objectType != 'pubkey' and - objectType != 'getpubkey' and - objectType != 'msg' and - objectType != 'broadcast'): - sys.stderr.write( - 'Error: sendData has been asked to send a strange objectType: %s\n' % str(objectType)) - return + def sendObject(self, payload): with shared.printLock: - print 'sending', objectType - self.sendDataThreadQueue.put((0, 'sendRawData', shared.CreatePacket(objectType, payload))) + print 'sending an object.' + self.sendDataThreadQueue.put((0, 'sendRawData', shared.CreatePacket('object',payload))) def _checkIPv4Address(self, host, hostFromAddrMessage): @@ -730,10 +667,10 @@ class receiveDataThread(threading.Thread): """ return self.remoteProtocolVersion, = unpack('>L', data[:4]) - if self.remoteProtocolVersion <= 1: + if self.remoteProtocolVersion < 3: self.sendDataThreadQueue.put((0, 'shutdown','no data')) with shared.printLock: - print 'Closing connection to old protocol version 1 node: ', self.peer + print 'Closing connection to old protocol version', self.remoteProtocolVersion, 'node: ', self.peer return # print 'remoteProtocolVersion', self.remoteProtocolVersion self.myExternalIP = socket.inet_ntoa(data[40:44]) @@ -760,7 +697,7 @@ class receiveDataThread(threading.Thread): return shared.connectedHostsList[ self.peer.host] = 1 # We use this data structure to not only keep track of what hosts we are connected to so that we don't try to connect to them again, but also to list the connections count on the Network Status tab. - # If this was an incoming connection, then the sendData thread + # If this was an incoming connection, then the sendDataThread # doesn't know the stream. We have to set it. if not self.initiatedConnection: self.sendDataThreadQueue.put((0, 'setStreamNumber', self.streamNumber)) diff --git a/src/class_singleCleaner.py b/src/class_singleCleaner.py index 74ecedac..31c697d5 100644 --- a/src/class_singleCleaner.py +++ b/src/class_singleCleaner.py @@ -9,20 +9,23 @@ import tr#anslate from helper_sql import * from debug import logger -'''The singleCleaner class is a timer-driven thread that cleans data structures to free memory, resends messages when a remote node doesn't respond, and sends pong messages to keep connections alive if the network isn't busy. +""" +The singleCleaner class is a timer-driven thread that cleans data structures +to free memory, resends messages when a remote node doesn't respond, and +sends pong messages to keep connections alive if the network isn't busy. It cleans these data structures in memory: inventory (moves data to the on-disk sql database) inventorySets (clears then reloads data out of sql database) It cleans these tables on the disk: -inventory (clears data more than 2 days and 12 hours old) +inventory (clears expired objects) pubkeys (clears pubkeys older than 4 weeks old which we have not used personally) It resends messages when there has been no response: resends getpubkey messages in 5 days (then 10 days, then 20 days, etc...) resends msg messages in 5 days (then 10 days, then 20 days, etc...) -''' +""" class singleCleaner(threading.Thread): @@ -41,22 +44,21 @@ class singleCleaner(threading.Thread): while True: shared.UISignalQueue.put(( 'updateStatusBar', 'Doing housekeeping (Flushing inventory in memory to disk...)')) - with shared.inventoryLock: # If you use both the inventoryLock and the sqlLock, always use the inventoryLock OUTSIDE of the sqlLock. with SqlBulkExecute() as sql: for hash, storedValue in shared.inventory.items(): - objectType, streamNumber, payload, receivedTime, tag = storedValue - if int(time.time()) - 3600 > receivedTime: - sql.execute( - '''INSERT INTO inventory VALUES (?,?,?,?,?,?)''', - hash, - objectType, - streamNumber, - payload, - receivedTime, - tag) - del shared.inventory[hash] + objectType, streamNumber, payload, expiresTime, tag = storedValue + sql.execute( + '''INSERT INTO inventory VALUES (?,?,?,?,?,?)''', + hash, + objectType, + streamNumber, + payload, + expiresTime, + tag) + del shared.inventory[hash] shared.UISignalQueue.put(('updateStatusBar', '')) + shared.broadcastToSendDataQueues(( 0, 'pong', 'no data')) # commands the sendData threads to send out a pong message if they haven't sent anything else in the last five minutes. The socket timeout-time is 10 minutes. # If we are running as a daemon then we are going to fill up the UI @@ -66,15 +68,9 @@ class singleCleaner(threading.Thread): shared.UISignalQueue.queue.clear() if timeWeLastClearedInventoryAndPubkeysTables < int(time.time()) - 7380: timeWeLastClearedInventoryAndPubkeysTables = int(time.time()) - # inventory (moves data from the inventory data structure to - # the on-disk sql database) - # inventory (clears pubkeys after 28 days and everything else - # after 2 days and 12 hours) sqlExecute( - '''DELETE FROM inventory WHERE (receivedtime'pubkey') OR (receivedtime= 4: doubleHashOfAddressData = hashlib.sha512(hashlib.sha512(encodeVarint( toAddressVersionNumber) + encodeVarint(toStreamNumber) + toRipe).digest()).digest() privEncryptionKey = doubleHashOfAddressData[:32] # Note that this is the first half of the sha512 hash. tag = doubleHashOfAddressData[32:] - shared.neededPubkeys[tag] = highlevelcrypto.makeCryptor(privEncryptionKey.encode('hex')) # We'll need this for when we receive a pubkey reply: it will be encrypted and we'll need to decrypt it. + shared.neededPubkeys[tag] = (toAddress, highlevelcrypto.makeCryptor(privEncryptionKey.encode('hex'))) # We'll need this for when we receive a pubkey reply: it will be encrypted and we'll need to decrypt it. - # Initialize the shared.ackdataForWhichImWatching data structure using data - # from the sql database. + # Initialize the shared.ackdataForWhichImWatching data structure queryreturn = sqlQuery( '''SELECT ackdata FROM sent where (status='msgsent' OR status='doingmsgpow')''') for row in queryreturn: @@ -96,9 +95,11 @@ class singleWorker(threading.Thread): myAddress = shared.myAddressesByHash[hash] status, addressVersionNumber, streamNumber, hash = decodeAddress( myAddress) - embeddedTime = int(time.time() + random.randrange( - -300, 300)) # the current time plus or minus five minutes - payload = pack('>I', (embeddedTime)) + + TTL = 28 * 24 * 60 * 60 # 28 days + embeddedTime = int(time.time() + random.randrange(-300, 300) + TTL) # 28 days from now plus or minus five minutes + payload = pack('>Q', (embeddedTime)) + payload += '\x00\x00\x00\x01' # object type: pubkey payload += encodeVarint(addressVersionNumber) # Address version number payload += encodeVarint(streamNumber) payload += '\x00\x00\x00\x01' # bitfield of features supported by me (see the wiki). @@ -112,7 +113,6 @@ class singleWorker(threading.Thread): with shared.printLock: sys.stderr.write( 'Error within doPOWForMyV2Pubkey. Could not read the keys from the keys.dat file for a requested address. %s\n' % err) - return privSigningKeyHex = shared.decodeWalletImportFormat( @@ -128,8 +128,8 @@ class singleWorker(threading.Thread): payload += pubEncryptionKey[1:] # Do the POW for this pubkey message - target = 2 ** 64 / ((len(payload) + shared.networkDefaultPayloadLengthExtraBytes + - 8) * shared.networkDefaultProofOfWorkNonceTrialsPerByte) + TTL = 2.5 * 24 * 60 * 60 # 2.5 days for now; user specifyable later. + target = 2 ** 64 / (shared.networkDefaultProofOfWorkNonceTrialsPerByte*(len(payload) + 8 + shared.networkDefaultPayloadLengthExtraBytes + ((TTL*(len(payload)+8+shared.networkDefaultPayloadLengthExtraBytes))/(2 ** 16)))) print '(For pubkey message) Doing proof of work...' initialHash = hashlib.sha512(payload).digest() trialValue, nonce = proofofwork.run(target, initialHash) @@ -137,7 +137,7 @@ class singleWorker(threading.Thread): payload = pack('>Q', nonce) + payload inventoryHash = calculateInventoryHash(payload) - objectType = 'pubkey' + objectType = 1 shared.inventory[inventoryHash] = ( objectType, streamNumber, payload, embeddedTime,'') shared.inventorySets[streamNumber].add(inventoryHash) @@ -174,9 +174,19 @@ class singleWorker(threading.Thread): return status, addressVersionNumber, streamNumber, hash = decodeAddress( myAddress) - embeddedTime = int(time.time() + random.randrange( - -300, 300)) # the current time plus or minus five minutes - payload = pack('>I', (embeddedTime)) + + TTL = 28 * 24 * 60 * 60 # 28 days + embeddedTime = int(time.time() + random.randrange(-300, 300) + TTL) # 28 days from now plus or minus five minutes + signedTimeForProtocolV2 = embeddedTime - TTL + """ + According to the protocol specification, the expiresTime along with the pubkey information is + signed. But to be backwards compatible during the upgrade period, we shall sign not the + expiresTime but rather the current time. There must be precisely a 28 day difference + between the two. After the upgrade period we'll switch to signing the whole payload with the + expiresTime time. + """ + payload = pack('>Q', (embeddedTime)) + payload += '\x00\x00\x00\x01' # object type: pubkey payload += encodeVarint(addressVersionNumber) # Address version number payload += encodeVarint(streamNumber) payload += '\x00\x00\x00\x01' # bitfield of features supported by me (see the wiki). @@ -209,13 +219,18 @@ class singleWorker(threading.Thread): myAddress, 'noncetrialsperbyte')) payload += encodeVarint(shared.config.getint( myAddress, 'payloadlengthextrabytes')) - signature = highlevelcrypto.sign(payload, privSigningKeyHex) + + if int(time.time()) < 1416175200: # Sun, 16 Nov 2014 22:00:00 GMT + signedData = pack('>Q', signedTimeForProtocolV2) + payload[12:] + else: + signedData = payload + + signature = highlevelcrypto.sign(signedData, privSigningKeyHex) payload += encodeVarint(len(signature)) payload += signature # Do the POW for this pubkey message - target = 2 ** 64 / ((len(payload) + shared.networkDefaultPayloadLengthExtraBytes + - 8) * shared.networkDefaultProofOfWorkNonceTrialsPerByte) + target = 2 ** 64 / (shared.networkDefaultProofOfWorkNonceTrialsPerByte*(len(payload) + 8 + shared.networkDefaultPayloadLengthExtraBytes + ((TTL*(len(payload)+8+shared.networkDefaultPayloadLengthExtraBytes))/(2 ** 16)))) print '(For pubkey message) Doing proof of work...' initialHash = hashlib.sha512(payload).digest() trialValue, nonce = proofofwork.run(target, initialHash) @@ -223,7 +238,7 @@ class singleWorker(threading.Thread): payload = pack('>Q', nonce) + payload inventoryHash = calculateInventoryHash(payload) - objectType = 'pubkey' + objectType = 1 shared.inventory[inventoryHash] = ( objectType, streamNumber, payload, embeddedTime,'') shared.inventorySets[streamNumber].add(inventoryHash) @@ -256,9 +271,11 @@ class singleWorker(threading.Thread): return status, addressVersionNumber, streamNumber, hash = decodeAddress( myAddress) - embeddedTime = int(time.time() + random.randrange( - -300, 300)) # the current time plus or minus five minutes + + TTL = 28 * 24 * 60 * 60 # 28 days + embeddedTime = int(time.time() + random.randrange(-300, 300) + TTL) # 28 days from now plus or minus five minutes payload = pack('>Q', (embeddedTime)) + payload += '\x00\x00\x00\x01' # object type: pubkey payload += encodeVarint(addressVersionNumber) # Address version number payload += encodeVarint(streamNumber) @@ -290,12 +307,10 @@ class singleWorker(threading.Thread): myAddress, 'noncetrialsperbyte')) dataToEncrypt += encodeVarint(shared.config.getint( myAddress, 'payloadlengthextrabytes')) + - signature = highlevelcrypto.sign(payload + dataToEncrypt, privSigningKeyHex) - dataToEncrypt += encodeVarint(len(signature)) - dataToEncrypt += signature - # Let us encrypt the necessary data. We will use a hash of the data + # When we encrypt, we'll use a hash of the data # contained in an address as a decryption key. This way in order to # read the public keys in a pubkey message, a node must know the address # first. We'll also tag, unencrypted, the pubkey with part of the hash @@ -304,14 +319,33 @@ class singleWorker(threading.Thread): doubleHashOfAddressData = hashlib.sha512(hashlib.sha512(encodeVarint( addressVersionNumber) + encodeVarint(streamNumber) + hash).digest()).digest() payload += doubleHashOfAddressData[32:] # the tag + + """ + According to the protocol specification, the expiresTime along with the pubkey information is + signed. But to be backwards compatible during the upgrade period, we shall sign not the + expiresTime but rather the current time. There must be precisely a 28 day difference + between the two. After the upgrade period we'll switch to signing the whole payload from + above appended with dataToEncrypt. + """ + if int(time.time()) < 1416175200: # Sun, 16 Nov 2014 22:00:00 GMT + dataToSign = pack('>Q', (embeddedTime - TTL)) + dataToSign += encodeVarint(addressVersionNumber) # Address version number + dataToSign += encodeVarint(streamNumber) + dataToSign += dataToEncrypt + else: + dataToSign = payload + dataToEncrypt + + signature = highlevelcrypto.sign(dataToSign, privSigningKeyHex) + dataToEncrypt += encodeVarint(len(signature)) + dataToEncrypt += signature + privEncryptionKey = doubleHashOfAddressData[:32] pubEncryptionKey = highlevelcrypto.pointMult(privEncryptionKey) payload += highlevelcrypto.encrypt( dataToEncrypt, pubEncryptionKey.encode('hex')) # Do the POW for this pubkey message - target = 2 ** 64 / ((len(payload) + shared.networkDefaultPayloadLengthExtraBytes + - 8) * shared.networkDefaultProofOfWorkNonceTrialsPerByte) + target = 2 ** 64 / (shared.networkDefaultProofOfWorkNonceTrialsPerByte*(len(payload) + 8 + shared.networkDefaultPayloadLengthExtraBytes + ((TTL*(len(payload)+8+shared.networkDefaultPayloadLengthExtraBytes))/(2 ** 16)))) print '(For pubkey message) Doing proof of work...' initialHash = hashlib.sha512(payload).digest() trialValue, nonce = proofofwork.run(target, initialHash) @@ -319,7 +353,7 @@ class singleWorker(threading.Thread): payload = pack('>Q', nonce) + payload inventoryHash = calculateInventoryHash(payload) - objectType = 'pubkey' + objectType = 1 shared.inventory[inventoryHash] = ( objectType, streamNumber, payload, embeddedTime, doubleHashOfAddressData[32:]) shared.inventorySets[streamNumber].add(inventoryHash) @@ -372,12 +406,22 @@ class singleWorker(threading.Thread): pubEncryptionKey = highlevelcrypto.privToPub( privEncryptionKeyHex).decode('hex') - payload = pack('>Q', (int(time.time()) + random.randrange( - -300, 300))) # the current time plus or minus five minutes - if addressVersionNumber <= 3: - payload += encodeVarint(2) # broadcast version - else: - payload += encodeVarint(3) # broadcast version + TTL = 2.5 * 24 * 60 * 60 # 2.5 days + embeddedTime = int(time.time() + random.randrange(-300, 300) + TTL) + payload = pack('>Q', embeddedTime) + payload += '\x00\x00\x00\x03' # object type: broadcast + + if int(time.time()) < 1416175200: # Before Sun, 16 Nov 2014 22:00:00 GMT + if addressVersionNumber <= 3: + payload += encodeVarint(2) # broadcast version + else: + payload += encodeVarint(3) # broadcast version + else: # After Sun, 16 Nov 2014 22:00:00 GMT + if addressVersionNumber <= 3: + payload += encodeVarint(4) # broadcast version + else: + payload += encodeVarint(5) # broadcast version + payload += encodeVarint(streamNumber) if addressVersionNumber >= 4: doubleHashOfAddressData = hashlib.sha512(hashlib.sha512(encodeVarint( @@ -387,10 +431,13 @@ class singleWorker(threading.Thread): else: tag = '' - if addressVersionNumber <= 3: - dataToEncrypt = encodeVarint(2) # broadcast version - else: - dataToEncrypt = encodeVarint(3) # broadcast version + dataToEncrypt = "" + # the broadcast version is not included here after the end of the protocol v3 upgrade period + if int(time.time()) < 1416175200: # Sun, 16 Nov 2014 22:00:00 GMT + if addressVersionNumber <= 3: + dataToEncrypt += encodeVarint(2) # broadcast version + else: + dataToEncrypt += encodeVarint(3) # broadcast version dataToEncrypt += encodeVarint(addressVersionNumber) dataToEncrypt += encodeVarint(streamNumber) dataToEncrypt += '\x00\x00\x00\x01' # behavior bitfield @@ -402,8 +449,13 @@ class singleWorker(threading.Thread): dataToEncrypt += '\x02' # message encoding type dataToEncrypt += encodeVarint(len('Subject:' + subject + '\n' + 'Body:' + body)) #Type 2 is simple UTF-8 message encoding per the documentation on the wiki. dataToEncrypt += 'Subject:' + subject + '\n' + 'Body:' + body + if int(time.time()) < 1416175200: # Sun, 16 Nov 2014 22:00:00 GMT + dataToSign = dataToEncrypt + else: + dataToSign = payload + dataToEncrypt + signature = highlevelcrypto.sign( - dataToEncrypt, privSigningKeyHex) + dataToSign, privSigningKeyHex) dataToEncrypt += encodeVarint(len(signature)) dataToEncrypt += signature @@ -420,8 +472,8 @@ class singleWorker(threading.Thread): payload += highlevelcrypto.encrypt( dataToEncrypt, pubEncryptionKey.encode('hex')) - target = 2 ** 64 / ((len( - payload) + shared.networkDefaultPayloadLengthExtraBytes + 8) * shared.networkDefaultProofOfWorkNonceTrialsPerByte) + TTL = 2.5 * 24 * 60 * 60 # 2.5 days for now; user specifyable later. + target = 2 ** 64 / (shared.networkDefaultProofOfWorkNonceTrialsPerByte*(len(payload) + 8 + shared.networkDefaultPayloadLengthExtraBytes + ((TTL*(len(payload)+8+shared.networkDefaultPayloadLengthExtraBytes))/(2 ** 16)))) print '(For broadcast message) Doing proof of work...' shared.UISignalQueue.put(('updateSentItemStatusByAckdata', ( ackdata, tr.translateText("MainWindow", "Doing work necessary to send broadcast...")))) @@ -430,11 +482,18 @@ class singleWorker(threading.Thread): print '(For broadcast message) Found proof of work', trialValue, 'Nonce:', nonce payload = pack('>Q', nonce) + payload + + # Sanity check. The payload size should never be larger than 256 KiB. There should + # be checks elsewhere in the code to not let the user try to send a message this large + # until we implement message continuation. + if len(payload) > 2 ** 18: # 256 KiB + logger.critical('This broadcast object is too large to send. This should never happen. Object size: %s' % len(payload)) + continue inventoryHash = calculateInventoryHash(payload) - objectType = 'broadcast' + objectType = 3 shared.inventory[inventoryHash] = ( - objectType, streamNumber, payload, int(time.time()),tag) + objectType, streamNumber, payload, embeddedTime, tag) shared.inventorySets[streamNumber].add(inventoryHash) with shared.printLock: print 'sending inv (within sendBroadcast function) for object:', inventoryHash.encode('hex') @@ -454,141 +513,144 @@ class singleWorker(threading.Thread): def sendMsg(self): - # Check to see if there are any messages queued to be sent - queryreturn = sqlQuery( - '''SELECT DISTINCT toaddress FROM sent WHERE (status='msgqueued' AND folder='sent')''') - for row in queryreturn: # For each address to which we need to send a message, check to see if we have its pubkey already. - toaddress, = row - status, toAddressVersion, toStreamNumber, toRipe = decodeAddress(toaddress) - # If we are sending a message to ourselves or a chan then we won't need an entry in the pubkeys table; we can calculate the needed pubkey using the private keys in our keys.dat file. - if shared.config.has_section(toaddress): - sqlExecute( - '''UPDATE sent SET status='doingmsgpow' WHERE toaddress=? AND status='msgqueued' ''', - toaddress) - continue + while True: # while we have a msg that needs some work + + # Select just one msg that needs work. We'll get a msg + # which is ready to be sent or a msg which we have sent in + # the last 28 days which were previously marked + # as 'toodifficult'. If the user has raised the maximum acceptable + # difficulty then those msgs may now be sendable. queryreturn = sqlQuery( - '''SELECT hash FROM pubkeys WHERE hash=? AND addressversion=?''', toRipe, toAddressVersion) - if queryreturn != []: # If we have the needed pubkey in the pubkey table already, set the status to doingmsgpow (we'll do it further down) + '''SELECT toaddress, toripe, fromaddress, subject, message, ackdata, status FROM sent WHERE (status='msgqueued' or status='doingmsgpow' or status='forcepow' or (status='toodifficult' and lastactiontime>?)) and folder='sent' LIMIT 1''', + int(time.time()) - 2419200) + if len(queryreturn) == 0: # if there is no work to do then + break # break out of this sendMsg loop and + # wait for something to get put in the shared.workerQueue. + row = queryreturn[0] + toaddress, toripe, fromaddress, subject, message, ackdata, status = row + toStatus, toAddressVersionNumber, toStreamNumber, toRipe = decodeAddress( + toaddress) + fromStatus, fromAddressVersionNumber, fromStreamNumber, fromRipe = decodeAddress( + fromaddress) + + # We may or may not already have the pubkey for this toAddress. Let's check. + if status == 'forcepow': + # if the status of this msg is 'forcepow' then clearly we have the pubkey already + # because the user could not have overridden the message about the POW being + # too difficult without knowing the required difficulty. + pass + # If we are sending a message to ourselves or a chan then we won't need an entry in the pubkeys table; we can calculate the needed pubkey using the private keys in our keys.dat file. + elif shared.config.has_section(toaddress): sqlExecute( '''UPDATE sent SET status='doingmsgpow' WHERE toaddress=? AND status='msgqueued' ''', toaddress) - else: # We don't have the needed pubkey in the pubkey table already. - if toAddressVersion <= 3: - toTag = '' - else: - toTag = hashlib.sha512(hashlib.sha512(encodeVarint(toAddressVersion)+encodeVarint(toStreamNumber)+toRipe).digest()).digest()[32:] - if toRipe in shared.neededPubkeys or toTag in shared.neededPubkeys: - # We already sent a request for the pubkey - sqlExecute( - '''UPDATE sent SET status='awaitingpubkey' WHERE toaddress=? AND status='msgqueued' ''', toaddress) - shared.UISignalQueue.put(('updateSentItemStatusByHash', ( - toRipe, tr.translateText("MainWindow",'Encryption key was requested earlier.')))) - else: - # We have not yet sent a request for the pubkey - needToRequestPubkey = True - if toAddressVersion >= 4: # If we are trying to send to address version >= 4 then the needed pubkey might be encrypted in the inventory. - # If we have it we'll need to decrypt it and put it in the pubkeys table. - queryreturn = sqlQuery( - '''SELECT payload FROM inventory WHERE objecttype='pubkey' and tag=? ''', toTag) - if queryreturn != []: # if there was a pubkey in our inventory with the correct tag, we need to try to decrypt it. - for row in queryreturn: - data, = row - if shared.decryptAndCheckPubkeyPayload(data, toaddress) == 'successful': - needToRequestPubkey = False - print 'debug. successfully decrypted and checked pubkey from sql inventory.' #testing - sqlExecute( - '''UPDATE sent SET status='doingmsgpow' WHERE toaddress=? AND status='msgqueued' ''', - toaddress) - break - else: # There was something wrong with this pubkey even though it had the correct tag- almost certainly because of malicious behavior or a badly programmed client. - continue - if needToRequestPubkey: # Obviously we had no success looking in the sql inventory. Let's look through the memory inventory. - with shared.inventoryLock: - for hash, storedValue in shared.inventory.items(): - objectType, streamNumber, payload, receivedTime, tag = storedValue - if objectType == 'pubkey' and tag == toTag: - result = shared.decryptAndCheckPubkeyPayload(payload, toaddress) #if valid, this function also puts it in the pubkeys table. - if result == 'successful': - print 'debug. successfully decrypted and checked pubkey from memory inventory.' - needToRequestPubkey = False - sqlExecute( - '''UPDATE sent SET status='doingmsgpow' WHERE toaddress=? AND status='msgqueued' ''', - toaddress) - break - if needToRequestPubkey: - sqlExecute( - '''UPDATE sent SET status='doingpubkeypow' WHERE toaddress=? AND status='msgqueued' ''', - toaddress) - shared.UISignalQueue.put(('updateSentItemStatusByHash', ( - toRipe, tr.translateText("MainWindow",'Sending a request for the recipient\'s encryption key.')))) - self.requestPubKey(toaddress) - # Get all messages that are ready to be sent, and also all messages - # which we have sent in the last 28 days which were previously marked - # as 'toodifficult'. If the user as raised the maximum acceptable - # difficulty then those messages may now be sendable. - queryreturn = sqlQuery( - '''SELECT toaddress, toripe, fromaddress, subject, message, ackdata, status FROM sent WHERE (status='doingmsgpow' or status='forcepow' or (status='toodifficult' and lastactiontime>?)) and folder='sent' ''', - int(time.time()) - 2419200) - for row in queryreturn: # For each message we need to send.. - toaddress, toripe, fromaddress, subject, message, ackdata, status = row - toStatus, toAddressVersionNumber, toStreamNumber, toHash = decodeAddress( - toaddress) - fromStatus, fromAddressVersionNumber, fromStreamNumber, fromHash = decodeAddress( - fromaddress) - - if not shared.config.has_section(toaddress): - # There is a remote possibility that we may no longer have the - # recipient's pubkey. Let us make sure we still have it or else the - # sendMsg function will appear to freeze. This can happen if the - # user sends a message but doesn't let the POW function finish, - # then leaves their client off for a long time which could cause - # the needed pubkey to expire and be deleted. + status='doingmsgpow' + else: + # Let's see if we already have the pubkey in our pubkeys table queryreturn = sqlQuery( - '''SELECT hash FROM pubkeys WHERE hash=? AND addressversion=?''', - toripe, - toAddressVersionNumber) - if queryreturn == [] and toripe not in shared.neededPubkeys: - # We no longer have the needed pubkey and we haven't requested - # it. - with shared.printLock: - sys.stderr.write( - 'For some reason, the status of a message in our outbox is \'doingmsgpow\' even though we lack the pubkey. Here is the RIPE hash of the needed pubkey: %s\n' % toripe.encode('hex')) + '''SELECT hash FROM pubkeys WHERE hash=? AND addressversion=?''', toRipe, toAddressVersionNumber) + if queryreturn != []: # If we have the needed pubkey in the pubkey table already, + # set the status of this msg to doingmsgpow sqlExecute( - '''UPDATE sent SET status='doingpubkeypow' WHERE toaddress=? AND status='doingmsgpow' ''', toaddress) - shared.UISignalQueue.put(('updateSentItemStatusByHash', ( - toripe, tr.translateText("MainWindow",'Sending a request for the recipient\'s encryption key.')))) - self.requestPubKey(toaddress) - continue + '''UPDATE sent SET status='doingmsgpow' WHERE toaddress=? AND status='msgqueued' ''', + toaddress) + status = 'doingmsgpow' + # mark the pubkey as 'usedpersonally' so that we don't delete it later + sqlExecute( + '''UPDATE pubkeys SET usedpersonally='yes' WHERE hash=? and addressversion=?''', + toRipe, + toAddressVersionNumber) + else: # We don't have the needed pubkey in the pubkeys table already. + if toAddressVersionNumber <= 3: + toTag = '' + else: + toTag = hashlib.sha512(hashlib.sha512(encodeVarint(toAddressVersionNumber)+encodeVarint(toStreamNumber)+toRipe).digest()).digest()[32:] + if toRipe in shared.neededPubkeys or toTag in shared.neededPubkeys: + # We already sent a request for the pubkey + sqlExecute( + '''UPDATE sent SET status='awaitingpubkey' WHERE toaddress=? AND status='msgqueued' ''', toaddress) + shared.UISignalQueue.put(('updateSentItemStatusByHash', ( + toRipe, tr.translateText("MainWindow",'Encryption key was requested earlier.')))) + continue #on with the next msg on which we can do some work + else: + # We have not yet sent a request for the pubkey + needToRequestPubkey = True + if toAddressVersionNumber >= 4: # If we are trying to send to address version >= 4 then + # the needed pubkey might be encrypted in the inventory. + # If we have it we'll need to decrypt it and put it in + # the pubkeys table. + + # The decryptAndCheckPubkeyPayload function expects that the shared.neededPubkeys + # dictionary already contains the toAddress and cryptor object associated with + # the tag for this toAddress. + doubleHashOfToAddressData = hashlib.sha512(hashlib.sha512(encodeVarint( + toAddressVersionNumber) + encodeVarint(toStreamNumber) + toRipe).digest()).digest() + privEncryptionKey = doubleHashOfToAddressData[:32] # The first half of the sha512 hash. + tag = doubleHashOfToAddressData[32:] # The second half of the sha512 hash. + shared.neededPubkeys[tag] = (toaddress, highlevelcrypto.makeCryptor(privEncryptionKey.encode('hex'))) + + queryreturn = sqlQuery( + '''SELECT payload FROM inventory WHERE objecttype=1 and tag=? ''', toTag) + if queryreturn != []: # if there are any pubkeys in our inventory with the correct tag.. + for row in queryreturn: + payload, = row + if shared.decryptAndCheckPubkeyPayload(payload, toaddress) == 'successful': + needToRequestPubkey = False + sqlExecute( + '''UPDATE sent SET status='doingmsgpow' WHERE toaddress=? AND (status='msgqueued' or status='awaitingpubkey' or status='doingpubkeypow')''', + toaddress) + del shared.neededPubkeys[tag] + continue # We'll start back at the beginning, pick up this msg, mark the pubkey as 'usedpersonally', and then send the msg. + #else: # There was something wrong with this pubkey object even + # though it had the correct tag- almost certainly because + # of malicious behavior or a badly programmed client. If + # there are any other pubkeys in our inventory with the correct + # tag then we'll try to decrypt those. + + if needToRequestPubkey: # Obviously we had no success looking in the sql inventory. Let's look through the memory inventory. + with shared.inventoryLock: + for hash, storedValue in shared.inventory.items(): + objectType, streamNumber, payload, expiresTime, tag = storedValue + if objectType == 1 and tag == toTag: + if shared.decryptAndCheckPubkeyPayload(payload, toaddress) == 'successful': #if valid, this function also puts it in the pubkeys table. + needToRequestPubkey = False + sqlExecute( + '''UPDATE sent SET status='doingmsgpow' WHERE toaddress=? AND (status='msgqueued' or status='awaitingpubkey' or status='doingpubkeypow')''', + toaddress) + del shared.neededPubkeys[tag] + continue # We'll start back at the beginning, pick up this msg, mark the pubkey as 'usedpersonally', and then send the msg. + if needToRequestPubkey: + sqlExecute( + '''UPDATE sent SET status='doingpubkeypow' WHERE toaddress=? AND status='msgqueued' ''', + toaddress) + shared.UISignalQueue.put(('updateSentItemStatusByHash', ( + toRipe, tr.translateText("MainWindow",'Sending a request for the recipient\'s encryption key.')))) + self.requestPubKey(toaddress) + continue #on with the next msg on which we can do some work + + # At this point we know that we have the necessary pubkey in the pubkeys table. + TTL = 2.5 * 24 * 60 * 60 # 2.5 days + embeddedTime = int(time.time() + random.randrange(-300, 300) + TTL) # 2.5 days from now plus or minus five minutes + + if not shared.config.has_section(toaddress): # if we aren't sending this to ourselves or a chan shared.ackdataForWhichImWatching[ackdata] = 0 shared.UISignalQueue.put(('updateSentItemStatusByAckdata', ( ackdata, tr.translateText("MainWindow", "Looking up the receiver\'s public key")))) with shared.printLock: print 'Sending a message. First 150 characters of message:', repr(message[:150]) - - # mark the pubkey as 'usedpersonally' so that we don't ever delete - # it. - sqlExecute( - '''UPDATE pubkeys SET usedpersonally='yes' WHERE hash=? and addressversion=?''', - toripe, - toAddressVersionNumber) # Let us fetch the recipient's public key out of our database. If # the required proof of work difficulty is too hard then we'll # abort. queryreturn = sqlQuery( 'SELECT transmitdata FROM pubkeys WHERE hash=? and addressversion=?', - toripe, + toRipe, toAddressVersionNumber) - if queryreturn == []: - with shared.printLock: - sys.stderr.write( - '(within sendMsg) The needed pubkey was not found. This should never happen. Aborting send.\n') - - return for row in queryreturn: pubkeyPayload, = row # The pubkey message is stored the way we originally received it + # under protocol version 2 # which means that we need to read beyond things like the nonce and # time to get to the actual public keys. if toAddressVersionNumber <= 3: @@ -645,6 +707,7 @@ class singleWorker(threading.Thread): requiredAverageProofOfWorkNonceTrialsPerByte = shared.networkDefaultProofOfWorkNonceTrialsPerByte if requiredPayloadLengthExtraBytes < shared.networkDefaultPayloadLengthExtraBytes: requiredPayloadLengthExtraBytes = shared.networkDefaultPayloadLengthExtraBytes + logger.debug('Using averageProofOfWorkNonceTrialsPerByte: %s and payloadLengthExtraBytes: %s.' % (requiredAverageProofOfWorkNonceTrialsPerByte, requiredPayloadLengthExtraBytes)) shared.UISignalQueue.put(('updateSentItemStatusByAckdata', (ackdata, tr.translateText("MainWindow", "Doing work necessary to send message.\nReceiver\'s required difficulty: %1 and %2").arg(str(float( requiredAverageProofOfWorkNonceTrialsPerByte) / shared.networkDefaultProofOfWorkNonceTrialsPerByte)).arg(str(float(requiredPayloadLengthExtraBytes) / shared.networkDefaultPayloadLengthExtraBytes))))) if status != 'forcepow': @@ -680,10 +743,10 @@ class singleWorker(threading.Thread): shared.UISignalQueue.put(('updateSentItemStatusByAckdata', ( ackdata, tr.translateText("MainWindow", "Doing work necessary to send message.")))) - embeddedTime = pack('>Q', (int(time.time()) + random.randrange( - -300, 300))) # the current time plus or minus five minutes. if fromAddressVersionNumber == 2: - payload = '\x01' # Message version. + payload = "" + if int(time.time()) < 1416175200: # Sun, 16 Nov 2014 22:00:00 GMT + payload += '\x01' # Message version. payload += encodeVarint(fromAddressVersionNumber) payload += encodeVarint(fromStreamNumber) payload += '\x00\x00\x00\x01' # Bitfield of features and behaviors that can be expected from me. (See https://bitmessage.org/wiki/Protocol_specification#Pubkey_bitfield_features ) @@ -714,7 +777,7 @@ class singleWorker(threading.Thread): 1:] # The \x04 on the beginning of the public keys are not sent. This way there is only one acceptable way to encode and send a public key. payload += pubEncryptionKey[1:] - payload += toHash # This hash will be checked by the receiver of the message to verify that toHash belongs to them. This prevents a Surreptitious Forwarding Attack. + payload += toRipe # This hash will be checked by the receiver of the message to verify that toRipe belongs to them. This prevents a Surreptitious Forwarding Attack. payload += '\x02' # Type 2 is simple UTF-8 message encoding as specified on the Protocol Specification on the Bitmessage Wiki. messageToTransmit = 'Subject:' + \ subject + '\n' + 'Body:' + message @@ -724,12 +787,18 @@ class singleWorker(threading.Thread): ackdata, toStreamNumber) # The fullAckPayload is a normal msg protocol message with the proof of work already completed that the receiver of this message can easily send out. payload += encodeVarint(len(fullAckPayload)) payload += fullAckPayload - signature = highlevelcrypto.sign(payload, privSigningKeyHex) + if int(time.time()) < 1416175200: # Sun, 16 Nov 2014 22:00:00 GMT + dataToSign = payload + else: + dataToSign = encodeVarint(1) + encodeVarint(toStreamNumber) + payload + signature = highlevelcrypto.sign(dataToSign, privSigningKeyHex) payload += encodeVarint(len(signature)) payload += signature if fromAddressVersionNumber >= 3: - payload = '\x01' # Message version. + payload = "" + if int(time.time()) < 1416175200: # Sun, 16 Nov 2014 22:00:00 GMT + payload += '\x01' # Message version. payload += encodeVarint(fromAddressVersionNumber) payload += encodeVarint(fromStreamNumber) payload += '\x00\x00\x00\x01' # Bitfield of features and behaviors that can be expected from me. (See https://bitmessage.org/wiki/Protocol_specification#Pubkey_bitfield_features ) @@ -774,7 +843,7 @@ class singleWorker(threading.Thread): payload += encodeVarint(shared.config.getint( fromaddress, 'payloadlengthextrabytes')) - payload += toHash # This hash will be checked by the receiver of the message to verify that toHash belongs to them. This prevents a Surreptitious Forwarding Attack. + payload += toRipe # This hash will be checked by the receiver of the message to verify that toRipe belongs to them. This prevents a Surreptitious Forwarding Attack. payload += '\x02' # Type 2 is simple UTF-8 message encoding as specified on the Protocol Specification on the Bitmessage Wiki. messageToTransmit = 'Subject:' + \ subject + '\n' + 'Body:' + message @@ -793,7 +862,11 @@ class singleWorker(threading.Thread): ackdata, toStreamNumber) # The fullAckPayload is a normal msg protocol message with the proof of work already completed that the receiver of this message can easily send out. payload += encodeVarint(len(fullAckPayload)) payload += fullAckPayload - signature = highlevelcrypto.sign(payload, privSigningKeyHex) + if int(time.time()) < 1416175200: # Sun, 16 Nov 2014 22:00:00 GMT + dataToSign = payload + else: + dataToSign = pack('>Q', embeddedTime) + '\x00\x00\x00\x02' + encodeVarint(1) + encodeVarint(toStreamNumber) + payload + signature = highlevelcrypto.sign(dataToSign, privSigningKeyHex) payload += encodeVarint(len(signature)) payload += signature @@ -805,8 +878,13 @@ class singleWorker(threading.Thread): sqlExecute('''UPDATE sent SET status='badkey' WHERE ackdata=?''', ackdata) shared.UISignalQueue.put(('updateSentItemStatusByAckdata',(ackdata,tr.translateText("MainWindow",'Problem: The recipient\'s encryption key is no good. Could not encrypt message. %1').arg(l10n.formatTimestamp())))) continue - encryptedPayload = embeddedTime + encodeVarint(toStreamNumber) + encrypted - target = 2**64 / ((len(encryptedPayload)+requiredPayloadLengthExtraBytes+8) * requiredAverageProofOfWorkNonceTrialsPerByte) + + encryptedPayload = pack('>Q', embeddedTime) + encryptedPayload += '\x00\x00\x00\x02' # object type: msg + if int(time.time()) >= 1416175200: # Sun, 16 Nov 2014 22:00:00 GMT + encryptedPayload += encodeVarint(1) # msg version + encryptedPayload += encodeVarint(toStreamNumber) + encrypted + target = 2 ** 64 / (requiredAverageProofOfWorkNonceTrialsPerByte*(len(encryptedPayload) + 8 + requiredPayloadLengthExtraBytes + ((TTL*(len(encryptedPayload)+8+requiredPayloadLengthExtraBytes))/(2 ** 16)))) with shared.printLock: print '(For msg message) Doing proof of work. Total required difficulty:', float(requiredAverageProofOfWorkNonceTrialsPerByte) / shared.networkDefaultProofOfWorkNonceTrialsPerByte, 'Required small message difficulty:', float(requiredPayloadLengthExtraBytes) / shared.networkDefaultPayloadLengthExtraBytes @@ -821,11 +899,18 @@ class singleWorker(threading.Thread): pass encryptedPayload = pack('>Q', nonce) + encryptedPayload + + # Sanity check. The encryptedPayload size should never be larger than 256 KiB. There should + # be checks elsewhere in the code to not let the user try to send a message this large + # until we implement message continuation. + if len(encryptedPayload) > 2 ** 18: # 256 KiB + logger.critical('This msg object is too large to send. This should never happen. Object size: %s' % len(encryptedPayload)) + continue inventoryHash = calculateInventoryHash(encryptedPayload) - objectType = 'msg' + objectType = 2 shared.inventory[inventoryHash] = ( - objectType, toStreamNumber, encryptedPayload, int(time.time()),'') + objectType, toStreamNumber, encryptedPayload, embeddedTime, '') shared.inventorySets[toStreamNumber].add(inventoryHash) if shared.config.has_section(toaddress): shared.UISignalQueue.put(('updateSentItemStatusByAckdata', (ackdata, tr.translateText("MainWindow", "Message sent. Sent on %1").arg(l10n.formatTimestamp())))) @@ -837,7 +922,7 @@ class singleWorker(threading.Thread): toStreamNumber, 'advertiseobject', inventoryHash)) # Update the status of the message in the 'sent' table to have a - # 'msgsent' status or 'msgsentnoackexpected' status. + # 'msgsent' or 'msgsentnoackexpected' status. if shared.config.has_section(toaddress): newStatus = 'msgsentnoackexpected' else: @@ -879,11 +964,18 @@ class singleWorker(threading.Thread): if addressVersionNumber <= 3: shared.neededPubkeys[ripe] = 0 elif addressVersionNumber >= 4: + # If the user just clicked 'send' then the tag (and other information) will already + # be in the neededPubkeys dictionary. But if we are recovering from a restart + # of the client then we have to put it in now. privEncryptionKey = hashlib.sha512(hashlib.sha512(encodeVarint(addressVersionNumber)+encodeVarint(streamNumber)+ripe).digest()).digest()[:32] # Note that this is the first half of the sha512 hash. tag = hashlib.sha512(hashlib.sha512(encodeVarint(addressVersionNumber)+encodeVarint(streamNumber)+ripe).digest()).digest()[32:] # Note that this is the second half of the sha512 hash. - shared.neededPubkeys[tag] = highlevelcrypto.makeCryptor(privEncryptionKey.encode('hex')) # We'll need this for when we receive a pubkey reply: it will be encrypted and we'll need to decrypt it. - payload = pack('>Q', (int(time.time()) + random.randrange( - -300, 300))) # the current time plus or minus five minutes. + if tag not in shared.neededPubkeys: + shared.neededPubkeys[tag] = (toAddress, highlevelcrypto.makeCryptor(privEncryptionKey.encode('hex'))) # We'll need this for when we receive a pubkey reply: it will be encrypted and we'll need to decrypt it. + + TTL = 2.5 * 24 * 60 * 60 # 2.5 days + embeddedTime = int(time.time() + random.randrange(-300, 300) + TTL) # 2.5 days from now plus or minus five minutes + payload = pack('>Q', embeddedTime) + payload += '\x00\x00\x00\x00' # object type: getpubkey payload += encodeVarint(addressVersionNumber) payload += encodeVarint(streamNumber) if addressVersionNumber <= 3: @@ -900,19 +992,19 @@ class singleWorker(threading.Thread): shared.UISignalQueue.put(('updateStatusBar', statusbar)) shared.UISignalQueue.put(('updateSentItemStatusByHash', ( ripe, tr.translateText("MainWindow",'Doing work necessary to request encryption key.')))) - target = 2 ** 64 / ((len(payload) + shared.networkDefaultPayloadLengthExtraBytes + - 8) * shared.networkDefaultProofOfWorkNonceTrialsPerByte) + + TTL = 2.5 * 24 * 60 * 60 # 2.5 days + target = 2 ** 64 / (shared.networkDefaultProofOfWorkNonceTrialsPerByte*(len(payload) + 8 + shared.networkDefaultPayloadLengthExtraBytes + ((TTL*(len(payload)+8+shared.networkDefaultPayloadLengthExtraBytes))/(2 ** 16)))) initialHash = hashlib.sha512(payload).digest() trialValue, nonce = proofofwork.run(target, initialHash) with shared.printLock: print 'Found proof of work', trialValue, 'Nonce:', nonce - payload = pack('>Q', nonce) + payload inventoryHash = calculateInventoryHash(payload) - objectType = 'getpubkey' + objectType = 1 shared.inventory[inventoryHash] = ( - objectType, streamNumber, payload, int(time.time()),'') + objectType, streamNumber, payload, embeddedTime, '') shared.inventorySets[streamNumber].add(inventoryHash) print 'sending inv (for the getpubkey message)' shared.broadcastToSendDataQueues(( @@ -927,11 +1019,14 @@ class singleWorker(threading.Thread): shared.UISignalQueue.put(('updateSentItemStatusByHash', (ripe, tr.translateText("MainWindow",'Sending public key request. Waiting for reply. Requested at %1').arg(l10n.formatTimestamp())))) def generateFullAckMessage(self, ackdata, toStreamNumber): - embeddedTime = pack('>Q', (int(time.time()) + random.randrange( - -300, 300))) # the current time plus or minus five minutes. - payload = embeddedTime + encodeVarint(toStreamNumber) + ackdata - target = 2 ** 64 / ((len(payload) + shared.networkDefaultPayloadLengthExtraBytes + - 8) * shared.networkDefaultProofOfWorkNonceTrialsPerByte) + embeddedTime = int(time.time() + random.randrange(-300, 300)) # the current time plus or minus five minutes. + payload = pack('>Q', (embeddedTime)) + payload += '\x00\x00\x00\x02' # object type: msg + if int(time.time()) >= 1416175200: # Sun, 16 Nov 2014 22:00:00 GMT + payload += encodeVarint(1) # msg version + payload += encodeVarint(toStreamNumber) + ackdata + TTL = 2.5 * 24 * 60 * 60 # 2.5 days + target = 2 ** 64 / (shared.networkDefaultProofOfWorkNonceTrialsPerByte*(len(payload) + 8 + shared.networkDefaultPayloadLengthExtraBytes + ((TTL*(len(payload)+8+shared.networkDefaultPayloadLengthExtraBytes))/(2 ** 16)))) with shared.printLock: print '(For ack message) Doing proof of work...' @@ -946,4 +1041,4 @@ class singleWorker(threading.Thread): pass payload = pack('>Q', nonce) + payload - return shared.CreatePacket('msg', payload) + return shared.CreatePacket('object', payload) diff --git a/src/class_sqlThread.py b/src/class_sqlThread.py index 2df6c05f..c53fb33a 100644 --- a/src/class_sqlThread.py +++ b/src/class_sqlThread.py @@ -39,28 +39,33 @@ class sqlThread(threading.Thread): '''CREATE TABLE blacklist (label text, address text, enabled bool)''' ) self.cur.execute( '''CREATE TABLE whitelist (label text, address text, enabled bool)''' ) - # Explanation of what is in the pubkeys table: - # The hash is the RIPEMD160 hash that is encoded in the Bitmessage address. - # transmitdata is literally the data that was included in the Bitmessage pubkey message when it arrived, except for the 24 byte protocol header- ie, it starts with the POW nonce. - # time is the time that the pubkey was broadcast on the network same as with every other type of Bitmessage object. - # usedpersonally is set to "yes" if we have used the key - # personally. This keeps us from deleting it because we may want to - # reply to a message in the future. This field is not a bool - # because we may need more flexability in the future and it doesn't - # take up much more space anyway. + """ + Explanation of what is in the pubkeys table: + The hash is the RIPEMD160 hash that is encoded in the Bitmessage address. + + transmitdata /was/ literally the data that was included in the Bitmessage pubkey message when it arrived, + except for the 24 byte protocol header- ie, it started with the POW nonce. Since protocol v3, to maintain + backwards compability, the data format of the data on disk is staying the same even though the wire format has changed. + + time is the time that the pubkey was broadcast on the network same as with every other type of Bitmessage object. + + usedpersonally is set to "yes" if we have used the key personally. This keeps us from deleting it because we may want to + reply to a message in the future. This field is not a bool because we may need more flexability in the future and it doesn't + take up much more space anyway. + """ self.cur.execute( '''CREATE TABLE pubkeys (hash blob, addressversion int, transmitdata blob, time int, usedpersonally text, UNIQUE(hash, addressversion) ON CONFLICT REPLACE)''' ) self.cur.execute( - '''CREATE TABLE inventory (hash blob, objecttype text, streamnumber int, payload blob, receivedtime integer, tag blob, UNIQUE(hash) ON CONFLICT REPLACE)''' ) + '''CREATE TABLE inventory (hash blob, objecttype int, streamnumber int, payload blob, expirestime integer, tag blob, UNIQUE(hash) ON CONFLICT REPLACE)''' ) self.cur.execute( '''INSERT INTO subscriptions VALUES('Bitmessage new releases/announcements','BM-GtovgYdgs7qXPkoYaRgrLFuFKz1SFpsw',1)''') self.cur.execute( '''CREATE TABLE settings (key blob, value blob, UNIQUE(key) ON CONFLICT REPLACE)''' ) - self.cur.execute( '''INSERT INTO settings VALUES('version','6')''') + self.cur.execute( '''INSERT INTO settings VALUES('version','7')''') self.cur.execute( '''INSERT INTO settings VALUES('lastvacuumtime',?)''', ( int(time.time()),)) self.cur.execute( - '''CREATE TABLE objectprocessorqueue (objecttype text, data blob, UNIQUE(objecttype, data) ON CONFLICT REPLACE)''' ) + '''CREATE TABLE objectprocessorqueue (objecttype int, data blob, UNIQUE(objecttype, data) ON CONFLICT REPLACE)''' ) self.conn.commit() logger.info('Created messages database file') except Exception as err: @@ -247,9 +252,11 @@ class sqlThread(threading.Thread): shared.config.set('bitmessagesettings', 'sendoutgoingconnections', 'True') # Raise the default required difficulty from 1 to 2 + # With the change to protocol v3, this is obsolete. if shared.config.getint('bitmessagesettings', 'settingsversion') == 6: - if int(shared.config.get('bitmessagesettings','defaultnoncetrialsperbyte')) == shared.networkDefaultProofOfWorkNonceTrialsPerByte: + """if int(shared.config.get('bitmessagesettings','defaultnoncetrialsperbyte')) == shared.networkDefaultProofOfWorkNonceTrialsPerByte: shared.config.set('bitmessagesettings','defaultnoncetrialsperbyte', str(shared.networkDefaultProofOfWorkNonceTrialsPerByte * 2)) + """ shared.config.set('bitmessagesettings', 'settingsversion', '7') with open(shared.appdata + 'keys.dat', 'wb') as configfile: shared.config.write(configfile) @@ -278,7 +285,7 @@ class sqlThread(threading.Thread): with open(shared.appdata + 'keys.dat', 'wb') as configfile: shared.config.write(configfile) - #Adjusting time period to stop sending messages + #Add settings to support no longer resending messages after a certain period of time even if we never get an ack if shared.config.getint('bitmessagesettings', 'settingsversion') == 7: shared.config.set( 'bitmessagesettings', 'stopresendingafterxdays', '') @@ -302,9 +309,39 @@ class sqlThread(threading.Thread): item = '''update settings set value=? WHERE key='version';''' parameters = (6,) self.cur.execute(item, parameters) + + # changes related to protocol v3 + # In table inventory and objectprocessorqueue, objecttype is now an integer (it was a human-friendly string previously) + item = '''SELECT value FROM settings WHERE key='version';''' + parameters = '' + self.cur.execute(item, parameters) + currentVersion = int(self.cur.fetchall()[0][0]) + if currentVersion == 6: + logger.debug('In messages.dat database, dropping and recreating the inventory table.') + self.cur.execute( '''DROP TABLE inventory''') + self.cur.execute( '''CREATE TABLE inventory (hash blob, objecttype int, streamnumber int, payload blob, expirestime integer, tag blob, UNIQUE(hash) ON CONFLICT REPLACE)''' ) + self.cur.execute( '''DROP TABLE objectprocessorqueue''') + self.cur.execute( '''CREATE TABLE objectprocessorqueue (objecttype int, data blob, UNIQUE(objecttype, data) ON CONFLICT REPLACE)''' ) + item = '''update settings set value=? WHERE key='version';''' + parameters = (7,) + self.cur.execute(item, parameters) + logger.debug('Finished dropping and recreating the inventory table.') + + # With the change to protocol version 3, reset the user-settable difficulties to 1 + if shared.config.getint('bitmessagesettings', 'settingsversion') == 8: + shared.config.set('bitmessagesettings','defaultnoncetrialsperbyte', str(shared.networkDefaultProofOfWorkNonceTrialsPerByte)) + shared.config.set('bitmessagesettings','defaultpayloadlengthextrabytes', str(shared.networkDefaultPayloadLengthExtraBytes)) + previousTotalDifficulty = int(shared.config.getint('bitmessagesettings', 'maxacceptablenoncetrialsperbyte')) / 320 + previousSmallMessageDifficulty = int(shared.config.getint('bitmessagesettings', 'maxacceptablepayloadlengthextrabytes')) / 1400 + shared.config.set('bitmessagesettings','maxacceptablenoncetrialsperbyte', str(previousTotalDifficulty * 1000)) + shared.config.set('bitmessagesettings','maxacceptablepayloadlengthextrabytes', str(previousSmallMessageDifficulty * 1000)) + shared.config.set('bitmessagesettings', 'settingsversion', '9') + with open(shared.appdata + 'keys.dat', 'wb') as configfile: + shared.config.write(configfile) + # Are you hoping to add a new option to the keys.dat file of existing - # Bitmessage users? Add it right above this line! + # Bitmessage users or modify the SQLite database? Add it right above this line! try: testpayload = '\x00\x00' diff --git a/src/defaultKnownNodes.py b/src/defaultKnownNodes.py index bf98c351..d46171d5 100644 --- a/src/defaultKnownNodes.py +++ b/src/defaultKnownNodes.py @@ -12,15 +12,8 @@ def createDefaultKnownNodes(appdata): stream1 = {} #stream1[shared.Peer('2604:2000:1380:9f:82e:148b:2746:d0c7', 8080)] = int(time.time()) - stream1[shared.Peer('68.33.0.104', 8444)] = int(time.time()) - stream1[shared.Peer('97.77.34.35', 8444)] = int(time.time()) - stream1[shared.Peer('71.232.195.131', 8444)] = int(time.time()) - stream1[shared.Peer('192.241.231.39', 8444)] = int(time.time()) - stream1[shared.Peer('75.66.0.116', 8444)] = int(time.time()) - stream1[shared.Peer('182.169.23.102', 8444)] = int(time.time()) - stream1[shared.Peer('75.95.134.9', 8444)] = int(time.time()) - stream1[shared.Peer('46.236.100.108', 48444)] = int(time.time()) - stream1[shared.Peer('66.108.53.42', 8080)] = int(time.time()) + stream1[shared.Peer('158.222.211.81', 8080)] = int(time.time()) + stream1[shared.Peer('85.214.194.88', 8444)] = int(time.time()) ############# Stream 2 ################# stream2 = {} diff --git a/src/helper_bootstrap.py b/src/helper_bootstrap.py index 8e9d28e5..5c1aa77e 100644 --- a/src/helper_bootstrap.py +++ b/src/helper_bootstrap.py @@ -25,7 +25,7 @@ def knownNodes(): shared.knownNodes[stream][peer] = time except: shared.knownNodes = defaultKnownNodes.createDefaultKnownNodes(shared.appdata) - if shared.config.getint('bitmessagesettings', 'settingsversion') > 8: + if shared.config.getint('bitmessagesettings', 'settingsversion') > 9: print 'Bitmessage cannot read future versions of the keys file (keys.dat). Run the newer version of Bitmessage.' raise SystemExit diff --git a/src/helper_startup.py b/src/helper_startup.py index fd14e528..f8e592d1 100644 --- a/src/helper_startup.py +++ b/src/helper_startup.py @@ -57,7 +57,7 @@ def loadConfig(): # This appears to be the first time running the program; there is # no config file (or it cannot be accessed). Create config file. shared.config.add_section('bitmessagesettings') - shared.config.set('bitmessagesettings', 'settingsversion', '8') + shared.config.set('bitmessagesettings', 'settingsversion', '9') shared.config.set('bitmessagesettings', 'port', '8444') shared.config.set( 'bitmessagesettings', 'timeformat', '%%a, %%d %%b %%Y %%I:%%M %%p') @@ -89,7 +89,7 @@ def loadConfig(): shared.config.set( 'bitmessagesettings', 'messagesencrypted', 'false') shared.config.set('bitmessagesettings', 'defaultnoncetrialsperbyte', str( - shared.networkDefaultProofOfWorkNonceTrialsPerByte * 2)) + shared.networkDefaultProofOfWorkNonceTrialsPerByte)) shared.config.set('bitmessagesettings', 'defaultpayloadlengthextrabytes', str( shared.networkDefaultPayloadLengthExtraBytes)) shared.config.set('bitmessagesettings', 'minimizeonclose', 'false') diff --git a/src/highlevelcrypto.py b/src/highlevelcrypto.py index 8838aa9f..54c010ce 100644 --- a/src/highlevelcrypto.py +++ b/src/highlevelcrypto.py @@ -33,7 +33,10 @@ def sign(msg,hexPrivkey): return makeCryptor(hexPrivkey).sign(msg) # Verifies with hex public key def verify(msg,sig,hexPubkey): - return makePubCryptor(hexPubkey).verify(sig,msg) + try: + return makePubCryptor(hexPubkey).verify(sig,msg) + except: + return False # Does an EC point multiplication; turns a private key into a public key. def pointMult(secret): diff --git a/src/message_data_reader.py b/src/message_data_reader.py index d13e1493..80cea70a 100644 --- a/src/message_data_reader.py +++ b/src/message_data_reader.py @@ -54,13 +54,13 @@ def readPubkeys(): def readInventory(): print 'Printing everything in inventory table:' - item = '''select hash, objecttype, streamnumber, payload, receivedtime from inventory''' + item = '''select hash, objecttype, streamnumber, payload, expirestime from inventory''' parameters = '' cur.execute(item, parameters) output = cur.fetchall() for row in output: - hash, objecttype, streamnumber, payload, receivedtime = row - print 'Hash:', hash.encode('hex'), objecttype, streamnumber, '\t', payload.encode('hex'), '\t', unicode(strftime('%a, %d %b %Y %I:%M %p',localtime(receivedtime)),'utf-8') + hash, objecttype, streamnumber, payload, expirestime = row + print 'Hash:', hash.encode('hex'), objecttype, streamnumber, '\t', payload.encode('hex'), '\t', unicode(strftime('%a, %d %b %Y %I:%M %p',localtime(expirestime)),'utf-8') def takeInboxMessagesOutOfTrash(): diff --git a/src/proofofwork.py b/src/proofofwork.py index c26f2e1b..8857ccb3 100644 --- a/src/proofofwork.py +++ b/src/proofofwork.py @@ -5,6 +5,7 @@ import hashlib from struct import unpack, pack import sys from shared import config, frozen +import shared #import os def _set_idle(): @@ -39,7 +40,6 @@ def _doSafePoW(target, initialHash): return [trialValue, nonce] def _doFastPoW(target, initialHash): - import shared import time from multiprocessing import Pool, cpu_count try: diff --git a/src/pyelliptic/ecc.py b/src/pyelliptic/ecc.py index 95628572..31bcb1e7 100644 --- a/src/pyelliptic/ecc.py +++ b/src/pyelliptic/ecc.py @@ -436,11 +436,16 @@ class ECC: pubkey = ephem.get_pubkey() iv = OpenSSL.rand(OpenSSL.get_cipher(ciphername).get_blocksize()) ctx = Cipher(key_e, iv, 1, ciphername) - ciphertext = ctx.ciphering(data) - #ciphertext = iv + pubkey + ctx.ciphering(data) # We will switch to this line after an upgrade period + import time + if int(time.time()) < 1416175200: # Sun, 16 Nov 2014 22:00:00 GMT + ciphertext = ctx.ciphering(data) + else: + ciphertext = iv + pubkey + ctx.ciphering(data) # Everyone should be using this line after the Bitmessage protocol v3 upgrade period mac = hmac_sha256(key_m, ciphertext) - return iv + pubkey + ciphertext + mac - #return ciphertext + mac # We will switch to this line after an upgrade period. + if int(time.time()) < 1416175200: # Sun, 16 Nov 2014 22:00:00 GMT + return iv + pubkey + ciphertext + mac + else: + return ciphertext + mac # Everyone should be using this line after the Bitmessage protocol v3 upgrade period def decrypt(self, data, ciphername='aes-256-cbc'): """ diff --git a/src/shared.py b/src/shared.py index cb3aa1c6..e27a2213 100644 --- a/src/shared.py +++ b/src/shared.py @@ -1,9 +1,7 @@ -softwareVersion = '0.4.3' +softwareVersion = '0.4.4' verbose = 1 -maximumAgeOfAnObjectThatIAmWillingToAccept = 216000 # Equals two days and 12 hours. -lengthOfTimeToLeaveObjectsInInventory = 237600 # Equals two days and 18 hours. This should be longer than maximumAgeOfAnObjectThatIAmWillingToAccept so that we don't process messages twice. +maximumAgeOfAnObjectThatIAmWillingToAccept = 216000 # This is obsolete with the change to protocol v3 but the singleCleaner thread still hasn't been updated so we need this a little longer. lengthOfTimeToHoldOnToAllPubkeys = 2419200 # Equals 4 weeks. You could make this longer if you want but making it shorter would not be advisable because there is a very small possibility that it could keep you from obtaining a needed pubkey for a period of time. -maximumAgeOfObjectsThatIAdvertiseToOthers = 216000 # Equals two days and 12 hours maximumAgeOfNodesThatIAdvertiseToOthers = 10800 # Equals three hours useVeryEasyProofOfWorkForTesting = False # If you set this to True while on the normal network, you won't be able to send or sometimes receive messages. @@ -22,12 +20,13 @@ import threading import time from os import path, environ from struct import Struct +import traceback # Project imports. from addresses import * import highlevelcrypto import shared -import helper_startup +#import helper_startup from helper_sql import * @@ -82,8 +81,8 @@ objectProcessorQueue = Queue.Queue( streamsInWhichIAmParticipating = {} #If changed, these values will cause particularly unexpected behavior: You won't be able to either send or receive messages because the proof of work you do (or demand) won't match that done or demanded by others. Don't change them! -networkDefaultProofOfWorkNonceTrialsPerByte = 320 #The amount of work that should be performed (and demanded) per byte of the payload. Double this number to double the work. -networkDefaultPayloadLengthExtraBytes = 14000 #To make sending short messages a little more difficult, this value is added to the payload length for use in calculating the proof of work target. +networkDefaultProofOfWorkNonceTrialsPerByte = 1000 #The amount of work that should be performed (and demanded) per byte of the payload. +networkDefaultPayloadLengthExtraBytes = 1000 #To make sending short messages a little more difficult, this value is added to the payload length for use in calculating the proof of work target. # Remember here the RPC port read from namecoin.conf so we can restore to # it as default whenever the user changes the "method" selection for @@ -114,10 +113,7 @@ Header = Struct('!L12sL4s') #Create a packet def CreatePacket(command, payload=''): payload_length = len(payload) - if payload_length == 0: - checksum = '\xCF\x83\xE1\x35' - else: - checksum = hashlib.sha512(payload).digest()[0:4] + checksum = hashlib.sha512(payload).digest()[0:4] b = bytearray(Header.size + payload_length) Header.pack_into(b, 0, 0xE9BEB4D9, command, payload_length, checksum) @@ -137,7 +133,7 @@ def encodeHost(host): def assembleVersionMessage(remoteHost, remotePort, myStreamNumber): payload = '' - payload += pack('>L', 2) # protocol version. + payload += pack('>L', 3) # protocol version. payload += pack('>q', 1) # bitflags of the services I offer. payload += pack('>q', int(time.time())) @@ -314,17 +310,20 @@ def reloadBroadcastSendersForWhichImWatching(): privEncryptionKey = doubleHashOfAddressData[:32] MyECSubscriptionCryptorObjects[tag] = highlevelcrypto.makeCryptor(privEncryptionKey.encode('hex')) -def isProofOfWorkSufficient( - data, - nonceTrialsPerByte=0, - payloadLengthExtraBytes=0): +def isProofOfWorkSufficient(data, + nonceTrialsPerByte=0, + payloadLengthExtraBytes=0): if nonceTrialsPerByte < networkDefaultProofOfWorkNonceTrialsPerByte: nonceTrialsPerByte = networkDefaultProofOfWorkNonceTrialsPerByte if payloadLengthExtraBytes < networkDefaultPayloadLengthExtraBytes: payloadLengthExtraBytes = networkDefaultPayloadLengthExtraBytes + endOfLifeTime, = unpack('>Q', data[8:16]) + TTL = endOfLifeTime - int(time.time()) + if TTL < 300: + TTL = 300 POW, = unpack('>Q', hashlib.sha512(hashlib.sha512(data[ :8] + hashlib.sha512(data[8:]).digest()).digest()).digest()[0:8]) - return POW <= 2 ** 64 / ((len(data) + payloadLengthExtraBytes) * (nonceTrialsPerByte)) + return POW <= 2 ** 64 / (nonceTrialsPerByte*(len(data) + payloadLengthExtraBytes + ((TTL*(len(data)+payloadLengthExtraBytes))/(2 ** 16)))) def doCleanShutdown(): global shutdown @@ -383,9 +382,9 @@ def flushInventory(): #Note that the singleCleanerThread clears out the inventory dictionary from time to time, although it only clears things that have been in the dictionary for a long time. This clears the inventory dictionary Now. with SqlBulkExecute() as sql: for hash, storedValue in inventory.items(): - objectType, streamNumber, payload, receivedTime, tag = storedValue + objectType, streamNumber, payload, expiresTime, tag = storedValue sql.execute('''INSERT INTO inventory VALUES (?,?,?,?,?,?)''', - hash,objectType,streamNumber,payload,receivedTime,tag) + hash,objectType,streamNumber,payload,expiresTime,tag) del inventory[hash] def fixPotentiallyInvalidUTF8Data(text): @@ -455,115 +454,220 @@ def isBitSetWithinBitfield(fourByteString, n): x, = unpack('>L', fourByteString) return x & 2**n != 0 -def decryptAndCheckPubkeyPayload(payload, address): - status, addressVersion, streamNumber, ripe = decodeAddress(address) - doubleHashOfAddressData = hashlib.sha512(hashlib.sha512(encodeVarint( - addressVersion) + encodeVarint(streamNumber) + ripe).digest()).digest() - readPosition = 8 # bypass the nonce - readPosition += 8 # bypass the time - embeddedVersionNumber, varintLength = decodeVarint( - payload[readPosition:readPosition + 10]) - if embeddedVersionNumber != addressVersion: - logger.info('Pubkey decryption was UNsuccessful due to address version mismatch. This shouldn\'t have happened.') - return 'failed' - readPosition += varintLength - embeddedStreamNumber, varintLength = decodeVarint( - payload[readPosition:readPosition + 10]) - if embeddedStreamNumber != streamNumber: - logger.info('Pubkey decryption was UNsuccessful due to stream number mismatch. This shouldn\'t have happened.') - return 'failed' - readPosition += varintLength - signedData = payload[8:readPosition] # Some of the signed data is not encrypted so let's keep it for now. - toTag = payload[readPosition:readPosition+32] - readPosition += 32 #for the tag - encryptedData = payload[readPosition:] - # Let us try to decrypt the pubkey - privEncryptionKey = doubleHashOfAddressData[:32] - cryptorObject = highlevelcrypto.makeCryptor(privEncryptionKey.encode('hex')) - try: - decryptedData = cryptorObject.decrypt(encryptedData) - except: - # Someone must have encrypted some data with a different key - # but tagged it with a tag for which we are watching. - logger.info('Pubkey decryption was UNsuccessful. This shouldn\'t have happened.') - return 'failed' - logger.debug('Pubkey decryption successful') - readPosition = 4 # bypass the behavior bitfield - publicSigningKey = '\x04' + decryptedData[readPosition:readPosition + 64] - # Is it possible for a public key to be invalid such that trying to - # encrypt or check a sig with it will cause an error? If it is, we should - # probably test these keys here. - readPosition += 64 - publicEncryptionKey = '\x04' + decryptedData[readPosition:readPosition + 64] - readPosition += 64 - specifiedNonceTrialsPerByte, specifiedNonceTrialsPerByteLength = decodeVarint( - decryptedData[readPosition:readPosition + 10]) - readPosition += specifiedNonceTrialsPerByteLength - specifiedPayloadLengthExtraBytes, specifiedPayloadLengthExtraBytesLength = decodeVarint( - decryptedData[readPosition:readPosition + 10]) - readPosition += specifiedPayloadLengthExtraBytesLength - signedData += decryptedData[:readPosition] - signatureLength, signatureLengthLength = decodeVarint( - decryptedData[readPosition:readPosition + 10]) - readPosition += signatureLengthLength - signature = decryptedData[readPosition:readPosition + signatureLength] - try: - if not highlevelcrypto.verify(signedData, signature, publicSigningKey.encode('hex')): - logger.info('ECDSA verify failed (within decryptAndCheckPubkeyPayload).') - return 'failed' - logger.debug('ECDSA verify passed (within decryptAndCheckPubkeyPayload)') - except Exception as err: - logger.debug('ECDSA verify failed (within decryptAndCheckPubkeyPayload) %s' % err) - return 'failed' - sha = hashlib.new('sha512') - sha.update(publicSigningKey + publicEncryptionKey) - ripeHasher = hashlib.new('ripemd160') - ripeHasher.update(sha.digest()) - embeddedRipe = ripeHasher.digest() - - if embeddedRipe != ripe: - # Although this pubkey object had the tag were were looking for and was - # encrypted with the correct encryption key, it doesn't contain the - # correct keys. Someone is either being malicious or using buggy software. - logger.info('Pubkey decryption was UNsuccessful due to RIPE mismatch. This shouldn\'t have happened.') - return 'failed' +def decryptAndCheckPubkeyPayload(data, address): + """ + With the changes in protocol v3, to maintain backwards compatibility, signatures will be sent + the 'old' way during an upgrade period and then a 'new' simpler way after that. We will therefore + check the sig both ways. + Old way: + signedData = timePubkeyWasSigned(8 bytes) + addressVersion + streamNumber + the decrypted data down through the payloadLengthExtraBytes + New way: + signedData = all of the payload data from the time to the tag + the decrypted data down through the payloadLengthExtraBytes - t = (ripe, addressVersion, signedData, int(time.time()), 'yes') - sqlExecute('''INSERT INTO pubkeys VALUES (?,?,?,?,?)''', *t) - return 'successful' + The timePubkeyWasSigned will be calculated by subtracting 28 days form the embedded expiresTime. + """ + + """ + The time, address version, and stream number are not encrypted so let's + keep that data here for now. + """ + try: + status, addressVersion, streamNumber, ripe = decodeAddress(address) + + readPosition = 20 # bypass the nonce, time, and object type + embeddedAddressVersion, varintLength = decodeVarint(data[readPosition:readPosition + 10]) + readPosition += varintLength + embeddedStreamNumber, varintLength = decodeVarint(data[readPosition:readPosition + 10]) + readPosition += varintLength + + if addressVersion != embeddedAddressVersion: + logger.info('Pubkey decryption was UNsuccessful due to address version mismatch.') + return 'failed' + if streamNumber != embeddedStreamNumber: + logger.info('Pubkey decryption was UNsuccessful due to stream number mismatch.') + return 'failed' + + expiresTime, = unpack('>Q', data[8:16]) + TTL = 28 * 24 * 60 * 60 + signedDataOldMethod = pack('>Q', (expiresTime - TTL)) # the time that the pubkey was signed. 8 bytes. + signedDataOldMethod += data[20:readPosition] # the address version and stream number + + tag = data[readPosition:readPosition + 32] + readPosition += 32 + + signedDataNewMethod = data[8:readPosition] # the time through the tag + + encryptedData = data[readPosition:] + + # Let us try to decrypt the pubkey + toAddress, cryptorObject = shared.neededPubkeys[tag] + if toAddress != address: + logger.critical('decryptAndCheckPubkeyPayload failed due to toAddress mismatch. This is very peculiar. toAddress: %s, address %s' % (toAddress, address)) + # the only way I can think that this could happen is if someone encodes their address data two different ways. + # That sort of address-malleability should have been prevented earlier. + return 'failed' + try: + decryptedData = cryptorObject.decrypt(encryptedData) + except: + # Someone must have encrypted some data with a different key + # but tagged it with a tag for which we are watching. + logger.info('Pubkey decryption was unsuccessful.') + return 'failed' + + readPosition = 0 + bitfieldBehaviors = decryptedData[readPosition:readPosition + 4] + readPosition += 4 + publicSigningKey = '\x04' + decryptedData[readPosition:readPosition + 64] + readPosition += 64 + publicEncryptionKey = '\x04' + decryptedData[readPosition:readPosition + 64] + readPosition += 64 + specifiedNonceTrialsPerByte, specifiedNonceTrialsPerByteLength = decodeVarint( + decryptedData[readPosition:readPosition + 10]) + readPosition += specifiedNonceTrialsPerByteLength + specifiedPayloadLengthExtraBytes, specifiedPayloadLengthExtraBytesLength = decodeVarint( + decryptedData[readPosition:readPosition + 10]) + readPosition += specifiedPayloadLengthExtraBytesLength + signedDataOldMethod += decryptedData[:readPosition] + signedDataNewMethod += decryptedData[:readPosition] + signatureLength, signatureLengthLength = decodeVarint( + decryptedData[readPosition:readPosition + 10]) + readPosition += signatureLengthLength + signature = decryptedData[readPosition:readPosition + signatureLength] + + if highlevelcrypto.verify(signedDataOldMethod, signature, publicSigningKey.encode('hex')): + logger.info('ECDSA verify passed (within decryptAndCheckPubkeyPayload, old method)') + else: + logger.info('ECDSA verify failed (within decryptAndCheckPubkeyPayload, old method)') + # Try the protocol v3 signing method + if highlevelcrypto.verify(signedDataNewMethod, signature, publicSigningKey.encode('hex')): + logger.info('ECDSA verify passed (within decryptAndCheckPubkeyPayload, new method)') + else: + logger.info('ECDSA verify failed (within decryptAndCheckPubkeyPayload, new method)') + return 'failed' + + sha = hashlib.new('sha512') + sha.update(publicSigningKey + publicEncryptionKey) + ripeHasher = hashlib.new('ripemd160') + ripeHasher.update(sha.digest()) + embeddedRipe = ripeHasher.digest() + + if embeddedRipe != ripe: + # Although this pubkey object had the tag were were looking for and was + # encrypted with the correct encryption key, it doesn't contain the + # correct keys. Someone is either being malicious or using buggy software. + logger.info('Pubkey decryption was UNsuccessful due to RIPE mismatch.') + return 'failed' + + # Everything checked out. Insert it into the pubkeys table. + + logger.info('within decryptAndCheckPubkeyPayload, addressVersion: %s, streamNumber: %s \n\ + ripe %s\n\ + publicSigningKey in hex: %s\n\ + publicEncryptionKey in hex: %s' % (addressVersion, + streamNumber, + ripe.encode('hex'), + publicSigningKey.encode('hex'), + publicEncryptionKey.encode('hex') + ) + ) + + t = (ripe, addressVersion, signedDataOldMethod, int(time.time()), 'yes') + sqlExecute('''INSERT INTO pubkeys VALUES (?,?,?,?,?)''', *t) + return 'successful' + except varintDecodeError as e: + logger.info('Pubkey decryption was UNsuccessful due to a malformed varint.') + return 'failed' + except Exception as e: + logger.critical('Pubkey decryption was UNsuccessful because of an unhandled exception! This is definitely a bug! \n%s' % traceback.format_exc()) + return 'failed' Peer = collections.namedtuple('Peer', ['host', 'port']) -def checkAndShareMsgWithPeers(data): +def checkAndShareObjectWithPeers(data): + """ + Function is called after either receiving an object off of the wire + or after receiving one as ackdata. + Returns the length of time that we should reserve to process this message + if we are receiving it off of the wire. + """ # Let us check to make sure that the proof of work is sufficient. if not isProofOfWorkSufficient(data): - logger.debug('Proof of work in msg message insufficient.') - return + logger.info('Proof of work is insufficient.') + return 0 + + endOfLifeTime, = unpack('>Q', data[8:16]) + if endOfLifeTime - int(time.time()) > 28 * 24 * 60 * 60 + 10800: # The TTL may not be larger than 28 days + 3 hours of wiggle room + logger.info('This object\'s End of Life time is too far in the future. Ignoring it. Time is %s' % endOfLifeTime) + return 0 + if endOfLifeTime - int(time.time()) < - 3600: # The EOL time was more than an hour ago. That's too much. + logger.info('This object\'s End of Life time was more than an hour ago. Ignoring the object. Time is %s' % endOfLifeTime) + return 0 + intObjectType, = unpack('>I', data[16:20]) + try: + if intObjectType == 0: + _checkAndShareGetpubkeyWithPeers(data) + return 0.1 + elif intObjectType == 1: + _checkAndSharePubkeyWithPeers(data) + return 0.1 + elif intObjectType == 2: + _checkAndShareMsgWithPeers(data) + return 0.6 + elif intObjectType == 3: + _checkAndShareBroadcastWithPeers(data) + return 0.6 + else: + _checkAndShareUndefinedObjectWithPeers(data) + return 0.6 + except varintDecodeError as e: + logger.debug("There was a problem with a varint while checking to see whether it was appropriate to share an object with peers. Some details: %s" % e) + except Exception as e: + logger.critical('There was a problem while checking to see whether it was appropriate to share an object with peers. This is definitely a bug! \n%s' % traceback.format_exc()) + return 0 + - readPosition = 8 - embeddedTime, = unpack('>I', data[readPosition:readPosition + 4]) - - # This section is used for the transition from 32 bit time to 64 bit - # time in the protocol. - if embeddedTime == 0: - embeddedTime, = unpack('>Q', data[readPosition:readPosition + 8]) - readPosition += 8 - else: - readPosition += 4 - - if embeddedTime > (int(time.time()) + 10800): - logger.debug('The embedded time in this msg message is more than three hours in the future. That doesn\'t make sense. Ignoring message.') - return - if embeddedTime < (int(time.time()) - maximumAgeOfAnObjectThatIAmWillingToAccept): - logger.debug('The embedded time in this msg message is too old. Ignoring message.') - return - streamNumberAsClaimedByMsg, streamNumberAsClaimedByMsgLength = decodeVarint( +def _checkAndShareUndefinedObjectWithPeers(data): + embeddedTime, = unpack('>Q', data[8:16]) + readPosition = 20 # bypass nonce, time, and object type + objectVersion, objectVersionLength = decodeVarint( data[readPosition:readPosition + 9]) - if not streamNumberAsClaimedByMsg in streamsInWhichIAmParticipating: - logger.debug('The streamNumber %s isn\'t one we are interested in.' % streamNumberAsClaimedByMsg) + readPosition += objectVersionLength + streamNumber, streamNumberLength = decodeVarint( + data[readPosition:readPosition + 9]) + if not streamNumber in streamsInWhichIAmParticipating: + logger.debug('The streamNumber %s isn\'t one we are interested in.' % streamNumber) return - readPosition += streamNumberAsClaimedByMsgLength + + inventoryHash = calculateInventoryHash(data) + shared.numberOfInventoryLookupsPerformed += 1 + inventoryLock.acquire() + if inventoryHash in inventory: + logger.debug('We have already received this undefined object. Ignoring.') + inventoryLock.release() + return + elif isInSqlInventory(inventoryHash): + logger.debug('We have already received this undefined object (it is stored on disk in the SQL inventory). Ignoring it.') + inventoryLock.release() + return + objectType, = unpack('>I', data[16:20]) + inventory[inventoryHash] = ( + objectType, streamNumber, data, embeddedTime,'') + inventorySets[streamNumber].add(inventoryHash) + inventoryLock.release() + logger.debug('advertising inv with hash: %s' % inventoryHash.encode('hex')) + broadcastToSendDataQueues((streamNumber, 'advertiseobject', inventoryHash)) + + +def _checkAndShareMsgWithPeers(data): + embeddedTime, = unpack('>Q', data[8:16]) + readPosition = 20 # bypass nonce, time, and object type + streamNumber, streamNumberLength = decodeVarint( + data[readPosition:readPosition + 9]) + if not streamNumber in streamsInWhichIAmParticipating: + logger.debug('The streamNumber %s isn\'t one we are interested in.' % streamNumber) + return + readPosition += streamNumberLength inventoryHash = calculateInventoryHash(data) shared.numberOfInventoryLookupsPerformed += 1 inventoryLock.acquire() @@ -576,13 +680,13 @@ def checkAndShareMsgWithPeers(data): inventoryLock.release() return # This msg message is valid. Let's let our peers know about it. - objectType = 'msg' + objectType = 2 inventory[inventoryHash] = ( - objectType, streamNumberAsClaimedByMsg, data, embeddedTime,'') - inventorySets[streamNumberAsClaimedByMsg].add(inventoryHash) + objectType, streamNumber, data, embeddedTime,'') + inventorySets[streamNumber].add(inventoryHash) inventoryLock.release() logger.debug('advertising inv with hash: %s' % inventoryHash.encode('hex')) - broadcastToSendDataQueues((streamNumberAsClaimedByMsg, 'advertiseobject', inventoryHash)) + broadcastToSendDataQueues((streamNumber, 'advertiseobject', inventoryHash)) # Now let's enqueue it to be processed ourselves. # If we already have too much data in the queue to be processed, just sleep for now. @@ -592,30 +696,14 @@ def checkAndShareMsgWithPeers(data): shared.objectProcessorQueueSize += len(data) objectProcessorQueue.put((objectType,data)) -def checkAndSharegetpubkeyWithPeers(data): - if not isProofOfWorkSufficient(data): - logger.debug('Proof of work in getpubkey message insufficient.') - return - if len(data) < 34: - logger.debug('getpubkey message doesn\'t contain enough data. Ignoring.') - return - readPosition = 8 # bypass the nonce - embeddedTime, = unpack('>I', data[readPosition:readPosition + 4]) - - # This section is used for the transition from 32 bit time to 64 bit - # time in the protocol. - if embeddedTime == 0: - embeddedTime, = unpack('>Q', data[readPosition:readPosition + 8]) - readPosition += 8 - else: - readPosition += 4 - - if embeddedTime > int(time.time()) + 10800: - logger.debug('The time in this getpubkey message is too new. Ignoring it. Time: %s' % embeddedTime) - return - if embeddedTime < int(time.time()) - maximumAgeOfAnObjectThatIAmWillingToAccept: - logger.debug('The time in this getpubkey message is too old. Ignoring it. Time: %s' % embeddedTime) +def _checkAndShareGetpubkeyWithPeers(data): + if len(data) < 42: + logger.info('getpubkey message doesn\'t contain enough data. Ignoring.') return + if len(data) > 200: + logger.info('getpubkey is abnormally long. Sanity check failed. Ignoring object.') + embeddedTime, = unpack('>Q', data[8:16]) + readPosition = 20 # bypass the nonce, time, and object type requestedAddressVersionNumber, addressVersionLength = decodeVarint( data[readPosition:readPosition + 10]) readPosition += addressVersionLength @@ -638,7 +726,7 @@ def checkAndSharegetpubkeyWithPeers(data): inventoryLock.release() return - objectType = 'getpubkey' + objectType = 0 inventory[inventoryHash] = ( objectType, streamNumber, data, embeddedTime,'') inventorySets[streamNumber].add(inventoryHash) @@ -655,31 +743,11 @@ def checkAndSharegetpubkeyWithPeers(data): shared.objectProcessorQueueSize += len(data) objectProcessorQueue.put((objectType,data)) -def checkAndSharePubkeyWithPeers(data): - if len(data) < 146 or len(data) > 420: # sanity check - return - # Let us make sure that the proof of work is sufficient. - if not isProofOfWorkSufficient(data): - logger.debug('Proof of work in pubkey message insufficient.') - return - - readPosition = 8 # for the nonce - embeddedTime, = unpack('>I', data[readPosition:readPosition + 4]) - - # This section is used for the transition from 32 bit time to 64 bit - # time in the protocol. - if embeddedTime == 0: - embeddedTime, = unpack('>Q', data[readPosition:readPosition + 8]) - readPosition += 8 - else: - readPosition += 4 - - if embeddedTime < int(time.time()) - lengthOfTimeToHoldOnToAllPubkeys: - logger.debug('The embedded time in this pubkey message is too old. Ignoring. Embedded time is: %s' % embeddedTime) - return - if embeddedTime > int(time.time()) + 10800: - logger.debug('The embedded time in this pubkey message more than several hours in the future. This is irrational. Ignoring message.') +def _checkAndSharePubkeyWithPeers(data): + if len(data) < 146 or len(data) > 440: # sanity check return + embeddedTime, = unpack('>Q', data[8:16]) + readPosition = 20 # bypass the nonce, time, and object type addressVersion, varintLength = decodeVarint( data[readPosition:readPosition + 10]) readPosition += varintLength @@ -706,7 +774,7 @@ def checkAndSharePubkeyWithPeers(data): logger.debug('We have already received this pubkey (it is stored on disk in the SQL inventory). Ignoring it.') inventoryLock.release() return - objectType = 'pubkey' + objectType = 1 inventory[inventoryHash] = ( objectType, streamNumber, data, embeddedTime, tag) inventorySets[streamNumber].add(inventoryHash) @@ -725,31 +793,12 @@ def checkAndSharePubkeyWithPeers(data): objectProcessorQueue.put((objectType,data)) -def checkAndShareBroadcastWithPeers(data): - # Let us verify that the proof of work is sufficient. - if not isProofOfWorkSufficient(data): - logger.debug('Proof of work in broadcast message insufficient.') - return - readPosition = 8 # bypass the nonce - embeddedTime, = unpack('>I', data[readPosition:readPosition + 4]) - - # This section is used for the transition from 32 bit time to 64 bit - # time in the protocol. - if embeddedTime == 0: - embeddedTime, = unpack('>Q', data[readPosition:readPosition + 8]) - readPosition += 8 - else: - readPosition += 4 - - if embeddedTime > (int(time.time()) + 10800): # prevent funny business - logger.debug('The embedded time in this broadcast message is more than three hours in the future. That doesn\'t make sense. Ignoring message.') - return - if embeddedTime < (int(time.time()) - maximumAgeOfAnObjectThatIAmWillingToAccept): - logger.debug('The embedded time in this broadcast message is too old. Ignoring message.') - return +def _checkAndShareBroadcastWithPeers(data): if len(data) < 180: logger.debug('The payload length of this broadcast packet is unreasonably low. Someone is probably trying funny business. Ignoring message.') return + embeddedTime, = unpack('>Q', data[8:16]) + readPosition = 20 # bypass the nonce, time, and object type broadcastVersion, broadcastVersionLength = decodeVarint( data[readPosition:readPosition + 10]) readPosition += broadcastVersionLength @@ -775,7 +824,7 @@ def checkAndShareBroadcastWithPeers(data): inventoryLock.release() return # It is valid. Let's let our peers know about it. - objectType = 'broadcast' + objectType = 3 inventory[inventoryHash] = ( objectType, streamNumber, data, embeddedTime, tag) inventorySets[streamNumber].add(inventoryHash) @@ -793,5 +842,5 @@ def checkAndShareBroadcastWithPeers(data): objectProcessorQueue.put((objectType,data)) -helper_startup.loadConfig() +#helper_startup.loadConfig() from debug import logger