Inventory storage abstraction

- inventory can now have multiple storage backends
- sqlite is the existing backend, filesystem is a newly available one (see the configuration sketch below)
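
The backend is selected by the storage option in the [inventory] section of the configuration (inventory.py reads it with BMConfigParser().safeGet("inventory", "storage")). A minimal keys.dat sketch, assuming sqlite remains the default when the option is not set:

    [inventory]
    storage = filesystem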
Peter Šurda 2017-05-27 19:03:27 +02:00
parent 1d87c63504
commit 36b5e2c04f
Signed by untrusted user: PeterSurda
GPG Key ID: 0C5F50C0B5F37D87
5 changed files with 329 additions and 72 deletions

src/inventory.py

@@ -1,87 +1,37 @@
 import collections
+from importlib import import_module
 from threading import current_thread, enumerate as threadingEnumerate, RLock
 import Queue
 import time
-import sys
 
+from bmconfigparser import BMConfigParser
 from helper_sql import *
 from singleton import Singleton
 
+# TODO make this dynamic, and watch out for frozen, like with messagetypes
+import storage.sqlite
+import storage.filesystem
+
 @Singleton
-class Inventory(collections.MutableMapping):
+class Inventory():
     def __init__(self):
-        super(self.__class__, self).__init__()
-        self._inventory = {} #of objects (like msg payloads and pubkey payloads) Does not include protocol headers (the first 24 bytes of each packet).
-        self.numberOfInventoryLookupsPerformed = 0
-        self._streams = collections.defaultdict(set) # key = streamNumer, value = a set which holds the inventory object hashes that we are aware of. This is used whenever we receive an inv message from a peer to check to see what items are new to us. We don't delete things out of it; instead, the singleCleaner thread clears and refills it every couple hours.
-        self.lock = RLock() # Guarantees that two receiveDataThreads don't receive and process the same message concurrently (probably sent by a malicious individual)
-        self.InventoryItem = collections.namedtuple('InventoryItem', 'type stream payload expires tag')
-
-    def __contains__(self, hash):
-        with self.lock:
-            self.numberOfInventoryLookupsPerformed += 1
-            if hash in self._inventory:
-                return True
-            return bool(sqlQuery('SELECT 1 FROM inventory WHERE hash=?', hash))
-
-    def __getitem__(self, hash):
-        with self.lock:
-            if hash in self._inventory:
-                return self._inventory[hash]
-            rows = sqlQuery('SELECT objecttype, streamnumber, payload, expirestime, tag FROM inventory WHERE hash=?', hash)
-            if not rows:
-                raise KeyError(hash)
-            return self.InventoryItem(*rows[0])
-
-    def __setitem__(self, hash, value):
-        with self.lock:
-            value = self.InventoryItem(*value)
-            self._inventory[hash] = value
-            self._streams[value.stream].add(hash)
-
-    def __delitem__(self, hash):
-        raise NotImplementedError
-
-    def __iter__(self):
-        with self.lock:
-            hashes = self._inventory.keys()[:]
-            hashes += (x for x, in sqlQuery('SELECT hash FROM inventory'))
-            return hashes.__iter__()
-
-    def __len__(self):
-        with self.lock:
-            return len(self._inventory) + sqlQuery('SELECT count(*) FROM inventory')[0][0]
-
-    def by_type_and_tag(self, type, tag):
-        with self.lock:
-            values = [value for value in self._inventory.values() if value.type == type and value.tag == tag]
-            values += (self.InventoryItem(*value) for value in sqlQuery('SELECT objecttype, streamnumber, payload, expirestime, tag FROM inventory WHERE objecttype=? AND tag=?', type, tag))
-            return values
-
-    def hashes_by_stream(self, stream):
-        with self.lock:
-            return self._streams[stream]
-
-    def unexpired_hashes_by_stream(self, stream):
-        with self.lock:
-            t = int(time.time())
-            hashes = [x for x, value in self._inventory.items() if value.stream == stream and value.expires > t]
-            hashes += (payload for payload, in sqlQuery('SELECT hash FROM inventory WHERE streamnumber=? AND expirestime>?', stream, t))
-            return hashes
-
-    def flush(self):
-        with self.lock: # If you use both the inventoryLock and the sqlLock, always use the inventoryLock OUTSIDE of the sqlLock.
-            with SqlBulkExecute() as sql:
-                for objectHash, value in self._inventory.items():
-                    sql.execute('INSERT INTO inventory VALUES (?, ?, ?, ?, ?, ?)', objectHash, *value)
-                self._inventory.clear()
-
-    def clean(self):
-        with self.lock:
-            sqlExecute('DELETE FROM inventory WHERE expirestime<?', int(time.time()) - (60 * 60 * 3))
-            self._streams.clear()
-            for objectHash, value in self.items():
-                self._streams[value.stream].add(objectHash)
+        #super(self.__class__, self).__init__()
+        self._moduleName = BMConfigParser().safeGet("inventory", "storage")
+        #import_module("." + self._moduleName, "storage")
+        #import_module("storage." + self._moduleName)
+        self._className = "storage." + self._moduleName + "." + self._moduleName.title() + "Inventory"
+        self._inventoryClass = eval(self._className)
+        self._realInventory = self._inventoryClass()
+
+    # cheap inheritance copied from asyncore
+    def __getattr__(self, attr):
+        try:
+            realRet = getattr(self._realInventory, attr)
+        except AttributeError:
+            raise AttributeError("%s instance has no attribute '%s'" %(self.__class__.__name__, attr))
+        else:
+            return realRet
 
 class PendingDownloadQueue(Queue.Queue):
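
For orientation, a rough sketch of how the wrapper above resolves the backend at runtime, assuming storage = "sqlite" is configured; the module and class names come straight from the diff, and the lines below are illustrative only:

    # _moduleName == "sqlite", so _className == "storage.sqlite.SqliteInventory";
    # eval() turns that string into the class object and one instance backs the singleton
    inv = Inventory()            # the @Singleton wrapper returns the shared instance
    inv.hashes_by_stream(1)      # not defined on Inventory itself, so __getattr__
                                 # forwards the call to the SqliteInventory instance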

0
src/storage/__init__.py Normal file

175
src/storage/filesystem.py Normal file

@@ -0,0 +1,175 @@
from binascii import hexlify, unhexlify
from os import listdir, makedirs, path, remove, rmdir
import string
from threading import RLock
import time
import traceback

from paths import lookupAppdataFolder
from storage import InventoryStorage, InventoryItem

class FilesystemInventory(InventoryStorage):
    topDir = "inventory"
    objectDir = "objects"
    metadataFilename = "metadata"
    dataFilename = "data"

    def __init__(self):
        super(self.__class__, self).__init__()
        self.baseDir = path.join(lookupAppdataFolder(), FilesystemInventory.topDir)
        for createDir in [self.baseDir, path.join(self.baseDir, "objects")]:
            if path.exists(createDir):
                if not path.isdir(createDir):
                    raise IOError("%s exists but it's not a directory" % (createDir))
            else:
                makedirs(createDir)
        self.lock = RLock() # Guarantees that two receiveDataThreads don't receive and process the same message concurrently (probably sent by a malicious individual)
        self._inventory = {}
        self._load()

    def __contains__(self, hash):
        retval = False
        for streamDict in self._inventory.values():
            if hash in streamDict:
                return True
        return False

    def __getitem__(self, hash):
        for streamDict in self._inventory.values():
            try:
                retval = streamDict[hash]
            except KeyError:
                continue
            if retval.payload is None:
                retval = InventoryItem(retval.type, retval.stream, self.getData(hash), retval.expires, retval.tag)
            return retval
        raise KeyError(hash)

    def __setitem__(self, hash, value):
        with self.lock:
            value = InventoryItem(*value)
            try:
                makedirs(path.join(self.baseDir, FilesystemInventory.objectDir, hexlify(hash)))
            except OSError:
                pass
            try:
                with open(path.join(self.baseDir, FilesystemInventory.objectDir, hexlify(hash), FilesystemInventory.metadataFilename), 'w') as f:
                    f.write("%s,%s,%s,%s," % (value.type, value.stream, value.expires, hexlify(value.tag)))
                with open(path.join(self.baseDir, FilesystemInventory.objectDir, hexlify(hash), FilesystemInventory.dataFilename), 'w') as f:
                    f.write(value.payload)
            except IOError:
                raise KeyError
            try:
                self._inventory[value.stream][hash] = value
            except KeyError:
                self._inventory[value.stream] = {}
                self._inventory[value.stream][hash] = value

    def __delitem__(self, hash):
        for stream in self._inventory.keys():
            try:
                del self._inventory[stream][hash]
            except KeyError:
                pass
        with self.lock:
            try:
                remove(path.join(self.baseDir, FilesystemInventory.objectDir, hexlify(hash), FilesystemInventory.metadataFilename))
            except IOError:
                pass
            try:
                remove(path.join(self.baseDir, FilesystemInventory.objectDir, hexlify(hash), FilesystemInventory.dataFilename))
            except IOError:
                pass
            try:
                rmdir(path.join(self.baseDir, FilesystemInventory.objectDir, hexlify(hash)))
            except IOError:
                pass

    def __iter__(self):
        elems = []
        for streamDict in self._inventory.values():
            elems.extend(streamDict.keys())
        return elems.__iter__()

    def __len__(self):
        retval = 0
        for streamDict in self._inventory.values():
            retval += len(streamDict)
        return retval

    def _load(self):
        newInventory = {}
        for hashId in self.object_list():
            try:
                objectType, streamNumber, expiresTime, tag = self.getMetadata(hashId)
                try:
                    newInventory[streamNumber][hashId] = InventoryItem(objectType, streamNumber, None, expiresTime, tag)
                except KeyError:
                    newInventory[streamNumber] = {}
                    newInventory[streamNumber][hashId] = InventoryItem(objectType, streamNumber, None, expiresTime, tag)
            except KeyError:
                print "error loading %s" % (hexlify(hashId))
                pass
        self._inventory = newInventory
        for i, v in self._inventory.items():
            print "loaded stream: %s, %i items" % (i, len(v))

    def stream_list(self):
        return self._inventory.keys()

    def object_list(self):
        return [unhexlify(x) for x in listdir(path.join(self.baseDir, FilesystemInventory.objectDir))]

    def getData(self, hashId):
        try:
            with open(path.join(self.baseDir, FilesystemInventory.objectDir, hexlify(hashId), FilesystemInventory.dataFilename), 'r') as f:
                return f.read()
        except IOError:
            raise AttributeError

    def getMetadata(self, hashId):
        try:
            with open(path.join(self.baseDir, FilesystemInventory.objectDir, hexlify(hashId), FilesystemInventory.metadataFilename), 'r') as f:
                objectType, streamNumber, expiresTime, tag, undef = string.split(f.read(), ",", 4)
                return [int(objectType), int(streamNumber), int(expiresTime), unhexlify(tag)]
        except IOError:
            raise KeyError
    def by_type_and_tag(self, objectType, tag):
        retval = []
        for stream, streamDict in self._inventory.items():
            for hashId, item in streamDict.items():
                if item.type == objectType and item.tag == tag:
                    payload = item.payload
                    if payload is None:
                        try:
                            payload = self.getData(hashId)
                        except (IOError, AttributeError):
                            # getData() converts a missing data file into AttributeError
                            continue
                    retval.append(InventoryItem(item.type, item.stream, payload, item.expires, item.tag))
        return retval
    def hashes_by_stream(self, stream):
        try:
            return self._inventory[stream].keys()
        except KeyError:
            return []

    def unexpired_hashes_by_stream(self, stream):
        t = int(time.time())
        try:
            return [x for x, value in self._inventory[stream].items() if value.expires > t]
        except KeyError:
            return []

    def flush(self):
        self._load()

    def clean(self):
        minTime = int(time.time()) - (60 * 60 * 30)
        deletes = []
        for stream, streamDict in self._inventory.items():
            for hashId, item in streamDict.items():
                if item.expires < minTime:
                    deletes.append(hashId)
        for hashId in deletes:
            del self[hashId]
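
As a reading aid, the on-disk layout that FilesystemInventory produces, where <appdata> stands for whatever lookupAppdataFolder() returns and <hash> for the hex-encoded inventory hash:

    <appdata>/inventory/
        objects/
            <hash>/
                metadata    objecttype,streamnumber,expirestime,hex(tag),  (plain text, trailing comma)
                data        raw object payload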

81
src/storage/sqlite.py Normal file

@@ -0,0 +1,81 @@
import collections
from threading import current_thread, enumerate as threadingEnumerate, RLock
import Queue
import time

from helper_sql import *
from storage import InventoryStorage, InventoryItem

class SqliteInventory(InventoryStorage):
    def __init__(self):
        super(self.__class__, self).__init__()
        self._inventory = {} #of objects (like msg payloads and pubkey payloads) Does not include protocol headers (the first 24 bytes of each packet).
        self._streams = collections.defaultdict(set) # key = streamNumer, value = a set which holds the inventory object hashes that we are aware of. This is used whenever we receive an inv message from a peer to check to see what items are new to us. We don't delete things out of it; instead, the singleCleaner thread clears and refills it every couple hours.
        self.lock = RLock() # Guarantees that two receiveDataThreads don't receive and process the same message concurrently (probably sent by a malicious individual)

    def __contains__(self, hash):
        with self.lock:
            self.numberOfInventoryLookupsPerformed += 1
            if hash in self._inventory:
                return True
            return bool(sqlQuery('SELECT 1 FROM inventory WHERE hash=?', hash))

    def __getitem__(self, hash):
        with self.lock:
            if hash in self._inventory:
                return self._inventory[hash]
            rows = sqlQuery('SELECT objecttype, streamnumber, payload, expirestime, tag FROM inventory WHERE hash=?', hash)
            if not rows:
                raise KeyError(hash)
            return InventoryItem(*rows[0])

    def __setitem__(self, hash, value):
        with self.lock:
            value = InventoryItem(*value)
            self._inventory[hash] = value
            self._streams[value.stream].add(hash)

    def __delitem__(self, hash):
        raise NotImplementedError

    def __iter__(self):
        with self.lock:
            hashes = self._inventory.keys()[:]
            hashes += (x for x, in sqlQuery('SELECT hash FROM inventory'))
            return hashes.__iter__()

    def __len__(self):
        with self.lock:
            return len(self._inventory) + sqlQuery('SELECT count(*) FROM inventory')[0][0]

    def by_type_and_tag(self, type, tag):
        with self.lock:
            values = [value for value in self._inventory.values() if value.type == type and value.tag == tag]
            values += (InventoryItem(*value) for value in sqlQuery('SELECT objecttype, streamnumber, payload, expirestime, tag FROM inventory WHERE objecttype=? AND tag=?', type, tag))
            return values

    def hashes_by_stream(self, stream):
        with self.lock:
            return self._streams[stream]

    def unexpired_hashes_by_stream(self, stream):
        with self.lock:
            t = int(time.time())
            hashes = [x for x, value in self._inventory.items() if value.stream == stream and value.expires > t]
            hashes += (payload for payload, in sqlQuery('SELECT hash FROM inventory WHERE streamnumber=? AND expirestime>?', stream, t))
            return hashes

    def flush(self):
        with self.lock: # If you use both the inventoryLock and the sqlLock, always use the inventoryLock OUTSIDE of the sqlLock.
            with SqlBulkExecute() as sql:
                for objectHash, value in self._inventory.items():
                    sql.execute('INSERT INTO inventory VALUES (?, ?, ?, ?, ?, ?)', objectHash, *value)
                self._inventory.clear()

    def clean(self):
        with self.lock:
            sqlExecute('DELETE FROM inventory WHERE expirestime<?', int(time.time()) - (60 * 60 * 3))
            self._streams.clear()
            for objectHash, value in self.items():
                self._streams[value.stream].add(objectHash)
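
The queries above rely on the pre-existing inventory table, which this commit does not create. Judging only from the SELECT and INSERT statements, its columns are roughly as sketched below; the actual CREATE TABLE lives elsewhere in the codebase and may carry constraints not shown here:

    CREATE TABLE inventory (
        hash blob,
        objecttype integer,
        streamnumber integer,
        payload blob,
        expirestime integer,
        tag blob
    );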

51
src/storage/storage.py Normal file

@@ -0,0 +1,51 @@
import collections

InventoryItem = collections.namedtuple('InventoryItem', 'type stream payload expires tag')

class Storage(object):
    pass
#    def __init__(self):
#        super(self.__class__, self).__init__()

class InventoryStorage(Storage, collections.MutableMapping):
    def __init__(self):
#        super(self.__class__, self).__init__()
        self.numberOfInventoryLookupsPerformed = 0

    def __contains__(self, hash):
        raise NotImplementedError

    def __getitem__(self, hash):
        raise NotImplementedError

    def __setitem__(self, hash, value):
        raise NotImplementedError

    def __delitem__(self, hash):
        raise NotImplementedError

    def __iter__(self):
        raise NotImplementedError

    def __len__(self):
        raise NotImplementedError

    def by_type_and_tag(self, type, tag):
        raise NotImplementedError

    def hashes_by_stream(self, stream):
        raise NotImplementedError

    def unexpired_hashes_by_stream(self, stream):
        raise NotImplementedError

    def flush(self):
        raise NotImplementedError

    def clean(self):
        raise NotImplementedError

class MailboxStorage(Storage, collections.MutableMapping):
    def __init__(self):
#        super(self.__class__, self).__init__()
        pass
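
Taken together, storage.py and the naming rule in inventory.py (module name "foo" maps to class "FooInventory") define the contract a further backend would implement. Below is a purely hypothetical in-memory sketch, not part of this commit: the module name "memory" and class "MemoryInventory" are invented for illustration, and it assumes the file is saved as storage/memory.py, imported in inventory.py (note the TODO there about making the imports dynamic), and selected with storage = memory in the config.

    # storage/memory.py -- hypothetical example, not part of this commit
    import time

    from storage import InventoryStorage, InventoryItem

    class MemoryInventory(InventoryStorage):
        """Illustrative in-memory backend following the module -> <Name>Inventory rule."""
        def __init__(self):
            super(self.__class__, self).__init__()
            self._items = {}

        def __contains__(self, hash):
            return hash in self._items

        def __getitem__(self, hash):
            return self._items[hash]

        def __setitem__(self, hash, value):
            self._items[hash] = InventoryItem(*value)

        def __delitem__(self, hash):
            del self._items[hash]

        def __iter__(self):
            return iter(self._items)

        def __len__(self):
            return len(self._items)

        def by_type_and_tag(self, objectType, tag):
            return [item for item in self._items.values()
                    if item.type == objectType and item.tag == tag]

        def hashes_by_stream(self, stream):
            return [h for h, item in self._items.items() if item.stream == stream]

        def unexpired_hashes_by_stream(self, stream):
            t = int(time.time())
            return [h for h, item in self._items.items()
                    if item.stream == stream and item.expires > t]

        def flush(self):
            pass  # nothing to persist

        def clean(self):
            # drop objects that expired more than three hours ago, mirroring SqliteInventory
            minTime = int(time.time()) - (60 * 60 * 3)
            for h in [h for h, item in self._items.items() if item.expires < minTime]:
                del self._items[h]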