"""Inventory implementation using sqlite""" import logging import os import sqlite3 import threading import time from . import shared, structure sqlite3.threadsafety = 3 class Inventory(): """sqlite inventory""" def __init__(self): self._lock = threading.Lock() self._deleted = 0 self._db = sqlite3.connect( os.path.join(shared.data_directory, 'objects.dat'), check_same_thread=False ) self._db.executescript(""" BEGIN; CREATE TABLE IF NOT EXISTS status (key text, value integer, UNIQUE(key) ON CONFLICT REPLACE); INSERT INTO status VALUES ('version', 1); CREATE TABLE IF NOT EXISTS objects (vector unique, expires integer, type integer, version integer, stream integer, tag, data, offset integer); COMMIT; """) self.rowid = len(self) or None try: self.lastvacuumtime = self._db.execute( "SELECT value FROM status WHERE key='lastvacuumtime'" ).fetchone()[0] except TypeError: self.lastvacuumtime = int(time.time()) self._db.execute( "INSERT INTO status VALUES ('lastvacuumtime', ?)", (self.lastvacuumtime,) ) self._db.commit() self._db.row_factory = self.__object @staticmethod def __object(cursor, row): if len(cursor.description) != 8: return row vector, expires, obj_type, version, stream, tag, data, offset = row return structure.Object( expires, obj_type, version, stream, data, offset, tag=tag, vector=vector) def cleanup(self): with self._lock: now = int(time.time()) cur = self._db.execute( 'DELETE FROM objects WHERE expires < ?', (now - 3 * 3600,)) self._db.commit() self._deleted += cur.rowcount logging.debug('Deleted %s expired objects', cur.rowcount) # conditional vacuum and validity check (TODO) # every 24 hours or after deleting a lot of items if self._deleted > 10000 or self.lastvacuumtime < now - 86400: logging.info('Doing VACUUM for objects') cur.execute('VACUUM') cur.execute( "INSERT INTO status VALUES ('lastvacuumtime', ?)", (now,)) self._db.commit() self._deleted = 0 self.lastvacuumtime = now def filter(self, stream=None, object_type=None, tag=None): clauses = [] if stream: clauses.append(('stream = ?', stream)) if object_type is not None: clauses.append(('type = ?', object_type)) if tag: clauses.append(('tag = ?', tag)) clauses, params = zip(*clauses) cur = self._db.execute( 'SELECT * FROM objects WHERE ' # nosec B608 + ' AND '.join(clauses), params) return cur def vectors_to_send(self, stream=None): cur = self._db.execute( 'SELECT vector FROM objects WHERE expires > ? AND stream = ?' ' ORDER BY random()', (int(time.time()), stream or shared.stream) ) return [v for v, in cur] def get(self, vector, default=None): try: return self[vector] except KeyError: return default def keys(self): cur = self._db.execute('SELECT vector FROM objects') return (v for v, in cur) def values(self): return self._db.execute('SELECT * FROM objects') def popitem(self): if not self.rowid: raise KeyError('empty') cur = self._db.execute( 'SELECT vector FROM objects WHERE ROWID = ?', (self.rowid,)) vector = cur.fetchone()[0] obj = self.get(vector) del self[vector] return (vector, obj) def __contains__(self, vector): cur = self._db.execute( 'SELECT vector FROM objects WHERE vector = ?', (vector,)) return cur.fetchone() def __getitem__(self, vector): item = self._db.execute( 'SELECT * FROM objects WHERE vector = ?', (vector,)).fetchone() if item is None: raise KeyError(vector) return item def __delitem__(self, vector): with self._lock: # KeyError self._db.execute('DELETE FROM objects WHERE vector = ?', (vector,)) self._db.commit() self.rowid = len(self) def __setitem__(self, vector, obj): with self._lock: cur = self._db.execute( 'INSERT INTO objects VALUES (?,?,?,?,?,?,?,?)', ( vector, obj.expires_time, obj.object_type, obj.version, obj.stream_number, obj.tag, obj.data, obj.offset )) self._db.commit() self.rowid = cur.lastrowid def __bool__(self): return self._db.execute( 'SELECT vector from objects LIMIT 1').fetchone() is not None def __len__(self): cur = self._db.execute('SELECT count(*) FROM objects') return cur.fetchone()[0] def __del__(self): self._db.close()