From a69732f060608428ddfebf608bdb3c8751e296ce Mon Sep 17 00:00:00 2001 From: Peter Surda Date: Wed, 27 Nov 2019 06:47:04 +0100 Subject: [PATCH] Addrthread finish - addrthread is supposed to spread addresses as they appear. This was never finished during migration to asyncore - conservative to prevent flood and loops - randomises order - move protocol constants into a separate file - move addr packet creation into a separate file - see #1575 --- src/network/addrthread.py | 28 +++++++++++++---- src/network/announcethread.py | 5 +-- src/network/assemble.py | 32 +++++++++++++++++++ src/network/bmproto.py | 58 ++++++++++------------------------- src/network/constants.py | 11 +++++++ src/network/tcp.py | 6 ++-- 6 files changed, 88 insertions(+), 52 deletions(-) create mode 100644 src/network/assemble.py create mode 100644 src/network/constants.py diff --git a/src/network/addrthread.py b/src/network/addrthread.py index d5d21599..8a0396f8 100644 --- a/src/network/addrthread.py +++ b/src/network/addrthread.py @@ -1,6 +1,11 @@ +""" +Announce addresses as they are received from other hosts +""" import Queue import state +from helper_random import randomshuffle +from network.assemble import assemble_addr from network.connectionpool import BMConnectionPool from queues import addrQueue from threads import StoppableThread @@ -15,15 +20,26 @@ class AddrThread(StoppableThread): while True: try: data = addrQueue.get(False) - chunk.append((data[0], data[1])) - if len(data) > 2: - source = BMConnectionPool().getConnectionByAddr(data[2]) + chunk.append(data) except Queue.Empty: break - except KeyError: - continue - # finish + if chunk: + # Choose peers randomly + connections = BMConnectionPool().establishedConnections() + randomshuffle(connections) + for i in connections: + randomshuffle(chunk) + filtered = [] + for stream, peer, seen, destination in chunk: + # peer's own address or address received from peer + if i.destination in (peer, destination): + continue + if stream not in i.streams: + continue + filtered.append((stream, peer, seen)) + if filtered: + i.append_write_buf(assemble_addr(filtered)) addrQueue.iterate() for i in range(len(chunk)): diff --git a/src/network/announcethread.py b/src/network/announcethread.py index f635fc90..c11a2cc6 100644 --- a/src/network/announcethread.py +++ b/src/network/announcethread.py @@ -6,8 +6,9 @@ src/network/announcethread.py import time import state + from bmconfigparser import BMConfigParser -from network.bmproto import BMProto +from network.assemble import assemble_addr from network.connectionpool import BMConnectionPool from network.udp import UDPSocket from node import Peer @@ -41,4 +42,4 @@ class AnnounceThread(StoppableThread): '127.0.0.1', BMConfigParser().safeGetInt('bitmessagesettings', 'port')), time.time()) - connection.append_write_buf(BMProto.assembleAddr([addr])) + connection.append_write_buf(assemble_addr([addr])) diff --git a/src/network/assemble.py b/src/network/assemble.py new file mode 100644 index 00000000..2d31914c --- /dev/null +++ b/src/network/assemble.py @@ -0,0 +1,32 @@ +""" +Create bitmessage protocol command packets +""" + +import struct + +import addresses +from network.constants import MAX_ADDR_COUNT +from network.node import Peer +from protocol import CreatePacket, encodeHost + + +def assemble_addr(peerList): + """Create address command""" + if isinstance(peerList, Peer): + peerList = (peerList) + if not peerList: + return b'' + retval = b'' + for i in range(0, len(peerList), MAX_ADDR_COUNT): + payload = addresses.encodeVarint( + len(peerList[i:i + MAX_ADDR_COUNT])) + for stream, peer, timestamp in peerList[i:i + MAX_ADDR_COUNT]: + payload += struct.pack( + '>Q', timestamp) # 64-bit time + payload += struct.pack('>I', stream) + payload += struct.pack( + '>q', 1) # service bit flags offered by this node + payload += encodeHost(peer.host) + payload += struct.pack('>H', peer.port) # remote port + retval += CreatePacket('addr', payload) + return retval diff --git a/src/network/bmproto.py b/src/network/bmproto.py index 11e96fd6..d620daa3 100644 --- a/src/network/bmproto.py +++ b/src/network/bmproto.py @@ -19,6 +19,12 @@ import state from bmconfigparser import BMConfigParser from inventory import Inventory from network.advanceddispatcher import AdvancedDispatcher +from network.constants import ( + ADDRESS_ALIVE, + MAX_MESSAGE_SIZE, + MAX_OBJECT_COUNT, + MAX_OBJECT_PAYLOAD_SIZE, + MAX_TIME_OFFSET) from network.dandelion import Dandelion from network.bmobject import ( BMObject, BMObjectInsufficientPOWError, BMObjectInvalidDataError, @@ -51,18 +57,6 @@ class BMProtoExcessiveDataError(BMProtoError): class BMProto(AdvancedDispatcher, ObjectTracker): """A parser for the Bitmessage Protocol""" # pylint: disable=too-many-instance-attributes, too-many-public-methods - # ~1.6 MB which is the maximum possible size of an inv message. - maxMessageSize = 1600100 - # 2**18 = 256kB is the maximum size of an object payload - maxObjectPayloadSize = 2**18 - # protocol specification says max 1000 addresses in one addr command - maxAddrCount = 1000 - # protocol specification says max 50000 objects in one inv command - maxObjectCount = 50000 - # address is online if online less than this many seconds ago - addressAlive = 10800 - # maximum time offset - maxTimeOffset = 3600 timeOffsetWrongCount = 0 def __init__(self, address=None, sock=None): # pylint: disable=unused-argument, super-init-not-called @@ -100,7 +94,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker): self.close_reason = "Bad magic" self.set_state("close") return False - if self.payloadLength > BMProto.maxMessageSize: + if self.payloadLength > MAX_MESSAGE_SIZE: self.invalid = True self.set_state( "bm_command", @@ -343,7 +337,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker): def _command_inv(self, dandelion=False): items = self.decode_payload_content("l32s") - if len(items) > BMProto.maxObjectCount: + if len(items) > MAX_OBJECT_COUNT: logger.error( 'Too many items in %sinv message!', 'd' if dandelion else '') raise BMProtoExcessiveDataError() @@ -378,7 +372,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker): nonce, expiresTime, objectType, version, streamNumber, self.payload, self.payloadOffset) - if len(self.payload) - self.payloadOffset > BMProto.maxObjectPayloadSize: + if len(self.payload) - self.payloadOffset > MAX_OBJECT_PAYLOAD_SIZE: logger.info( 'The payload length of this object is too large (%d bytes).' ' Ignoring it.', len(self.payload) - self.payloadOffset) @@ -442,7 +436,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker): continue if ( decodedIP and time.time() - seenTime > 0 and - seenTime > time.time() - BMProto.addressAlive and + seenTime > time.time() - ADDRESS_ALIVE and port > 0 ): peer = Peer(decodedIP, port) @@ -461,7 +455,10 @@ class BMProto(AdvancedDispatcher, ObjectTracker): "rating": 0, "self": False, } - addrQueue.put((stream, peer, self.destination)) + # since we don't track peers outside of knownnodes, + # only spread if in knownnodes to prevent flood + addrQueue.put((stream, peer, seenTime, + self.destination)) return True def bm_command_portcheck(self): @@ -552,7 +549,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker): 'Closing connection to old protocol version %s, node: %s', self.remoteProtocolVersion, self.destination) return False - if self.timeOffset > BMProto.maxTimeOffset: + if self.timeOffset > MAX_TIME_OFFSET: self.append_write_buf(protocol.assembleErrorMessage( errorText="Your time is too far in the future compared to mine." " Closing connection.", fatal=2)) @@ -561,7 +558,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker): " Closing connection to it.", self.destination, self.timeOffset) BMProto.timeOffsetWrongCount += 1 return False - elif self.timeOffset < -BMProto.maxTimeOffset: + elif self.timeOffset < -MAX_TIME_OFFSET: self.append_write_buf(protocol.assembleErrorMessage( errorText="Your time is too far in the past compared to mine." " Closing connection.", fatal=2)) @@ -623,29 +620,6 @@ class BMProto(AdvancedDispatcher, ObjectTracker): return True - @staticmethod - def assembleAddr(peerList): - """Build up a packed address""" - if isinstance(peerList, Peer): - peerList = (peerList) - if not peerList: - return b'' - retval = b'' - for i in range(0, len(peerList), BMProto.maxAddrCount): - payload = addresses.encodeVarint( - len(peerList[i:i + BMProto.maxAddrCount])) - for address in peerList[i:i + BMProto.maxAddrCount]: - stream, peer, timestamp = address - payload += struct.pack( - '>Q', timestamp) # 64-bit time - payload += struct.pack('>I', stream) - payload += struct.pack( - '>q', 1) # service bit flags offered by this node - payload += protocol.encodeHost(peer.host) - payload += struct.pack('>H', peer.port) # remote port - retval += protocol.CreatePacket('addr', payload) - return retval - @staticmethod def stopDownloadingObject(hashId, forwardAnyway=False): """Stop downloading an object""" diff --git a/src/network/constants.py b/src/network/constants.py new file mode 100644 index 00000000..a3414ef3 --- /dev/null +++ b/src/network/constants.py @@ -0,0 +1,11 @@ +""" +Network protocol constants +""" + + +ADDRESS_ALIVE = 10800 #: address is online if online less than this many seconds ago +MAX_ADDR_COUNT = 1000 #: protocol specification says max 1000 addresses in one addr command +MAX_MESSAGE_SIZE = 1600100 #: ~1.6 MB which is the maximum possible size of an inv message. +MAX_OBJECT_PAYLOAD_SIZE = 2**18 #: 2**18 = 256kB is the maximum size of an object payload +MAX_OBJECT_COUNT = 50000 #: protocol specification says max 50000 objects in one inv command +MAX_TIME_OFFSET = 3600 #: maximum time offset diff --git a/src/network/tcp.py b/src/network/tcp.py index 31d20dea..3097765f 100644 --- a/src/network/tcp.py +++ b/src/network/tcp.py @@ -22,7 +22,9 @@ from bmconfigparser import BMConfigParser from helper_random import randomBytes from inventory import Inventory from network.advanceddispatcher import AdvancedDispatcher +from network.assemble import assemble_addr from network.bmproto import BMProto +from network.constants import MAX_OBJECT_COUNT from network.dandelion import Dandelion from network.objectracker import ObjectTracker from network.socks4a import Socks4aConnection @@ -183,7 +185,7 @@ class TCPConnection(BMProto, TLSDispatcher): for peer, params in addrs[substream]: templist.append((substream, peer, params["lastseen"])) if templist: - self.append_write_buf(BMProto.assembleAddr(templist)) + self.append_write_buf(assemble_addr(templist)) def sendBigInv(self): """ @@ -222,7 +224,7 @@ class TCPConnection(BMProto, TLSDispatcher): # Remove -1 below when sufficient time has passed for users to # upgrade to versions of PyBitmessage that accept inv with 50,000 # items - if objectCount >= BMProto.maxObjectCount - 1: + if objectCount >= MAX_OBJECT_COUNT - 1: sendChunk() payload = b'' objectCount = 0