From 00c4ee881f9c1e8c256c5ef23fe5ad7286097827 Mon Sep 17 00:00:00 2001 From: Biryuzovye Kleshni Date: Sun, 29 Jul 2018 09:44:15 +0000 Subject: [PATCH] Added API for raw objects --- src/api.py | 72 ++++++++++ src/bitmessagemain.py | 1 - src/bitmessageqt/__init__.py | 9 +- src/class_objectProcessor.py | 4 +- src/protocol.py | 270 +++++++++-------------------------- src/shared.py | 244 ------------------------------- src/singleworker.py | 64 ++------- src/state.py | 1 - 8 files changed, 157 insertions(+), 508 deletions(-) diff --git a/src/api.py b/src/api.py index bc6514e7..04699bde 100644 --- a/src/api.py +++ b/src/api.py @@ -17,6 +17,7 @@ from binascii import hexlify, unhexlify from random import randint from SimpleXMLRPCServer import SimpleXMLRPCRequestHandler, SimpleXMLRPCServer from struct import pack +import errno import shared from addresses import ( @@ -32,6 +33,7 @@ import state import queues import shutdown import network.stats +import protocol # Classes from helper_sql import sqlQuery, sqlExecute, SqlBulkExecute, sqlStoredProcedure @@ -1165,6 +1167,73 @@ class MySimpleXMLRPCRequestHandler(SimpleXMLRPCRequestHandler): data += ']}' return data + def HandleDisseminateRawObject(self, arguments): + if len(arguments) != 1: + raise APIError(0, "1 argument needed") + + payload = self._decode(arguments[0], "hex") + + inventoryHash = protocol.checkAndShareObjectWithPeers(payload) + + if inventoryHash is None: + raise APIError(30, "Invalid object or insufficient POW") + else: + return hexlify(inventoryHash) + + def HandleGetRawObject(self, arguments): + if len(arguments) != 1: + raise APIError(0, "1 argument needed") + + inventoryHash, = arguments + + if len(inventoryHash) != 64: + raise APIError(19, "The length of hash should be 32 bytes (encoded in hex thus 64 characters)") + + inventoryHash = self._decode(inventoryHash, "hex") + + try: + inventoryItem = Inventory()[inventoryHash] + except KeyError: + raise APIError(31, "Object not found") + + return json.dumps({ + "hash": hexlify(inventoryHash), + "expiryTime": inventoryItem.expires, + "objectType": inventoryItem.type, + "stream": inventoryItem.stream, + "tag": hexlify(inventoryItem.tag), + "payload": hexlify(inventoryItem.payload) + }, indent = 4, separators = (",", ": ")) + + def HandleListRawObjects(self, arguments): + if len(arguments) != 3: + raise APIError(0, "3 arguments needed") + + objectType, stream, tag = arguments + + if tag is not None: + tag = buffer(self._decode(tag, "hex")) + + result = [] + + inventory = Inventory() + + for i in inventory: + inventoryItem = inventory[str(i)] + + if objectType is not None and inventoryItem.type != objectType: + continue + + if stream is not None and inventoryItem.stream != stream: + continue + + if tag is not None and inventoryItem.tag != tag: + continue + + result.append(hexlify(i)) + + return json.dumps(result, indent = 4, separators = (",", ": ")) + def HandleClientStatus(self, params): if len(network.stats.connectedHostsList()) == 0: networkStatus = 'notConnected' @@ -1272,6 +1341,9 @@ class MySimpleXMLRPCRequestHandler(SimpleXMLRPCRequestHandler): HandleGetMessageDataByDestinationHash handlers['getMessageDataByDestinationTag'] = \ HandleGetMessageDataByDestinationHash + handlers["disseminateRawObject"] = HandleDisseminateRawObject + handlers["getRawObject"] = HandleGetRawObject + handlers["listRawObjects"] = HandleListRawObjects handlers['clientStatus'] = HandleClientStatus handlers['decodeAddress'] = HandleDecodeAddress handlers['deleteAndVacuum'] = HandleDeleteAndVacuum diff --git a/src/bitmessagemain.py b/src/bitmessagemain.py index d8449a0a..86611bf7 100755 --- a/src/bitmessagemain.py +++ b/src/bitmessagemain.py @@ -410,7 +410,6 @@ if __name__ == "__main__": import signal # The next 3 are used for the API from singleinstance import singleinstance - import errno import socket import ctypes from struct import pack diff --git a/src/bitmessageqt/__init__.py b/src/bitmessageqt/__init__.py index 250ed47f..e647f20f 100644 --- a/src/bitmessageqt/__init__.py +++ b/src/bitmessageqt/__init__.py @@ -1884,9 +1884,9 @@ class MyForm(settingsmixin.SMainWindow): _translate("MainWindow", "%1 kiH / s").arg("{:.1f}".format(status.speed / 1024)) ) - self.ui.workProverSpeed.setToolTip("Difficulty: {}, 80 % completion time: {:.1f} s".format( + self.ui.workProverSpeed.setToolTip("Difficulty: {}, 95 % completion time: {:.1f} s".format( status.difficulty, - workprover.utils.estimateMaximumIterationsCount(status.difficulty, .8) / status.speed + workprover.utils.estimateMaximumIterationsCount(status.difficulty, .95) / status.speed )) def rerenderMessagelistFromLabels(self): @@ -4597,11 +4597,14 @@ class settingsDialog(QtGui.QDialog): self.ui.spinBoxForkingSolverParallelism.setValue(forkingSolverParallelism) self.ui.spinBoxFastSolverParallelism.setValue(fastSolverParallelism) - vendors = set(singleworker.workProver.availableSolvers["gpu"].vendors) + vendors = set() if GPUVendor is not None: vendors.add(GPUVendor) + if "gpu" in singleworker.workProver.availableSolvers: + vendors |= set(singleworker.workProver.availableSolvers["gpu"].vendors) + self.ui.comboBoxGPUVendor.clear() self.ui.comboBoxGPUVendor.addItems(list(vendors)) self.ui.comboBoxGPUVendor.setCurrentIndex(0) diff --git a/src/class_objectProcessor.py b/src/class_objectProcessor.py index 8702ce0b..7af94f81 100644 --- a/src/class_objectProcessor.py +++ b/src/class_objectProcessor.py @@ -747,7 +747,7 @@ class objectProcessor(threading.Thread): not BMConfigParser().safeGetBoolean(toAddress, 'dontsendack') and not BMConfigParser().safeGetBoolean(toAddress, 'chan') ): - shared.checkAndShareObjectWithPeers(ackData[24:]) + protocol.checkAndShareObjectWithPeers(ackData[24:]) # Display timing data timeRequiredToAttemptToDecryptMessage = time.time( @@ -1075,7 +1075,7 @@ class objectProcessor(threading.Thread): # The largest message should be either an inv or a getdata # message at 1.6 MB in size. # That doesn't mean that the object may be that big. The - # shared.checkAndShareObjectWithPeers function will verify + # protocol.checkAndShareObjectWithPeers function will verify # that it is no larger than 2^18 bytes. return False # test the checksum in the message. diff --git a/src/protocol.py b/src/protocol.py index 865c3f72..a43cf6a9 100644 --- a/src/protocol.py +++ b/src/protocol.py @@ -27,9 +27,9 @@ from addresses import calculateInventoryHash, encodeVarint, decodeVarint, decode from bmconfigparser import BMConfigParser from debug import logger from helper_sql import sqlExecute -from inventory import Inventory -from queues import objectProcessorQueue from version import softwareVersion +import inventory +import queues # Service flags @@ -400,227 +400,89 @@ def decryptAndCheckV4Pubkey(payload, address, cryptor): return result -def checkAndShareObjectWithPeers(data): - """ - This function is called after either receiving an object off of the wire - or after receiving one as ackdata. - Returns the length of time that we should reserve to process this message - if we are receiving it off of the wire. - """ - if len(data) > 2 ** 18: - logger.info('The payload length of this object is too large (%s bytes). Ignoring it.', len(data)) - return 0 - # Let us check to make sure that the proof of work is sufficient. - if not isProofOfWorkSufficient(data): - logger.info('Proof of work is insufficient.') - return 0 +def checkAndShareObjectWithPeers(payload): + if len(payload) > 2 ** 18: + logger.info("The payload length of this object is too large (%i bytes)", len(payload)) + + return None + + if not isProofOfWorkSufficient(payload): + logger.info("Proof of work is insufficient") + + return None + + readPosition = 8 - endOfLifeTime, = unpack('>Q', data[8:16]) - # The TTL may not be larger than 28 days + 3 hours of wiggle room - if endOfLifeTime - int(time.time()) > 28 * 24 * 60 * 60 + 10800: - logger.info('This object\'s End of Life time is too far in the future. Ignoring it. Time is %s', endOfLifeTime) - return 0 - if endOfLifeTime - int(time.time()) < - 3600: # The EOL time was more than an hour ago. That's too much. - logger.info( - 'This object\'s End of Life time was more than an hour ago. Ignoring the object. Time is %s', - endOfLifeTime) - return 0 - intObjectType, = unpack('>I', data[16:20]) try: - if intObjectType == 0: - _checkAndShareGetpubkeyWithPeers(data) - return 0.1 - elif intObjectType == 1: - _checkAndSharePubkeyWithPeers(data) - return 0.1 - elif intObjectType == 2: - _checkAndShareMsgWithPeers(data) - return 0.6 - elif intObjectType == 3: - _checkAndShareBroadcastWithPeers(data) - return 0.6 - _checkAndShareUndefinedObjectWithPeers(data) - return 0.6 - except varintDecodeError as err: - logger.debug( - "There was a problem with a varint while checking to see whether it was appropriate to share an object" - " with peers. Some details: %s", err - ) - except Exception: - logger.critical( - 'There was a problem while checking to see whether it was appropriate to share an object with peers.' - ' This is definitely a bug! %s%s' % os.linesep, traceback.format_exc() - ) - return 0 + expiryTime, objectType = unpack(">QI", payload[readPosition: readPosition + 12]) + readPosition += 12 + version, readLength = decodeVarint(payload[readPosition: readPosition + 8]) + readPosition += readLength -def _checkAndShareUndefinedObjectWithPeers(data): - # pylint: disable=unused-variable - embeddedTime, = unpack('>Q', data[8:16]) - readPosition = 20 # bypass nonce, time, and object type - objectVersion, objectVersionLength = decodeVarint( - data[readPosition:readPosition + 9]) - readPosition += objectVersionLength - streamNumber, streamNumberLength = decodeVarint( - data[readPosition:readPosition + 9]) - if streamNumber not in state.streamsInWhichIAmParticipating: - logger.debug('The streamNumber %s isn\'t one we are interested in.', streamNumber) - return + stream, readLength = decodeVarint(payload[readPosition: readPosition + 8]) + readPosition += readLength + except: + logger.info("Error parsing object header") - inventoryHash = calculateInventoryHash(data) - if inventoryHash in Inventory(): - logger.debug('We have already received this undefined object. Ignoring.') - return - objectType, = unpack('>I', data[16:20]) - Inventory()[inventoryHash] = ( - objectType, streamNumber, data, embeddedTime, '') - logger.debug('advertising inv with hash: %s', hexlify(inventoryHash)) - broadcastToSendDataQueues((streamNumber, 'advertiseobject', inventoryHash)) + return None + tag = payload[readPosition: readPosition + 32] -def _checkAndShareMsgWithPeers(data): - embeddedTime, = unpack('>Q', data[8:16]) - readPosition = 20 # bypass nonce, time, and object type - objectVersion, objectVersionLength = decodeVarint( # pylint: disable=unused-variable - data[readPosition:readPosition + 9]) - readPosition += objectVersionLength - streamNumber, streamNumberLength = decodeVarint( - data[readPosition:readPosition + 9]) - if streamNumber not in state.streamsInWhichIAmParticipating: - logger.debug('The streamNumber %s isn\'t one we are interested in.', streamNumber) - return - readPosition += streamNumberLength - inventoryHash = calculateInventoryHash(data) - if inventoryHash in Inventory(): - logger.debug('We have already received this msg message. Ignoring.') - return - # This msg message is valid. Let's let our peers know about it. - objectType = 2 - Inventory()[inventoryHash] = ( - objectType, streamNumber, data, embeddedTime, '') - logger.debug('advertising inv with hash: %s', hexlify(inventoryHash)) - broadcastToSendDataQueues((streamNumber, 'advertiseobject', inventoryHash)) + TTL = expiryTime - int(time.time()) - # Now let's enqueue it to be processed ourselves. - objectProcessorQueue.put((objectType, data)) + # TTL may not be lesser than -1 hour or larger than 28 days + 3 hours of wiggle room + if TTL < -3600: + logger.info("This object\'s expiry time was more than an hour ago: %s", expiryTime) -def _checkAndShareGetpubkeyWithPeers(data): - # pylint: disable=unused-variable - if len(data) < 42: - logger.info('getpubkey message doesn\'t contain enough data. Ignoring.') - return - if len(data) > 200: - logger.info('getpubkey is abnormally long. Sanity check failed. Ignoring object.') - embeddedTime, = unpack('>Q', data[8:16]) - readPosition = 20 # bypass the nonce, time, and object type - requestedAddressVersionNumber, addressVersionLength = decodeVarint( - data[readPosition:readPosition + 10]) - readPosition += addressVersionLength - streamNumber, streamNumberLength = decodeVarint( - data[readPosition:readPosition + 10]) - if streamNumber not in state.streamsInWhichIAmParticipating: - logger.debug('The streamNumber %s isn\'t one we are interested in.', streamNumber) - return - readPosition += streamNumberLength + return None + elif TTL > 28 * 24 * 60 * 60 + 10800: + logger.info("This object\'s expiry time is too far in the future: %s", expiryTime) - inventoryHash = calculateInventoryHash(data) - if inventoryHash in Inventory(): - logger.debug('We have already received this getpubkey request. Ignoring it.') - return + return None - objectType = 0 - Inventory()[inventoryHash] = ( - objectType, streamNumber, data, embeddedTime, '') - # This getpubkey request is valid. Forward to peers. - logger.debug('advertising inv with hash: %s', hexlify(inventoryHash)) - broadcastToSendDataQueues((streamNumber, 'advertiseobject', inventoryHash)) + if stream not in state.streamsInWhichIAmParticipating: + logger.info("The stream number %i isn\'t one we are interested in", stream) - # Now let's queue it to be processed ourselves. - objectProcessorQueue.put((objectType, data)) + return None + if objectType == 0: + if len(payload) < 42: + logger.info("Too short \"getpubkey\" message") -def _checkAndSharePubkeyWithPeers(data): - if len(data) < 146 or len(data) > 440: # sanity check - return - embeddedTime, = unpack('>Q', data[8:16]) - readPosition = 20 # bypass the nonce, time, and object type - addressVersion, varintLength = decodeVarint( - data[readPosition:readPosition + 10]) - readPosition += varintLength - streamNumber, varintLength = decodeVarint( - data[readPosition:readPosition + 10]) - readPosition += varintLength - if streamNumber not in state.streamsInWhichIAmParticipating: - logger.debug('The streamNumber %s isn\'t one we are interested in.', streamNumber) - return - if addressVersion >= 4: - tag = data[readPosition:readPosition + 32] - logger.debug('tag in received pubkey is: %s', hexlify(tag)) + return None + elif objectType == 1: + if len(payload) < 146 or len(payload) > 440: + logger.info("Invalid length \"pubkey\"") + + return None + elif objectType == 3: + if len(payload) < 180: + logger.info("Too short \"broadcast\" message") + + return None + + if version == 1: + logger.info("Obsolete \"broadcast\" message version") + + return None + + inventoryHash = calculateDoubleHash(payload)[: 32] + + if inventoryHash in inventory.Inventory(): + logger.info("We already have this object") + + return inventoryHash else: - tag = '' + inventory.Inventory()[inventoryHash] = objectType, stream, payload, expiryTime, buffer(tag) + queues.invQueue.put((stream, inventoryHash)) - inventoryHash = calculateInventoryHash(data) - if inventoryHash in Inventory(): - logger.debug('We have already received this pubkey. Ignoring it.') - return - objectType = 1 - Inventory()[inventoryHash] = ( - objectType, streamNumber, data, embeddedTime, tag) - # This object is valid. Forward it to peers. - logger.debug('advertising inv with hash: %s', hexlify(inventoryHash)) - broadcastToSendDataQueues((streamNumber, 'advertiseobject', inventoryHash)) + logger.info("Broadcasting inventory object with hash: %s", hexlify(inventoryHash)) - # Now let's queue it to be processed ourselves. - objectProcessorQueue.put((objectType, data)) - - -def _checkAndShareBroadcastWithPeers(data): - if len(data) < 180: - logger.debug( - 'The payload length of this broadcast packet is unreasonably low. ' - 'Someone is probably trying funny business. Ignoring message.') - return - embeddedTime, = unpack('>Q', data[8:16]) - readPosition = 20 # bypass the nonce, time, and object type - broadcastVersion, broadcastVersionLength = decodeVarint( - data[readPosition:readPosition + 10]) - readPosition += broadcastVersionLength - if broadcastVersion >= 2: - streamNumber, streamNumberLength = decodeVarint(data[readPosition:readPosition + 10]) - readPosition += streamNumberLength - if streamNumber not in state.streamsInWhichIAmParticipating: - logger.debug('The streamNumber %s isn\'t one we are interested in.', streamNumber) - return - if broadcastVersion >= 3: - tag = data[readPosition:readPosition + 32] - else: - tag = '' - inventoryHash = calculateInventoryHash(data) - if inventoryHash in Inventory(): - logger.debug('We have already received this broadcast object. Ignoring.') - return - # It is valid. Let's let our peers know about it. - objectType = 3 - Inventory()[inventoryHash] = ( - objectType, streamNumber, data, embeddedTime, tag) - # This object is valid. Forward it to peers. - logger.debug('advertising inv with hash: %s', hexlify(inventoryHash)) - broadcastToSendDataQueues((streamNumber, 'advertiseobject', inventoryHash)) - - # Now let's queue it to be processed ourselves. - objectProcessorQueue.put((objectType, data)) - - -def broadcastToSendDataQueues(data): - """ - If you want to command all of the sendDataThreads to do something, like shutdown or send some data, this - function puts your data into the queues for each of the sendDataThreads. The sendDataThreads are - responsible for putting their queue into (and out of) the sendDataQueues list. - """ - for q in state.sendDataQueues: - q.put(data) + queues.objectProcessorQueue.put((objectType, payload)) + return inventoryHash # sslProtocolVersion if sys.version_info >= (2, 7, 13): diff --git a/src/shared.py b/src/shared.py index 6fb40b79..438c058a 100644 --- a/src/shared.py +++ b/src/shared.py @@ -24,8 +24,6 @@ from addresses import ( calculateInventoryHash ) from helper_sql import sqlQuery, sqlExecute -from inventory import Inventory -from queues import objectProcessorQueue verbose = 1 @@ -286,248 +284,6 @@ def fixSensitiveFilePermissions(filename, hasEnabledKeys): logger.exception('Keyfile permissions could not be fixed.') raise -def checkAndShareObjectWithPeers(data): - """ - This function is called after either receiving an object - off of the wire or after receiving one as ackdata. - Returns the length of time that we should reserve to process - this message if we are receiving it off of the wire. - """ - if len(data) > 2 ** 18: - logger.info( - 'The payload length of this object is too large (%i bytes).' - ' Ignoring it.', len(data) - ) - return 0 - # Let us check to make sure that the proof of work is sufficient. - if not protocol.isProofOfWorkSufficient(data): - logger.info('Proof of work is insufficient.') - return 0 - - endOfLifeTime, = unpack('>Q', data[8:16]) - # The TTL may not be larger than 28 days + 3 hours of wiggle room - if endOfLifeTime - int(time.time()) > 28 * 24 * 60 * 60 + 10800: - logger.info( - 'This object\'s End of Life time is too far in the future.' - ' Ignoring it. Time is %s', endOfLifeTime - ) - return 0 - # The EOL time was more than an hour ago. That's too much. - if endOfLifeTime - int(time.time()) < -3600: - logger.info( - 'This object\'s End of Life time was more than an hour ago.' - ' Ignoring the object. Time is %s' % endOfLifeTime - ) - return 0 - intObjectType, = unpack('>I', data[16:20]) - try: - if intObjectType == 0: - _checkAndShareGetpubkeyWithPeers(data) - return 0.1 - elif intObjectType == 1: - _checkAndSharePubkeyWithPeers(data) - return 0.1 - elif intObjectType == 2: - _checkAndShareMsgWithPeers(data) - return 0.6 - elif intObjectType == 3: - _checkAndShareBroadcastWithPeers(data) - return 0.6 - else: - _checkAndShareUndefinedObjectWithPeers(data) - return 0.6 - except varintDecodeError as e: - logger.debug( - 'There was a problem with a varint while checking' - ' to see whether it was appropriate to share an object' - ' with peers. Some details: %s' % e) - except Exception: - logger.critical( - 'There was a problem while checking to see whether it was' - ' appropriate to share an object with peers. This is' - ' definitely a bug! \n%s' % traceback.format_exc()) - return 0 - - -def _checkAndShareUndefinedObjectWithPeers(data): - embeddedTime, = unpack('>Q', data[8:16]) - readPosition = 20 # bypass nonce, time, and object type - objectVersion, objectVersionLength = decodeVarint( - data[readPosition:readPosition + 9]) - readPosition += objectVersionLength - streamNumber, streamNumberLength = decodeVarint( - data[readPosition:readPosition + 9]) - if streamNumber not in state.streamsInWhichIAmParticipating: - logger.debug( - 'The streamNumber %i isn\'t one we are interested in.', - streamNumber - ) - return - - inventoryHash = calculateInventoryHash(data) - if inventoryHash in Inventory(): - logger.debug( - 'We have already received this undefined object. Ignoring.') - return - objectType, = unpack('>I', data[16:20]) - Inventory()[inventoryHash] = ( - objectType, streamNumber, data, embeddedTime, '') - logger.debug('advertising inv with hash: %s', hexlify(inventoryHash)) - protocol.broadcastToSendDataQueues( - (streamNumber, 'advertiseobject', inventoryHash)) - - -def _checkAndShareMsgWithPeers(data): - embeddedTime, = unpack('>Q', data[8:16]) - readPosition = 20 # bypass nonce, time, and object type - objectVersion, objectVersionLength = \ - decodeVarint(data[readPosition:readPosition + 9]) - readPosition += objectVersionLength - streamNumber, streamNumberLength = \ - decodeVarint(data[readPosition:readPosition + 9]) - if streamNumber not in state.streamsInWhichIAmParticipating: - logger.debug( - 'The streamNumber %i isn\'t one we are interested in.', - streamNumber - ) - return - readPosition += streamNumberLength - inventoryHash = calculateInventoryHash(data) - if inventoryHash in Inventory(): - logger.debug('We have already received this msg message. Ignoring.') - return - # This msg message is valid. Let's let our peers know about it. - objectType = 2 - Inventory()[inventoryHash] = ( - objectType, streamNumber, data, embeddedTime, '') - logger.debug('advertising inv with hash: %s', hexlify(inventoryHash)) - protocol.broadcastToSendDataQueues( - (streamNumber, 'advertiseobject', inventoryHash)) - - # Now let's enqueue it to be processed ourselves. - objectProcessorQueue.put((objectType, data)) - - -def _checkAndShareGetpubkeyWithPeers(data): - if len(data) < 42: - logger.info( - 'getpubkey message doesn\'t contain enough data. Ignoring.') - return - embeddedTime, = unpack('>Q', data[8:16]) - readPosition = 20 # bypass the nonce, time, and object type - requestedAddressVersionNumber, addressVersionLength = \ - decodeVarint(data[readPosition:readPosition + 10]) - readPosition += addressVersionLength - streamNumber, streamNumberLength = \ - decodeVarint(data[readPosition:readPosition + 10]) - if streamNumber not in state.streamsInWhichIAmParticipating: - logger.debug( - 'The streamNumber %i isn\'t one we are interested in.', - streamNumber - ) - return - readPosition += streamNumberLength - - inventoryHash = calculateInventoryHash(data) - if inventoryHash in Inventory(): - logger.debug( - 'We have already received this getpubkey request. Ignoring it.') - return - - objectType = 0 - Inventory()[inventoryHash] = ( - objectType, streamNumber, data, embeddedTime, '') - # This getpubkey request is valid. Forward to peers. - logger.debug('advertising inv with hash: %s', hexlify(inventoryHash)) - protocol.broadcastToSendDataQueues( - (streamNumber, 'advertiseobject', inventoryHash)) - - # Now let's queue it to be processed ourselves. - objectProcessorQueue.put((objectType, data)) - - -def _checkAndSharePubkeyWithPeers(data): - if len(data) < 146 or len(data) > 440: # sanity check - return - embeddedTime, = unpack('>Q', data[8:16]) - readPosition = 20 # bypass the nonce, time, and object type - addressVersion, varintLength = \ - decodeVarint(data[readPosition:readPosition + 10]) - readPosition += varintLength - streamNumber, varintLength = \ - decodeVarint(data[readPosition:readPosition + 10]) - readPosition += varintLength - if streamNumber not in state.streamsInWhichIAmParticipating: - logger.debug( - 'The streamNumber %i isn\'t one we are interested in.', - streamNumber - ) - return - if addressVersion >= 4: - tag = data[readPosition:readPosition + 32] - logger.debug('tag in received pubkey is: %s', hexlify(tag)) - else: - tag = '' - - inventoryHash = calculateInventoryHash(data) - if inventoryHash in Inventory(): - logger.debug('We have already received this pubkey. Ignoring it.') - return - objectType = 1 - Inventory()[inventoryHash] = ( - objectType, streamNumber, data, embeddedTime, tag) - # This object is valid. Forward it to peers. - logger.debug('advertising inv with hash: %s', hexlify(inventoryHash)) - protocol.broadcastToSendDataQueues( - (streamNumber, 'advertiseobject', inventoryHash)) - - # Now let's queue it to be processed ourselves. - objectProcessorQueue.put((objectType, data)) - - -def _checkAndShareBroadcastWithPeers(data): - if len(data) < 180: - logger.debug( - 'The payload length of this broadcast packet is unreasonably low.' - ' Someone is probably trying funny business. Ignoring message.') - return - embeddedTime, = unpack('>Q', data[8:16]) - readPosition = 20 # bypass the nonce, time, and object type - broadcastVersion, broadcastVersionLength = \ - decodeVarint(data[readPosition:readPosition + 10]) - readPosition += broadcastVersionLength - if broadcastVersion >= 2: - streamNumber, streamNumberLength = \ - decodeVarint(data[readPosition:readPosition + 10]) - readPosition += streamNumberLength - if streamNumber not in state.streamsInWhichIAmParticipating: - logger.debug( - 'The streamNumber %i isn\'t one we are interested in.', - streamNumber - ) - return - if broadcastVersion >= 3: - tag = data[readPosition:readPosition+32] - else: - tag = '' - inventoryHash = calculateInventoryHash(data) - if inventoryHash in Inventory(): - logger.debug( - 'We have already received this broadcast object. Ignoring.') - return - # It is valid. Let's let our peers know about it. - objectType = 3 - Inventory()[inventoryHash] = ( - objectType, streamNumber, data, embeddedTime, tag) - # This object is valid. Forward it to peers. - logger.debug('advertising inv with hash: %s', hexlify(inventoryHash)) - protocol.broadcastToSendDataQueues( - (streamNumber, 'advertiseobject', inventoryHash)) - - # Now let's queue it to be processed ourselves. - objectProcessorQueue.put((objectType, data)) - - def openKeysFile(): if 'linux' in sys.platform: subprocess.call(["xdg-open", state.appdata + 'keys.dat']) diff --git a/src/singleworker.py b/src/singleworker.py index 008bafa6..35a1adf1 100644 --- a/src/singleworker.py +++ b/src/singleworker.py @@ -210,17 +210,6 @@ def getDestinationAddressProperties(address): def randomizeTTL(TTL): return TTL + helper_random.randomrandrange(-300, 300) -def disseminateObject(nonce, expiryTime, headlessPayload, objectType, stream, tag): - payload = nonce + struct.pack(">Q", expiryTime) + headlessPayload - inventoryHash = protocol.calculateDoubleHash(payload)[: 32] - - inventory.Inventory()[inventoryHash] = objectType, stream, payload, expiryTime, buffer(tag) - queues.invQueue.put((stream, inventoryHash)) - - debug.logger.info("Broadcasting inventory object with hash: %s", binascii.hexlify(inventoryHash)) - - return inventoryHash, payload - workProver = workprover.WorkProver( os.path.join(paths.codePath(), "workprover"), helper_random.randomBytes(32), @@ -370,7 +359,7 @@ class singleWorker(threading.Thread, helper_threading.StoppableThread): debug.logger.info("Found proof of work %s", ID) if ID in self.startedWorks: - self.startedWorks[ID](nonce, expiryTime) + self.startedWorks[ID](nonce + struct.pack(">Q", expiryTime)) del self.startedWorks[ID] @@ -412,8 +401,6 @@ class singleWorker(threading.Thread, helper_threading.StoppableThread): headlessPayload += addresses.encodeVarint(addressProperties.version) headlessPayload += addresses.encodeVarint(addressProperties.stream) - inventoryTagPosition = len(headlessPayload) - headlessPayload += tag if addressProperties.version == 4: @@ -449,10 +436,8 @@ class singleWorker(threading.Thread, helper_threading.StoppableThread): headlessPayload += addresses.encodeVarint(len(signature)) headlessPayload += signature - def workDone(nonce, expiryTime): - inventoryTag = headlessPayload[inventoryTagPosition: inventoryTagPosition + 32] - - disseminateObject(nonce, expiryTime, headlessPayload, 1, addressProperties.stream, inventoryTag) + def workDone(head): + protocol.checkAndShareObjectWithPeers(head + headlessPayload) # TODO: not atomic with the addition to the inventory, the "lastpubkeysendtime" property should be removed # Instead check if the pubkey is present in the inventory @@ -528,8 +513,6 @@ class singleWorker(threading.Thread, helper_threading.StoppableThread): headlessPayload += addresses.encodeVarint(addressProperties.stream) - inventoryTagPosition = len(headlessPayload) - headlessPayload += tag plaintext = addresses.encodeVarint(addressProperties.version) @@ -566,15 +549,10 @@ class singleWorker(threading.Thread, helper_threading.StoppableThread): return - def workDone(nonce, expiryTime): - inventoryTag = headlessPayload[inventoryTagPosition: inventoryTagPosition + 32] - + def workDone(head): # TODO: adding to the inventory, adding to inbox and setting the sent status should be within a single SQL transaction - inventoryHash, payload = disseminateObject( - nonce, expiryTime, headlessPayload, - 3, addressProperties.stream, inventoryTag - ) + inventoryHash = protocol.checkAndShareObjectWithPeers(head + headlessPayload) helper_sql.sqlExecute(""" UPDATE "sent" SET "msgid" = ?, "status" = 'broadcastsent', "lastactiontime" = ? @@ -587,15 +565,6 @@ class singleWorker(threading.Thread, helper_threading.StoppableThread): tr._translate("MainWindow", "Broadcast sent on %1").arg(l10n.formatTimestamp()) ))) - # Add to own inbox - - if addressProperties.version == 4: - if tag in shared.MyECSubscriptionCryptorObjects: - queues.objectProcessorQueue.put((3, payload)) - else: - if addressProperties.ripe in shared.MyECSubscriptionCryptorObjects: - queues.objectProcessorQueue.put((3, payload)) - helper_sql.sqlExecute("""UPDATE "sent" SET "status" = 'doingbroadcastpow' WHERE "ackdata" == ?;""", ackData) queues.UISignalQueue.put(("updateSentItemStatusByAckdata", ( @@ -679,10 +648,8 @@ class singleWorker(threading.Thread, helper_threading.StoppableThread): TTL = randomizeTTL(TTL) expiryTime = int(time.time() + TTL) - def workDone(nonce, expiryTime): - payload = nonce + struct.pack(">Q", expiryTime) + ackData - - callback(protocol.CreatePacket("object", payload)) + def workDone(head): + callback(protocol.CreatePacket("object", head + ackData)) self.startWork( ID, ackData, TTL, expiryTime, @@ -834,7 +801,6 @@ class singleWorker(threading.Thread, helper_threading.StoppableThread): return headlessPayload += ciphertext - inventoryTag = ciphertext[: 32] if len(headlessPayload) > 2 ** 18 - (8 + 8): # 256 kiB debug.logger.critical( @@ -844,16 +810,13 @@ class singleWorker(threading.Thread, helper_threading.StoppableThread): return - def workDone(nonce, expiryTime): + def workDone(head): if ackMessage is not None: state.watchedAckData.add(ackData) #TODO: adding to the inventory, adding to inbox and setting the sent status should be within a single SQL transaction - inventoryHash, payload = disseminateObject( - nonce, expiryTime, headlessPayload, - 2, destinationProperties.stream, inventoryTag - ) + inventoryHash = protocol.checkAndShareObjectWithPeers(head + headlessPayload) if ackMessage is None: newStatus = "msgsentnoackexpected" @@ -868,11 +831,6 @@ class singleWorker(threading.Thread, helper_threading.StoppableThread): WHERE "status" == 'doingmsgpow' AND "ackdata" == ?; """, inventoryHash, newStatus, retryNumber + 1, sleepTill, int(time.time()), ackData) - # Add to own inbox - - if destinationProperties.own: - queues.objectProcessorQueue.put((2, payload)) - if ackMessage is None: queues.UISignalQueue.put(("updateSentItemStatusByAckdata", ( "msgsentnoackexpected", @@ -1061,10 +1019,10 @@ class singleWorker(threading.Thread, helper_threading.StoppableThread): headlessPayload += tag - def workDone(nonce, expiryTime): + def workDone(head): # TODO: adding to the inventory and setting the sent status should be within a single SQL transaction - disseminateObject(nonce, expiryTime, headlessPayload, 0, stream, tag) + protocol.checkAndShareObjectWithPeers(head + headlessPayload) sleepTill = int(time.time() + TTL * 1.1) diff --git a/src/state.py b/src/state.py index 26eae25b..450af6e7 100644 --- a/src/state.py +++ b/src/state.py @@ -7,7 +7,6 @@ neededPubkeys = {} watchedAckData = set() streamsInWhichIAmParticipating = [] -sendDataQueues = [] # each sendData thread puts its queue in this list. # For UPnP extPort = None