"""Test interprocess queues""" import time import zmq from minode import structure, message from .test_process import TestProcessProto class TestProcessQueue(TestProcessProto): """A test case starting the process with queues""" _process_cmd = ['minode', '--msg-queue'] @classmethod def setUpClass(cls): super().setUpClass() context = zmq.Context() cls.socket = context.socket(zmq.SUB) cls.socket.connect('tcp://localhost:5556') cls.socket.setsockopt(zmq.RCVTIMEO, 5000) cls.socket.setsockopt(zmq.SUBSCRIBE, b'obj') def test_receive_msg(self): """wait a couple of messages""" timeout = 240 start = time.time() c = 0 while time.time() - start < timeout: if c > 1: return try: tag, data = self.socket.recv().split(b'\x00', 1) except zmq.error.Again: continue c += 1 self.assertEqual(tag, b'obj') obj_type, data = data.split(b'\x00', 1) obj = structure.Object.from_message( message.Message(b'object', data)) self.assertEqual(int(obj_type), obj.object_type) self.assertTrue(obj.is_valid()) if c == 0: self.fail('No messages received in %ss' % timeout)