MiNode/minode/tests/test_process.py

170 lines
5.3 KiB
Python

"""Blind tests, starting the minode process"""
import unittest
import signal
import socket
import subprocess
import sys
import tempfile
import time
import psutil
try:
socket.socket().bind(('127.0.0.1', 7656))
i2p_port_free = True
except (OSError, socket.error):
i2p_port_free = False
try:
socket.socket().bind(('127.0.0.1', 9050))
tor_port_free = True
except (OSError, socket.error):
tor_port_free = False
class TestProcessProto(unittest.TestCase):
"""Test process attributes, common flow"""
_process_cmd = ['minode']
_connection_limit = 4 if sys.platform.startswith('win') else 10
_listen = False
_listening_port = None
home = None
@classmethod
def setUpClass(cls):
if not cls.home:
cls.home = tempfile.gettempdir()
cmd = cls._process_cmd + [
'--data-dir', cls.home,
'--connection-limit', str(cls._connection_limit)
]
if not cls._listen:
cmd += ['--no-incoming']
elif cls._listening_port:
cmd += ['-p', str(cls._listening_port)]
cls.process = psutil.Popen(cmd, stderr=subprocess.STDOUT) # nosec
@classmethod
def _stop_process(cls, timeout=5):
cls.process.send_signal(signal.SIGTERM)
try:
cls.process.wait(timeout)
except psutil.TimeoutExpired:
return False
return True
@classmethod
def tearDownClass(cls):
"""Ensures that process stopped and removes files"""
try:
if not cls._stop_process(10):
try:
cls.process.kill()
except psutil.NoSuchProcess:
pass
except psutil.NoSuchProcess:
pass
def connections(self):
"""All process' established connections"""
return [
c for c in self.process.connections()
if c.status == 'ESTABLISHED']
class TestProcessShutdown(TestProcessProto):
"""Separate test case for SIGTERM"""
def test_shutdown(self):
"""Send to minode SIGTERM and ensure it stopped"""
# longer wait time because it's not a benchmark
self.assertTrue(
self._stop_process(20),
'%s has not stopped in 20 sec' % ' '.join(self._process_cmd))
class TestProcess(TestProcessProto):
"""The test case for minode process"""
_wait_time = 120
_check_limit = False
def test_connections(self):
"""Check minode process connections"""
_started = time.time()
def continue_check_limit(extra_time):
for _ in range(extra_time * 2):
self.assertLessEqual(
len(self.connections()),
# shared.outgoing_connections, one listening
# TODO: find the cause of one extra
(min(self._connection_limit, 8) if not self._listen
else self._connection_limit) + 1,
'Opened more connections than required'
' by --connection-limit')
time.sleep(1)
for _ in range(self._wait_time * 2):
if len(self.connections()) > self._connection_limit / 2:
_time_to_connect = round(time.time() - _started)
break
time.sleep(0.5)
else:
self.fail(
'Failed establish at least %s connections in %s sec'
% (self._connection_limit / 2, self._wait_time))
if self._check_limit:
continue_check_limit(_time_to_connect)
for c in self.process.connections():
if c.status == 'LISTEN':
if self._listen is False:
self.fail('Listening while started with --no-incoming')
return
self.assertEqual(c.laddr[1], self._listening_port or 8444)
break
else:
if self._listen:
self.fail('No listening connection found')
@unittest.skipIf(i2p_port_free, 'No running i2pd detected')
class TestProcessI2P(TestProcess):
"""Test minode process with --i2p and no IP"""
_process_cmd = ['minode', '--i2p', '--no-ip']
_connection_limit = 4
_wait_time = 120
_listen = True
_listening_port = 8448
def test_connections(self):
"""Ensure all connections are I2P"""
super().test_connections()
for c in self.connections():
self.assertEqual(c.raddr[0], '127.0.0.1')
self.assertEqual(c.raddr[1], 7656)
@unittest.skipUnless(i2p_port_free, 'Detected running i2pd')
class TestProcessNoI2P(TestProcessShutdown):
"""Test minode process shutdown with --i2p and no IP"""
_process_cmd = ['minode', '--i2p', '--no-ip']
@unittest.skipIf(tor_port_free, 'No running tor detected')
class TestProcessTor(TestProcessProto):
"""A test case for minode process running with tor enabled"""
_process_cmd = ['minode', '--tor']
_wait_time = 60
def test_connections(self):
"""Check minode process connections"""
for _ in range(self._wait_time):
time.sleep(0.5)
connections = self.connections()
for c in connections:
self.assertEqual(c.raddr[0], '127.0.0.1')
self.assertEqual(c.raddr[1], 9050)
if len(connections) > self._connection_limit / 2:
break