From bf8eca8eee6ba409364610e0b445c1fe1f287181 Mon Sep 17 00:00:00 2001 From: Lee Miller Date: Sun, 29 Sep 2024 17:04:34 +0300 Subject: [PATCH] Started a test case for the objects --- minode/tests/test_objects.py | 153 +++++++++++++++++++++++++++++++++++ 1 file changed, 153 insertions(+) create mode 100644 minode/tests/test_objects.py diff --git a/minode/tests/test_objects.py b/minode/tests/test_objects.py new file mode 100644 index 0000000..46bff6c --- /dev/null +++ b/minode/tests/test_objects.py @@ -0,0 +1,153 @@ +"""Tests for the Inventory implementation""" +import os +import random +import tempfile +import time +import unittest + +from minode import sql, shared, structure + + +# + __bool__ +# + __contains__ +# + __getitem__ +# + __setitem__ +# = cleanup +# + get +# + filter +# = select +# + vectors_to_send + + +class TestObjectsSQL(unittest.TestCase): + """A test case for the sqlite inventory""" + + @classmethod + def setUpClass(cls): + shared.data_directory = tempfile.gettempdir() + cls.tearDownClass() + cls.objects = sql.Inventory() + + @classmethod + def tearDownClass(cls): + cls.objects = None + os.remove(os.path.join(shared.data_directory, 'objects.dat')) + + def test_set_get(self): + """Put some objects and check presence and getting""" + obj = structure.Object( + int(time.time()), 42, 1, 1, object_payload=b'HELLO') + self.assertFalse(obj.vector in self.objects) + with self.assertRaises(KeyError): + self.objects[obj.vector] # pylint: disable=pointless-statement + self.assertIsNone(self.objects.get(obj.vector)) + prev_len = len(self.objects) + self.objects[obj.vector] = obj + self.objects[obj.vector] = obj + self.assertTrue(self.objects) + self.assertEqual(len(self.objects), prev_len + 1) + self.assertTrue(obj.vector in self.objects) + obj1 = self.objects[obj.vector] + self.assertEqual(obj.vector, obj1.vector) + self.assertEqual(obj.data, obj1.data) + + def test_vectors_to_send(self): + """Check vectors_to_send method""" + needed = set() + for _ in range(10): + # wrong stream + obj = structure.Object( + int(time.time()) + 10, 42, 1, random.randint(1, 3), + object_payload=os.urandom(32)) + self.objects[obj.vector] = obj + # expired + obj = structure.Object( + int(time.time()) - 10, 42, 1, 4, object_payload=os.urandom(32)) + self.objects[obj.vector] = obj + # interesting + obj = structure.Object( + int(time.time()) + 10, 42, 1, 4, object_payload=os.urandom(32)) + self.objects[obj.vector] = obj + needed.add(obj.vector) + + self.assertEqual(set(self.objects.vectors_to_send(4)), needed) + self.assertTrue(set(self.objects.vectors_to_send()).difference(needed)) + + def test_filter(self): + """Check the objects filtering""" + needed = set() + tagged = set() + tag = b'@' * 32 + for _ in range(10): + # wrong type + obj = structure.Object( + int(time.time()), 0, 1, 5, object_payload=os.urandom(64)) + self.objects[obj.vector] = obj + # wrong type, but the proper tag + obj = structure.Object( + int(time.time()) - 11000, 0, 4, random.randint(1, 5), + object_payload=tag + os.urandom(32)) + self.objects[obj.vector] = obj + tagged.add(obj.vector) + # wrong stream + obj = structure.Object( + int(time.time()), 33, 1, 1, object_payload=os.urandom(32)) + self.objects[obj.vector] = obj + # interesting + obj = structure.Object( + int(time.time()) - 11000, 33, 1, 5, + object_payload=os.urandom(32)) + self.objects[obj.vector] = obj + needed.add(obj.vector) + + # stream and type + self.assertTrue(needed) + for obj in self.objects.filter(5, 33): + needed.remove(obj.vector) + self.assertFalse(needed) + + # tag + self.assertTrue(tagged) + for obj in self.objects.filter(tag=tag): + tagged.remove(obj.vector) + self.assertFalse(tagged) + + def test_cleanup(self): + """Check cleaning up""" + for _ in range(10): + obj = structure.Object( + int(time.time()) - random.randint(4, 5) * 3600, + 42, 1, 6, object_payload=os.urandom(32)) + self.objects[obj.vector] = obj + obj = structure.Object( + int(time.time()) - 2 * 3600, + 42, 1, 6, object_payload=os.urandom(32)) + self.objects[obj.vector] = obj + + for obj in self.objects.values(): + if obj.is_expired(): + break + else: + self.fail('No objects found to delete') + + self.objects.cleanup() + self.assertTrue(self.objects) + for obj in self.objects.values(): + self.assertFalse(obj.is_expired()) + + def test_select(self): + """Check the select method""" + pending = set() + questionable = set() + + for _ in range(5): + obj = structure.Object( + int(time.time()) - 10, 42, 1, 7, object_payload=os.urandom(32)) + questionable.add(obj.vector) + self.objects[obj.vector] = obj + obj = structure.Object( + int(time.time()) + 10, 42, 1, 7, object_payload=os.urandom(32)) + questionable.add(obj.vector) + pending.add(obj.vector) + + self.assertEqual(self.objects.select(questionable), pending)