Addrthread finish

- addrthread is supposed to spread addresses as they appear. This was never
  finished during migration to asyncore
- conservative to prevent flood and loops
- randomises order
- move protocol constants into a separate file
- move addr packet creation into a separate file
- see #1575
This commit is contained in:
Peter Šurda 2019-11-27 06:47:04 +01:00
parent e47b573b3e
commit a69732f060
Signed by: PeterSurda
GPG Key ID: 0C5F50C0B5F37D87
6 changed files with 88 additions and 52 deletions

View File

@ -1,6 +1,11 @@
"""
Announce addresses as they are received from other hosts
"""
import Queue import Queue
import state import state
from helper_random import randomshuffle
from network.assemble import assemble_addr
from network.connectionpool import BMConnectionPool from network.connectionpool import BMConnectionPool
from queues import addrQueue from queues import addrQueue
from threads import StoppableThread from threads import StoppableThread
@ -15,15 +20,26 @@ class AddrThread(StoppableThread):
while True: while True:
try: try:
data = addrQueue.get(False) data = addrQueue.get(False)
chunk.append((data[0], data[1])) chunk.append(data)
if len(data) > 2:
source = BMConnectionPool().getConnectionByAddr(data[2])
except Queue.Empty: except Queue.Empty:
break break
except KeyError:
continue
# finish if chunk:
# Choose peers randomly
connections = BMConnectionPool().establishedConnections()
randomshuffle(connections)
for i in connections:
randomshuffle(chunk)
filtered = []
for stream, peer, seen, destination in chunk:
# peer's own address or address received from peer
if i.destination in (peer, destination):
continue
if stream not in i.streams:
continue
filtered.append((stream, peer, seen))
if filtered:
i.append_write_buf(assemble_addr(filtered))
addrQueue.iterate() addrQueue.iterate()
for i in range(len(chunk)): for i in range(len(chunk)):

View File

@ -6,8 +6,9 @@ src/network/announcethread.py
import time import time
import state import state
from bmconfigparser import BMConfigParser from bmconfigparser import BMConfigParser
from network.bmproto import BMProto from network.assemble import assemble_addr
from network.connectionpool import BMConnectionPool from network.connectionpool import BMConnectionPool
from network.udp import UDPSocket from network.udp import UDPSocket
from node import Peer from node import Peer
@ -41,4 +42,4 @@ class AnnounceThread(StoppableThread):
'127.0.0.1', '127.0.0.1',
BMConfigParser().safeGetInt('bitmessagesettings', 'port')), BMConfigParser().safeGetInt('bitmessagesettings', 'port')),
time.time()) time.time())
connection.append_write_buf(BMProto.assembleAddr([addr])) connection.append_write_buf(assemble_addr([addr]))

32
src/network/assemble.py Normal file
View File

@ -0,0 +1,32 @@
"""
Create bitmessage protocol command packets
"""
import struct
import addresses
from network.constants import MAX_ADDR_COUNT
from network.node import Peer
from protocol import CreatePacket, encodeHost
def assemble_addr(peerList):
"""Create address command"""
if isinstance(peerList, Peer):
peerList = (peerList)
if not peerList:
return b''
retval = b''
for i in range(0, len(peerList), MAX_ADDR_COUNT):
payload = addresses.encodeVarint(
len(peerList[i:i + MAX_ADDR_COUNT]))
for stream, peer, timestamp in peerList[i:i + MAX_ADDR_COUNT]:
payload += struct.pack(
'>Q', timestamp) # 64-bit time
payload += struct.pack('>I', stream)
payload += struct.pack(
'>q', 1) # service bit flags offered by this node
payload += encodeHost(peer.host)
payload += struct.pack('>H', peer.port) # remote port
retval += CreatePacket('addr', payload)
return retval

View File

@ -19,6 +19,12 @@ import state
from bmconfigparser import BMConfigParser from bmconfigparser import BMConfigParser
from inventory import Inventory from inventory import Inventory
from network.advanceddispatcher import AdvancedDispatcher from network.advanceddispatcher import AdvancedDispatcher
from network.constants import (
ADDRESS_ALIVE,
MAX_MESSAGE_SIZE,
MAX_OBJECT_COUNT,
MAX_OBJECT_PAYLOAD_SIZE,
MAX_TIME_OFFSET)
from network.dandelion import Dandelion from network.dandelion import Dandelion
from network.bmobject import ( from network.bmobject import (
BMObject, BMObjectInsufficientPOWError, BMObjectInvalidDataError, BMObject, BMObjectInsufficientPOWError, BMObjectInvalidDataError,
@ -51,18 +57,6 @@ class BMProtoExcessiveDataError(BMProtoError):
class BMProto(AdvancedDispatcher, ObjectTracker): class BMProto(AdvancedDispatcher, ObjectTracker):
"""A parser for the Bitmessage Protocol""" """A parser for the Bitmessage Protocol"""
# pylint: disable=too-many-instance-attributes, too-many-public-methods # pylint: disable=too-many-instance-attributes, too-many-public-methods
# ~1.6 MB which is the maximum possible size of an inv message.
maxMessageSize = 1600100
# 2**18 = 256kB is the maximum size of an object payload
maxObjectPayloadSize = 2**18
# protocol specification says max 1000 addresses in one addr command
maxAddrCount = 1000
# protocol specification says max 50000 objects in one inv command
maxObjectCount = 50000
# address is online if online less than this many seconds ago
addressAlive = 10800
# maximum time offset
maxTimeOffset = 3600
timeOffsetWrongCount = 0 timeOffsetWrongCount = 0
def __init__(self, address=None, sock=None): # pylint: disable=unused-argument, super-init-not-called def __init__(self, address=None, sock=None): # pylint: disable=unused-argument, super-init-not-called
@ -100,7 +94,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
self.close_reason = "Bad magic" self.close_reason = "Bad magic"
self.set_state("close") self.set_state("close")
return False return False
if self.payloadLength > BMProto.maxMessageSize: if self.payloadLength > MAX_MESSAGE_SIZE:
self.invalid = True self.invalid = True
self.set_state( self.set_state(
"bm_command", "bm_command",
@ -343,7 +337,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
def _command_inv(self, dandelion=False): def _command_inv(self, dandelion=False):
items = self.decode_payload_content("l32s") items = self.decode_payload_content("l32s")
if len(items) > BMProto.maxObjectCount: if len(items) > MAX_OBJECT_COUNT:
logger.error( logger.error(
'Too many items in %sinv message!', 'd' if dandelion else '') 'Too many items in %sinv message!', 'd' if dandelion else '')
raise BMProtoExcessiveDataError() raise BMProtoExcessiveDataError()
@ -378,7 +372,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
nonce, expiresTime, objectType, version, streamNumber, nonce, expiresTime, objectType, version, streamNumber,
self.payload, self.payloadOffset) self.payload, self.payloadOffset)
if len(self.payload) - self.payloadOffset > BMProto.maxObjectPayloadSize: if len(self.payload) - self.payloadOffset > MAX_OBJECT_PAYLOAD_SIZE:
logger.info( logger.info(
'The payload length of this object is too large (%d bytes).' 'The payload length of this object is too large (%d bytes).'
' Ignoring it.', len(self.payload) - self.payloadOffset) ' Ignoring it.', len(self.payload) - self.payloadOffset)
@ -442,7 +436,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
continue continue
if ( if (
decodedIP and time.time() - seenTime > 0 and decodedIP and time.time() - seenTime > 0 and
seenTime > time.time() - BMProto.addressAlive and seenTime > time.time() - ADDRESS_ALIVE and
port > 0 port > 0
): ):
peer = Peer(decodedIP, port) peer = Peer(decodedIP, port)
@ -461,7 +455,10 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
"rating": 0, "rating": 0,
"self": False, "self": False,
} }
addrQueue.put((stream, peer, self.destination)) # since we don't track peers outside of knownnodes,
# only spread if in knownnodes to prevent flood
addrQueue.put((stream, peer, seenTime,
self.destination))
return True return True
def bm_command_portcheck(self): def bm_command_portcheck(self):
@ -552,7 +549,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
'Closing connection to old protocol version %s, node: %s', 'Closing connection to old protocol version %s, node: %s',
self.remoteProtocolVersion, self.destination) self.remoteProtocolVersion, self.destination)
return False return False
if self.timeOffset > BMProto.maxTimeOffset: if self.timeOffset > MAX_TIME_OFFSET:
self.append_write_buf(protocol.assembleErrorMessage( self.append_write_buf(protocol.assembleErrorMessage(
errorText="Your time is too far in the future compared to mine." errorText="Your time is too far in the future compared to mine."
" Closing connection.", fatal=2)) " Closing connection.", fatal=2))
@ -561,7 +558,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
" Closing connection to it.", self.destination, self.timeOffset) " Closing connection to it.", self.destination, self.timeOffset)
BMProto.timeOffsetWrongCount += 1 BMProto.timeOffsetWrongCount += 1
return False return False
elif self.timeOffset < -BMProto.maxTimeOffset: elif self.timeOffset < -MAX_TIME_OFFSET:
self.append_write_buf(protocol.assembleErrorMessage( self.append_write_buf(protocol.assembleErrorMessage(
errorText="Your time is too far in the past compared to mine." errorText="Your time is too far in the past compared to mine."
" Closing connection.", fatal=2)) " Closing connection.", fatal=2))
@ -623,29 +620,6 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
return True return True
@staticmethod
def assembleAddr(peerList):
"""Build up a packed address"""
if isinstance(peerList, Peer):
peerList = (peerList)
if not peerList:
return b''
retval = b''
for i in range(0, len(peerList), BMProto.maxAddrCount):
payload = addresses.encodeVarint(
len(peerList[i:i + BMProto.maxAddrCount]))
for address in peerList[i:i + BMProto.maxAddrCount]:
stream, peer, timestamp = address
payload += struct.pack(
'>Q', timestamp) # 64-bit time
payload += struct.pack('>I', stream)
payload += struct.pack(
'>q', 1) # service bit flags offered by this node
payload += protocol.encodeHost(peer.host)
payload += struct.pack('>H', peer.port) # remote port
retval += protocol.CreatePacket('addr', payload)
return retval
@staticmethod @staticmethod
def stopDownloadingObject(hashId, forwardAnyway=False): def stopDownloadingObject(hashId, forwardAnyway=False):
"""Stop downloading an object""" """Stop downloading an object"""

11
src/network/constants.py Normal file
View File

@ -0,0 +1,11 @@
"""
Network protocol constants
"""
ADDRESS_ALIVE = 10800 #: address is online if online less than this many seconds ago
MAX_ADDR_COUNT = 1000 #: protocol specification says max 1000 addresses in one addr command
MAX_MESSAGE_SIZE = 1600100 #: ~1.6 MB which is the maximum possible size of an inv message.
MAX_OBJECT_PAYLOAD_SIZE = 2**18 #: 2**18 = 256kB is the maximum size of an object payload
MAX_OBJECT_COUNT = 50000 #: protocol specification says max 50000 objects in one inv command
MAX_TIME_OFFSET = 3600 #: maximum time offset

View File

@ -22,7 +22,9 @@ from bmconfigparser import BMConfigParser
from helper_random import randomBytes from helper_random import randomBytes
from inventory import Inventory from inventory import Inventory
from network.advanceddispatcher import AdvancedDispatcher from network.advanceddispatcher import AdvancedDispatcher
from network.assemble import assemble_addr
from network.bmproto import BMProto from network.bmproto import BMProto
from network.constants import MAX_OBJECT_COUNT
from network.dandelion import Dandelion from network.dandelion import Dandelion
from network.objectracker import ObjectTracker from network.objectracker import ObjectTracker
from network.socks4a import Socks4aConnection from network.socks4a import Socks4aConnection
@ -183,7 +185,7 @@ class TCPConnection(BMProto, TLSDispatcher):
for peer, params in addrs[substream]: for peer, params in addrs[substream]:
templist.append((substream, peer, params["lastseen"])) templist.append((substream, peer, params["lastseen"]))
if templist: if templist:
self.append_write_buf(BMProto.assembleAddr(templist)) self.append_write_buf(assemble_addr(templist))
def sendBigInv(self): def sendBigInv(self):
""" """
@ -222,7 +224,7 @@ class TCPConnection(BMProto, TLSDispatcher):
# Remove -1 below when sufficient time has passed for users to # Remove -1 below when sufficient time has passed for users to
# upgrade to versions of PyBitmessage that accept inv with 50,000 # upgrade to versions of PyBitmessage that accept inv with 50,000
# items # items
if objectCount >= BMProto.maxObjectCount - 1: if objectCount >= MAX_OBJECT_COUNT - 1:
sendChunk() sendChunk()
payload = b'' payload = b''
objectCount = 0 objectCount = 0