Experimental I2P support

This commit is contained in:
TheKysek 2017-06-09 20:41:33 +02:00
parent b230e024f9
commit 1a3e340537
No known key found for this signature in database
GPG Key ID: 50D9AF00D0B1C497
19 changed files with 363 additions and 55 deletions

0
minode/__init__.py Normal file
View File

View File

@ -33,7 +33,11 @@ class Advertiser(threading.Thread):
def _advertise_addresses():
addresses_to_advertise = set()
while not shared.address_advertise_queue.empty():
addresses_to_advertise.add(shared.address_advertise_queue.get())
addr = shared.address_advertise_queue.get()
if addr.port == 'i2p':
# We should not try to construct Addr messages with I2P destinations (yet)
continue
addresses_to_advertise.add(addr)
if len(addresses_to_advertise) > 0:
for c in shared.connections.copy():
if c.status == 'fully_established':

View File

@ -16,7 +16,17 @@ import structure
class Connection(threading.Thread):
def __init__(self, host, port, s=None):
def __init__(self, host, port, s=None, network='ip', server=False, i2p_remote_dest=b''):
self.host = host
self.port = port
self.network = network
self.i2p_remote_dest = i2p_remote_dest
if self.network == 'i2p':
self.host_print = self.i2p_remote_dest[:8].decode()
else:
self.host_print = self.host
super().__init__(name='Connection to {}:{}'.format(host, port))
self.send_queue = queue.Queue()
@ -31,16 +41,13 @@ class Connection(threading.Thread):
self.verack_received = False
self.verack_sent = False
self.host = host
self.port = int(port)
self.s = s
self.remote_version = None
self.server = bool(s)
self.server = server
if self.server:
if bool(s):
self.status = 'connected'
self.buffer_receive = b''
@ -60,7 +67,10 @@ class Connection(threading.Thread):
return
self.s.settimeout(0)
if not self.server:
self.send_queue.put(message.Version(self.host, self.port))
if self.network == 'ip':
self.send_queue.put(message.Version(self.host, self.port))
else:
self.send_queue.put(message.Version('127.0.0.1', 7656))
while True:
if self.on_connection_fully_established_scheduled and not (self.buffer_send or self.buffer_receive):
self._on_connection_fully_established()
@ -85,10 +95,10 @@ class Connection(threading.Thread):
self._request_objects()
self._send_objects()
else:
logging.debug('Disconnecting from {}:{}. Reason: {}'.format(self.host, self.port, e))
logging.debug('Disconnecting from {}:{}. Reason: {}'.format(self.host_print, self.port, e))
data = None
except ConnectionResetError:
logging.debug('Disconnecting from {}:{}. Reason: ConnectionResetError'.format(self.host, self.port))
logging.debug('Disconnecting from {}:{}. Reason: ConnectionResetError'.format(self.host_print, self.port))
self.status = 'disconnecting'
self._process_buffer_receive()
self._process_queue()
@ -96,12 +106,12 @@ class Connection(threading.Thread):
if time.time() - self.last_message_received > shared.timeout:
logging.debug(
'Disconnecting from {}:{}. Reason: time.time() - self.last_message_received > shared.timeout'.format(
self.host, self.port))
self.host_print, self.port))
self.status = 'disconnecting'
if time.time() - self.last_message_received > 30 and self.status != 'fully_established'and self.status != 'disconnecting':
logging.debug(
'Disconnecting from {}:{}. Reason: time.time() - self.last_message_received > 30 and self.status != \'fully_established\''.format(
self.host, self.port))
self.host_print, self.port))
self.status = 'disconnecting'
if time.time() - self.last_message_sent > 300 and self.status == 'fully_established':
self.send_queue.put(message.Message(b'pong', b''))
@ -110,19 +120,19 @@ class Connection(threading.Thread):
if not data:
self.status = 'disconnected'
self.s.close()
logging.info('Disconnected from {}:{}'.format(self.host, self.port))
logging.info('Disconnected from {}:{}'.format(self.host_print, self.port))
break
time.sleep(0.2)
def _connect(self):
logging.debug('Connecting to {}:{}'.format(self.host, self.port))
logging.debug('Connecting to {}:{}'.format(self.host_print, self.port))
try:
self.s = socket.create_connection((self.host, self.port), 10)
self.status = 'connected'
logging.info('Established TCP connection to {}:{}'.format(self.host, self.port))
logging.info('Established TCP connection to {}:{}'.format(self.host_print, self.port))
except Exception as e:
logging.warning('Connection to {}:{} failed. Reason: {}'.format(self.host, self.port, e))
logging.warning('Connection to {}:{} failed. Reason: {}'.format(self.host_print, self.port, e))
self.status = 'failed'
def _send_data(self):
@ -133,11 +143,11 @@ class Connection(threading.Thread):
except (BlockingIOError, ssl.SSLWantWriteError):
pass
except (BrokenPipeError, ConnectionResetError, ssl.SSLError) as e:
logging.debug('Disconnecting from {}:{}. Reason: {}'.format(self.host, self.port, e))
logging.debug('Disconnecting from {}:{}. Reason: {}'.format(self.host_print, self.port, e))
self.status = 'disconnecting'
def _do_tls_handshake(self):
logging.debug('Initializing TLS connection with {}:{}'.format(self.host, self.port))
logging.debug('Initializing TLS connection with {}:{}'.format(self.host_print, self.port))
self.s = ssl.wrap_socket(self.s, keyfile=os.path.join(shared.source_directory, 'tls', 'key.pem'),
certfile=os.path.join(shared.source_directory, 'tls', 'cert.pem'),
server_side=self.server, ssl_version=ssl.PROTOCOL_TLSv1, do_handshake_on_connect=False,
@ -153,23 +163,23 @@ class Connection(threading.Thread):
except ssl.SSLWantWriteError:
select.select([], [self.s], [])
except Exception as e:
logging.debug('Disconnecting from {}:{}. Reason: {}'.format(self.host, self.port, e))
logging.debug('Disconnecting from {}:{}. Reason: {}'.format(self.host_print, self.port, e))
self.status = 'disconnecting'
break
self.tls = True
logging.debug('Established TLS connection with {}:{}'.format(self.host, self.port))
logging.debug('Established TLS connection with {}:{}'.format(self.host_print, self.port))
def _send_message(self, m):
if type(m) == message.Message and m.command == b'object':
logging.debug('{}:{} <- {}'.format(self.host, self.port, structure.Object.from_message(m)))
logging.debug('{}:{} <- {}'.format(self.host_print, self.port, structure.Object.from_message(m)))
else:
logging.debug('{}:{} <- {}'.format(self.host, self.port, m))
logging.debug('{}:{} <- {}'.format(self.host_print, self.port, m))
self.buffer_send += m.to_bytes()
def _on_connection_fully_established(self):
logging.info('Established Bitmessage protocol connection to {}:{}'.format(self.host, self.port))
logging.info('Established Bitmessage protocol connection to {}:{}'.format(self.host_print, self.port))
self.on_connection_fully_established_scheduled = False
if self.remote_version.services & 2: # NODE_SSL
if self.remote_version.services & 2 and self.network == 'ip': # NODE_SSL
self._do_tls_handshake()
with shared.objects_lock:
if len(shared.objects) > 0:
@ -182,11 +192,12 @@ class Connection(threading.Thread):
else:
self.send_queue.put(message.Inv(to_send))
to_send.clear()
addr = {structure.NetAddr(c.remote_version.services, c.host, c.port) for c in shared.connections.copy() if not c.server and c.status == 'fully_established'}
addr = {structure.NetAddr(c.remote_version.services, c.host, c.port) for c in shared.connections.copy() if c.network != 'i2p' and not c.server and c.status == 'fully_established'}
addr = set()
if len(shared.node_pool) > 10:
addr.update({structure.NetAddr(1, a[0], a[1]) for a in random.sample(shared.node_pool, 10)})
addr.update({structure.NetAddr(1, a[0], a[1]) for a in random.sample(shared.node_pool, 10) if a[1] != 'i2p'})
if len(shared.unchecked_node_pool) > 10:
addr.update({structure.NetAddr(1, a[0], a[1]) for a in random.sample(shared.unchecked_node_pool, 10)})
addr.update({structure.NetAddr(1, a[0], a[1]) for a in random.sample(shared.unchecked_node_pool, 10) if a[1] != 'i2p'})
if len(addr) != 0:
self.send_queue.put(message.Addr(addr))
self.status = 'fully_established'
@ -212,7 +223,7 @@ class Connection(threading.Thread):
h = message.Header.from_bytes(self.buffer_receive[:shared.header_length])
except ValueError as e:
self.status = 'disconnecting'
logging.warning('Received malformed message from {}:{}: {}'.format(self.host, self.port, e))
logging.warning('Received malformed message from {}:{}: {}'.format(self.host_print, self.port, e))
break
self.next_message_size += h.payload_length
else:
@ -220,7 +231,7 @@ class Connection(threading.Thread):
m = message.Message.from_bytes(self.buffer_receive[:self.next_message_size])
except ValueError as e:
self.status = 'disconnecting'
logging.warning('Received malformed message from {}:{}, {}'.format(self.host, self.port, e))
logging.warning('Received malformed message from {}:{}, {}'.format(self.host_print, self.port, e))
break
self.next_header = True
self.buffer_receive = self.buffer_receive[self.next_message_size:]
@ -230,13 +241,13 @@ class Connection(threading.Thread):
self._process_message(m)
except ValueError as e:
self.status = 'disconnecting'
logging.warning('Received malformed message from {}:{}: {}'.format(self.host, self.port, e))
logging.warning('Received malformed message from {}:{}: {}'.format(self.host_print, self.port, e))
break
def _process_message(self, m):
if m.command == b'version':
version = message.Version.from_bytes(m.to_bytes())
logging.debug('{}:{} -> {}'.format(self.host, self.port, str(version)))
logging.debug('{}:{} -> {}'.format(self.host_print, self.port, str(version)))
if version.protocol_version != shared.protocol_version or version.nonce == shared.nonce:
self.status = 'disconnecting'
self.send_queue.put(None)
@ -250,16 +261,19 @@ class Connection(threading.Thread):
shared.node_pool.add((self.host, self.port))
shared.address_advertise_queue.put(structure.NetAddr(shared.services, version.host, shared.listening_port))
if self.server:
self.send_queue.put(message.Version(self.host, self.port))
if self.network == 'ip':
self.send_queue.put(message.Version(self.host, self.port))
else:
self.send_queue.put(message.Version('127.0.0.1', 7656))
elif m.command == b'verack':
self.verack_received = True
logging.debug('{}:{} -> {}'.format(self.host, self.port, 'verack'))
logging.debug('{}:{} -> {}'.format(self.host_print, self.port, 'verack'))
if self.server:
self.send_queue.put('fully_established')
elif m.command == b'inv':
inv = message.Inv.from_message(m)
logging.debug('{}:{} -> {}'.format(self.host, self.port, inv))
logging.debug('{}:{} -> {}'.format(self.host_print, self.port, inv))
to_get = inv.vectors.copy()
to_get.difference_update(shared.objects.keys())
self.vectors_to_get.update(to_get)
@ -267,27 +281,27 @@ class Connection(threading.Thread):
self.vectors_to_send.difference_update(inv.vectors)
elif m.command == b'object':
obj = structure.Object.from_message(m)
logging.debug('{}:{} -> {}'.format(self.host, self.port, obj))
logging.debug('{}:{} -> {}'.format(self.host_print, self.port, obj))
if obj.is_valid() and obj.vector not in shared.objects:
with shared.objects_lock:
shared.objects[obj.vector] = obj
shared.vector_advertise_queue.put(obj.vector)
elif m.command == b'getdata':
getdata = message.GetData.from_message(m)
logging.debug('{}:{} -> {}'.format(self.host, self.port, getdata))
logging.debug('{}:{} -> {}'.format(self.host_print, self.port, getdata))
self.vectors_to_send.update(getdata.vectors)
elif m.command == b'addr':
addr = message.Addr.from_message(m)
logging.debug('{}:{} -> {}'.format(self.host, self.port, addr))
logging.debug('{}:{} -> {}'.format(self.host_print, self.port, addr))
for a in addr.addresses:
shared.unchecked_node_pool.add((a.host, a.port))
elif m.command == b'ping':
logging.debug('{}:{} -> ping'.format(self.host, self.port))
logging.debug('{}:{} -> ping'.format(self.host_print, self.port))
self.send_queue.put(message.Message(b'pong', b''))
elif m.command == b'error':
logging.error('{}:{} -> error: {}'.format(self.host, self.port, m.payload))
logging.error('{}:{} -> error: {}'.format(self.host_print, self.port, m.payload))
else:
logging.debug('{}:{} -> {}'.format(self.host, self.port, m))
logging.debug('{}:{} -> {}'.format(self.host_print, self.port, m))
def _request_objects(self):
if self.vectors_to_get:

0
minode/i2p/__init__.py Normal file
View File

83
minode/i2p/controller.py Normal file
View File

@ -0,0 +1,83 @@
# -*- coding: utf-8 -*-
import base64
import logging
import os
import socket
import threading
from i2p.util import receive_line, pub_from_priv
import shared
class I2PController(threading.Thread):
def __init__(self, host='127.0.0.1', port=7656, dest_priv=b''):
super().__init__(name='I2P Controller')
self.host = host
self.port = port
self.nick = b'MiNode_' + base64.b16encode(os.urandom(4)).lower()
self.s = socket.create_connection((self.host, self.port))
self.version_reply = []
self.init_connection()
if dest_priv:
self.dest_priv = dest_priv
self.dest_pub = pub_from_priv(dest_priv)
else:
self.dest_priv = b''
self.dest_pub = b''
self.generate_destination()
self.create_session()
def _receive_line(self):
line = receive_line(self.s)
logging.debug('I2PController <- ' + str(line))
return line
def _send(self, command):
logging.debug('I2PController -> ' + str(command))
self.s.sendall(command)
def init_connection(self):
self._send(b'HELLO VERSION MIN=3.0 MAX=3.3\n')
self.version_reply = self._receive_line().split()
assert b'RESULT=OK' in self.version_reply
def generate_destination(self):
if b'VERSION=3.0' in self.version_reply:
# We will now receive old DSA_SHA1 destination :(
self._send(b'DEST GENERATE\n')
else:
self._send(b'DEST GENERATE SIGNATURE_TYPE=EdDSA_SHA512_Ed25519\n')
reply = self._receive_line().split()
for par in reply:
if par.startswith(b'PUB='):
self.dest_pub = par.replace(b'PUB=', b'')
if par.startswith(b'PRIV='):
self.dest_priv = par.replace(b'PRIV=', b'')
assert self.dest_priv
def create_session(self):
self._send(b'SESSION CREATE STYLE=STREAM ID=' + self.nick + b' DESTINATION=' + self.dest_priv + b'\n')
reply = self._receive_line().split()
assert b'RESULT=OK' in reply
def run(self):
self.s.settimeout(1)
while True:
if not shared.shutting_down:
try:
msg = self._receive_line().split(b' ')
if msg[0] == b'PING':
self._send(b'PONG ' + msg[1] + b'\n')
except socket.timeout:
pass
else:
logging.debug('Shutting down I2P Controller')
self.s.close()
break

43
minode/i2p/dialer.py Normal file
View File

@ -0,0 +1,43 @@
# -*- coding: utf-8 -*-
import logging
import socket
from connection import Connection
from i2p.util import receive_line
class I2PDialer(object):
def __init__(self, destination, nick, host='127.0.0.1', port=7656):
self.host = host
self.port = port
self.nick = nick
self.destination = destination
self.s = socket.create_connection((self.host, self.port))
self.version_reply = []
self._connect()
def _receive_line(self):
line = receive_line(self.s)
logging.debug('I2PDialer <-' + str(line))
return line
def _send(self, command):
logging.debug('I2PDialer ->' + str(command))
self.s.sendall(command)
def _connect(self):
self._send(b'HELLO VERSION MIN=3.0 MAX=3.3\n')
self.version_reply = self._receive_line().split()
assert b'RESULT=OK' in self.version_reply
self._send(b'STREAM CONNECT ID=' + self.nick + b' DESTINATION=' + self.destination + b'\n')
reply = self._receive_line().split(b' ')
assert b'RESULT=OK' in reply
def get_connection(self):
return Connection(self.destination, 'i2p', self.s, 'i2p', False, self.destination)

58
minode/i2p/listener.py Normal file
View File

@ -0,0 +1,58 @@
# -*- coding: utf-8 -*-
import logging
import socket
import threading
from connection import Connection
from i2p.util import receive_line
import shared
class I2PListener(threading.Thread):
def __init__(self, nick, host='127.0.0.1', port=7656):
super().__init__(name='I2P Listener')
self.host = host
self.port = port
self.nick = nick
self.s = None
self.version_reply = []
self.create_socket()
def _receive_line(self):
line = receive_line(self.s)
logging.debug('I2PListener <-' + str(line))
return line
def _send(self, command):
logging.debug('I2PListener ->' + str(command))
self.s.sendall(command)
def create_socket(self):
self.s = socket.create_connection((self.host, self.port))
self._send(b'HELLO VERSION MIN=3.0 MAX=3.3\n')
self.version_reply = self._receive_line().split()
assert b'RESULT=OK' in self.version_reply
self._send(b'STREAM ACCEPT ID=' + self.nick + b'\n')
reply = self._receive_line().split(b' ')
assert b'RESULT=OK' in reply
self.s.settimeout(1)
def run(self):
while not shared.shutting_down:
try:
destination = self._receive_line().split()[0]
print(destination)
logging.info('Incoming I2P connection from: {}'.format(destination))
c = Connection(destination, 'i2p', self.s, 'i2p', True, destination)
c.start()
shared.connections.add(c)
self.create_socket()
except socket.timeout:
pass
logging.debug('Shutting down I2P Listener')

26
minode/i2p/util.py Normal file
View File

@ -0,0 +1,26 @@
# -*- coding: utf-8 -*-
import base64
import hashlib
def receive_line(s):
data = b''
while b'\n' not in data:
d = s.recv(4096)
if not d:
raise ConnectionResetError
data += d
data = data.splitlines()
return data[0]
def pub_from_priv(priv):
priv = base64.b64decode(priv, altchars=b'-~')
# 256 for public key + 128 for signing key + 3 for certificate header + value of bytes priv[385:387]
pub = priv[:387 + int.from_bytes(priv[385:387], byteorder='big')]
pub = base64.b64encode(pub, altchars=b'-~')
return pub
def b32_from_pub(pub):
return base64.b32encode(hashlib.sha256(base64.b64decode(pub, b'-~')).digest()).replace(b"=", b"").lower() + b'.b32.i2p'

15
minode/i2p_test.py Normal file
View File

@ -0,0 +1,15 @@
import logging
from i2p.controller import I2PController
from i2p.listener import I2PListener
logging.basicConfig(level=logging.DEBUG, format='[%(asctime)s] [%(levelname)s] %(message)s')
i2p_controller = I2PController()
i2p_controller.start()
session_nick = i2p_controller.nick
i2p_listener = I2PListener(session_nick)
i2p_listener.start()

View File

@ -31,7 +31,7 @@ class Listener(threading.Thread):
if len(shared.connections) > shared.connection_limit:
conn.close()
else:
c = Connection(addr[0], addr[1], conn)
c = Connection(addr[0], addr[1], conn, True)
c.start()
shared.connections.add(c)
except socket.timeout:

View File

@ -10,6 +10,8 @@ import socket
from advertiser import Advertiser
from manager import Manager
from listener import Listener
import i2p.controller
import i2p.listener
import shared
@ -27,6 +29,7 @@ def parse_arguments():
parser.add_argument('--no-outgoing', help='Do not send outgoing connections', action='store_true')
parser.add_argument('--trusted-peer', help='Specify a trusted peer we should connect to')
parser.add_argument('--connection-limit', help='Maximum number of connections', type=int)
parser.add_argument('--i2p', help='Enable I2P support (uses SAMv3)', action='store_true')
args = parser.parse_args()
if args.port:
@ -43,19 +46,25 @@ def parse_arguments():
if args.no_outgoing:
shared.send_outgoing_connections = False
if args.trusted_peer:
colon_count = args.trusted_peer.count(':')
if colon_count == 0:
shared.trusted_peer = (args.trusted_peer, 8444)
if colon_count == 1:
addr = args.trusted_peer.split(':')
shared.trusted_peer = (addr[0], int(addr[1]))
if colon_count >= 2:
# IPv6 <3
addr = args.trusted_peer.split(']:')
addr[0] = addr[0][1:]
shared.trusted_peer = (addr[0], int(addr[1]))
if len(args.trusted_peer) > 50:
# I2P
shared.trusted_peer = (args.trusted_peer.encode(), 'i2p')
else:
colon_count = args.trusted_peer.count(':')
if colon_count == 0:
shared.trusted_peer = (args.trusted_peer, 8444)
if colon_count == 1:
addr = args.trusted_peer.split(':')
shared.trusted_peer = (addr[0], int(addr[1]))
if colon_count >= 2:
# IPv6 <3
addr = args.trusted_peer.split(']:')
addr[0] = addr[0][1:]
shared.trusted_peer = (addr[0], int(addr[1]))
if args.connection_limit:
shared.connection_limit = args.connection_limit
if args.i2p:
shared.i2p_enabled = True
def main():
@ -112,6 +121,47 @@ def main():
advertiser = Advertiser()
advertiser.start()
if shared.i2p_enabled:
dest_priv = b''
try:
with open(shared.data_directory + 'i2p_dest_priv.key', mode='br') as file:
dest_priv = file.read()
logging.debug('Loaded I2P destination private key.')
except Exception as e:
logging.warning('Error while loading I2P destination private key.')
logging.warning(e)
logging.info('Starting I2P Controller and creating tunnels. This may take a while.')
i2p_controller = i2p.controller.I2PController(shared.i2p_sam_host, shared.i2p_sam_port, dest_priv)
i2p_controller.start()
shared.i2p_dest_pub = i2p_controller.dest_pub
shared.i2p_session_nick = i2p_controller.nick
logging.info('Local I2P destination: {}'.format(shared.i2p_dest_pub.decode()))
logging.info('I2P session nick: {}'.format(shared.i2p_session_nick.decode()))
logging.info('Starting I2P Listener')
i2p_listener = i2p.listener.I2PListener(i2p_controller.nick)
i2p_listener.start()
try:
with open(shared.data_directory + 'i2p_dest_priv.key', mode='bw') as file:
file.write(i2p_controller.dest_priv)
logging.debug('Saved I2P destination private key.')
except Exception as e:
logging.warning('Error while saving I2P destination private key.')
logging.warning(e)
try:
with open(shared.data_directory + 'i2p_dest.pub', mode='bw') as file:
file.write(shared.i2p_dest_pub)
logging.debug('Saved I2P destination public key.')
except Exception as e:
logging.warning('Error while saving I2P destination public key.')
logging.warning(e)
listener_ipv4 = None
listener_ipv6 = None

View File

@ -8,6 +8,7 @@ import threading
import time
from connection import Connection
from i2p.dialer import I2PDialer
import shared
@ -77,8 +78,16 @@ class Manager(threading.Thread):
for addr in to_connect:
if addr[0] in hosts:
continue
c = Connection(addr[0], addr[1])
c.start()
if addr[1] == 'i2p' and shared.i2p_enabled:
if shared.i2p_session_nick:
c = I2PDialer(addr[0], shared.i2p_session_nick, shared.i2p_sam_host, shared.i2p_sam_port).get_connection()
c.start()
else:
logging.debug('We were going to connect to an I2P peer but our tunnels are not ready')
continue
else:
c = Connection(addr[0], addr[1])
c.start()
hosts.add(c.host)
with shared.connections_lock:
shared.connections.add(c)

View File

@ -23,6 +23,12 @@ user_agent = b'/MiNode:0.2.2/'
timeout = 600
header_length = 24
i2p_enabled = False
i2p_sam_host = '127.0.0.1'
i2p_sam_port = 7656
i2p_session_nick = b''
i2p_dest_pub = b''
nonce_trials_per_byte = 1000
payload_length_extra_bytes = 1000

View File

@ -1,2 +1,2 @@
#!/bin/sh
python3 src/main.py "$@"
python3 minode/main.py "$@"