MiNode/minode/connection.py

345 lines
15 KiB
Python
Raw Normal View History

2016-06-30 10:11:33 +02:00
# -*- coding: utf-8 -*-
import base64
2016-08-03 19:05:01 +02:00
import errno
2016-06-30 10:11:33 +02:00
import logging
import random
2016-07-19 11:53:24 +02:00
import select
2016-06-30 10:11:33 +02:00
import socket
2016-07-19 11:53:24 +02:00
import ssl
2016-06-30 10:11:33 +02:00
import threading
import queue
import time
import message
import shared
import structure
class Connection(threading.Thread):
2017-06-09 20:41:33 +02:00
def __init__(self, host, port, s=None, network='ip', server=False, i2p_remote_dest=b''):
self.host = host
self.port = port
self.network = network
self.i2p_remote_dest = i2p_remote_dest
if self.network == 'i2p':
self.host_print = self.i2p_remote_dest[:8].decode()
else:
self.host_print = self.host
2016-06-30 10:11:33 +02:00
super().__init__(name='Connection to {}:{}'.format(host, port))
self.send_queue = queue.Queue()
self.vectors_to_get = set()
2017-03-20 20:52:53 +01:00
self.vectors_to_send = set()
2016-06-30 10:11:33 +02:00
self.status = 'ready'
2016-07-19 12:48:04 +02:00
self.tls = False
self.verack_received = False
self.verack_sent = False
2016-06-30 10:11:33 +02:00
self.s = s
self.remote_version = None
2017-06-09 20:41:33 +02:00
self.server = server
2016-06-30 10:11:33 +02:00
2017-06-09 20:41:33 +02:00
if bool(s):
2016-06-30 10:11:33 +02:00
self.status = 'connected'
2016-08-03 19:05:01 +02:00
self.buffer_receive = b''
self.buffer_send = b''
2016-06-30 10:11:33 +02:00
self.next_message_size = shared.header_length
self.next_header = True
2016-08-03 19:05:01 +02:00
self.on_connection_fully_established_scheduled = False
2016-06-30 10:11:33 +02:00
self.last_message_received = time.time()
self.last_message_sent = time.time()
def run(self):
if self.s is None:
self._connect()
if self.status != 'connected':
return
2016-08-03 19:05:01 +02:00
self.s.settimeout(0)
2016-06-30 10:11:33 +02:00
if not self.server:
2017-06-09 20:41:33 +02:00
if self.network == 'ip':
self.send_queue.put(message.Version(self.host, self.port))
else:
self.send_queue.put(message.Version('127.0.0.1', 7656))
2016-06-30 10:11:33 +02:00
while True:
2016-08-17 17:51:04 +02:00
if self.on_connection_fully_established_scheduled and not (self.buffer_send or self.buffer_receive):
self._on_connection_fully_established()
2016-06-30 10:11:33 +02:00
data = True
try:
2016-08-17 17:51:04 +02:00
if self.status == 'fully_established':
data = self.s.recv(4096)
self.buffer_receive += data
2016-08-29 10:58:34 +02:00
if data and len(self.buffer_receive) < 4000000:
continue
2016-08-17 17:51:04 +02:00
else:
data = self.s.recv(self.next_message_size - len(self.buffer_receive))
self.buffer_receive += data
2016-08-03 19:05:01 +02:00
except ssl.SSLWantReadError:
2016-10-02 16:52:14 +02:00
if self.status == 'fully_established':
self._request_objects()
2017-03-20 20:52:53 +01:00
self._send_objects()
2016-08-03 19:05:01 +02:00
except socket.error as e:
err = e.args[0]
if err == errno.EAGAIN or err == errno.EWOULDBLOCK:
2016-10-02 16:52:14 +02:00
if self.status == 'fully_established':
self._request_objects()
2017-03-20 20:52:53 +01:00
self._send_objects()
2016-08-03 19:05:01 +02:00
else:
2017-06-09 20:41:33 +02:00
logging.debug('Disconnecting from {}:{}. Reason: {}'.format(self.host_print, self.port, e))
2016-10-15 15:53:26 +02:00
data = None
2016-06-30 10:11:33 +02:00
except ConnectionResetError:
2017-06-09 20:41:33 +02:00
logging.debug('Disconnecting from {}:{}. Reason: ConnectionResetError'.format(self.host_print, self.port))
2016-10-15 15:53:26 +02:00
self.status = 'disconnecting'
2016-08-03 19:05:01 +02:00
self._process_buffer_receive()
2016-06-30 10:11:33 +02:00
self._process_queue()
2016-08-03 19:05:01 +02:00
self._send_data()
if time.time() - self.last_message_received > shared.timeout:
logging.debug(
'Disconnecting from {}:{}. Reason: time.time() - self.last_message_received > shared.timeout'.format(
2017-06-09 20:41:33 +02:00
self.host_print, self.port))
2016-10-15 15:53:26 +02:00
self.status = 'disconnecting'
if time.time() - self.last_message_received > 30 and self.status != 'fully_established'and self.status != 'disconnecting':
2016-08-03 19:05:01 +02:00
logging.debug(
'Disconnecting from {}:{}. Reason: time.time() - self.last_message_received > 30 and self.status != \'fully_established\''.format(
2017-06-09 20:41:33 +02:00
self.host_print, self.port))
2016-10-15 15:53:26 +02:00
self.status = 'disconnecting'
2016-08-03 19:05:01 +02:00
if time.time() - self.last_message_sent > 300 and self.status == 'fully_established':
self.send_queue.put(message.Message(b'pong', b''))
if self.status == 'disconnecting' or shared.shutting_down:
2016-06-30 10:11:33 +02:00
data = None
if not data:
self.status = 'disconnected'
self.s.close()
2017-06-09 20:41:33 +02:00
logging.info('Disconnected from {}:{}'.format(self.host_print, self.port))
2016-06-30 10:11:33 +02:00
break
2016-08-17 17:51:04 +02:00
time.sleep(0.2)
2016-06-30 10:11:33 +02:00
def _connect(self):
2017-06-09 20:41:33 +02:00
logging.debug('Connecting to {}:{}'.format(self.host_print, self.port))
2016-06-30 10:11:33 +02:00
try:
self.s = socket.create_connection((self.host, self.port), 10)
2016-06-30 10:11:33 +02:00
self.status = 'connected'
2017-06-09 20:41:33 +02:00
logging.info('Established TCP connection to {}:{}'.format(self.host_print, self.port))
2016-06-30 10:11:33 +02:00
except Exception as e:
2017-06-09 20:41:33 +02:00
logging.warning('Connection to {}:{} failed. Reason: {}'.format(self.host_print, self.port, e))
2016-06-30 10:11:33 +02:00
self.status = 'failed'
2016-08-03 19:05:01 +02:00
def _send_data(self):
2016-10-15 15:53:26 +02:00
if self.buffer_send and self:
2016-08-17 17:51:04 +02:00
try:
amount = self.s.send(self.buffer_send)
self.buffer_send = self.buffer_send[amount:]
except (BlockingIOError, ssl.SSLWantWriteError):
pass
except (BrokenPipeError, ConnectionResetError, ssl.SSLError) as e:
2017-06-09 20:41:33 +02:00
logging.debug('Disconnecting from {}:{}. Reason: {}'.format(self.host_print, self.port, e))
2016-10-15 15:53:26 +02:00
self.status = 'disconnecting'
2016-08-03 19:05:01 +02:00
2016-07-19 12:18:16 +02:00
def _do_tls_handshake(self):
2017-06-09 20:41:33 +02:00
logging.debug('Initializing TLS connection with {}:{}'.format(self.host_print, self.port))
2017-06-25 10:29:46 +02:00
context = ssl.create_default_context()
context.check_hostname = False
context.verify_mode = ssl.CERT_NONE
context.set_ciphers('AECDH-AES256-SHA')
context.set_ecdh_curve("secp256k1")
context.options = ssl.OP_ALL | ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3 | ssl.OP_SINGLE_ECDH_USE | ssl.OP_CIPHER_SERVER_PREFERENCE
self.s = context.wrap_socket(self.s, server_side=self.server, do_handshake_on_connect=False)
2016-07-19 12:18:16 +02:00
while True:
try:
self.s.do_handshake()
break
2016-08-03 19:05:01 +02:00
except ssl.SSLWantReadError:
select.select([self.s], [], [])
except ssl.SSLWantWriteError:
select.select([], [self.s], [])
2016-07-19 12:18:16 +02:00
except Exception as e:
2017-06-09 20:41:33 +02:00
logging.debug('Disconnecting from {}:{}. Reason: {}'.format(self.host_print, self.port, e))
2016-10-15 15:53:26 +02:00
self.status = 'disconnecting'
2016-07-19 12:18:16 +02:00
break
2016-07-19 12:48:04 +02:00
self.tls = True
2017-06-25 10:29:46 +02:00
logging.debug('Established {} connection with {}:{}'.format(self.s.version(), self.host_print, self.port))
2016-07-19 12:18:16 +02:00
2016-06-30 10:11:33 +02:00
def _send_message(self, m):
if type(m) == message.Message and m.command == b'object':
2017-06-09 20:41:33 +02:00
logging.debug('{}:{} <- {}'.format(self.host_print, self.port, structure.Object.from_message(m)))
2016-06-30 10:11:33 +02:00
else:
2017-06-09 20:41:33 +02:00
logging.debug('{}:{} <- {}'.format(self.host_print, self.port, m))
2016-08-03 19:05:01 +02:00
self.buffer_send += m.to_bytes()
2016-06-30 10:11:33 +02:00
2016-07-19 11:53:24 +02:00
def _on_connection_fully_established(self):
2017-06-09 20:41:33 +02:00
logging.info('Established Bitmessage protocol connection to {}:{}'.format(self.host_print, self.port))
2016-08-03 19:05:01 +02:00
self.on_connection_fully_established_scheduled = False
2017-06-09 20:41:33 +02:00
if self.remote_version.services & 2 and self.network == 'ip': # NODE_SSL
2016-07-19 12:48:04 +02:00
self._do_tls_handshake()
2017-06-16 09:32:59 +02:00
addr = {structure.NetAddr(c.remote_version.services, c.host, c.port) for c in shared.connections if c.network != 'i2p' and c.server is False and c.status == 'fully_established'}
2017-06-16 09:32:59 +02:00
if len(shared.node_pool) > 10:
addr.update({structure.NetAddr(1, a[0], a[1]) for a in random.sample(shared.node_pool, 10)})
2017-06-16 09:32:59 +02:00
if len(shared.unchecked_node_pool) > 10:
addr.update({structure.NetAddr(1, a[0], a[1]) for a in random.sample(shared.unchecked_node_pool, 10)})
2017-06-16 09:32:59 +02:00
if len(addr) != 0:
self.send_queue.put(message.Addr(addr))
2016-06-30 10:11:33 +02:00
with shared.objects_lock:
2016-07-19 14:09:42 +02:00
if len(shared.objects) > 0:
to_send = {vector for vector in shared.objects.keys() if shared.objects[vector].expires_time > time.time()}
while len(to_send) > 0:
2017-06-11 07:55:53 +02:00
if len(to_send) > 10000:
# We limit size of inv messaged to 10000 entries because they might time out in very slow networks (I2P)
pack = random.sample(to_send, 10000)
self.send_queue.put(message.Inv(pack))
to_send.difference_update(pack)
else:
self.send_queue.put(message.Inv(to_send))
to_send.clear()
2016-07-19 14:09:42 +02:00
self.status = 'fully_established'
2016-06-30 10:11:33 +02:00
def _process_queue(self):
while not self.send_queue.empty():
m = self.send_queue.get()
if m:
2016-07-19 12:48:04 +02:00
if m == 'fully_established':
2016-08-03 19:05:01 +02:00
self.on_connection_fully_established_scheduled = True
2016-07-19 12:48:04 +02:00
else:
self._send_message(m)
self.last_message_sent = time.time()
2016-06-30 10:11:33 +02:00
else:
self.status = 'disconnecting'
break
2016-08-03 19:05:01 +02:00
def _process_buffer_receive(self):
while len(self.buffer_receive) >= self.next_message_size:
2016-06-30 10:11:33 +02:00
if self.next_header:
self.next_header = False
2016-10-15 17:00:18 +02:00
try:
h = message.Header.from_bytes(self.buffer_receive[:shared.header_length])
except ValueError as e:
self.status = 'disconnecting'
2017-06-09 20:41:33 +02:00
logging.warning('Received malformed message from {}:{}: {}'.format(self.host_print, self.port, e))
2016-10-15 17:00:18 +02:00
break
2016-06-30 10:11:33 +02:00
self.next_message_size += h.payload_length
else:
2016-10-15 17:00:18 +02:00
try:
m = message.Message.from_bytes(self.buffer_receive[:self.next_message_size])
except ValueError as e:
self.status = 'disconnecting'
2017-06-09 20:41:33 +02:00
logging.warning('Received malformed message from {}:{}, {}'.format(self.host_print, self.port, e))
2016-10-15 17:00:18 +02:00
break
2016-06-30 10:11:33 +02:00
self.next_header = True
2016-08-03 19:05:01 +02:00
self.buffer_receive = self.buffer_receive[self.next_message_size:]
2016-06-30 10:11:33 +02:00
self.next_message_size = shared.header_length
self.last_message_received = time.time()
try:
self._process_message(m)
except ValueError as e:
self.status = 'disconnecting'
2017-06-09 20:41:33 +02:00
logging.warning('Received malformed message from {}:{}: {}'.format(self.host_print, self.port, e))
break
2016-06-30 10:11:33 +02:00
def _process_message(self, m):
if m.command == b'version':
version = message.Version.from_bytes(m.to_bytes())
2017-06-09 20:41:33 +02:00
logging.debug('{}:{} -> {}'.format(self.host_print, self.port, str(version)))
2016-06-30 10:11:33 +02:00
if version.protocol_version != shared.protocol_version or version.nonce == shared.nonce:
self.status = 'disconnecting'
self.send_queue.put(None)
else:
self.send_queue.put(message.Message(b'verack', b''))
2016-07-19 12:48:04 +02:00
self.verack_sent = True
2016-06-30 10:11:33 +02:00
self.remote_version = version
if not self.server:
2016-07-19 12:48:04 +02:00
self.send_queue.put('fully_established')
if self.network == 'ip':
shared.address_advertise_queue.put(structure.NetAddr(version.services, self.host, self.port))
shared.node_pool.add((self.host, self.port))
elif self.network == 'i2p':
shared.i2p_node_pool.add((self.host, 'i2p'))
2016-06-30 10:11:33 +02:00
shared.address_advertise_queue.put(structure.NetAddr(shared.services, version.host, shared.listening_port))
2016-07-19 12:18:16 +02:00
if self.server:
2017-06-09 20:41:33 +02:00
if self.network == 'ip':
self.send_queue.put(message.Version(self.host, self.port))
else:
self.send_queue.put(message.Version('127.0.0.1', 7656))
2016-06-30 10:11:33 +02:00
elif m.command == b'verack':
2016-07-19 12:48:04 +02:00
self.verack_received = True
2017-06-09 20:41:33 +02:00
logging.debug('{}:{} -> {}'.format(self.host_print, self.port, 'verack'))
2016-07-19 12:48:04 +02:00
if self.server:
self.send_queue.put('fully_established')
2016-06-30 10:11:33 +02:00
elif m.command == b'inv':
inv = message.Inv.from_message(m)
2017-06-09 20:41:33 +02:00
logging.debug('{}:{} -> {}'.format(self.host_print, self.port, inv))
2016-06-30 10:11:33 +02:00
to_get = inv.vectors.copy()
to_get.difference_update(shared.objects.keys())
self.vectors_to_get.update(to_get)
2017-03-20 21:10:29 +01:00
# Do not send objects they already have.
self.vectors_to_send.difference_update(inv.vectors)
2016-06-30 10:11:33 +02:00
elif m.command == b'object':
obj = structure.Object.from_message(m)
2017-06-09 20:41:33 +02:00
logging.debug('{}:{} -> {}'.format(self.host_print, self.port, obj))
2016-06-30 10:11:33 +02:00
if obj.is_valid() and obj.vector not in shared.objects:
with shared.objects_lock:
shared.objects[obj.vector] = obj
if obj.object_type == shared.i2p_dest_obj_type:
dest = base64.b64encode(obj.object_payload, altchars=b'-~')
logging.debug('Received I2P destination object, adding to i2p_unchecked_node_pool')
logging.debug(dest)
shared.i2p_unchecked_node_pool.add((dest, 'i2p'))
2016-06-30 10:11:33 +02:00
shared.vector_advertise_queue.put(obj.vector)
elif m.command == b'getdata':
getdata = message.GetData.from_message(m)
2017-06-09 20:41:33 +02:00
logging.debug('{}:{} -> {}'.format(self.host_print, self.port, getdata))
2017-03-20 20:52:53 +01:00
self.vectors_to_send.update(getdata.vectors)
2016-07-09 19:37:07 +02:00
elif m.command == b'addr':
2016-06-30 10:11:33 +02:00
addr = message.Addr.from_message(m)
2017-06-09 20:41:33 +02:00
logging.debug('{}:{} -> {}'.format(self.host_print, self.port, addr))
2016-06-30 10:11:33 +02:00
for a in addr.addresses:
shared.unchecked_node_pool.add((a.host, a.port))
2016-07-09 19:37:07 +02:00
elif m.command == b'ping':
2017-06-09 20:41:33 +02:00
logging.debug('{}:{} -> ping'.format(self.host_print, self.port))
2016-07-09 19:37:07 +02:00
self.send_queue.put(message.Message(b'pong', b''))
2017-05-25 12:09:20 +02:00
elif m.command == b'error':
2017-06-09 20:41:33 +02:00
logging.error('{}:{} -> error: {}'.format(self.host_print, self.port, m.payload))
2016-06-30 10:11:33 +02:00
else:
2017-06-09 20:41:33 +02:00
logging.debug('{}:{} -> {}'.format(self.host_print, self.port, m))
2016-06-30 10:11:33 +02:00
def _request_objects(self):
if self.vectors_to_get:
2016-10-02 16:52:14 +02:00
self.vectors_to_get.difference_update(shared.objects.keys())
if self.vectors_to_get:
2017-06-11 07:55:53 +02:00
if len(self.vectors_to_get) > 64:
pack = random.sample(self.vectors_to_get, 64)
2016-10-02 16:52:14 +02:00
self.send_queue.put(message.GetData(pack))
self.vectors_to_get.difference_update(pack)
else:
self.send_queue.put(message.GetData(self.vectors_to_get))
self.vectors_to_get.clear()
2017-03-20 20:52:53 +01:00
def _send_objects(self):
if self.vectors_to_send:
2017-06-19 08:54:41 +02:00
if len(self.vectors_to_send) > 16:
to_send = random.sample(self.vectors_to_send, 16)
2017-03-20 20:52:53 +01:00
self.vectors_to_send.difference_update(to_send)
else:
to_send = self.vectors_to_send.copy()
self.vectors_to_send.clear()
with shared.objects_lock:
for vector in to_send:
obj = shared.objects.get(vector, None)
if obj:
self.send_queue.put(message.Message(b'object', obj.to_bytes()))