MiNode/minode/sql.py

189 lines
6.2 KiB
Python

"""Inventory implementation using sqlite"""
import logging
import os
import sqlite3
import threading
import time
from . import shared, structure
# NOTE(review): the Python documentation describes sqlite3.threadsafety as an
# informational constant reflecting how the underlying SQLite library was
# compiled, so this assignment presumably only overwrites the module
# attribute and does not change SQLite's threading mode -- confirm.
sqlite3.threadsafety = 3
class Inventory():
    """Dict-like inventory of objects persisted in an sqlite database.

    Objects live in the ``objects`` table of ``objects.dat`` inside
    ``shared.data_directory`` and are keyed by their vector. A small
    ``status`` table keeps the schema version and the time of the
    last VACUUM.
    """

    def __init__(self):
        """Open (creating if needed) the database and load the status."""
        # taken by the methods that modify the database or ``_pending``
        self._lock = threading.Lock()
        # number of rows removed by cleanup() since the last VACUUM
        self._deleted = 0
        # vectors select() reported as missing which are not stored yet
        self._pending = set()
        self._db = sqlite3.connect(
            os.path.join(shared.data_directory, 'objects.dat'),
            check_same_thread=False
        )
        # UNIQUE(key) ON CONFLICT REPLACE makes every later
        # "INSERT INTO status" act as an upsert
        self._db.executescript("""
            BEGIN;
            CREATE TABLE IF NOT EXISTS status
            (key text, value integer, UNIQUE(key) ON CONFLICT REPLACE);
            INSERT INTO status VALUES ('version', 1);
            CREATE TABLE IF NOT EXISTS objects
            (vector unique, expires integer, type integer, version integer,
            stream integer, tag, data, offset integer);
            COMMIT;
        """)
        self.rowid = len(self) or None
        row = self._db.execute(
            "SELECT value FROM status WHERE key='lastvacuumtime'"
        ).fetchone()
        if row is None:
            # no VACUUM recorded yet: start the 24 hour period now
            self.lastvacuumtime = int(time.time())
            self._db.execute(
                "INSERT INTO status VALUES ('lastvacuumtime', ?)",
                (self.lastvacuumtime,)
            )
            self._db.commit()
        else:
            self.lastvacuumtime = row[0]
        # installed last, so the queries above returned plain tuples
        self._db.row_factory = self.__object

    @staticmethod
    def __object(cursor, row):
        """Row factory: turn a full ``objects`` row into an Object.

        Rows which do not have exactly 8 columns (single-column selects,
        ``count(*)``, the status table) are returned unchanged.
        """
        if len(cursor.description) != 8:
            return row
        vector, expires, obj_type, version, stream, tag, data, offset = row
        return structure.Object(
            expires, obj_type, version, stream, data, offset,
            tag=tag, vector=vector)

    def cleanup(self):
        """Delete objects which expired more than 3 hours ago.

        Nothing is done while more than 100 vectors are pending.
        The database is VACUUMed once more than 10000 rows have been
        deleted since the last VACUUM or when the last one is older
        than 24 hours.
        """
        if len(self._pending) > 100:
            logging.warning(
                'Not cleaning up, %s objects pending', len(self._pending))
            return
        with self._lock:
            now = int(time.time())
            cur = self._db.execute(
                'DELETE FROM objects WHERE expires < ?', (now - 3 * 3600,))
            self._db.commit()
            self._deleted += cur.rowcount
            logging.info(
                'Deleted %s expired objects, %s pending',
                cur.rowcount, len(self._pending))
            # conditional vacuum and validity check
            # every 24 hours or after deleting a lot of items
            if self._deleted > 10000 or self.lastvacuumtime < now - 86400:
                logging.info('Doing VACUUM for objects')
                cur.execute('VACUUM')
                cur.execute(
                    "INSERT INTO status VALUES ('lastvacuumtime', ?)", (now,))
                self._db.commit()
                self.lastvacuumtime = now
                self._deleted = 0

    def filter(self, stream=None, object_type=None, tag=None):
        """Return a cursor over the objects matching all given criteria.

        ``stream`` and ``tag`` are applied when truthy. ``object_type``
        is applied whenever it is not None, so type ``0`` can be
        selected. Without any criteria all objects are returned.
        """
        conditions = []
        params = []
        if stream:
            conditions.append('stream = ?')
            params.append(stream)
        if object_type is not None:
            conditions.append('type = ?')
            params.append(object_type)
        if tag:
            conditions.append('tag = ?')
            params.append(tag)
        query = 'SELECT * FROM objects'
        if conditions:
            # only fixed column expressions are concatenated,
            # the values are always bound as parameters
            query += ' WHERE ' + ' AND '.join(conditions)  # nosec B608
        return self._db.execute(query, params)

    def select(self, vectors):
        """Remove the already stored vectors from ``vectors``.

        The collection is modified in place: what remains are the
        vectors missing from the inventory. They are added to the
        pending set and the same collection is returned.
        """
        # NOTE(review): presumably chosen to stay within SQLite's limit
        # on the number of bound parameters per statement -- confirm
        chunk_size = 999
        # snapshot, because ``vectors`` shrinks while being processed
        keys = tuple(vectors)
        with self._lock:
            for start in range(0, len(keys), chunk_size):
                chunk = keys[start:start + chunk_size]
                cur = self._db.execute(
                    'SELECT vector FROM objects WHERE vector IN'  # nosec B608
                    ' ({})'.format(','.join('?' * len(chunk))),
                    chunk)
                for known, in cur:
                    vectors.remove(known)
            self._pending.update(vectors)
        return vectors

    def vectors_to_send(self, stream=None):
        """Return the vectors of unexpired objects in random order.

        Only objects of ``stream`` are listed, ``shared.stream`` is used
        when ``stream`` is falsy.
        """
        cur = self._db.execute(
            'SELECT vector FROM objects WHERE expires > ? AND stream = ?'
            ' ORDER BY random()',
            (int(time.time()), stream or shared.stream)
        )
        return [v for v, in cur]

    def get(self, vector, default=None):
        """Return the object stored for ``vector`` or ``default``."""
        try:
            return self[vector]
        except KeyError:
            return default

    def keys(self):
        """Return a generator over all stored vectors."""
        cur = self._db.execute('SELECT vector FROM objects')
        return (v for v, in cur)

    def values(self):
        """Return a cursor over all stored objects."""
        return self._db.execute('SELECT * FROM objects')

    def popitem(self):
        """Remove and return a ``(vector, object)`` pair.

        The row with the highest ROWID is taken. It is looked up
        directly instead of through ``self.rowid``, which no longer
        names an existing row once rows have been deleted.

        Raises:
            KeyError: when the inventory is empty.
        """
        row = self._db.execute(
            'SELECT vector FROM objects ORDER BY ROWID DESC LIMIT 1'
        ).fetchone()
        if row is None:
            raise KeyError('empty')
        vector = row[0]
        obj = self.get(vector)
        del self[vector]
        return (vector, obj)

    def __contains__(self, vector):
        """Tell whether an object with this vector is stored."""
        cur = self._db.execute(
            'SELECT vector FROM objects WHERE vector = ?', (vector,))
        return cur.fetchone() is not None

    def __getitem__(self, vector):
        """Return the object stored for ``vector``.

        Raises:
            KeyError: when there is no such object.
        """
        cur = self._db.execute(
            'SELECT * FROM objects WHERE vector = ?', (vector,))
        cur.row_factory = self.__object
        item = cur.fetchone()
        if item is None:
            raise KeyError(vector)
        return item

    def __delitem__(self, vector):
        """Delete the object stored for ``vector``.

        Unlike ``dict``, a missing vector is ignored silently and no
        KeyError is raised.
        """
        with self._lock:
            self._db.execute(
                'DELETE FROM objects WHERE vector = ?', (vector,))
            self._db.commit()
            self.rowid = len(self)

    def __setitem__(self, vector, obj):
        """Store ``obj`` under ``vector`` and clear its pending mark.

        If the insert fails with a database error (for instance the
        vector is already stored, the column is unique) the call
        returns silently and leaves the pending set untouched.
        """
        with self._lock:
            try:
                cur = self._db.execute(
                    'INSERT INTO objects VALUES (?,?,?,?,?,?,?,?)', (
                        vector, obj.expires_time, obj.object_type,
                        obj.version, obj.stream_number, obj.tag, obj.data,
                        obj.offset
                    ))
            except sqlite3.DatabaseError:
                # IntegrityError is a subclass of DatabaseError
                return
            self._db.commit()
            self.rowid = cur.lastrowid
            self._pending.discard(vector)

    def __bool__(self):
        """Tell whether at least one object is stored."""
        return self._db.execute(
            'SELECT vector from objects LIMIT 1').fetchone() is not None

    def __len__(self):
        """Return the number of stored objects."""
        cur = self._db.execute('SELECT count(*) FROM objects')
        return cur.fetchone()[0]

    def __del__(self):
        """Close the database connection, if one was opened."""
        # __init__ may have failed before the connection was assigned
        db = getattr(self, '_db', None)
        if db is not None:
            db.close()