58 lines
1.8 KiB
Python
58 lines
1.8 KiB
Python
|
"""Tests for memory usage"""
|
||
|
|
||
|
import gc
|
||
|
import time
|
||
|
|
||
|
from minode import shared
|
||
|
|
||
|
from .test_network import TestProcessProto, run_listener
|
||
|
|
||
|
|
||
|
class TestListener(TestProcessProto):
|
||
|
"""A separate test case for Listener with a process with --trusted-peer"""
|
||
|
_process_cmd = ['minode', '--trusted-peer', '127.0.0.1']
|
||
|
|
||
|
def setUp(self):
|
||
|
shared.shutting_down = False
|
||
|
|
||
|
@classmethod
|
||
|
def tearDownClass(cls):
|
||
|
super().tearDownClass()
|
||
|
shared.shutting_down = False
|
||
|
|
||
|
def test_listener(self):
|
||
|
"""Start Listener and disconnect a client"""
|
||
|
with run_listener() as listener:
|
||
|
if not listener:
|
||
|
self.fail('Failed to start listener')
|
||
|
|
||
|
shared.connection_limit = 2
|
||
|
connected = False
|
||
|
started = time.time()
|
||
|
while not connected:
|
||
|
time.sleep(0.2)
|
||
|
if time.time() - started > 90:
|
||
|
self.fail('Failed to establish the connection')
|
||
|
for c in shared.connections:
|
||
|
if c.status == 'fully_established':
|
||
|
connected = True
|
||
|
|
||
|
if not self._stop_process(10):
|
||
|
self.fail('Failed to stop the client process')
|
||
|
|
||
|
for c in shared.connections.copy():
|
||
|
if not c.is_alive() or c.status == 'disconnected':
|
||
|
shared.connections.remove(c)
|
||
|
c = None
|
||
|
break
|
||
|
else:
|
||
|
self.fail('The connection is alive')
|
||
|
|
||
|
gc.collect()
|
||
|
for obj in gc.get_objects():
|
||
|
if (
|
||
|
isinstance(obj, shared.connection)
|
||
|
and obj not in shared.connections
|
||
|
):
|
||
|
self.fail('Connection %s remains in memory' % obj)
|