diff --git a/src/class_objectProcessor.py b/src/class_objectProcessor.py index 246dfdea..2e1f5891 100644 --- a/src/class_objectProcessor.py +++ b/src/class_objectProcessor.py @@ -1033,7 +1033,7 @@ class objectProcessor(threading.Thread): magic, command, payloadLength, checksum = protocol.Header.unpack( ackData[:protocol.Header.size]) - if magic != 0xE9BEB4D9: + if magic != protocol.magic: logger.info('Ackdata magic bytes were wrong. Not sending ackData.') return False payload = ackData[protocol.Header.size:] diff --git a/src/network/addrthread.py b/src/network/addrthread.py index 8b46750f..fea0910e 100644 --- a/src/network/addrthread.py +++ b/src/network/addrthread.py @@ -3,12 +3,13 @@ Announce addresses as they are received from other hosts """ from six.moves import queue - +# magic imports! import state from helper_random import randomshuffle -from network.assemble import assemble_addr +from protocol import assembleAddrMessage +from queues import addrQueue # FIXME: init with queue from network.connectionpool import BMConnectionPool -from queues import addrQueue + from threads import StoppableThread @@ -41,7 +42,7 @@ class AddrThread(StoppableThread): continue filtered.append((stream, peer, seen)) if filtered: - i.append_write_buf(assemble_addr(filtered)) + i.append_write_buf(assembleAddrMessage(filtered)) addrQueue.iterate() for i in range(len(chunk)): diff --git a/src/network/announcethread.py b/src/network/announcethread.py index 6e18e661..84807757 100644 --- a/src/network/announcethread.py +++ b/src/network/announcethread.py @@ -3,10 +3,12 @@ Announce myself (node address) """ import time +# magic imports! import state from bmconfigparser import config -from network.assemble import assemble_addr +from protocol import assembleAddrMessage from network.connectionpool import BMConnectionPool + from node import Peer from threads import StoppableThread @@ -37,7 +39,6 @@ class AnnounceThread(StoppableThread): stream, Peer( '127.0.0.1', - config.safeGetInt( - 'bitmessagesettings', 'port')), - time.time()) - connection.append_write_buf(assemble_addr([addr])) + config.safeGetInt('bitmessagesettings', 'port')), + int(time.time())) + connection.append_write_buf(assembleAddrMessage([addr])) diff --git a/src/network/assemble.py b/src/network/assemble.py deleted file mode 100644 index 32fad3e4..00000000 --- a/src/network/assemble.py +++ /dev/null @@ -1,31 +0,0 @@ -""" -Create bitmessage protocol command packets -""" -import struct - -import addresses -from network.constants import MAX_ADDR_COUNT -from network.node import Peer -from protocol import CreatePacket, encodeHost - - -def assemble_addr(peerList): - """Create address command""" - if isinstance(peerList, Peer): - peerList = [peerList] - if not peerList: - return b'' - retval = b'' - for i in range(0, len(peerList), MAX_ADDR_COUNT): - payload = addresses.encodeVarint(len(peerList[i:i + MAX_ADDR_COUNT])) - for stream, peer, timestamp in peerList[i:i + MAX_ADDR_COUNT]: - # 64-bit time - payload += struct.pack('>Q', timestamp) - payload += struct.pack('>I', stream) - # service bit flags offered by this node - payload += struct.pack('>q', 1) - payload += encodeHost(peer.host) - # remote port - payload += struct.pack('>H', peer.port) - retval += CreatePacket('addr', payload) - return retval diff --git a/src/network/bmproto.py b/src/network/bmproto.py index 9e6d5044..e23dfe8d 100644 --- a/src/network/bmproto.py +++ b/src/network/bmproto.py @@ -11,6 +11,7 @@ import struct import time from binascii import hexlify +# magic imports! import addresses import connectionpool import knownnodes @@ -18,22 +19,20 @@ import protocol import state from bmconfigparser import config from inventory import Inventory +from queues import invQueue, objectProcessorQueue, portCheckerQueue +from randomtrackingdict import RandomTrackingDict from network.advanceddispatcher import AdvancedDispatcher from network.bmobject import ( BMObject, BMObjectAlreadyHaveError, BMObjectExpiredError, BMObjectInsufficientPOWError, BMObjectInvalidError, BMObjectUnwantedStreamError ) -from network.constants import ( - ADDRESS_ALIVE, MAX_MESSAGE_SIZE, MAX_OBJECT_COUNT, - MAX_OBJECT_PAYLOAD_SIZE, MAX_TIME_OFFSET -) from network.dandelion import Dandelion from network.proxy import ProxyError + from node import Node, Peer from objectracker import ObjectTracker, missingObjects -from queues import invQueue, objectProcessorQueue, portCheckerQueue -from randomtrackingdict import RandomTrackingDict + logger = logging.getLogger('default') @@ -87,7 +86,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker): self.magic, self.command, self.payloadLength, self.checksum = \ protocol.Header.unpack(self.read_buf[:protocol.Header.size]) self.command = self.command.rstrip('\x00') - if self.magic != 0xE9BEB4D9: + if self.magic != protocol.magic: # skip 1 byte in order to sync self.set_state("bm_header", length=1) self.bm_proto_reset() @@ -96,7 +95,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker): self.close_reason = "Bad magic" self.set_state("close") return False - if self.payloadLength > MAX_MESSAGE_SIZE: + if self.payloadLength > protocol.MAX_MESSAGE_SIZE: self.invalid = True self.set_state( "bm_command", @@ -348,7 +347,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker): """ items = self.decode_payload_content("l32s") - if len(items) > MAX_OBJECT_COUNT: + if len(items) > protocol.MAX_OBJECT_COUNT: logger.error( 'Too many items in %sinv message!', 'd' if dandelion else '') raise BMProtoExcessiveDataError() @@ -384,7 +383,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker): self.payload, self.payloadOffset) payload_len = len(self.payload) - self.payloadOffset - if payload_len > MAX_OBJECT_PAYLOAD_SIZE: + if payload_len > protocol.MAX_OBJECT_PAYLOAD_SIZE: logger.info( 'The payload length of this object is too large' ' (%d bytes). Ignoring it.', payload_len) @@ -457,7 +456,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker): decodedIP = protocol.checkIPAddress(ip) if ( decodedIP and time.time() - seenTime > 0 - and seenTime > time.time() - ADDRESS_ALIVE + and seenTime > time.time() - protocol.ADDRESS_ALIVE and port > 0 ): peer = Peer(decodedIP, port) @@ -570,7 +569,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker): 'Closing connection to old protocol version %s, node: %s', self.remoteProtocolVersion, self.destination) return False - if self.timeOffset > MAX_TIME_OFFSET: + if self.timeOffset > protocol.MAX_TIME_OFFSET: self.append_write_buf(protocol.assembleErrorMessage( errorText="Your time is too far in the future" " compared to mine. Closing connection.", fatal=2)) @@ -580,7 +579,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker): self.destination, self.timeOffset) BMProto.timeOffsetWrongCount += 1 return False - elif self.timeOffset < -MAX_TIME_OFFSET: + elif self.timeOffset < -protocol.MAX_TIME_OFFSET: self.append_write_buf(protocol.assembleErrorMessage( errorText="Your time is too far in the past compared to mine." " Closing connection.", fatal=2)) diff --git a/src/network/constants.py b/src/network/constants.py deleted file mode 100644 index f8f4120f..00000000 --- a/src/network/constants.py +++ /dev/null @@ -1,17 +0,0 @@ -""" -Network protocol constants -""" - - -#: address is online if online less than this many seconds ago -ADDRESS_ALIVE = 10800 -#: protocol specification says max 1000 addresses in one addr command -MAX_ADDR_COUNT = 1000 -#: ~1.6 MB which is the maximum possible size of an inv message. -MAX_MESSAGE_SIZE = 1600100 -#: 2**18 = 256kB is the maximum size of an object payload -MAX_OBJECT_PAYLOAD_SIZE = 2**18 -#: protocol specification says max 50000 objects in one inv command -MAX_OBJECT_COUNT = 50000 -#: maximum time offset -MAX_TIME_OFFSET = 3600 diff --git a/src/network/tcp.py b/src/network/tcp.py index b18080c9..0bfde3bb 100644 --- a/src/network/tcp.py +++ b/src/network/tcp.py @@ -2,35 +2,37 @@ TCP protocol handler """ # pylint: disable=too-many-ancestors -import l10n + import logging import math import random import socket import time +# magic imports! import addresses -import asyncore_pollchoose as asyncore -import connectionpool import helper_random -import knownnodes +import l10n import protocol import state from bmconfigparser import config from helper_random import randomBytes from inventory import Inventory +from queues import invQueue, receiveDataQueue, UISignalQueue +from tr import _translate + +import asyncore_pollchoose as asyncore +import connectionpool +import knownnodes from network.advanceddispatcher import AdvancedDispatcher -from network.assemble import assemble_addr from network.bmproto import BMProto -from network.constants import MAX_OBJECT_COUNT from network.dandelion import Dandelion from network.objectracker import ObjectTracker from network.socks4a import Socks4aConnection from network.socks5 import Socks5Connection from network.tls import TLSDispatcher from node import Peer -from queues import invQueue, receiveDataQueue, UISignalQueue -from tr import _translate + logger = logging.getLogger('default') @@ -205,7 +207,7 @@ class TCPConnection(BMProto, TLSDispatcher): for peer, params in addrs[substream]: templist.append((substream, peer, params["lastseen"])) if templist: - self.append_write_buf(assemble_addr(templist)) + self.append_write_buf(protocol.assembleAddrMessage(templist)) def sendBigInv(self): """ @@ -244,7 +246,7 @@ class TCPConnection(BMProto, TLSDispatcher): # Remove -1 below when sufficient time has passed for users to # upgrade to versions of PyBitmessage that accept inv with 50,000 # items - if objectCount >= MAX_OBJECT_COUNT - 1: + if objectCount >= protocol.MAX_OBJECT_COUNT - 1: sendChunk() payload = b'' objectCount = 0 diff --git a/src/network/udp.py b/src/network/udp.py index 3f999332..1a9891ec 100644 --- a/src/network/udp.py +++ b/src/network/udp.py @@ -5,13 +5,15 @@ import logging import socket import time +# magic imports! import protocol import state +from queues import receiveDataQueue + from bmproto import BMProto -from constants import MAX_TIME_OFFSET from node import Peer from objectracker import ObjectTracker -from queues import receiveDataQueue + logger = logging.getLogger('default') @@ -81,8 +83,8 @@ class UDPSocket(BMProto): # pylint: disable=too-many-instance-attributes decodedIP = protocol.checkIPAddress(str(ip)) if stream not in state.streamsInWhichIAmParticipating: continue - if (seenTime < time.time() - MAX_TIME_OFFSET - or seenTime > time.time() + MAX_TIME_OFFSET): + if (seenTime < time.time() - protocol.MAX_TIME_OFFSET + or seenTime > time.time() + protocol.MAX_TIME_OFFSET): continue if decodedIP is False: # if the address isn't local, interpret it as diff --git a/src/protocol.py b/src/protocol.py index e49b45b0..f5aa2750 100644 --- a/src/protocol.py +++ b/src/protocol.py @@ -22,8 +22,24 @@ from bmconfigparser import config from debug import logger from fallback import RIPEMD160Hash from helper_sql import sqlExecute +from network.node import Peer from version import softwareVersion +# Network constants +magic = 0xE9BEB4D9 +#: protocol specification says max 1000 addresses in one addr command +MAX_ADDR_COUNT = 1000 +#: address is online if online less than this many seconds ago +ADDRESS_ALIVE = 10800 +#: ~1.6 MB which is the maximum possible size of an inv message. +MAX_MESSAGE_SIZE = 1600100 +#: 2**18 = 256kB is the maximum size of an object payload +MAX_OBJECT_PAYLOAD_SIZE = 2**18 +#: protocol specification says max 50000 objects in one inv command +MAX_OBJECT_COUNT = 50000 +#: maximum time offset +MAX_TIME_OFFSET = 3600 + # Service flags #: This is a normal network node NODE_NETWORK = 1 @@ -295,11 +311,33 @@ def CreatePacket(command, payload=b''): checksum = hashlib.sha512(payload).digest()[0:4] b = bytearray(Header.size + payload_length) - Header.pack_into(b, 0, 0xE9BEB4D9, command, payload_length, checksum) + Header.pack_into(b, 0, magic, command, payload_length, checksum) b[Header.size:] = payload return bytes(b) +def assembleAddrMessage(peerList): + """Create address command""" + if isinstance(peerList, Peer): + peerList = [peerList] + if not peerList: + return b'' + retval = b'' + for i in range(0, len(peerList), MAX_ADDR_COUNT): + payload = encodeVarint(len(peerList[i:i + MAX_ADDR_COUNT])) + for stream, peer, timestamp in peerList[i:i + MAX_ADDR_COUNT]: + # 64-bit time + payload += pack('>Q', timestamp) + payload += pack('>I', stream) + # service bit flags offered by this node + payload += pack('>q', 1) + payload += encodeHost(peer.host) + # remote port + payload += pack('>H', peer.port) + retval += CreatePacket(b'addr', payload) + return retval + + def assembleVersionMessage( remoteHost, remotePort, participatingStreams, server=False, nodeid=None ): diff --git a/src/tests/samples.py b/src/tests/samples.py index 426bd4c1..9237d19d 100644 --- a/src/tests/samples.py +++ b/src/tests/samples.py @@ -3,7 +3,14 @@ from binascii import unhexlify -magic = 0xE9BEB4D9 +# 500 identical peers: +# 1626611891, 1, 1, 127.0.0.1, 8444 +sample_addr_data = unhexlify( + 'fd01f4' + ( + '0000000060f420b30000000' + '1000000000000000100000000000000000000ffff7f00000120fc' + ) * 500 +) # These keys are from addresses test script sample_pubsigningkey = unhexlify( diff --git a/src/tests/test_packets.py b/src/tests/test_packets.py index 3a0c43e9..f030912a 100644 --- a/src/tests/test_packets.py +++ b/src/tests/test_packets.py @@ -5,7 +5,7 @@ from struct import pack from pybitmessage import addresses, protocol -from .samples import magic +from .samples import sample_addr_data from .test_protocol import TestSocketInet @@ -45,7 +45,7 @@ class TestSerialize(TestSocketInet): def test_packet(self): """Check the packet created by protocol.CreatePacket()""" - head = unhexlify(b'%x' % magic) + head = unhexlify(b'%x' % protocol.magic) self.assertEqual( protocol.CreatePacket(b'ping')[:len(head)], head) @@ -67,3 +67,12 @@ class TestSerialize(TestSocketInet): self.assertEqual( protocol.encodeHost('quzwelsuziwqgpt2.onion'), unhexlify('fd87d87eeb438533622e54ca2d033e7a')) + + def test_assemble_addr(self): + """Assemble addr packet and compare it to pregenerated sample""" + self.assertEqual( + sample_addr_data, + protocol.assembleAddrMessage([ + (1, protocol.Peer('127.0.0.1', 8444), 1626611891) + for _ in range(500) + ])[protocol.Header.size:])