2017-01-10 21:15:35 +01:00
import collections
2017-01-15 19:21:24 +01:00
from threading import current_thread , RLock
2017-01-10 21:15:35 +01:00
import time
from helper_sql import *
from singleton import Singleton
@Singleton
class Inventory(collections.MutableMapping):
    """Mapping of object hash -> InventoryItem, split between memory and SQL.

    Items assigned through ``inventory[hash] = ...`` are kept in an in-memory
    dict until :meth:`flush` writes them to the ``inventory`` table.  Lookups
    consult the in-memory dict first and fall back to the database.  Every
    public method takes ``self.lock`` for its whole duration.

    Deleting individual items is not supported (``__delitem__`` raises
    ``NotImplementedError``).
    """

    def __init__(self):
        # NOTE(review): self.__class__ is used rather than the class name,
        # presumably because @Singleton rebinds ``Inventory`` -- confirm.
        super(self.__class__, self).__init__()
        # Objects (like msg payloads and pubkey payloads). Does not include
        # protocol headers (the first 24 bytes of each packet).
        self._inventory = {}
        self.numberOfInventoryLookupsPerformed = 0
        # key = streamNumber, value = a set which holds the inventory object
        # hashes that we are aware of. This is used whenever we receive an
        # inv message from a peer to check to see what items are new to us.
        # We don't delete things out of it; instead, the singleCleaner thread
        # clears and refills it every couple hours.
        self._streams = collections.defaultdict(set)
        # Guarantees that two receiveDataThreads don't receive and process
        # the same message concurrently (probably sent by a malicious
        # individual).
        self.lock = RLock()
        # The typename has to be a valid identifier, so it must not carry
        # surrounding whitespace (namedtuple raises ValueError otherwise).
        self.InventoryItem = collections.namedtuple('InventoryItem', ' type stream payload expires tag ')

    def __contains__(self, hash):
        """Return True if *hash* is held in memory or stored in the database.

        Each call increments ``numberOfInventoryLookupsPerformed``.
        """
        with self.lock:
            self.numberOfInventoryLookupsPerformed += 1
            if hash in self._inventory:
                return True
            return bool(sqlQuery(' SELECT 1 FROM inventory WHERE hash=? ', hash))

    def __getitem__(self, hash):
        """Return the InventoryItem stored for *hash*.

        Raises:
            KeyError: *hash* is neither in memory nor in the database.
        """
        with self.lock:
            if hash in self._inventory:
                return self._inventory[hash]
            rows = sqlQuery(' SELECT objecttype, streamnumber, payload, expirestime, tag FROM inventory WHERE hash=? ', hash)
            if not rows:
                raise KeyError(hash)
            return self.InventoryItem(*rows[0])

    def __setitem__(self, hash, value):
        """Store *value* (an iterable of the five InventoryItem fields) in memory.

        The hash is also recorded in the per-stream index and reported to
        ``Missing`` as just received.
        """
        with self.lock:
            item = self.InventoryItem(*value)
            self._inventory[hash] = item
            self._streams[item.stream].add(hash)
            Missing().delete(hash, True)

    def __delitem__(self, hash):
        """Not supported; always raises NotImplementedError."""
        raise NotImplementedError

    def __iter__(self):
        """Iterate over a snapshot of the in-memory hashes followed by the stored ones."""
        with self.lock:
            hashes = list(self._inventory)
            hashes.extend(stored for stored, in sqlQuery(' SELECT hash FROM inventory '))
            return iter(hashes)

    def __len__(self):
        """Return the number of in-memory items plus the number of database rows."""
        with self.lock:
            stored = sqlQuery(' SELECT count(*) FROM inventory ')[0][0]
            return len(self._inventory) + stored

    def by_type_and_tag(self, type, tag):
        """Return a list of InventoryItems whose type and tag both match.

        In-memory items come first, followed by matching database rows.
        """
        with self.lock:
            matches = [
                item for item in self._inventory.values()
                if item.type == type and item.tag == tag
            ]
            rows = sqlQuery(' SELECT objecttype, streamnumber, payload, expirestime, tag FROM inventory WHERE objecttype=? AND tag=? ', type, tag)
            matches.extend(self.InventoryItem(*row) for row in rows)
            return matches

    def hashes_by_stream(self, stream):
        """Return the set of hashes indexed for *stream*.

        The returned object is the internal set itself, not a copy.  Because
        the index is a ``defaultdict(set)``, asking for an unknown stream
        yields (and registers) an empty set.
        """
        with self.lock:
            return self._streams[stream]

    def unexpired_hashes_by_stream(self, stream):
        """Return a list of hashes in *stream* whose expiry time is still in the future."""
        with self.lock:
            now = int(time.time())
            hashes = [
                objectHash for objectHash, item in self._inventory.items()
                if item.stream == stream and item.expires > now
            ]
            rows = sqlQuery(' SELECT hash FROM inventory WHERE streamnumber=? AND expirestime>? ', stream, now)
            hashes.extend(stored for stored, in rows)
            return hashes

    def flush(self):
        """Write every in-memory item to the ``inventory`` table, then empty the in-memory dict."""
        # If you use both the inventoryLock and the sqlLock, always use the
        # inventoryLock OUTSIDE of the sqlLock.
        with self.lock:
            with SqlBulkExecute() as sql:
                for objectHash, item in self._inventory.items():
                    # One hash column plus the five InventoryItem fields.
                    sql.execute(' INSERT INTO inventory VALUES (?, ?, ?, ?, ?, ?) ', objectHash, *item)
                self._inventory.clear()

    def clean(self):
        """Drop long-expired rows from the database and rebuild the per-stream index."""
        with self.lock:
            # Rows are removed once they have been expired for three hours.
            threeHoursAgo = int(time.time()) - (60 * 60 * 3)
            sqlExecute(' DELETE FROM inventory WHERE expirestime<? ', threeHoursAgo)
            self._streams.clear()
            for objectHash, item in self.items():
                self._streams[item.stream].add(objectHash)
2017-01-15 19:21:24 +01:00
@Singleton
class Missing(object):
    """Bookkeeping for object hashes that still have to be requested from peers.

    ``hashes`` maps an object hash to a dict holding the peers recorded for
    it, the time of the last request and the number of requests made.
    ``pending`` maps a peer (``current_thread().peer``) to a dict holding the
    object hashes requested from it and the times of the last request and
    the last received object.
    """

    def __init__(self):
        super(self.__class__, self).__init__()
        self.lock = RLock()
        # NOTE(review): the dictionary keys used throughout this class are
        # reproduced exactly as written, including the surrounding spaces;
        # confirm against any other code that reads these dicts.
        self.hashes = {}
        # NOTE(review): nothing in this class ever sets ``stopped`` to True
        # (stop() does not); presumably it is toggled from outside -- confirm.
        self.stopped = False
        # don't request the same object more frequently than this (seconds)
        self.frequency = 60
        # after requesting and not receiving an object more than this many
        # times, consider it expired
        self.maxRequestCount = 6
        self.pending = {}

    def _isRecent(self, timestamp):
        """Return True if *timestamp* lies within the last ``frequency`` seconds."""
        return timestamp >= time.time() - self.frequency

    def add(self, objectHash):
        """Record that the current thread's peer is a source for *objectHash*.

        Does nothing once ``stopped`` is set.
        """
        if self.stopped:
            return
        with self.lock:
            if objectHash not in self.hashes:
                self.hashes[objectHash] = {' peers ': [], ' requested ': 0, ' requestedCount ': 0}
            self.hashes[objectHash][' peers '].append(current_thread().peer)

    def addPending(self, objectHash=None):
        """Register *objectHash* as requested from the current thread's peer.

        Creates the peer's pending entry when it does not exist yet and
        always refreshes its last-request time.  With ``objectHash=None``
        only the entry creation / time refresh happens.  Does nothing once
        ``stopped`` is set.
        """
        if self.stopped:
            return
        peer = current_thread().peer
        if peer not in self.pending:
            self.pending[peer] = {' objects ': [], ' requested ': 0, ' received ': 0}
        entry = self.pending[peer]
        if objectHash is not None and objectHash not in entry[' objects ']:
            entry[' objects '].append(objectHash)
        entry[' requested '] = time.time()

    def len(self):
        """Return the number of tracked object hashes."""
        with self.lock:
            return len(self.hashes)

    def removeObjectFromCurrentThread(self, objectHash):
        """Remove the current thread's peer from the sources of *objectHash*.

        If no source is left and the last request is older than
        ``frequency`` seconds the hash is deleted altogether; otherwise its
        last-request time is reset to now.  Unknown hashes are ignored.
        """
        with self.lock:
            try:
                self.hashes[objectHash][' peers '].remove(current_thread().peer)
            except KeyError:
                # the hash is not tracked at all
                return
            except ValueError:
                # this peer was not listed as a source; carry on
                pass
            info = self.hashes[objectHash]
            if not info[' peers '] and not self._isRecent(info[' requested ']):
                self.delete(objectHash)
            else:
                info[' requested '] = time.time()

    def pull(self, count=1):
        """Return up to *count* object hashes to request from the current peer.

        Each returned hash gets its request time and request counter
        updated and is added to the peer's pending list.  Hashes that have
        already been requested ``maxRequestCount`` times are dropped from
        all bookkeeping instead of being returned.

        Raises:
            ValueError: *count* is smaller than one.
        """
        if count < 1:
            raise ValueError(" Must be at least one ")
        objectHashes = []
        if self.stopped:
            return objectHashes
        try:
            # Iterate over a snapshot: entries may be deleted below.
            for objectHash in list(self.hashes):
                if len(objectHashes) >= count:
                    break
                peer = current_thread().peer
                if peer not in self.pending:
                    self.addPending()
                # This peer is still busy with a full batch: stop here.
                if (self._isRecent(self.pending[peer][' requested ']) or
                        self._isRecent(self.pending[peer][' received '])) and \
                        len(self.pending[peer][' objects ']) >= count:
                    break
                # Only consider objects requested too long ago or not at all
                # from any thread.
                if self._isRecent(self.hashes[objectHash][' requested ']):
                    continue
                if objectHash in self.pending[peer][' objects ']:
                    # Already requested from this thread but not received
                    # yet; if still sending or receiving, try the next one.
                    if self._isRecent(self.pending[peer][' received ']) or \
                            self._isRecent(self.pending[peer][' requested ']):
                        continue
                    # Nothing requested or received recently: fall through
                    # and re-request.
                elif peer not in self.hashes[objectHash][' peers ']:
                    # the current node doesn't have the object
                    continue
                # Already requested too many times: remove all signs of this
                # object.
                if self.hashes[objectHash][' requestedCount '] >= self.maxRequestCount:
                    with self.lock:
                        del self.hashes[objectHash]
                        for otherPeer in list(self.pending):
                            if objectHash in self.pending[otherPeer][' objects ']:
                                self.pending[otherPeer][' objects '].remove(objectHash)
                    continue
                # all ok, request
                objectHashes.append(objectHash)
                self.hashes[objectHash][' requested '] = time.time()
                with self.lock:
                    self.hashes[objectHash][' requestedCount '] += 1
                self.pending[peer][' requested '] = time.time()
                self.addPending(objectHash)
        except (RuntimeError, KeyError, ValueError):
            # Other threads may remove entries while this loop runs without
            # the lock; return whatever has been collected so far.
            pass
        return objectHashes

    def delete(self, objectHash, justReceived=False):
        """Forget *objectHash* and clear it from the current peer's pending list.

        With ``justReceived=True`` the current peer's last-received time is
        set to now.  When the calling thread has no ``peer`` attribute or no
        pending entry, only the ``hashes`` entry is removed.

        NOTE(review): the three steps are treated as independent of each
        other; the original nesting could not be read from the flattened
        source -- confirm.
        """
        with self.lock:
            self.hashes.pop(objectHash, None)
            # The caller is not necessarily a thread with a pending entry
            # (e.g. after stop(), or a thread without a peer), so look the
            # entry up defensively instead of indexing.
            peer = getattr(current_thread(), 'peer', None)
            entry = self.pending.get(peer)
            if entry is None:
                return
            if objectHash in entry[' objects ']:
                entry[' objects '].remove(objectHash)
            if justReceived:
                entry[' received '] = time.time()

    def stop(self):
        """Discard all tracked hashes and all pending state."""
        with self.lock:
            self.hashes = {}
            self.pending = {}

    def threadEnd(self):
        """Detach the current thread's peer from every hash and drop its pending entry."""
        with self.lock:
            # removeObjectFromCurrentThread() may delete entries from
            # self.hashes, so walk a snapshot of the keys rather than the
            # dict itself.
            for objectHash in list(self.hashes):
                self.removeObjectFromCurrentThread(objectHash)
            self.pending.pop(current_thread().peer, None)