diff --git a/src/bitmessageqt/__init__.py b/src/bitmessageqt/__init__.py
index 95497099..753864a3 100644
--- a/src/bitmessageqt/__init__.py
+++ b/src/bitmessageqt/__init__.py
@@ -2503,9 +2503,7 @@ class MyForm(settingsmixin.SMainWindow):
         for row in queryreturn:
             payload, = row
             objectType = 3
-            with shared.objectProcessorQueueSizeLock:
-                shared.objectProcessorQueueSize += len(payload)
-                shared.objectProcessorQueue.put((objectType,payload))
+            shared.objectProcessorQueue.put((objectType,payload))
 
     def click_pushButtonStatusIcon(self):
         logger.debug('click_pushButtonStatusIcon')
diff --git a/src/class_objectProcessor.py b/src/class_objectProcessor.py
index f7b0e893..48e90a22 100644
--- a/src/class_objectProcessor.py
+++ b/src/class_objectProcessor.py
@@ -39,11 +39,9 @@ class objectProcessor(threading.Thread):
         """
         queryreturn = sqlQuery(
             '''SELECT objecttype, data FROM objectprocessorqueue''')
-        with shared.objectProcessorQueueSizeLock:
-            for row in queryreturn:
-                objectType, data = row
-                shared.objectProcessorQueueSize += len(data)
-                shared.objectProcessorQueue.put((objectType,data))
+        for row in queryreturn:
+            objectType, data = row
+            shared.objectProcessorQueue.put((objectType,data))
         sqlExecute('''DELETE FROM objectprocessorqueue''')
         logger.debug('Loaded %s objects from disk into the objectProcessorQueue.' % str(len(queryreturn)))
 
@@ -70,19 +68,14 @@ class objectProcessor(threading.Thread):
             except Exception as e:
                 logger.critical("Critical error within objectProcessorThread: \n%s" % traceback.format_exc())
 
-            with shared.objectProcessorQueueSizeLock:
-                shared.objectProcessorQueueSize -= len(data) # We maintain objectProcessorQueueSize so that we will slow down requesting objects if too much data accumulates in the queue.
-
             if shared.shutdown:
                 time.sleep(.5) # Wait just a moment for most of the connections to close
                 numberOfObjectsThatWereInTheObjectProcessorQueue = 0
                 with SqlBulkExecute() as sql:
-                    while shared.objectProcessorQueueSize > 1:
+                    while shared.objectProcessorQueue.curSize > 1:
                         objectType, data = shared.objectProcessorQueue.get()
                         sql.execute('''INSERT INTO objectprocessorqueue VALUES (?,?)''',
                                    objectType,data)
-                        with shared.objectProcessorQueueSizeLock:
-                            shared.objectProcessorQueueSize -= len(data) # We maintain objectProcessorQueueSize so that we will slow down requesting objects if too much data accumulates in the queue.
                         numberOfObjectsThatWereInTheObjectProcessorQueue += 1
                 logger.debug('Saved %s objects from the objectProcessorQueue to disk. objectProcessorThread exiting.' % str(numberOfObjectsThatWereInTheObjectProcessorQueue))
                 shared.shutdown = 2
diff --git a/src/class_receiveDataThread.py b/src/class_receiveDataThread.py
index 58a54346..59399d7a 100644
--- a/src/class_receiveDataThread.py
+++ b/src/class_receiveDataThread.py
@@ -1,4 +1,4 @@
-doTimingAttackMitigation = True
+doTimingAttackMitigation = False
 
 import errno
 import time
diff --git a/src/shared.py b/src/shared.py
index 57748962..2414945e 100644
--- a/src/shared.py
+++ b/src/shared.py
@@ -28,6 +28,7 @@ import traceback
 
 # Project imports.
 from addresses import *
+from class_objectProcessorQueue import ObjectProcessorQueue
 import highlevelcrypto
 import shared
 #import helper_startup
@@ -50,8 +51,6 @@ sendDataQueues = [] #each sendData thread puts its queue in this list.
 inventory = {} #of objects (like msg payloads and pubkey payloads) Does not include protocol headers (the first 24 bytes of each packet).
 inventoryLock = threading.Lock() #Guarantees that two receiveDataThreads don't receive and process the same message concurrently (probably sent by a malicious individual)
 printLock = threading.Lock()
-objectProcessorQueueSizeLock = threading.Lock()
-objectProcessorQueueSize = 0 # in Bytes. We maintain this to prevent nodes from flooing us with objects which take up too much memory. If this gets too big we'll sleep before asking for further objects.
 appdata = '' #holds the location of the application data storage directory
 statusIconColor = 'red'
 connectedHostsList = {} #List of hosts to which we are connected. Used to guarantee that the outgoingSynSender threads won't connect to the same remote node twice.
@@ -87,8 +86,7 @@ daemon = False
 inventorySets = {1: set()} # key = streamNumer, value = a set which holds the inventory object hashes that we are aware of. This is used whenever we receive an inv message from a peer to check to see what items are new to us. We don't delete things out of it; instead, the singleCleaner thread clears and refills it every couple hours.
 needToWriteKnownNodesToDisk = False # If True, the singleCleaner will write it to disk eventually.
 maximumLengthOfTimeToBotherResendingMessages = 0
-objectProcessorQueue = Queue.Queue(
-    ) # receiveDataThreads dump objects they hear on the network into this queue to be processed.
+objectProcessorQueue = ObjectProcessorQueue() # receiveDataThreads dump objects they hear on the network into this queue to be processed.
 streamsInWhichIAmParticipating = {}
 
 # sanity check, prevent doing ridiculous PoW
@@ -397,10 +395,7 @@ def doCleanShutdown():
     global shutdown
     shutdown = 1 #Used to tell proof of work worker threads and the objectProcessorThread to exit.
     broadcastToSendDataQueues((0, 'shutdown', 'no data'))
-    with shared.objectProcessorQueueSizeLock:
-        data = 'no data'
-        shared.objectProcessorQueueSize += len(data)
-        objectProcessorQueue.put(('checkShutdownVariable',data))
+    objectProcessorQueue.put(('checkShutdownVariable', 'no data'))
 
     knownNodesLock.acquire()
     UISignalQueue.put(('updateStatusBar','Saving the knownNodes list of peers to disk...'))
@@ -751,12 +746,7 @@ def _checkAndShareMsgWithPeers(data):
     broadcastToSendDataQueues((streamNumber, 'advertiseobject', inventoryHash))
 
     # Now let's enqueue it to be processed ourselves.
-    # If we already have too much data in the queue to be processed, just sleep for now.
-    while shared.objectProcessorQueueSize > 120000000:
-        time.sleep(2)
-    with shared.objectProcessorQueueSizeLock:
-        shared.objectProcessorQueueSize += len(data)
-        objectProcessorQueue.put((objectType,data))
+    objectProcessorQueue.put((objectType,data))
 
 def _checkAndShareGetpubkeyWithPeers(data):
     if len(data) < 42:
@@ -798,12 +788,7 @@ def _checkAndShareGetpubkeyWithPeers(data):
     broadcastToSendDataQueues((streamNumber, 'advertiseobject', inventoryHash))
 
     # Now let's queue it to be processed ourselves.
-    # If we already have too much data in the queue to be processed, just sleep for now.
-    while shared.objectProcessorQueueSize > 120000000:
-        time.sleep(2)
-    with shared.objectProcessorQueueSizeLock:
-        shared.objectProcessorQueueSize += len(data)
-        objectProcessorQueue.put((objectType,data))
+    objectProcessorQueue.put((objectType,data))
 
 def _checkAndSharePubkeyWithPeers(data):
     if len(data) < 146 or len(data) > 440: # sanity check
@@ -847,12 +832,7 @@ def _checkAndSharePubkeyWithPeers(data):
 
 
     # Now let's queue it to be processed ourselves.
-    # If we already have too much data in the queue to be processed, just sleep for now.
-    while shared.objectProcessorQueueSize > 120000000:
-        time.sleep(2)
-    with shared.objectProcessorQueueSizeLock:
-        shared.objectProcessorQueueSize += len(data)
-        objectProcessorQueue.put((objectType,data))
+    objectProcessorQueue.put((objectType,data))
 
 
 def _checkAndShareBroadcastWithPeers(data):
@@ -896,12 +876,7 @@ def _checkAndShareBroadcastWithPeers(data):
     broadcastToSendDataQueues((streamNumber, 'advertiseobject', inventoryHash))
 
     # Now let's queue it to be processed ourselves.
-    # If we already have too much data in the queue to be processed, just sleep for now.
-    while shared.objectProcessorQueueSize > 120000000:
-        time.sleep(2)
-    with shared.objectProcessorQueueSizeLock:
-        shared.objectProcessorQueueSize += len(data)
-        objectProcessorQueue.put((objectType,data))
+    objectProcessorQueue.put((objectType,data))
 
 def openKeysFile():
     if 'linux' in sys.platform:
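
Note: the new src/class_objectProcessorQueue.py module is imported above and its curSize attribute is read in class_objectProcessor.py, but its contents are not part of this diff. Below is a minimal sketch of a Queue.Queue subclass that would satisfy those callers, assuming the old 120000000-byte backpressure threshold simply moves inside the class; the maxSize value, the sizeLock name, and the sleep interval are illustrative assumptions, not the actual module.

# Hypothetical sketch of src/class_objectProcessorQueue.py (not shown in this
# diff). ObjectProcessorQueue and curSize are taken from the callers above;
# everything else is assumed.
import Queue
import threading
import time

class ObjectProcessorQueue(Queue.Queue):
    maxSize = 120000000  # bytes; assumed to match the old hard-coded limit

    def __init__(self):
        Queue.Queue.__init__(self)  # old-style class in Python 2, no super()
        self.sizeLock = threading.Lock()
        self.curSize = 0  # total payload bytes currently in the queue

    def put(self, item, block=True, timeout=None):
        # Block producers while too much payload data is queued, replacing the
        # open-coded "while ... > 120000000: time.sleep(2)" loops removed above.
        while self.curSize >= self.maxSize:
            time.sleep(1)
        with self.sizeLock:
            self.curSize += len(item[1])  # item is an (objectType, data) tuple
        Queue.Queue.put(self, item, block, timeout)

    def get(self, block=True, timeout=None):
        item = Queue.Queue.get(self, block, timeout)
        with self.sizeLock:
            self.curSize -= len(item[1])
        return item

Folding the accounting into put()/get() keeps the byte count from drifting out of sync with the queue contents, which the scattered lock/increment/decrement sites risked. On this sketch's assumptions, code that reads curSize without taking the lock (such as the shutdown loop in class_objectProcessor.py) still only sees an approximate value, which is acceptable for its "queue nearly empty" check.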