import unittest import os import shutil import signal import subprocess import sys import tempfile import threading import time import psutil from minode import shared from minode.main import main as app class TestProcessProto(unittest.TestCase): """Test process attributes, common flow""" _process_cmd = ['minode'] _connection_limit = 4 if sys.platform.startswith('win') else 10 _listen = False _listening_port = None home = None @classmethod def _build_app_args(cls, extra=None): if not cls.home: cls.home = tempfile.gettempdir() args = cls._process_cmd + [ '--data-dir', cls.home, '--connection-limit', str(cls._connection_limit) ] if not cls._listen: args += ['--no-incoming'] elif cls._listening_port: args += ['-p', str(cls._listening_port)] if extra: args += extra print('ARGS: %r' % args) return args @classmethod def _stop_process(cls, timeout=5): cls.process.send_signal(signal.SIGTERM) try: cls.process.wait(timeout) except psutil.TimeoutExpired: return False return True @classmethod def setUpClass(cls): cmd = cls._build_app_args() cls.process = psutil.Popen(cmd, stderr=subprocess.STDOUT) # nosec @classmethod def tearDownClass(cls): """Ensures the process is stopped and removes files""" try: if not cls._stop_process(10): try: cls.process.kill() except psutil.NoSuchProcess: pass except psutil.NoSuchProcess: pass class TestProcessShutdown(TestProcessProto): """Separate test case for SIGTERM""" def test_shutdown(self): """Send to minode SIGTERM and ensure it stopped""" # longer wait time because it's not a benchmark self.assertTrue( self._stop_process(20), '%s has not stopped in 20 sec' % ' '.join(self._process_cmd)) class TestProcess(TestProcessProto): """The test case for minode process""" _wait_time = 120 _check_limit = False def test_connections(self): """Check minode process connections""" _started = time.time() def connections(): return [ c for c in self.process.connections() if c.status == 'ESTABLISHED'] def continue_check_limit(extra_time): for t in range(extra_time * 2): self.assertLessEqual( len(connections()), # shared.outgoing_connections, one listening # TODO: find the cause of one extra (min(self._connection_limit, 8) if not self._listen else self._connection_limit) + 1, 'Opened more connections than required' ' by --connection-limit') time.sleep(1) for t in range(self._wait_time * 2): if len(connections()) > self._connection_limit / 2: _time_to_connect = round(time.time() - _started) break time.sleep(0.5) else: self.fail( 'Failed establish at least %s connections in %s sec' % (self._connection_limit / 2, self._wait_time)) if self._check_limit: continue_check_limit(_time_to_connect) for c in self.process.connections(): if c.status == 'LISTEN': if self._listen is False: return self.fail( 'Listening while started with --no-incoming') self.assertEqual(c.laddr[1], self._listening_port or 8444) break else: if self._listen: self.fail('No listening connection found') class TestConnectivity(TestProcessProto): """Check connectivity between instances""" _process_cmd = [ 'minode', '--trusted-peer', '127.0.0.1:8445'] # _listen = True @classmethod def setUpClass(cls): super(TestConnectivity, cls).setUpClass() cls._listen = True cls._listening_port = 8445 cls.home = os.path.join(cls.home, 'client') os.mkdir(cls.home) # sys.argv = cls._build_app_args(['--trusted-peer', '127.0.0.1:8444']) cls._process_cmd = ['minode', '--no-outgoing'] sys.argv = cls._build_app_args() cls.app = threading.Thread(name="minode", target=app, daemon=True) cls.app.start() @classmethod def tearDownClass(cls): super(TestConnectivity, cls).tearDownClass() shared.shutting_down = True shutil.rmtree(cls.home) def test_connections(self): """Check the connection with trusted peer""" time.sleep(5) for t in range(10): time.sleep(1) connection_count = len([ c for c in shared.connections.copy() if c.status == 'fully_established']) if connection_count != 1: self.fail("Unexpected connection count: %i" % connection_count)