MiNode/minode/tests/test_process.py

115 lines
3.6 KiB
Python

import unittest
import signal
import subprocess
import sys
import tempfile
import time
import psutil
class TestProcessProto(unittest.TestCase):
"""Test process attributes, common flow"""
_process_cmd = ['minode']
_connection_limit = 4 if sys.platform.startswith('win') else 10
_listen = False
_listening_port = None
home = None
@classmethod
def setUpClass(cls):
if not cls.home:
cls.home = tempfile.gettempdir()
cmd = cls._process_cmd + [
'--data-dir', cls.home,
'--connection-limit', str(cls._connection_limit)
]
if not cls._listen:
cmd += ['--no-incoming']
elif cls._listening_port:
cmd += ['-p', str(cls._listening_port)]
cls.process = psutil.Popen(cmd, stderr=subprocess.STDOUT) # nosec
@classmethod
def _stop_process(cls, timeout=5):
cls.process.send_signal(signal.SIGTERM)
try:
cls.process.wait(timeout)
except psutil.TimeoutExpired:
return False
return True
@classmethod
def tearDownClass(cls):
"""Ensures that pybitmessage stopped and removes files"""
try:
if not cls._stop_process(10):
try:
cls.process.kill()
except psutil.NoSuchProcess:
pass
except psutil.NoSuchProcess:
pass
class TestProcessShutdown(TestProcessProto):
"""Separate test case for SIGTERM"""
def test_shutdown(self):
"""Send to minode SIGTERM and ensure it stopped"""
# longer wait time because it's not a benchmark
self.assertTrue(
self._stop_process(20),
'%s has not stopped in 20 sec' % ' '.join(self._process_cmd))
class TestProcess(TestProcessProto):
"""The test case for minode process"""
_wait_time = 120
_check_limit = False
def test_connections(self):
"""Check minode process connections"""
_started = time.time()
def connections():
return [
c for c in self.process.connections()
if c.status == 'ESTABLISHED']
def continue_check_limit(extra_time):
for t in range(extra_time * 2):
self.assertLessEqual(
len(connections()),
# shared.outgoing_connections, one listening
# TODO: find the cause of one extra
(min(self._connection_limit, 8) if not self._listen
else self._connection_limit) + 1,
'Opened more connections than required'
' by --connection-limit')
time.sleep(1)
for t in range(self._wait_time * 2):
if len(connections()) > self._connection_limit / 2:
_time_to_connect = round(time.time() - _started)
break
time.sleep(0.5)
else:
self.fail(
'Failed establish at least %s connections in %s sec'
% (self._connection_limit / 2, self._wait_time))
if self._check_limit:
continue_check_limit(_time_to_connect)
for c in self.process.connections():
if c.status == 'LISTEN':
if self._listen is False:
return self.fail(
'Listening while started with --no-incoming')
self.assertEqual(c.laddr[1], self._listening_port or 8444)
break
else:
if self._listen:
self.fail('No listening connection found')