84 lines
2.4 KiB
Python
84 lines
2.4 KiB
Python
|
import unittest
|
||
|
import sys
|
||
|
import tempfile
|
||
|
import time
|
||
|
import threading
|
||
|
|
||
|
from minode import shared
|
||
|
from minode.main import main as app
|
||
|
|
||
|
|
||
|
class TestAppProto(unittest.TestCase):
    """Import and start the application.

    Base fixture for tests that need a running app: ``setUpClass`` builds
    a command line from the class attributes below, installs it as
    ``sys.argv`` and runs the app entry point in a daemon thread;
    ``tearDownClass`` sets ``shared.shutting_down`` and puts the previous
    ``sys.argv`` back. Subclasses tune the behavior by overriding the
    attributes.
    """

    # Leading element(s) of the simulated command line.
    _process_cmd = ['minode']
    # Value passed with --connection-limit (4 on Windows, 6 elsewhere).
    _connection_limit = 4 if sys.platform.startswith('win') else 6
    # When false, --no-incoming is added to the command line.
    _listen = False
    # Port passed with -p; only used when _listen is true and this is set.
    _listening_port = None

    # Directory passed with --data-dir; filled in lazily with the system
    # temporary directory when left unset.
    home = None

    # sys.argv as it was before setUpClass replaced it (None when nothing
    # has to be restored).
    _saved_argv = None

    @classmethod
    def _build_app_args(cls):
        """Return the argument list used to start the app.

        Side effect: when ``cls.home`` is unset it is assigned the system
        temporary directory on the class this method is called on.
        """
        if not cls.home:
            cls.home = tempfile.gettempdir()
        args = cls._process_cmd + [
            '--data-dir', cls.home,
            '--connection-limit', str(cls._connection_limit)
        ]
        if not cls._listen:
            args += ['--no-incoming']
        elif cls._listening_port:
            args += ['-p', str(cls._listening_port)]

        return args

    def _connections(self):
        """Return the connections whose status is 'fully_established'."""
        # Iterate over a copy: shared.connections may be modified by the
        # app thread while the test is looking at it.
        return [
            c for c in shared.connections.copy()
            if c.status == 'fully_established']

    @classmethod
    def setUpClass(cls):
        """Start the app in a background daemon thread."""
        # The entry point is started without arguments, so the options are
        # handed over through sys.argv (presumably parsed by the app).
        # Remember the previous value so that tearDownClass can restore it
        # and the fake command line does not leak into later tests.
        cls._saved_argv = sys.argv
        sys.argv = cls._build_app_args()
        cls.app = threading.Thread(name="minode", target=app, daemon=True)
        cls.app.start()

    @classmethod
    def tearDownClass(cls):
        """Flag the app as shutting down and restore ``sys.argv``."""
        shared.shutting_down = True
        # Nothing to restore if a subclass replaced setUpClass without
        # calling this implementation.
        if cls._saved_argv is not None:
            sys.argv = cls._saved_argv
            cls._saved_argv = None
|
||
|
|
||
|
|
||
|
class TestApp(TestAppProto):
    """Check the app parameters"""

    # Maximum time, in seconds, to wait for the connections to come up.
    _wait_time = 120
    # Whether to keep watching the number of connections once enough of
    # them have been established.
    _check_limit = True

    def test_connections(self):
        """Check connections.

        Wait up to ``_wait_time`` seconds until more than half of
        ``_connection_limit`` connections are fully established, failing
        otherwise. Then, if ``_check_limit`` is set, keep polling and
        assert that the number of connections stays within the limit.
        """
        _started = time.time()
        # "More than half of the limit", expressed as an integer so that
        # the failure message states the number actually required.
        required = self._connection_limit // 2 + 1

        def continue_check_limit(extra_time):
            """Assert the limit is respected, polling every 0.5 sec
            for ``extra_time * 4`` iterations.
            """
            # shared.outgoing_connections, one listening
            # TODO: find the cause of one extra
            allowed = (
                min(self._connection_limit, 8) if not self._listen
                else self._connection_limit) + 1
            for _ in range(extra_time * 4):
                self.assertLessEqual(
                    len(self._connections()),
                    allowed,
                    'Opened more connections than required'
                    ' by --connection-limit')
                time.sleep(0.5)

        for _ in range(self._wait_time * 2):
            if len(self._connections()) >= required:
                _time_to_connect = round(time.time() - _started)
                break
            time.sleep(0.5)
        else:
            self.fail(
                'Failed to establish at least %s connections in %s sec'
                % (required, self._wait_time))

        if self._check_limit:
            # The elapsed time rounds to 0 when the connections were
            # already up; check for at least one second so that the limit
            # assertion is never skipped entirely.
            continue_check_limit(max(_time_to_connect, 1))
|