A minimal test for assembling addr message #2166

Merged
PeterSurda merged 6 commits from gitea-44 into v0.6 2023-11-28 14:23:39 +01:00
11 changed files with 100 additions and 89 deletions

View File

@ -1033,7 +1033,7 @@ class objectProcessor(threading.Thread):
magic, command, payloadLength, checksum = protocol.Header.unpack( magic, command, payloadLength, checksum = protocol.Header.unpack(
ackData[:protocol.Header.size]) ackData[:protocol.Header.size])
if magic != 0xE9BEB4D9: if magic != protocol.magic:
logger.info('Ackdata magic bytes were wrong. Not sending ackData.') logger.info('Ackdata magic bytes were wrong. Not sending ackData.')
return False return False
payload = ackData[protocol.Header.size:] payload = ackData[protocol.Header.size:]

View File

@ -3,12 +3,13 @@ Announce addresses as they are received from other hosts
""" """
from six.moves import queue from six.moves import queue
# magic imports!
import state import state
from helper_random import randomshuffle from helper_random import randomshuffle
from network.assemble import assemble_addr from protocol import assembleAddrMessage
from queues import addrQueue # FIXME: init with queue
from network.connectionpool import BMConnectionPool from network.connectionpool import BMConnectionPool
from queues import addrQueue
from threads import StoppableThread from threads import StoppableThread
@ -41,7 +42,7 @@ class AddrThread(StoppableThread):
continue continue
filtered.append((stream, peer, seen)) filtered.append((stream, peer, seen))
if filtered: if filtered:
i.append_write_buf(assemble_addr(filtered)) i.append_write_buf(assembleAddrMessage(filtered))
addrQueue.iterate() addrQueue.iterate()
for i in range(len(chunk)): for i in range(len(chunk)):

View File

@ -3,10 +3,12 @@ Announce myself (node address)
""" """
import time import time
# magic imports!
import state import state
from bmconfigparser import config from bmconfigparser import config
from network.assemble import assemble_addr from protocol import assembleAddrMessage
from network.connectionpool import BMConnectionPool from network.connectionpool import BMConnectionPool
from node import Peer from node import Peer
from threads import StoppableThread from threads import StoppableThread
@ -37,7 +39,6 @@ class AnnounceThread(StoppableThread):
stream, stream,
Peer( Peer(
'127.0.0.1', '127.0.0.1',
config.safeGetInt( config.safeGetInt('bitmessagesettings', 'port')),
'bitmessagesettings', 'port')), int(time.time()))
time.time()) connection.append_write_buf(assembleAddrMessage([addr]))
connection.append_write_buf(assemble_addr([addr]))

View File

@ -1,31 +0,0 @@
"""
Create bitmessage protocol command packets
"""
import struct
import addresses
from network.constants import MAX_ADDR_COUNT
from network.node import Peer
from protocol import CreatePacket, encodeHost
def assemble_addr(peerList):
"""Create address command"""
if isinstance(peerList, Peer):
peerList = [peerList]
if not peerList:
return b''
retval = b''
for i in range(0, len(peerList), MAX_ADDR_COUNT):
payload = addresses.encodeVarint(len(peerList[i:i + MAX_ADDR_COUNT]))
for stream, peer, timestamp in peerList[i:i + MAX_ADDR_COUNT]:
# 64-bit time
payload += struct.pack('>Q', timestamp)
payload += struct.pack('>I', stream)
# service bit flags offered by this node
payload += struct.pack('>q', 1)
payload += encodeHost(peer.host)
# remote port
payload += struct.pack('>H', peer.port)
retval += CreatePacket('addr', payload)
return retval

View File

@ -11,6 +11,7 @@ import struct
import time import time
from binascii import hexlify from binascii import hexlify
# magic imports!
import addresses import addresses
import connectionpool import connectionpool
import knownnodes import knownnodes
@ -18,22 +19,20 @@ import protocol
import state import state
from bmconfigparser import config from bmconfigparser import config
from inventory import Inventory from inventory import Inventory
from queues import invQueue, objectProcessorQueue, portCheckerQueue
from randomtrackingdict import RandomTrackingDict
from network.advanceddispatcher import AdvancedDispatcher from network.advanceddispatcher import AdvancedDispatcher
from network.bmobject import ( from network.bmobject import (
BMObject, BMObjectAlreadyHaveError, BMObjectExpiredError, BMObject, BMObjectAlreadyHaveError, BMObjectExpiredError,
BMObjectInsufficientPOWError, BMObjectInvalidError, BMObjectInsufficientPOWError, BMObjectInvalidError,
BMObjectUnwantedStreamError BMObjectUnwantedStreamError
) )
from network.constants import (
ADDRESS_ALIVE, MAX_MESSAGE_SIZE, MAX_OBJECT_COUNT,
MAX_OBJECT_PAYLOAD_SIZE, MAX_TIME_OFFSET
)
from network.dandelion import Dandelion from network.dandelion import Dandelion
from network.proxy import ProxyError from network.proxy import ProxyError
from node import Node, Peer from node import Node, Peer
from objectracker import ObjectTracker, missingObjects from objectracker import ObjectTracker, missingObjects
from queues import invQueue, objectProcessorQueue, portCheckerQueue
from randomtrackingdict import RandomTrackingDict
logger = logging.getLogger('default') logger = logging.getLogger('default')
@ -87,7 +86,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
self.magic, self.command, self.payloadLength, self.checksum = \ self.magic, self.command, self.payloadLength, self.checksum = \
protocol.Header.unpack(self.read_buf[:protocol.Header.size]) protocol.Header.unpack(self.read_buf[:protocol.Header.size])
self.command = self.command.rstrip('\x00') self.command = self.command.rstrip('\x00')
if self.magic != 0xE9BEB4D9: if self.magic != protocol.magic:
# skip 1 byte in order to sync # skip 1 byte in order to sync
self.set_state("bm_header", length=1) self.set_state("bm_header", length=1)
self.bm_proto_reset() self.bm_proto_reset()
@ -96,7 +95,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
self.close_reason = "Bad magic" self.close_reason = "Bad magic"
self.set_state("close") self.set_state("close")
return False return False
if self.payloadLength > MAX_MESSAGE_SIZE: if self.payloadLength > protocol.MAX_MESSAGE_SIZE:
self.invalid = True self.invalid = True
self.set_state( self.set_state(
"bm_command", "bm_command",
@ -348,7 +347,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
""" """
items = self.decode_payload_content("l32s") items = self.decode_payload_content("l32s")
if len(items) > MAX_OBJECT_COUNT: if len(items) > protocol.MAX_OBJECT_COUNT:
logger.error( logger.error(
'Too many items in %sinv message!', 'd' if dandelion else '') 'Too many items in %sinv message!', 'd' if dandelion else '')
raise BMProtoExcessiveDataError() raise BMProtoExcessiveDataError()
@ -384,7 +383,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
self.payload, self.payloadOffset) self.payload, self.payloadOffset)
payload_len = len(self.payload) - self.payloadOffset payload_len = len(self.payload) - self.payloadOffset
if payload_len > MAX_OBJECT_PAYLOAD_SIZE: if payload_len > protocol.MAX_OBJECT_PAYLOAD_SIZE:
logger.info( logger.info(
'The payload length of this object is too large' 'The payload length of this object is too large'
' (%d bytes). Ignoring it.', payload_len) ' (%d bytes). Ignoring it.', payload_len)
@ -457,7 +456,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
decodedIP = protocol.checkIPAddress(ip) decodedIP = protocol.checkIPAddress(ip)
if ( if (
decodedIP and time.time() - seenTime > 0 decodedIP and time.time() - seenTime > 0
and seenTime > time.time() - ADDRESS_ALIVE and seenTime > time.time() - protocol.ADDRESS_ALIVE
and port > 0 and port > 0
): ):
peer = Peer(decodedIP, port) peer = Peer(decodedIP, port)
@ -570,7 +569,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
'Closing connection to old protocol version %s, node: %s', 'Closing connection to old protocol version %s, node: %s',
self.remoteProtocolVersion, self.destination) self.remoteProtocolVersion, self.destination)
return False return False
if self.timeOffset > MAX_TIME_OFFSET: if self.timeOffset > protocol.MAX_TIME_OFFSET:
self.append_write_buf(protocol.assembleErrorMessage( self.append_write_buf(protocol.assembleErrorMessage(
errorText="Your time is too far in the future" errorText="Your time is too far in the future"
" compared to mine. Closing connection.", fatal=2)) " compared to mine. Closing connection.", fatal=2))
@ -580,7 +579,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
self.destination, self.timeOffset) self.destination, self.timeOffset)
BMProto.timeOffsetWrongCount += 1 BMProto.timeOffsetWrongCount += 1
return False return False
elif self.timeOffset < -MAX_TIME_OFFSET: elif self.timeOffset < -protocol.MAX_TIME_OFFSET:
self.append_write_buf(protocol.assembleErrorMessage( self.append_write_buf(protocol.assembleErrorMessage(
errorText="Your time is too far in the past compared to mine." errorText="Your time is too far in the past compared to mine."
" Closing connection.", fatal=2)) " Closing connection.", fatal=2))

View File

@ -1,17 +0,0 @@
"""
Network protocol constants
"""
#: address is online if online less than this many seconds ago
ADDRESS_ALIVE = 10800
#: protocol specification says max 1000 addresses in one addr command
MAX_ADDR_COUNT = 1000
#: ~1.6 MB which is the maximum possible size of an inv message.
MAX_MESSAGE_SIZE = 1600100
#: 2**18 = 256kB is the maximum size of an object payload
MAX_OBJECT_PAYLOAD_SIZE = 2**18
#: protocol specification says max 50000 objects in one inv command
MAX_OBJECT_COUNT = 50000
#: maximum time offset
MAX_TIME_OFFSET = 3600

View File

@ -2,35 +2,37 @@
TCP protocol handler TCP protocol handler
""" """
# pylint: disable=too-many-ancestors # pylint: disable=too-many-ancestors
import l10n
import logging import logging
import math import math
import random import random
import socket import socket
import time import time
# magic imports!
import addresses import addresses
import asyncore_pollchoose as asyncore
import connectionpool
import helper_random import helper_random
import knownnodes import l10n
import protocol import protocol
import state import state
from bmconfigparser import config from bmconfigparser import config
from helper_random import randomBytes from helper_random import randomBytes
from inventory import Inventory from inventory import Inventory
from queues import invQueue, receiveDataQueue, UISignalQueue
from tr import _translate
import asyncore_pollchoose as asyncore
import connectionpool
import knownnodes
from network.advanceddispatcher import AdvancedDispatcher from network.advanceddispatcher import AdvancedDispatcher
from network.assemble import assemble_addr
from network.bmproto import BMProto from network.bmproto import BMProto
from network.constants import MAX_OBJECT_COUNT
from network.dandelion import Dandelion from network.dandelion import Dandelion
from network.objectracker import ObjectTracker from network.objectracker import ObjectTracker
from network.socks4a import Socks4aConnection from network.socks4a import Socks4aConnection
from network.socks5 import Socks5Connection from network.socks5 import Socks5Connection
from network.tls import TLSDispatcher from network.tls import TLSDispatcher
from node import Peer from node import Peer
from queues import invQueue, receiveDataQueue, UISignalQueue
from tr import _translate
logger = logging.getLogger('default') logger = logging.getLogger('default')
@ -205,7 +207,7 @@ class TCPConnection(BMProto, TLSDispatcher):
for peer, params in addrs[substream]: for peer, params in addrs[substream]:
templist.append((substream, peer, params["lastseen"])) templist.append((substream, peer, params["lastseen"]))
if templist: if templist:
self.append_write_buf(assemble_addr(templist)) self.append_write_buf(protocol.assembleAddrMessage(templist))
def sendBigInv(self): def sendBigInv(self):
""" """
@ -244,7 +246,7 @@ class TCPConnection(BMProto, TLSDispatcher):
# Remove -1 below when sufficient time has passed for users to # Remove -1 below when sufficient time has passed for users to
# upgrade to versions of PyBitmessage that accept inv with 50,000 # upgrade to versions of PyBitmessage that accept inv with 50,000
# items # items
if objectCount >= MAX_OBJECT_COUNT - 1: if objectCount >= protocol.MAX_OBJECT_COUNT - 1:
sendChunk() sendChunk()
payload = b'' payload = b''
objectCount = 0 objectCount = 0

View File

@ -5,13 +5,15 @@ import logging
import socket import socket
import time import time
# magic imports!
import protocol import protocol
import state import state
from queues import receiveDataQueue
from bmproto import BMProto from bmproto import BMProto
from constants import MAX_TIME_OFFSET
from node import Peer from node import Peer
from objectracker import ObjectTracker from objectracker import ObjectTracker
from queues import receiveDataQueue
logger = logging.getLogger('default') logger = logging.getLogger('default')
@ -81,8 +83,8 @@ class UDPSocket(BMProto): # pylint: disable=too-many-instance-attributes
decodedIP = protocol.checkIPAddress(str(ip)) decodedIP = protocol.checkIPAddress(str(ip))
if stream not in state.streamsInWhichIAmParticipating: if stream not in state.streamsInWhichIAmParticipating:
continue continue
if (seenTime < time.time() - MAX_TIME_OFFSET if (seenTime < time.time() - protocol.MAX_TIME_OFFSET
or seenTime > time.time() + MAX_TIME_OFFSET): or seenTime > time.time() + protocol.MAX_TIME_OFFSET):
continue continue
if decodedIP is False: if decodedIP is False:
# if the address isn't local, interpret it as # if the address isn't local, interpret it as

View File

@ -22,8 +22,24 @@ from bmconfigparser import config
from debug import logger from debug import logger
from fallback import RIPEMD160Hash from fallback import RIPEMD160Hash
from helper_sql import sqlExecute from helper_sql import sqlExecute
from network.node import Peer
from version import softwareVersion from version import softwareVersion
# Network constants
magic = 0xE9BEB4D9
#: protocol specification says max 1000 addresses in one addr command
MAX_ADDR_COUNT = 1000
#: address is online if online less than this many seconds ago
ADDRESS_ALIVE = 10800
#: ~1.6 MB which is the maximum possible size of an inv message.
MAX_MESSAGE_SIZE = 1600100
#: 2**18 = 256kB is the maximum size of an object payload
MAX_OBJECT_PAYLOAD_SIZE = 2**18
#: protocol specification says max 50000 objects in one inv command
MAX_OBJECT_COUNT = 50000
#: maximum time offset
MAX_TIME_OFFSET = 3600
# Service flags # Service flags
#: This is a normal network node #: This is a normal network node
NODE_NETWORK = 1 NODE_NETWORK = 1
@ -295,11 +311,33 @@ def CreatePacket(command, payload=b''):
checksum = hashlib.sha512(payload).digest()[0:4] checksum = hashlib.sha512(payload).digest()[0:4]
b = bytearray(Header.size + payload_length) b = bytearray(Header.size + payload_length)
Header.pack_into(b, 0, 0xE9BEB4D9, command, payload_length, checksum) Header.pack_into(b, 0, magic, command, payload_length, checksum)
b[Header.size:] = payload b[Header.size:] = payload
return bytes(b) return bytes(b)
def assembleAddrMessage(peerList):
"""Create address command"""
if isinstance(peerList, Peer):
peerList = [peerList]
if not peerList:
return b''
retval = b''
for i in range(0, len(peerList), MAX_ADDR_COUNT):
payload = encodeVarint(len(peerList[i:i + MAX_ADDR_COUNT]))
for stream, peer, timestamp in peerList[i:i + MAX_ADDR_COUNT]:
# 64-bit time
payload += pack('>Q', timestamp)
payload += pack('>I', stream)
# service bit flags offered by this node
payload += pack('>q', 1)
payload += encodeHost(peer.host)
# remote port
payload += pack('>H', peer.port)
retval += CreatePacket(b'addr', payload)
return retval
def assembleVersionMessage( def assembleVersionMessage(
remoteHost, remotePort, participatingStreams, server=False, nodeid=None remoteHost, remotePort, participatingStreams, server=False, nodeid=None
): ):

View File

@ -3,7 +3,14 @@
from binascii import unhexlify from binascii import unhexlify
magic = 0xE9BEB4D9 # 500 identical peers:
# 1626611891, 1, 1, 127.0.0.1, 8444
sample_addr_data = unhexlify(
'fd01f4' + (
'0000000060f420b30000000'
'1000000000000000100000000000000000000ffff7f00000120fc'
) * 500
)
# These keys are from addresses test script # These keys are from addresses test script
sample_pubsigningkey = unhexlify( sample_pubsigningkey = unhexlify(

View File

@ -5,7 +5,7 @@ from struct import pack
from pybitmessage import addresses, protocol from pybitmessage import addresses, protocol
from .samples import magic from .samples import sample_addr_data
from .test_protocol import TestSocketInet from .test_protocol import TestSocketInet
@ -45,7 +45,7 @@ class TestSerialize(TestSocketInet):
def test_packet(self): def test_packet(self):
"""Check the packet created by protocol.CreatePacket()""" """Check the packet created by protocol.CreatePacket()"""
head = unhexlify(b'%x' % magic) head = unhexlify(b'%x' % protocol.magic)
self.assertEqual( self.assertEqual(
protocol.CreatePacket(b'ping')[:len(head)], head) protocol.CreatePacket(b'ping')[:len(head)], head)
@ -67,3 +67,12 @@ class TestSerialize(TestSocketInet):
self.assertEqual( self.assertEqual(
protocol.encodeHost('quzwelsuziwqgpt2.onion'), protocol.encodeHost('quzwelsuziwqgpt2.onion'),
unhexlify('fd87d87eeb438533622e54ca2d033e7a')) unhexlify('fd87d87eeb438533622e54ca2d033e7a'))
def test_assemble_addr(self):
"""Assemble addr packet and compare it to pregenerated sample"""
self.assertEqual(
sample_addr_data,
protocol.assembleAddrMessage([
(1, protocol.Peer('127.0.0.1', 8444), 1626611891)
for _ in range(500)
])[protocol.Header.size:])