Refactor Inventory
This commit is contained in:
parent
7800272d3a
commit
554627dd92
|
@ -850,7 +850,6 @@ class MySimpleXMLRPCRequestHandler(SimpleXMLRPCRequestHandler):
|
||||||
TTL = 2.5 * 24 * 60 * 60
|
TTL = 2.5 * 24 * 60 * 60
|
||||||
shared.inventory[inventoryHash] = (
|
shared.inventory[inventoryHash] = (
|
||||||
objectType, toStreamNumber, encryptedPayload, int(time.time()) + TTL,'')
|
objectType, toStreamNumber, encryptedPayload, int(time.time()) + TTL,'')
|
||||||
shared.inventorySets[toStreamNumber].add(inventoryHash)
|
|
||||||
with shared.printLock:
|
with shared.printLock:
|
||||||
print 'Broadcasting inv for msg(API disseminatePreEncryptedMsg command):', inventoryHash.encode('hex')
|
print 'Broadcasting inv for msg(API disseminatePreEncryptedMsg command):', inventoryHash.encode('hex')
|
||||||
shared.broadcastToSendDataQueues((
|
shared.broadcastToSendDataQueues((
|
||||||
|
@ -898,7 +897,6 @@ class MySimpleXMLRPCRequestHandler(SimpleXMLRPCRequestHandler):
|
||||||
TTL = 28 * 24 * 60 * 60
|
TTL = 28 * 24 * 60 * 60
|
||||||
shared.inventory[inventoryHash] = (
|
shared.inventory[inventoryHash] = (
|
||||||
objectType, pubkeyStreamNumber, payload, int(time.time()) + TTL,'')
|
objectType, pubkeyStreamNumber, payload, int(time.time()) + TTL,'')
|
||||||
shared.inventorySets[pubkeyStreamNumber].add(inventoryHash)
|
|
||||||
with shared.printLock:
|
with shared.printLock:
|
||||||
print 'broadcasting inv within API command disseminatePubkey with hash:', inventoryHash.encode('hex')
|
print 'broadcasting inv within API command disseminatePubkey with hash:', inventoryHash.encode('hex')
|
||||||
shared.broadcastToSendDataQueues((
|
shared.broadcastToSendDataQueues((
|
||||||
|
|
|
@ -48,12 +48,7 @@ from helper_threading import *
|
||||||
def connectToStream(streamNumber):
|
def connectToStream(streamNumber):
|
||||||
shared.streamsInWhichIAmParticipating[streamNumber] = 'no data'
|
shared.streamsInWhichIAmParticipating[streamNumber] = 'no data'
|
||||||
selfInitiatedConnections[streamNumber] = {}
|
selfInitiatedConnections[streamNumber] = {}
|
||||||
shared.inventorySets[streamNumber] = set()
|
|
||||||
queryData = sqlQuery('''SELECT hash FROM inventory WHERE streamnumber=?''', streamNumber)
|
|
||||||
for row in queryData:
|
|
||||||
shared.inventorySets[streamNumber].add(row[0])
|
|
||||||
|
|
||||||
|
|
||||||
if isOurOperatingSystemLimitedToHavingVeryFewHalfOpenConnections():
|
if isOurOperatingSystemLimitedToHavingVeryFewHalfOpenConnections():
|
||||||
# Some XP and Vista systems can only have 10 outgoing connections at a time.
|
# Some XP and Vista systems can only have 10 outgoing connections at a time.
|
||||||
maximumNumberOfHalfOpenConnections = 9
|
maximumNumberOfHalfOpenConnections = 9
|
||||||
|
|
|
@ -2305,16 +2305,12 @@ class MyForm(settingsmixin.SMainWindow):
|
||||||
# in the objectProcessorQueue to be processed
|
# in the objectProcessorQueue to be processed
|
||||||
if self.NewSubscriptionDialogInstance.ui.checkBoxDisplayMessagesAlreadyInInventory.isChecked():
|
if self.NewSubscriptionDialogInstance.ui.checkBoxDisplayMessagesAlreadyInInventory.isChecked():
|
||||||
status, addressVersion, streamNumber, ripe = decodeAddress(address)
|
status, addressVersion, streamNumber, ripe = decodeAddress(address)
|
||||||
shared.flushInventory()
|
shared.inventory.flush()
|
||||||
doubleHashOfAddressData = hashlib.sha512(hashlib.sha512(encodeVarint(
|
doubleHashOfAddressData = hashlib.sha512(hashlib.sha512(encodeVarint(
|
||||||
addressVersion) + encodeVarint(streamNumber) + ripe).digest()).digest()
|
addressVersion) + encodeVarint(streamNumber) + ripe).digest()).digest()
|
||||||
tag = doubleHashOfAddressData[32:]
|
tag = doubleHashOfAddressData[32:]
|
||||||
queryreturn = sqlQuery(
|
for value in shared.inventory.by_type_and_tag(3, tag):
|
||||||
'''select payload from inventory where objecttype=3 and tag=?''', tag)
|
shared.objectProcessorQueue.put((value.type, value.payload))
|
||||||
for row in queryreturn:
|
|
||||||
payload, = row
|
|
||||||
objectType = 3
|
|
||||||
shared.objectProcessorQueue.put((objectType,payload))
|
|
||||||
|
|
||||||
def click_pushButtonStatusIcon(self):
|
def click_pushButtonStatusIcon(self):
|
||||||
logger.debug('click_pushButtonStatusIcon')
|
logger.debug('click_pushButtonStatusIcon')
|
||||||
|
@ -4196,23 +4192,22 @@ class NewSubscriptionDialog(QtGui.QDialog):
|
||||||
self.ui.checkBoxDisplayMessagesAlreadyInInventory.setText(
|
self.ui.checkBoxDisplayMessagesAlreadyInInventory.setText(
|
||||||
_translate("MainWindow", "Address is an old type. We cannot display its past broadcasts."))
|
_translate("MainWindow", "Address is an old type. We cannot display its past broadcasts."))
|
||||||
else:
|
else:
|
||||||
shared.flushInventory()
|
shared.inventory.flush()
|
||||||
doubleHashOfAddressData = hashlib.sha512(hashlib.sha512(encodeVarint(
|
doubleHashOfAddressData = hashlib.sha512(hashlib.sha512(encodeVarint(
|
||||||
addressVersion) + encodeVarint(streamNumber) + ripe).digest()).digest()
|
addressVersion) + encodeVarint(streamNumber) + ripe).digest()).digest()
|
||||||
tag = doubleHashOfAddressData[32:]
|
tag = doubleHashOfAddressData[32:]
|
||||||
queryreturn = sqlQuery(
|
count = len(shared.inventory.by_type_and_tag(3, tag))
|
||||||
'''select hash from inventory where objecttype=3 and tag=?''', tag)
|
if count == 0:
|
||||||
if len(queryreturn) == 0:
|
|
||||||
self.ui.checkBoxDisplayMessagesAlreadyInInventory.setText(
|
self.ui.checkBoxDisplayMessagesAlreadyInInventory.setText(
|
||||||
_translate("MainWindow", "There are no recent broadcasts from this address to display."))
|
_translate("MainWindow", "There are no recent broadcasts from this address to display."))
|
||||||
elif len(queryreturn) == 1:
|
elif count == 1:
|
||||||
self.ui.checkBoxDisplayMessagesAlreadyInInventory.setEnabled(True)
|
self.ui.checkBoxDisplayMessagesAlreadyInInventory.setEnabled(True)
|
||||||
self.ui.checkBoxDisplayMessagesAlreadyInInventory.setText(
|
self.ui.checkBoxDisplayMessagesAlreadyInInventory.setText(
|
||||||
_translate("MainWindow", "Display the %1 recent broadcast from this address.").arg(str(len(queryreturn))))
|
_translate("MainWindow", "Display the %1 recent broadcast from this address.").arg(count))
|
||||||
else:
|
else:
|
||||||
self.ui.checkBoxDisplayMessagesAlreadyInInventory.setEnabled(True)
|
self.ui.checkBoxDisplayMessagesAlreadyInInventory.setEnabled(True)
|
||||||
self.ui.checkBoxDisplayMessagesAlreadyInInventory.setText(
|
self.ui.checkBoxDisplayMessagesAlreadyInInventory.setText(
|
||||||
_translate("MainWindow", "Display the %1 recent broadcasts from this address.").arg(str(len(queryreturn))))
|
_translate("MainWindow", "Display the %1 recent broadcasts from this address.").arg(count))
|
||||||
|
|
||||||
|
|
||||||
class NewAddressDialog(QtGui.QDialog):
|
class NewAddressDialog(QtGui.QDialog):
|
||||||
|
|
|
@ -216,14 +216,8 @@ class receiveDataThread(threading.Thread):
|
||||||
objectHash, = random.sample(
|
objectHash, = random.sample(
|
||||||
self.objectsThatWeHaveYetToGetFromThisPeer, 1)
|
self.objectsThatWeHaveYetToGetFromThisPeer, 1)
|
||||||
if objectHash in shared.inventory:
|
if objectHash in shared.inventory:
|
||||||
logger.debug('Inventory (in memory) already has object listed in inv message.')
|
logger.debug('Inventory already has object listed in inv message.')
|
||||||
del self.objectsThatWeHaveYetToGetFromThisPeer[
|
del self.objectsThatWeHaveYetToGetFromThisPeer[objectHash]
|
||||||
objectHash]
|
|
||||||
elif shared.isInSqlInventory(objectHash):
|
|
||||||
if shared.verbose >= 3:
|
|
||||||
logger.debug('Inventory (SQL on disk) already has object listed in inv message.')
|
|
||||||
del self.objectsThatWeHaveYetToGetFromThisPeer[
|
|
||||||
objectHash]
|
|
||||||
else:
|
else:
|
||||||
# We don't have the object in our inventory. Let's request it.
|
# We don't have the object in our inventory. Let's request it.
|
||||||
self.sendgetdata(objectHash)
|
self.sendgetdata(objectHash)
|
||||||
|
@ -318,23 +312,10 @@ class receiveDataThread(threading.Thread):
|
||||||
|
|
||||||
def sendBigInv(self):
|
def sendBigInv(self):
|
||||||
# Select all hashes for objects in this stream.
|
# Select all hashes for objects in this stream.
|
||||||
queryreturn = sqlQuery(
|
|
||||||
'''SELECT hash FROM inventory WHERE expirestime>? and streamnumber=?''',
|
|
||||||
int(time.time()),
|
|
||||||
self.streamNumber)
|
|
||||||
bigInvList = {}
|
bigInvList = {}
|
||||||
for row in queryreturn:
|
for hash in shared.inventory.unexpired_hashes_by_stream(self.streamNumber):
|
||||||
hash, = row
|
|
||||||
if hash not in self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware and not self.objectHashHolderInstance.hasHash(hash):
|
if hash not in self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware and not self.objectHashHolderInstance.hasHash(hash):
|
||||||
bigInvList[hash] = 0
|
bigInvList[hash] = 0
|
||||||
# We also have messages in our inventory in memory (which is a python
|
|
||||||
# dictionary). Let's fetch those too.
|
|
||||||
with shared.inventoryLock:
|
|
||||||
for hash, storedValue in shared.inventory.items():
|
|
||||||
if hash not in self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware:
|
|
||||||
objectType, streamNumber, payload, expiresTime, tag = storedValue
|
|
||||||
if streamNumber == self.streamNumber and expiresTime > int(time.time()):
|
|
||||||
bigInvList[hash] = 0
|
|
||||||
numberOfObjectsInInvMessage = 0
|
numberOfObjectsInInvMessage = 0
|
||||||
payload = ''
|
payload = ''
|
||||||
# Now let us start appending all of these hashes together. They will be
|
# Now let us start appending all of these hashes together. They will be
|
||||||
|
@ -440,9 +421,7 @@ class receiveDataThread(threading.Thread):
|
||||||
data[lengthOfVarint:32 + lengthOfVarint]] = 0
|
data[lengthOfVarint:32 + lengthOfVarint]] = 0
|
||||||
shared.numberOfInventoryLookupsPerformed += 1
|
shared.numberOfInventoryLookupsPerformed += 1
|
||||||
if data[lengthOfVarint:32 + lengthOfVarint] in shared.inventory:
|
if data[lengthOfVarint:32 + lengthOfVarint] in shared.inventory:
|
||||||
logger.debug('Inventory (in memory) has inventory item already.')
|
logger.debug('Inventory has inventory item already.')
|
||||||
elif shared.isInSqlInventory(data[lengthOfVarint:32 + lengthOfVarint]):
|
|
||||||
logger.debug('Inventory (SQL on disk) has inventory item already.')
|
|
||||||
else:
|
else:
|
||||||
self.sendgetdata(data[lengthOfVarint:32 + lengthOfVarint])
|
self.sendgetdata(data[lengthOfVarint:32 + lengthOfVarint])
|
||||||
else:
|
else:
|
||||||
|
@ -453,7 +432,7 @@ class receiveDataThread(threading.Thread):
|
||||||
advertisedSet = set()
|
advertisedSet = set()
|
||||||
for i in range(numberOfItemsInInv):
|
for i in range(numberOfItemsInInv):
|
||||||
advertisedSet.add(data[lengthOfVarint + (32 * i):32 + lengthOfVarint + (32 * i)])
|
advertisedSet.add(data[lengthOfVarint + (32 * i):32 + lengthOfVarint + (32 * i)])
|
||||||
objectsNewToMe = advertisedSet - shared.inventorySets[self.streamNumber]
|
objectsNewToMe = advertisedSet - shared.inventory.hashes_by_stream(self.streamNumber)
|
||||||
logger.info('inv message lists %s objects. Of those %s are new to me. It took %s seconds to figure that out.', numberOfItemsInInv, len(objectsNewToMe), time.time()-startTime)
|
logger.info('inv message lists %s objects. Of those %s are new to me. It took %s seconds to figure that out.', numberOfItemsInInv, len(objectsNewToMe), time.time()-startTime)
|
||||||
for item in objectsNewToMe:
|
for item in objectsNewToMe:
|
||||||
if totalNumberOfobjectsThatWeHaveYetToGetFromAllPeers > 200000 and len(self.objectsThatWeHaveYetToGetFromThisPeer) > 1000 and shared.trustedPeer == None: # inv flooding attack mitigation
|
if totalNumberOfobjectsThatWeHaveYetToGetFromAllPeers > 200000 and len(self.objectsThatWeHaveYetToGetFromThisPeer) > 1000 and shared.trustedPeer == None: # inv flooding attack mitigation
|
||||||
|
@ -491,20 +470,10 @@ class receiveDataThread(threading.Thread):
|
||||||
if self.objectHashHolderInstance.hasHash(hash):
|
if self.objectHashHolderInstance.hasHash(hash):
|
||||||
shared.inventoryLock.release()
|
shared.inventoryLock.release()
|
||||||
self.antiIntersectionDelay()
|
self.antiIntersectionDelay()
|
||||||
elif hash in shared.inventory:
|
|
||||||
objectType, streamNumber, payload, expiresTime, tag = shared.inventory[hash]
|
|
||||||
shared.inventoryLock.release()
|
|
||||||
self.sendObject(payload)
|
|
||||||
else:
|
else:
|
||||||
shared.inventoryLock.release()
|
shared.inventoryLock.release()
|
||||||
queryreturn = sqlQuery(
|
if hash in shared.inventory:
|
||||||
'''select payload from inventory where hash=? and expirestime>=?''',
|
self.sendObject(shared.inventory[hash].payload)
|
||||||
hash,
|
|
||||||
int(time.time()))
|
|
||||||
if queryreturn != []:
|
|
||||||
for row in queryreturn:
|
|
||||||
payload, = row
|
|
||||||
self.sendObject(payload)
|
|
||||||
else:
|
else:
|
||||||
self.antiIntersectionDelay()
|
self.antiIntersectionDelay()
|
||||||
logger.warning('%s asked for an object with a getdata which is not in either our memory inventory or our SQL inventory. We probably cleaned it out after advertising it but before they got around to asking for it.' % (self.peer,))
|
logger.warning('%s asked for an object with a getdata which is not in either our memory inventory or our SQL inventory. We probably cleaned it out after advertising it but before they got around to asking for it.' % (self.peer,))
|
||||||
|
|
|
@ -46,19 +46,7 @@ class singleCleaner(threading.Thread, StoppableThread):
|
||||||
while shared.shutdown == 0:
|
while shared.shutdown == 0:
|
||||||
shared.UISignalQueue.put((
|
shared.UISignalQueue.put((
|
||||||
'updateStatusBar', 'Doing housekeeping (Flushing inventory in memory to disk...)'))
|
'updateStatusBar', 'Doing housekeeping (Flushing inventory in memory to disk...)'))
|
||||||
with shared.inventoryLock: # If you use both the inventoryLock and the sqlLock, always use the inventoryLock OUTSIDE of the sqlLock.
|
shared.inventory.flush()
|
||||||
with SqlBulkExecute() as sql:
|
|
||||||
for hash, storedValue in shared.inventory.items():
|
|
||||||
objectType, streamNumber, payload, expiresTime, tag = storedValue
|
|
||||||
sql.execute(
|
|
||||||
'''INSERT INTO inventory VALUES (?,?,?,?,?,?)''',
|
|
||||||
hash,
|
|
||||||
objectType,
|
|
||||||
streamNumber,
|
|
||||||
payload,
|
|
||||||
expiresTime,
|
|
||||||
tag)
|
|
||||||
del shared.inventory[hash]
|
|
||||||
shared.UISignalQueue.put(('updateStatusBar', ''))
|
shared.UISignalQueue.put(('updateStatusBar', ''))
|
||||||
|
|
||||||
shared.broadcastToSendDataQueues((
|
shared.broadcastToSendDataQueues((
|
||||||
|
@ -70,9 +58,7 @@ class singleCleaner(threading.Thread, StoppableThread):
|
||||||
shared.UISignalQueue.queue.clear()
|
shared.UISignalQueue.queue.clear()
|
||||||
if timeWeLastClearedInventoryAndPubkeysTables < int(time.time()) - 7380:
|
if timeWeLastClearedInventoryAndPubkeysTables < int(time.time()) - 7380:
|
||||||
timeWeLastClearedInventoryAndPubkeysTables = int(time.time())
|
timeWeLastClearedInventoryAndPubkeysTables = int(time.time())
|
||||||
sqlExecute(
|
shared.inventory.clean()
|
||||||
'''DELETE FROM inventory WHERE expirestime<? ''',
|
|
||||||
int(time.time()) - (60 * 60 * 3))
|
|
||||||
# pubkeys
|
# pubkeys
|
||||||
sqlExecute(
|
sqlExecute(
|
||||||
'''DELETE FROM pubkeys WHERE time<? AND usedpersonally='no' ''',
|
'''DELETE FROM pubkeys WHERE time<? AND usedpersonally='no' ''',
|
||||||
|
@ -94,20 +80,6 @@ class singleCleaner(threading.Thread, StoppableThread):
|
||||||
elif status == 'msgsent':
|
elif status == 'msgsent':
|
||||||
resendMsg(ackData)
|
resendMsg(ackData)
|
||||||
|
|
||||||
# Let's also clear and reload shared.inventorySets to keep it from
|
|
||||||
# taking up an unnecessary amount of memory.
|
|
||||||
for streamNumber in shared.inventorySets:
|
|
||||||
shared.inventorySets[streamNumber] = set()
|
|
||||||
queryData = sqlQuery('''SELECT hash FROM inventory WHERE streamnumber=?''', streamNumber)
|
|
||||||
for row in queryData:
|
|
||||||
shared.inventorySets[streamNumber].add(row[0])
|
|
||||||
with shared.inventoryLock:
|
|
||||||
for hash, storedValue in shared.inventory.items():
|
|
||||||
objectType, streamNumber, payload, expiresTime, tag = storedValue
|
|
||||||
if not streamNumber in shared.inventorySets:
|
|
||||||
shared.inventorySets[streamNumber] = set()
|
|
||||||
shared.inventorySets[streamNumber].add(hash)
|
|
||||||
|
|
||||||
# Let us write out the knowNodes to disk if there is anything new to write out.
|
# Let us write out the knowNodes to disk if there is anything new to write out.
|
||||||
if shared.needToWriteKnownNodesToDisk:
|
if shared.needToWriteKnownNodesToDisk:
|
||||||
shared.knownNodesLock.acquire()
|
shared.knownNodesLock.acquire()
|
||||||
|
|
|
@ -158,7 +158,6 @@ class singleWorker(threading.Thread, StoppableThread):
|
||||||
objectType = 1
|
objectType = 1
|
||||||
shared.inventory[inventoryHash] = (
|
shared.inventory[inventoryHash] = (
|
||||||
objectType, streamNumber, payload, embeddedTime,'')
|
objectType, streamNumber, payload, embeddedTime,'')
|
||||||
shared.inventorySets[streamNumber].add(inventoryHash)
|
|
||||||
|
|
||||||
logger.info('broadcasting inv with hash: ' + inventoryHash.encode('hex'))
|
logger.info('broadcasting inv with hash: ' + inventoryHash.encode('hex'))
|
||||||
|
|
||||||
|
@ -249,7 +248,6 @@ class singleWorker(threading.Thread, StoppableThread):
|
||||||
objectType = 1
|
objectType = 1
|
||||||
shared.inventory[inventoryHash] = (
|
shared.inventory[inventoryHash] = (
|
||||||
objectType, streamNumber, payload, embeddedTime,'')
|
objectType, streamNumber, payload, embeddedTime,'')
|
||||||
shared.inventorySets[streamNumber].add(inventoryHash)
|
|
||||||
|
|
||||||
logger.info('broadcasting inv with hash: ' + inventoryHash.encode('hex'))
|
logger.info('broadcasting inv with hash: ' + inventoryHash.encode('hex'))
|
||||||
|
|
||||||
|
@ -340,7 +338,6 @@ class singleWorker(threading.Thread, StoppableThread):
|
||||||
objectType = 1
|
objectType = 1
|
||||||
shared.inventory[inventoryHash] = (
|
shared.inventory[inventoryHash] = (
|
||||||
objectType, streamNumber, payload, embeddedTime, doubleHashOfAddressData[32:])
|
objectType, streamNumber, payload, embeddedTime, doubleHashOfAddressData[32:])
|
||||||
shared.inventorySets[streamNumber].add(inventoryHash)
|
|
||||||
|
|
||||||
logger.info('broadcasting inv with hash: ' + inventoryHash.encode('hex'))
|
logger.info('broadcasting inv with hash: ' + inventoryHash.encode('hex'))
|
||||||
|
|
||||||
|
@ -463,7 +460,6 @@ class singleWorker(threading.Thread, StoppableThread):
|
||||||
objectType = 3
|
objectType = 3
|
||||||
shared.inventory[inventoryHash] = (
|
shared.inventory[inventoryHash] = (
|
||||||
objectType, streamNumber, payload, embeddedTime, tag)
|
objectType, streamNumber, payload, embeddedTime, tag)
|
||||||
shared.inventorySets[streamNumber].add(inventoryHash)
|
|
||||||
logger.info('sending inv (within sendBroadcast function) for object: ' + inventoryHash.encode('hex'))
|
logger.info('sending inv (within sendBroadcast function) for object: ' + inventoryHash.encode('hex'))
|
||||||
shared.broadcastToSendDataQueues((
|
shared.broadcastToSendDataQueues((
|
||||||
streamNumber, 'advertiseobject', inventoryHash))
|
streamNumber, 'advertiseobject', inventoryHash))
|
||||||
|
@ -558,37 +554,20 @@ class singleWorker(threading.Thread, StoppableThread):
|
||||||
privEncryptionKey = doubleHashOfToAddressData[:32] # The first half of the sha512 hash.
|
privEncryptionKey = doubleHashOfToAddressData[:32] # The first half of the sha512 hash.
|
||||||
tag = doubleHashOfToAddressData[32:] # The second half of the sha512 hash.
|
tag = doubleHashOfToAddressData[32:] # The second half of the sha512 hash.
|
||||||
shared.neededPubkeys[tag] = (toaddress, highlevelcrypto.makeCryptor(privEncryptionKey.encode('hex')))
|
shared.neededPubkeys[tag] = (toaddress, highlevelcrypto.makeCryptor(privEncryptionKey.encode('hex')))
|
||||||
|
|
||||||
queryreturn = sqlQuery(
|
|
||||||
'''SELECT payload FROM inventory WHERE objecttype=1 and tag=? ''', toTag)
|
|
||||||
if queryreturn != []: # if there are any pubkeys in our inventory with the correct tag..
|
|
||||||
for row in queryreturn:
|
|
||||||
payload, = row
|
|
||||||
if shared.decryptAndCheckPubkeyPayload(payload, toaddress) == 'successful':
|
|
||||||
needToRequestPubkey = False
|
|
||||||
sqlExecute(
|
|
||||||
'''UPDATE sent SET status='doingmsgpow', retrynumber=0 WHERE toaddress=? AND (status='msgqueued' or status='awaitingpubkey' or status='doingpubkeypow')''',
|
|
||||||
toaddress)
|
|
||||||
del shared.neededPubkeys[tag]
|
|
||||||
break
|
|
||||||
#else: # There was something wrong with this pubkey object even
|
|
||||||
# though it had the correct tag- almost certainly because
|
|
||||||
# of malicious behavior or a badly programmed client. If
|
|
||||||
# there are any other pubkeys in our inventory with the correct
|
|
||||||
# tag then we'll try to decrypt those.
|
|
||||||
|
|
||||||
if needToRequestPubkey: # Obviously we had no success looking in the sql inventory. Let's look through the memory inventory.
|
for value in shared.inventory.by_type_and_tag(1, toTag):
|
||||||
with shared.inventoryLock:
|
if shared.decryptAndCheckPubkeyPayload(value.payload, toaddress) == 'successful': #if valid, this function also puts it in the pubkeys table.
|
||||||
for hash, storedValue in shared.inventory.items():
|
needToRequestPubkey = False
|
||||||
objectType, streamNumber, payload, expiresTime, tag = storedValue
|
sqlExecute(
|
||||||
if objectType == 1 and tag == toTag:
|
'''UPDATE sent SET status='doingmsgpow', retrynumber=0 WHERE toaddress=? AND (status='msgqueued' or status='awaitingpubkey' or status='doingpubkeypow')''',
|
||||||
if shared.decryptAndCheckPubkeyPayload(payload, toaddress) == 'successful': #if valid, this function also puts it in the pubkeys table.
|
toaddress)
|
||||||
needToRequestPubkey = False
|
del shared.neededPubkeys[tag]
|
||||||
sqlExecute(
|
break
|
||||||
'''UPDATE sent SET status='doingmsgpow', retrynumber=0 WHERE toaddress=? AND (status='msgqueued' or status='awaitingpubkey' or status='doingpubkeypow')''',
|
#else: # There was something wrong with this pubkey object even
|
||||||
toaddress)
|
# though it had the correct tag- almost certainly because
|
||||||
del shared.neededPubkeys[tag]
|
# of malicious behavior or a badly programmed client. If
|
||||||
break
|
# there are any other pubkeys in our inventory with the correct
|
||||||
|
# tag then we'll try to decrypt those.
|
||||||
if needToRequestPubkey:
|
if needToRequestPubkey:
|
||||||
sqlExecute(
|
sqlExecute(
|
||||||
'''UPDATE sent SET status='doingpubkeypow' WHERE toaddress=? AND status='msgqueued' ''',
|
'''UPDATE sent SET status='doingpubkeypow' WHERE toaddress=? AND status='msgqueued' ''',
|
||||||
|
@ -811,7 +790,6 @@ class singleWorker(threading.Thread, StoppableThread):
|
||||||
objectType = 2
|
objectType = 2
|
||||||
shared.inventory[inventoryHash] = (
|
shared.inventory[inventoryHash] = (
|
||||||
objectType, toStreamNumber, encryptedPayload, embeddedTime, '')
|
objectType, toStreamNumber, encryptedPayload, embeddedTime, '')
|
||||||
shared.inventorySets[toStreamNumber].add(inventoryHash)
|
|
||||||
if shared.config.has_section(toaddress) or not checkBitfield(behaviorBitfield, shared.BITFIELD_DOESACK):
|
if shared.config.has_section(toaddress) or not checkBitfield(behaviorBitfield, shared.BITFIELD_DOESACK):
|
||||||
shared.UISignalQueue.put(('updateSentItemStatusByAckdata', (ackdata, tr.translateText("MainWindow", "Message sent. Sent at %1").arg(l10n.formatTimestamp()))))
|
shared.UISignalQueue.put(('updateSentItemStatusByAckdata', (ackdata, tr.translateText("MainWindow", "Message sent. Sent at %1").arg(l10n.formatTimestamp()))))
|
||||||
else:
|
else:
|
||||||
|
@ -921,7 +899,6 @@ class singleWorker(threading.Thread, StoppableThread):
|
||||||
objectType = 1
|
objectType = 1
|
||||||
shared.inventory[inventoryHash] = (
|
shared.inventory[inventoryHash] = (
|
||||||
objectType, streamNumber, payload, embeddedTime, '')
|
objectType, streamNumber, payload, embeddedTime, '')
|
||||||
shared.inventorySets[streamNumber].add(inventoryHash)
|
|
||||||
logger.info('sending inv (for the getpubkey message)')
|
logger.info('sending inv (for the getpubkey message)')
|
||||||
shared.broadcastToSendDataQueues((
|
shared.broadcastToSendDataQueues((
|
||||||
streamNumber, 'advertiseobject', inventoryHash))
|
streamNumber, 'advertiseobject', inventoryHash))
|
||||||
|
|
127
src/shared.py
127
src/shared.py
|
@ -49,8 +49,7 @@ addressGeneratorQueue = Queue.Queue()
|
||||||
knownNodesLock = threading.Lock()
|
knownNodesLock = threading.Lock()
|
||||||
knownNodes = {}
|
knownNodes = {}
|
||||||
sendDataQueues = [] #each sendData thread puts its queue in this list.
|
sendDataQueues = [] #each sendData thread puts its queue in this list.
|
||||||
inventory = {} #of objects (like msg payloads and pubkey payloads) Does not include protocol headers (the first 24 bytes of each packet).
|
inventoryLock = threading.RLock() #Guarantees that two receiveDataThreads don't receive and process the same message concurrently (probably sent by a malicious individual)
|
||||||
inventoryLock = threading.Lock() #Guarantees that two receiveDataThreads don't receive and process the same message concurrently (probably sent by a malicious individual)
|
|
||||||
printLock = threading.Lock()
|
printLock = threading.Lock()
|
||||||
appdata = '' #holds the location of the application data storage directory
|
appdata = '' #holds the location of the application data storage directory
|
||||||
statusIconColor = 'red'
|
statusIconColor = 'red'
|
||||||
|
@ -85,7 +84,6 @@ lastTimeWeResetBytesSent = 0 # used for the bandwidth rate limit
|
||||||
sendDataLock = threading.Lock() # used for the bandwidth rate limit
|
sendDataLock = threading.Lock() # used for the bandwidth rate limit
|
||||||
receiveDataLock = threading.Lock() # used for the bandwidth rate limit
|
receiveDataLock = threading.Lock() # used for the bandwidth rate limit
|
||||||
daemon = False
|
daemon = False
|
||||||
inventorySets = {1: set()} # key = streamNumer, value = a set which holds the inventory object hashes that we are aware of. This is used whenever we receive an inv message from a peer to check to see what items are new to us. We don't delete things out of it; instead, the singleCleaner thread clears and refills it every couple hours.
|
|
||||||
needToWriteKnownNodesToDisk = False # If True, the singleCleaner will write it to disk eventually.
|
needToWriteKnownNodesToDisk = False # If True, the singleCleaner will write it to disk eventually.
|
||||||
maximumLengthOfTimeToBotherResendingMessages = 0
|
maximumLengthOfTimeToBotherResendingMessages = 0
|
||||||
objectProcessorQueue = ObjectProcessorQueue() # receiveDataThreads dump objects they hear on the network into this queue to be processed.
|
objectProcessorQueue = ObjectProcessorQueue() # receiveDataThreads dump objects they hear on the network into this queue to be processed.
|
||||||
|
@ -135,6 +133,88 @@ NODE_SSL = 2
|
||||||
#Bitfield flags
|
#Bitfield flags
|
||||||
BITFIELD_DOESACK = 1
|
BITFIELD_DOESACK = 1
|
||||||
|
|
||||||
|
import collections
|
||||||
|
|
||||||
|
InventoryItem = collections.namedtuple('InventoryItem', 'type stream payload expires tag')
|
||||||
|
|
||||||
|
|
||||||
|
class Inventory(collections.MutableMapping):
|
||||||
|
def __init__(self):
|
||||||
|
super(Inventory, self).__init__()
|
||||||
|
self._inventory = {} #of objects (like msg payloads and pubkey payloads) Does not include protocol headers (the first 24 bytes of each packet).
|
||||||
|
self._streams = collections.defaultdict(set) # key = streamNumer, value = a set which holds the inventory object hashes that we are aware of. This is used whenever we receive an inv message from a peer to check to see what items are new to us. We don't delete things out of it; instead, the singleCleaner thread clears and refills it every couple hours.
|
||||||
|
|
||||||
|
def __contains__(self, hash):
|
||||||
|
global numberOfInventoryLookupsPerformed
|
||||||
|
with inventoryLock:
|
||||||
|
numberOfInventoryLookupsPerformed += 1
|
||||||
|
if hash in self._inventory:
|
||||||
|
return True
|
||||||
|
return bool(sqlQuery('SELECT 1 FROM inventory WHERE hash=?', hash))
|
||||||
|
|
||||||
|
def __getitem__(self, hash):
|
||||||
|
with inventoryLock:
|
||||||
|
if hash in self._inventory:
|
||||||
|
return self._inventory[hash]
|
||||||
|
rows = sqlQuery('SELECT objecttype, streamnumber, payload, expirestime, tag FROM inventory WHERE hash=?', hash)
|
||||||
|
if not rows:
|
||||||
|
raise KeyError(hash)
|
||||||
|
return InventoryItem(*rows[0])
|
||||||
|
|
||||||
|
def __setitem__(self, hash, value):
|
||||||
|
with inventoryLock:
|
||||||
|
value = InventoryItem(*value)
|
||||||
|
self._inventory[hash] = value
|
||||||
|
self._streams[value.stream].add(hash)
|
||||||
|
|
||||||
|
def __delitem__(self, hash):
|
||||||
|
raise NotImplementedError
|
||||||
|
|
||||||
|
def __iter__(self):
|
||||||
|
with inventoryLock:
|
||||||
|
hashes = self._inventory.keys()[:]
|
||||||
|
hashes += (hash for hash, in sqlQuery('SELECT hash FROM inventory'))
|
||||||
|
return hashes.__iter__()
|
||||||
|
|
||||||
|
def __len__(self):
|
||||||
|
with inventoryLock:
|
||||||
|
return len(self._inventory) + sqlQuery('SELECT count(*) FROM inventory')[0][0]
|
||||||
|
|
||||||
|
def by_type_and_tag(self, type, tag):
|
||||||
|
with inventoryLock:
|
||||||
|
values = [value for value in self._inventory.values() if value.type == type and value.tag == tag]
|
||||||
|
values += (InventoryItem(*value) for value in sqlQuery('SELECT objecttype, streamnumber, payload, expirestime, tag FROM inventory WHERE objecttype=? AND tag=?', type, tag))
|
||||||
|
return values
|
||||||
|
|
||||||
|
def hashes_by_stream(self, stream):
|
||||||
|
with inventoryLock:
|
||||||
|
return self._streams[stream]
|
||||||
|
|
||||||
|
def unexpired_hashes_by_stream(self, stream):
|
||||||
|
with inventoryLock:
|
||||||
|
t = int(time.time())
|
||||||
|
hashes = [hash for hash, value in self._inventory.items() if value.stream == stream and value.expires > t]
|
||||||
|
hashes += (payload for payload, in sqlQuery('SELECT hash FROM inventory WHERE streamnumber=? AND expirestime>?', stream, t))
|
||||||
|
return hashes
|
||||||
|
|
||||||
|
def flush(self):
|
||||||
|
with inventoryLock: # If you use both the inventoryLock and the sqlLock, always use the inventoryLock OUTSIDE of the sqlLock.
|
||||||
|
with SqlBulkExecute() as sql:
|
||||||
|
for hash, value in self._inventory.items():
|
||||||
|
sql.execute('INSERT INTO inventory VALUES (?, ?, ?, ?, ?, ?)', hash, *value)
|
||||||
|
self._inventory.clear()
|
||||||
|
|
||||||
|
def clean(self):
|
||||||
|
with inventoryLock:
|
||||||
|
sqlExecute('DELETE FROM inventory WHERE expirestime<?',int(time.time()) - (60 * 60 * 3))
|
||||||
|
self._streams.clear()
|
||||||
|
for hash, value in self.items():
|
||||||
|
self._streams[value.stream].add(hash)
|
||||||
|
|
||||||
|
|
||||||
|
inventory = Inventory()
|
||||||
|
|
||||||
|
|
||||||
#Create a packet
|
#Create a packet
|
||||||
def CreatePacket(command, payload=''):
|
def CreatePacket(command, payload=''):
|
||||||
payload_length = len(payload)
|
payload_length = len(payload)
|
||||||
|
@ -145,9 +225,6 @@ def CreatePacket(command, payload=''):
|
||||||
b[Header.size:] = payload
|
b[Header.size:] = payload
|
||||||
return bytes(b)
|
return bytes(b)
|
||||||
|
|
||||||
def isInSqlInventory(hash):
|
|
||||||
queryreturn = sqlQuery('''select hash from inventory where hash=?''', hash)
|
|
||||||
return queryreturn != []
|
|
||||||
|
|
||||||
def encodeHost(host):
|
def encodeHost(host):
|
||||||
if host.find('.onion') > -1:
|
if host.find('.onion') > -1:
|
||||||
|
@ -417,8 +494,8 @@ def doCleanShutdown():
|
||||||
UISignalQueue.put((
|
UISignalQueue.put((
|
||||||
'updateStatusBar',
|
'updateStatusBar',
|
||||||
'Flushing inventory in memory out to disk. This should normally only take a second...'))
|
'Flushing inventory in memory out to disk. This should normally only take a second...'))
|
||||||
flushInventory()
|
inventory.flush()
|
||||||
|
|
||||||
# Verify that the objectProcessor has finished exiting. It should have incremented the
|
# Verify that the objectProcessor has finished exiting. It should have incremented the
|
||||||
# shutdown variable from 1 to 2. This must finish before we command the sqlThread to exit.
|
# shutdown variable from 1 to 2. This must finish before we command the sqlThread to exit.
|
||||||
while shutdown == 1:
|
while shutdown == 1:
|
||||||
|
@ -452,15 +529,6 @@ def broadcastToSendDataQueues(data):
|
||||||
# logger.debug('running broadcastToSendDataQueues')
|
# logger.debug('running broadcastToSendDataQueues')
|
||||||
for q in sendDataQueues:
|
for q in sendDataQueues:
|
||||||
q.put(data)
|
q.put(data)
|
||||||
|
|
||||||
def flushInventory():
|
|
||||||
#Note that the singleCleanerThread clears out the inventory dictionary from time to time, although it only clears things that have been in the dictionary for a long time. This clears the inventory dictionary Now.
|
|
||||||
with SqlBulkExecute() as sql:
|
|
||||||
for hash, storedValue in inventory.items():
|
|
||||||
objectType, streamNumber, payload, expiresTime, tag = storedValue
|
|
||||||
sql.execute('''INSERT INTO inventory VALUES (?,?,?,?,?,?)''',
|
|
||||||
hash,objectType,streamNumber,payload,expiresTime,tag)
|
|
||||||
del inventory[hash]
|
|
||||||
|
|
||||||
def fixPotentiallyInvalidUTF8Data(text):
|
def fixPotentiallyInvalidUTF8Data(text):
|
||||||
try:
|
try:
|
||||||
|
@ -703,14 +771,9 @@ def _checkAndShareUndefinedObjectWithPeers(data):
|
||||||
logger.debug('We have already received this undefined object. Ignoring.')
|
logger.debug('We have already received this undefined object. Ignoring.')
|
||||||
inventoryLock.release()
|
inventoryLock.release()
|
||||||
return
|
return
|
||||||
elif isInSqlInventory(inventoryHash):
|
|
||||||
logger.debug('We have already received this undefined object (it is stored on disk in the SQL inventory). Ignoring it.')
|
|
||||||
inventoryLock.release()
|
|
||||||
return
|
|
||||||
objectType, = unpack('>I', data[16:20])
|
objectType, = unpack('>I', data[16:20])
|
||||||
inventory[inventoryHash] = (
|
inventory[inventoryHash] = (
|
||||||
objectType, streamNumber, data, embeddedTime,'')
|
objectType, streamNumber, data, embeddedTime,'')
|
||||||
inventorySets[streamNumber].add(inventoryHash)
|
|
||||||
inventoryLock.release()
|
inventoryLock.release()
|
||||||
logger.debug('advertising inv with hash: %s' % inventoryHash.encode('hex'))
|
logger.debug('advertising inv with hash: %s' % inventoryHash.encode('hex'))
|
||||||
broadcastToSendDataQueues((streamNumber, 'advertiseobject', inventoryHash))
|
broadcastToSendDataQueues((streamNumber, 'advertiseobject', inventoryHash))
|
||||||
|
@ -735,15 +798,10 @@ def _checkAndShareMsgWithPeers(data):
|
||||||
logger.debug('We have already received this msg message. Ignoring.')
|
logger.debug('We have already received this msg message. Ignoring.')
|
||||||
inventoryLock.release()
|
inventoryLock.release()
|
||||||
return
|
return
|
||||||
elif isInSqlInventory(inventoryHash):
|
|
||||||
logger.debug('We have already received this msg message (it is stored on disk in the SQL inventory). Ignoring it.')
|
|
||||||
inventoryLock.release()
|
|
||||||
return
|
|
||||||
# This msg message is valid. Let's let our peers know about it.
|
# This msg message is valid. Let's let our peers know about it.
|
||||||
objectType = 2
|
objectType = 2
|
||||||
inventory[inventoryHash] = (
|
inventory[inventoryHash] = (
|
||||||
objectType, streamNumber, data, embeddedTime,'')
|
objectType, streamNumber, data, embeddedTime,'')
|
||||||
inventorySets[streamNumber].add(inventoryHash)
|
|
||||||
inventoryLock.release()
|
inventoryLock.release()
|
||||||
logger.debug('advertising inv with hash: %s' % inventoryHash.encode('hex'))
|
logger.debug('advertising inv with hash: %s' % inventoryHash.encode('hex'))
|
||||||
broadcastToSendDataQueues((streamNumber, 'advertiseobject', inventoryHash))
|
broadcastToSendDataQueues((streamNumber, 'advertiseobject', inventoryHash))
|
||||||
|
@ -776,15 +834,10 @@ def _checkAndShareGetpubkeyWithPeers(data):
|
||||||
logger.debug('We have already received this getpubkey request. Ignoring it.')
|
logger.debug('We have already received this getpubkey request. Ignoring it.')
|
||||||
inventoryLock.release()
|
inventoryLock.release()
|
||||||
return
|
return
|
||||||
elif isInSqlInventory(inventoryHash):
|
|
||||||
logger.debug('We have already received this getpubkey request (it is stored on disk in the SQL inventory). Ignoring it.')
|
|
||||||
inventoryLock.release()
|
|
||||||
return
|
|
||||||
|
|
||||||
objectType = 0
|
objectType = 0
|
||||||
inventory[inventoryHash] = (
|
inventory[inventoryHash] = (
|
||||||
objectType, streamNumber, data, embeddedTime,'')
|
objectType, streamNumber, data, embeddedTime,'')
|
||||||
inventorySets[streamNumber].add(inventoryHash)
|
|
||||||
inventoryLock.release()
|
inventoryLock.release()
|
||||||
# This getpubkey request is valid. Forward to peers.
|
# This getpubkey request is valid. Forward to peers.
|
||||||
logger.debug('advertising inv with hash: %s' % inventoryHash.encode('hex'))
|
logger.debug('advertising inv with hash: %s' % inventoryHash.encode('hex'))
|
||||||
|
@ -820,14 +873,9 @@ def _checkAndSharePubkeyWithPeers(data):
|
||||||
logger.debug('We have already received this pubkey. Ignoring it.')
|
logger.debug('We have already received this pubkey. Ignoring it.')
|
||||||
inventoryLock.release()
|
inventoryLock.release()
|
||||||
return
|
return
|
||||||
elif isInSqlInventory(inventoryHash):
|
|
||||||
logger.debug('We have already received this pubkey (it is stored on disk in the SQL inventory). Ignoring it.')
|
|
||||||
inventoryLock.release()
|
|
||||||
return
|
|
||||||
objectType = 1
|
objectType = 1
|
||||||
inventory[inventoryHash] = (
|
inventory[inventoryHash] = (
|
||||||
objectType, streamNumber, data, embeddedTime, tag)
|
objectType, streamNumber, data, embeddedTime, tag)
|
||||||
inventorySets[streamNumber].add(inventoryHash)
|
|
||||||
inventoryLock.release()
|
inventoryLock.release()
|
||||||
# This object is valid. Forward it to peers.
|
# This object is valid. Forward it to peers.
|
||||||
logger.debug('advertising inv with hash: %s' % inventoryHash.encode('hex'))
|
logger.debug('advertising inv with hash: %s' % inventoryHash.encode('hex'))
|
||||||
|
@ -864,15 +912,10 @@ def _checkAndShareBroadcastWithPeers(data):
|
||||||
logger.debug('We have already received this broadcast object. Ignoring.')
|
logger.debug('We have already received this broadcast object. Ignoring.')
|
||||||
inventoryLock.release()
|
inventoryLock.release()
|
||||||
return
|
return
|
||||||
elif isInSqlInventory(inventoryHash):
|
|
||||||
logger.debug('We have already received this broadcast object (it is stored on disk in the SQL inventory). Ignoring it.')
|
|
||||||
inventoryLock.release()
|
|
||||||
return
|
|
||||||
# It is valid. Let's let our peers know about it.
|
# It is valid. Let's let our peers know about it.
|
||||||
objectType = 3
|
objectType = 3
|
||||||
inventory[inventoryHash] = (
|
inventory[inventoryHash] = (
|
||||||
objectType, streamNumber, data, embeddedTime, tag)
|
objectType, streamNumber, data, embeddedTime, tag)
|
||||||
inventorySets[streamNumber].add(inventoryHash)
|
|
||||||
inventoryLock.release()
|
inventoryLock.release()
|
||||||
# This object is valid. Forward it to peers.
|
# This object is valid. Forward it to peers.
|
||||||
logger.debug('advertising inv with hash: %s' % inventoryHash.encode('hex'))
|
logger.debug('advertising inv with hash: %s' % inventoryHash.encode('hex'))
|
||||||
|
|
Reference in New Issue
Block a user