use locks when accessing dictionary inventory
This commit is contained in:
parent
831edf0d24
commit
f0bf3aad48
|
@ -1,5 +1,5 @@
|
||||||
# objectHashHolder is a timer-driven thread. One objectHashHolder thread is used
|
# objectHashHolder is a timer-driven thread. One objectHashHolder thread is used
|
||||||
# by each sendDataThread. It uses it whenever a sendDataThread needs to
|
# by each sendDataThread. The sendDataThread uses it whenever it needs to
|
||||||
# advertise an object to peers. Instead of sending it out immediately, it must
|
# advertise an object to peers. Instead of sending it out immediately, it must
|
||||||
# wait a random number of seconds for each connection so that different peers
|
# wait a random number of seconds for each connection so that different peers
|
||||||
# get different objects at different times. Thus an attacker who is
|
# get different objects at different times. Thus an attacker who is
|
||||||
|
|
|
@ -298,11 +298,12 @@ class receiveDataThread(threading.Thread):
|
||||||
bigInvList[hash] = 0
|
bigInvList[hash] = 0
|
||||||
# We also have messages in our inventory in memory (which is a python
|
# We also have messages in our inventory in memory (which is a python
|
||||||
# dictionary). Let's fetch those too.
|
# dictionary). Let's fetch those too.
|
||||||
for hash, storedValue in shared.inventory.items():
|
with shared.inventoryLock:
|
||||||
if hash not in self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware:
|
for hash, storedValue in shared.inventory.items():
|
||||||
objectType, streamNumber, payload, receivedTime = storedValue
|
if hash not in self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware:
|
||||||
if streamNumber == self.streamNumber and receivedTime > int(time.time()) - shared.maximumAgeOfObjectsThatIAdvertiseToOthers:
|
objectType, streamNumber, payload, receivedTime = storedValue
|
||||||
bigInvList[hash] = 0
|
if streamNumber == self.streamNumber and receivedTime > int(time.time()) - shared.maximumAgeOfObjectsThatIAdvertiseToOthers:
|
||||||
|
bigInvList[hash] = 0
|
||||||
numberOfObjectsInInvMessage = 0
|
numberOfObjectsInInvMessage = 0
|
||||||
payload = ''
|
payload = ''
|
||||||
# Now let us start appending all of these hashes together. They will be
|
# Now let us start appending all of these hashes together. They will be
|
||||||
|
@ -1496,11 +1497,14 @@ class receiveDataThread(threading.Thread):
|
||||||
print 'received getdata request for item:', hash.encode('hex')
|
print 'received getdata request for item:', hash.encode('hex')
|
||||||
|
|
||||||
shared.numberOfInventoryLookupsPerformed += 1
|
shared.numberOfInventoryLookupsPerformed += 1
|
||||||
|
shared.inventoryLock.acquire()
|
||||||
if hash in shared.inventory:
|
if hash in shared.inventory:
|
||||||
objectType, streamNumber, payload, receivedTime = shared.inventory[
|
objectType, streamNumber, payload, receivedTime = shared.inventory[
|
||||||
hash]
|
hash]
|
||||||
|
shared.inventoryLock.release()
|
||||||
self.sendData(objectType, payload)
|
self.sendData(objectType, payload)
|
||||||
else:
|
else:
|
||||||
|
shared.inventoryLock.release()
|
||||||
queryreturn = sqlQuery(
|
queryreturn = sqlQuery(
|
||||||
'''select objecttype, payload from inventory where hash=?''',
|
'''select objecttype, payload from inventory where hash=?''',
|
||||||
hash)
|
hash)
|
||||||
|
|
|
@ -32,19 +32,20 @@ class singleCleaner(threading.Thread):
|
||||||
shared.UISignalQueue.put((
|
shared.UISignalQueue.put((
|
||||||
'updateStatusBar', 'Doing housekeeping (Flushing inventory in memory to disk...)'))
|
'updateStatusBar', 'Doing housekeeping (Flushing inventory in memory to disk...)'))
|
||||||
|
|
||||||
with SqlBulkExecute() as sql:
|
with shared.inventoryLock: # If you use both the inventoryLock and the sqlLock, always use the inventoryLock OUTSIDE of the sqlLock.
|
||||||
for hash, storedValue in shared.inventory.items():
|
with SqlBulkExecute() as sql:
|
||||||
objectType, streamNumber, payload, receivedTime = storedValue
|
for hash, storedValue in shared.inventory.items():
|
||||||
if int(time.time()) - 3600 > receivedTime:
|
objectType, streamNumber, payload, receivedTime = storedValue
|
||||||
sql.execute(
|
if int(time.time()) - 3600 > receivedTime:
|
||||||
'''INSERT INTO inventory VALUES (?,?,?,?,?,?)''',
|
sql.execute(
|
||||||
hash,
|
'''INSERT INTO inventory VALUES (?,?,?,?,?,?)''',
|
||||||
objectType,
|
hash,
|
||||||
streamNumber,
|
objectType,
|
||||||
payload,
|
streamNumber,
|
||||||
receivedTime,
|
payload,
|
||||||
'')
|
receivedTime,
|
||||||
del shared.inventory[hash]
|
'')
|
||||||
|
del shared.inventory[hash]
|
||||||
shared.UISignalQueue.put(('updateStatusBar', ''))
|
shared.UISignalQueue.put(('updateStatusBar', ''))
|
||||||
shared.broadcastToSendDataQueues((
|
shared.broadcastToSendDataQueues((
|
||||||
0, 'pong', 'no data')) # commands the sendData threads to send out a pong message if they haven't sent anything else in the last five minutes. The socket timeout-time is 10 minutes.
|
0, 'pong', 'no data')) # commands the sendData threads to send out a pong message if they haven't sent anything else in the last five minutes. The socket timeout-time is 10 minutes.
|
||||||
|
@ -118,4 +119,9 @@ class singleCleaner(threading.Thread):
|
||||||
queryData = sqlQuery('''SELECT hash FROM inventory WHERE streamnumber=?''', streamNumber)
|
queryData = sqlQuery('''SELECT hash FROM inventory WHERE streamnumber=?''', streamNumber)
|
||||||
for row in queryData:
|
for row in queryData:
|
||||||
shared.inventorySets[streamNumber].add(row[0])
|
shared.inventorySets[streamNumber].add(row[0])
|
||||||
|
with shared.inventoryLock:
|
||||||
|
for hash, storedValue in shared.inventory.items():
|
||||||
|
objectType, streamNumber, payload, receivedTime = storedValue
|
||||||
|
if streamNumber in shared.inventorySets:
|
||||||
|
shared.inventorySets[streamNumber].add(hash)
|
||||||
time.sleep(300)
|
time.sleep(300)
|
||||||
|
|
Reference in New Issue
Block a user