From 2085f0128185ee73f7b87d5ead48c6d90b8c0bc1 Mon Sep 17 00:00:00 2001 From: TheKysek Date: Thu, 20 Jul 2017 18:22:59 +0200 Subject: [PATCH] Code cleanup --- minode/connection.py | 7 ++ minode/main.py | 191 ++++++++++++++++++++++++------------------- minode/manager.py | 7 +- 3 files changed, 116 insertions(+), 89 deletions(-) diff --git a/minode/connection.py b/minode/connection.py index a5e8860..0536298 100644 --- a/minode/connection.py +++ b/minode/connection.py @@ -283,6 +283,7 @@ class Connection(threading.Thread): self.send_queue.put(message.Version(self.host, self.port)) else: self.send_queue.put(message.Version('127.0.0.1', 7656)) + elif m.command == b'verack': self.verack_received = True logging.debug('{}:{} -> {}'.format(self.host_print, self.port, 'verack')) @@ -297,6 +298,7 @@ class Connection(threading.Thread): self.vectors_to_get.update(to_get) # Do not send objects they already have. self.vectors_to_send.difference_update(inv.vectors) + elif m.command == b'object': obj = structure.Object.from_message(m) logging.debug('{}:{} -> {}'.format(self.host_print, self.port, obj)) @@ -311,20 +313,25 @@ class Connection(threading.Thread): logging.debug(dest) shared.i2p_unchecked_node_pool.add((dest, 'i2p')) shared.vector_advertise_queue.put(obj.vector) + elif m.command == b'getdata': getdata = message.GetData.from_message(m) logging.debug('{}:{} -> {}'.format(self.host_print, self.port, getdata)) self.vectors_to_send.update(getdata.vectors) + elif m.command == b'addr': addr = message.Addr.from_message(m) logging.debug('{}:{} -> {}'.format(self.host_print, self.port, addr)) for a in addr.addresses: shared.unchecked_node_pool.add((a.host, a.port)) + elif m.command == b'ping': logging.debug('{}:{} -> ping'.format(self.host_print, self.port)) self.send_queue.put(message.Message(b'pong', b'')) + elif m.command == b'error': logging.error('{}:{} -> error: {}'.format(self.host_print, self.port, m.payload)) + else: logging.debug('{}:{} -> {}'.format(self.host_print, self.port, m)) diff --git a/minode/main.py b/minode/main.py index e0f5e75..48e3d91 100644 --- a/minode/main.py +++ b/minode/main.py @@ -87,21 +87,7 @@ def parse_arguments(): shared.i2p_transient = True -def main(): - signal.signal(signal.SIGINT, handler) - signal.signal(signal.SIGTERM, handler) - - parse_arguments() - - logging.basicConfig(level=shared.log_level, format='[%(asctime)s] [%(levelname)s] %(message)s') - logging.info('Starting MiNode') - logging.info('Data directory: {}'.format(shared.data_directory)) - if not os.path.exists(shared.data_directory): - try: - os.makedirs(shared.data_directory) - except Exception as e: - logging.warning('Error while creating data directory in: {}'.format(shared.data_directory)) - logging.warning(e) +def load_data(): try: with open(shared.data_directory + 'objects.pickle', mode='br') as file: shared.objects = pickle.load(file) @@ -133,97 +119,130 @@ def main(): shared.i2p_core_nodes = {(row[0].encode(), 'i2p') for row in reader} shared.i2p_node_pool.update(shared.i2p_core_nodes) - if shared.ip_enabled and not shared.trusted_peer: + +def bootstrap_from_dns(): + try: + for item in socket.getaddrinfo('bootstrap8080.bitmessage.org', 80): + shared.unchecked_node_pool.add((item[4][0], 8080)) + logging.debug('Adding ' + item[4][0] + ' to unchecked_node_pool based on DNS bootstrap method') + for item in socket.getaddrinfo('bootstrap8444.bitmessage.org', 80): + shared.unchecked_node_pool.add((item[4][0], 8444)) + logging.debug('Adding ' + item[4][0] + ' to unchecked_node_pool based on DNS bootstrap method') + except Exception as e: + logging.error('Error during DNS bootstrap') + logging.error(e) + + +def start_ip_listener(): + listener_ipv4 = None + listener_ipv6 = None + + if socket.has_ipv6: try: - for item in socket.getaddrinfo('bootstrap8080.bitmessage.org', 80): - shared.unchecked_node_pool.add((item[4][0], 8080)) - logging.debug('Adding ' + item[4][0] + ' to unchecked_node_pool based on DNS bootstrap method') - for item in socket.getaddrinfo('bootstrap8444.bitmessage.org', 80): - shared.unchecked_node_pool.add((item[4][0], 8444)) - logging.debug('Adding ' + item[4][0] + ' to unchecked_node_pool based on DNS bootstrap method') + listener_ipv6 = Listener(shared.listening_host, shared.listening_port, family=socket.AF_INET6) + listener_ipv6.start() except Exception as e: - logging.error('Error during DNS bootstrap') + logging.warning('Error while starting IPv6 listener on port {}'.format(shared.listening_port)) + logging.warning(e) + + try: + listener_ipv4 = Listener(shared.listening_host, shared.listening_port) + listener_ipv4.start() + except Exception as e: + if listener_ipv6: + logging.warning('Error while starting IPv4 listener on port {}. '.format(shared.listening_port) + + 'However the IPv6 one seems to be working and will probably accept IPv4 connections.') + else: + logging.error('Error while starting IPv4 listener on port {}. '.format(shared.listening_port) + + 'You will not receive incoming connections. Please check your port configuration') logging.error(e) - if shared.i2p_enabled: - # Grab I2P destinations from old object file - for obj in shared.objects.values(): - if obj.object_type == shared.i2p_dest_obj_type: - shared.i2p_unchecked_node_pool.add((base64.b64encode(obj.object_payload, altchars=b'-~'), 'i2p')) - dest_priv = b'' +def start_i2p_listener(): + # Grab I2P destinations from old object file + for obj in shared.objects.values(): + if obj.object_type == shared.i2p_dest_obj_type: + shared.i2p_unchecked_node_pool.add((base64.b64encode(obj.object_payload, altchars=b'-~'), 'i2p')) - if not shared.i2p_transient: - try: - with open(shared.data_directory + 'i2p_dest_priv.key', mode='br') as file: - dest_priv = file.read() - logging.debug('Loaded I2P destination private key.') - except Exception as e: - logging.warning('Error while loading I2P destination private key.') - logging.warning(e) - - logging.info('Starting I2P Controller and creating tunnels. This may take a while.') - i2p_controller = i2p.controller.I2PController(shared.i2p_sam_host, shared.i2p_sam_port, dest_priv) - i2p_controller.start() - - shared.i2p_dest_pub = i2p_controller.dest_pub - shared.i2p_session_nick = i2p_controller.nick - - logging.info('Local I2P destination: {}'.format(shared.i2p_dest_pub.decode())) - logging.info('I2P session nick: {}'.format(shared.i2p_session_nick.decode())) - - logging.info('Starting I2P Listener') - i2p_listener = i2p.listener.I2PListener(i2p_controller.nick) - i2p_listener.start() - - if not shared.i2p_transient: - try: - with open(shared.data_directory + 'i2p_dest_priv.key', mode='bw') as file: - file.write(i2p_controller.dest_priv) - logging.debug('Saved I2P destination private key.') - except Exception as e: - logging.warning('Error while saving I2P destination private key.') - logging.warning(e) + dest_priv = b'' + if not shared.i2p_transient: try: - with open(shared.data_directory + 'i2p_dest.pub', mode='bw') as file: - file.write(shared.i2p_dest_pub) - logging.debug('Saved I2P destination public key.') + with open(shared.data_directory + 'i2p_dest_priv.key', mode='br') as file: + dest_priv = file.read() + logging.debug('Loaded I2P destination private key.') except Exception as e: - logging.warning('Error while saving I2P destination public key.') + logging.warning('Error while loading I2P destination private key.') logging.warning(e) + logging.info('Starting I2P Controller and creating tunnels. This may take a while.') + i2p_controller = i2p.controller.I2PController(shared.i2p_sam_host, shared.i2p_sam_port, dest_priv) + i2p_controller.start() + + shared.i2p_dest_pub = i2p_controller.dest_pub + shared.i2p_session_nick = i2p_controller.nick + + logging.info('Local I2P destination: {}'.format(shared.i2p_dest_pub.decode())) + logging.info('I2P session nick: {}'.format(shared.i2p_session_nick.decode())) + + logging.info('Starting I2P Listener') + i2p_listener = i2p.listener.I2PListener(i2p_controller.nick) + i2p_listener.start() + + if not shared.i2p_transient: + try: + with open(shared.data_directory + 'i2p_dest_priv.key', mode='bw') as file: + file.write(i2p_controller.dest_priv) + logging.debug('Saved I2P destination private key.') + except Exception as e: + logging.warning('Error while saving I2P destination private key.') + logging.warning(e) + + try: + with open(shared.data_directory + 'i2p_dest.pub', mode='bw') as file: + file.write(shared.i2p_dest_pub) + logging.debug('Saved I2P destination public key.') + except Exception as e: + logging.warning('Error while saving I2P destination public key.') + logging.warning(e) + + +def main(): + signal.signal(signal.SIGINT, handler) + signal.signal(signal.SIGTERM, handler) + + parse_arguments() + + logging.basicConfig(level=shared.log_level, format='[%(asctime)s] [%(levelname)s] %(message)s') + logging.info('Starting MiNode') + + logging.info('Data directory: {}'.format(shared.data_directory)) + if not os.path.exists(shared.data_directory): + try: + os.makedirs(shared.data_directory) + except Exception as e: + logging.warning('Error while creating data directory in: {}'.format(shared.data_directory)) + logging.warning(e) + + load_data() + + if shared.ip_enabled and not shared.trusted_peer: + bootstrap_from_dns() + + if shared.i2p_enabled: + # We are starting it before cleaning expired objects so we can collect I2P destination objects + start_i2p_listener() + manager = Manager() manager.clean_objects() - manager.clean_connections() manager.start() advertiser = Advertiser() advertiser.start() - listener_ipv4 = None - listener_ipv6 = None - if shared.listen_for_connections: - if socket.has_ipv6: - try: - listener_ipv6 = Listener(shared.listening_host, shared.listening_port, family=socket.AF_INET6) - listener_ipv6.start() - except Exception as e: - logging.warning('Error while starting IPv6 listener on port {}'.format(shared.listening_port)) - logging.warning(e) + start_ip_listener() - try: - listener_ipv4 = Listener(shared.listening_host, shared.listening_port) - listener_ipv4.start() - except Exception as e: - if listener_ipv6: - logging.warning('Error while starting IPv4 listener on port {}. '.format(shared.listening_port) + - 'However the IPv6 one seems to be working and will probably accept IPv4 connections.') - else: - logging.error('Error while starting IPv4 listener on port {}. '.format(shared.listening_port) + - 'You will not receive incoming connections. Please check your port configuration') - logging.error(e) if __name__ == '__main__': multiprocessing.set_start_method('spawn') diff --git a/minode/manager.py b/minode/manager.py index ae257c6..04a36e8 100644 --- a/minode/manager.py +++ b/minode/manager.py @@ -22,7 +22,7 @@ class Manager(threading.Thread): self.last_cleaned_connections = time.time() self.last_pickled_objects = time.time() self.last_pickled_nodes = time.time() - self.last_published_i2p_destination = time.time() - 50 * 60 # Publish destination 10 minutes after start + self.last_published_i2p_destination = time.time() - 50 * 60 + random.uniform(-1, 1) * 300 # Publish destination 5-15 minutes after start def run(self): while True: @@ -35,7 +35,7 @@ class Manager(threading.Thread): self.clean_objects() self.last_cleaned_objects = now if now - self.last_cleaned_connections > 2: - self.clean_connections() + self.manage_connections() self.last_cleaned_connections = now if now - self.last_pickled_objects > 100: self.pickle_objects() @@ -56,7 +56,7 @@ class Manager(threading.Thread): logging.debug('Deleted expired object: {}'.format(base64.b16encode(vector).decode())) @staticmethod - def clean_connections(): + def manage_connections(): hosts = set() outgoing_connections = 0 for c in shared.connections.copy(): @@ -147,6 +147,7 @@ class Manager(threading.Thread): shared.i2p_node_pool = set(random.sample(shared.i2p_node_pool, 1000)) if len(shared.i2p_unchecked_node_pool) > 100: shared.i2p_unchecked_node_pool = set(random.sample(shared.i2p_unchecked_node_pool, 100)) + try: with open(shared.data_directory + 'nodes.pickle', mode='bw') as file: pickle.dump(shared.node_pool, file, protocol=3)