diff --git a/i2p_bridge.sh b/i2p_bridge.sh new file mode 100644 index 0000000..8bf73c3 --- /dev/null +++ b/i2p_bridge.sh @@ -0,0 +1,2 @@ +#!/bin/sh +python3 minode/main.py --i2p --no-ip --host 127.0.0.1 "$@" \ No newline at end of file diff --git a/minode/connection.py b/minode/connection.py index ae41051..66066ca 100644 --- a/minode/connection.py +++ b/minode/connection.py @@ -1,7 +1,7 @@ # -*- coding: utf-8 -*- +import base64 import errno import logging -import os import random import select import socket @@ -186,11 +186,11 @@ class Connection(threading.Thread): if self.remote_version.services & 2 and self.network == 'ip': # NODE_SSL self._do_tls_handshake() - addr = {structure.NetAddr(c.remote_version.services, c.host, c.port) for c in shared.connections.copy() if c.network != 'i2p' and not c.server and c.status == 'fully_established'} + addr = {structure.NetAddr(c.remote_version.services, c.host, c.port) for c in shared.connections if c.network != 'i2p' and c.server is False and c.status == 'fully_established'} if len(shared.node_pool) > 10: - addr.update({structure.NetAddr(1, a[0], a[1]) for a in random.sample(shared.node_pool, 10) if a[1] != 'i2p'}) + addr.update({structure.NetAddr(1, a[0], a[1]) for a in random.sample(shared.node_pool, 10)}) if len(shared.unchecked_node_pool) > 10: - addr.update({structure.NetAddr(1, a[0], a[1]) for a in random.sample(shared.unchecked_node_pool, 10) if a[1] != 'i2p'}) + addr.update({structure.NetAddr(1, a[0], a[1]) for a in random.sample(shared.unchecked_node_pool, 10)}) if len(addr) != 0: self.send_queue.put(message.Addr(addr)) @@ -263,8 +263,11 @@ class Connection(threading.Thread): self.remote_version = version if not self.server: self.send_queue.put('fully_established') - shared.address_advertise_queue.put(structure.NetAddr(version.services, self.host, self.port)) - shared.node_pool.add((self.host, self.port)) + if self.network == 'ip': + shared.address_advertise_queue.put(structure.NetAddr(version.services, self.host, self.port)) + shared.node_pool.add((self.host, self.port)) + elif self.network == 'i2p': + shared.i2p_node_pool.add((self.host, 'i2p')) shared.address_advertise_queue.put(structure.NetAddr(shared.services, version.host, shared.listening_port)) if self.server: if self.network == 'ip': @@ -291,6 +294,11 @@ class Connection(threading.Thread): if obj.is_valid() and obj.vector not in shared.objects: with shared.objects_lock: shared.objects[obj.vector] = obj + if obj.object_type == shared.i2p_dest_obj_type: + dest = base64.b64encode(obj.object_payload, altchars=b'-~') + logging.debug('Received I2P destination object, adding to i2p_unchecked_node_pool') + logging.debug(dest) + shared.i2p_unchecked_node_pool.add((dest, 'i2p')) shared.vector_advertise_queue.put(obj.vector) elif m.command == b'getdata': getdata = message.GetData.from_message(m) diff --git a/minode/i2p/controller.py b/minode/i2p/controller.py index 54fc739..d76c267 100644 --- a/minode/i2p/controller.py +++ b/minode/i2p/controller.py @@ -36,11 +36,11 @@ class I2PController(threading.Thread): def _receive_line(self): line = receive_line(self.s) - logging.debug('I2PController <- ' + str(line)) + # logging.debug('I2PController <- ' + str(line)) return line def _send(self, command): - logging.debug('I2PController -> ' + str(command)) + # logging.debug('I2PController -> ' + str(command)) self.s.sendall(command) def init_connection(self): diff --git a/minode/i2p/dialer.py b/minode/i2p/dialer.py index a4af5e9..d8bf755 100644 --- a/minode/i2p/dialer.py +++ b/minode/i2p/dialer.py @@ -1,43 +1,54 @@ # -*- coding: utf-8 -*- import logging import socket +import threading +import shared from connection import Connection from i2p.util import receive_line -class I2PDialer(object): - def __init__(self, destination, nick, host='127.0.0.1', port=7656): - - self.host = host - self.port = port +class I2PDialer(threading.Thread): + def __init__(self, destination, nick, sam_host='127.0.0.1', sam_port=7656): + self.sam_host = sam_host + self.sam_port = sam_port self.nick = nick self.destination = destination - self.s = socket.create_connection((self.host, self.port)) + super().__init__(name='I2P Dial to {}'.format(self.destination)) + + self.s = socket.create_connection((self.sam_host, self.sam_port)) self.version_reply = [] + self.success = True + def run(self): + logging.debug('Connecting to {}'.format(self.destination)) self._connect() + if not shared.shutting_down and self.success: + c = Connection(self.destination, 'i2p', self.s, 'i2p', False, self.destination) + c.start() + shared.connections.add(c) def _receive_line(self): line = receive_line(self.s) - logging.debug('I2PDialer <-' + str(line)) + logging.debug('I2PDialer <- ' + str(line)) return line def _send(self, command): - logging.debug('I2PDialer ->' + str(command)) + logging.debug('I2PDialer -> ' + str(command)) self.s.sendall(command) def _connect(self): self._send(b'HELLO VERSION MIN=3.0 MAX=3.3\n') self.version_reply = self._receive_line().split() - assert b'RESULT=OK' in self.version_reply + if b'RESULT=OK' not in self.version_reply: + logging.warning('Error while connecting to {}'.format(self.destination)) + self.success = False self._send(b'STREAM CONNECT ID=' + self.nick + b' DESTINATION=' + self.destination + b'\n') reply = self._receive_line().split(b' ') - assert b'RESULT=OK' in reply - - def get_connection(self): - return Connection(self.destination, 'i2p', self.s, 'i2p', False, self.destination) + if b'RESULT=OK' not in reply: + logging.warning('Error while connecting to {}'.format(self.destination)) + self.success = False diff --git a/minode/i2p/listener.py b/minode/i2p/listener.py index 4dd5545..f68c2b4 100644 --- a/minode/i2p/listener.py +++ b/minode/i2p/listener.py @@ -24,11 +24,11 @@ class I2PListener(threading.Thread): def _receive_line(self): line = receive_line(self.s) - logging.debug('I2PListener <-' + str(line)) + # logging.debug('I2PListener <- ' + str(line)) return line def _send(self, command): - logging.debug('I2PListener ->' + str(command)) + # logging.debug('I2PListener -> ' + str(command)) self.s.sendall(command) def create_socket(self): diff --git a/minode/i2p_core_nodes.csv b/minode/i2p_core_nodes.csv new file mode 100644 index 0000000..3c777aa --- /dev/null +++ b/minode/i2p_core_nodes.csv @@ -0,0 +1 @@ +IPHBFm1bfQ9HrUkq07aomTAGn~W1wChE53xprAqIftsF18cuoUCJbMYhdJl~pljhvAXHKDSePdsSWecg8yP3st0Ib0h429XaOdrxpoFJ6MI1ofkg-KFtnZ6sX~Yp5GD-z-Nqdu6H0YBlf~y18ToOT6vTUvyE5Jsb105LmRMUAP0pDon4-da9r2wD~rxGOuvkrT83CftfxAIIT1z3M6ouAFI3UBq-guEyiZszM-01yQ-IgVBXsvnou8DXrlysYeeaimL6LoLhJgTnXIDfHCfUsHbgYK0JvRdimu-eMs~BRTT7-o4N5RJjVDfsS4CUHa6JwuWYg3JNSfaJoGFlM2xeGjNSJUs5e7PkcXeqCTKZQERbdIJcFz~rGcTfvc-OfXjMf6VfU2XORKcYiA21zkHMOkQvmE1dATP8VpQTKcYYZrQrRAc5Wxn7ayf9Gdwtq0EZXeydZv36RVJ03E4CZUGQMxXOFGUXwLFXQ9QCbsbXSoukd3rAGoPgE~GboO1YJh3hAAAA diff --git a/minode/i2p_test.py b/minode/i2p_test.py deleted file mode 100644 index d51907f..0000000 --- a/minode/i2p_test.py +++ /dev/null @@ -1,15 +0,0 @@ -import logging - -from i2p.controller import I2PController -from i2p.listener import I2PListener - -logging.basicConfig(level=logging.DEBUG, format='[%(asctime)s] [%(levelname)s] %(message)s') - -i2p_controller = I2PController() - -i2p_controller.start() - -session_nick = i2p_controller.nick - -i2p_listener = I2PListener(session_nick) -i2p_listener.start() diff --git a/minode/main.py b/minode/main.py index e691d8d..df8c098 100644 --- a/minode/main.py +++ b/minode/main.py @@ -1,7 +1,9 @@ # -*- coding: utf-8 -*- import argparse +import base64 import csv import logging +import multiprocessing import os import pickle import signal @@ -23,10 +25,12 @@ def handler(s, f): def parse_arguments(): parser = argparse.ArgumentParser() parser.add_argument('-p', '--port', help='Port to listen on', type=int) + parser.add_argument('--host', help='Listening host') parser.add_argument('--debug', help='Enable debug logging', action='store_true') parser.add_argument('--data-dir', help='Path to data directory') parser.add_argument('--no-incoming', help='Do not listen for incoming connections', action='store_true') parser.add_argument('--no-outgoing', help='Do not send outgoing connections', action='store_true') + parser.add_argument('--no-ip', help='Do not use IP network', action='store_true') parser.add_argument('--trusted-peer', help='Specify a trusted peer we should connect to') parser.add_argument('--connection-limit', help='Maximum number of connections', type=int) parser.add_argument('--i2p', help='Enable I2P support (uses SAMv3)', action='store_true') @@ -37,6 +41,8 @@ def parse_arguments(): args = parser.parse_args() if args.port: shared.listening_port = args.port + if args.host: + shared.listening_host = args.host if args.debug: shared.log_level = logging.DEBUG if args.data_dir: @@ -48,6 +54,8 @@ def parse_arguments(): shared.listen_for_connections = False if args.no_outgoing: shared.send_outgoing_connections = False + if args.no_ip: + shared.ip_enabled = False if args.trusted_peer: if len(args.trusted_peer) > 50: # I2P @@ -105,12 +113,24 @@ def main(): logging.warning('Error while loading nodes from disk.') logging.warning(e) + try: + with open(shared.data_directory + 'i2p_nodes.pickle', mode='br') as file: + shared.i2p_node_pool = pickle.load(file) + except Exception as e: + logging.warning('Error while loading nodes from disk.') + logging.warning(e) + with open(os.path.join(shared.source_directory, 'core_nodes.csv'), mode='r', newline='') as f: reader = csv.reader(f) shared.core_nodes = {tuple(row) for row in reader} shared.node_pool.update(shared.core_nodes) - if not shared.trusted_peer: + with open(os.path.join(shared.source_directory, 'i2p_core_nodes.csv'), mode='r', newline='') as f: + reader = csv.reader(f) + shared.i2p_core_nodes = {(row[0].encode(), 'i2p') for row in reader} + shared.i2p_node_pool.update(shared.i2p_core_nodes) + + if shared.ip_enabled and not shared.trusted_peer: try: for item in socket.getaddrinfo('bootstrap8080.bitmessage.org', 80): shared.unchecked_node_pool.add((item[4][0], 8080)) @@ -122,15 +142,12 @@ def main(): logging.error('Error during DNS bootstrap') logging.error(e) - manager = Manager() - manager.clean_objects() - manager.clean_connections() - manager.start() - - advertiser = Advertiser() - advertiser.start() - if shared.i2p_enabled: + # Grab I2P destinations from old object file + for obj in shared.objects.values(): + if obj.object_type == shared.i2p_dest_obj_type: + shared.i2p_unchecked_node_pool.add((base64.b64encode(obj.object_payload, altchars=b'-~'), 'i2p')) + dest_priv = b'' try: @@ -171,20 +188,28 @@ def main(): logging.warning('Error while saving I2P destination public key.') logging.warning(e) + manager = Manager() + manager.clean_objects() + manager.clean_connections() + manager.start() + + advertiser = Advertiser() + advertiser.start() + listener_ipv4 = None listener_ipv6 = None if shared.listen_for_connections: if socket.has_ipv6: try: - listener_ipv6 = Listener('', shared.listening_port, family=socket.AF_INET6) + listener_ipv6 = Listener(shared.listening_host, shared.listening_port, family=socket.AF_INET6) listener_ipv6.start() except Exception as e: logging.warning('Error while starting IPv6 listener on port {}'.format(shared.listening_port)) logging.warning(e) try: - listener_ipv4 = Listener('', shared.listening_port) + listener_ipv4 = Listener(shared.listening_host, shared.listening_port) listener_ipv4.start() except Exception as e: if listener_ipv6: @@ -196,4 +221,5 @@ def main(): logging.error(e) if __name__ == '__main__': + multiprocessing.set_start_method('spawn') main() diff --git a/minode/manager.py b/minode/manager.py index cbd309a..954dac6 100644 --- a/minode/manager.py +++ b/minode/manager.py @@ -9,7 +9,9 @@ import time from connection import Connection from i2p.dialer import I2PDialer +import pow import shared +import structure class Manager(threading.Thread): @@ -20,11 +22,15 @@ class Manager(threading.Thread): self.last_cleaned_connections = time.time() self.last_pickled_objects = time.time() self.last_pickled_nodes = time.time() + self.last_published_i2p_destination = time.time() - 15 * 60 def run(self): while True: time.sleep(0.8) now = time.time() + if shared.shutting_down: + logging.debug('Shutting down Manager') + break if now - self.last_cleaned_objects > 90: self.clean_objects() self.last_cleaned_objects = now @@ -37,10 +43,9 @@ class Manager(threading.Thread): if now - self.last_pickled_nodes > 60: self.pickle_nodes() self.last_pickled_nodes = now - - if shared.shutting_down: - logging.debug('Shutting down Manager') - break + if now - self.last_published_i2p_destination > 3600: + self.publish_i2p_destination() + self.last_published_i2p_destination = now @staticmethod def clean_objects(): @@ -62,38 +67,62 @@ class Manager(threading.Thread): hosts.add(c.host) if not c.server: outgoing_connections += 1 + + for d in shared.i2p_dialers.copy(): + hosts.add(d.destination) + if not d.is_alive(): + shared.i2p_dialers.remove(d) + to_connect = set() if shared.trusted_peer: to_connect.add(shared.trusted_peer) + if outgoing_connections < shared.outgoing_connections and shared.send_outgoing_connections and not shared.trusted_peer: - if len(shared.unchecked_node_pool) > 16: - to_connect.update(random.sample(shared.unchecked_node_pool, 16)) - else: - to_connect.update(shared.unchecked_node_pool) - shared.unchecked_node_pool.difference_update(to_connect) - if len(shared.node_pool) > 8: - to_connect.update(random.sample(shared.node_pool, 8)) - else: - to_connect.update(shared.node_pool) + + if shared.ip_enabled: + if len(shared.unchecked_node_pool) > 16: + to_connect.update(random.sample(shared.unchecked_node_pool, 16)) + else: + to_connect.update(shared.unchecked_node_pool) + shared.unchecked_node_pool.difference_update(to_connect) + if len(shared.node_pool) > 8: + to_connect.update(random.sample(shared.node_pool, 8)) + else: + to_connect.update(shared.node_pool) + + if shared.i2p_enabled: + if len(shared.i2p_unchecked_node_pool) > 16: + to_connect.update(random.sample(shared.i2p_unchecked_node_pool, 16)) + else: + to_connect.update(shared.i2p_unchecked_node_pool) + shared.i2p_unchecked_node_pool.difference_update(to_connect) + if len(shared.i2p_node_pool) > 8: + to_connect.update(random.sample(shared.i2p_node_pool, 8)) + else: + to_connect.update(shared.i2p_node_pool) + for addr in to_connect: if addr[0] in hosts: continue if addr[1] == 'i2p' and shared.i2p_enabled: if shared.i2p_session_nick: try: - c = I2PDialer(addr[0], shared.i2p_session_nick, shared.i2p_sam_host, shared.i2p_sam_port).get_connection() - c.start() - except: - pass + d = I2PDialer(addr[0], shared.i2p_session_nick, shared.i2p_sam_host, shared.i2p_sam_port) + d.start() + hosts.add(d.destination) + shared.i2p_dialers.add(d) + except Exception as e: + logging.warning('Exception while trying to establish an I2P connection') + logging.warning(e) else: logging.debug('We were going to connect to an I2P peer but our tunnels are not ready') continue else: c = Connection(addr[0], addr[1]) c.start() - hosts.add(c.host) - with shared.connections_lock: - shared.connections.add(c) + hosts.add(c.host) + with shared.connections_lock: + shared.connections.add(c) shared.hosts = hosts @staticmethod @@ -113,10 +142,25 @@ class Manager(threading.Thread): shared.node_pool = set(random.sample(shared.node_pool, 10000)) if len(shared.unchecked_node_pool) > 1000: shared.unchecked_node_pool = set(random.sample(shared.unchecked_node_pool, 1000)) + + if len(shared.i2p_node_pool) > 1000: + shared.i2p_node_pool = set(random.sample(shared.i2p_node_pool, 1000)) + if len(shared.i2p_unchecked_node_pool) > 100: + shared.i2p_unchecked_node_pool = set(random.sample(shared.i2p_unchecked_node_pool, 100)) try: with open(shared.data_directory + 'nodes.pickle', mode='bw') as file: pickle.dump(shared.node_pool, file, protocol=3) + with open(shared.data_directory + 'i2p_nodes.pickle', mode='bw') as file: + pickle.dump(shared.i2p_node_pool, file, protocol=3) logging.debug('Saved nodes') except Exception as e: logging.warning('Error while saving nodes') logging.warning(e) + + @staticmethod + def publish_i2p_destination(): + if shared.i2p_session_nick: + logging.info('Publishing our I2P destination') + dest_pub_raw = base64.b64decode(shared.i2p_dest_pub, altchars=b'-~') + obj = structure.Object(b'\x00' * 8, int(time.time() + 2 * 3600), 0x493250, 1, 1, dest_pub_raw) + pow.do_pow_and_publish(obj) diff --git a/minode/pow.py b/minode/pow.py new file mode 100644 index 0000000..24151ad --- /dev/null +++ b/minode/pow.py @@ -0,0 +1,46 @@ +import hashlib +import multiprocessing +import shared +import struct +import threading +import time + +import structure + + +def _pow_worker(target, initial_hash, q): + nonce = 0 + print("target: {}, initial_hash: {}".format(target, initial_hash.hex())) + trial_value = target + 1 + + while trial_value > target: + nonce += 1 + trial_value = struct.unpack('>Q', hashlib.sha512(hashlib.sha512(struct.pack('>Q', nonce) + initial_hash).digest()).digest()[:8])[0] + + q.put(struct.pack('>Q', nonce)) + + +def _worker(obj): + q = multiprocessing.Queue() + p = multiprocessing.Process(target=_pow_worker, args=(obj.pow_target(), obj.pow_initial_hash(), q)) + + print("Starting POW process") + t = time.time() + p.start() + nonce = q.get() + p.join() + print("Finished doing POW, nonce: {}, time: {}s".format(nonce, time.time() - t)) + + obj = structure.Object(nonce, obj.expires_time, obj.object_type, obj.version, obj.stream_number, obj.object_payload) + print("Object vector is {}".format(obj.vector.hex())) + print("Advertising in 10s") + time.sleep(10) + print("shared.objects len: {}".format(len(shared.objects))) + with shared.objects_lock: + shared.objects[obj.vector] = obj + shared.vector_advertise_queue.put(obj.vector) + + +def do_pow_and_publish(obj): + t = threading.Thread(target=_worker, args=(obj, )) + t.start() diff --git a/minode/shared.py b/minode/shared.py index 97c53dc..748bc7d 100644 --- a/minode/shared.py +++ b/minode/shared.py @@ -5,12 +5,13 @@ import queue import threading listening_port = 8444 +listening_host = '' send_outgoing_connections = True listen_for_connections = True data_directory = 'minode_data/' source_directory = os.path.dirname(os.path.realpath(__file__)) trusted_peer = None -# trusted_peer = ('127.0.0.1', 8444) +ip_enabled = True log_level = logging.INFO @@ -22,6 +23,7 @@ nonce = os.urandom(8) user_agent = b'/MiNode:0.2.2/' timeout = 600 header_length = 24 +i2p_dest_obj_type = 0x493250 i2p_enabled = False i2p_sam_host = '127.0.0.1' @@ -41,6 +43,8 @@ address_advertise_queue = queue.Queue() connections = set() connections_lock = threading.Lock() +i2p_dialers = set() + hosts = set() core_nodes = set() @@ -48,6 +52,10 @@ core_nodes = set() node_pool = set() unchecked_node_pool = set() +i2p_core_nodes = set() +i2p_node_pool = set() +i2p_unchecked_node_pool = set() + outgoing_connections = 8 connection_limit = 250 diff --git a/minode/structure.py b/minode/structure.py index 659ad6f..b44bd80 100644 --- a/minode/structure.py +++ b/minode/structure.py @@ -95,12 +95,21 @@ class Object(object): dt = max(self.expires_time - time.time(), 0) h = hashlib.sha512(data).digest() pow_value = int.from_bytes(hashlib.sha512(hashlib.sha512(self.nonce + h).digest()).digest()[:8], 'big') - target = int(2**64/(shared.nonce_trials_per_byte*(length+(dt*length)/(2**16)))) + target = self.pow_target() if target < pow_value: logging.warning('Rejecting object {}, reason: insufficient pow'.format(base64.b16encode(self.vector).decode())) return False return True + def pow_target(self): + data = self.to_bytes()[8:] + length = len(data) + 8 + shared.payload_length_extra_bytes + dt = max(self.expires_time - time.time(), 0) + return int(2 ** 64 / (shared.nonce_trials_per_byte * (length + (dt * length) / (2 ** 16)))) + + def pow_initial_hash(self): + return hashlib.sha512(self.to_bytes()[8:]).digest() + class NetAddrNoPrefix(object): def __init__(self, services, host, port):