I2P improvements

- local destination publishing

- destination gathering
This commit is contained in:
TheKysek 2017-07-01 15:05:06 +02:00
parent 75734f94a6
commit 057b25a812
No known key found for this signature in database
GPG Key ID: 50D9AF00D0B1C497
12 changed files with 211 additions and 71 deletions

2
i2p_bridge.sh Normal file
View File

@ -0,0 +1,2 @@
#!/bin/sh
python3 minode/main.py --i2p --no-ip --host 127.0.0.1 "$@"

View File

@ -1,7 +1,7 @@
# -*- coding: utf-8 -*-
import base64
import errno
import logging
import os
import random
import select
import socket
@ -186,11 +186,11 @@ class Connection(threading.Thread):
if self.remote_version.services & 2 and self.network == 'ip': # NODE_SSL
self._do_tls_handshake()
addr = {structure.NetAddr(c.remote_version.services, c.host, c.port) for c in shared.connections.copy() if c.network != 'i2p' and not c.server and c.status == 'fully_established'}
addr = {structure.NetAddr(c.remote_version.services, c.host, c.port) for c in shared.connections if c.network != 'i2p' and c.server is False and c.status == 'fully_established'}
if len(shared.node_pool) > 10:
addr.update({structure.NetAddr(1, a[0], a[1]) for a in random.sample(shared.node_pool, 10) if a[1] != 'i2p'})
addr.update({structure.NetAddr(1, a[0], a[1]) for a in random.sample(shared.node_pool, 10)})
if len(shared.unchecked_node_pool) > 10:
addr.update({structure.NetAddr(1, a[0], a[1]) for a in random.sample(shared.unchecked_node_pool, 10) if a[1] != 'i2p'})
addr.update({structure.NetAddr(1, a[0], a[1]) for a in random.sample(shared.unchecked_node_pool, 10)})
if len(addr) != 0:
self.send_queue.put(message.Addr(addr))
@ -263,8 +263,11 @@ class Connection(threading.Thread):
self.remote_version = version
if not self.server:
self.send_queue.put('fully_established')
if self.network == 'ip':
shared.address_advertise_queue.put(structure.NetAddr(version.services, self.host, self.port))
shared.node_pool.add((self.host, self.port))
elif self.network == 'i2p':
shared.i2p_node_pool.add((self.host, 'i2p'))
shared.address_advertise_queue.put(structure.NetAddr(shared.services, version.host, shared.listening_port))
if self.server:
if self.network == 'ip':
@ -291,6 +294,11 @@ class Connection(threading.Thread):
if obj.is_valid() and obj.vector not in shared.objects:
with shared.objects_lock:
shared.objects[obj.vector] = obj
if obj.object_type == shared.i2p_dest_obj_type:
dest = base64.b64encode(obj.object_payload, altchars=b'-~')
logging.debug('Received I2P destination object, adding to i2p_unchecked_node_pool')
logging.debug(dest)
shared.i2p_unchecked_node_pool.add((dest, 'i2p'))
shared.vector_advertise_queue.put(obj.vector)
elif m.command == b'getdata':
getdata = message.GetData.from_message(m)

View File

@ -36,11 +36,11 @@ class I2PController(threading.Thread):
def _receive_line(self):
line = receive_line(self.s)
logging.debug('I2PController <- ' + str(line))
# logging.debug('I2PController <- ' + str(line))
return line
def _send(self, command):
logging.debug('I2PController -> ' + str(command))
# logging.debug('I2PController -> ' + str(command))
self.s.sendall(command)
def init_connection(self):

View File

@ -1,25 +1,35 @@
# -*- coding: utf-8 -*-
import logging
import socket
import threading
import shared
from connection import Connection
from i2p.util import receive_line
class I2PDialer(object):
def __init__(self, destination, nick, host='127.0.0.1', port=7656):
self.host = host
self.port = port
class I2PDialer(threading.Thread):
def __init__(self, destination, nick, sam_host='127.0.0.1', sam_port=7656):
self.sam_host = sam_host
self.sam_port = sam_port
self.nick = nick
self.destination = destination
self.s = socket.create_connection((self.host, self.port))
super().__init__(name='I2P Dial to {}'.format(self.destination))
self.s = socket.create_connection((self.sam_host, self.sam_port))
self.version_reply = []
self.success = True
def run(self):
logging.debug('Connecting to {}'.format(self.destination))
self._connect()
if not shared.shutting_down and self.success:
c = Connection(self.destination, 'i2p', self.s, 'i2p', False, self.destination)
c.start()
shared.connections.add(c)
def _receive_line(self):
line = receive_line(self.s)
@ -33,11 +43,12 @@ class I2PDialer(object):
def _connect(self):
self._send(b'HELLO VERSION MIN=3.0 MAX=3.3\n')
self.version_reply = self._receive_line().split()
assert b'RESULT=OK' in self.version_reply
if b'RESULT=OK' not in self.version_reply:
logging.warning('Error while connecting to {}'.format(self.destination))
self.success = False
self._send(b'STREAM CONNECT ID=' + self.nick + b' DESTINATION=' + self.destination + b'\n')
reply = self._receive_line().split(b' ')
assert b'RESULT=OK' in reply
def get_connection(self):
return Connection(self.destination, 'i2p', self.s, 'i2p', False, self.destination)
if b'RESULT=OK' not in reply:
logging.warning('Error while connecting to {}'.format(self.destination))
self.success = False

View File

@ -24,11 +24,11 @@ class I2PListener(threading.Thread):
def _receive_line(self):
line = receive_line(self.s)
logging.debug('I2PListener <-' + str(line))
# logging.debug('I2PListener <- ' + str(line))
return line
def _send(self, command):
logging.debug('I2PListener ->' + str(command))
# logging.debug('I2PListener -> ' + str(command))
self.s.sendall(command)
def create_socket(self):

View File

@ -0,0 +1 @@
IPHBFm1bfQ9HrUkq07aomTAGn~W1wChE53xprAqIftsF18cuoUCJbMYhdJl~pljhvAXHKDSePdsSWecg8yP3st0Ib0h429XaOdrxpoFJ6MI1ofkg-KFtnZ6sX~Yp5GD-z-Nqdu6H0YBlf~y18ToOT6vTUvyE5Jsb105LmRMUAP0pDon4-da9r2wD~rxGOuvkrT83CftfxAIIT1z3M6ouAFI3UBq-guEyiZszM-01yQ-IgVBXsvnou8DXrlysYeeaimL6LoLhJgTnXIDfHCfUsHbgYK0JvRdimu-eMs~BRTT7-o4N5RJjVDfsS4CUHa6JwuWYg3JNSfaJoGFlM2xeGjNSJUs5e7PkcXeqCTKZQERbdIJcFz~rGcTfvc-OfXjMf6VfU2XORKcYiA21zkHMOkQvmE1dATP8VpQTKcYYZrQrRAc5Wxn7ayf9Gdwtq0EZXeydZv36RVJ03E4CZUGQMxXOFGUXwLFXQ9QCbsbXSoukd3rAGoPgE~GboO1YJh3hAAAA
1 IPHBFm1bfQ9HrUkq07aomTAGn~W1wChE53xprAqIftsF18cuoUCJbMYhdJl~pljhvAXHKDSePdsSWecg8yP3st0Ib0h429XaOdrxpoFJ6MI1ofkg-KFtnZ6sX~Yp5GD-z-Nqdu6H0YBlf~y18ToOT6vTUvyE5Jsb105LmRMUAP0pDon4-da9r2wD~rxGOuvkrT83CftfxAIIT1z3M6ouAFI3UBq-guEyiZszM-01yQ-IgVBXsvnou8DXrlysYeeaimL6LoLhJgTnXIDfHCfUsHbgYK0JvRdimu-eMs~BRTT7-o4N5RJjVDfsS4CUHa6JwuWYg3JNSfaJoGFlM2xeGjNSJUs5e7PkcXeqCTKZQERbdIJcFz~rGcTfvc-OfXjMf6VfU2XORKcYiA21zkHMOkQvmE1dATP8VpQTKcYYZrQrRAc5Wxn7ayf9Gdwtq0EZXeydZv36RVJ03E4CZUGQMxXOFGUXwLFXQ9QCbsbXSoukd3rAGoPgE~GboO1YJh3hAAAA

View File

@ -1,15 +0,0 @@
import logging
from i2p.controller import I2PController
from i2p.listener import I2PListener
logging.basicConfig(level=logging.DEBUG, format='[%(asctime)s] [%(levelname)s] %(message)s')
i2p_controller = I2PController()
i2p_controller.start()
session_nick = i2p_controller.nick
i2p_listener = I2PListener(session_nick)
i2p_listener.start()

View File

@ -1,7 +1,9 @@
# -*- coding: utf-8 -*-
import argparse
import base64
import csv
import logging
import multiprocessing
import os
import pickle
import signal
@ -23,10 +25,12 @@ def handler(s, f):
def parse_arguments():
parser = argparse.ArgumentParser()
parser.add_argument('-p', '--port', help='Port to listen on', type=int)
parser.add_argument('--host', help='Listening host')
parser.add_argument('--debug', help='Enable debug logging', action='store_true')
parser.add_argument('--data-dir', help='Path to data directory')
parser.add_argument('--no-incoming', help='Do not listen for incoming connections', action='store_true')
parser.add_argument('--no-outgoing', help='Do not send outgoing connections', action='store_true')
parser.add_argument('--no-ip', help='Do not use IP network', action='store_true')
parser.add_argument('--trusted-peer', help='Specify a trusted peer we should connect to')
parser.add_argument('--connection-limit', help='Maximum number of connections', type=int)
parser.add_argument('--i2p', help='Enable I2P support (uses SAMv3)', action='store_true')
@ -37,6 +41,8 @@ def parse_arguments():
args = parser.parse_args()
if args.port:
shared.listening_port = args.port
if args.host:
shared.listening_host = args.host
if args.debug:
shared.log_level = logging.DEBUG
if args.data_dir:
@ -48,6 +54,8 @@ def parse_arguments():
shared.listen_for_connections = False
if args.no_outgoing:
shared.send_outgoing_connections = False
if args.no_ip:
shared.ip_enabled = False
if args.trusted_peer:
if len(args.trusted_peer) > 50:
# I2P
@ -105,12 +113,24 @@ def main():
logging.warning('Error while loading nodes from disk.')
logging.warning(e)
try:
with open(shared.data_directory + 'i2p_nodes.pickle', mode='br') as file:
shared.i2p_node_pool = pickle.load(file)
except Exception as e:
logging.warning('Error while loading nodes from disk.')
logging.warning(e)
with open(os.path.join(shared.source_directory, 'core_nodes.csv'), mode='r', newline='') as f:
reader = csv.reader(f)
shared.core_nodes = {tuple(row) for row in reader}
shared.node_pool.update(shared.core_nodes)
if not shared.trusted_peer:
with open(os.path.join(shared.source_directory, 'i2p_core_nodes.csv'), mode='r', newline='') as f:
reader = csv.reader(f)
shared.i2p_core_nodes = {(row[0].encode(), 'i2p') for row in reader}
shared.i2p_node_pool.update(shared.i2p_core_nodes)
if shared.ip_enabled and not shared.trusted_peer:
try:
for item in socket.getaddrinfo('bootstrap8080.bitmessage.org', 80):
shared.unchecked_node_pool.add((item[4][0], 8080))
@ -122,15 +142,12 @@ def main():
logging.error('Error during DNS bootstrap')
logging.error(e)
manager = Manager()
manager.clean_objects()
manager.clean_connections()
manager.start()
advertiser = Advertiser()
advertiser.start()
if shared.i2p_enabled:
# Grab I2P destinations from old object file
for obj in shared.objects.values():
if obj.object_type == shared.i2p_dest_obj_type:
shared.i2p_unchecked_node_pool.add((base64.b64encode(obj.object_payload, altchars=b'-~'), 'i2p'))
dest_priv = b''
try:
@ -171,20 +188,28 @@ def main():
logging.warning('Error while saving I2P destination public key.')
logging.warning(e)
manager = Manager()
manager.clean_objects()
manager.clean_connections()
manager.start()
advertiser = Advertiser()
advertiser.start()
listener_ipv4 = None
listener_ipv6 = None
if shared.listen_for_connections:
if socket.has_ipv6:
try:
listener_ipv6 = Listener('', shared.listening_port, family=socket.AF_INET6)
listener_ipv6 = Listener(shared.listening_host, shared.listening_port, family=socket.AF_INET6)
listener_ipv6.start()
except Exception as e:
logging.warning('Error while starting IPv6 listener on port {}'.format(shared.listening_port))
logging.warning(e)
try:
listener_ipv4 = Listener('', shared.listening_port)
listener_ipv4 = Listener(shared.listening_host, shared.listening_port)
listener_ipv4.start()
except Exception as e:
if listener_ipv6:
@ -196,4 +221,5 @@ def main():
logging.error(e)
if __name__ == '__main__':
multiprocessing.set_start_method('spawn')
main()

View File

@ -9,7 +9,9 @@ import time
from connection import Connection
from i2p.dialer import I2PDialer
import pow
import shared
import structure
class Manager(threading.Thread):
@ -20,11 +22,15 @@ class Manager(threading.Thread):
self.last_cleaned_connections = time.time()
self.last_pickled_objects = time.time()
self.last_pickled_nodes = time.time()
self.last_published_i2p_destination = time.time() - 15 * 60
def run(self):
while True:
time.sleep(0.8)
now = time.time()
if shared.shutting_down:
logging.debug('Shutting down Manager')
break
if now - self.last_cleaned_objects > 90:
self.clean_objects()
self.last_cleaned_objects = now
@ -37,10 +43,9 @@ class Manager(threading.Thread):
if now - self.last_pickled_nodes > 60:
self.pickle_nodes()
self.last_pickled_nodes = now
if shared.shutting_down:
logging.debug('Shutting down Manager')
break
if now - self.last_published_i2p_destination > 3600:
self.publish_i2p_destination()
self.last_published_i2p_destination = now
@staticmethod
def clean_objects():
@ -62,10 +67,19 @@ class Manager(threading.Thread):
hosts.add(c.host)
if not c.server:
outgoing_connections += 1
for d in shared.i2p_dialers.copy():
hosts.add(d.destination)
if not d.is_alive():
shared.i2p_dialers.remove(d)
to_connect = set()
if shared.trusted_peer:
to_connect.add(shared.trusted_peer)
if outgoing_connections < shared.outgoing_connections and shared.send_outgoing_connections and not shared.trusted_peer:
if shared.ip_enabled:
if len(shared.unchecked_node_pool) > 16:
to_connect.update(random.sample(shared.unchecked_node_pool, 16))
else:
@ -75,16 +89,31 @@ class Manager(threading.Thread):
to_connect.update(random.sample(shared.node_pool, 8))
else:
to_connect.update(shared.node_pool)
if shared.i2p_enabled:
if len(shared.i2p_unchecked_node_pool) > 16:
to_connect.update(random.sample(shared.i2p_unchecked_node_pool, 16))
else:
to_connect.update(shared.i2p_unchecked_node_pool)
shared.i2p_unchecked_node_pool.difference_update(to_connect)
if len(shared.i2p_node_pool) > 8:
to_connect.update(random.sample(shared.i2p_node_pool, 8))
else:
to_connect.update(shared.i2p_node_pool)
for addr in to_connect:
if addr[0] in hosts:
continue
if addr[1] == 'i2p' and shared.i2p_enabled:
if shared.i2p_session_nick:
try:
c = I2PDialer(addr[0], shared.i2p_session_nick, shared.i2p_sam_host, shared.i2p_sam_port).get_connection()
c.start()
except:
pass
d = I2PDialer(addr[0], shared.i2p_session_nick, shared.i2p_sam_host, shared.i2p_sam_port)
d.start()
hosts.add(d.destination)
shared.i2p_dialers.add(d)
except Exception as e:
logging.warning('Exception while trying to establish an I2P connection')
logging.warning(e)
else:
logging.debug('We were going to connect to an I2P peer but our tunnels are not ready')
continue
@ -113,10 +142,25 @@ class Manager(threading.Thread):
shared.node_pool = set(random.sample(shared.node_pool, 10000))
if len(shared.unchecked_node_pool) > 1000:
shared.unchecked_node_pool = set(random.sample(shared.unchecked_node_pool, 1000))
if len(shared.i2p_node_pool) > 1000:
shared.i2p_node_pool = set(random.sample(shared.i2p_node_pool, 1000))
if len(shared.i2p_unchecked_node_pool) > 100:
shared.i2p_unchecked_node_pool = set(random.sample(shared.i2p_unchecked_node_pool, 100))
try:
with open(shared.data_directory + 'nodes.pickle', mode='bw') as file:
pickle.dump(shared.node_pool, file, protocol=3)
with open(shared.data_directory + 'i2p_nodes.pickle', mode='bw') as file:
pickle.dump(shared.i2p_node_pool, file, protocol=3)
logging.debug('Saved nodes')
except Exception as e:
logging.warning('Error while saving nodes')
logging.warning(e)
@staticmethod
def publish_i2p_destination():
if shared.i2p_session_nick:
logging.info('Publishing our I2P destination')
dest_pub_raw = base64.b64decode(shared.i2p_dest_pub, altchars=b'-~')
obj = structure.Object(b'\x00' * 8, int(time.time() + 2 * 3600), 0x493250, 1, 1, dest_pub_raw)
pow.do_pow_and_publish(obj)

46
minode/pow.py Normal file
View File

@ -0,0 +1,46 @@
import hashlib
import multiprocessing
import shared
import struct
import threading
import time
import structure
def _pow_worker(target, initial_hash, q):
nonce = 0
print("target: {}, initial_hash: {}".format(target, initial_hash.hex()))
trial_value = target + 1
while trial_value > target:
nonce += 1
trial_value = struct.unpack('>Q', hashlib.sha512(hashlib.sha512(struct.pack('>Q', nonce) + initial_hash).digest()).digest()[:8])[0]
q.put(struct.pack('>Q', nonce))
def _worker(obj):
q = multiprocessing.Queue()
p = multiprocessing.Process(target=_pow_worker, args=(obj.pow_target(), obj.pow_initial_hash(), q))
print("Starting POW process")
t = time.time()
p.start()
nonce = q.get()
p.join()
print("Finished doing POW, nonce: {}, time: {}s".format(nonce, time.time() - t))
obj = structure.Object(nonce, obj.expires_time, obj.object_type, obj.version, obj.stream_number, obj.object_payload)
print("Object vector is {}".format(obj.vector.hex()))
print("Advertising in 10s")
time.sleep(10)
print("shared.objects len: {}".format(len(shared.objects)))
with shared.objects_lock:
shared.objects[obj.vector] = obj
shared.vector_advertise_queue.put(obj.vector)
def do_pow_and_publish(obj):
t = threading.Thread(target=_worker, args=(obj, ))
t.start()

View File

@ -5,12 +5,13 @@ import queue
import threading
listening_port = 8444
listening_host = ''
send_outgoing_connections = True
listen_for_connections = True
data_directory = 'minode_data/'
source_directory = os.path.dirname(os.path.realpath(__file__))
trusted_peer = None
# trusted_peer = ('127.0.0.1', 8444)
ip_enabled = True
log_level = logging.INFO
@ -22,6 +23,7 @@ nonce = os.urandom(8)
user_agent = b'/MiNode:0.2.2/'
timeout = 600
header_length = 24
i2p_dest_obj_type = 0x493250
i2p_enabled = False
i2p_sam_host = '127.0.0.1'
@ -41,6 +43,8 @@ address_advertise_queue = queue.Queue()
connections = set()
connections_lock = threading.Lock()
i2p_dialers = set()
hosts = set()
core_nodes = set()
@ -48,6 +52,10 @@ core_nodes = set()
node_pool = set()
unchecked_node_pool = set()
i2p_core_nodes = set()
i2p_node_pool = set()
i2p_unchecked_node_pool = set()
outgoing_connections = 8
connection_limit = 250

View File

@ -95,12 +95,21 @@ class Object(object):
dt = max(self.expires_time - time.time(), 0)
h = hashlib.sha512(data).digest()
pow_value = int.from_bytes(hashlib.sha512(hashlib.sha512(self.nonce + h).digest()).digest()[:8], 'big')
target = int(2**64/(shared.nonce_trials_per_byte*(length+(dt*length)/(2**16))))
target = self.pow_target()
if target < pow_value:
logging.warning('Rejecting object {}, reason: insufficient pow'.format(base64.b16encode(self.vector).decode()))
return False
return True
def pow_target(self):
data = self.to_bytes()[8:]
length = len(data) + 8 + shared.payload_length_extra_bytes
dt = max(self.expires_time - time.time(), 0)
return int(2 ** 64 / (shared.nonce_trials_per_byte * (length + (dt * length) / (2 ** 16))))
def pow_initial_hash(self):
return hashlib.sha512(self.to_bytes()[8:]).digest()
class NetAddrNoPrefix(object):
def __init__(self, services, host, port):