Flood mitigation optimisation

Flood mitigation was done both in the ObjectProcessorQueue as well as
receiveData threads. This patch removes the mitigation in receiveData
threads and cleans up the one in the ObjectProcessorQueue
This commit is contained in:
Peter Šurda 2016-01-22 11:17:10 +01:00
parent 5c4cafbcc7
commit e781420f4d
Signed by untrusted user: PeterSurda
GPG Key ID: 0C5F50C0B5F37D87
4 changed files with 13 additions and 47 deletions

View File

@ -2504,9 +2504,7 @@ class MyForm(settingsmixin.SMainWindow):
for row in queryreturn:
payload, = row
objectType = 3
with shared.objectProcessorQueueSizeLock:
shared.objectProcessorQueueSize += len(payload)
shared.objectProcessorQueue.put((objectType,payload))
shared.objectProcessorQueue.put((objectType,payload))
def click_pushButtonStatusIcon(self):
logger.debug('click_pushButtonStatusIcon')

View File

@ -39,11 +39,9 @@ class objectProcessor(threading.Thread):
"""
queryreturn = sqlQuery(
'''SELECT objecttype, data FROM objectprocessorqueue''')
with shared.objectProcessorQueueSizeLock:
for row in queryreturn:
objectType, data = row
shared.objectProcessorQueueSize += len(data)
shared.objectProcessorQueue.put((objectType,data))
for row in queryreturn:
objectType, data = row
shared.objectProcessorQueue.put((objectType,data))
sqlExecute('''DELETE FROM objectprocessorqueue''')
logger.debug('Loaded %s objects from disk into the objectProcessorQueue.' % str(len(queryreturn)))
@ -70,19 +68,14 @@ class objectProcessor(threading.Thread):
except Exception as e:
logger.critical("Critical error within objectProcessorThread: \n%s" % traceback.format_exc())
with shared.objectProcessorQueueSizeLock:
shared.objectProcessorQueueSize -= len(data) # We maintain objectProcessorQueueSize so that we will slow down requesting objects if too much data accumulates in the queue.
if shared.shutdown:
time.sleep(.5) # Wait just a moment for most of the connections to close
numberOfObjectsThatWereInTheObjectProcessorQueue = 0
with SqlBulkExecute() as sql:
while shared.objectProcessorQueueSize > 1:
while shared.objectProcessorQueue.curSize > 1:
objectType, data = shared.objectProcessorQueue.get()
sql.execute('''INSERT INTO objectprocessorqueue VALUES (?,?)''',
objectType,data)
with shared.objectProcessorQueueSizeLock:
shared.objectProcessorQueueSize -= len(data) # We maintain objectProcessorQueueSize so that we will slow down requesting objects if too much data accumulates in the queue.
numberOfObjectsThatWereInTheObjectProcessorQueue += 1
logger.debug('Saved %s objects from the objectProcessorQueue to disk. objectProcessorThread exiting.' % str(numberOfObjectsThatWereInTheObjectProcessorQueue))
shared.shutdown = 2

View File

@ -1,4 +1,4 @@
doTimingAttackMitigation = True
doTimingAttackMitigation = False
import errno
import time

View File

@ -28,6 +28,7 @@ import traceback
# Project imports.
from addresses import *
from class_objectProcessorQueue import ObjectProcessorQueue
import highlevelcrypto
import shared
#import helper_startup
@ -50,8 +51,6 @@ sendDataQueues = [] #each sendData thread puts its queue in this list.
inventory = {} #of objects (like msg payloads and pubkey payloads) Does not include protocol headers (the first 24 bytes of each packet).
inventoryLock = threading.Lock() #Guarantees that two receiveDataThreads don't receive and process the same message concurrently (probably sent by a malicious individual)
printLock = threading.Lock()
objectProcessorQueueSizeLock = threading.Lock()
objectProcessorQueueSize = 0 # in Bytes. We maintain this to prevent nodes from flooing us with objects which take up too much memory. If this gets too big we'll sleep before asking for further objects.
appdata = '' #holds the location of the application data storage directory
statusIconColor = 'red'
connectedHostsList = {} #List of hosts to which we are connected. Used to guarantee that the outgoingSynSender threads won't connect to the same remote node twice.
@ -87,8 +86,7 @@ daemon = False
inventorySets = {1: set()} # key = streamNumer, value = a set which holds the inventory object hashes that we are aware of. This is used whenever we receive an inv message from a peer to check to see what items are new to us. We don't delete things out of it; instead, the singleCleaner thread clears and refills it every couple hours.
needToWriteKnownNodesToDisk = False # If True, the singleCleaner will write it to disk eventually.
maximumLengthOfTimeToBotherResendingMessages = 0
objectProcessorQueue = Queue.Queue(
) # receiveDataThreads dump objects they hear on the network into this queue to be processed.
objectProcessorQueue = ObjectProcessorQueue() # receiveDataThreads dump objects they hear on the network into this queue to be processed.
streamsInWhichIAmParticipating = {}
# sanity check, prevent doing ridiculous PoW
@ -397,10 +395,7 @@ def doCleanShutdown():
global shutdown
shutdown = 1 #Used to tell proof of work worker threads and the objectProcessorThread to exit.
broadcastToSendDataQueues((0, 'shutdown', 'no data'))
with shared.objectProcessorQueueSizeLock:
data = 'no data'
shared.objectProcessorQueueSize += len(data)
objectProcessorQueue.put(('checkShutdownVariable',data))
objectProcessorQueue.put(('checkShutdownVariable', 'no data'))
knownNodesLock.acquire()
UISignalQueue.put(('updateStatusBar','Saving the knownNodes list of peers to disk...'))
@ -751,12 +746,7 @@ def _checkAndShareMsgWithPeers(data):
broadcastToSendDataQueues((streamNumber, 'advertiseobject', inventoryHash))
# Now let's enqueue it to be processed ourselves.
# If we already have too much data in the queue to be processed, just sleep for now.
while shared.objectProcessorQueueSize > 120000000:
time.sleep(2)
with shared.objectProcessorQueueSizeLock:
shared.objectProcessorQueueSize += len(data)
objectProcessorQueue.put((objectType,data))
objectProcessorQueue.put((objectType,data))
def _checkAndShareGetpubkeyWithPeers(data):
if len(data) < 42:
@ -798,12 +788,7 @@ def _checkAndShareGetpubkeyWithPeers(data):
broadcastToSendDataQueues((streamNumber, 'advertiseobject', inventoryHash))
# Now let's queue it to be processed ourselves.
# If we already have too much data in the queue to be processed, just sleep for now.
while shared.objectProcessorQueueSize > 120000000:
time.sleep(2)
with shared.objectProcessorQueueSizeLock:
shared.objectProcessorQueueSize += len(data)
objectProcessorQueue.put((objectType,data))
objectProcessorQueue.put((objectType,data))
def _checkAndSharePubkeyWithPeers(data):
if len(data) < 146 or len(data) > 440: # sanity check
@ -847,12 +832,7 @@ def _checkAndSharePubkeyWithPeers(data):
# Now let's queue it to be processed ourselves.
# If we already have too much data in the queue to be processed, just sleep for now.
while shared.objectProcessorQueueSize > 120000000:
time.sleep(2)
with shared.objectProcessorQueueSizeLock:
shared.objectProcessorQueueSize += len(data)
objectProcessorQueue.put((objectType,data))
objectProcessorQueue.put((objectType,data))
def _checkAndShareBroadcastWithPeers(data):
@ -896,12 +876,7 @@ def _checkAndShareBroadcastWithPeers(data):
broadcastToSendDataQueues((streamNumber, 'advertiseobject', inventoryHash))
# Now let's queue it to be processed ourselves.
# If we already have too much data in the queue to be processed, just sleep for now.
while shared.objectProcessorQueueSize > 120000000:
time.sleep(2)
with shared.objectProcessorQueueSizeLock:
shared.objectProcessorQueueSize += len(data)
objectProcessorQueue.put((objectType,data))
objectProcessorQueue.put((objectType,data))
def openKeysFile():
if 'linux' in sys.platform: