written test case for randomtrackingdict module
-removed logger from singleinstance module -written test case for randomtrackingdict module -Fixed CQ issues -removed logger from test_randomtrackingdict module -remove main method from randomtrackingdict.py and fixed travis-ci issue -replace print with test failure -fixed flake8 CQ for network.randomtrackingdict module
This commit is contained in:
parent
9fe4ad0489
commit
f67ffad69d
|
@ -1,7 +1,6 @@
|
|||
"""
|
||||
Track randomize ordered dict
|
||||
"""
|
||||
import random
|
||||
from threading import RLock
|
||||
from time import time
|
||||
|
||||
|
@ -128,41 +127,3 @@ class RandomTrackingDict(object):
|
|||
self.pendingLen += 1
|
||||
self.lastPoll = time()
|
||||
return retval
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
|
||||
# pylint: disable=redefined-outer-name
|
||||
def randString():
|
||||
"""helper function for tests, generates a random string"""
|
||||
retval = b''
|
||||
for _ in range(32):
|
||||
retval += chr(random.randint(0, 255))
|
||||
return retval
|
||||
|
||||
a = []
|
||||
k = RandomTrackingDict()
|
||||
d = {}
|
||||
|
||||
print "populating random tracking dict"
|
||||
a.append(time())
|
||||
for i in range(50000):
|
||||
k[randString()] = True
|
||||
a.append(time())
|
||||
print "done"
|
||||
|
||||
while k:
|
||||
retval = k.randomKeys(1000)
|
||||
if not retval:
|
||||
print "error getting random keys"
|
||||
try:
|
||||
k.randomKeys(100)
|
||||
print "bad"
|
||||
except KeyError:
|
||||
pass
|
||||
for i in retval:
|
||||
del k[i]
|
||||
a.append(time())
|
||||
|
||||
for x in range(len(a) - 1):
|
||||
print "%i: %.3f" % (x, a[x + 1] - a[x])
|
||||
|
|
|
@ -4,6 +4,7 @@ Module for using filesystem (directory with files) for inventory storage
|
|||
import string
|
||||
import time
|
||||
from binascii import hexlify, unhexlify
|
||||
from debug import logger
|
||||
from os import listdir, makedirs, path, remove, rmdir
|
||||
from threading import RLock
|
||||
|
||||
|
@ -162,7 +163,7 @@ class FilesystemInventory(InventoryStorage):
|
|||
newInventory[streamNumber][hashId] = InventoryItem(
|
||||
objectType, streamNumber, None, expiresTime, tag)
|
||||
except KeyError:
|
||||
print "error loading %s" % (hexlify(hashId))
|
||||
logger.warning('error loading %s', hexlify(hashId))
|
||||
self._inventory = newInventory
|
||||
# for i, v in self._inventory.items():
|
||||
# print "loaded stream: %s, %i items" % (i, len(v))
|
||||
|
|
47
src/tests/test_randomtrackingdict.py
Normal file
47
src/tests/test_randomtrackingdict.py
Normal file
|
@ -0,0 +1,47 @@
|
|||
"""
|
||||
Tests for RandomTrackingDict class
|
||||
"""
|
||||
import random
|
||||
import unittest
|
||||
|
||||
from network import randomtrackingdict
|
||||
from time import time
|
||||
|
||||
|
||||
class TestRandomTrackingDict(unittest.TestCase):
|
||||
"""Main protocol test case"""
|
||||
|
||||
@staticmethod
|
||||
def randString():
|
||||
"""helper function for tests, generates a random string"""
|
||||
retval = b''
|
||||
for _ in range(32):
|
||||
retval += chr(random.randint(0, 255))
|
||||
return retval
|
||||
|
||||
def test_check_randomtrackingdict(self):
|
||||
"""Check the logic of RandomTrackingDict class"""
|
||||
a = []
|
||||
k = randomtrackingdict.RandomTrackingDict()
|
||||
|
||||
a.append(time())
|
||||
for i in range(50000):
|
||||
k[self.randString()] = True
|
||||
a.append(time())
|
||||
|
||||
while k:
|
||||
retval = k.randomKeys(1000)
|
||||
if not retval:
|
||||
self.fail("error getting random keys")
|
||||
|
||||
try:
|
||||
k.randomKeys(100)
|
||||
self.fail("bad")
|
||||
except KeyError:
|
||||
pass
|
||||
for i in retval:
|
||||
del k[i]
|
||||
a.append(time())
|
||||
|
||||
for x in range(len(a) - 1):
|
||||
self.assertLess(a[x + 1] - a[x], 10)
|
Reference in New Issue
Block a user