"""Blind tests, starting the minode process""" import os import signal import socket import subprocess import sys import tempfile import time import unittest import psutil from minode.i2p import util from minode.structure import NetAddrNoPrefix from .common import tor_port_free try: socket.socket().bind(('127.0.0.1', 7656)) i2p_port_free = True except (OSError, socket.error): i2p_port_free = False class TestProcessProto(unittest.TestCase): """Test process attributes, common flow""" _process_cmd = ['minode'] _connection_limit = 4 if sys.platform.startswith('win') else 8 _listen = False _listening_port = None home = None @classmethod def setUpClass(cls): if not cls.home: cls.home = tempfile.gettempdir() cmd = cls._process_cmd + [ '--data-dir', cls.home, '--connection-limit', str(cls._connection_limit) ] if not cls._listen: cmd += ['--no-incoming'] elif cls._listening_port: cmd += ['-p', str(cls._listening_port)] cls.process = psutil.Popen(cmd, stderr=subprocess.STDOUT) # nosec @classmethod def _stop_process(cls, timeout=5): cls.process.send_signal(signal.SIGTERM) try: cls.process.wait(timeout) except psutil.TimeoutExpired: return False return True @classmethod def tearDownClass(cls): """Ensures that process stopped and removes files""" try: if not cls._stop_process(10): try: cls.process.kill() except psutil.NoSuchProcess: pass except psutil.NoSuchProcess: pass def connections(self): """All process' established connections""" return [ c for c in self.process.connections() if c.status == 'ESTABLISHED'] class TestProcessShutdown(TestProcessProto): """Separate test case for SIGTERM""" _wait_time = 30 # longer wait time because it's not a benchmark def test_shutdown(self): """Send to minode SIGTERM and ensure it stopped""" self.assertTrue( self._stop_process(self._wait_time), '%s has not stopped in %i sec' % ( ' '.join(self._process_cmd), self._wait_time)) class TestProcess(TestProcessProto): """The test case for minode process""" _wait_time = 180 _check_limit = False def test_connections(self): """Check minode process connections""" _started = time.time() def continue_check_limit(extra_time): for _ in range(extra_time * 2): self.assertLessEqual( len(self.connections()), # shared.outgoing_connections, one listening # TODO: find the cause of one extra (min(self._connection_limit, 8) if not self._listen else self._connection_limit) + 1, 'Opened more connections than required' ' by --connection-limit') time.sleep(1) for _ in range(self._wait_time * 2): if len(self.connections()) >= self._connection_limit / 2: _time_to_connect = round(time.time() - _started) break if '--i2p' not in self._process_cmd: groups = [] for c in self.connections(): group = NetAddrNoPrefix.network_group(c.raddr[0]) self.assertNotIn(group, groups) groups.append(group) time.sleep(0.5) else: self.fail( 'Failed to establish at least %i connections in %s sec' % (int(self._connection_limit / 2), self._wait_time)) if self._check_limit: continue_check_limit(_time_to_connect) for c in self.process.connections(): if c.status == 'LISTEN': if self._listen is False: self.fail('Listening while started with --no-incoming') return self.assertEqual(c.laddr[1], self._listening_port or 8444) break else: if self._listen: self.fail('No listening connection found') @unittest.skipIf(i2p_port_free, 'No running i2pd detected') class TestProcessI2P(TestProcess): """Test minode process with --i2p and no IP""" _process_cmd = ['minode', '--i2p', '--no-ip'] _listen = True _listening_port = 8448 @classmethod def setUpClass(cls): cls.freezed = False cls.keyfile = os.path.join(cls.home, 'i2p_dest.pub') saved = os.path.isfile(cls.keyfile) super().setUpClass() for _ in range(cls._wait_time): if saved: if cls.process.num_threads() > 3: break elif os.path.isfile(cls.keyfile): break time.sleep(1) else: cls.freezed = True def setUp(self): """Skip any test if I2PController freezed""" if self.freezed: raise unittest.SkipTest( 'I2PController has probably failed to start') def test_saved_keys(self): """Check saved i2p keys""" with open(self.keyfile, 'br') as src: i2p_dest_pub = src.read() with open(os.path.join(self.home, 'i2p_dest_priv.key'), 'br') as src: i2p_dest_priv = src.read() self.assertEqual(util.pub_from_priv(i2p_dest_priv), i2p_dest_pub) def test_connections(self): """Ensure all connections are I2P""" super().test_connections() for c in self.connections(): self.assertEqual(c.raddr[0], '127.0.0.1') self.assertEqual(c.raddr[1], 7656) @unittest.skipUnless(i2p_port_free, 'Detected running i2pd') class TestProcessNoI2P(TestProcessShutdown): """Test minode process shutdown with --i2p and no IP""" _process_cmd = ['minode', '--i2p', '--no-ip'] @unittest.skipIf(tor_port_free, 'No running tor detected') class TestProcessTor(TestProcessProto): """A test case for minode process running with tor enabled""" _process_cmd = ['minode', '--tor'] _wait_time = 60 def test_connections(self): """Check minode process connections""" for _ in range(self._wait_time): time.sleep(0.5) connections = self.connections() for c in connections: self.assertEqual(c.raddr[0], '127.0.0.1') self.assertEqual(c.raddr[1], 9050) if len(connections) > self._connection_limit / 2: break