diff --git a/src/inventory.py b/src/inventory.py
index a796968a..b676415b 100644
--- a/src/inventory.py
+++ b/src/inventory.py
@@ -1,87 +1,45 @@
 import collections
+from importlib import import_module
 from threading import current_thread, enumerate as threadingEnumerate, RLock
 import Queue
 import time
+import sys
 
+from bmconfigparser import BMConfigParser
 from helper_sql import *
 from singleton import Singleton
 
+# TODO make this dynamic, and watch out for frozen, like with messagetypes
+import storage.sqlite
+import storage.filesystem
 
 @Singleton
-class Inventory(collections.MutableMapping):
+class Inventory():
+    """Facade that forwards inventory access to the configured storage backend.
+
+    NOTE: do not add an ``object`` base. In Python 2 the implicit
+    special-method lookups (``in``, ``[]``, ``len``) consult ``__getattr__``
+    only on old-style instances, so they would stop being delegated.
+    """
     def __init__(self):
-        super(self.__class__, self).__init__()
-        self._inventory = {} #of objects (like msg payloads and pubkey payloads) Does not include protocol headers (the first 24 bytes of each packet).
-        self.numberOfInventoryLookupsPerformed = 0
-        self._streams = collections.defaultdict(set) # key = streamNumer, value = a set which holds the inventory object hashes that we are aware of. This is used whenever we receive an inv message from a peer to check to see what items are new to us. We don't delete things out of it; instead, the singleCleaner thread clears and refills it every couple hours.
-        self.lock = RLock() # Guarantees that two receiveDataThreads don't receive and process the same message concurrently (probably sent by a malicious individual)
-        self.InventoryItem = collections.namedtuple('InventoryItem', 'type stream payload expires tag')
+        # The backend name is the "storage" option of the [inventory] section.
+        self._moduleName = BMConfigParser().safeGet("inventory", "storage")
+        backendName = self._moduleName.title() + "Inventory"
+        self._className = "storage." + self._moduleName + "." + backendName
+        # Resolve the class by attribute lookup instead of eval(), so a
+        # configuration value is never executed as code.
+        self._inventoryClass = getattr(getattr(storage, self._moduleName), backendName)
+        self._realInventory = self._inventoryClass()
 
-    def __contains__(self, hash):
-        with self.lock:
-            self.numberOfInventoryLookupsPerformed += 1
-            if hash in self._inventory:
-                return True
-            return bool(sqlQuery('SELECT 1 FROM inventory WHERE hash=?', hash))
-
-    def __getitem__(self, hash):
-        with self.lock:
-            if hash in self._inventory:
-                return self._inventory[hash]
-            rows = sqlQuery('SELECT objecttype, streamnumber, payload, expirestime, tag FROM inventory WHERE hash=?', hash)
-            if not rows:
-                raise KeyError(hash)
-            return self.InventoryItem(*rows[0])
-
-    def __setitem__(self, hash, value):
-        with self.lock:
-            value = self.InventoryItem(*value)
-            self._inventory[hash] = value
-            self._streams[value.stream].add(hash)
-
-    def __delitem__(self, hash):
-        raise NotImplementedError
-
-    def __iter__(self):
-        with self.lock:
-            hashes = self._inventory.keys()[:]
-            hashes += (x for x, in sqlQuery('SELECT hash FROM inventory'))
-            return hashes.__iter__()
-
-    def __len__(self):
-        with self.lock:
-            return len(self._inventory) + sqlQuery('SELECT count(*) FROM inventory')[0][0]
-
-    def by_type_and_tag(self, type, tag):
-        with self.lock:
-            values = [value for value in self._inventory.values() if value.type == type and value.tag == tag]
-            values += (self.InventoryItem(*value) for value in sqlQuery('SELECT objecttype, streamnumber, payload, expirestime, tag FROM inventory WHERE objecttype=? AND tag=?', type, tag))
-            return values
-
-    def hashes_by_stream(self, stream):
-        with self.lock:
-            return self._streams[stream]
-
-    def unexpired_hashes_by_stream(self, stream):
-        with self.lock:
-            t = int(time.time())
-            hashes = [x for x, value in self._inventory.items() if value.stream == stream and value.expires > t]
-            hashes += (payload for payload, in sqlQuery('SELECT hash FROM inventory WHERE streamnumber=? AND expirestime>?', stream, t))
-            return hashes
-
-    def flush(self):
-        with self.lock: # If you use both the inventoryLock and the sqlLock, always use the inventoryLock OUTSIDE of the sqlLock.
-            with SqlBulkExecute() as sql:
-                for objectHash, value in self._inventory.items():
-                    sql.execute('INSERT INTO inventory VALUES (?, ?, ?, ?, ?, ?)', objectHash, *value)
-                self._inventory.clear()
-
-    def clean(self):
-        with self.lock:
-            sqlExecute('DELETE FROM inventory WHERE expirestime<?',int(time.time()) - (60 * 60 * 3))
-            self._streams.clear()
-            for objectHash, value in self.items():
-                self._streams[value.stream].add(objectHash)
+    # cheap inheritance copied from asyncore
+    def __getattr__(self, attr):
+        """Return `attr` from the real backend, or raise AttributeError."""
+        try:
+            realRet = getattr(self._realInventory, attr)
+        except AttributeError:
+            raise AttributeError("%s instance has no attribute '%s'" %(self.__class__.__name__, attr))
+        else:
+            return realRet
 
 
 class PendingDownloadQueue(Queue.Queue):
diff --git a/src/storage/__init__.py b/src/storage/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/src/storage/filesystem.py b/src/storage/filesystem.py
new file mode 100644
index 00000000..bb4d0e3f
--- /dev/null
+++ b/src/storage/filesystem.py
@@ -0,0 +1,201 @@
+from binascii import hexlify, unhexlify
+from os import listdir, makedirs, path, remove, rmdir
+import string
+from threading import RLock
+import time
+import traceback
+
+from paths import lookupAppdataFolder
+from storage import InventoryStorage, InventoryItem
+
+class FilesystemInventory(InventoryStorage):
+    """Inventory backend that stores every object in its own directory on disk.
+
+    Each object lives in <baseDir>/objects/<hex hash>/ as a "metadata" and a
+    "data" file. An index of the metadata is kept in memory per stream; items
+    indexed from disk carry payload None until their data file is read.
+    """
+    topDir = "inventory"
+    objectDir = "objects"
+    metadataFilename = "metadata"
+    dataFilename = "data"
+
+    def __init__(self):
+        super(FilesystemInventory, self).__init__()
+        self.baseDir = path.join(lookupAppdataFolder(), FilesystemInventory.topDir)
+        for createDir in [self.baseDir, path.join(self.baseDir, FilesystemInventory.objectDir)]:
+            if path.exists(createDir):
+                if not path.isdir(createDir):
+                    raise IOError("%s exists but it's not a directory" % (createDir))
+            else:
+                makedirs(createDir)
+        self.lock = RLock() # Guarantees that two receiveDataThreads don't receive and process the same message concurrently (probably sent by a malicious individual)
+        self._inventory = {} # stream number -> {hash: InventoryItem}
+        self._load()
+
+    def _objectPath(self, hashId, *parts):
+        """Return the directory of object `hashId`, optionally extended by `parts`."""
+        return path.join(self.baseDir, FilesystemInventory.objectDir, hexlify(hashId), *parts)
+
+    def __contains__(self, hash):
+        """Return True if `hash` is indexed in any stream."""
+        return any(hash in streamDict for streamDict in self._inventory.values())
+
+    def __getitem__(self, hash):
+        """Return the item for `hash`, reading its payload from disk if needed.
+
+        Raises KeyError for an unknown hash; an unreadable data file surfaces
+        as the AttributeError raised by getData().
+        """
+        for streamDict in self._inventory.values():
+            try:
+                retval = streamDict[hash]
+            except KeyError:
+                continue
+            if retval.payload is None:
+                retval = InventoryItem(retval.type, retval.stream, self.getData(hash), retval.expires, retval.tag)
+            return retval
+        raise KeyError(hash)
+
+    def __setitem__(self, hash, value):
+        """Write object `hash` to disk and add it to the in-memory index.
+
+        `value` holds the five InventoryItem fields. Raises KeyError if the
+        files cannot be written.
+        """
+        with self.lock:
+            value = InventoryItem(*value)
+            try:
+                makedirs(self._objectPath(hash))
+            except OSError:
+                # e.g. the directory already exists
+                pass
+            try:
+                with open(self._objectPath(hash, FilesystemInventory.metadataFilename), 'w') as f:
+                    f.write("%s,%s,%s,%s," % (value.type, value.stream, value.expires, hexlify(value.tag)))
+                # binary mode: the payload must be stored byte for byte
+                with open(self._objectPath(hash, FilesystemInventory.dataFilename), 'wb') as f:
+                    f.write(value.payload)
+            except IOError:
+                raise KeyError(hash)
+            self._inventory.setdefault(value.stream, {})[hash] = value
+
+    def __delitem__(self, hash):
+        """Drop `hash` from the index and delete its files; missing files are ignored."""
+        with self.lock:
+            for streamDict in self._inventory.values():
+                streamDict.pop(hash, None)
+            # os.remove() and os.rmdir() raise OSError (not IOError) on failure
+            for filename in (FilesystemInventory.metadataFilename, FilesystemInventory.dataFilename):
+                try:
+                    remove(self._objectPath(hash, filename))
+                except OSError:
+                    pass
+            try:
+                rmdir(self._objectPath(hash))
+            except OSError:
+                pass
+
+    def __iter__(self):
+        """Iterate over a snapshot of all indexed hashes."""
+        elems = []
+        for streamDict in self._inventory.values():
+            elems.extend(streamDict.keys())
+        return iter(elems)
+
+    def __len__(self):
+        """Return the number of indexed objects across all streams."""
+        return sum(len(streamDict) for streamDict in self._inventory.values())
+
+    def _load(self):
+        """Rebuild the in-memory index from the metadata files on disk."""
+        newInventory = {}
+        for hashId in self.object_list():
+            try:
+                objectType, streamNumber, expiresTime, tag = self.getMetadata(hashId)
+            except (KeyError, ValueError, TypeError):
+                # KeyError: file unreadable; ValueError/TypeError: malformed content
+                print("error loading %s" % (hexlify(hashId)))
+                continue
+            # payload stays None here; it is read from the data file on access
+            newInventory.setdefault(streamNumber, {})[hashId] = InventoryItem(objectType, streamNumber, None, expiresTime, tag)
+        self._inventory = newInventory
+        for i, v in self._inventory.items():
+            print("loaded stream: %s, %i items" % (i, len(v)))
+
+    def stream_list(self):
+        """Return the stream numbers present in the index."""
+        return self._inventory.keys()
+
+    def object_list(self):
+        """Return the hashes of all object directories found on disk."""
+        return [unhexlify(x) for x in listdir(path.join(self.baseDir, FilesystemInventory.objectDir))]
+
+    def getData(self, hashId):
+        """Return the stored payload of `hashId`; raises AttributeError if it cannot be read."""
+        try:
+            with open(self._objectPath(hashId, FilesystemInventory.dataFilename), 'rb') as f:
+                return f.read()
+        except IOError:
+            raise AttributeError
+
+    def getMetadata(self, hashId):
+        """Return [type, stream, expires, tag] of `hashId`; raises KeyError if the file cannot be read."""
+        try:
+            with open(self._objectPath(hashId, FilesystemInventory.metadataFilename), 'r') as f:
+                objectType, streamNumber, expiresTime, tag, undef = f.read().split(",", 4)
+                return [int(objectType), int(streamNumber), int(expiresTime), unhexlify(tag)]
+        except IOError:
+            raise KeyError
+
+    def by_type_and_tag(self, objectType, tag):
+        """Return the items of `objectType` tagged `tag`, with payloads loaded.
+
+        Items whose data file cannot be read are skipped.
+        """
+        retval = []
+        for streamDict in self._inventory.values():
+            for hashId, item in streamDict.items():
+                if item.type != objectType or item.tag != tag:
+                    continue
+                payload = item.payload
+                if payload is None:
+                    try:
+                        payload = self.getData(hashId)
+                    except AttributeError:
+                        # getData() signals an unreadable data file this way
+                        continue
+                retval.append(InventoryItem(item.type, item.stream, payload, item.expires, item.tag))
+        return retval
+
+    def hashes_by_stream(self, stream):
+        """Return the hashes indexed for `stream`, or [] if there are none."""
+        try:
+            return self._inventory[stream].keys()
+        except KeyError:
+            return []
+
+    def unexpired_hashes_by_stream(self, stream):
+        """Return the hashes in `stream` whose expiry time lies in the future."""
+        t = int(time.time())
+        try:
+            return [x for x, value in self._inventory[stream].items() if value.expires > t]
+        except KeyError:
+            return []
+
+    def flush(self):
+        """Rebuild the index from disk; __setitem__ already writes objects immediately."""
+        self._load()
+
+    def clean(self):
+        """Delete every object that expired more than 30 hours ago."""
+        # NOTE(review): the sqlite backend keeps expired objects for only
+        # 3 hours (60 * 60 * 3); confirm that 30 is intended here.
+        minTime = int(time.time()) - (60 * 60 * 30)
+        deletes = []
+        for stream, streamDict in self._inventory.items():
+            for hashId, item in streamDict.items():
+                if item.expires < minTime:
+                    deletes.append(hashId)
+        for hashId in deletes:
+            del self[hashId]
diff --git a/src/storage/sqlite.py b/src/storage/sqlite.py
new file mode 100644
index 00000000..38e9c0a2
--- /dev/null
+++ b/src/storage/sqlite.py
@@ -0,0 +1,92 @@
+import collections
+from threading import current_thread, enumerate as threadingEnumerate, RLock
+import Queue
+import time
+
+from helper_sql import *
+from storage import InventoryStorage, InventoryItem
+
+class SqliteInventory(InventoryStorage):
+    """Inventory backend that buffers new objects in memory until flush() writes them to the sqlite `inventory` table."""
+    def __init__(self):
+        super(SqliteInventory, self).__init__()
+        self._inventory = {} #of objects (like msg payloads and pubkey payloads) Does not include protocol headers (the first 24 bytes of each packet).
+        self._streams = collections.defaultdict(set) # key = streamNumber, value = a set which holds the inventory object hashes that we are aware of. This is used whenever we receive an inv message from a peer to check to see what items are new to us. We don't delete things out of it; instead, the singleCleaner thread clears and refills it every couple hours.
+        self.lock = RLock() # Guarantees that two receiveDataThreads don't receive and process the same message concurrently (probably sent by a malicious individual)
+
+    def __contains__(self, hash):
+        """Return True if `hash` is buffered in memory or stored in the database."""
+        with self.lock:
+            self.numberOfInventoryLookupsPerformed += 1
+            if hash in self._inventory:
+                return True
+            return bool(sqlQuery('SELECT 1 FROM inventory WHERE hash=?', hash))
+
+    def __getitem__(self, hash):
+        """Return the InventoryItem for `hash`; raises KeyError if it is unknown."""
+        with self.lock:
+            if hash in self._inventory:
+                return self._inventory[hash]
+            rows = sqlQuery('SELECT objecttype, streamnumber, payload, expirestime, tag FROM inventory WHERE hash=?', hash)
+            if not rows:
+                raise KeyError(hash)
+            return InventoryItem(*rows[0])
+
+    def __setitem__(self, hash, value):
+        """Buffer `value` in memory under `hash` and record the hash for its stream."""
+        with self.lock:
+            value = InventoryItem(*value)
+            self._inventory[hash] = value
+            self._streams[value.stream].add(hash)
+
+    def __delitem__(self, hash):
+        """Deleting single objects is not supported by this backend."""
+        raise NotImplementedError
+
+    def __iter__(self):
+        """Iterate over the buffered hashes followed by the hashes in the database."""
+        with self.lock:
+            hashes = self._inventory.keys()[:]
+            hashes += (x for x, in sqlQuery('SELECT hash FROM inventory'))
+            return hashes.__iter__()
+
+    def __len__(self):
+        """Return the number of buffered objects plus the number of database rows."""
+        with self.lock:
+            return len(self._inventory) + sqlQuery('SELECT count(*) FROM inventory')[0][0]
+
+    def by_type_and_tag(self, type, tag):
+        """Return all buffered and stored items matching `type` and `tag`."""
+        with self.lock:
+            values = [value for value in self._inventory.values() if value.type == type and value.tag == tag]
+            values += (InventoryItem(*value) for value in sqlQuery('SELECT objecttype, streamnumber, payload, expirestime, tag FROM inventory WHERE objecttype=? AND tag=?', type, tag))
+            return values
+
+    def hashes_by_stream(self, stream):
+        """Return the set of hashes recorded for `stream`."""
+        with self.lock:
+            return self._streams[stream]
+
+    def unexpired_hashes_by_stream(self, stream):
+        """Return the hashes in `stream` whose expiry time lies in the future."""
+        with self.lock:
+            t = int(time.time())
+            hashes = [x for x, value in self._inventory.items() if value.stream == stream and value.expires > t]
+            hashes += (payload for payload, in sqlQuery('SELECT hash FROM inventory WHERE streamnumber=? AND expirestime>?', stream, t))
+            return hashes
+
+    def flush(self):
+        """Write all buffered objects to the database and empty the buffer."""
+        with self.lock: # If you use both the inventoryLock and the sqlLock, always use the inventoryLock OUTSIDE of the sqlLock.
+            with SqlBulkExecute() as sql:
+                for objectHash, value in self._inventory.items():
+                    sql.execute('INSERT INTO inventory VALUES (?, ?, ?, ?, ?, ?)', objectHash, *value)
+                self._inventory.clear()
+
+    def clean(self):
+        """Delete rows that expired more than three hours ago and rebuild the per-stream hash sets."""
+        with self.lock:
+            sqlExecute('DELETE FROM inventory WHERE expirestime<?',int(time.time()) - (60 * 60 * 3))
+            self._streams.clear()
+            for objectHash, value in self.items():
+                self._streams[value.stream].add(objectHash)
diff --git a/src/storage/storage.py b/src/storage/storage.py
new file mode 100644
index 00000000..22dfbef5
--- /dev/null
+++ b/src/storage/storage.py
@@ -0,0 +1,54 @@
+import collections
+
+InventoryItem = collections.namedtuple('InventoryItem', 'type stream payload expires tag')
+
+class Storage(object):
+    """Common base class of all storage backends."""
+    pass
+
+class InventoryStorage(Storage, collections.MutableMapping):
+    """Interface of an inventory backend: a mapping from object hash to InventoryItem.
+
+    Every method other than __init__ raises NotImplementedError and is meant
+    to be overridden by a concrete backend.
+    """
+    def __init__(self):
+        self.numberOfInventoryLookupsPerformed = 0
+
+    def __contains__(self, hash):
+        raise NotImplementedError
+
+    def __getitem__(self, hash):
+        raise NotImplementedError
+
+    def __setitem__(self, hash, value):
+        raise NotImplementedError
+
+    def __delitem__(self, hash):
+        raise NotImplementedError
+
+    def __iter__(self):
+        raise NotImplementedError
+
+    def __len__(self):
+        raise NotImplementedError
+
+    def by_type_and_tag(self, type, tag):
+        raise NotImplementedError
+
+    def hashes_by_stream(self, stream):
+        raise NotImplementedError
+
+    def unexpired_hashes_by_stream(self, stream):
+        raise NotImplementedError
+
+    def flush(self):
+        raise NotImplementedError
+
+    def clean(self):
+        raise NotImplementedError
+
+class MailboxStorage(Storage, collections.MutableMapping):
+    """Mailbox storage stub; it defines nothing beyond an empty __init__."""
+    def __init__(self):
+        pass