Mitigate active internal intersection attack
There was a report that by quickly asking a large number of nodes if they have an ACK object (which the attacker knows but it is injected into the network by the recipient of the message), it can estimate how an object propagates through the network, and eventually pinpoint an originating IP address of the injection, i.e. the IP address of the message recipient. This patch mitigates against it by stalling when asked for a nonexisting object (so that the attacker can't spam requests), and also upon connection before sending its own inventory list (so that reconnecting won't help the attacker). It estimates how long a short message takes to propagate through the network based on how many nodes are in a stream and bases the stalling time on that. Currently that is about 15 seconds. Initial connection delay takes into account the time that already passed since the connection was established. This basically gives the attacker one shot per a combination of his own nodes and the nodes he can connect to, and thus makes the attack much more difficult to succeed.
This commit is contained in:
parent
84f2202745
commit
51b043684d
|
@ -12,13 +12,14 @@ import time
|
|||
import threading
|
||||
|
||||
class objectHashHolder(threading.Thread):
|
||||
size = 10
|
||||
def __init__(self, sendDataThreadMailbox):
|
||||
threading.Thread.__init__(self)
|
||||
self.shutdown = False
|
||||
self.sendDataThreadMailbox = sendDataThreadMailbox # This queue is used to submit data back to our associated sendDataThread.
|
||||
self.collectionOfHashLists = {}
|
||||
self.collectionOfPeerLists = {}
|
||||
for i in range(10):
|
||||
for i in range(self.size):
|
||||
self.collectionOfHashLists[i] = []
|
||||
self.collectionOfPeerLists[i] = []
|
||||
|
||||
|
@ -32,14 +33,14 @@ class objectHashHolder(threading.Thread):
|
|||
self.sendDataThreadMailbox.put((0, 'sendaddr', self.collectionOfPeerLists[iterator]))
|
||||
self.collectionOfPeerLists[iterator] = []
|
||||
iterator += 1
|
||||
iterator %= 10
|
||||
iterator %= self.size
|
||||
time.sleep(1)
|
||||
|
||||
def holdHash(self,hash):
|
||||
self.collectionOfHashLists[random.randrange(0, 10)].append(hash)
|
||||
self.collectionOfHashLists[random.randrange(0, self.size)].append(hash)
|
||||
|
||||
def holdPeer(self,peerDetails):
|
||||
self.collectionOfPeerLists[random.randrange(0, 10)].append(peerDetails)
|
||||
self.collectionOfPeerLists[random.randrange(0, self.size)].append(peerDetails)
|
||||
|
||||
def close(self):
|
||||
self.shutdown = True
|
||||
self.shutdown = True
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
doTimingAttackMitigation = False
|
||||
|
||||
import errno
|
||||
import math
|
||||
import time
|
||||
import threading
|
||||
import shared
|
||||
|
@ -19,6 +20,7 @@ import traceback
|
|||
|
||||
#import highlevelcrypto
|
||||
from addresses import *
|
||||
from class_objectHashHolder import objectHashHolder
|
||||
from helper_generic import addDataPadding, isHostInPrivateIPRange
|
||||
from helper_sql import sqlQuery
|
||||
from debug import logger
|
||||
|
@ -61,6 +63,7 @@ class receiveDataThread(threading.Thread):
|
|||
self.initiatedConnection = True
|
||||
self.selfInitiatedConnections[streamNumber][self] = 0
|
||||
self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware = someObjectsOfWhichThisRemoteNodeIsAlreadyAware
|
||||
self.startTime = time.time()
|
||||
|
||||
def run(self):
|
||||
logger.debug('receiveDataThread starting. ID ' + str(id(self)) + '. The size of the shared.connectedHostsList is now ' + str(len(shared.connectedHostsList)))
|
||||
|
@ -125,6 +128,19 @@ class receiveDataThread(threading.Thread):
|
|||
shared.UISignalQueue.put(('updateNetworkStatusTab', 'no data'))
|
||||
logger.debug('receiveDataThread ending. ID ' + str(id(self)) + '. The size of the shared.connectedHostsList is now ' + str(len(shared.connectedHostsList)))
|
||||
|
||||
def antiIntersectionDelay(self, initial = False):
|
||||
# estimated time for a small object to propagate across the whole network
|
||||
delay = math.ceil(math.log(len(shared.knownNodes[self.streamNumber]) + 2, 20)) * (0.2 + objectHashHolder.size/2)
|
||||
# +2 is to avoid problems with log(0) and log(1)
|
||||
# 20 is avg connected nodes count
|
||||
# 0.2 is avg message transmission time
|
||||
now = time.time()
|
||||
if initial and now - delay < self.startTime:
|
||||
logger.info("Sleeping for %.2fs", delay - (now - self.startTime))
|
||||
time.sleep(delay - (now - self.startTime))
|
||||
elif not initial:
|
||||
logger.info("Sleeping for %.2fs", delay)
|
||||
time.sleep(delay)
|
||||
|
||||
def processData(self):
|
||||
if len(self.data) < shared.Header.size: # if so little of the data has arrived that we can't even read the checksum then wait for more data.
|
||||
|
@ -318,6 +334,7 @@ class receiveDataThread(threading.Thread):
|
|||
bigInvList[hash] = 0
|
||||
numberOfObjectsInInvMessage = 0
|
||||
payload = ''
|
||||
self.antiIntersectionDelay(True)
|
||||
# Now let us start appending all of these hashes together. They will be
|
||||
# sent out in a big inv message to our new peer.
|
||||
for hash, storedValue in bigInvList.items():
|
||||
|
@ -483,6 +500,7 @@ class receiveDataThread(threading.Thread):
|
|||
payload, = row
|
||||
self.sendObject(payload)
|
||||
else:
|
||||
self.antiIntersectionDelay()
|
||||
logger.warning('%s asked for an object with a getdata which is not in either our memory inventory or our SQL inventory. We probably cleaned it out after advertising it but before they got around to asking for it.' % (self.peer,))
|
||||
|
||||
# Our peer has requested (in a getdata message) that we send an object.
|
||||
|
|
Loading…
Reference in New Issue
Block a user