MiNode/minode/tests/test_process.py

211 lines
6.7 KiB
Python

"""Blind tests, starting the minode process"""
import os
import signal
import socket
import subprocess
import sys
import tempfile
import time
import unittest
import psutil
from minode.i2p import util
from minode.structure import NetAddrNoPrefix
try:
socket.socket().bind(('127.0.0.1', 7656))
i2p_port_free = True
except (OSError, socket.error):
i2p_port_free = False
try:
socket.socket().bind(('127.0.0.1', 9050))
tor_port_free = True
except (OSError, socket.error):
tor_port_free = False
class TestProcessProto(unittest.TestCase):
"""Test process attributes, common flow"""
_process_cmd = ['minode']
_connection_limit = 4 if sys.platform.startswith('win') else 8
_listen = False
_listening_port = None
home = None
@classmethod
def setUpClass(cls):
if not cls.home:
cls.home = tempfile.gettempdir()
cmd = cls._process_cmd + [
'--data-dir', cls.home,
'--connection-limit', str(cls._connection_limit)
]
if not cls._listen:
cmd += ['--no-incoming']
elif cls._listening_port:
cmd += ['-p', str(cls._listening_port)]
cls.process = psutil.Popen(cmd, stderr=subprocess.STDOUT) # nosec
@classmethod
def _stop_process(cls, timeout=5):
cls.process.send_signal(signal.SIGTERM)
try:
cls.process.wait(timeout)
except psutil.TimeoutExpired:
return False
return True
@classmethod
def tearDownClass(cls):
"""Ensures that process stopped and removes files"""
try:
if not cls._stop_process(10):
try:
cls.process.kill()
except psutil.NoSuchProcess:
pass
except psutil.NoSuchProcess:
pass
def connections(self):
"""All process' established connections"""
return [
c for c in self.process.connections()
if c.status == 'ESTABLISHED']
class TestProcessShutdown(TestProcessProto):
"""Separate test case for SIGTERM"""
_wait_time = 30
# longer wait time because it's not a benchmark
def test_shutdown(self):
"""Send to minode SIGTERM and ensure it stopped"""
self.assertTrue(
self._stop_process(self._wait_time),
'%s has not stopped in %i sec' % (
' '.join(self._process_cmd), self._wait_time))
class TestProcess(TestProcessProto):
"""The test case for minode process"""
_wait_time = 180
_check_limit = False
def test_connections(self):
"""Check minode process connections"""
_started = time.time()
def continue_check_limit(extra_time):
for _ in range(extra_time * 2):
self.assertLessEqual(
len(self.connections()),
# shared.outgoing_connections, one listening
# TODO: find the cause of one extra
(min(self._connection_limit, 8) if not self._listen
else self._connection_limit) + 1,
'Opened more connections than required'
' by --connection-limit')
time.sleep(1)
for _ in range(self._wait_time * 2):
if len(self.connections()) >= self._connection_limit / 2:
_time_to_connect = round(time.time() - _started)
break
if '--i2p' not in self._process_cmd:
groups = []
for c in self.connections():
group = NetAddrNoPrefix.network_group(c.raddr[0])
self.assertNotIn(group, groups)
groups.append(group)
time.sleep(0.5)
else:
self.fail(
'Failed to establish at least %i connections in %s sec'
% (int(self._connection_limit / 2), self._wait_time))
if self._check_limit:
continue_check_limit(_time_to_connect)
for c in self.process.connections():
if c.status == 'LISTEN':
if self._listen is False:
self.fail('Listening while started with --no-incoming')
return
self.assertEqual(c.laddr[1], self._listening_port or 8444)
break
else:
if self._listen:
self.fail('No listening connection found')
@unittest.skipIf(i2p_port_free, 'No running i2pd detected')
class TestProcessI2P(TestProcess):
"""Test minode process with --i2p and no IP"""
_process_cmd = ['minode', '--i2p', '--no-ip']
_listen = True
_listening_port = 8448
@classmethod
def setUpClass(cls):
cls.freezed = False
cls.keyfile = os.path.join(cls.home, 'i2p_dest.pub')
saved = os.path.isfile(cls.keyfile)
super().setUpClass()
for _ in range(cls._wait_time):
if saved:
if cls.process.num_threads() > 3:
break
elif os.path.isfile(cls.keyfile):
break
time.sleep(1)
else:
cls.freezed = True
def setUp(self):
"""Skip any test if I2PController freezed"""
if self.freezed:
raise unittest.SkipTest(
'I2PController has probably failed to start')
def test_saved_keys(self):
"""Check saved i2p keys"""
with open(self.keyfile, 'br') as src:
i2p_dest_pub = src.read()
with open(os.path.join(self.home, 'i2p_dest_priv.key'), 'br') as src:
i2p_dest_priv = src.read()
self.assertEqual(util.pub_from_priv(i2p_dest_priv), i2p_dest_pub)
def test_connections(self):
"""Ensure all connections are I2P"""
super().test_connections()
for c in self.connections():
self.assertEqual(c.raddr[0], '127.0.0.1')
self.assertEqual(c.raddr[1], 7656)
@unittest.skipUnless(i2p_port_free, 'Detected running i2pd')
class TestProcessNoI2P(TestProcessShutdown):
"""Test minode process shutdown with --i2p and no IP"""
_process_cmd = ['minode', '--i2p', '--no-ip']
@unittest.skipIf(tor_port_free, 'No running tor detected')
class TestProcessTor(TestProcessProto):
"""A test case for minode process running with tor enabled"""
_process_cmd = ['minode', '--tor']
_wait_time = 60
def test_connections(self):
"""Check minode process connections"""
for _ in range(self._wait_time):
time.sleep(0.5)
connections = self.connections()
for c in connections:
self.assertEqual(c.raddr[0], '127.0.0.1')
self.assertEqual(c.raddr[1], 9050)
if len(connections) > self._connection_limit / 2:
break