Started a test case for the objects
This commit is contained in:
parent
7c9901eecb
commit
bf8eca8eee
153
minode/tests/test_objects.py
Normal file
153
minode/tests/test_objects.py
Normal file
|
@ -0,0 +1,153 @@
|
|||
"""Tests for the Inventory implementation"""
|
||||
import os
|
||||
import random
|
||||
import tempfile
|
||||
import time
|
||||
import unittest
|
||||
|
||||
from minode import sql, shared, structure
|
||||
|
||||
|
||||
# + __bool__
|
||||
# + __contains__
|
||||
# + __getitem__
|
||||
# + __setitem__
|
||||
# = cleanup
|
||||
# + get
|
||||
# + filter
|
||||
# = select
|
||||
# + vectors_to_send
|
||||
|
||||
|
||||
class TestObjectsSQL(unittest.TestCase):
|
||||
"""A test case for the sqlite inventory"""
|
||||
|
||||
@classmethod
|
||||
def setUpClass(cls):
|
||||
shared.data_directory = tempfile.gettempdir()
|
||||
cls.tearDownClass()
|
||||
cls.objects = sql.Inventory()
|
||||
|
||||
@classmethod
|
||||
def tearDownClass(cls):
|
||||
cls.objects = None
|
||||
os.remove(os.path.join(shared.data_directory, 'objects.dat'))
|
||||
|
||||
def test_set_get(self):
|
||||
"""Put some objects and check presence and getting"""
|
||||
obj = structure.Object(
|
||||
int(time.time()), 42, 1, 1, object_payload=b'HELLO')
|
||||
self.assertFalse(obj.vector in self.objects)
|
||||
with self.assertRaises(KeyError):
|
||||
self.objects[obj.vector] # pylint: disable=pointless-statement
|
||||
self.assertIsNone(self.objects.get(obj.vector))
|
||||
prev_len = len(self.objects)
|
||||
self.objects[obj.vector] = obj
|
||||
self.objects[obj.vector] = obj
|
||||
self.assertTrue(self.objects)
|
||||
self.assertEqual(len(self.objects), prev_len + 1)
|
||||
self.assertTrue(obj.vector in self.objects)
|
||||
obj1 = self.objects[obj.vector]
|
||||
self.assertEqual(obj.vector, obj1.vector)
|
||||
self.assertEqual(obj.data, obj1.data)
|
||||
|
||||
def test_vectors_to_send(self):
|
||||
"""Check vectors_to_send method"""
|
||||
needed = set()
|
||||
for _ in range(10):
|
||||
# wrong stream
|
||||
obj = structure.Object(
|
||||
int(time.time()) + 10, 42, 1, random.randint(1, 3),
|
||||
object_payload=os.urandom(32))
|
||||
self.objects[obj.vector] = obj
|
||||
# expired
|
||||
obj = structure.Object(
|
||||
int(time.time()) - 10, 42, 1, 4, object_payload=os.urandom(32))
|
||||
self.objects[obj.vector] = obj
|
||||
# interesting
|
||||
obj = structure.Object(
|
||||
int(time.time()) + 10, 42, 1, 4, object_payload=os.urandom(32))
|
||||
self.objects[obj.vector] = obj
|
||||
needed.add(obj.vector)
|
||||
|
||||
self.assertEqual(set(self.objects.vectors_to_send(4)), needed)
|
||||
self.assertTrue(set(self.objects.vectors_to_send()).difference(needed))
|
||||
|
||||
def test_filter(self):
|
||||
"""Check the objects filtering"""
|
||||
needed = set()
|
||||
tagged = set()
|
||||
tag = b'@' * 32
|
||||
for _ in range(10):
|
||||
# wrong type
|
||||
obj = structure.Object(
|
||||
int(time.time()), 0, 1, 5, object_payload=os.urandom(64))
|
||||
self.objects[obj.vector] = obj
|
||||
# wrong type, but the proper tag
|
||||
obj = structure.Object(
|
||||
int(time.time()) - 11000, 0, 4, random.randint(1, 5),
|
||||
object_payload=tag + os.urandom(32))
|
||||
self.objects[obj.vector] = obj
|
||||
tagged.add(obj.vector)
|
||||
# wrong stream
|
||||
obj = structure.Object(
|
||||
int(time.time()), 33, 1, 1, object_payload=os.urandom(32))
|
||||
self.objects[obj.vector] = obj
|
||||
# interesting
|
||||
obj = structure.Object(
|
||||
int(time.time()) - 11000, 33, 1, 5,
|
||||
object_payload=os.urandom(32))
|
||||
self.objects[obj.vector] = obj
|
||||
needed.add(obj.vector)
|
||||
|
||||
# stream and type
|
||||
self.assertTrue(needed)
|
||||
for obj in self.objects.filter(5, 33):
|
||||
needed.remove(obj.vector)
|
||||
self.assertFalse(needed)
|
||||
|
||||
# tag
|
||||
self.assertTrue(tagged)
|
||||
for obj in self.objects.filter(tag=tag):
|
||||
tagged.remove(obj.vector)
|
||||
self.assertFalse(tagged)
|
||||
|
||||
def test_cleanup(self):
|
||||
"""Check cleaning up"""
|
||||
for _ in range(10):
|
||||
obj = structure.Object(
|
||||
int(time.time()) - random.randint(4, 5) * 3600,
|
||||
42, 1, 6, object_payload=os.urandom(32))
|
||||
self.objects[obj.vector] = obj
|
||||
obj = structure.Object(
|
||||
int(time.time()) - 2 * 3600,
|
||||
42, 1, 6, object_payload=os.urandom(32))
|
||||
self.objects[obj.vector] = obj
|
||||
|
||||
for obj in self.objects.values():
|
||||
if obj.is_expired():
|
||||
break
|
||||
else:
|
||||
self.fail('No objects found to delete')
|
||||
|
||||
self.objects.cleanup()
|
||||
self.assertTrue(self.objects)
|
||||
for obj in self.objects.values():
|
||||
self.assertFalse(obj.is_expired())
|
||||
|
||||
def test_select(self):
|
||||
"""Check the select method"""
|
||||
pending = set()
|
||||
questionable = set()
|
||||
|
||||
for _ in range(5):
|
||||
obj = structure.Object(
|
||||
int(time.time()) - 10, 42, 1, 7, object_payload=os.urandom(32))
|
||||
questionable.add(obj.vector)
|
||||
self.objects[obj.vector] = obj
|
||||
obj = structure.Object(
|
||||
int(time.time()) + 10, 42, 1, 7, object_payload=os.urandom(32))
|
||||
questionable.add(obj.vector)
|
||||
pending.add(obj.vector)
|
||||
|
||||
self.assertEqual(self.objects.select(questionable), pending)
|
Loading…
Reference in New Issue
Block a user