MiNode/minode/connection.py

517 lines
20 KiB
Python
Raw Normal View History

2016-06-30 10:11:33 +02:00
# -*- coding: utf-8 -*-
2022-09-23 00:54:12 +02:00
"""The logic and behaviour of a single connection"""
import base64
2016-08-03 19:05:01 +02:00
import errno
2016-06-30 10:11:33 +02:00
import logging
import math
2016-06-30 10:11:33 +02:00
import random
2016-07-19 11:53:24 +02:00
import select
2016-06-30 10:11:33 +02:00
import socket
2016-07-19 11:53:24 +02:00
import ssl
2016-06-30 10:11:33 +02:00
import threading
import queue
import time
2021-03-09 15:40:59 +01:00
from . import message, shared, structure
2016-06-30 10:11:33 +02:00
class ConnectionBase(threading.Thread):
    """
    Common code for the connection thread
    with minimum command handlers to reuse

    The thread owns one socket and drives it through the ``status``
    values used in this class: ``'ready'`` -> ``'connected'`` (or
    ``'failed'``) -> ``'fully_established'`` -> ``'disconnecting'`` ->
    ``'disconnected'``.  Outgoing messages are placed on ``send_queue``;
    putting ``None`` there requests a disconnect and the string
    ``'fully_established'`` schedules the end of the version handshake.
    """

    def __init__(
        self, host, port, s=None, network='ip', server=False,
        i2p_remote_dest=b''
    ):
        """
        Prepare the connection state; nothing is sent or connected here.

        :param host: remote host, passed to ``socket.create_connection``
            when no socket is given
        :param port: remote port
        :param s: an already connected socket; when it is truthy the
            status starts as ``'connected'`` and ``run()`` will not
            connect by itself
        :param network: ``'ip'`` or ``'i2p'``
        :param server: True when the remote side initiated the connection
            (then the version message is sent only in reply to theirs)
        :param i2p_remote_dest: bytes of the remote I2P destination, its
            first 8 bytes are used for logging when network is ``'i2p'``
        """
        self.host = host
        self.port = port
        self.network = network
        self.i2p_remote_dest = i2p_remote_dest

        # Short peer name for the log messages
        if self.network == 'i2p':
            self.host_print = self.i2p_remote_dest[:8].decode()
        else:
            self.host_print = self.host

        super().__init__(name='Connection to {}:{}'.format(host, port))

        self.send_queue = queue.Queue()

        self.vectors_to_get = set()
        self.vectors_to_send = set()

        # vector -> time of the first getdata sent for it
        self.vectors_requested = {}

        self.status = 'ready'

        self.tls = False

        self.verack_received = False
        self.verack_sent = False

        self.s = s

        self.remote_version = None

        self.server = server

        if s:
            self.status = 'connected'

        self.buffer_receive = b''
        self.buffer_send = b''

        # The receive parser alternates between a header
        # and header + payload
        self.next_message_size = shared.header_length
        self.next_header = True
        self.on_connection_fully_established_scheduled = False

        self.last_message_received = time.time()
        self.last_message_sent = time.time()
        # 0 means "the initial getdata delay is not chosen yet"
        self.wait_until = 0

    def run(self):
        """
        Thread body: connect if needed, then loop receiving, processing
        and sending until the connection is closed.
        """
        if self.s is None:
            self._connect()
        if self.status != 'connected':
            return
        # Non-blocking mode: recv()/send() raise instead of waiting
        self.s.settimeout(0)
        if not self.server:
            if self.network == 'ip':
                self.send_queue.put(message.Version(self.host, self.port))
            else:
                self.send_queue.put(message.Version('127.0.0.1', 7656))
        while True:
            # Finish the handshake only when nothing is buffered
            # in either direction
            if (
                self.on_connection_fully_established_scheduled
                and not (self.buffer_send or self.buffer_receive)
            ):
                self._on_connection_fully_established()
            # Stays truthy unless the peer closed the connection (b'')
            # or we decided to drop it (None)
            data = True
            try:
                if self.status == 'fully_established':
                    data = self.s.recv(4096)
                    self.buffer_receive += data
                    # Keep draining the socket until it would block
                    # or the buffer reaches its cap
                    if data and len(self.buffer_receive) < 4000000:
                        continue
                else:
                    # Before the handshake is done
                    # read exactly one header or message
                    data = self.s.recv(
                        self.next_message_size - len(self.buffer_receive))
                    self.buffer_receive += data
            except ssl.SSLWantReadError:
                # Nothing to read now - use the idle time
                if self.status == 'fully_established':
                    self._request_objects()
                    self._send_objects()
            except socket.error as e:
                # e.errno instead of e.args[0]:
                # the latter raises IndexError for an empty args
                if e.errno in (errno.EAGAIN, errno.EWOULDBLOCK):
                    if self.status == 'fully_established':
                        self._request_objects()
                        self._send_objects()
                else:
                    logging.debug(
                        'Disconnecting from %s:%s. Reason: %s',
                        self.host_print, self.port, e)
                    data = None

            self._process_buffer_receive()
            self._process_queue()
            self._send_data()

            if time.time() - self.last_message_received > shared.timeout:
                logging.debug(
                    'Disconnecting from %s:%s. Reason:'
                    ' time.time() - self.last_message_received'
                    ' > shared.timeout', self.host_print, self.port)
                self.status = 'disconnecting'
            # Shorter limit while the handshake is not finished
            if (
                time.time() - self.last_message_received > 30
                and self.status != 'fully_established'
                and self.status != 'disconnecting'
            ):
                logging.debug(
                    'Disconnecting from %s:%s. Reason:'
                    ' time.time() - self.last_message_received > 30'
                    ' and self.status != "fully_established"',
                    self.host_print, self.port)
                self.status = 'disconnecting'
            # Keepalive
            if (
                time.time() - self.last_message_sent > 300
                and self.status == 'fully_established'
            ):
                self.send_queue.put(message.Message(b'ping', b''))
            if self.status == 'disconnecting' or shared.shutting_down:
                data = None
            if not data:
                self.status = 'disconnected'
                self.s.close()
                logging.info(
                    'Disconnected from %s:%s', self.host_print, self.port)
                break
            time.sleep(0.2)

    def _connect(self):
        """
        Open the TCP connection with the 10 seconds timeout.

        Sets status to ``'connected'`` on success and ``'failed'``
        otherwise; never raises.
        """
        peer_str = '{0.host_print}:{0.port}'.format(self)
        logging.debug('Connecting to %s', peer_str)

        try:
            self.s = socket.create_connection((self.host, self.port), 10)
        except socket.timeout:
            logging.debug('Connection to %s timed out', peer_str)
        except OSError as e:
            # unreachable, refused, no route - these are common,
            # so they are logged on the debug level only.
            # The errno constants are used instead of 101, 111, 113
            # because these numbers are valid for Linux only
            expected = e.errno in (
                errno.ENETUNREACH, errno.ECONNREFUSED, errno.EHOSTUNREACH)
            (logging.debug if expected else logging.info)(
                'Connection to %s failed. Reason: %s', peer_str, e)
        except Exception:
            logging.info(
                'Connection to %s failed.', peer_str, exc_info=True)
        else:
            self.status = 'connected'
            logging.debug('Established TCP connection to %s', peer_str)

        if self.status != 'connected':
            self.status = 'failed'

    def _send_data(self):
        """
        Try to write ``buffer_send`` into the socket, keep what is unsent.

        Any socket error except "would block" switches the status
        to ``'disconnecting'``.
        """
        if not self.buffer_send:
            return
        try:
            amount = self.s.send(self.buffer_send)
            self.buffer_send = self.buffer_send[amount:]
        except (BlockingIOError, ssl.SSLWantWriteError):
            # The socket is not writable now, retry on the next iteration
            pass
        except OSError as e:
            # BrokenPipeError, ConnectionResetError and ssl.SSLError
            # are all subclasses of OSError
            logging.debug(
                'Disconnecting from %s:%s. Reason: %s',
                self.host_print, self.port, e)
            self.status = 'disconnecting'

    def _do_tls_handshake(self):
        """
        Wrap the socket into TLS and perform the handshake.

        On success ``self.tls`` becomes True, on failure the status
        is set to ``'disconnecting'`` (and the peer is discarded from
        ``shared.node_pool`` if it was ``ssl.SSLError``).
        """
        logging.debug(
            'Initializing TLS connection with %s:%s',
            self.host_print, self.port)

        context = ssl.create_default_context(
            purpose=ssl.Purpose.CLIENT_AUTH if self.server
            else ssl.Purpose.SERVER_AUTH
        )
        # No certificates are used: the cipher below is anonymous
        context.check_hostname = False
        context.verify_mode = ssl.CERT_NONE

        if (
            ssl.OPENSSL_VERSION_NUMBER >= 0x10100000
            and not ssl.OPENSSL_VERSION.startswith("LibreSSL")
        ):  # OpenSSL>=1.1
            context.set_ciphers('AECDH-AES256-SHA@SECLEVEL=0')
        else:
            context.set_ciphers('AECDH-AES256-SHA')

        context.set_ecdh_curve("secp256k1")
        context.options = (
            ssl.OP_ALL | ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3
            | ssl.OP_SINGLE_ECDH_USE | ssl.OP_CIPHER_SERVER_PREFERENCE)
        # OP_NO_SSL* is deprecated since 3.6
        try:
            # TODO: ssl.TLSVersion.TLSv1 is deprecated
            context.minimum_version = ssl.TLSVersion.TLSv1
            context.maximum_version = ssl.TLSVersion.TLSv1_2
        except AttributeError:
            # The ssl module has no TLSVersion, rely on the options above
            pass

        self.s = context.wrap_socket(
            self.s, server_side=self.server, do_handshake_on_connect=False)

        # The socket is non-blocking, so the handshake is repeated
        # each time the socket becomes ready
        # NOTE(review): select() is called without a timeout here,
        # so this loop waits as long as the peer stays silent
        while True:
            try:
                self.s.do_handshake()
                break
            except ssl.SSLWantReadError:
                select.select([self.s], [], [])
            except ssl.SSLWantWriteError:
                select.select([], [self.s], [])
            except Exception as e:
                logging.debug(
                    'Disconnecting from %s:%s. Reason: %s',
                    self.host_print, self.port, e)
                self.status = 'disconnecting'
                if isinstance(e, ssl.SSLError):  # pylint: disable=no-member
                    logging.debug('ssl.SSLError reason: %s', e.reason)
                    shared.node_pool.discard((self.host, self.port))
                return

        self.tls = True
        logging.debug(
            'Established TLS connection with %s:%s (%s)',
            self.host_print, self.port, self.s.version())

    def _send_message(self, m):
        """Log the outgoing message and append its bytes to the buffer"""
        if isinstance(m, message.Message) and m.command == b'object':
            logging.debug(
                '%s:%s <- %s',
                self.host_print, self.port, structure.Object.from_message(m))
        else:
            logging.debug('%s:%s <- %s', self.host_print, self.port, m)
        self.buffer_send += m.to_bytes()

    def _on_connection_fully_established(self):
        """
        Finish the handshake: start TLS if the peer supports it,
        announce known addresses and the inventory,
        set status to ``'fully_established'``.

        If the TLS handshake fails the connection stays
        ``'disconnecting'`` and nothing is announced.
        """
        logging.info(
            'Established Bitmessage protocol connection to %s:%s',
            self.host_print, self.port)
        self.on_connection_fully_established_scheduled = False
        if self.remote_version.services & 2 and self.network == 'ip':
            self._do_tls_handshake()  # NODE_SSL
            if self.status == 'disconnecting':
                # The handshake failed; without this return the status
                # was overwritten by 'fully_established' below
                return

        addr = {
            structure.NetAddr(c.remote_version.services, c.host, c.port)
            for c in shared.connections if c.network != 'i2p'
            and c.server is False and c.status == 'fully_established'}
        # pylint: disable=unsubscriptable-object
        # https://github.com/pylint-dev/pylint/issues/3637
        if len(shared.node_pool) > 10:
            addr.update({
                structure.NetAddr(1, a[0], a[1])
                for a in random.sample(tuple(shared.node_pool), 10)})
        if len(shared.unchecked_node_pool) > 10:
            addr.update({
                structure.NetAddr(1, a[0], a[1])
                for a in random.sample(tuple(shared.unchecked_node_pool), 10)})
        if addr:
            self.send_queue.put(message.Addr(addr))

        # Take a snapshot of the unexpired vectors under the lock
        with shared.objects_lock:
            now = time.time()
            to_send = [
                vector for vector in shared.objects.keys()
                if shared.objects[vector].expires_time > now]
        # We limit size of inv messaged to 10000 entries
        # because they might time out
        # in very slow networks (I2P)
        # Shuffling once and slicing gives the same random batches
        # as the repeated sampling, without rebuilding a tuple each time;
        # every message gets its own list
        random.shuffle(to_send)
        for start in range(0, len(to_send), 10000):
            self.send_queue.put(message.Inv(to_send[start:start + 10000]))
        self.status = 'fully_established'

    def _process_queue(self):
        """
        Handle everything in ``send_queue``: serialize the messages
        into the send buffer, schedule the end of the handshake on
        ``'fully_established'``, start disconnecting on a falsy item.
        """
        while not self.send_queue.empty():
            m = self.send_queue.get()
            if not m:
                # None is the request to disconnect
                self.status = 'disconnecting'
                break
            if m == 'fully_established':
                self.on_connection_fully_established_scheduled = True
            else:
                self._send_message(m)
                self.last_message_sent = time.time()

    def _process_buffer_receive(self):
        """
        Parse complete headers and messages from ``buffer_receive``
        and dispatch the messages to ``_process_message()``.

        ``ValueError`` raised while parsing or processing
        switches the status to ``'disconnecting'``.
        """
        while len(self.buffer_receive) >= self.next_message_size:
            if self.next_header:
                self.next_header = False
                try:
                    h = message.Header.from_bytes(
                        self.buffer_receive[:shared.header_length])
                except ValueError as e:
                    self.status = 'disconnecting'
                    logging.warning(
                        'Received malformed message from %s:%s: %s',
                        self.host_print, self.port, e)
                    break
                # Now wait for the header together with its payload
                self.next_message_size += h.payload_length
            else:
                try:
                    m = message.Message.from_bytes(
                        self.buffer_receive[:self.next_message_size])
                except ValueError as e:
                    self.status = 'disconnecting'
                    logging.warning(
                        'Received malformed message from %s:%s, %s',
                        self.host_print, self.port, e)
                    break
                self.next_header = True
                self.buffer_receive = self.buffer_receive[
                    self.next_message_size:]
                self.next_message_size = shared.header_length
                self.last_message_received = time.time()
                try:
                    self._process_message(m)
                except ValueError as e:
                    self.status = 'disconnecting'
                    logging.warning(
                        'Received malformed message from %s:%s: %s',
                        self.host_print, self.port, e)
                    break

    def _process_message(self, m):
        """
        Handle verack, ping and error here, pass any other command
        to the method ``_process_msg_<command>`` if there is one.
        """
        if m.command == b'verack':
            self.verack_received = True
            logging.debug(
                '%s:%s -> %s', self.host_print, self.port, 'verack')
            if self.server:
                self.send_queue.put('fully_established')

        elif m.command == b'ping':
            logging.debug('%s:%s -> ping', self.host_print, self.port)
            self.send_queue.put(message.Message(b'pong', b''))

        elif m.command == b'error':
            error = message.Error.from_message(m)
            logging.warning(
                '%s:%s -> %s', self.host_print, self.port, error)
            if error.fatal == 2:
                # reduce probability to connect soon
                shared.unchecked_node_pool.discard((self.host, self.port))

        else:
            # A command without a handler (or not decodable one)
            # is only logged
            # NOTE(review): AttributeError raised inside a handler
            # is swallowed here too - confirm that is intended
            try:
                getattr(self, '_process_msg_{}'.format(m.command.decode()))(m)
            except (AttributeError, UnicodeDecodeError):
                logging.debug('%s:%s -> %s', self.host_print, self.port, m)

    def _process_msg_version(self, m):
        """
        Check the version message of the peer and reply with verack,
        or start disconnecting if the peer is not acceptable.

        :raises ValueError: if the peer does not serve ``shared.stream``
        """
        version = message.Version.from_message(m)
        if shared.stream not in version.streams:
            raise ValueError('message not for stream %i' % shared.stream)
        logging.debug('%s:%s -> %s', self.host_print, self.port, version)
        # Wrong protocol version, our own nonce
        # or a nonce which is already known
        if (
            version.protocol_version != shared.protocol_version
            or version.nonce == shared.nonce
            or version.nonce in shared.nonce_pool.values()
        ):
            logging.warning(
                'Disconnecting v%s node %s with nonce %s',
                version.protocol_version, self.host_print,
                base64.b16encode(version.nonce))
            self.status = 'disconnecting'
            self.send_queue.put(None)
        else:
            shared.nonce_pool[self.host] = version.nonce
            logging.info(
                '%s:%s claims to be %s',
                self.host_print, self.port, version.user_agent)
            self.send_queue.put(message.Message(b'verack', b''))
            self.verack_sent = True
            self.remote_version = version
            if not self.server:
                self.send_queue.put('fully_established')
                if self.network == 'ip':
                    shared.address_advertise_queue.put(structure.NetAddr(
                        version.services, self.host, self.port))
                    shared.node_pool.add((self.host, self.port))
                elif self.network == 'i2p':
                    shared.i2p_node_pool.add((self.host, 'i2p'))
            if self.network == 'ip':
                # Advertise the address the peer reported in its version
                # together with our services and listening port
                shared.address_advertise_queue.put(structure.NetAddr(
                    shared.services, version.host, shared.listening_port))
            if self.server:
                # The server side sends its version
                # only after receiving the one of the client
                if self.network == 'ip':
                    self.send_queue.put(message.Version(self.host, self.port))
                else:
                    self.send_queue.put(message.Version('127.0.0.1', 7656))

    def _process_msg_addr(self, m):
        """Add the received addresses to ``shared.unchecked_node_pool``"""
        addr = message.Addr.from_message(m)
        logging.debug('%s:%s -> %s', self.host_print, self.port, addr)
        for a in addr.addresses:
            shared.unchecked_node_pool.add((a.host, a.port))

    def _request_objects(self):
        """
        Send getdata for up to 64 of ``vectors_to_get`` and maintain
        ``vectors_requested``: forget the received or expired (15 min)
        requests, re-queue those older than 10 min.
        """
        if self.vectors_to_get and len(self.vectors_requested) < 100:
            self.vectors_to_get.difference_update(shared.objects.keys())
            if not self.wait_until:
                # The delay before the first getdata
                # depends on the number of known nodes
                nodes_count = (
                    len(shared.node_pool) + len(shared.unchecked_node_pool))
                logging.debug('Nodes count is %i', nodes_count)
                delay = math.ceil(math.log(nodes_count + 2, 20)) * 5.2
                self.wait_until = time.time() + delay
                logging.debug('Skip sending getdata for %.2fs', delay)
            if self.vectors_to_get and self.wait_until < time.time():
                logging.info(
                    'Queued %s vectors to get', len(self.vectors_to_get))
                if len(self.vectors_to_get) > 64:
                    pack = random.sample(tuple(self.vectors_to_get), 64)
                else:
                    # A copy, because the set is emptied below
                    # while the message may still wait in the queue
                    pack = list(self.vectors_to_get)
                self.send_queue.put(message.GetData(pack))
                now = time.time()
                for vector in pack:
                    # Keep the time of the first request
                    self.vectors_requested.setdefault(vector, now)
                self.vectors_to_get.difference_update(pack)
        if self.vectors_requested:
            self.vectors_requested = {
                vector: t for vector, t in self.vectors_requested.items()
                if vector not in shared.objects and t > time.time() - 15 * 60}
            # NOTE(review): the time of a re-requested vector is not
            # refreshed, so it looks like it qualifies for re-request
            # on every call until it expires - confirm that is intended
            to_re_request = {
                vector for vector, t in self.vectors_requested.items()
                if t < time.time() - 10 * 60}
            if to_re_request:
                self.vectors_to_get.update(to_re_request)
                logging.info(
                    'Re-requesting %i objects from %s:%s',
                    len(to_re_request), self.host_print, self.port)

    def _send_objects(self):
        """
        Queue up to 16 of the objects in ``vectors_to_send``;
        the vectors missing in ``shared.objects`` are dropped.
        """
        if not self.vectors_to_send:
            return
        logging.info(
            'Preparing to send %s objects', len(self.vectors_to_send))
        if len(self.vectors_to_send) > 16:
            to_send = random.sample(tuple(self.vectors_to_send), 16)
            self.vectors_to_send.difference_update(to_send)
        else:
            to_send = self.vectors_to_send.copy()
            self.vectors_to_send.clear()
        with shared.objects_lock:
            for vector in to_send:
                obj = shared.objects.get(vector, None)
                if obj:
                    self.send_queue.put(
                        message.Message(b'object', obj.to_bytes()))
class Connection(ConnectionBase):
    """The connection with all commands implementation"""

    def _process_msg_inv(self, m):
        """Remember the announced vectors which are not in the inventory"""
        inv = message.Inv.from_message(m)
        logging.debug('%s:%s -> %s', self.host_print, self.port, inv)
        missing = inv.vectors.copy()
        missing.difference_update(shared.objects.keys())
        self.vectors_to_get.update(missing)
        # Do not send objects they already have.
        self.vectors_to_send.difference_update(inv.vectors)

    def _process_msg_object(self, m):
        """
        Store a valid new object and put its vector into
        ``shared.vector_advertise_queue``; an I2P destination object
        also goes to ``shared.i2p_unchecked_node_pool``.
        """
        obj = structure.Object.from_message(m)
        logging.debug('%s:%s -> %s', self.host_print, self.port, obj)
        # The object has arrived, whether it is accepted or not
        self.vectors_requested.pop(obj.vector, None)
        self.vectors_to_get.discard(obj.vector)

        if not obj.is_valid() or obj.vector in shared.objects:
            return

        with shared.objects_lock:
            shared.objects[obj.vector] = obj

        is_i2p_dest = (
            obj.object_type == shared.i2p_dest_obj_type
            and obj.version == shared.i2p_dest_obj_version)
        if is_i2p_dest:
            dest = base64.b64encode(obj.object_payload, altchars=b'-~')
            logging.debug(
                'Received I2P destination object,'
                ' adding to i2p_unchecked_node_pool')
            logging.debug(dest)
            shared.i2p_unchecked_node_pool.add((dest, 'i2p'))

        shared.vector_advertise_queue.put(obj.vector)

    def _process_msg_getdata(self, m):
        """Schedule the requested vectors for sending"""
        getdata = message.GetData.from_message(m)
        logging.debug('%s:%s -> %s', self.host_print, self.port, getdata)
        self.vectors_to_send.update(getdata.vectors)
2021-03-09 15:40:59 +01:00
# Publish the full connection class as an attribute of the shared module
# (presumably so that other modules can create connections
# without importing this one - confirm against the callers)
shared.connection = Connection