From 36b5e2c04f31c8ded967b511c0ae4bd801f3f807 Mon Sep 17 00:00:00 2001 From: Peter Surda Date: Sat, 27 May 2017 19:03:27 +0200 Subject: [PATCH] Inventory storage abstraction - can have multiple storage types for inventory - sqlite is the old one, filesystem is a new available --- src/inventory.py | 94 +++++--------------- src/storage/__init__.py | 0 src/storage/filesystem.py | 175 ++++++++++++++++++++++++++++++++++++++ src/storage/sqlite.py | 81 ++++++++++++++++++ src/storage/storage.py | 51 +++++++++++ 5 files changed, 329 insertions(+), 72 deletions(-) create mode 100644 src/storage/__init__.py create mode 100644 src/storage/filesystem.py create mode 100644 src/storage/sqlite.py create mode 100644 src/storage/storage.py diff --git a/src/inventory.py b/src/inventory.py index a796968a..b676415b 100644 --- a/src/inventory.py +++ b/src/inventory.py @@ -1,87 +1,37 @@ import collections +from importlib import import_module from threading import current_thread, enumerate as threadingEnumerate, RLock import Queue import time +import sys +from bmconfigparser import BMConfigParser from helper_sql import * from singleton import Singleton +# TODO make this dynamic, and watch out for frozen, like with messagetypes +import storage.sqlite +import storage.filesystem @Singleton -class Inventory(collections.MutableMapping): +class Inventory(): def __init__(self): - super(self.__class__, self).__init__() - self._inventory = {} #of objects (like msg payloads and pubkey payloads) Does not include protocol headers (the first 24 bytes of each packet). - self.numberOfInventoryLookupsPerformed = 0 - self._streams = collections.defaultdict(set) # key = streamNumer, value = a set which holds the inventory object hashes that we are aware of. This is used whenever we receive an inv message from a peer to check to see what items are new to us. We don't delete things out of it; instead, the singleCleaner thread clears and refills it every couple hours. - self.lock = RLock() # Guarantees that two receiveDataThreads don't receive and process the same message concurrently (probably sent by a malicious individual) - self.InventoryItem = collections.namedtuple('InventoryItem', 'type stream payload expires tag') + #super(self.__class__, self).__init__() + self._moduleName = BMConfigParser().safeGet("inventory", "storage") + #import_module("." + self._moduleName, "storage") + #import_module("storage." + self._moduleName) + self._className = "storage." + self._moduleName + "." + self._moduleName.title() + "Inventory" + self._inventoryClass = eval(self._className) + self._realInventory = self._inventoryClass() - def __contains__(self, hash): - with self.lock: - self.numberOfInventoryLookupsPerformed += 1 - if hash in self._inventory: - return True - return bool(sqlQuery('SELECT 1 FROM inventory WHERE hash=?', hash)) - - def __getitem__(self, hash): - with self.lock: - if hash in self._inventory: - return self._inventory[hash] - rows = sqlQuery('SELECT objecttype, streamnumber, payload, expirestime, tag FROM inventory WHERE hash=?', hash) - if not rows: - raise KeyError(hash) - return self.InventoryItem(*rows[0]) - - def __setitem__(self, hash, value): - with self.lock: - value = self.InventoryItem(*value) - self._inventory[hash] = value - self._streams[value.stream].add(hash) - - def __delitem__(self, hash): - raise NotImplementedError - - def __iter__(self): - with self.lock: - hashes = self._inventory.keys()[:] - hashes += (x for x, in sqlQuery('SELECT hash FROM inventory')) - return hashes.__iter__() - - def __len__(self): - with self.lock: - return len(self._inventory) + sqlQuery('SELECT count(*) FROM inventory')[0][0] - - def by_type_and_tag(self, type, tag): - with self.lock: - values = [value for value in self._inventory.values() if value.type == type and value.tag == tag] - values += (self.InventoryItem(*value) for value in sqlQuery('SELECT objecttype, streamnumber, payload, expirestime, tag FROM inventory WHERE objecttype=? AND tag=?', type, tag)) - return values - - def hashes_by_stream(self, stream): - with self.lock: - return self._streams[stream] - - def unexpired_hashes_by_stream(self, stream): - with self.lock: - t = int(time.time()) - hashes = [x for x, value in self._inventory.items() if value.stream == stream and value.expires > t] - hashes += (payload for payload, in sqlQuery('SELECT hash FROM inventory WHERE streamnumber=? AND expirestime>?', stream, t)) - return hashes - - def flush(self): - with self.lock: # If you use both the inventoryLock and the sqlLock, always use the inventoryLock OUTSIDE of the sqlLock. - with SqlBulkExecute() as sql: - for objectHash, value in self._inventory.items(): - sql.execute('INSERT INTO inventory VALUES (?, ?, ?, ?, ?, ?)', objectHash, *value) - self._inventory.clear() - - def clean(self): - with self.lock: - sqlExecute('DELETE FROM inventory WHERE expirestime t] + except KeyError: + return [] + + def flush(self): + self._load() + + def clean(self): + minTime = int(time.time()) - (60 * 60 * 30) + deletes = [] + for stream, streamDict in self._inventory.items(): + for hashId, item in streamDict.items(): + if item.expires < minTime: + deletes.append(hashId) + for hashId in deletes: + del self[hashId] diff --git a/src/storage/sqlite.py b/src/storage/sqlite.py new file mode 100644 index 00000000..38e9c0a2 --- /dev/null +++ b/src/storage/sqlite.py @@ -0,0 +1,81 @@ +import collections +from threading import current_thread, enumerate as threadingEnumerate, RLock +import Queue +import time + +from helper_sql import * +from storage import InventoryStorage, InventoryItem + +class SqliteInventory(InventoryStorage): + def __init__(self): + super(self.__class__, self).__init__() + self._inventory = {} #of objects (like msg payloads and pubkey payloads) Does not include protocol headers (the first 24 bytes of each packet). + self._streams = collections.defaultdict(set) # key = streamNumer, value = a set which holds the inventory object hashes that we are aware of. This is used whenever we receive an inv message from a peer to check to see what items are new to us. We don't delete things out of it; instead, the singleCleaner thread clears and refills it every couple hours. + self.lock = RLock() # Guarantees that two receiveDataThreads don't receive and process the same message concurrently (probably sent by a malicious individual) + + def __contains__(self, hash): + with self.lock: + self.numberOfInventoryLookupsPerformed += 1 + if hash in self._inventory: + return True + return bool(sqlQuery('SELECT 1 FROM inventory WHERE hash=?', hash)) + + def __getitem__(self, hash): + with self.lock: + if hash in self._inventory: + return self._inventory[hash] + rows = sqlQuery('SELECT objecttype, streamnumber, payload, expirestime, tag FROM inventory WHERE hash=?', hash) + if not rows: + raise KeyError(hash) + return InventoryItem(*rows[0]) + + def __setitem__(self, hash, value): + with self.lock: + value = InventoryItem(*value) + self._inventory[hash] = value + self._streams[value.stream].add(hash) + + def __delitem__(self, hash): + raise NotImplementedError + + def __iter__(self): + with self.lock: + hashes = self._inventory.keys()[:] + hashes += (x for x, in sqlQuery('SELECT hash FROM inventory')) + return hashes.__iter__() + + def __len__(self): + with self.lock: + return len(self._inventory) + sqlQuery('SELECT count(*) FROM inventory')[0][0] + + def by_type_and_tag(self, type, tag): + with self.lock: + values = [value for value in self._inventory.values() if value.type == type and value.tag == tag] + values += (InventoryItem(*value) for value in sqlQuery('SELECT objecttype, streamnumber, payload, expirestime, tag FROM inventory WHERE objecttype=? AND tag=?', type, tag)) + return values + + def hashes_by_stream(self, stream): + with self.lock: + return self._streams[stream] + + def unexpired_hashes_by_stream(self, stream): + with self.lock: + t = int(time.time()) + hashes = [x for x, value in self._inventory.items() if value.stream == stream and value.expires > t] + hashes += (payload for payload, in sqlQuery('SELECT hash FROM inventory WHERE streamnumber=? AND expirestime>?', stream, t)) + return hashes + + def flush(self): + with self.lock: # If you use both the inventoryLock and the sqlLock, always use the inventoryLock OUTSIDE of the sqlLock. + with SqlBulkExecute() as sql: + for objectHash, value in self._inventory.items(): + sql.execute('INSERT INTO inventory VALUES (?, ?, ?, ?, ?, ?)', objectHash, *value) + self._inventory.clear() + + def clean(self): + with self.lock: + sqlExecute('DELETE FROM inventory WHERE expirestime