Mitigate active internal intersection attack

There was a report that by quickly asking a large number of nodes if
they have an ACK object (which the attacker knows but it is injected
into the network by the recipient of the message), it can estimate how
an object propagates through the network, and eventually pinpoint an
originating IP address of the injection, i.e. the IP address of the
message recipient.

This patch mitigates against it by stalling when asked for a nonexisting
object (so that the attacker can't spam requests), and also upon
connection before sending its own inventory list (so that reconnecting
won't help the attacker). It estimates how long a short message takes to
propagate through the network based on how many nodes are in a stream
and bases the stalling time on that. Currently that is about 15 seconds.
Initial connection delay takes into account the time that already passed
since the connection was established.

This basically gives the attacker one shot per a combination of his own
nodes and the nodes he can connect to, and thus makes the attack much
more difficult to succeed.
This commit is contained in:
Peter Šurda 2016-02-13 12:54:23 +01:00
parent f43e01ed0e
commit 8f5d305242
2 changed files with 24 additions and 5 deletions

View File

@ -12,13 +12,14 @@ import time
import threading
class objectHashHolder(threading.Thread):
size = 10
def __init__(self, sendDataThreadMailbox):
threading.Thread.__init__(self)
self.shutdown = False
self.sendDataThreadMailbox = sendDataThreadMailbox # This queue is used to submit data back to our associated sendDataThread.
self.collectionOfHashLists = {}
self.collectionOfPeerLists = {}
for i in range(10):
for i in range(self.size):
self.collectionOfHashLists[i] = []
self.collectionOfPeerLists[i] = []
@ -32,14 +33,14 @@ class objectHashHolder(threading.Thread):
self.sendDataThreadMailbox.put((0, 'sendaddr', self.collectionOfPeerLists[iterator]))
self.collectionOfPeerLists[iterator] = []
iterator += 1
iterator %= 10
iterator %= self.size
time.sleep(1)
def holdHash(self,hash):
self.collectionOfHashLists[random.randrange(0, 10)].append(hash)
self.collectionOfHashLists[random.randrange(0, self.size)].append(hash)
def holdPeer(self,peerDetails):
self.collectionOfPeerLists[random.randrange(0, 10)].append(peerDetails)
self.collectionOfPeerLists[random.randrange(0, self.size)].append(peerDetails)
def close(self):
self.shutdown = True
self.shutdown = True

View File

@ -1,6 +1,7 @@
doTimingAttackMitigation = False
import errno
import math
import time
import threading
import shared
@ -19,6 +20,7 @@ import traceback
#import highlevelcrypto
from addresses import *
from class_objectHashHolder import objectHashHolder
from helper_generic import addDataPadding, isHostInPrivateIPRange
from helper_sql import sqlQuery
from debug import logger
@ -61,6 +63,7 @@ class receiveDataThread(threading.Thread):
self.initiatedConnection = True
self.selfInitiatedConnections[streamNumber][self] = 0
self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware = someObjectsOfWhichThisRemoteNodeIsAlreadyAware
self.startTime = time.time()
def run(self):
logger.debug('receiveDataThread starting. ID ' + str(id(self)) + '. The size of the shared.connectedHostsList is now ' + str(len(shared.connectedHostsList)))
@ -125,6 +128,19 @@ class receiveDataThread(threading.Thread):
shared.UISignalQueue.put(('updateNetworkStatusTab', 'no data'))
logger.debug('receiveDataThread ending. ID ' + str(id(self)) + '. The size of the shared.connectedHostsList is now ' + str(len(shared.connectedHostsList)))
def antiIntersectionDelay(self, initial = False):
# estimated time for a small object to propagate across the whole network
delay = math.ceil(math.log(len(shared.knownNodes[self.streamNumber]) + 2, 20)) * (0.2 + objectHashHolder.size/2)
# +2 is to avoid problems with log(0) and log(1)
# 20 is avg connected nodes count
# 0.2 is avg message transmission time
now = time.time()
if initial and now - delay < self.startTime:
logger.info("Sleeping for %.2fs", delay - (now - self.startTime))
time.sleep(delay - (now - self.startTime))
elif not initial:
logger.info("Sleeping for %.2fs", delay)
time.sleep(delay)
def processData(self):
if len(self.data) < shared.Header.size: # if so little of the data has arrived that we can't even read the checksum then wait for more data.
@ -318,6 +334,7 @@ class receiveDataThread(threading.Thread):
bigInvList[hash] = 0
numberOfObjectsInInvMessage = 0
payload = ''
self.antiIntersectionDelay(True)
# Now let us start appending all of these hashes together. They will be
# sent out in a big inv message to our new peer.
for hash, storedValue in bigInvList.items():
@ -483,6 +500,7 @@ class receiveDataThread(threading.Thread):
payload, = row
self.sendObject(payload)
else:
self.antiIntersectionDelay()
logger.warning('%s asked for an object with a getdata which is not in either our memory inventory or our SQL inventory. We probably cleaned it out after advertising it but before they got around to asking for it.' % (self.peer,))
# Our peer has requested (in a getdata message) that we send an object.