MiNode/minode/sql.py

136 lines
4.2 KiB
Python

"""Inventory implementation using sqlite"""
import os
import sqlite3
import time
from . import shared, structure
# NOTE(review): this rebinds the module-level DB-API ``threadsafety`` constant
# to 3; presumably meant to signal that the connection below may be shared
# between threads -- confirm it has any effect on the underlying SQLite library.
sqlite3.threadsafety = 3
class Inventory():
    """Inventory of objects stored in an sqlite database.

    Offers a dict-like interface keyed by the object vector: item access,
    deletion, assignment, ``in``, ``len()``, ``get()``, ``keys()``,
    ``values()`` and ``popitem()``. Rows live in the ``objects`` table of
    ``objects.dat`` inside ``shared.data_directory``.

    Statements that modify the table run while holding
    ``shared.objects_lock``; reads are done without it.
    """

    def __init__(self):
        """Open (creating if needed) the database and its tables."""
        self._db = sqlite3.connect(
            os.path.join(shared.data_directory, 'objects.dat'),
            check_same_thread=False
        )
        self._db.executescript("""
            BEGIN;
            CREATE TABLE IF NOT EXISTS status (vacuumtime integer);
            CREATE TABLE IF NOT EXISTS objects
            (vector unique, expires integer, type integer, version integer,
            stream integer, tag, data, offset integer);
            COMMIT;
        """)
        # ROWID of the most recently stored row still present, None if empty
        self.rowid = self._last_rowid()
        # TODO: store the current time in the status table (vacuumtime)

    def _last_rowid(self):
        """Return the highest ROWID in the objects table or None if empty."""
        cur = self._db.execute('SELECT max(ROWID) FROM objects')
        # max() over an empty table yields NULL, i.e. None
        return cur.fetchone()[0]

    @staticmethod
    def _make_object(row):
        """Build a ``structure.Object`` from a full row of the objects table."""
        vector, expires, obj_type, version, stream, tag, data, offset = row
        return structure.Object(
            expires, obj_type, version, stream, data, offset,
            tag=tag, vector=vector)

    def __objects(self, cur):
        """Return a generator of objects for all rows left in the cursor.

        The rows are fetched immediately; the objects are built lazily.
        """
        return (self._make_object(row) for row in cur.fetchall())

    def cleanup(self):
        """Delete the objects that expired more than 3 hours ago."""
        with shared.objects_lock:
            self._db.execute(
                'DELETE FROM objects WHERE expires < ?',
                (int(time.time()) - 3 * 3600,)
            )
            self._db.commit()
            # the newest row may have been removed
            self.rowid = self._last_rowid()
            # TODO: conditional vacuum and validity check

    def filter(self, stream=None, object_type=None, tag=None):
        """Return a generator of objects matching all the given criteria.

        A stream or tag which is None (or otherwise falsy) is not used for
        filtering. The object_type is skipped only when it is None, so the
        type 0 can be requested. Without any criteria all the objects
        are returned.
        """
        criteria = []
        if stream:
            criteria.append(('stream = ?', stream))
        if object_type is not None:
            criteria.append(('type = ?', object_type))
        if tag:
            criteria.append(('tag = ?', tag))

        query = 'SELECT * FROM objects'
        params = ()
        if criteria:
            # zip(*[]) gives nothing to unpack, hence the guard
            clauses, params = zip(*criteria)
            query += ' WHERE ' + ' AND '.join(clauses)
        return self.__objects(self._db.execute(query, params))

    def vectors_to_send(self, stream=None):
        """Return a randomly ordered list of unexpired vectors in the stream.

        ``shared.stream`` is used when the stream is not given.
        """
        cur = self._db.execute(
            'SELECT vector FROM objects WHERE expires > ? AND stream = ?'
            ' ORDER BY random()',
            (int(time.time()), stream or shared.stream)
        )
        return [row[0] for row in cur.fetchall()]

    def get(self, vector, default=None):
        """Return the object for the vector or default if it's missing."""
        try:
            return self[vector]
        except KeyError:
            return default

    def keys(self):
        """Return a generator of all the stored vectors."""
        cur = self._db.execute('SELECT vector FROM objects')
        return (row[0] for row in cur.fetchall())

    def values(self):
        """Return a generator of all the stored objects."""
        cur = self._db.execute('SELECT * FROM objects')
        return self.__objects(cur)

    def popitem(self):
        """Remove the row with the highest ROWID, return (vector, object).

        Raises KeyError if the inventory is empty.
        """
        # Ask the database instead of trusting self.rowid which may be
        # stale, e.g. pointing to a row which is already deleted.
        row = self._db.execute(
            'SELECT * FROM objects ORDER BY ROWID DESC LIMIT 1').fetchone()
        if row is None:
            self.rowid = None
            raise KeyError('empty')
        obj = self._make_object(row)
        vector = row[0]
        del self[vector]
        return (vector, obj)

    def __contains__(self, vector):
        """Return True if an object with the vector is stored."""
        cur = self._db.execute(
            'SELECT vector FROM objects WHERE vector = ?', (vector,))
        return cur.fetchone() is not None

    def __getitem__(self, vector):
        """Return the object for the vector, raise KeyError if missing."""
        cur = self._db.execute(
            'SELECT * FROM objects WHERE vector = ?', (vector,))
        row = cur.fetchone()
        if row is None:
            raise KeyError(vector)
        return self._make_object(row)

    def __delitem__(self, vector):
        """Delete the object with the vector.

        A missing vector is silently ignored, no KeyError is raised.
        """
        with shared.objects_lock:
            self._db.execute(
                'DELETE FROM objects WHERE vector = ?', (vector,))
            self._db.commit()
            # lastrowid of a DELETE cursor says nothing about the removed
            # row, so look up the newest remaining row instead
            self.rowid = self._last_rowid()

    def __setitem__(self, vector, obj):
        """Store the object under the vector.

        The vector column is unique, so storing an already present vector
        makes sqlite3 raise IntegrityError.
        """
        with shared.objects_lock:
            cur = self._db.execute(
                'INSERT INTO objects VALUES (?,?,?,?,?,?,?,?)', (
                    vector, obj.expires_time, obj.object_type, obj.version,
                    obj.stream_number, obj.tag, obj.data, obj.offset
                ))
            self._db.commit()
            self.rowid = cur.lastrowid

    def __len__(self):
        """Return the number of stored objects."""
        cur = self._db.execute('SELECT count(*) FROM objects')
        return cur.fetchone()[0]

    def __del__(self):
        """Close the database connection if it was opened."""
        # _db is missing when __init__ failed before connecting
        db = getattr(self, '_db', None)
        if db is not None:
            db.close()