2017-05-27 19:03:27 +02:00
import collections
from threading import current_thread , enumerate as threadingEnumerate , RLock
import Queue
2017-12-02 02:48:10 +01:00
import sqlite3
2017-05-27 19:03:27 +02:00
import time
from helper_sql import *
from storage import InventoryStorage , InventoryItem
class SqliteInventory(InventoryStorage):
    """Inventory storage split between an in-memory dict and an SQL table.

    Items assigned through ``__setitem__`` are held in ``self._inventory``
    until ``flush()`` inserts them into the ``inventory`` table. Lookups
    consult the in-memory dict first and then query the table.

    All public operations take ``self.lock`` (a re-entrant lock).
    """

    def __init__(self):
        # Name the class explicitly: super(self.__class__, self) recurses
        # forever as soon as this class is subclassed, because
        # self.__class__ is then the subclass, not SqliteInventory.
        super(SqliteInventory, self).__init__()
        # of objects (like msg payloads and pubkey payloads). Does not
        # include protocol headers (the first 24 bytes of each packet).
        self._inventory = {}
        # key = streamNumber, value = a set which holds the inventory object
        # hashes that we are aware of. This is used whenever we receive an
        # inv message from a peer to check to see what items are new to us.
        # We don't delete things out of it; instead, the singleCleaner
        # thread clears and refills it every couple hours.
        self._streams = collections.defaultdict(set)
        # Guarantees that two receiveDataThreads don't receive and process
        # the same message concurrently (probably sent by a malicious
        # individual)
        self.lock = RLock()

    def __contains__(self, hash):
        """Return True if ``hash`` is held in memory or present in the table."""
        with self.lock:
            if hash in self._inventory:
                return True
            found = sqlQuery(
                ' SELECT 1 FROM inventory WHERE hash=? ',
                sqlite3.Binary(hash))
            return bool(found)

    def __getitem__(self, hash):
        """Return the InventoryItem stored under ``hash``.

        The in-memory dict is checked first, then the table.

        Raises:
            KeyError: if ``hash`` is found in neither place.
        """
        with self.lock:
            if hash in self._inventory:
                return self._inventory[hash]
            rows = sqlQuery(
                ' SELECT objecttype, streamnumber, payload, expirestime, tag FROM inventory WHERE hash=? ',
                sqlite3.Binary(hash))
            if not rows:
                raise KeyError(hash)
            return InventoryItem(*rows[0])

    def __setitem__(self, hash, value):
        """Store ``value`` in memory under ``hash`` and index it by stream.

        ``value`` is unpacked into an InventoryItem, so it must be an
        iterable of that type's fields.
        """
        with self.lock:
            value = InventoryItem(*value)
            self._inventory[hash] = value
            self._streams[value.stream].add(hash)

    def __delitem__(self, hash):
        """Deleting single items is not supported."""
        raise NotImplementedError

    def __iter__(self):
        """Iterate over in-memory hashes followed by hashes from the table.

        The hashes are collected into a list while the lock is held; the
        returned iterator walks that snapshot.
        """
        with self.lock:
            hashes = list(self._inventory)
            # NOTE(review): unlike unexpired_hashes_by_stream(), the values
            # coming from the database are not passed through str() here -
            # confirm whether callers rely on the raw type.
            hashes += (x for x, in sqlQuery(' SELECT hash FROM inventory '))
            return iter(hashes)

    def __len__(self):
        """Return the in-memory item count plus the table's row count."""
        with self.lock:
            stored = sqlQuery(' SELECT count(*) FROM inventory ')[0][0]
            return len(self._inventory) + stored

    def by_type_and_tag(self, objectType, tag):
        """Return a list of items whose type and tag match the arguments.

        In-memory matches come first, followed by matching table rows
        wrapped in InventoryItem.
        """
        with self.lock:
            values = [
                value for value in self._inventory.values()
                if value.type == objectType and value.tag == tag
            ]
            values += (
                InventoryItem(*value) for value in sqlQuery(
                    ' SELECT objecttype, streamnumber, payload, expirestime, tag FROM inventory WHERE objecttype=? AND tag=? ',
                    objectType, sqlite3.Binary(tag))
            )
            return values

    def hashes_by_stream(self, stream):
        """Return the set of known hashes indexed under ``stream``.

        This is the internal set object, not a copy. Because the index is
        a defaultdict, asking for an unknown stream creates an empty entry.
        """
        with self.lock:
            return self._streams[stream]

    def unexpired_hashes_by_stream(self, stream):
        """Return a list of hashes in ``stream`` whose expiry is in the future.

        Combines in-memory items with table rows; hashes read from the
        table are converted with ``str()``.
        """
        with self.lock:
            t = int(time.time())
            hashes = [
                x for x, value in self._inventory.items()
                if value.stream == stream and value.expires > t
            ]
            hashes += (
                str(payload) for payload, in sqlQuery(
                    ' SELECT hash FROM inventory WHERE streamnumber=? AND expirestime>? ',
                    stream, t)
            )
            return hashes

    def flush(self):
        """Insert every in-memory item into the table, then empty the dict."""
        # If you use both the inventoryLock and the sqlLock, always use the
        # inventoryLock OUTSIDE of the sqlLock.
        with self.lock:
            with SqlBulkExecute() as sql:
                for objectHash, value in self._inventory.items():
                    sql.execute(
                        ' INSERT INTO inventory VALUES (?, ?, ?, ?, ?, ?) ',
                        sqlite3.Binary(objectHash), *value)
                self._inventory.clear()

    def clean(self):
        """Delete long-expired rows and rebuild the per-stream hash index.

        Rows whose ``expirestime`` lies more than three hours in the past
        are deleted. The stream index is then cleared and refilled from the
        in-memory items only.
        """
        with self.lock:
            sqlExecute(
                ' DELETE FROM inventory WHERE expirestime<? ',
                int(time.time()) - (60 * 60 * 3))
            self._streams.clear()
            for objectHash, value in self._inventory.items():
                self._streams[value.stream].add(objectHash)