From 51b043684d8e81904c3786dc38045b74a6fca12b Mon Sep 17 00:00:00 2001 From: Peter Surda Date: Sat, 13 Feb 2016 12:54:23 +0100 Subject: [PATCH] Mitigate active internal intersection attack There was a report that by quickly asking a large number of nodes if they have an ACK object (which the attacker knows but it is injected into the network by the recipient of the message), it can estimate how an object propagates through the network, and eventually pinpoint an originating IP address of the injection, i.e. the IP address of the message recipient. This patch mitigates against it by stalling when asked for a nonexisting object (so that the attacker can't spam requests), and also upon connection before sending its own inventory list (so that reconnecting won't help the attacker). It estimates how long a short message takes to propagate through the network based on how many nodes are in a stream and bases the stalling time on that. Currently that is about 15 seconds. Initial connection delay takes into account the time that already passed since the connection was established. This basically gives the attacker one shot per a combination of his own nodes and the nodes he can connect to, and thus makes the attack much more difficult to succeed. --- src/class_objectHashHolder.py | 11 ++++++----- src/class_receiveDataThread.py | 18 ++++++++++++++++++ 2 files changed, 24 insertions(+), 5 deletions(-) diff --git a/src/class_objectHashHolder.py b/src/class_objectHashHolder.py index c91b1c23..92faaf17 100644 --- a/src/class_objectHashHolder.py +++ b/src/class_objectHashHolder.py @@ -12,13 +12,14 @@ import time import threading class objectHashHolder(threading.Thread): + size = 10 def __init__(self, sendDataThreadMailbox): threading.Thread.__init__(self) self.shutdown = False self.sendDataThreadMailbox = sendDataThreadMailbox # This queue is used to submit data back to our associated sendDataThread. self.collectionOfHashLists = {} self.collectionOfPeerLists = {} - for i in range(10): + for i in range(self.size): self.collectionOfHashLists[i] = [] self.collectionOfPeerLists[i] = [] @@ -32,14 +33,14 @@ class objectHashHolder(threading.Thread): self.sendDataThreadMailbox.put((0, 'sendaddr', self.collectionOfPeerLists[iterator])) self.collectionOfPeerLists[iterator] = [] iterator += 1 - iterator %= 10 + iterator %= self.size time.sleep(1) def holdHash(self,hash): - self.collectionOfHashLists[random.randrange(0, 10)].append(hash) + self.collectionOfHashLists[random.randrange(0, self.size)].append(hash) def holdPeer(self,peerDetails): - self.collectionOfPeerLists[random.randrange(0, 10)].append(peerDetails) + self.collectionOfPeerLists[random.randrange(0, self.size)].append(peerDetails) def close(self): - self.shutdown = True \ No newline at end of file + self.shutdown = True diff --git a/src/class_receiveDataThread.py b/src/class_receiveDataThread.py index d7693625..e7574bc5 100644 --- a/src/class_receiveDataThread.py +++ b/src/class_receiveDataThread.py @@ -1,6 +1,7 @@ doTimingAttackMitigation = False import errno +import math import time import threading import shared @@ -19,6 +20,7 @@ import traceback #import highlevelcrypto from addresses import * +from class_objectHashHolder import objectHashHolder from helper_generic import addDataPadding, isHostInPrivateIPRange from helper_sql import sqlQuery from debug import logger @@ -61,6 +63,7 @@ class receiveDataThread(threading.Thread): self.initiatedConnection = True self.selfInitiatedConnections[streamNumber][self] = 0 self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware = someObjectsOfWhichThisRemoteNodeIsAlreadyAware + self.startTime = time.time() def run(self): logger.debug('receiveDataThread starting. ID ' + str(id(self)) + '. The size of the shared.connectedHostsList is now ' + str(len(shared.connectedHostsList))) @@ -125,6 +128,19 @@ class receiveDataThread(threading.Thread): shared.UISignalQueue.put(('updateNetworkStatusTab', 'no data')) logger.debug('receiveDataThread ending. ID ' + str(id(self)) + '. The size of the shared.connectedHostsList is now ' + str(len(shared.connectedHostsList))) + def antiIntersectionDelay(self, initial = False): + # estimated time for a small object to propagate across the whole network + delay = math.ceil(math.log(len(shared.knownNodes[self.streamNumber]) + 2, 20)) * (0.2 + objectHashHolder.size/2) + # +2 is to avoid problems with log(0) and log(1) + # 20 is avg connected nodes count + # 0.2 is avg message transmission time + now = time.time() + if initial and now - delay < self.startTime: + logger.info("Sleeping for %.2fs", delay - (now - self.startTime)) + time.sleep(delay - (now - self.startTime)) + elif not initial: + logger.info("Sleeping for %.2fs", delay) + time.sleep(delay) def processData(self): if len(self.data) < shared.Header.size: # if so little of the data has arrived that we can't even read the checksum then wait for more data. @@ -318,6 +334,7 @@ class receiveDataThread(threading.Thread): bigInvList[hash] = 0 numberOfObjectsInInvMessage = 0 payload = '' + self.antiIntersectionDelay(True) # Now let us start appending all of these hashes together. They will be # sent out in a big inv message to our new peer. for hash, storedValue in bigInvList.items(): @@ -483,6 +500,7 @@ class receiveDataThread(threading.Thread): payload, = row self.sendObject(payload) else: + self.antiIntersectionDelay() logger.warning('%s asked for an object with a getdata which is not in either our memory inventory or our SQL inventory. We probably cleaned it out after advertising it but before they got around to asking for it.' % (self.peer,)) # Our peer has requested (in a getdata message) that we send an object.