A minimal test for UDP. Restore expected default settings in tearDown().

This commit is contained in:
Dmitri Bogomolov 2021-02-22 19:46:23 +02:00
parent 5f9d507717
commit 6ee6989df2
Signed by untrusted user: g1itch
GPG Key ID: 720A756F18DEED13

View File

@ -11,6 +11,7 @@ import shutil
import socket
import string
import sys
import threading
import time
import unittest
@ -61,6 +62,13 @@ class TestCore(unittest.TestCase):
"""Test case, which runs in main pybitmessage thread"""
addr = 'BM-2cVvkzJuQDsQHLqxRXc6HZGPLZnkBLzEZY'
def tearDown(self):
"""Reset possible unexpected settings after test"""
knownnodes.addKnownNode(1, Peer('127.0.0.1', 8444), is_self=True)
BMConfigParser().remove_option('bitmessagesettings', 'dontconnect')
BMConfigParser().remove_option('bitmessagesettings', 'onionservicesonly')
BMConfigParser().set('bitmessagesettings', 'socksproxytype', 'none')
def test_msgcoding(self):
"""test encoding and decoding (originally from helper_msgcoding)"""
msg_data = {
@ -270,6 +278,36 @@ class TestCore(unittest.TestCase):
return
self.fail('Failed to connect to at least 3 nodes within 360 sec')
def test_udp(self):
"""check default udp setting and presence of Announcer thread"""
self.assertTrue(
BMConfigParser().safeGetBoolean('bitmessagesettings', 'udp'))
for thread in threading.enumerate():
if thread.name == 'Announcer': # find Announcer thread
break
else:
return self.fail('No Announcer thread found')
for _ in range(20): # wait for UDP socket
for sock in BMConnectionPool().udpSockets.values():
thread.announceSelf()
break
else:
time.sleep(1)
continue
break
else:
self.fail('UDP socket is not started')
for _ in range(20):
if state.discoveredPeers:
peer = state.discoveredPeers.keys()[0]
self.assertEqual(peer.port, 8444)
break
time.sleep(1)
else:
self.fail('No self in discovered peers')
@staticmethod
def _decode_msg(data, pattern):
proto = BMProto()