Make I2P tests not fail for now #10
|
@ -1,11 +1,12 @@
|
|||
"""Blind tests, starting the minode process"""
|
||||
import unittest
|
||||
import os
|
||||
import signal
|
||||
import socket
|
||||
import subprocess
|
||||
import sys
|
||||
import tempfile
|
||||
import time
|
||||
import unittest
|
||||
|
||||
import psutil
|
||||
|
||||
|
@ -138,12 +139,29 @@ class TestProcess(TestProcessProto):
|
|||
class TestProcessI2P(TestProcess):
|
||||
"""Test minode process with --i2p and no IP"""
|
||||
_process_cmd = ['minode', '--i2p', '--no-ip']
|
||||
_connection_limit = 4
|
||||
_listen = True
|
||||
_listening_port = 8448
|
||||
|
||||
@classmethod
|
||||
def setUpClass(cls):
|
||||
cls.freezed = False
|
||||
cls.keyfile = os.path.join(cls.home, 'i2p_dest.pub')
|
||||
saved = os.path.isfile(cls.keyfile)
|
||||
super().setUpClass()
|
||||
for _ in range(cls._wait_time):
|
||||
if saved:
|
||||
if cls.process.num_threads() > 3:
|
||||
break
|
||||
elif os.path.isfile(cls.keyfile):
|
||||
break
|
||||
time.sleep(1)
|
||||
else:
|
||||
cls.freezed = True
|
||||
|
||||
def test_connections(self):
|
||||
"""Ensure all connections are I2P"""
|
||||
if self.freezed:
|
||||
self.fail('I2PController has probably failed to start')
|
||||
super().test_connections()
|
||||
for c in self.connections():
|
||||
self.assertEqual(c.raddr[0], '127.0.0.1')
|
||||
|
|
Loading…
Reference in New Issue
Block a user