@ -1,12 +1,11 @@
import collections
from threading import current_thread , enumerate as threadingEnumerate , RLock
import Queue
from threading import RLock
import sqlite3
import time
from helper_sql import *
from helper_sql import sqlQuery , SqlBulkExecute , sqlExecute
from storage import InventoryStorage , InventoryItem
class SqliteInventory ( InventoryStorage ) :
def __init__ ( self ) :
super ( self . __class__ , self ) . __init__ ( )
@ -36,7 +35,9 @@ class SqliteInventory(InventoryStorage):
with self . lock :
if hash in self . _inventory :
return self . _inventory [ hash ]
rows = sqlQuery ( ' SELECT objecttype, streamnumber, payload, expirestime, tag FROM inventory WHERE hash=? ' , sqlite3 . Binary ( hash ) )
rows = sqlQuery (
' SELECT objecttype, streamnumber, payload, expirestime, tag FROM inventory WHERE hash=? ' ,
sqlite3 . Binary ( hash ) )
if not rows :
raise KeyError ( hash )
return InventoryItem ( * rows [ 0 ] )
@ -63,18 +64,22 @@ class SqliteInventory(InventoryStorage):
def by_type_and_tag ( self , objectType , tag ) :
with self . lock :
values = [ value for value in self . _inventory . values ( ) if value . type == objectType and value . tag == tag ]
values + = ( InventoryItem ( * value ) for value in sqlQuery ( ' SELECT objecttype, streamnumber, payload, expirestime, tag FROM inventory WHERE objecttype=? AND tag=? ' , objectType , sqlite3 . Binary ( tag ) ) )
values + = ( InventoryItem ( * value ) for value in sqlQuery (
' SELECT objecttype, streamnumber, payload, expirestime, tag \
FROM inventory WHERE objecttype = ? AND tag = ? ' , objectType, sqlite3.Binary(tag)))
return values
def unexpired_hashes_by_stream ( self , stream ) :
with self . lock :
t = int ( time . time ( ) )
hashes = [ x for x , value in self . _inventory . items ( ) if value . stream == stream and value . expires > t ]
hashes + = ( str ( payload ) for payload , in sqlQuery ( ' SELECT hash FROM inventory WHERE streamnumber=? AND expirestime>? ' , stream , t ) )
hashes + = ( str ( payload ) for payload , in sqlQuery (
' SELECT hash FROM inventory WHERE streamnumber=? AND expirestime>? ' , stream , t ) )
return hashes
def flush ( self ) :
with self . lock : # If you use both the inventoryLock and the sqlLock, always use the inventoryLock OUTSIDE of the sqlLock.
with self . lock :
# If you use both the inventoryLock and the sqlLock, always use the inventoryLock OUTSIDE of the sqlLock.
with SqlBulkExecute ( ) as sql :
for objectHash , value in self . _inventory . items ( ) :
sql . execute ( ' INSERT INTO inventory VALUES (?, ?, ?, ?, ?, ?) ' , sqlite3 . Binary ( objectHash ) , * value )
@ -82,8 +87,7 @@ class SqliteInventory(InventoryStorage):
def clean ( self ) :
with self . lock :
sqlExecute ( ' DELETE FROM inventory WHERE expirestime<? ' , int ( time . time ( ) ) - ( 60 * 60 * 3 ) )
sqlExecute ( ' DELETE FROM inventory WHERE expirestime<? ' , int ( time . time ( ) ) - ( 60 * 60 * 3 ) )
self . _objects . clear ( )
for objectHash , value in self . _inventory . items ( ) :
self . _objects [ objectHash ] = value . stream