"""Tests for the Inventory implementation""" import os import random import tempfile import time import unittest from minode import sql, shared, structure # + __bool__ # + __contains__ # + __getitem__ # + __setitem__ # = cleanup # + get # + filter # = select # + vectors_to_send class TestObjects(): """ A base class for the test case for shared.objects, containing tests for all the methods directly used in code. """ # pylint: disable=no-member # A possibility of abstract test cases was rejected: # https://bugs.python.org/issue17519 def test_set_get(self): """Put some objects and check presence and getting""" obj = structure.Object( int(time.time()), 42, 1, 1, object_payload=b'HELLO') self.assertFalse(obj.vector in self.objects) with self.assertRaises(KeyError): self.objects[obj.vector] # pylint: disable=pointless-statement self.assertIsNone(self.objects.get(obj.vector)) prev_len = len(self.objects) self.objects[obj.vector] = obj self.objects[obj.vector] = obj self.assertTrue(self.objects) self.assertEqual(len(self.objects), prev_len + 1) self.assertTrue(obj.vector in self.objects) obj1 = self.objects[obj.vector] self.assertEqual(obj.vector, obj1.vector) self.assertEqual(obj.data, obj1.data) def test_vectors_to_send(self): """Check vectors_to_send method""" needed = set() for _ in range(10): # wrong stream obj = structure.Object( int(time.time()) + 10, 42, 1, random.randint(1, 3), object_payload=os.urandom(32)) self.objects[obj.vector] = obj # expired obj = structure.Object( int(time.time()) - 10, 42, 1, 4, object_payload=os.urandom(32)) self.objects[obj.vector] = obj # interesting obj = structure.Object( int(time.time()) + 10, 42, 1, 4, object_payload=os.urandom(32)) self.objects[obj.vector] = obj needed.add(obj.vector) self.assertEqual( set(next(self.objects.vectors_to_send(stream=4))), needed) self.assertTrue( set(next(self.objects.vectors_to_send())).difference(needed)) def test_filter(self): """Check the objects filtering""" needed = set() tagged = set() tag = b'@' * 32 for _ in range(10): # wrong type obj = structure.Object( int(time.time()), 0, 1, 5, object_payload=os.urandom(64)) self.objects[obj.vector] = obj # wrong type, but the proper tag obj = structure.Object( int(time.time()) - 11000, 0, 4, random.choice([1, 2, 3, 5]), object_payload=tag + os.urandom(32)) self.objects[obj.vector] = obj tagged.add(obj.vector) # wrong stream obj = structure.Object( int(time.time()), 33, 1, 1, object_payload=os.urandom(32)) self.objects[obj.vector] = obj # interesting obj = structure.Object( int(time.time()) - 11000, 33, 1, 5, object_payload=os.urandom(32)) self.objects[obj.vector] = obj needed.add(obj.vector) # stream and type self.assertTrue(needed) for obj in self.objects.filter(5, 33): needed.remove(obj.vector) self.assertFalse(needed) # tag self.assertTrue(tagged) for obj in self.objects.filter(tag=tag): tagged.remove(obj.vector) self.assertFalse(tagged) def test_cleanup(self): """Check cleaning up""" for _ in range(10): obj = structure.Object( int(time.time()) - random.randint(4, 5) * 3600, 42, 1, 6, object_payload=os.urandom(32)) self.objects[obj.vector] = obj obj = structure.Object( int(time.time()) - 2 * 3600, 42, 1, 6, object_payload=os.urandom(32)) self.objects[obj.vector] = obj for obj in self.objects.values(): if obj.is_expired(): break else: self.fail('No objects found to delete') self.objects.cleanup() self.assertTrue(self.objects) for obj in self.objects.values(): self.assertFalse(obj.is_expired()) def test_select(self): """Check the select method""" pending = set() questionable = set() for _ in range(5): obj = structure.Object( int(time.time()) - 10, 42, 1, 7, object_payload=os.urandom(32)) questionable.add(obj.vector) self.objects[obj.vector] = obj obj = structure.Object( int(time.time()) + 10, 42, 1, 7, object_payload=os.urandom(32)) questionable.add(obj.vector) pending.add(obj.vector) self.assertEqual(self.objects.select(questionable), pending) class TestObjectsSQL(TestObjects, unittest.TestCase): """A test case for the sqlite inventory""" @classmethod def setUpClass(cls): shared.data_directory = tempfile.gettempdir() cls.tearDownClass() cls.objects = sql.Inventory() @classmethod def tearDownClass(cls): cls.objects = None os.remove(os.path.join(shared.data_directory, 'objects.dat'))