diff --git a/src/class_objectProcessor.py b/src/class_objectProcessor.py index 9ae2093b..f7875331 100644 --- a/src/class_objectProcessor.py +++ b/src/class_objectProcessor.py @@ -743,12 +743,12 @@ class objectProcessor(threading.Thread): # Don't send ACK if invalid, blacklisted senders, invisible # messages, disabled or chan - if (self.ackDataHasAValidHeader(ackData) and not blockMessage - and messageEncodingType != 0 and - not BMConfigParser().safeGetBoolean(toAddress, 'dontsendack') - and not BMConfigParser().safeGetBoolean(toAddress, 'chan') - ): - shared.checkAndShareObjectWithPeers(ackData[24:]) + # if (self.ackDataHasAValidHeader(ackData) and not blockMessage + # and messageEncodingType != 0 and + # not BMConfigParser().safeGetBoolean(toAddress, 'dontsendack') + # and not BMConfigParser().safeGetBoolean(toAddress, 'chan') + # ): + # shared.checkAndShareObjectWithPeers(ackData[24:]) # Display timing data timeRequiredToAttemptToDecryptMessage = time.time( diff --git a/src/protocol.py b/src/protocol.py index 913c8e5f..6a62b522 100644 --- a/src/protocol.py +++ b/src/protocol.py @@ -23,12 +23,10 @@ import traceback import defaults import highlevelcrypto import state -from addresses import calculateInventoryHash, encodeVarint, decodeVarint, decodeAddress, varintDecodeError +from addresses import encodeVarint, decodeVarint, decodeAddress, varintDecodeError from bmconfigparser import BMConfigParser from debug import logger from helper_sql import sqlExecute -from inventory import Inventory -from queues import objectProcessorQueue from version import softwareVersion @@ -437,228 +435,6 @@ def decryptAndCheckPubkeyPayload(data, address): return 'failed' -def checkAndShareObjectWithPeers(data): - """ - This function is called after either receiving an object off of the wire - or after receiving one as ackdata. - Returns the length of time that we should reserve to process this message - if we are receiving it off of the wire. - """ - if len(data) > 2 ** 18: - logger.info('The payload length of this object is too large (%s bytes). Ignoring it.', len(data)) - return 0 - # Let us check to make sure that the proof of work is sufficient. - if not isProofOfWorkSufficient(data): - logger.info('Proof of work is insufficient.') - return 0 - - endOfLifeTime, = unpack('>Q', data[8:16]) - # The TTL may not be larger than 28 days + 3 hours of wiggle room - if endOfLifeTime - int(time.time()) > 28 * 24 * 60 * 60 + 10800: - logger.info('This object\'s End of Life time is too far in the future. Ignoring it. Time is %s', endOfLifeTime) - return 0 - if endOfLifeTime - int(time.time()) < - 3600: # The EOL time was more than an hour ago. That's too much. - logger.info( - 'This object\'s End of Life time was more than an hour ago. Ignoring the object. Time is %s', - endOfLifeTime) - return 0 - intObjectType, = unpack('>I', data[16:20]) - try: - if intObjectType == 0: - _checkAndShareGetpubkeyWithPeers(data) - return 0.1 - elif intObjectType == 1: - _checkAndSharePubkeyWithPeers(data) - return 0.1 - elif intObjectType == 2: - _checkAndShareMsgWithPeers(data) - return 0.6 - elif intObjectType == 3: - _checkAndShareBroadcastWithPeers(data) - return 0.6 - _checkAndShareUndefinedObjectWithPeers(data) - return 0.6 - except varintDecodeError as err: - logger.debug( - "There was a problem with a varint while checking to see whether it was appropriate to share an object" - " with peers. Some details: %s", err - ) - except Exception: - logger.critical( - 'There was a problem while checking to see whether it was appropriate to share an object with peers.' - ' This is definitely a bug! %s%s' % os.linesep, traceback.format_exc() - ) - return 0 - - -def _checkAndShareUndefinedObjectWithPeers(data): - # pylint: disable=unused-variable - embeddedTime, = unpack('>Q', data[8:16]) - readPosition = 20 # bypass nonce, time, and object type - objectVersion, objectVersionLength = decodeVarint( - data[readPosition:readPosition + 9]) - readPosition += objectVersionLength - streamNumber, streamNumberLength = decodeVarint( - data[readPosition:readPosition + 9]) - if streamNumber not in state.streamsInWhichIAmParticipating: - logger.debug('The streamNumber %s isn\'t one we are interested in.', streamNumber) - return - - inventoryHash = calculateInventoryHash(data) - if inventoryHash in Inventory(): - logger.debug('We have already received this undefined object. Ignoring.') - return - objectType, = unpack('>I', data[16:20]) - Inventory()[inventoryHash] = ( - objectType, streamNumber, data, embeddedTime, '') - logger.debug('advertising inv with hash: %s', hexlify(inventoryHash)) - broadcastToSendDataQueues((streamNumber, 'advertiseobject', inventoryHash)) - - -def _checkAndShareMsgWithPeers(data): - embeddedTime, = unpack('>Q', data[8:16]) - readPosition = 20 # bypass nonce, time, and object type - objectVersion, objectVersionLength = decodeVarint( # pylint: disable=unused-variable - data[readPosition:readPosition + 9]) - readPosition += objectVersionLength - streamNumber, streamNumberLength = decodeVarint( - data[readPosition:readPosition + 9]) - if streamNumber not in state.streamsInWhichIAmParticipating: - logger.debug('The streamNumber %s isn\'t one we are interested in.', streamNumber) - return - readPosition += streamNumberLength - inventoryHash = calculateInventoryHash(data) - if inventoryHash in Inventory(): - logger.debug('We have already received this msg message. Ignoring.') - return - # This msg message is valid. Let's let our peers know about it. - objectType = 2 - Inventory()[inventoryHash] = ( - objectType, streamNumber, data, embeddedTime, '') - logger.debug('advertising inv with hash: %s', hexlify(inventoryHash)) - broadcastToSendDataQueues((streamNumber, 'advertiseobject', inventoryHash)) - - # Now let's enqueue it to be processed ourselves. - objectProcessorQueue.put((objectType, data)) - - -def _checkAndShareGetpubkeyWithPeers(data): - # pylint: disable=unused-variable - if len(data) < 42: - logger.info('getpubkey message doesn\'t contain enough data. Ignoring.') - return - if len(data) > 200: - logger.info('getpubkey is abnormally long. Sanity check failed. Ignoring object.') - embeddedTime, = unpack('>Q', data[8:16]) - readPosition = 20 # bypass the nonce, time, and object type - requestedAddressVersionNumber, addressVersionLength = decodeVarint( - data[readPosition:readPosition + 10]) - readPosition += addressVersionLength - streamNumber, streamNumberLength = decodeVarint( - data[readPosition:readPosition + 10]) - if streamNumber not in state.streamsInWhichIAmParticipating: - logger.debug('The streamNumber %s isn\'t one we are interested in.', streamNumber) - return - readPosition += streamNumberLength - - inventoryHash = calculateInventoryHash(data) - if inventoryHash in Inventory(): - logger.debug('We have already received this getpubkey request. Ignoring it.') - return - - objectType = 0 - Inventory()[inventoryHash] = ( - objectType, streamNumber, data, embeddedTime, '') - # This getpubkey request is valid. Forward to peers. - logger.debug('advertising inv with hash: %s', hexlify(inventoryHash)) - broadcastToSendDataQueues((streamNumber, 'advertiseobject', inventoryHash)) - - # Now let's queue it to be processed ourselves. - objectProcessorQueue.put((objectType, data)) - - -def _checkAndSharePubkeyWithPeers(data): - if len(data) < 146 or len(data) > 440: # sanity check - return - embeddedTime, = unpack('>Q', data[8:16]) - readPosition = 20 # bypass the nonce, time, and object type - addressVersion, varintLength = decodeVarint( - data[readPosition:readPosition + 10]) - readPosition += varintLength - streamNumber, varintLength = decodeVarint( - data[readPosition:readPosition + 10]) - readPosition += varintLength - if streamNumber not in state.streamsInWhichIAmParticipating: - logger.debug('The streamNumber %s isn\'t one we are interested in.', streamNumber) - return - if addressVersion >= 4: - tag = data[readPosition:readPosition + 32] - logger.debug('tag in received pubkey is: %s', hexlify(tag)) - else: - tag = '' - - inventoryHash = calculateInventoryHash(data) - if inventoryHash in Inventory(): - logger.debug('We have already received this pubkey. Ignoring it.') - return - objectType = 1 - Inventory()[inventoryHash] = ( - objectType, streamNumber, data, embeddedTime, tag) - # This object is valid. Forward it to peers. - logger.debug('advertising inv with hash: %s', hexlify(inventoryHash)) - broadcastToSendDataQueues((streamNumber, 'advertiseobject', inventoryHash)) - - # Now let's queue it to be processed ourselves. - objectProcessorQueue.put((objectType, data)) - - -def _checkAndShareBroadcastWithPeers(data): - if len(data) < 180: - logger.debug( - 'The payload length of this broadcast packet is unreasonably low. ' - 'Someone is probably trying funny business. Ignoring message.') - return - embeddedTime, = unpack('>Q', data[8:16]) - readPosition = 20 # bypass the nonce, time, and object type - broadcastVersion, broadcastVersionLength = decodeVarint( - data[readPosition:readPosition + 10]) - readPosition += broadcastVersionLength - if broadcastVersion >= 2: - streamNumber, streamNumberLength = decodeVarint(data[readPosition:readPosition + 10]) - readPosition += streamNumberLength - if streamNumber not in state.streamsInWhichIAmParticipating: - logger.debug('The streamNumber %s isn\'t one we are interested in.', streamNumber) - return - if broadcastVersion >= 3: - tag = data[readPosition:readPosition + 32] - else: - tag = '' - inventoryHash = calculateInventoryHash(data) - if inventoryHash in Inventory(): - logger.debug('We have already received this broadcast object. Ignoring.') - return - # It is valid. Let's let our peers know about it. - objectType = 3 - Inventory()[inventoryHash] = ( - objectType, streamNumber, data, embeddedTime, tag) - # This object is valid. Forward it to peers. - logger.debug('advertising inv with hash: %s', hexlify(inventoryHash)) - broadcastToSendDataQueues((streamNumber, 'advertiseobject', inventoryHash)) - - # Now let's queue it to be processed ourselves. - objectProcessorQueue.put((objectType, data)) - - -def broadcastToSendDataQueues(data): - """ - If you want to command all of the sendDataThreads to do something, like shutdown or send some data, this - function puts your data into the queues for each of the sendDataThreads. The sendDataThreads are - responsible for putting their queue into (and out of) the sendDataQueues list. - """ - for q in state.sendDataQueues: - q.put(data) - - # sslProtocolVersion if sys.version_info >= (2, 7, 13): # this means TLSv1 or higher diff --git a/src/shared.py b/src/shared.py index caf24769..76769a02 100644 --- a/src/shared.py +++ b/src/shared.py @@ -14,18 +14,14 @@ from binascii import hexlify from pyelliptic import arithmetic # Project imports. -import protocol import state import highlevelcrypto from bmconfigparser import BMConfigParser from debug import logger from addresses import ( - decodeAddress, encodeVarint, decodeVarint, varintDecodeError, - calculateInventoryHash + decodeAddress, encodeVarint, decodeVarint, varintDecodeError ) from helper_sql import sqlQuery, sqlExecute -from inventory import Inventory -from queues import objectProcessorQueue verbose = 1 @@ -431,248 +427,6 @@ def decryptAndCheckPubkeyPayload(data, address): return 'failed' -def checkAndShareObjectWithPeers(data): - """ - This function is called after either receiving an object - off of the wire or after receiving one as ackdata. - Returns the length of time that we should reserve to process - this message if we are receiving it off of the wire. - """ - if len(data) > 2 ** 18: - logger.info( - 'The payload length of this object is too large (%i bytes).' - ' Ignoring it.', len(data) - ) - return 0 - # Let us check to make sure that the proof of work is sufficient. - if not protocol.isProofOfWorkSufficient(data): - logger.info('Proof of work is insufficient.') - return 0 - - endOfLifeTime, = unpack('>Q', data[8:16]) - # The TTL may not be larger than 28 days + 3 hours of wiggle room - if endOfLifeTime - int(time.time()) > 28 * 24 * 60 * 60 + 10800: - logger.info( - 'This object\'s End of Life time is too far in the future.' - ' Ignoring it. Time is %s', endOfLifeTime - ) - return 0 - # The EOL time was more than an hour ago. That's too much. - if endOfLifeTime - int(time.time()) < -3600: - logger.info( - 'This object\'s End of Life time was more than an hour ago.' - ' Ignoring the object. Time is %s' % endOfLifeTime - ) - return 0 - intObjectType, = unpack('>I', data[16:20]) - try: - if intObjectType == 0: - _checkAndShareGetpubkeyWithPeers(data) - return 0.1 - elif intObjectType == 1: - _checkAndSharePubkeyWithPeers(data) - return 0.1 - elif intObjectType == 2: - _checkAndShareMsgWithPeers(data) - return 0.6 - elif intObjectType == 3: - _checkAndShareBroadcastWithPeers(data) - return 0.6 - else: - _checkAndShareUndefinedObjectWithPeers(data) - return 0.6 - except varintDecodeError as e: - logger.debug( - 'There was a problem with a varint while checking' - ' to see whether it was appropriate to share an object' - ' with peers. Some details: %s' % e) - except Exception: - logger.critical( - 'There was a problem while checking to see whether it was' - ' appropriate to share an object with peers. This is' - ' definitely a bug! \n%s' % traceback.format_exc()) - return 0 - - -def _checkAndShareUndefinedObjectWithPeers(data): - embeddedTime, = unpack('>Q', data[8:16]) - readPosition = 20 # bypass nonce, time, and object type - objectVersion, objectVersionLength = decodeVarint( - data[readPosition:readPosition + 9]) - readPosition += objectVersionLength - streamNumber, streamNumberLength = decodeVarint( - data[readPosition:readPosition + 9]) - if streamNumber not in state.streamsInWhichIAmParticipating: - logger.debug( - 'The streamNumber %i isn\'t one we are interested in.', - streamNumber - ) - return - - inventoryHash = calculateInventoryHash(data) - if inventoryHash in Inventory(): - logger.debug( - 'We have already received this undefined object. Ignoring.') - return - objectType, = unpack('>I', data[16:20]) - Inventory()[inventoryHash] = ( - objectType, streamNumber, data, embeddedTime, '') - logger.debug('advertising inv with hash: %s', hexlify(inventoryHash)) - protocol.broadcastToSendDataQueues( - (streamNumber, 'advertiseobject', inventoryHash)) - - -def _checkAndShareMsgWithPeers(data): - embeddedTime, = unpack('>Q', data[8:16]) - readPosition = 20 # bypass nonce, time, and object type - objectVersion, objectVersionLength = \ - decodeVarint(data[readPosition:readPosition + 9]) - readPosition += objectVersionLength - streamNumber, streamNumberLength = \ - decodeVarint(data[readPosition:readPosition + 9]) - if streamNumber not in state.streamsInWhichIAmParticipating: - logger.debug( - 'The streamNumber %i isn\'t one we are interested in.', - streamNumber - ) - return - readPosition += streamNumberLength - inventoryHash = calculateInventoryHash(data) - if inventoryHash in Inventory(): - logger.debug('We have already received this msg message. Ignoring.') - return - # This msg message is valid. Let's let our peers know about it. - objectType = 2 - Inventory()[inventoryHash] = ( - objectType, streamNumber, data, embeddedTime, '') - logger.debug('advertising inv with hash: %s', hexlify(inventoryHash)) - protocol.broadcastToSendDataQueues( - (streamNumber, 'advertiseobject', inventoryHash)) - - # Now let's enqueue it to be processed ourselves. - objectProcessorQueue.put((objectType, data)) - - -def _checkAndShareGetpubkeyWithPeers(data): - if len(data) < 42: - logger.info( - 'getpubkey message doesn\'t contain enough data. Ignoring.') - return - embeddedTime, = unpack('>Q', data[8:16]) - readPosition = 20 # bypass the nonce, time, and object type - requestedAddressVersionNumber, addressVersionLength = \ - decodeVarint(data[readPosition:readPosition + 10]) - readPosition += addressVersionLength - streamNumber, streamNumberLength = \ - decodeVarint(data[readPosition:readPosition + 10]) - if streamNumber not in state.streamsInWhichIAmParticipating: - logger.debug( - 'The streamNumber %i isn\'t one we are interested in.', - streamNumber - ) - return - readPosition += streamNumberLength - - inventoryHash = calculateInventoryHash(data) - if inventoryHash in Inventory(): - logger.debug( - 'We have already received this getpubkey request. Ignoring it.') - return - - objectType = 0 - Inventory()[inventoryHash] = ( - objectType, streamNumber, data, embeddedTime, '') - # This getpubkey request is valid. Forward to peers. - logger.debug('advertising inv with hash: %s', hexlify(inventoryHash)) - protocol.broadcastToSendDataQueues( - (streamNumber, 'advertiseobject', inventoryHash)) - - # Now let's queue it to be processed ourselves. - objectProcessorQueue.put((objectType, data)) - - -def _checkAndSharePubkeyWithPeers(data): - if len(data) < 146 or len(data) > 440: # sanity check - return - embeddedTime, = unpack('>Q', data[8:16]) - readPosition = 20 # bypass the nonce, time, and object type - addressVersion, varintLength = \ - decodeVarint(data[readPosition:readPosition + 10]) - readPosition += varintLength - streamNumber, varintLength = \ - decodeVarint(data[readPosition:readPosition + 10]) - readPosition += varintLength - if streamNumber not in state.streamsInWhichIAmParticipating: - logger.debug( - 'The streamNumber %i isn\'t one we are interested in.', - streamNumber - ) - return - if addressVersion >= 4: - tag = data[readPosition:readPosition + 32] - logger.debug('tag in received pubkey is: %s', hexlify(tag)) - else: - tag = '' - - inventoryHash = calculateInventoryHash(data) - if inventoryHash in Inventory(): - logger.debug('We have already received this pubkey. Ignoring it.') - return - objectType = 1 - Inventory()[inventoryHash] = ( - objectType, streamNumber, data, embeddedTime, tag) - # This object is valid. Forward it to peers. - logger.debug('advertising inv with hash: %s', hexlify(inventoryHash)) - protocol.broadcastToSendDataQueues( - (streamNumber, 'advertiseobject', inventoryHash)) - - # Now let's queue it to be processed ourselves. - objectProcessorQueue.put((objectType, data)) - - -def _checkAndShareBroadcastWithPeers(data): - if len(data) < 180: - logger.debug( - 'The payload length of this broadcast packet is unreasonably low.' - ' Someone is probably trying funny business. Ignoring message.') - return - embeddedTime, = unpack('>Q', data[8:16]) - readPosition = 20 # bypass the nonce, time, and object type - broadcastVersion, broadcastVersionLength = \ - decodeVarint(data[readPosition:readPosition + 10]) - readPosition += broadcastVersionLength - if broadcastVersion >= 2: - streamNumber, streamNumberLength = \ - decodeVarint(data[readPosition:readPosition + 10]) - readPosition += streamNumberLength - if streamNumber not in state.streamsInWhichIAmParticipating: - logger.debug( - 'The streamNumber %i isn\'t one we are interested in.', - streamNumber - ) - return - if broadcastVersion >= 3: - tag = data[readPosition:readPosition+32] - else: - tag = '' - inventoryHash = calculateInventoryHash(data) - if inventoryHash in Inventory(): - logger.debug( - 'We have already received this broadcast object. Ignoring.') - return - # It is valid. Let's let our peers know about it. - objectType = 3 - Inventory()[inventoryHash] = ( - objectType, streamNumber, data, embeddedTime, tag) - # This object is valid. Forward it to peers. - logger.debug('advertising inv with hash: %s', hexlify(inventoryHash)) - protocol.broadcastToSendDataQueues( - (streamNumber, 'advertiseobject', inventoryHash)) - - # Now let's queue it to be processed ourselves. - objectProcessorQueue.put((objectType, data)) - - def openKeysFile(): if 'linux' in sys.platform: subprocess.call(["xdg-open", state.appdata + 'keys.dat']) diff --git a/src/state.py b/src/state.py index f052d7bb..e8d6a27d 100644 --- a/src/state.py +++ b/src/state.py @@ -2,7 +2,6 @@ import collections neededPubkeys = {} streamsInWhichIAmParticipating = [] -sendDataQueues = [] # each sendData thread puts its queue in this list. # For UPnP extPort = None