MiNode/minode/tests/test_queue.py

47 lines
1.3 KiB
Python
Raw Permalink Normal View History

"""Test interprocess queues"""
import time
import zmq
from minode import structure, message
from .test_process import TestProcessProto
class TestProcessQueue(TestProcessProto):
"""A test case starting the process with queues"""
_process_cmd = ['minode', '--msg-queue']
@classmethod
def setUpClass(cls):
super().setUpClass()
context = zmq.Context()
cls.socket = context.socket(zmq.SUB)
cls.socket.connect('tcp://localhost:5556')
cls.socket.setsockopt(zmq.RCVTIMEO, 5000)
cls.socket.setsockopt(zmq.SUBSCRIBE, b'obj')
def test_receive_msg(self):
"""wait a couple of messages"""
timeout = 240
start = time.time()
c = 0
while time.time() - start < timeout:
if c > 1:
return
try:
tag, data = self.socket.recv().split(b'\x00', 1)
except zmq.error.Again:
continue
c += 1
self.assertEqual(tag, b'obj')
obj_type, data = data.split(b'\x00', 1)
obj = structure.Object.from_message(
message.Message(b'object', data))
self.assertEqual(int(obj_type), obj.object_type)
self.assertTrue(obj.is_valid())
if c == 0:
self.fail('No messages received in %ss' % timeout)