sqlite quality fixes

Author: lakshyacis
Date:   2019-12-24 18:23:37 +05:30
Parent: 6fad4f5665
Commit: 108b231c1c
GPG Key ID: D2C539C8EC63E9EB (no known key found for this signature in database)
1 changed file with 34 additions and 17 deletions

src/storage/sqlite.py

@@ -1,6 +1,5 @@
 """
-src/storage/sqlite.py
-=========================
+Sqlite Inventory
 """
 import sqlite3
 import time
@@ -10,7 +9,7 @@ from helper_sql import sqlQuery, SqlBulkExecute, sqlExecute
 from storage import InventoryStorage, InventoryItem
 
 
 class SqliteInventory(InventoryStorage):  # pylint: disable=too-many-ancestors
     """Inventory using SQLite"""
     def __init__(self):
         super(SqliteInventory, self).__init__()
@@ -20,9 +19,11 @@ class SqliteInventory(InventoryStorage): # pylint: disable=too-many-ancestors
         # cache for existing objects, used for quick lookups if we have an object.
         # This is used for example whenever we receive an inv message from a peer
         # to check to see what items are new to us.
-        # We don't delete things out of it; instead, the singleCleaner thread clears and refills it.
+        # We don't delete things out of it; instead,
+        # the singleCleaner thread clears and refills it.
         self._objects = {}
-        # Guarantees that two receiveDataThreads don't receive and process the same message concurrently
+        # Guarantees that two receiveDataThreads don't receive
+        # and process the same message concurrently
         # (probably sent by a malicious individual)
         self.lock = RLock()
 
@@ -30,7 +31,9 @@ class SqliteInventory(InventoryStorage): # pylint: disable=too-many-ancestors
         with self.lock:
             if hash_ in self._objects:
                 return True
-            rows = sqlQuery('SELECT streamnumber FROM inventory WHERE hash=?', sqlite3.Binary(hash_))
+            rows = sqlQuery(
+                'SELECT streamnumber FROM inventory WHERE hash=?',
+                sqlite3.Binary(hash_))
             if not rows:
                 return False
             self._objects[hash_] = rows[0][0]
@@ -41,8 +44,8 @@ class SqliteInventory(InventoryStorage): # pylint: disable=too-many-ancestors
             if hash_ in self._inventory:
                 return self._inventory[hash_]
             rows = sqlQuery(
-                'SELECT objecttype, streamnumber, payload, expirestime, tag FROM inventory WHERE hash=?',
-                sqlite3.Binary(hash_))
+                'SELECT objecttype, streamnumber, payload, expirestime, tag'
+                ' FROM inventory WHERE hash=?', sqlite3.Binary(hash_))
             if not rows:
                 raise KeyError(hash_)
             return InventoryItem(*rows[0])
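
The split SQL literals in this and the following hunk rely on Python's implicit concatenation of adjacent string literals, so the query text handed to sqlQuery is unchanged; only the source layout differs. A minimal sketch of that rule (the query name is illustrative, not part of the patch):

    # Adjacent string literals are joined at compile time; the leading space in
    # the second fragment keeps 'tag' and 'FROM' from running together.
    query = ('SELECT objecttype, streamnumber, payload, expirestime, tag'
             ' FROM inventory WHERE hash=?')
    assert query == ('SELECT objecttype, streamnumber, payload, expirestime,'
                     ' tag FROM inventory WHERE hash=?')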
@@ -64,35 +67,49 @@ class SqliteInventory(InventoryStorage): # pylint: disable=too-many-ancestors
 
     def __len__(self):
         with self.lock:
-            return len(self._inventory) + sqlQuery('SELECT count(*) FROM inventory')[0][0]
+            return len(self._inventory) + sqlQuery(
+                'SELECT count(*) FROM inventory')[0][0]
 
     def by_type_and_tag(self, objectType, tag):
+        """Return objects filtered by object type and tag"""
         with self.lock:
-            values = [value for value in self._inventory.values() if value.type == objectType and value.tag == tag]
+            values = [value for value in self._inventory.values()
+                      if value.type == objectType and value.tag == tag]
             values += (InventoryItem(*value) for value in sqlQuery(
-                'SELECT objecttype, streamnumber, payload, expirestime, tag \
-                FROM inventory WHERE objecttype=? AND tag=?', objectType, sqlite3.Binary(tag)))
+                'SELECT objecttype, streamnumber, payload, expirestime, tag'
+                ' FROM inventory WHERE objecttype=? AND tag=?',
+                objectType, sqlite3.Binary(tag)))
             return values
 
     def unexpired_hashes_by_stream(self, stream):
+        """Return unexpired inventory vectors filtered by stream"""
         with self.lock:
             t = int(time.time())
-            hashes = [x for x, value in self._inventory.items() if value.stream == stream and value.expires > t]
+            hashes = [x for x, value in self._inventory.items()
+                      if value.stream == stream and value.expires > t]
             hashes += (str(payload) for payload, in sqlQuery(
-                'SELECT hash FROM inventory WHERE streamnumber=? AND expirestime>?', stream, t))
+                'SELECT hash FROM inventory WHERE streamnumber=?'
+                ' AND expirestime>?', stream, t))
             return hashes
 
     def flush(self):
+        """Flush cache"""
         with self.lock:
-            # If you use both the inventoryLock and the sqlLock, always use the inventoryLock OUTSIDE of the sqlLock.
+            # If you use both the inventoryLock and the sqlLock,
+            # always use the inventoryLock OUTSIDE of the sqlLock.
             with SqlBulkExecute() as sql:
                 for objectHash, value in self._inventory.items():
-                    sql.execute('INSERT INTO inventory VALUES (?, ?, ?, ?, ?, ?)', sqlite3.Binary(objectHash), *value)
+                    sql.execute(
+                        'INSERT INTO inventory VALUES (?, ?, ?, ?, ?, ?)',
+                        sqlite3.Binary(objectHash), *value)
                 self._inventory.clear()
 
     def clean(self):
+        """Free memory / perform garbage collection"""
         with self.lock:
-            sqlExecute('DELETE FROM inventory WHERE expirestime<?', int(time.time()) - (60 * 60 * 3))
+            sqlExecute(
+                'DELETE FROM inventory WHERE expirestime<?',
+                int(time.time()) - (60 * 60 * 3))
             self._objects.clear()
             for objectHash, value in self._inventory.items():
                 self._objects[objectHash] = value.stream
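
All of the statements touched above run against the inventory table, whose shape can be read off the patch itself: a hash key followed by the five InventoryItem fields (objecttype, streamnumber, payload, expirestime, tag). The standalone sketch below replays those statements against an in-memory SQLite database; the column types and sample values are assumptions for illustration, not taken from PyBitmessage's real schema or its helper_sql wrappers.

    import sqlite3
    import time

    # Assumed column types; only the column names and their order come from
    # the queries in the patch.
    conn = sqlite3.connect(':memory:')
    conn.execute(
        'CREATE TABLE inventory'
        ' (hash blob, objecttype integer, streamnumber integer,'
        ' payload blob, expirestime integer, tag blob)')

    now = int(time.time())
    # Store one dummy object the way flush() does: hash first, then the five
    # InventoryItem fields (sample values, not real Bitmessage data).
    conn.execute(
        'INSERT INTO inventory VALUES (?, ?, ?, ?, ?, ?)',
        (sqlite3.Binary(b'\x00' * 32), 2, 1,
         sqlite3.Binary(b'payload'), now + 3600, sqlite3.Binary(b'\x00' * 32)))

    # The lookup used by __contains__ / __getitem__.
    rows = conn.execute(
        'SELECT objecttype, streamnumber, payload, expirestime, tag'
        ' FROM inventory WHERE hash=?',
        (sqlite3.Binary(b'\x00' * 32),)).fetchall()
    print(rows)

    # The expiry sweep from clean(): drop anything more than three hours past
    # its expirestime.
    conn.execute(
        'DELETE FROM inventory WHERE expirestime<?', (now - 60 * 60 * 3,))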