diff --git a/src/class_objectProcessor.py b/src/class_objectProcessor.py index 6fa31a47..1bacf639 100644 --- a/src/class_objectProcessor.py +++ b/src/class_objectProcessor.py @@ -7,10 +7,10 @@ processes the network objects import hashlib import logging import random +import subprocess # nosec import threading import time from binascii import hexlify -from subprocess import call # nosec import helper_bitcoin import helper_inbox @@ -22,7 +22,6 @@ import protocol import queues import shared import state -import tr from addresses import ( calculateInventoryHash, decodeAddress, decodeVarint, encodeAddress, encodeVarint, varintDecodeError @@ -32,7 +31,7 @@ from fallback import RIPEMD160Hash from helper_sql import sql_ready, SqlBulkExecute, sqlExecute, sqlQuery from network import bmproto, knownnodes from network.node import Peer -# pylint: disable=too-many-locals, too-many-return-statements, too-many-branches, too-many-statements +from tr import _translate logger = logging.getLogger('default') @@ -52,11 +51,10 @@ class objectProcessor(threading.Thread): # objectprocessorqueue table. Let's pull it out. sql_ready.wait() queryreturn = sqlQuery( - '''SELECT objecttype, data FROM objectprocessorqueue''') - for row in queryreturn: - objectType, data = row + 'SELECT objecttype, data FROM objectprocessorqueue') + for objectType, data in queryreturn: queues.objectProcessorQueue.put((objectType, data)) - sqlExecute('''DELETE FROM objectprocessorqueue''') + sqlExecute('DELETE FROM objectprocessorqueue') logger.debug( 'Loaded %s objects from disk into the objectProcessorQueue.', len(queryreturn)) @@ -100,7 +98,7 @@ class objectProcessor(threading.Thread): 'The object is too big after decompression (stopped' ' decompressing at %ib, your configured limit %ib).' ' Ignoring', - e.size, BMConfigParser().safeGetInt("zlib", "maxsize")) + e.size, BMConfigParser().safeGetInt('zlib', 'maxsize')) except varintDecodeError as e: logger.debug( 'There was a problem with a varint while processing an' @@ -131,7 +129,6 @@ class objectProcessor(threading.Thread): @staticmethod def checkackdata(data): """Checking Acknowledgement of message received or not?""" - # pylint: disable=protected-access # Let's check whether this is a message acknowledgement bound for us. if len(data) < 32: return @@ -143,18 +140,15 @@ class objectProcessor(threading.Thread): logger.info('This object is an acknowledgement bound for me.') del state.ackdataForWhichImWatching[data[readPosition:]] sqlExecute( - 'UPDATE sent SET status=?, lastactiontime=?' - ' WHERE ackdata=?', - 'ackreceived', int(time.time()), data[readPosition:]) + "UPDATE sent SET status='ackreceived', lastactiontime=?" + " WHERE ackdata=?", int(time.time()), data[readPosition:]) queues.UISignalQueue.put(( - 'updateSentItemStatusByAckdata', - ( + 'updateSentItemStatusByAckdata', ( data[readPosition:], - tr._translate( + _translate( "MainWindow", "Acknowledgement of the message received %1" - ).arg(l10n.formatTimestamp()) - ) + ).arg(l10n.formatTimestamp())) )) else: logger.debug('This object is not an acknowledgement bound for me.') @@ -183,10 +177,9 @@ class objectProcessor(threading.Thread): def processgetpubkey(data): """Process getpubkey object""" if len(data) > 200: - logger.info( + return logger.info( 'getpubkey is abnormally long. Sanity check failed.' ' Ignoring object.') - return readPosition = 20 # bypass the nonce, time, and object type requestedAddressVersionNumber, addressVersionLength = decodeVarint( data[readPosition:readPosition + 10]) @@ -196,29 +189,25 @@ class objectProcessor(threading.Thread): readPosition += streamNumberLength if requestedAddressVersionNumber == 0: - logger.debug( + return logger.debug( 'The requestedAddressVersionNumber of the pubkey request' ' is zero. That doesn\'t make any sense. Ignoring it.') - return - elif requestedAddressVersionNumber == 1: - logger.debug( + if requestedAddressVersionNumber == 1: + return logger.debug( 'The requestedAddressVersionNumber of the pubkey request' ' is 1 which isn\'t supported anymore. Ignoring it.') - return - elif requestedAddressVersionNumber > 4: - logger.debug( + if requestedAddressVersionNumber > 4: + return logger.debug( 'The requestedAddressVersionNumber of the pubkey request' ' is too high. Can\'t understand. Ignoring it.') - return myAddress = '' if requestedAddressVersionNumber <= 3: requestedHash = data[readPosition:readPosition + 20] if len(requestedHash) != 20: - logger.debug( + return logger.debug( 'The length of the requested hash is not 20 bytes.' ' Something is wrong. Ignoring.') - return logger.info( 'the hash requested in this getpubkey request is: %s', hexlify(requestedHash)) @@ -228,10 +217,9 @@ class objectProcessor(threading.Thread): elif requestedAddressVersionNumber >= 4: requestedTag = data[readPosition:readPosition + 32] if len(requestedTag) != 32: - logger.debug( + return logger.debug( 'The length of the requested tag is not 32 bytes.' ' Something is wrong. Ignoring.') - return logger.debug( 'the tag requested in this getpubkey request is: %s', hexlify(requestedTag)) @@ -243,35 +231,31 @@ class objectProcessor(threading.Thread): return if decodeAddress(myAddress)[1] != requestedAddressVersionNumber: - logger.warning( + return logger.warning( '(Within the processgetpubkey function) Someone requested' ' one of my pubkeys but the requestedAddressVersionNumber' ' doesn\'t match my actual address version number.' ' Ignoring.') - return if decodeAddress(myAddress)[2] != streamNumber: - logger.warning( + return logger.warning( '(Within the processgetpubkey function) Someone requested' ' one of my pubkeys but the stream number on which we' ' heard this getpubkey object doesn\'t match this' ' address\' stream number. Ignoring.') - return if BMConfigParser().safeGetBoolean(myAddress, 'chan'): - logger.info( + return logger.info( 'Ignoring getpubkey request because it is for one of my' ' chan addresses. The other party should already have' ' the pubkey.') - return lastPubkeySendTime = BMConfigParser().safeGetInt( myAddress, 'lastpubkeysendtime') # If the last time we sent our pubkey was more recent than # 28 days ago... if lastPubkeySendTime > time.time() - 2419200: - logger.info( + return logger.info( 'Found getpubkey-requested-item in my list of EC hashes' ' BUT we already sent it recently. Ignoring request.' ' The lastPubkeySendTime is: %s', lastPubkeySendTime) - return logger.info( 'Found getpubkey-requested-hash in my list of EC hashes.' ' Telling Worker thread to do the POW for a pubkey message' @@ -297,22 +281,19 @@ class objectProcessor(threading.Thread): data[readPosition:readPosition + 10]) readPosition += varintLength if addressVersion == 0: - logger.debug( + return logger.debug( '(Within processpubkey) addressVersion of 0 doesn\'t' ' make sense.') - return if addressVersion > 4 or addressVersion == 1: - logger.info( + return logger.info( 'This version of Bitmessage cannot handle version %s' ' addresses.', addressVersion) - return if addressVersion == 2: # sanity check. This is the minimum possible length. if len(data) < 146: - logger.debug( + return logger.debug( '(within processpubkey) payloadLength less than 146.' ' Sanity check failed.') - return readPosition += 4 publicSigningKey = data[readPosition:readPosition + 64] # Is it possible for a public key to be invalid such that trying to @@ -321,10 +302,9 @@ class objectProcessor(threading.Thread): readPosition += 64 publicEncryptionKey = data[readPosition:readPosition + 64] if len(publicEncryptionKey) < 64: - logger.debug( + return logger.debug( 'publicEncryptionKey length less than 64. Sanity check' ' failed.') - return readPosition += 64 # The data we'll store in the pubkeys table. dataToStore = data[20:readPosition] @@ -373,11 +353,11 @@ class objectProcessor(threading.Thread): readPosition += 64 publicEncryptionKey = '\x04' + data[readPosition:readPosition + 64] readPosition += 64 - _, specifiedNonceTrialsPerByteLength = decodeVarint( - data[readPosition:readPosition + 10]) + specifiedNonceTrialsPerByteLength = decodeVarint( + data[readPosition:readPosition + 10])[1] readPosition += specifiedNonceTrialsPerByteLength - _, specifiedPayloadLengthExtraBytesLength = decodeVarint( - data[readPosition:readPosition + 10]) + specifiedPayloadLengthExtraBytesLength = decodeVarint( + data[readPosition:readPosition + 10])[1] readPosition += specifiedPayloadLengthExtraBytesLength endOfSignedDataPosition = readPosition # The data we'll store in the pubkeys table. @@ -429,19 +409,17 @@ class objectProcessor(threading.Thread): if addressVersion == 4: if len(data) < 350: # sanity check. - logger.debug( + return logger.debug( '(within processpubkey) payloadLength less than 350.' ' Sanity check failed.') - return tag = data[readPosition:readPosition + 32] if tag not in state.neededPubkeys: - logger.info( + return logger.info( 'We don\'t need this v4 pubkey. We didn\'t ask for it.') - return # Let us try to decrypt the pubkey - toAddress, _ = state.neededPubkeys[tag] + toAddress = state.neededPubkeys[tag][0] if protocol.decryptAndCheckPubkeyPayload(data, toAddress) == \ 'successful': # At this point we know that we have been waiting on this @@ -450,11 +428,9 @@ class objectProcessor(threading.Thread): self.possibleNewPubkey(toAddress) # Display timing data - timeRequiredToProcessPubkey = time.time( - ) - pubkeyProcessingStartTime logger.debug( 'Time required to process this pubkey: %s', - timeRequiredToProcessPubkey) + time.time() - pubkeyProcessingStartTime) def processmsg(self, data): """Process a message object""" @@ -466,10 +442,9 @@ class objectProcessor(threading.Thread): msgVersion, msgVersionLength = decodeVarint( data[readPosition:readPosition + 9]) if msgVersion != 1: - logger.info( + return logger.info( 'Cannot understand message versions other than one.' ' Ignoring message.') - return readPosition += msgVersionLength streamNumberAsClaimedByMsg, streamNumberAsClaimedByMsgLength = \ @@ -502,11 +477,10 @@ class objectProcessor(threading.Thread): pass if not initialDecryptionSuccessful: # This is not a message bound for me. - logger.info( + return logger.info( 'Length of time program spent failing to decrypt this' ' message: %s seconds.', time.time() - messageProcessingStartTime) - return # This is a message bound for me. # Look up my address based on the RIPE hash. @@ -516,20 +490,17 @@ class objectProcessor(threading.Thread): decodeVarint(decryptedData[readPosition:readPosition + 10]) readPosition += sendersAddressVersionNumberLength if sendersAddressVersionNumber == 0: - logger.info( + return logger.info( 'Cannot understand sendersAddressVersionNumber = 0.' ' Ignoring message.') - return if sendersAddressVersionNumber > 4: - logger.info( + return logger.info( 'Sender\'s address version number %s not yet supported.' ' Ignoring message.', sendersAddressVersionNumber) - return if len(decryptedData) < 170: - logger.info( + return logger.info( 'Length of the unencrypted data is unreasonably short.' ' Sanity check failed. Ignoring message.') - return sendersStreamNumber, sendersStreamNumberLength = decodeVarint( decryptedData[readPosition:readPosition + 10]) if sendersStreamNumber == 0: @@ -558,7 +529,7 @@ class objectProcessor(threading.Thread): # for later use. endOfThePublicKeyPosition = readPosition if toRipe != decryptedData[readPosition:readPosition + 20]: - logger.info( + return logger.info( 'The original sender of this message did not send it to' ' you. Someone is attempting a Surreptitious Forwarding' ' Attack.\nSee: ' @@ -567,7 +538,6 @@ class objectProcessor(threading.Thread): hexlify(toRipe), hexlify(decryptedData[readPosition:readPosition + 20]) ) - return readPosition += 20 messageEncodingType, messageEncodingTypeLength = decodeVarint( decryptedData[readPosition:readPosition + 10]) @@ -595,8 +565,7 @@ class objectProcessor(threading.Thread): if not highlevelcrypto.verify( signedData, signature, hexlify(pubSigningKey)): - logger.debug('ECDSA verify failed') - return + return logger.debug('ECDSA verify failed') logger.debug('ECDSA verify passed') if logger.isEnabledFor(logging.DEBUG): logger.debug( @@ -655,10 +624,9 @@ class objectProcessor(threading.Thread): if not protocol.isProofOfWorkSufficient( data, requiredNonceTrialsPerByte, requiredPayloadLengthExtraBytes): - logger.info( + return logger.info( 'Proof of work in msg is insufficient only because' ' it does not meet our higher requirement.') - return # Gets set to True if the user shouldn't see the message according # to black or white lists. blockMessage = False @@ -681,10 +649,7 @@ class objectProcessor(threading.Thread): 'Message ignored because address not in whitelist.') blockMessage = True - toLabel = BMConfigParser().get(toAddress, 'label') - if toLabel == '': - toLabel = toAddress - + # toLabel = BMConfigParser().safeGet(toAddress, 'label', toAddress) try: decodedMessage = helper_msgcoding.MsgDecode( messageEncodingType, message) @@ -712,23 +677,17 @@ class objectProcessor(threading.Thread): # has arrived. if BMConfigParser().safeGetBoolean( 'bitmessagesettings', 'apienabled'): - try: - apiNotifyPath = BMConfigParser().get( - 'bitmessagesettings', 'apinotifypath') - except: - apiNotifyPath = '' - if apiNotifyPath != '': - call([apiNotifyPath, "newMessage"]) + apiNotifyPath = BMConfigParser().safeGet( + 'bitmessagesettings', 'apinotifypath') + if apiNotifyPath: + subprocess.call([apiNotifyPath, "newMessage"]) # Let us now check and see whether our receiving address is # behaving as a mailing list if BMConfigParser().safeGetBoolean(toAddress, 'mailinglist') \ and messageEncodingType != 0: - try: - mailingListName = BMConfigParser().get( - toAddress, 'mailinglistname') - except: - mailingListName = '' + mailingListName = BMConfigParser().safeGet( + toAddress, 'mailinglistname', '') # Let us send out this message as a broadcast subject = self.addMailingListNameToSubject( subject, mailingListName) @@ -763,10 +722,10 @@ class objectProcessor(threading.Thread): # Don't send ACK if invalid, blacklisted senders, invisible # messages, disabled or chan if ( - self.ackDataHasAValidHeader(ackData) and not blockMessage and - messageEncodingType != 0 and - not BMConfigParser().safeGetBoolean(toAddress, 'dontsendack') and - not BMConfigParser().safeGetBoolean(toAddress, 'chan') + self.ackDataHasAValidHeader(ackData) and not blockMessage + and messageEncodingType != 0 + and not BMConfigParser().safeGetBoolean(toAddress, 'dontsendack') + and not BMConfigParser().safeGetBoolean(toAddress, 'chan') ): self._ack_obj.send_data(ackData[24:]) @@ -798,13 +757,12 @@ class objectProcessor(threading.Thread): data[readPosition:readPosition + 9]) readPosition += broadcastVersionLength if broadcastVersion < 4 or broadcastVersion > 5: - logger.info( + return logger.info( 'Cannot decode incoming broadcast versions less than 4' ' or higher than 5. Assuming the sender isn\'t being silly,' ' you should upgrade Bitmessage because this message shall' ' be ignored.' ) - return cleartextStreamNumber, cleartextStreamNumberLength = decodeVarint( data[readPosition:readPosition + 10]) readPosition += cleartextStreamNumberLength @@ -841,11 +799,10 @@ class objectProcessor(threading.Thread): 'cryptorObject.decrypt Exception:', exc_info=True) if not initialDecryptionSuccessful: # This is not a broadcast I am interested in. - logger.debug( + return logger.debug( 'Length of time program spent failing to decrypt this' ' v4 broadcast: %s seconds.', time.time() - messageProcessingStartTime) - return elif broadcastVersion == 5: embeddedTag = data[readPosition:readPosition + 32] readPosition += 32 @@ -860,10 +817,9 @@ class objectProcessor(threading.Thread): decryptedData = cryptorObject.decrypt(data[readPosition:]) logger.debug('EC decryption successful') except Exception: - logger.debug( + return logger.debug( 'Broadcast version %s decryption Unsuccessful.', broadcastVersion) - return # At this point this is a broadcast I have decrypted and am # interested in. readPosition = 0 @@ -871,32 +827,29 @@ class objectProcessor(threading.Thread): decryptedData[readPosition:readPosition + 9]) if broadcastVersion == 4: if sendersAddressVersion < 2 or sendersAddressVersion > 3: - logger.warning( + return logger.warning( 'Cannot decode senderAddressVersion other than 2 or 3.' ' Assuming the sender isn\'t being silly, you should' ' upgrade Bitmessage because this message shall be' ' ignored.' ) - return elif broadcastVersion == 5: if sendersAddressVersion < 4: - logger.info( + return logger.info( 'Cannot decode senderAddressVersion less than 4 for' ' broadcast version number 5. Assuming the sender' ' isn\'t being silly, you should upgrade Bitmessage' ' because this message shall be ignored.' ) - return readPosition += sendersAddressVersionLength sendersStream, sendersStreamLength = decodeVarint( decryptedData[readPosition:readPosition + 9]) if sendersStream != cleartextStreamNumber: - logger.info( + return logger.info( 'The stream number outside of the encryption on which the' ' POW was completed doesn\'t match the stream number' ' inside the encryption. Ignoring broadcast.' ) - return readPosition += sendersStreamLength readPosition += 4 sendersPubSigningKey = '\x04' + \ @@ -926,24 +879,22 @@ class objectProcessor(threading.Thread): if broadcastVersion == 4: if toRipe != calculatedRipe: - logger.info( + return logger.info( 'The encryption key used to encrypt this message' ' doesn\'t match the keys inbedded in the message' ' itself. Ignoring message.' ) - return elif broadcastVersion == 5: calculatedTag = hashlib.sha512(hashlib.sha512( - encodeVarint(sendersAddressVersion) + - encodeVarint(sendersStream) + calculatedRipe + encodeVarint(sendersAddressVersion) + + encodeVarint(sendersStream) + calculatedRipe ).digest()).digest()[32:] if calculatedTag != embeddedTag: - logger.debug( + return logger.debug( 'The tag and encryption key used to encrypt this' ' message doesn\'t match the keys inbedded in the' ' message itself. Ignoring message.' ) - return messageEncodingType, messageEncodingTypeLength = decodeVarint( decryptedData[readPosition:readPosition + 9]) if messageEncodingType == 0: @@ -987,10 +938,6 @@ class objectProcessor(threading.Thread): # and send it. self.possibleNewPubkey(fromAddress) - fromAddress = encodeAddress( - sendersAddressVersion, sendersStream, calculatedRipe) - logger.debug('fromAddress: %s', fromAddress) - try: decodedMessage = helper_msgcoding.MsgDecode( messageEncodingType, message) @@ -1014,13 +961,10 @@ class objectProcessor(threading.Thread): # outside command to let some program know that a new message # has arrived. if BMConfigParser().safeGetBoolean('bitmessagesettings', 'apienabled'): - try: - apiNotifyPath = BMConfigParser().get( - 'bitmessagesettings', 'apinotifypath') - except: - apiNotifyPath = '' - if apiNotifyPath != '': - call([apiNotifyPath, "newBroadcast"]) + apiNotifyPath = BMConfigParser().safeGet( + 'bitmessagesettings', 'apinotifypath') + if apiNotifyPath: + subprocess.call([apiNotifyPath, "newBroadcast"]) # Display timing data logger.info( @@ -1036,7 +980,7 @@ class objectProcessor(threading.Thread): # For address versions <= 3, we wait on a key with the correct # address version, stream number and RIPE hash. - _, addressVersion, streamNumber, ripe = decodeAddress(address) + addressVersion, streamNumber, ripe = decodeAddress(address)[1:] if addressVersion <= 3: if address in state.neededPubkeys: del state.neededPubkeys[address]