Flood mitigation optimisation

Flood mitigation was done both in the ObjectProcessorQueue as well as
receiveData threads. This patch removes the mitigation in receiveData
threads and cleans up the one in the ObjectProcessorQueue
This commit is contained in:
Peter Šurda 2016-01-22 11:17:10 +01:00
parent 2043d796dd
commit e4f31d25fc
4 changed files with 13 additions and 47 deletions

View File

@ -2503,9 +2503,7 @@ class MyForm(settingsmixin.SMainWindow):
for row in queryreturn: for row in queryreturn:
payload, = row payload, = row
objectType = 3 objectType = 3
with shared.objectProcessorQueueSizeLock: shared.objectProcessorQueue.put((objectType,payload))
shared.objectProcessorQueueSize += len(payload)
shared.objectProcessorQueue.put((objectType,payload))
def click_pushButtonStatusIcon(self): def click_pushButtonStatusIcon(self):
logger.debug('click_pushButtonStatusIcon') logger.debug('click_pushButtonStatusIcon')

View File

@ -39,11 +39,9 @@ class objectProcessor(threading.Thread):
""" """
queryreturn = sqlQuery( queryreturn = sqlQuery(
'''SELECT objecttype, data FROM objectprocessorqueue''') '''SELECT objecttype, data FROM objectprocessorqueue''')
with shared.objectProcessorQueueSizeLock: for row in queryreturn:
for row in queryreturn: objectType, data = row
objectType, data = row shared.objectProcessorQueue.put((objectType,data))
shared.objectProcessorQueueSize += len(data)
shared.objectProcessorQueue.put((objectType,data))
sqlExecute('''DELETE FROM objectprocessorqueue''') sqlExecute('''DELETE FROM objectprocessorqueue''')
logger.debug('Loaded %s objects from disk into the objectProcessorQueue.' % str(len(queryreturn))) logger.debug('Loaded %s objects from disk into the objectProcessorQueue.' % str(len(queryreturn)))
@ -70,19 +68,14 @@ class objectProcessor(threading.Thread):
except Exception as e: except Exception as e:
logger.critical("Critical error within objectProcessorThread: \n%s" % traceback.format_exc()) logger.critical("Critical error within objectProcessorThread: \n%s" % traceback.format_exc())
with shared.objectProcessorQueueSizeLock:
shared.objectProcessorQueueSize -= len(data) # We maintain objectProcessorQueueSize so that we will slow down requesting objects if too much data accumulates in the queue.
if shared.shutdown: if shared.shutdown:
time.sleep(.5) # Wait just a moment for most of the connections to close time.sleep(.5) # Wait just a moment for most of the connections to close
numberOfObjectsThatWereInTheObjectProcessorQueue = 0 numberOfObjectsThatWereInTheObjectProcessorQueue = 0
with SqlBulkExecute() as sql: with SqlBulkExecute() as sql:
while shared.objectProcessorQueueSize > 1: while shared.objectProcessorQueue.curSize > 1:
objectType, data = shared.objectProcessorQueue.get() objectType, data = shared.objectProcessorQueue.get()
sql.execute('''INSERT INTO objectprocessorqueue VALUES (?,?)''', sql.execute('''INSERT INTO objectprocessorqueue VALUES (?,?)''',
objectType,data) objectType,data)
with shared.objectProcessorQueueSizeLock:
shared.objectProcessorQueueSize -= len(data) # We maintain objectProcessorQueueSize so that we will slow down requesting objects if too much data accumulates in the queue.
numberOfObjectsThatWereInTheObjectProcessorQueue += 1 numberOfObjectsThatWereInTheObjectProcessorQueue += 1
logger.debug('Saved %s objects from the objectProcessorQueue to disk. objectProcessorThread exiting.' % str(numberOfObjectsThatWereInTheObjectProcessorQueue)) logger.debug('Saved %s objects from the objectProcessorQueue to disk. objectProcessorThread exiting.' % str(numberOfObjectsThatWereInTheObjectProcessorQueue))
shared.shutdown = 2 shared.shutdown = 2

View File

@ -1,4 +1,4 @@
doTimingAttackMitigation = True doTimingAttackMitigation = False
import errno import errno
import time import time

View File

@ -28,6 +28,7 @@ import traceback
# Project imports. # Project imports.
from addresses import * from addresses import *
from class_objectProcessorQueue import ObjectProcessorQueue
import highlevelcrypto import highlevelcrypto
import shared import shared
#import helper_startup #import helper_startup
@ -50,8 +51,6 @@ sendDataQueues = [] #each sendData thread puts its queue in this list.
inventory = {} #of objects (like msg payloads and pubkey payloads) Does not include protocol headers (the first 24 bytes of each packet). inventory = {} #of objects (like msg payloads and pubkey payloads) Does not include protocol headers (the first 24 bytes of each packet).
inventoryLock = threading.Lock() #Guarantees that two receiveDataThreads don't receive and process the same message concurrently (probably sent by a malicious individual) inventoryLock = threading.Lock() #Guarantees that two receiveDataThreads don't receive and process the same message concurrently (probably sent by a malicious individual)
printLock = threading.Lock() printLock = threading.Lock()
objectProcessorQueueSizeLock = threading.Lock()
objectProcessorQueueSize = 0 # in Bytes. We maintain this to prevent nodes from flooing us with objects which take up too much memory. If this gets too big we'll sleep before asking for further objects.
appdata = '' #holds the location of the application data storage directory appdata = '' #holds the location of the application data storage directory
statusIconColor = 'red' statusIconColor = 'red'
connectedHostsList = {} #List of hosts to which we are connected. Used to guarantee that the outgoingSynSender threads won't connect to the same remote node twice. connectedHostsList = {} #List of hosts to which we are connected. Used to guarantee that the outgoingSynSender threads won't connect to the same remote node twice.
@ -87,8 +86,7 @@ daemon = False
inventorySets = {1: set()} # key = streamNumer, value = a set which holds the inventory object hashes that we are aware of. This is used whenever we receive an inv message from a peer to check to see what items are new to us. We don't delete things out of it; instead, the singleCleaner thread clears and refills it every couple hours. inventorySets = {1: set()} # key = streamNumer, value = a set which holds the inventory object hashes that we are aware of. This is used whenever we receive an inv message from a peer to check to see what items are new to us. We don't delete things out of it; instead, the singleCleaner thread clears and refills it every couple hours.
needToWriteKnownNodesToDisk = False # If True, the singleCleaner will write it to disk eventually. needToWriteKnownNodesToDisk = False # If True, the singleCleaner will write it to disk eventually.
maximumLengthOfTimeToBotherResendingMessages = 0 maximumLengthOfTimeToBotherResendingMessages = 0
objectProcessorQueue = Queue.Queue( objectProcessorQueue = ObjectProcessorQueue() # receiveDataThreads dump objects they hear on the network into this queue to be processed.
) # receiveDataThreads dump objects they hear on the network into this queue to be processed.
streamsInWhichIAmParticipating = {} streamsInWhichIAmParticipating = {}
# sanity check, prevent doing ridiculous PoW # sanity check, prevent doing ridiculous PoW
@ -397,10 +395,7 @@ def doCleanShutdown():
global shutdown global shutdown
shutdown = 1 #Used to tell proof of work worker threads and the objectProcessorThread to exit. shutdown = 1 #Used to tell proof of work worker threads and the objectProcessorThread to exit.
broadcastToSendDataQueues((0, 'shutdown', 'no data')) broadcastToSendDataQueues((0, 'shutdown', 'no data'))
with shared.objectProcessorQueueSizeLock: objectProcessorQueue.put(('checkShutdownVariable', 'no data'))
data = 'no data'
shared.objectProcessorQueueSize += len(data)
objectProcessorQueue.put(('checkShutdownVariable',data))
knownNodesLock.acquire() knownNodesLock.acquire()
UISignalQueue.put(('updateStatusBar','Saving the knownNodes list of peers to disk...')) UISignalQueue.put(('updateStatusBar','Saving the knownNodes list of peers to disk...'))
@ -751,12 +746,7 @@ def _checkAndShareMsgWithPeers(data):
broadcastToSendDataQueues((streamNumber, 'advertiseobject', inventoryHash)) broadcastToSendDataQueues((streamNumber, 'advertiseobject', inventoryHash))
# Now let's enqueue it to be processed ourselves. # Now let's enqueue it to be processed ourselves.
# If we already have too much data in the queue to be processed, just sleep for now. objectProcessorQueue.put((objectType,data))
while shared.objectProcessorQueueSize > 120000000:
time.sleep(2)
with shared.objectProcessorQueueSizeLock:
shared.objectProcessorQueueSize += len(data)
objectProcessorQueue.put((objectType,data))
def _checkAndShareGetpubkeyWithPeers(data): def _checkAndShareGetpubkeyWithPeers(data):
if len(data) < 42: if len(data) < 42:
@ -798,12 +788,7 @@ def _checkAndShareGetpubkeyWithPeers(data):
broadcastToSendDataQueues((streamNumber, 'advertiseobject', inventoryHash)) broadcastToSendDataQueues((streamNumber, 'advertiseobject', inventoryHash))
# Now let's queue it to be processed ourselves. # Now let's queue it to be processed ourselves.
# If we already have too much data in the queue to be processed, just sleep for now. objectProcessorQueue.put((objectType,data))
while shared.objectProcessorQueueSize > 120000000:
time.sleep(2)
with shared.objectProcessorQueueSizeLock:
shared.objectProcessorQueueSize += len(data)
objectProcessorQueue.put((objectType,data))
def _checkAndSharePubkeyWithPeers(data): def _checkAndSharePubkeyWithPeers(data):
if len(data) < 146 or len(data) > 440: # sanity check if len(data) < 146 or len(data) > 440: # sanity check
@ -847,12 +832,7 @@ def _checkAndSharePubkeyWithPeers(data):
# Now let's queue it to be processed ourselves. # Now let's queue it to be processed ourselves.
# If we already have too much data in the queue to be processed, just sleep for now. objectProcessorQueue.put((objectType,data))
while shared.objectProcessorQueueSize > 120000000:
time.sleep(2)
with shared.objectProcessorQueueSizeLock:
shared.objectProcessorQueueSize += len(data)
objectProcessorQueue.put((objectType,data))
def _checkAndShareBroadcastWithPeers(data): def _checkAndShareBroadcastWithPeers(data):
@ -896,12 +876,7 @@ def _checkAndShareBroadcastWithPeers(data):
broadcastToSendDataQueues((streamNumber, 'advertiseobject', inventoryHash)) broadcastToSendDataQueues((streamNumber, 'advertiseobject', inventoryHash))
# Now let's queue it to be processed ourselves. # Now let's queue it to be processed ourselves.
# If we already have too much data in the queue to be processed, just sleep for now. objectProcessorQueue.put((objectType,data))
while shared.objectProcessorQueueSize > 120000000:
time.sleep(2)
with shared.objectProcessorQueueSizeLock:
shared.objectProcessorQueueSize += len(data)
objectProcessorQueue.put((objectType,data))
def openKeysFile(): def openKeysFile():
if 'linux' in sys.platform: if 'linux' in sys.platform: