2017-01-10 21:15:35 +01:00
import collections
2017-01-15 19:21:24 +01:00
from threading import current_thread , RLock
2017-01-10 21:15:35 +01:00
import time
from helper_sql import *
from singleton import Singleton
@Singleton
class Inventory(collections.MutableMapping):
    """Mapping of object hash -> InventoryItem backed by memory and SQL.

    Newly stored objects are kept in an in-memory dict until ``flush()``
    writes them to the ``inventory`` SQL table.  Lookups, membership tests,
    iteration and ``len()`` consult the in-memory dict and the table.
    Deleting individual items is not supported.
    """

    def __init__(self):
        # NOTE(review): ``self.__class__`` is used instead of the class name,
        # presumably because @Singleton rebinds the module-level name --
        # confirm against singleton.py.  This form recurses forever if the
        # class is ever subclassed.
        super(self.__class__, self).__init__()
        # Objects (like msg payloads and pubkey payloads) not yet flushed to
        # SQL.  Does not include protocol headers (the first 24 bytes of
        # each packet).
        self._inventory = {}
        self.numberOfInventoryLookupsPerformed = 0
        # key = stream number, value = a set holding the inventory object
        # hashes that we are aware of.  This is used whenever we receive an
        # inv message from a peer to check which items are new to us.  We
        # don't delete things out of it; instead, the singleCleaner thread
        # clears and refills it every couple of hours.
        self._streams = collections.defaultdict(set)
        # Guarantees that two receiveDataThreads don't receive and process
        # the same message concurrently (probably sent by a malicious
        # individual).
        self.lock = RLock()
        self.InventoryItem = collections.namedtuple(
            'InventoryItem', 'type stream payload expires tag')

    def __contains__(self, hash):
        """Return True if *hash* is known in memory or in the SQL table.

        Every call increments ``numberOfInventoryLookupsPerformed``.
        """
        with self.lock:
            self.numberOfInventoryLookupsPerformed += 1
            if hash in self._inventory:
                return True
            return bool(sqlQuery('SELECT 1 FROM inventory WHERE hash=?', hash))

    def __getitem__(self, hash):
        """Return the InventoryItem stored under *hash*.

        The in-memory dict is checked first, then the SQL table.

        Raises:
            KeyError: if *hash* is in neither place.
        """
        with self.lock:
            if hash in self._inventory:
                return self._inventory[hash]
            rows = sqlQuery(
                'SELECT objecttype, streamnumber, payload, expirestime, tag'
                ' FROM inventory WHERE hash=?', hash)
            if not rows:
                raise KeyError(hash)
            return self.InventoryItem(*rows[0])

    def __setitem__(self, hash, value):
        """Store *value* under *hash* in memory and index it by stream.

        *value* is an iterable of (type, stream, payload, expires, tag); it
        is converted to an InventoryItem.  The hash is also dropped from the
        ``Missing`` tracker.
        """
        with self.lock:
            item = self.InventoryItem(*value)
            self._inventory[hash] = item
            self._streams[item.stream].add(hash)
        # Called after the inventory lock is released, so this method never
        # holds the inventory lock and the Missing lock at the same time.
        Missing().delete(hash)

    def __delitem__(self, hash):
        """Deleting single items is not supported."""
        raise NotImplementedError

    def __iter__(self):
        """Return an iterator over in-memory hashes followed by SQL hashes.

        The hashes are collected into a list while the lock is held, so the
        returned iterator is independent of later changes.
        """
        with self.lock:
            # list(dict) snapshots the keys whether keys() returns a list
            # or a view, and avoids the extra copy made by keys()[:].
            hashes = list(self._inventory)
            hashes.extend(
                objectHash
                for objectHash, in sqlQuery('SELECT hash FROM inventory'))
            return iter(hashes)

    def __len__(self):
        """Return the in-memory item count plus the SQL row count."""
        with self.lock:
            stored = sqlQuery('SELECT count(*) FROM inventory')[0][0]
            return len(self._inventory) + stored

    def by_type_and_tag(self, type, tag):
        """Return a list of InventoryItems matching object *type* and *tag*.

        In-memory matches come first, followed by matches from SQL.
        """
        with self.lock:
            values = [
                item for item in self._inventory.values()
                if item.type == type and item.tag == tag
            ]
            values.extend(
                self.InventoryItem(*row)
                for row in sqlQuery(
                    'SELECT objecttype, streamnumber, payload, expirestime,'
                    ' tag FROM inventory WHERE objecttype=? AND tag=?',
                    type, tag))
            return values

    def hashes_by_stream(self, stream):
        """Return the set of known hashes for *stream*.

        The returned object is the internal set itself, not a copy.  Because
        ``_streams`` is a defaultdict, asking for an unseen stream creates
        an empty entry for it.
        """
        with self.lock:
            return self._streams[stream]

    def unexpired_hashes_by_stream(self, stream):
        """Return a list of hashes in *stream* whose expiry is in the future.

        In-memory hashes come first, followed by hashes from SQL.
        """
        with self.lock:
            now = int(time.time())
            hashes = [
                objectHash
                for objectHash, item in self._inventory.items()
                if item.stream == stream and item.expires > now
            ]
            hashes.extend(
                objectHash
                for objectHash, in sqlQuery(
                    'SELECT hash FROM inventory WHERE streamnumber=?'
                    ' AND expirestime>?', stream, now))
            return hashes

    def flush(self):
        """Write every in-memory item to the SQL table, then empty the dict."""
        # If you use both the inventoryLock and the sqlLock, always use the
        # inventoryLock OUTSIDE of the sqlLock.
        with self.lock:
            with SqlBulkExecute() as sql:
                for objectHash, item in self._inventory.items():
                    sql.execute(
                        'INSERT INTO inventory VALUES (?, ?, ?, ?, ?, ?)',
                        objectHash, *item)
                self._inventory.clear()

    def clean(self):
        """Delete long-expired rows from SQL and rebuild the stream index.

        Rows whose expiry time is more than three hours in the past are
        removed; the per-stream hash sets are then cleared and refilled from
        the items that remain (in memory and in SQL).
        """
        with self.lock:
            cutoff = int(time.time()) - (60 * 60 * 3)
            sqlExecute('DELETE FROM inventory WHERE expirestime<?', cutoff)
            self._streams.clear()
            for objectHash, item in self.items():
                self._streams[item.stream].add(objectHash)
2017-01-15 19:21:24 +01:00
@Singleton
class Missing(object):
    """Tracks object hashes we still lack and which peers offered them.

    ``hashes`` maps an object hash to a dict with two keys: ``'peers'`` (a
    list of the peers recorded via ``add``) and ``'requested'`` (a
    ``time.time()`` timestamp, 0 until first updated).  The peer is always
    read from ``current_thread().peer``, so these methods must be called
    from a thread object that carries a ``peer`` attribute.
    """

    def __init__(self):
        # NOTE(review): ``self.__class__`` is used instead of the class name,
        # presumably because @Singleton rebinds the module-level name --
        # confirm against singleton.py.
        super(self.__class__, self).__init__()
        self.lock = RLock()
        self.hashes = {}

    def add(self, objectHash):
        """Record that the current thread's peer offers *objectHash*."""
        with self.lock:
            if objectHash not in self.hashes:
                self.hashes[objectHash] = {'peers': [], 'requested': 0}
            self.hashes[objectHash]['peers'].append(current_thread().peer)

    def len(self):
        """Return the number of tracked hashes."""
        with self.lock:
            return len(self.hashes)

    def removeObjectFromCurrentThread(self, objectHash):
        """Detach the current thread's peer from *objectHash*.

        If no peers remain afterwards the hash is dropped entirely;
        otherwise its ``'requested'`` timestamp is set to now.  A hash that
        is not tracked is ignored, matching ``delete``.
        """
        with self.lock:
            entry = self.hashes.get(objectHash)
            if entry is None:
                return
            try:
                entry['peers'].remove(current_thread().peer)
            except ValueError:
                # This peer was not listed for the hash; nothing to remove.
                pass
            if not entry['peers']:
                self.delete(objectHash)
            else:
                # NOTE(review): this also runs when the peer was not listed
                # above, so ``threadEnd`` refreshes the timestamp of every
                # remaining hash -- confirm that is intended.
                entry['requested'] = time.time()

    def pull(self, count=1):
        """Return up to *count* hashes the current peer should be asked for.

        A hash qualifies when the current thread's peer is listed for it and
        its ``'requested'`` timestamp is more than 60 seconds old.  Each
        returned hash is detached from the current peer.

        Raises:
            ValueError: if *count* is less than 1.
        """
        if count < 1:
            raise ValueError("Must be at least one")
        with self.lock:
            since = time.time() - 60  # once every minute
            objectHashes = []
            # Iterate over a snapshot: removeObjectFromCurrentThread may
            # delete entries from self.hashes while we loop.
            for objectHash in list(self.hashes):
                entry = self.hashes[objectHash]
                if (current_thread().peer in entry['peers']
                        and entry['requested'] < since):
                    objectHashes.append(objectHash)
                    self.removeObjectFromCurrentThread(objectHash)
                    if len(objectHashes) >= count:
                        break
            return objectHashes

    def delete(self, objectHash):
        """Stop tracking *objectHash*; unknown hashes are ignored."""
        with self.lock:
            if objectHash in self.hashes:
                del self.hashes[objectHash]

    def threadEnd(self):
        """Detach the current thread's peer from every tracked hash."""
        with self.lock:
            # Snapshot the keys first: removeObjectFromCurrentThread deletes
            # entries whose peer list becomes empty, and mutating a dict
            # while iterating it directly raises RuntimeError.
            for objectHash in list(self.hashes):
                self.removeObjectFromCurrentThread(objectHash)