PyBitmessage-2021-04-27/src/storage/sqlite.py

124 lines
4.6 KiB
Python
Raw Normal View History

2019-09-25 11:46:28 +02:00
"""
2019-12-24 13:53:37 +01:00
Sqlite Inventory
2019-09-25 11:46:28 +02:00
"""
import sqlite3
import time
2019-09-25 11:46:28 +02:00
from threading import RLock
from helper_sql import SqlBulkExecute, sqlExecute, sqlQuery
from storage import InventoryItem, InventoryStorage
2019-09-25 09:37:29 +02:00
2019-12-24 13:53:37 +01:00
class SqliteInventory(InventoryStorage): # pylint: disable=too-many-ancestors
2019-09-25 11:46:28 +02:00
"""Inventory using SQLite"""
def __init__(self):
2019-09-25 11:46:28 +02:00
super(SqliteInventory, self).__init__()
2019-09-24 11:20:20 +02:00
# of objects (like msg payloads and pubkey payloads)
# Does not include protocol headers (the first 24 bytes of each packet).
self._inventory = {}
# cache for existing objects, used for quick lookups if we have an object.
# This is used for example whenever we receive an inv message from a peer
# to check to see what items are new to us.
2019-12-24 13:53:37 +01:00
# We don't delete things out of it; instead,
# the singleCleaner thread clears and refills it.
2019-09-24 11:20:20 +02:00
self._objects = {}
2019-12-24 13:53:37 +01:00
# Guarantees that two receiveDataThreads don't receive
# and process the same message concurrently
2019-09-24 11:20:20 +02:00
# (probably sent by a malicious individual)
self.lock = RLock()
2019-09-25 11:46:28 +02:00
def __contains__(self, hash_):
with self.lock:
2019-09-25 11:46:28 +02:00
if hash_ in self._objects:
return True
2019-12-24 13:53:37 +01:00
rows = sqlQuery(
'SELECT streamnumber FROM inventory WHERE hash=?',
sqlite3.Binary(hash_))
if not rows:
return False
2019-09-25 11:46:28 +02:00
self._objects[hash_] = rows[0][0]
return True
2019-09-25 11:46:28 +02:00
def __getitem__(self, hash_):
with self.lock:
2019-09-25 11:46:28 +02:00
if hash_ in self._inventory:
return self._inventory[hash_]
2019-09-25 09:37:29 +02:00
rows = sqlQuery(
2019-12-24 13:53:37 +01:00
'SELECT objecttype, streamnumber, payload, expirestime, tag'
' FROM inventory WHERE hash=?', sqlite3.Binary(hash_))
if not rows:
2019-09-25 11:46:28 +02:00
raise KeyError(hash_)
return InventoryItem(*rows[0])
2019-09-25 11:46:28 +02:00
def __setitem__(self, hash_, value):
with self.lock:
value = InventoryItem(*value)
2019-09-25 11:46:28 +02:00
self._inventory[hash_] = value
self._objects[hash_] = value.stream
2019-09-25 11:46:28 +02:00
def __delitem__(self, hash_):
raise NotImplementedError
def __iter__(self):
with self.lock:
hashes = self._inventory.keys()[:]
hashes += (x for x, in sqlQuery('SELECT hash FROM inventory'))
return hashes.__iter__()
def __len__(self):
with self.lock:
2019-12-24 13:53:37 +01:00
return len(self._inventory) + sqlQuery(
'SELECT count(*) FROM inventory')[0][0]
def by_type_and_tag(self, objectType, tag=None):
"""
Get all inventory items of certain *objectType*
with *tag* if given.
"""
query = [
'SELECT objecttype, streamnumber, payload, expirestime, tag'
' FROM inventory WHERE objecttype=?', objectType]
if tag:
query[0] += ' AND tag=?'
query.append(sqlite3.Binary(tag))
with self.lock:
values = [
value for value in self._inventory.values()
if value.type == objectType
and tag is None or value.tag == tag
] + [InventoryItem(*value) for value in sqlQuery(*query)]
return values
def unexpired_hashes_by_stream(self, stream):
2019-12-24 13:53:37 +01:00
"""Return unexpired inventory vectors filtered by stream"""
with self.lock:
t = int(time.time())
2019-12-24 13:53:37 +01:00
hashes = [x for x, value in self._inventory.items()
if value.stream == stream and value.expires > t]
2019-09-25 09:37:29 +02:00
hashes += (str(payload) for payload, in sqlQuery(
2019-12-24 13:53:37 +01:00
'SELECT hash FROM inventory WHERE streamnumber=?'
' AND expirestime>?', stream, t))
return hashes
def flush(self):
2019-12-24 13:53:37 +01:00
"""Flush cache"""
2019-09-25 09:37:29 +02:00
with self.lock:
2019-12-24 13:53:37 +01:00
# If you use both the inventoryLock and the sqlLock,
# always use the inventoryLock OUTSIDE of the sqlLock.
with SqlBulkExecute() as sql:
for objectHash, value in self._inventory.items():
2019-12-24 13:53:37 +01:00
sql.execute(
'INSERT INTO inventory VALUES (?, ?, ?, ?, ?, ?)',
sqlite3.Binary(objectHash), *value)
self._inventory.clear()
def clean(self):
2019-12-24 13:53:37 +01:00
"""Free memory / perform garbage collection"""
with self.lock:
2019-12-24 13:53:37 +01:00
sqlExecute(
'DELETE FROM inventory WHERE expirestime<?',
int(time.time()) - (60 * 60 * 3))
self._objects.clear()
2017-11-14 23:20:15 +01:00
for objectHash, value in self._inventory.items():
self._objects[objectHash] = value.stream