"""Inventory implementation using sqlite""" import os import sqlite3 import time from . import shared, structure sqlite3.threadsafety = 3 class Inventory(): """sqlite inventory""" def __init__(self): self._db = sqlite3.connect( os.path.join(shared.data_directory, 'objects.dat'), check_same_thread=False ) self._db.executescript(""" BEGIN; CREATE TABLE IF NOT EXISTS status (vacuumtime integer); CREATE TABLE IF NOT EXISTS objects (vector unique, expires integer, type integer, version integer, stream integer, tag, data, offset integer); COMMIT; """) self.rowid = len(self) or None # self._cur.execute('INSERT INTO status VALUES (?)', int(time.time())) def __objects(self, cur): return ( structure.Object( expires, obj_type, version, stream, data, offset, tag=tag, vector=vector) for (vector, expires, obj_type, version, stream, tag, data, offset) in cur.fetchall() ) def cleanup(self): with shared.objects_lock: self._db.execute( 'DELETE FROM objects WHERE expires < ?', (int(time.time()) - 3 * 3600,) ) self._db.commit() # conditional vacuum and validity check def filter(self, stream=None, object_type=None, tag=None): clauses = [] if stream: clauses.append(('stream = ?', stream)) if object_type: clauses.append(('type = ?', object_type)) if tag: clauses.append(('tag = ?', tag)) clauses, params = zip(*clauses) cur = self._db.execute( 'SELECT * FROM objects WHERE ' + ' AND '.join(clauses), params) return self.__objects(cur) def vectors_to_send(self, stream=None): cur = self._db.execute( 'SELECT vector FROM objects WHERE expires > ? AND stream = ?' ' ORDER BY random()', (int(time.time()), stream or shared.stream) ) return [v for v, in cur.fetchall()] def get(self, vector, default=None): try: return self[vector] except KeyError: return default def keys(self): cur = self._db.execute('SELECT vector FROM objects') return (v for v, in cur.fetchall()) def values(self): cur = self._db.execute('SELECT * FROM objects') return self.__objects(cur) def popitem(self): if not self.rowid: raise KeyError('empty') cur = self._db.execute( 'SELECT vector FROM objects WHERE ROWID = ?', (self.rowid,)) vector = cur.fetchone()[0] obj = self.get(vector) del self[vector] return (vector, obj) def __contains__(self, vector): cur = self._db.execute( 'SELECT vector FROM objects WHERE vector = ?', (vector,)) return cur.fetchone() def __getitem__(self, vector): cur = self._db.execute( 'SELECT * FROM objects WHERE vector = ?', (vector,)) item = cur.fetchone() if item is None: raise KeyError(vector) vector, expires, obj_type, version, stream, tag, data, offset = item return structure.Object( expires, obj_type, version, stream, data, offset, tag=tag, vector=vector ) def __delitem__(self, vector): with shared.objects_lock: # KeyError cur = self._db.execute( 'DELETE FROM objects WHERE vector = ?', (vector,)) self._db.commit() if cur.lastrowid == self.rowid: self.rowid = len(self) or None def __setitem__(self, vector, obj): with shared.objects_lock: cur = self._db.execute( 'INSERT INTO objects VALUES (?,?,?,?,?,?,?,?)', ( vector, obj.expires_time, obj.object_type, obj.version, obj.stream_number, obj.tag, obj.data, obj.offset )) self._db.commit() self.rowid = cur.lastrowid def __len__(self): cur = self._db.execute('SELECT count(*) FROM objects') return cur.fetchone()[0] def __del__(self): self._db.close()