2017-01-10 21:15:35 +01:00
import collections
2017-01-19 19:48:12 +01:00
from threading import current_thread , enumerate as threadingEnumerate , RLock
2017-01-10 21:15:35 +01:00
import time
from helper_sql import *
from singleton import Singleton
@Singleton
class Inventory(collections.MutableMapping):
    """Mapping of inventory hash -> ``InventoryItem`` backed by memory and SQL.

    New items are kept in the in-memory ``_inventory`` dict until ``flush()``
    writes them to the SQL ``inventory`` table.  Lookups check the in-memory
    dict first and fall back to SQL.  Every public method holds ``self.lock``
    while it runs.

    NOTE(review): ``Singleton`` comes from the project's ``singleton`` module;
    presumably it hides the class behind an accessor, which would explain why
    ``super()`` is called with ``self.__class__`` below -- confirm before
    changing it.
    """

    def __init__(self):
        super(self.__class__, self).__init__()
        # Objects (like msg payloads and pubkey payloads) that have not been
        # flushed to SQL yet.  Does not include protocol headers (the first
        # 24 bytes of each packet).
        self._inventory = {}
        self.numberOfInventoryLookupsPerformed = 0
        # key = stream number, value = set of the inventory object hashes we
        # are aware of.  Used whenever we receive an inv message from a peer
        # to check which items are new to us.  Nothing is deleted from it
        # piecemeal; clean() clears and refills it.
        self._streams = collections.defaultdict(set)
        # Guarantees that two receiveDataThreads don't receive and process
        # the same message concurrently (probably sent by a malicious
        # individual).
        self.lock = RLock()
        self.InventoryItem = collections.namedtuple(
            'InventoryItem', 'type stream payload expires tag')

    def __contains__(self, hash):
        """Return True if ``hash`` is known in memory or in SQL.

        Every call increments ``numberOfInventoryLookupsPerformed``.
        """
        with self.lock:
            self.numberOfInventoryLookupsPerformed += 1
            if hash in self._inventory:
                return True
            return bool(sqlQuery('SELECT 1 FROM inventory WHERE hash=?', hash))

    def __getitem__(self, hash):
        """Return the ``InventoryItem`` stored under ``hash``.

        Raises:
            KeyError: if the hash is neither in memory nor in SQL.
        """
        with self.lock:
            if hash in self._inventory:
                return self._inventory[hash]
            rows = sqlQuery(
                'SELECT objecttype, streamnumber, payload, expirestime, tag'
                ' FROM inventory WHERE hash=?', hash)
            if not rows:
                raise KeyError(hash)
            return self.InventoryItem(*rows[0])

    def __setitem__(self, hash, value):
        """Store ``value`` (an iterable of the five item fields) in memory.

        The hash is also added to its stream's index and removed from the
        pending-download tracker.
        """
        with self.lock:
            value = self.InventoryItem(*value)
            self._inventory[hash] = value
            self._streams[value.stream].add(hash)
        PendingDownload().delete(hash)

    def __delitem__(self, hash):
        """Deleting single items is not supported."""
        raise NotImplementedError

    def __iter__(self):
        """Iterate over a snapshot of all hashes: in-memory first, then SQL."""
        with self.lock:
            hashes = list(self._inventory)
            hashes.extend(
                row[0] for row in sqlQuery('SELECT hash FROM inventory'))
            return iter(hashes)

    def __len__(self):
        """Return the in-memory item count plus the SQL row count."""
        with self.lock:
            stored = sqlQuery('SELECT count(*) FROM inventory')[0][0]
            return len(self._inventory) + stored

    def by_type_and_tag(self, type, tag):
        """Return a list of every item whose type and tag both match."""
        with self.lock:
            values = [
                item for item in self._inventory.values()
                if item.type == type and item.tag == tag
            ]
            values.extend(
                self.InventoryItem(*row) for row in sqlQuery(
                    'SELECT objecttype, streamnumber, payload, expirestime,'
                    ' tag FROM inventory WHERE objecttype=? AND tag=?',
                    type, tag))
            return values

    def hashes_by_stream(self, stream):
        """Return the set of known hashes for ``stream``.

        This is the live internal set, not a copy; an unknown stream gets an
        empty set created for it (``_streams`` is a defaultdict).
        """
        with self.lock:
            return self._streams[stream]

    def unexpired_hashes_by_stream(self, stream):
        """Return a list of hashes in ``stream`` whose expiry is in the future."""
        with self.lock:
            now = int(time.time())
            hashes = [
                objectHash
                for objectHash, item in self._inventory.items()
                if item.stream == stream and item.expires > now
            ]
            hashes.extend(
                row[0] for row in sqlQuery(
                    'SELECT hash FROM inventory WHERE streamnumber=?'
                    ' AND expirestime>?', stream, now))
            return hashes

    def flush(self):
        """Write every in-memory item to SQL, then empty the in-memory dict."""
        # If you use both the inventoryLock and the sqlLock, always use the
        # inventoryLock OUTSIDE of the sqlLock.
        with self.lock:
            with SqlBulkExecute() as sql:
                for objectHash, value in self._inventory.items():
                    sql.execute(
                        'INSERT INTO inventory VALUES (?, ?, ?, ?, ?, ?)',
                        objectHash, *value)
            # Only reached when the bulk block exited without raising.
            self._inventory.clear()

    def clean(self):
        """Delete rows that expired over three hours ago and rebuild the index.

        The per-stream hash index is cleared and refilled from the in-memory
        items plus a single query over the SQL table, so no payloads are
        loaded and no per-hash lookups are issued.
        """
        with self.lock:
            sqlExecute(
                'DELETE FROM inventory WHERE expirestime<?',
                int(time.time()) - (60 * 60 * 3))
            self._streams.clear()
            for objectHash, item in self._inventory.items():
                self._streams[item.stream].add(objectHash)
            for objectHash, stream in sqlQuery(
                    'SELECT hash, streamnumber FROM inventory'):
                self._streams[stream].add(objectHash)
2017-01-15 19:21:24 +01:00
@Singleton
class PendingDownload(object):
    """Keep track of objects advertised to us that we haven't downloaded yet.

    ``hashes`` maps an object hash to a dict with the keys ``'peers'`` (peers
    that advertised it), ``'requested'`` (time of the last request, 0 if
    never) and ``'requestedCount'``.  ``pending`` maps a peer to a dict with
    the keys ``'objects'`` (hashes requested from that peer), ``'requested'``
    and ``'received'`` (timestamps, 0 if never).

    "The current peer" always means ``current_thread().peer``; most methods
    therefore expect to run on a thread that carries a ``peer`` attribute.
    """

    def __init__(self):
        super(self.__class__, self).__init__()
        self.lock = RLock()
        self.hashes = {}
        # When true, add(), addPending() and pull() do nothing.
        # NOTE(review): nothing in this class sets it to True, not even
        # stop(); presumably a caller does -- confirm.
        self.stopped = False
        # don't request the same object more frequently than this
        self.frequency = 60
        # after requesting and not receiving an object more than this many
        # times, consider it expired
        self.maxRequestCount = 3
        self.pending = {}

    def add(self, objectHash):
        """Record that the current peer advertised ``objectHash``.

        A peer is listed at most once per object, so the single removal done
        by threadEnd() fully unregisters it.
        """
        if self.stopped:
            return
        peer = current_thread().peer
        with self.lock:
            entry = self.hashes.setdefault(
                objectHash,
                {'peers': [], 'requested': 0, 'requestedCount': 0})
            if peer not in entry['peers']:
                entry['peers'].append(peer)

    def addPending(self, objectHash=None):
        """Ensure the current peer has a ``pending`` entry and stamp it.

        If ``objectHash`` is given and not yet listed for the peer it is
        appended to the peer's requested objects.  The peer's ``'requested'``
        timestamp is set to now in either case.
        """
        if self.stopped:
            return
        peer = current_thread().peer
        # The lock is re-entrant, so this is safe when called from pull().
        with self.lock:
            state = self.pending.setdefault(
                peer, {'objects': [], 'requested': 0, 'received': 0})
            if objectHash is not None and objectHash not in state['objects']:
                state['objects'].append(objectHash)
            state['requested'] = time.time()

    def len(self):
        """Return the number of tracked object hashes."""
        with self.lock:
            return len(self.hashes)

    def pull(self, count=1):
        """Pick up to ``count`` hashes to request from the current peer.

        Selected hashes get their request time and counter updated and are
        recorded as pending for the current peer.  Objects with no remaining
        peers are dropped, as are objects already requested
        ``maxRequestCount`` times.

        Raises:
            ValueError: if ``count`` is smaller than one.
        """
        if count < 1:
            raise ValueError("Must be at least one")
        objectHashes = []
        unreachableObjects = []
        if self.stopped:
            return objectHashes
        peer = current_thread().peer
        with self.lock:
            candidates = list(self.hashes)
        try:
            for objectHash in candidates:
                # The lock is taken per candidate so other threads can
                # interleave with a long scan.
                with self.lock:
                    if len(objectHashes) >= count:
                        break
                    if peer not in self.pending:
                        self.addPending()
                    state = self.pending[peer]
                    recent = time.time() - self.frequency
                    busy = (state['requested'] >= recent or
                            state['received'] >= recent)
                    # Still talking to this peer and it already has a full
                    # batch outstanding: ask for nothing more.
                    if busy and len(state['objects']) >= count:
                        break
                    entry = self.hashes.get(objectHash)
                    if entry is None:
                        # removed by another thread since the snapshot
                        continue
                    if not entry['peers']:
                        unreachableObjects.append(objectHash)
                        continue
                    # Only objects requested too long ago, or not at all from
                    # any thread, are eligible.
                    if entry['requested'] >= recent:
                        continue
                    if objectHash in state['objects']:
                        # already requested from this thread but not received
                        # yet; if still sending or receiving, try the next
                        # one, otherwise fall through and re-request
                        if busy:
                            continue
                    elif peer not in entry['peers']:
                        # the current node doesn't have the object
                        continue
                    if entry['requestedCount'] >= self.maxRequestCount:
                        # already requested too many times, remove all signs
                        # of this object
                        del self.hashes[objectHash]
                        for other in self.pending.values():
                            if objectHash in other['objects']:
                                other['objects'].remove(objectHash)
                        continue
                    # all ok, request
                    objectHashes.append(objectHash)
                    entry['requested'] = time.time()
                    entry['requestedCount'] += 1
                    state['requested'] = time.time()
                    self.addPending(objectHash)
        except (RuntimeError, KeyError, ValueError):
            # Best effort: shared state may be reset underneath us (e.g. by
            # stop()); return whatever has been selected so far.
            pass
        with self.lock:
            for objectHash in unreachableObjects:
                entry = self.hashes.get(objectHash)
                # Re-check: the entry may be gone, or may have gained a peer
                # since it was marked.
                if entry is not None and not entry['peers']:
                    del self.hashes[objectHash]
        return objectHashes

    def delete(self, objectHash):
        """Forget ``objectHash`` everywhere.

        If the calling thread has a ``peer`` with a ``pending`` entry, that
        entry's ``'received'`` timestamp is set to now.  Safe to call from
        threads without a ``peer`` attribute.
        """
        thread = current_thread()
        with self.lock:
            self.hashes.pop(objectHash, None)
            if hasattr(thread, 'peer') and thread.peer in self.pending:
                self.pending[thread.peer]['received'] = time.time()
            for state in self.pending.values():
                if objectHash in state['objects']:
                    state['objects'].remove(objectHash)

    def stop(self):
        """Discard all tracked hashes and all per-peer state."""
        with self.lock:
            self.hashes = {}
            self.pending = {}

    def threadEnd(self):
        """Unregister the current peer.

        The peer's ``pending`` entry is removed, and the peer is taken off
        the peer list of every object that had been requested from it.
        """
        peer = current_thread().peer
        with self.lock:
            state = self.pending.pop(peer, None)
            if state is None:
                return
            for objectHash in state['objects']:
                entry = self.hashes.get(objectHash)
                if entry is not None and peer in entry['peers']:
                    entry['peers'].remove(peer)
2017-01-19 19:48:12 +01:00
class PendingUploadDeadlineException(Exception):
    """Raised by PendingUpload.progress() once its deadline has passed."""
@Singleton
class PendingUpload(object):
    """Keep track of objects that we have created but haven't distributed yet.

    ``hashes`` maps an object hash to the list of peers the object is still
    pending for.  A peer is removed by delete() or threadEnd(); an object is
    dropped once its list becomes empty through either of them.
    """

    def __init__(self):
        super(self.__class__, self).__init__()
        self.lock = RLock()
        self.hashes = {}
        # end by this time in any case (0 means "not started yet")
        self.deadline = 0
        # largest value _progress() has returned so far
        self.maxLen = 0

    def add(self, objectHash=None):
        """Register pending uploads.

        With ``objectHash``: add the object and mark it pending for the
        ``peer`` of every live thread that has one.  Without it: mark every
        known object as pending for the current thread's peer.
        """
        with self.lock:
            if objectHash:
                # add a new object into existing thread lists
                peers = self.hashes.setdefault(objectHash, [])
                for thread in threadingEnumerate():
                    if (thread.is_alive() and hasattr(thread, 'peer') and
                            thread.peer not in peers):
                        peers.append(thread.peer)
            else:
                # add all objects into the current thread
                peer = current_thread().peer
                for peers in self.hashes.values():
                    if peer not in peers:
                        peers.append(peer)

    def len(self):
        """Return the number of objects still pending for at least one peer."""
        with self.lock:
            return sum(1 for peers in self.hashes.values() if peers)

    def _progress(self):
        """Return the total number of (object, peer) pairs left, as a float."""
        with self.lock:
            return float(sum(len(peers) for peers in self.hashes.values()))

    def progress(self, throwDeadline=True):
        """Return the completed fraction of the upload work, 0.0 to 1.0.

        The first call starts a 20 second deadline.  Once it has passed, a
        call raises if ``throwDeadline`` is true; otherwise a new 20 second
        deadline is started.

        Raises:
            PendingUploadDeadlineException: deadline passed and
                ``throwDeadline`` is true.
        """
        with self.lock:
            # One snapshot is used for both the high-water mark and the
            # ratio, so the result cannot mix two different states.
            remaining = self._progress()
            if self.maxLen < remaining:
                self.maxLen = remaining
            now = time.time()
            if self.deadline < now:
                if self.deadline > 0 and throwDeadline:
                    raise PendingUploadDeadlineException
                self.deadline = now + 20
            if not self.maxLen:
                # nothing was ever pending
                return 1.0
            return 1.0 - remaining / self.maxLen

    def delete(self, objectHash):
        """Mark ``objectHash`` as no longer pending for the current peer.

        Does nothing on threads without a ``peer`` attribute.
        """
        thread = current_thread()
        if not hasattr(thread, 'peer'):
            return
        with self.lock:
            peers = self.hashes.get(objectHash)
            if peers is not None and thread.peer in peers:
                peers.remove(thread.peer)
                if not peers:
                    del self.hashes[objectHash]

    def stop(self):
        """Discard all tracked objects."""
        with self.lock:
            self.hashes = {}

    def threadEnd(self):
        """Remove the current peer from every object's pending list.

        Does nothing on threads without a ``peer`` attribute, matching
        delete().
        """
        thread = current_thread()
        if not hasattr(thread, 'peer'):
            return
        with self.lock:
            # Iterate over a snapshot of the keys: entries are deleted from
            # the dict inside the loop.
            for objectHash in list(self.hashes):
                peers = self.hashes[objectHash]
                if thread.peer in peers:
                    peers.remove(thread.peer)
                    if not peers:
                        del self.hashes[objectHash]