"""
2019-12-24 13:53:22 +01:00
Module for using filesystem (directory with files) for inventory storage
2019-09-24 11:21:53 +02:00
"""
2020-12-29 09:28:48 +01:00
import logging
2023-09-21 02:30:36 +02:00
import os
import time
from binascii import hexlify, unhexlify
from threading import RLock
from paths import lookupAppdataFolder
from .storage import InventoryItem, InventoryStorage
2020-12-29 09:28:48 +01:00
logger = logging.getLogger('default')


class FilesystemInventory(InventoryStorage):
    """Filesystem for inventory storage"""
    topDir = "inventory"
    objectDir = "objects"
    metadataFilename = "metadata"
    dataFilename = "data"
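
    # On-disk layout used by the methods below:
    #   <appdata>/inventory/objects/<hex inventory hash>/metadata
    #   <appdata>/inventory/objects/<hex inventory hash>/data
    # "metadata" holds the text record "type,stream,expires,hextag,";
    # "data" holds the raw object payload.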

    def __init__(self):
        super(FilesystemInventory, self).__init__()
        self.baseDir = os.path.join(
            lookupAppdataFolder(), FilesystemInventory.topDir)
        for createDir in [self.baseDir, os.path.join(self.baseDir, "objects")]:
            if os.path.exists(createDir):
                if not os.path.isdir(createDir):
                    raise IOError(
                        "%s exists but it's not a directory" % createDir)
            else:
                os.makedirs(createDir)
        # Guarantees that two receiveDataThreads
        # don't receive and process the same message
        # concurrently (probably sent by a malicious individual)
        self.lock = RLock()
        self._inventory = {}
        self._load()

    def __contains__(self, hashval):
        for streamDict in self._inventory.values():
            if hashval in streamDict:
                return True
        return False
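
    # Deleting by key directly is not supported;
    # use delHashId() below to remove an object.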

    def __delitem__(self, hash_):
        raise NotImplementedError

    def __getitem__(self, hashval):
        for streamDict in self._inventory.values():
            try:
                retval = streamDict[hashval]
            except KeyError:
                continue
            if retval.payload is None:
                retval = InventoryItem(
                    retval.type,
                    retval.stream,
                    self.getData(hashval),
                    retval.expires,
                    retval.tag)
            return retval
        raise KeyError(hashval)
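
    # __setitem__ persists the item to disk (metadata + payload files)
    # and then records it in the in-memory cache keyed by stream.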

    def __setitem__(self, hashval, value):
        with self.lock:
            value = InventoryItem(*value)
            try:
                os.makedirs(os.path.join(
                    self.baseDir,
                    FilesystemInventory.objectDir,
                    hexlify(hashval).decode()))
            except OSError:
                pass
            try:
                with open(
                    os.path.join(
                        self.baseDir,
                        FilesystemInventory.objectDir,
                        hexlify(hashval).decode(),
                        FilesystemInventory.metadataFilename,
                    ),
                    "w",
                ) as f:
                    f.write("%s,%s,%s,%s," % (
                        value.type,
                        value.stream,
                        value.expires,
                        hexlify(value.tag).decode()))
                with open(
                    os.path.join(
                        self.baseDir,
                        FilesystemInventory.objectDir,
                        hexlify(hashval).decode(),
                        FilesystemInventory.dataFilename,
                    ),
                    "wb",
                ) as f:
                    f.write(value.payload)
            except IOError:
                raise KeyError
            try:
                self._inventory[value.stream][hashval] = value
            except KeyError:
                self._inventory[value.stream] = {}
                self._inventory[value.stream][hashval] = value
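
    # delHashId drops the object from the in-memory cache and then
    # deletes its metadata file, data file and directory, ignoring
    # files that are already gone.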

    def delHashId(self, hashval):
        """Remove object from inventory"""
        for stream in self._inventory:
            try:
                del self._inventory[stream][hashval]
            except KeyError:
                pass
        with self.lock:
            try:
                os.remove(
                    os.path.join(
                        self.baseDir,
                        FilesystemInventory.objectDir,
                        hexlify(hashval).decode(),
                        FilesystemInventory.metadataFilename))
            except IOError:
                pass
            try:
                os.remove(
                    os.path.join(
                        self.baseDir,
                        FilesystemInventory.objectDir,
                        hexlify(hashval).decode(),
                        FilesystemInventory.dataFilename))
            except IOError:
                pass
            try:
                os.rmdir(os.path.join(
                    self.baseDir,
                    FilesystemInventory.objectDir,
                    hexlify(hashval).decode()))
            except IOError:
                pass

    def __iter__(self):
        elems = []
        for streamDict in self._inventory.values():
            elems.extend(streamDict.keys())
        return elems.__iter__()

    def __len__(self):
        retval = 0
        for streamDict in self._inventory.values():
            retval += len(streamDict)
        return retval

    def _load(self):
        newInventory = {}
        for hashId in self.object_list():
            try:
                objectType, streamNumber, expiresTime, tag = self.getMetadata(
                    hashId)
                try:
                    newInventory[streamNumber][hashId] = InventoryItem(
                        objectType, streamNumber, None, expiresTime, tag)
                except KeyError:
                    newInventory[streamNumber] = {}
                    newInventory[streamNumber][hashId] = InventoryItem(
                        objectType, streamNumber, None, expiresTime, tag)
            except KeyError:
                logger.debug(
                    'error loading %s', hexlify(hashId), exc_info=True)
        self._inventory = newInventory

    def stream_list(self):
        """Return list of streams"""
        return self._inventory.keys()

    def object_list(self):
        """Return inventory vectors (hashes) from a directory"""
        return [unhexlify(x) for x in os.listdir(os.path.join(
            self.baseDir, FilesystemInventory.objectDir))]

    def getData(self, hashId):
        """Get object data"""
        try:
            with open(
                os.path.join(
                    self.baseDir,
                    FilesystemInventory.objectDir,
                    hexlify(hashId).decode(),
                    FilesystemInventory.dataFilename,
                ),
                "rb",  # the payload is written in binary mode, so read it back as bytes
            ) as f:
                return f.read()
        except IOError:
            raise AttributeError

    def getMetadata(self, hashId):
        """Get object metadata"""
        try:
            with open(
                os.path.join(
                    self.baseDir,
                    FilesystemInventory.objectDir,
                    hexlify(hashId).decode(),
                    FilesystemInventory.metadataFilename,
                ),
                "r",
            ) as f:
                # the record ends with a trailing comma, so splitting
                # yields an empty fifth field; keep only the first four
                objectType, streamNumber, expiresTime, tag = f.read().split(
                    ",", 4)[:4]
                return [
                    int(objectType),
                    int(streamNumber),
                    int(expiresTime),
                    unhexlify(tag)]
        except IOError:
            raise KeyError
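
    # by_type_and_tag filters the in-memory cache and loads any missing
    # payloads from disk, skipping objects whose data cannot be read.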

    def by_type_and_tag(self, objectType, tag):
        """Get a list of objects filtered by object type and tag"""
        retval = []
        for streamDict in self._inventory.values():
            for hashId, item in streamDict.items():
                if item.type == objectType and item.tag == tag:
                    try:
                        if item.payload is None:
                            item.payload = self.getData(hashId)
                    except IOError:
                        continue
                    retval.append(InventoryItem(
                        item.type,
                        item.stream,
                        item.payload,
                        item.expires,
                        item.tag))
        return retval

    def hashes_by_stream(self, stream):
        """Return inventory vectors (hashes) for a stream"""
        try:
            return self._inventory[stream].keys()
        except KeyError:
            return []

    def unexpired_hashes_by_stream(self, stream):
        """Return unexpired hashes in the inventory for a particular stream"""
        try:
            return [
                x for x, value in self._inventory[stream].items()
                if value.expires > int(time.time())]
        except KeyError:
            return []

    def flush(self):
        """Flush the in-memory inventory by reloading it from disk"""
        self._load()

    def clean(self):
        """Clean out old items from the inventory"""
        # remove objects whose expiry time passed more than 30 hours ago
        minTime = int(time.time()) - 60 * 60 * 30
        deletes = []
        for streamDict in self._inventory.values():
            for hashId, item in streamDict.items():
                if item.expires < minTime:
                    deletes.append(hashId)
        for hashId in deletes:
            self.delHashId(hashId)
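
# Illustrative usage sketch (not executed here; assumes a PyBitmessage
# environment where lookupAppdataFolder() resolves to the app data dir;
# hashval, objtype, stream, payload, expires and tag are placeholders):
#
#     inventory = FilesystemInventory()
#     inventory[hashval] = (objtype, stream, payload, expires, tag)
#     if hashval in inventory:
#         item = inventory[hashval]   # InventoryItem with payload loaded
#     hashes = inventory.unexpired_hashes_by_stream(1)
#     inventory.clean()               # drop long-expired objects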