From f0bf3aad482b53b752212cabc20514529d1f70ff Mon Sep 17 00:00:00 2001 From: Jonathan Warren Date: Sat, 7 Sep 2013 18:23:20 -0400 Subject: [PATCH] use locks when accessing dictionary inventory --- src/class_objectHashHolder.py | 2 +- src/class_receiveDataThread.py | 14 +++++++++----- src/class_singleCleaner.py | 32 +++++++++++++++++++------------- 3 files changed, 29 insertions(+), 19 deletions(-) diff --git a/src/class_objectHashHolder.py b/src/class_objectHashHolder.py index 9c392765..90df7fd9 100644 --- a/src/class_objectHashHolder.py +++ b/src/class_objectHashHolder.py @@ -1,5 +1,5 @@ # objectHashHolder is a timer-driven thread. One objectHashHolder thread is used -# by each sendDataThread. It uses it whenever a sendDataThread needs to +# by each sendDataThread. The sendDataThread uses it whenever it needs to # advertise an object to peers. Instead of sending it out immediately, it must # wait a random number of seconds for each connection so that different peers # get different objects at different times. Thus an attacker who is diff --git a/src/class_receiveDataThread.py b/src/class_receiveDataThread.py index a8b38bc4..e751cef0 100644 --- a/src/class_receiveDataThread.py +++ b/src/class_receiveDataThread.py @@ -298,11 +298,12 @@ class receiveDataThread(threading.Thread): bigInvList[hash] = 0 # We also have messages in our inventory in memory (which is a python # dictionary). Let's fetch those too. - for hash, storedValue in shared.inventory.items(): - if hash not in self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware: - objectType, streamNumber, payload, receivedTime = storedValue - if streamNumber == self.streamNumber and receivedTime > int(time.time()) - shared.maximumAgeOfObjectsThatIAdvertiseToOthers: - bigInvList[hash] = 0 + with shared.inventoryLock: + for hash, storedValue in shared.inventory.items(): + if hash not in self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware: + objectType, streamNumber, payload, receivedTime = storedValue + if streamNumber == self.streamNumber and receivedTime > int(time.time()) - shared.maximumAgeOfObjectsThatIAdvertiseToOthers: + bigInvList[hash] = 0 numberOfObjectsInInvMessage = 0 payload = '' # Now let us start appending all of these hashes together. They will be @@ -1496,11 +1497,14 @@ class receiveDataThread(threading.Thread): print 'received getdata request for item:', hash.encode('hex') shared.numberOfInventoryLookupsPerformed += 1 + shared.inventoryLock.acquire() if hash in shared.inventory: objectType, streamNumber, payload, receivedTime = shared.inventory[ hash] + shared.inventoryLock.release() self.sendData(objectType, payload) else: + shared.inventoryLock.release() queryreturn = sqlQuery( '''select objecttype, payload from inventory where hash=?''', hash) diff --git a/src/class_singleCleaner.py b/src/class_singleCleaner.py index 07d56424..653a2461 100644 --- a/src/class_singleCleaner.py +++ b/src/class_singleCleaner.py @@ -32,19 +32,20 @@ class singleCleaner(threading.Thread): shared.UISignalQueue.put(( 'updateStatusBar', 'Doing housekeeping (Flushing inventory in memory to disk...)')) - with SqlBulkExecute() as sql: - for hash, storedValue in shared.inventory.items(): - objectType, streamNumber, payload, receivedTime = storedValue - if int(time.time()) - 3600 > receivedTime: - sql.execute( - '''INSERT INTO inventory VALUES (?,?,?,?,?,?)''', - hash, - objectType, - streamNumber, - payload, - receivedTime, - '') - del shared.inventory[hash] + with shared.inventoryLock: # If you use both the inventoryLock and the sqlLock, always use the inventoryLock OUTSIDE of the sqlLock. + with SqlBulkExecute() as sql: + for hash, storedValue in shared.inventory.items(): + objectType, streamNumber, payload, receivedTime = storedValue + if int(time.time()) - 3600 > receivedTime: + sql.execute( + '''INSERT INTO inventory VALUES (?,?,?,?,?,?)''', + hash, + objectType, + streamNumber, + payload, + receivedTime, + '') + del shared.inventory[hash] shared.UISignalQueue.put(('updateStatusBar', '')) shared.broadcastToSendDataQueues(( 0, 'pong', 'no data')) # commands the sendData threads to send out a pong message if they haven't sent anything else in the last five minutes. The socket timeout-time is 10 minutes. @@ -118,4 +119,9 @@ class singleCleaner(threading.Thread): queryData = sqlQuery('''SELECT hash FROM inventory WHERE streamnumber=?''', streamNumber) for row in queryData: shared.inventorySets[streamNumber].add(row[0]) + with shared.inventoryLock: + for hash, storedValue in shared.inventory.items(): + objectType, streamNumber, payload, receivedTime = storedValue + if streamNumber in shared.inventorySets: + shared.inventorySets[streamNumber].add(hash) time.sleep(300)