From e781420f4d8cec27b74d62752cb18573b105e0fd Mon Sep 17 00:00:00 2001 From: Peter Surda Date: Fri, 22 Jan 2016 11:17:10 +0100 Subject: [PATCH] Flood mitigation optimisation Flood mitigation was done both in the ObjectProcessorQueue as well as receiveData threads. This patch removes the mitigation in receiveData threads and cleans up the one in the ObjectProcessorQueue --- src/bitmessageqt/__init__.py | 4 +--- src/class_objectProcessor.py | 15 ++++--------- src/class_receiveDataThread.py | 2 +- src/shared.py | 39 ++++++---------------------------- 4 files changed, 13 insertions(+), 47 deletions(-) diff --git a/src/bitmessageqt/__init__.py b/src/bitmessageqt/__init__.py index 631cb4ab..65cbb145 100644 --- a/src/bitmessageqt/__init__.py +++ b/src/bitmessageqt/__init__.py @@ -2504,9 +2504,7 @@ class MyForm(settingsmixin.SMainWindow): for row in queryreturn: payload, = row objectType = 3 - with shared.objectProcessorQueueSizeLock: - shared.objectProcessorQueueSize += len(payload) - shared.objectProcessorQueue.put((objectType,payload)) + shared.objectProcessorQueue.put((objectType,payload)) def click_pushButtonStatusIcon(self): logger.debug('click_pushButtonStatusIcon') diff --git a/src/class_objectProcessor.py b/src/class_objectProcessor.py index f7b0e893..48e90a22 100644 --- a/src/class_objectProcessor.py +++ b/src/class_objectProcessor.py @@ -39,11 +39,9 @@ class objectProcessor(threading.Thread): """ queryreturn = sqlQuery( '''SELECT objecttype, data FROM objectprocessorqueue''') - with shared.objectProcessorQueueSizeLock: - for row in queryreturn: - objectType, data = row - shared.objectProcessorQueueSize += len(data) - shared.objectProcessorQueue.put((objectType,data)) + for row in queryreturn: + objectType, data = row + shared.objectProcessorQueue.put((objectType,data)) sqlExecute('''DELETE FROM objectprocessorqueue''') logger.debug('Loaded %s objects from disk into the objectProcessorQueue.' % str(len(queryreturn))) @@ -70,19 +68,14 @@ class objectProcessor(threading.Thread): except Exception as e: logger.critical("Critical error within objectProcessorThread: \n%s" % traceback.format_exc()) - with shared.objectProcessorQueueSizeLock: - shared.objectProcessorQueueSize -= len(data) # We maintain objectProcessorQueueSize so that we will slow down requesting objects if too much data accumulates in the queue. - if shared.shutdown: time.sleep(.5) # Wait just a moment for most of the connections to close numberOfObjectsThatWereInTheObjectProcessorQueue = 0 with SqlBulkExecute() as sql: - while shared.objectProcessorQueueSize > 1: + while shared.objectProcessorQueue.curSize > 1: objectType, data = shared.objectProcessorQueue.get() sql.execute('''INSERT INTO objectprocessorqueue VALUES (?,?)''', objectType,data) - with shared.objectProcessorQueueSizeLock: - shared.objectProcessorQueueSize -= len(data) # We maintain objectProcessorQueueSize so that we will slow down requesting objects if too much data accumulates in the queue. numberOfObjectsThatWereInTheObjectProcessorQueue += 1 logger.debug('Saved %s objects from the objectProcessorQueue to disk. objectProcessorThread exiting.' % str(numberOfObjectsThatWereInTheObjectProcessorQueue)) shared.shutdown = 2 diff --git a/src/class_receiveDataThread.py b/src/class_receiveDataThread.py index 58a54346..59399d7a 100644 --- a/src/class_receiveDataThread.py +++ b/src/class_receiveDataThread.py @@ -1,4 +1,4 @@ -doTimingAttackMitigation = True +doTimingAttackMitigation = False import errno import time diff --git a/src/shared.py b/src/shared.py index 12ca77b3..ce77da31 100644 --- a/src/shared.py +++ b/src/shared.py @@ -28,6 +28,7 @@ import traceback # Project imports. from addresses import * +from class_objectProcessorQueue import ObjectProcessorQueue import highlevelcrypto import shared #import helper_startup @@ -50,8 +51,6 @@ sendDataQueues = [] #each sendData thread puts its queue in this list. inventory = {} #of objects (like msg payloads and pubkey payloads) Does not include protocol headers (the first 24 bytes of each packet). inventoryLock = threading.Lock() #Guarantees that two receiveDataThreads don't receive and process the same message concurrently (probably sent by a malicious individual) printLock = threading.Lock() -objectProcessorQueueSizeLock = threading.Lock() -objectProcessorQueueSize = 0 # in Bytes. We maintain this to prevent nodes from flooing us with objects which take up too much memory. If this gets too big we'll sleep before asking for further objects. appdata = '' #holds the location of the application data storage directory statusIconColor = 'red' connectedHostsList = {} #List of hosts to which we are connected. Used to guarantee that the outgoingSynSender threads won't connect to the same remote node twice. @@ -87,8 +86,7 @@ daemon = False inventorySets = {1: set()} # key = streamNumer, value = a set which holds the inventory object hashes that we are aware of. This is used whenever we receive an inv message from a peer to check to see what items are new to us. We don't delete things out of it; instead, the singleCleaner thread clears and refills it every couple hours. needToWriteKnownNodesToDisk = False # If True, the singleCleaner will write it to disk eventually. maximumLengthOfTimeToBotherResendingMessages = 0 -objectProcessorQueue = Queue.Queue( - ) # receiveDataThreads dump objects they hear on the network into this queue to be processed. +objectProcessorQueue = ObjectProcessorQueue() # receiveDataThreads dump objects they hear on the network into this queue to be processed. streamsInWhichIAmParticipating = {} # sanity check, prevent doing ridiculous PoW @@ -397,10 +395,7 @@ def doCleanShutdown(): global shutdown shutdown = 1 #Used to tell proof of work worker threads and the objectProcessorThread to exit. broadcastToSendDataQueues((0, 'shutdown', 'no data')) - with shared.objectProcessorQueueSizeLock: - data = 'no data' - shared.objectProcessorQueueSize += len(data) - objectProcessorQueue.put(('checkShutdownVariable',data)) + objectProcessorQueue.put(('checkShutdownVariable', 'no data')) knownNodesLock.acquire() UISignalQueue.put(('updateStatusBar','Saving the knownNodes list of peers to disk...')) @@ -751,12 +746,7 @@ def _checkAndShareMsgWithPeers(data): broadcastToSendDataQueues((streamNumber, 'advertiseobject', inventoryHash)) # Now let's enqueue it to be processed ourselves. - # If we already have too much data in the queue to be processed, just sleep for now. - while shared.objectProcessorQueueSize > 120000000: - time.sleep(2) - with shared.objectProcessorQueueSizeLock: - shared.objectProcessorQueueSize += len(data) - objectProcessorQueue.put((objectType,data)) + objectProcessorQueue.put((objectType,data)) def _checkAndShareGetpubkeyWithPeers(data): if len(data) < 42: @@ -798,12 +788,7 @@ def _checkAndShareGetpubkeyWithPeers(data): broadcastToSendDataQueues((streamNumber, 'advertiseobject', inventoryHash)) # Now let's queue it to be processed ourselves. - # If we already have too much data in the queue to be processed, just sleep for now. - while shared.objectProcessorQueueSize > 120000000: - time.sleep(2) - with shared.objectProcessorQueueSizeLock: - shared.objectProcessorQueueSize += len(data) - objectProcessorQueue.put((objectType,data)) + objectProcessorQueue.put((objectType,data)) def _checkAndSharePubkeyWithPeers(data): if len(data) < 146 or len(data) > 440: # sanity check @@ -847,12 +832,7 @@ def _checkAndSharePubkeyWithPeers(data): # Now let's queue it to be processed ourselves. - # If we already have too much data in the queue to be processed, just sleep for now. - while shared.objectProcessorQueueSize > 120000000: - time.sleep(2) - with shared.objectProcessorQueueSizeLock: - shared.objectProcessorQueueSize += len(data) - objectProcessorQueue.put((objectType,data)) + objectProcessorQueue.put((objectType,data)) def _checkAndShareBroadcastWithPeers(data): @@ -896,12 +876,7 @@ def _checkAndShareBroadcastWithPeers(data): broadcastToSendDataQueues((streamNumber, 'advertiseobject', inventoryHash)) # Now let's queue it to be processed ourselves. - # If we already have too much data in the queue to be processed, just sleep for now. - while shared.objectProcessorQueueSize > 120000000: - time.sleep(2) - with shared.objectProcessorQueueSizeLock: - shared.objectProcessorQueueSize += len(data) - objectProcessorQueue.put((objectType,data)) + objectProcessorQueue.put((objectType,data)) def openKeysFile(): if 'linux' in sys.platform: