Don't use network for sending ACK #2205
24
src/api.py
24
src/api.py
|
@ -66,7 +66,7 @@ import socket
|
||||||
import subprocess # nosec B404
|
import subprocess # nosec B404
|
||||||
import time
|
import time
|
||||||
from binascii import hexlify, unhexlify
|
from binascii import hexlify, unhexlify
|
||||||
from struct import pack, unpack
|
from struct import pack
|
||||||
|
|
||||||
import six
|
import six
|
||||||
from six.moves import configparser, http_client, xmlrpc_server
|
from six.moves import configparser, http_client, xmlrpc_server
|
||||||
|
@ -74,6 +74,7 @@ from six.moves import configparser, http_client, xmlrpc_server
|
||||||
import defaults
|
import defaults
|
||||||
import helper_inbox
|
import helper_inbox
|
||||||
import helper_sent
|
import helper_sent
|
||||||
|
import protocol
|
||||||
import proofofwork
|
import proofofwork
|
||||||
import queues
|
import queues
|
||||||
import shared
|
import shared
|
||||||
|
@ -1284,7 +1285,7 @@ class BMRPCDispatcher(object):
|
||||||
return {'subscriptions': data}
|
return {'subscriptions': data}
|
||||||
|
|
||||||
@command('disseminatePreEncryptedMsg')
|
@command('disseminatePreEncryptedMsg')
|
||||||
def HandleDisseminatePreEncryptedMsg( # pylint: disable=too-many-locals
|
def HandleDisseminatePreEncryptedMsg(
|
||||||
self, encryptedPayload, requiredAverageProofOfWorkNonceTrialsPerByte,
|
self, encryptedPayload, requiredAverageProofOfWorkNonceTrialsPerByte,
|
||||||
requiredPayloadLengthExtraBytes):
|
requiredPayloadLengthExtraBytes):
|
||||||
"""Handle a request to disseminate an encrypted message"""
|
"""Handle a request to disseminate an encrypted message"""
|
||||||
|
@ -1294,9 +1295,12 @@ class BMRPCDispatcher(object):
|
||||||
# to be done. PyBitmessage accepts this msg object and sends it out
|
# to be done. PyBitmessage accepts this msg object and sends it out
|
||||||
# to the rest of the Bitmessage network as if it had generated
|
# to the rest of the Bitmessage network as if it had generated
|
||||||
# the message itself. Please do not yet add this to the api doc.
|
# the message itself. Please do not yet add this to the api doc.
|
||||||
encryptedPayload = self._decode(encryptedPayload, "hex")
|
encryptedPayload = b'\x00' * 8 + self._decode(encryptedPayload, "hex")
|
||||||
expiresTime = unpack('>Q', encryptedPayload[0:8])[0]
|
# compatibility stub ^, since disseminatePreEncryptedMsg
|
||||||
objectType = unpack('>I', encryptedPayload[8:12])[0]
|
# still expects the encryptedPayload without a nonce
|
||||||
|
objectType, toStreamNumber, expiresTime = \
|
||||||
|
protocol.decodeObjectParameters(encryptedPayload)
|
||||||
|
encryptedPayload = encryptedPayload[8:]
|
||||||
TTL = expiresTime - time.time() + 300 # a bit of extra padding
|
TTL = expiresTime - time.time() + 300 # a bit of extra padding
|
||||||
# Let us do the POW and attach it to the front
|
# Let us do the POW and attach it to the front
|
||||||
target = 2**64 / (
|
target = 2**64 / (
|
||||||
|
@ -1329,22 +1333,16 @@ class BMRPCDispatcher(object):
|
||||||
nonce / (time.time() - powStartTime)
|
nonce / (time.time() - powStartTime)
|
||||||
)
|
)
|
||||||
encryptedPayload = pack('>Q', nonce) + encryptedPayload
|
encryptedPayload = pack('>Q', nonce) + encryptedPayload
|
||||||
parserPos = 20
|
|
||||||
_, objectVersionLength = decodeVarint(
|
|
||||||
encryptedPayload[parserPos:parserPos + 10])
|
|
||||||
parserPos += objectVersionLength
|
|
||||||
toStreamNumber, _ = decodeVarint(
|
|
||||||
encryptedPayload[parserPos:parserPos + 10])
|
|
||||||
inventoryHash = calculateInventoryHash(encryptedPayload)
|
inventoryHash = calculateInventoryHash(encryptedPayload)
|
||||||
Inventory()[inventoryHash] = (
|
Inventory()[inventoryHash] = (
|
||||||
objectType, toStreamNumber, encryptedPayload,
|
objectType, toStreamNumber, encryptedPayload,
|
||||||
expiresTime, ''
|
expiresTime, b''
|
||||||
)
|
)
|
||||||
logger.info(
|
logger.info(
|
||||||
'Broadcasting inv for msg(API disseminatePreEncryptedMsg'
|
'Broadcasting inv for msg(API disseminatePreEncryptedMsg'
|
||||||
' command): %s', hexlify(inventoryHash))
|
' command): %s', hexlify(inventoryHash))
|
||||||
queues.invQueue.put((toStreamNumber, inventoryHash))
|
queues.invQueue.put((toStreamNumber, inventoryHash))
|
||||||
return hexlify(inventoryHash)
|
return hexlify(inventoryHash).decode()
|
||||||
|
|
||||||
@command('trashSentMessageByAckData')
|
@command('trashSentMessageByAckData')
|
||||||
def HandleTrashSentMessageByAckDAta(self, ackdata):
|
def HandleTrashSentMessageByAckDAta(self, ackdata):
|
||||||
|
|
|
@ -31,7 +31,8 @@ from bmconfigparser import config
|
||||||
from fallback import RIPEMD160Hash
|
from fallback import RIPEMD160Hash
|
||||||
from helper_sql import (
|
from helper_sql import (
|
||||||
sql_ready, sql_timeout, SqlBulkExecute, sqlExecute, sqlQuery)
|
sql_ready, sql_timeout, SqlBulkExecute, sqlExecute, sqlQuery)
|
||||||
from network import bmproto, knownnodes
|
from inventory import Inventory
|
||||||
|
from network import knownnodes
|
||||||
from network.node import Peer
|
from network.node import Peer
|
||||||
from tr import _translate
|
from tr import _translate
|
||||||
|
|
||||||
|
@ -64,7 +65,6 @@ class objectProcessor(threading.Thread):
|
||||||
logger.debug(
|
logger.debug(
|
||||||
'Loaded %s objects from disk into the objectProcessorQueue.',
|
'Loaded %s objects from disk into the objectProcessorQueue.',
|
||||||
len(queryreturn))
|
len(queryreturn))
|
||||||
self._ack_obj = bmproto.BMStringParser()
|
|
||||||
self.successfullyDecryptMessageTimings = []
|
self.successfullyDecryptMessageTimings = []
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
|
@ -733,7 +733,13 @@ class objectProcessor(threading.Thread):
|
||||||
and not config.safeGetBoolean(toAddress, 'dontsendack')
|
and not config.safeGetBoolean(toAddress, 'dontsendack')
|
||||||
and not config.safeGetBoolean(toAddress, 'chan')
|
and not config.safeGetBoolean(toAddress, 'chan')
|
||||||
):
|
):
|
||||||
self._ack_obj.send_data(ackData[24:])
|
ackPayload = ackData[24:]
|
||||||
|
objectType, toStreamNumber, expiresTime = \
|
||||||
|
protocol.decodeObjectParameters(ackPayload)
|
||||||
|
inventoryHash = calculateInventoryHash(ackPayload)
|
||||||
|
Inventory()[inventoryHash] = (
|
||||||
|
objectType, toStreamNumber, ackPayload, expiresTime, b'')
|
||||||
|
queues.invQueue.put((toStreamNumber, inventoryHash))
|
||||||
|
|
||||||
# Display timing data
|
# Display timing data
|
||||||
timeRequiredToAttemptToDecryptMessage = time.time(
|
timeRequiredToAttemptToDecryptMessage = time.time(
|
||||||
|
|
|
@ -45,3 +45,6 @@ class Inventory():
|
||||||
# hint for pylint: this is dictionary like object
|
# hint for pylint: this is dictionary like object
|
||||||
def __getitem__(self, key):
|
def __getitem__(self, key):
|
||||||
return self._realInventory[key]
|
return self._realInventory[key]
|
||||||
|
|
||||||
|
def __setitem__(self, key, value):
|
||||||
|
self._realInventory[key] = value
|
||||||
|
|
|
@ -9,7 +9,6 @@ import re
|
||||||
import socket
|
import socket
|
||||||
import struct
|
import struct
|
||||||
import time
|
import time
|
||||||
from binascii import hexlify
|
|
||||||
|
|
||||||
# magic imports!
|
# magic imports!
|
||||||
import addresses
|
import addresses
|
||||||
|
@ -678,32 +677,3 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
|
||||||
except AttributeError:
|
except AttributeError:
|
||||||
logger.debug('Disconnected socket closing')
|
logger.debug('Disconnected socket closing')
|
||||||
AdvancedDispatcher.handle_close(self)
|
AdvancedDispatcher.handle_close(self)
|
||||||
|
|
||||||
|
|
||||||
class BMStringParser(BMProto):
|
|
||||||
"""
|
|
||||||
A special case of BMProto used by objectProcessor to send ACK
|
|
||||||
"""
|
|
||||||
def __init__(self):
|
|
||||||
super(BMStringParser, self).__init__()
|
|
||||||
self.destination = Peer('127.0.0.1', 8444)
|
|
||||||
self.payload = None
|
|
||||||
ObjectTracker.__init__(self)
|
|
||||||
|
|
||||||
def send_data(self, data):
|
|
||||||
"""Send object given by the data string"""
|
|
||||||
# This class is introduced specially for ACK sending, please
|
|
||||||
# change log strings if you are going to use it for something else
|
|
||||||
self.bm_proto_reset()
|
|
||||||
self.payload = data
|
|
||||||
try:
|
|
||||||
self.bm_command_object()
|
|
||||||
except BMObjectAlreadyHaveError:
|
|
||||||
pass # maybe the same msg received on different nodes
|
|
||||||
except BMObjectExpiredError:
|
|
||||||
logger.debug(
|
|
||||||
'Sending ACK failure (expired): %s', hexlify(data))
|
|
||||||
except Exception as e:
|
|
||||||
logger.debug(
|
|
||||||
'Exception of type %s while sending ACK',
|
|
||||||
type(e), exc_info=True)
|
|
||||||
|
|
|
@ -436,6 +436,17 @@ def assembleErrorMessage(fatal=0, banTime=0, inventoryVector='', errorText=''):
|
||||||
# Packet decoding
|
# Packet decoding
|
||||||
|
|
||||||
|
|
||||||
|
def decodeObjectParameters(data):
|
||||||
|
"""Decode the parameters of a raw object needed to put it in inventory"""
|
||||||
|
# BMProto.decode_payload_content("QQIvv")
|
||||||
|
expiresTime = unpack('>Q', data[8:16])[0]
|
||||||
|
objectType = unpack('>I', data[16:20])[0]
|
||||||
|
parserPos = 20 + decodeVarint(data[20:30])[1]
|
||||||
|
toStreamNumber = decodeVarint(data[parserPos:parserPos + 10])[0]
|
||||||
|
|
||||||
|
return objectType, toStreamNumber, expiresTime
|
||||||
|
|
||||||
|
|
||||||
def decryptAndCheckPubkeyPayload(data, address):
|
def decryptAndCheckPubkeyPayload(data, address):
|
||||||
"""
|
"""
|
||||||
Version 4 pubkeys are encrypted. This function is run when we
|
Version 4 pubkeys are encrypted. This function is run when we
|
||||||
|
|
|
@ -54,3 +54,12 @@ sample_subscription_addresses = [
|
||||||
'BM-2cWQLCBGorT9pUGkYSuGGVr9LzE4mRnQaq',
|
'BM-2cWQLCBGorT9pUGkYSuGGVr9LzE4mRnQaq',
|
||||||
'BM-GtovgYdgs7qXPkoYaRgrLFuFKz1SFpsw']
|
'BM-GtovgYdgs7qXPkoYaRgrLFuFKz1SFpsw']
|
||||||
sample_subscription_name = 'test sub'
|
sample_subscription_name = 'test sub'
|
||||||
|
|
||||||
|
|
||||||
|
sample_object_expires = 1712271487
|
||||||
|
# from minode import structure
|
||||||
|
# obj = structure.Object(
|
||||||
|
# b'\x00' * 8, sample_object_expires, 42, 1, 2, b'HELLO')
|
||||||
|
# .. do pow and obj.to_bytes()
|
||||||
|
sample_object_data = unhexlify(
|
||||||
|
'00000000001be7fc00000000660f307f0000002a010248454c4c4f')
|
||||||
|
|
|
@ -2,11 +2,18 @@
|
||||||
|
|
||||||
import sys
|
import sys
|
||||||
import time
|
import time
|
||||||
|
from binascii import hexlify, unhexlify
|
||||||
|
from struct import pack
|
||||||
|
|
||||||
from six.moves import queue, xmlrpc_client
|
from six.moves import queue, xmlrpc_client
|
||||||
|
|
||||||
|
from pybitmessage import protocol
|
||||||
|
from pybitmessage.defaults import (
|
||||||
|
networkDefaultProofOfWorkNonceTrialsPerByte,
|
||||||
|
networkDefaultPayloadLengthExtraBytes)
|
||||||
|
|
||||||
from .partial import TestPartialRun
|
from .partial import TestPartialRun
|
||||||
from .samples import sample_statusbar_msg # any
|
from .samples import sample_statusbar_msg, sample_object_data
|
||||||
|
|
||||||
|
|
||||||
class TestAPIThread(TestPartialRun):
|
class TestAPIThread(TestPartialRun):
|
||||||
|
@ -66,3 +73,22 @@ class TestAPIThread(TestPartialRun):
|
||||||
if sys.hexversion >= 0x3000000:
|
if sys.hexversion >= 0x3000000:
|
||||||
self.assertEqual(status["networkConnections"], 4)
|
self.assertEqual(status["networkConnections"], 4)
|
||||||
self.assertEqual(status["pendingDownload"], 0)
|
self.assertEqual(status["pendingDownload"], 0)
|
||||||
|
|
||||||
|
def test_disseminate_preencrypted(self):
|
||||||
|
"""Call disseminatePreEncryptedMsg API command and check inventory"""
|
||||||
|
import proofofwork
|
||||||
|
from inventory import Inventory
|
||||||
|
|
||||||
|
proofofwork.init()
|
||||||
|
update_object = pack(
|
||||||
|
'>Q', int(time.time() + 7200)) + sample_object_data[16:]
|
||||||
|
invhash = unhexlify(self.api.disseminatePreEncryptedMsg(
|
||||||
|
hexlify(update_object).decode(),
|
||||||
|
networkDefaultProofOfWorkNonceTrialsPerByte,
|
||||||
|
networkDefaultPayloadLengthExtraBytes
|
||||||
|
))
|
||||||
|
obj_type, obj_stream, obj_data = Inventory()[invhash][:3]
|
||||||
|
self.assertEqual(obj_type, 42)
|
||||||
|
self.assertEqual(obj_stream, 2)
|
||||||
|
self.assertEqual(sample_object_data[16:], obj_data[16:])
|
||||||
|
self.assertTrue(protocol.isProofOfWorkSufficient(obj_data))
|
||||||
|
|
|
@ -5,7 +5,8 @@ from struct import pack
|
||||||
|
|
||||||
from pybitmessage import addresses, protocol
|
from pybitmessage import addresses, protocol
|
||||||
|
|
||||||
from .samples import sample_addr_data
|
from .samples import (
|
||||||
|
sample_addr_data, sample_object_data, sample_object_expires)
|
||||||
from .test_protocol import TestSocketInet
|
from .test_protocol import TestSocketInet
|
||||||
|
|
||||||
|
|
||||||
|
@ -49,6 +50,14 @@ class TestSerialize(TestSocketInet):
|
||||||
self.assertEqual(
|
self.assertEqual(
|
||||||
protocol.CreatePacket(b'ping')[:len(head)], head)
|
protocol.CreatePacket(b'ping')[:len(head)], head)
|
||||||
|
|
||||||
|
def test_decode_obj_parameters(self):
|
||||||
|
"""Check parameters decoded from a sample object"""
|
||||||
|
objectType, toStreamNumber, expiresTime = \
|
||||||
|
protocol.decodeObjectParameters(sample_object_data)
|
||||||
|
self.assertEqual(objectType, 42)
|
||||||
|
self.assertEqual(toStreamNumber, 2)
|
||||||
|
self.assertEqual(expiresTime, sample_object_expires)
|
||||||
|
|
||||||
def test_encodehost(self):
|
def test_encodehost(self):
|
||||||
"""Check the result of protocol.encodeHost()"""
|
"""Check the result of protocol.encodeHost()"""
|
||||||
self.assertEqual(
|
self.assertEqual(
|
||||||
|
|
Reference in New Issue
Block a user