Inventory checking performance optimisation
- caching of whether an object exists in inventory was somehow removed since storage refactoring (or it never worked). Now existence checking is cached in the sqlite storage backend
This commit is contained in:
parent
053f434e04
commit
96b8cff0d1
|
@ -11,14 +11,18 @@ class SqliteInventory(InventoryStorage):
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
super(self.__class__, self).__init__()
|
super(self.__class__, self).__init__()
|
||||||
self._inventory = {} #of objects (like msg payloads and pubkey payloads) Does not include protocol headers (the first 24 bytes of each packet).
|
self._inventory = {} #of objects (like msg payloads and pubkey payloads) Does not include protocol headers (the first 24 bytes of each packet).
|
||||||
self._streams = collections.defaultdict(set) # key = streamNumer, value = a set which holds the inventory object hashes that we are aware of. This is used whenever we receive an inv message from a peer to check to see what items are new to us. We don't delete things out of it; instead, the singleCleaner thread clears and refills it every couple hours.
|
self._objects = {} # cache for existing objects, used for quick lookups if we have an object. This is used for example whenever we receive an inv message from a peer to check to see what items are new to us. We don't delete things out of it; instead, the singleCleaner thread clears and refills it.
|
||||||
self.lock = RLock() # Guarantees that two receiveDataThreads don't receive and process the same message concurrently (probably sent by a malicious individual)
|
self.lock = RLock() # Guarantees that two receiveDataThreads don't receive and process the same message concurrently (probably sent by a malicious individual)
|
||||||
|
|
||||||
def __contains__(self, hash):
|
def __contains__(self, hash):
|
||||||
with self.lock:
|
with self.lock:
|
||||||
if hash in self._inventory:
|
if hash in self._objects:
|
||||||
|
return True
|
||||||
|
rows = sqlQuery('SELECT streamnumber FROM inventory WHERE hash=?', sqlite3.Binary(hash))
|
||||||
|
if not rows:
|
||||||
|
return False
|
||||||
|
self._objects[hash] = rows[0][0]
|
||||||
return True
|
return True
|
||||||
return bool(sqlQuery('SELECT 1 FROM inventory WHERE hash=?', sqlite3.Binary(hash)))
|
|
||||||
|
|
||||||
def __getitem__(self, hash):
|
def __getitem__(self, hash):
|
||||||
with self.lock:
|
with self.lock:
|
||||||
|
@ -33,7 +37,7 @@ class SqliteInventory(InventoryStorage):
|
||||||
with self.lock:
|
with self.lock:
|
||||||
value = InventoryItem(*value)
|
value = InventoryItem(*value)
|
||||||
self._inventory[hash] = value
|
self._inventory[hash] = value
|
||||||
self._streams[value.stream].add(hash)
|
self._objects[hash] = value.stream
|
||||||
|
|
||||||
def __delitem__(self, hash):
|
def __delitem__(self, hash):
|
||||||
raise NotImplementedError
|
raise NotImplementedError
|
||||||
|
@ -54,10 +58,6 @@ class SqliteInventory(InventoryStorage):
|
||||||
values += (InventoryItem(*value) for value in sqlQuery('SELECT objecttype, streamnumber, payload, expirestime, tag FROM inventory WHERE objecttype=? AND tag=?', objectType, sqlite3.Binary(tag)))
|
values += (InventoryItem(*value) for value in sqlQuery('SELECT objecttype, streamnumber, payload, expirestime, tag FROM inventory WHERE objecttype=? AND tag=?', objectType, sqlite3.Binary(tag)))
|
||||||
return values
|
return values
|
||||||
|
|
||||||
def hashes_by_stream(self, stream):
|
|
||||||
with self.lock:
|
|
||||||
return self._streams[stream]
|
|
||||||
|
|
||||||
def unexpired_hashes_by_stream(self, stream):
|
def unexpired_hashes_by_stream(self, stream):
|
||||||
with self.lock:
|
with self.lock:
|
||||||
t = int(time.time())
|
t = int(time.time())
|
||||||
|
@ -75,7 +75,7 @@ class SqliteInventory(InventoryStorage):
|
||||||
def clean(self):
|
def clean(self):
|
||||||
with self.lock:
|
with self.lock:
|
||||||
sqlExecute('DELETE FROM inventory WHERE expirestime<?',int(time.time()) - (60 * 60 * 3))
|
sqlExecute('DELETE FROM inventory WHERE expirestime<?',int(time.time()) - (60 * 60 * 3))
|
||||||
self._streams.clear()
|
self._objects.clear()
|
||||||
for objectHash, value in self._inventory.items():
|
for objectHash, value in self._inventory.items():
|
||||||
self._streams[value.stream].add(objectHash)
|
self._objects[objectHash] = value.stream
|
||||||
|
|
||||||
|
|
|
@ -33,9 +33,6 @@ class InventoryStorage(Storage, collections.MutableMapping):
|
||||||
def by_type_and_tag(self, objectType, tag):
|
def by_type_and_tag(self, objectType, tag):
|
||||||
raise NotImplementedError
|
raise NotImplementedError
|
||||||
|
|
||||||
def hashes_by_stream(self, stream):
|
|
||||||
raise NotImplementedError
|
|
||||||
|
|
||||||
def unexpired_hashes_by_stream(self, stream):
|
def unexpired_hashes_by_stream(self, stream):
|
||||||
raise NotImplementedError
|
raise NotImplementedError
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue
Block a user