154 lines
5.1 KiB
Python
154 lines
5.1 KiB
Python
|
"""Tests for the Inventory implementation"""
|
||
|
import os
|
||
|
import random
|
||
|
import tempfile
|
||
|
import time
|
||
|
import unittest
|
||
|
|
||
|
from minode import sql, shared, structure
|
||
|
|
||
|
|
||
|
# + __bool__
|
||
|
# + __contains__
|
||
|
# + __getitem__
|
||
|
# + __setitem__
|
||
|
# = cleanup
|
||
|
# + get
|
||
|
# + filter
|
||
|
# = select
|
||
|
# + vectors_to_send
|
||
|
|
||
|
|
||
|
class TestObjectsSQL(unittest.TestCase):
    """A test case for the sqlite inventory.

    All the tests share one ``sql.Inventory`` instance created in
    ``setUpClass``, so the objects stored by one test stay visible to
    the others. Each test therefore uses its own stream number and/or
    object type to tell its objects apart.

    NOTE(review): the positional arguments of ``structure.Object`` are
    presumably ``(expires_time, object_type, version, stream_number)``;
    this reading follows the inline comments of the tests, confirm
    against ``minode.structure``.
    """

    @classmethod
    def setUpClass(cls):
        """Create a fresh inventory in the system temporary directory."""
        shared.data_directory = tempfile.gettempdir()
        # Drop a database possibly left behind by a previous run.
        cls.tearDownClass()
        cls.objects = sql.Inventory()

    @classmethod
    def tearDownClass(cls):
        """Release the inventory and delete its database file."""
        cls.objects = None
        try:
            os.remove(os.path.join(shared.data_directory, 'objects.dat'))
        except FileNotFoundError:
            # This method is also called from setUpClass(), before any
            # database exists: a missing file is not an error then.
            pass

    def test_set_get(self):
        """Put some objects and check presence and getting"""
        obj = structure.Object(
            int(time.time()), 42, 1, 1, object_payload=b'HELLO')

        # The object is unknown until it is stored.
        self.assertNotIn(obj.vector, self.objects)
        with self.assertRaises(KeyError):
            self.objects[obj.vector]  # pylint: disable=pointless-statement
        self.assertIsNone(self.objects.get(obj.vector))

        prev_len = len(self.objects)
        # Storing the same object twice must add only one entry.
        self.objects[obj.vector] = obj
        self.objects[obj.vector] = obj
        self.assertTrue(self.objects)
        self.assertEqual(len(self.objects), prev_len + 1)
        self.assertIn(obj.vector, self.objects)

        # The stored object comes back with the same vector and data.
        obj1 = self.objects[obj.vector]
        self.assertEqual(obj.vector, obj1.vector)
        self.assertEqual(obj.data, obj1.data)

    def test_vectors_to_send(self):
        """Check vectors_to_send method"""
        needed = set()
        for _ in range(10):
            # wrong stream
            obj = structure.Object(
                int(time.time()) + 10, 42, 1, random.randint(1, 3),
                object_payload=os.urandom(32))
            self.objects[obj.vector] = obj
            # expired
            obj = structure.Object(
                int(time.time()) - 10, 42, 1, 4,
                object_payload=os.urandom(32))
            self.objects[obj.vector] = obj
            # interesting
            obj = structure.Object(
                int(time.time()) + 10, 42, 1, 4,
                object_payload=os.urandom(32))
            self.objects[obj.vector] = obj
            needed.add(obj.vector)

        # Only the unexpired objects of the stream 4 are expected.
        self.assertEqual(set(self.objects.vectors_to_send(4)), needed)
        # Without a stream there must be other vectors as well.
        self.assertTrue(
            set(self.objects.vectors_to_send()).difference(needed))

    def test_filter(self):
        """Check the objects filtering"""
        needed = set()
        tagged = set()
        tag = b'@' * 32
        for _ in range(10):
            # wrong type
            obj = structure.Object(
                int(time.time()), 0, 1, 5, object_payload=os.urandom(64))
            self.objects[obj.vector] = obj
            # wrong type, but the proper tag
            obj = structure.Object(
                int(time.time()) - 11000, 0, 4, random.randint(1, 5),
                object_payload=tag + os.urandom(32))
            self.objects[obj.vector] = obj
            tagged.add(obj.vector)
            # wrong stream
            obj = structure.Object(
                int(time.time()), 33, 1, 1, object_payload=os.urandom(32))
            self.objects[obj.vector] = obj
            # interesting
            obj = structure.Object(
                int(time.time()) - 11000, 33, 1, 5,
                object_payload=os.urandom(32))
            self.objects[obj.vector] = obj
            needed.add(obj.vector)

        # stream and type
        self.assertTrue(needed)
        for obj in self.objects.filter(5, 33):
            # remove() raises KeyError on any unexpected object
            needed.remove(obj.vector)
        self.assertFalse(needed)

        # tag
        self.assertTrue(tagged)
        for obj in self.objects.filter(tag=tag):
            # remove() raises KeyError on any unexpected object
            tagged.remove(obj.vector)
        self.assertFalse(tagged)

    def test_cleanup(self):
        """Check cleaning up"""
        for _ in range(10):
            # expired 4 to 5 hours ago
            obj = structure.Object(
                int(time.time()) - random.randint(4, 5) * 3600,
                42, 1, 6, object_payload=os.urandom(32))
            self.objects[obj.vector] = obj
            # expired 2 hours ago
            obj = structure.Object(
                int(time.time()) - 2 * 3600,
                42, 1, 6, object_payload=os.urandom(32))
            self.objects[obj.vector] = obj

        # There must be something for cleanup() to delete.
        for obj in self.objects.values():
            if obj.is_expired():
                break
        else:
            self.fail('No objects found to delete')

        self.objects.cleanup()
        # The inventory is not emptied, but nothing left is expired.
        self.assertTrue(self.objects)
        for obj in self.objects.values():
            self.assertFalse(obj.is_expired())

    def test_select(self):
        """Check the select method"""
        pending = set()
        questionable = set()

        for _ in range(5):
            # stored in the inventory
            obj = structure.Object(
                int(time.time()) - 10, 42, 1, 7,
                object_payload=os.urandom(32))
            questionable.add(obj.vector)
            self.objects[obj.vector] = obj
            # never stored
            obj = structure.Object(
                int(time.time()) + 10, 42, 1, 7,
                object_payload=os.urandom(32))
            questionable.add(obj.vector)
            pending.add(obj.vector)

        # select() is expected to keep only the vectors not stored here.
        self.assertEqual(self.objects.select(questionable), pending)
|