Trustedpeer fix and more refactoring

- fixed trustedPeer (thanks to anonymous bug reporter)
- moved trustedPeer and Peer into state.py
This commit is contained in:
Peter Šurda 2017-01-12 06:58:35 +01:00
parent e7470a4757
commit bd520a340f
Signed by untrusted user: PeterSurda
GPG Key ID: 0C5F50C0B5F37D87
9 changed files with 50 additions and 45 deletions

View File

@ -32,9 +32,9 @@ class outgoingSynSender(threading.Thread, StoppableThread):
# If the user has specified a trusted peer then we'll only # If the user has specified a trusted peer then we'll only
# ever connect to that. Otherwise we'll pick a random one from # ever connect to that. Otherwise we'll pick a random one from
# the known nodes # the known nodes
if shared.trustedPeer: if state.trustedPeer:
shared.knownNodesLock.acquire() shared.knownNodesLock.acquire()
peer = shared.trustedPeer peer = state.trustedPeer
shared.knownNodes[self.streamNumber][peer] = time.time() shared.knownNodes[self.streamNumber][peer] = time.time()
shared.knownNodesLock.release() shared.knownNodesLock.release()
else: else:
@ -65,7 +65,7 @@ class outgoingSynSender(threading.Thread, StoppableThread):
try: try:
return peer return peer
except NameError: except NameError:
return shared.Peer('127.0.0.1', 8444) return state.Peer('127.0.0.1', 8444)
def stopThread(self): def stopThread(self):
super(outgoingSynSender, self).stopThread() super(outgoingSynSender, self).stopThread()
@ -79,7 +79,7 @@ class outgoingSynSender(threading.Thread, StoppableThread):
self.stop.wait(2) self.stop.wait(2)
while BMConfigParser().safeGetBoolean('bitmessagesettings', 'sendoutgoingconnections') and not self._stopped: while BMConfigParser().safeGetBoolean('bitmessagesettings', 'sendoutgoingconnections') and not self._stopped:
self.name = "outgoingSynSender" self.name = "outgoingSynSender"
maximumConnections = 1 if shared.trustedPeer else 8 # maximum number of outgoing connections = 8 maximumConnections = 1 if state.trustedPeer else 8 # maximum number of outgoing connections = 8
while len(self.selfInitiatedConnections[self.streamNumber]) >= maximumConnections: while len(self.selfInitiatedConnections[self.streamNumber]) >= maximumConnections:
self.stop.wait(10) self.stop.wait(10)
if shared.shutdown: if shared.shutdown:

View File

@ -58,7 +58,7 @@ class receiveDataThread(threading.Thread):
objectHashHolderInstance): objectHashHolderInstance):
self.sock = sock self.sock = sock
self.peer = shared.Peer(HOST, port) self.peer = state.Peer(HOST, port)
self.name = "receiveData-" + self.peer.host.replace(":", ".") # ":" log parser field separator self.name = "receiveData-" + self.peer.host.replace(":", ".") # ":" log parser field separator
self.streamNumber = streamNumber self.streamNumber = streamNumber
self.objectsThatWeHaveYetToGetFromThisPeer = {} self.objectsThatWeHaveYetToGetFromThisPeer = {}
@ -380,7 +380,7 @@ class receiveDataThread(threading.Thread):
# We don't need to do the timing attack mitigation if we are # We don't need to do the timing attack mitigation if we are
# only connected to the trusted peer because we can trust the # only connected to the trusted peer because we can trust the
# peer not to attack # peer not to attack
if sleepTime > 0 and doTimingAttackMitigation and shared.trustedPeer == None: if sleepTime > 0 and doTimingAttackMitigation and state.trustedPeer == None:
logger.debug('Timing attack mitigation: Sleeping for ' + str(sleepTime) + ' seconds.') logger.debug('Timing attack mitigation: Sleeping for ' + str(sleepTime) + ' seconds.')
time.sleep(sleepTime) time.sleep(sleepTime)
@ -450,7 +450,7 @@ class receiveDataThread(threading.Thread):
logger.info('inv message doesn\'t contain enough data. Ignoring.') logger.info('inv message doesn\'t contain enough data. Ignoring.')
return return
if numberOfItemsInInv == 1: # we'll just request this data from the person who advertised the object. if numberOfItemsInInv == 1: # we'll just request this data from the person who advertised the object.
if totalNumberOfobjectsThatWeHaveYetToGetFromAllPeers > 200000 and len(self.objectsThatWeHaveYetToGetFromThisPeer) > 1000 and shared.trustedPeer == None: # inv flooding attack mitigation if totalNumberOfobjectsThatWeHaveYetToGetFromAllPeers > 200000 and len(self.objectsThatWeHaveYetToGetFromThisPeer) > 1000 and state.trustedPeer == None: # inv flooding attack mitigation
logger.debug('We already have ' + str(totalNumberOfobjectsThatWeHaveYetToGetFromAllPeers) + ' items yet to retrieve from peers and over 1000 from this node in particular. Ignoring this inv message.') logger.debug('We already have ' + str(totalNumberOfobjectsThatWeHaveYetToGetFromAllPeers) + ' items yet to retrieve from peers and over 1000 from this node in particular. Ignoring this inv message.')
return return
self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware[ self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware[
@ -470,7 +470,7 @@ class receiveDataThread(threading.Thread):
objectsNewToMe = advertisedSet - Inventory().hashes_by_stream(self.streamNumber) objectsNewToMe = advertisedSet - Inventory().hashes_by_stream(self.streamNumber)
logger.info('inv message lists %s objects. Of those %s are new to me. It took %s seconds to figure that out.', numberOfItemsInInv, len(objectsNewToMe), time.time()-startTime) logger.info('inv message lists %s objects. Of those %s are new to me. It took %s seconds to figure that out.', numberOfItemsInInv, len(objectsNewToMe), time.time()-startTime)
for item in objectsNewToMe: for item in objectsNewToMe:
if totalNumberOfobjectsThatWeHaveYetToGetFromAllPeers > 200000 and len(self.objectsThatWeHaveYetToGetFromThisPeer) > 1000 and shared.trustedPeer == None: # inv flooding attack mitigation if totalNumberOfobjectsThatWeHaveYetToGetFromAllPeers > 200000 and len(self.objectsThatWeHaveYetToGetFromThisPeer) > 1000 and state.trustedPeer == None: # inv flooding attack mitigation
logger.debug('We already have ' + str(totalNumberOfobjectsThatWeHaveYetToGetFromAllPeers) + ' items yet to retrieve from peers and over ' + str(len(self.objectsThatWeHaveYetToGetFromThisPeer)), ' from this node in particular. Ignoring the rest of this inv message.') logger.debug('We already have ' + str(totalNumberOfobjectsThatWeHaveYetToGetFromAllPeers) + ' items yet to retrieve from peers and over ' + str(len(self.objectsThatWeHaveYetToGetFromThisPeer)), ' from this node in particular. Ignoring the rest of this inv message.')
break break
self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware[item] = 0 # helps us keep from sending inv messages to peers that already know about the objects listed therein self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware[item] = 0 # helps us keep from sending inv messages to peers that already know about the objects listed therein
@ -593,7 +593,7 @@ class receiveDataThread(threading.Thread):
if recaddrStream not in shared.knownNodes: # knownNodes is a dictionary of dictionaries with one outer dictionary for each stream. If the outer stream dictionary doesn't exist yet then we must make it. if recaddrStream not in shared.knownNodes: # knownNodes is a dictionary of dictionaries with one outer dictionary for each stream. If the outer stream dictionary doesn't exist yet then we must make it.
with shared.knownNodesLock: with shared.knownNodesLock:
shared.knownNodes[recaddrStream] = {} shared.knownNodes[recaddrStream] = {}
peerFromAddrMessage = shared.Peer(hostStandardFormat, recaddrPort) peerFromAddrMessage = state.Peer(hostStandardFormat, recaddrPort)
if peerFromAddrMessage not in shared.knownNodes[recaddrStream]: if peerFromAddrMessage not in shared.knownNodes[recaddrStream]:
if len(shared.knownNodes[recaddrStream]) < 20000 and timeSomeoneElseReceivedMessageFromThisNode > (int(time.time()) - 10800) and timeSomeoneElseReceivedMessageFromThisNode < (int(time.time()) + 10800): # If we have more than 20000 nodes in our list already then just forget about adding more. Also, make sure that the time that someone else received a message from this node is within three hours from now. if len(shared.knownNodes[recaddrStream]) < 20000 and timeSomeoneElseReceivedMessageFromThisNode > (int(time.time()) - 10800) and timeSomeoneElseReceivedMessageFromThisNode < (int(time.time()) + 10800): # If we have more than 20000 nodes in our list already then just forget about adding more. Also, make sure that the time that someone else received a message from this node is within three hours from now.
with shared.knownNodesLock: with shared.knownNodesLock:
@ -637,7 +637,7 @@ class receiveDataThread(threading.Thread):
# if current connection is over a proxy, sent our own onion address at a random position # if current connection is over a proxy, sent our own onion address at a random position
if ownPosition == i and ".onion" in BMConfigParser().get("bitmessagesettings", "onionhostname") and \ if ownPosition == i and ".onion" in BMConfigParser().get("bitmessagesettings", "onionhostname") and \
hasattr(self.sock, "getproxytype") and self.sock.getproxytype() != "none" and not sentOwn: hasattr(self.sock, "getproxytype") and self.sock.getproxytype() != "none" and not sentOwn:
peer = shared.Peer(BMConfigParser().get("bitmessagesettings", "onionhostname"), BMConfigParser().getint("bitmessagesettings", "onionport")) peer = state.Peer(BMConfigParser().get("bitmessagesettings", "onionhostname"), BMConfigParser().getint("bitmessagesettings", "onionport"))
else: else:
# still may contain own onion address, but we don't change it # still may contain own onion address, but we don't change it
peer, = random.sample(shared.knownNodes[self.streamNumber], 1) peer, = random.sample(shared.knownNodes[self.streamNumber], 1)
@ -802,9 +802,9 @@ class receiveDataThread(threading.Thread):
if not isHostInPrivateIPRange(self.peer.host): if not isHostInPrivateIPRange(self.peer.host):
with shared.knownNodesLock: with shared.knownNodesLock:
shared.knownNodes[self.streamNumber][shared.Peer(self.peer.host, self.remoteNodeIncomingPort)] = int(time.time()) shared.knownNodes[self.streamNumber][state.Peer(self.peer.host, self.remoteNodeIncomingPort)] = int(time.time())
if not self.initiatedConnection: if not self.initiatedConnection:
shared.knownNodes[self.streamNumber][shared.Peer(self.peer.host, self.remoteNodeIncomingPort)] -= 162000 # penalise inbound, 2 days minus 3 hours shared.knownNodes[self.streamNumber][state.Peer(self.peer.host, self.remoteNodeIncomingPort)] -= 162000 # penalise inbound, 2 days minus 3 hours
shared.needToWriteKnownNodesToDisk = True shared.needToWriteKnownNodesToDisk = True
self.sendverack() self.sendverack()

View File

@ -38,7 +38,7 @@ class sendDataThread(threading.Thread):
streamNumber, streamNumber,
someObjectsOfWhichThisRemoteNodeIsAlreadyAware): someObjectsOfWhichThisRemoteNodeIsAlreadyAware):
self.sock = sock self.sock = sock
self.peer = shared.Peer(HOST, PORT) self.peer = state.Peer(HOST, PORT)
self.name = "sendData-" + self.peer.host.replace(":", ".") # log parser field separator self.name = "sendData-" + self.peer.host.replace(":", ".") # log parser field separator
self.streamNumber = streamNumber self.streamNumber = streamNumber
self.services = 0 self.services = 0

View File

@ -10,6 +10,8 @@ import protocol
import errno import errno
import re import re
import state
# Only one singleListener thread will ever exist. It creates the # Only one singleListener thread will ever exist. It creates the
# receiveDataThread and sendDataThread for each incoming connection. Note # receiveDataThread and sendDataThread for each incoming connection. Note
# that it cannot set the stream number because it is not known yet- the # that it cannot set the stream number because it is not known yet- the
@ -60,7 +62,7 @@ class singleListener(threading.Thread, StoppableThread):
def run(self): def run(self):
# If there is a trusted peer then we don't want to accept # If there is a trusted peer then we don't want to accept
# incoming connections so we'll just abandon the thread # incoming connections so we'll just abandon the thread
if shared.trustedPeer: if state.trustedPeer:
return return
while BMConfigParser().safeGetBoolean('bitmessagesettings', 'dontconnect') and shared.shutdown == 0: while BMConfigParser().safeGetBoolean('bitmessagesettings', 'dontconnect') and shared.shutdown == 0:

View File

@ -6,21 +6,22 @@ import random
import sys import sys
from time import strftime, localtime from time import strftime, localtime
import shared import shared
import state
def createDefaultKnownNodes(appdata): def createDefaultKnownNodes(appdata):
############## Stream 1 ################ ############## Stream 1 ################
stream1 = {} stream1 = {}
#stream1[shared.Peer('2604:2000:1380:9f:82e:148b:2746:d0c7', 8080)] = int(time.time()) #stream1[state.Peer('2604:2000:1380:9f:82e:148b:2746:d0c7', 8080)] = int(time.time())
stream1[shared.Peer('5.45.99.75', 8444)] = int(time.time()) stream1[state.Peer('5.45.99.75', 8444)] = int(time.time())
stream1[shared.Peer('75.167.159.54', 8444)] = int(time.time()) stream1[state.Peer('75.167.159.54', 8444)] = int(time.time())
stream1[shared.Peer('95.165.168.168', 8444)] = int(time.time()) stream1[state.Peer('95.165.168.168', 8444)] = int(time.time())
stream1[shared.Peer('85.180.139.241', 8444)] = int(time.time()) stream1[state.Peer('85.180.139.241', 8444)] = int(time.time())
stream1[shared.Peer('158.222.211.81', 8080)] = int(time.time()) stream1[state.Peer('158.222.211.81', 8080)] = int(time.time())
stream1[shared.Peer('178.62.12.187', 8448)] = int(time.time()) stream1[state.Peer('178.62.12.187', 8448)] = int(time.time())
stream1[shared.Peer('24.188.198.204', 8111)] = int(time.time()) stream1[state.Peer('24.188.198.204', 8111)] = int(time.time())
stream1[shared.Peer('109.147.204.113', 1195)] = int(time.time()) stream1[state.Peer('109.147.204.113', 1195)] = int(time.time())
stream1[shared.Peer('178.11.46.221', 8444)] = int(time.time()) stream1[state.Peer('178.11.46.221', 8444)] = int(time.time())
############# Stream 2 ################# ############# Stream 2 #################
stream2 = {} stream2 = {}

View File

@ -24,7 +24,7 @@ def knownNodes():
for node_tuple in nodes.items(): for node_tuple in nodes.items():
try: try:
host, (port, lastseen) = node_tuple host, (port, lastseen) = node_tuple
peer = shared.Peer(host, port) peer = state.Peer(host, port)
except: except:
peer, lastseen = node_tuple peer, lastseen = node_tuple
shared.knownNodes[stream][peer] = lastseen shared.knownNodes[stream][peer] = lastseen
@ -32,7 +32,7 @@ def knownNodes():
shared.knownNodes = defaultKnownNodes.createDefaultKnownNodes(state.appdata) shared.knownNodes = defaultKnownNodes.createDefaultKnownNodes(state.appdata)
# your own onion address, if setup # your own onion address, if setup
if BMConfigParser().has_option('bitmessagesettings', 'onionhostname') and ".onion" in BMConfigParser().get('bitmessagesettings', 'onionhostname'): if BMConfigParser().has_option('bitmessagesettings', 'onionhostname') and ".onion" in BMConfigParser().get('bitmessagesettings', 'onionhostname'):
shared.knownNodes[1][shared.Peer(BMConfigParser().get('bitmessagesettings', 'onionhostname'), BMConfigParser().getint('bitmessagesettings', 'onionport'))] = int(time.time()) shared.knownNodes[1][state.Peer(BMConfigParser().get('bitmessagesettings', 'onionhostname'), BMConfigParser().getint('bitmessagesettings', 'onionport'))] = int(time.time())
if BMConfigParser().getint('bitmessagesettings', 'settingsversion') > 10: if BMConfigParser().getint('bitmessagesettings', 'settingsversion') > 10:
logger.error('Bitmessage cannot read future versions of the keys file (keys.dat). Run the newer version of Bitmessage.') logger.error('Bitmessage cannot read future versions of the keys file (keys.dat). Run the newer version of Bitmessage.')
raise SystemExit raise SystemExit
@ -47,17 +47,17 @@ def dns():
try: try:
for item in socket.getaddrinfo('bootstrap8080.bitmessage.org', 80): for item in socket.getaddrinfo('bootstrap8080.bitmessage.org', 80):
logger.info('Adding ' + item[4][0] + ' to knownNodes based on DNS bootstrap method') logger.info('Adding ' + item[4][0] + ' to knownNodes based on DNS bootstrap method')
shared.knownNodes[1][shared.Peer(item[4][0], 8080)] = int(time.time()) shared.knownNodes[1][state.Peer(item[4][0], 8080)] = int(time.time())
except: except:
logger.error('bootstrap8080.bitmessage.org DNS bootstrapping failed.') logger.error('bootstrap8080.bitmessage.org DNS bootstrapping failed.')
try: try:
for item in socket.getaddrinfo('bootstrap8444.bitmessage.org', 80): for item in socket.getaddrinfo('bootstrap8444.bitmessage.org', 80):
logger.info ('Adding ' + item[4][0] + ' to knownNodes based on DNS bootstrap method') logger.info ('Adding ' + item[4][0] + ' to knownNodes based on DNS bootstrap method')
shared.knownNodes[1][shared.Peer(item[4][0], 8444)] = int(time.time()) shared.knownNodes[1][state.Peer(item[4][0], 8444)] = int(time.time())
except: except:
logger.error('bootstrap8444.bitmessage.org DNS bootstrapping failed.') logger.error('bootstrap8444.bitmessage.org DNS bootstrapping failed.')
elif BMConfigParser().get('bitmessagesettings', 'socksproxytype') == 'SOCKS5': elif BMConfigParser().get('bitmessagesettings', 'socksproxytype') == 'SOCKS5':
shared.knownNodes[1][shared.Peer('quzwelsuziwqgpt2.onion', 8444)] = int(time.time()) shared.knownNodes[1][state.Peer('quzwelsuziwqgpt2.onion', 8444)] = int(time.time())
logger.debug("Adding quzwelsuziwqgpt2.onion:8444 to knownNodes.") logger.debug("Adding quzwelsuziwqgpt2.onion:8444 to knownNodes.")
for port in [8080, 8444]: for port in [8080, 8444]:
logger.debug("Resolving %i through SOCKS...", port) logger.debug("Resolving %i through SOCKS...", port)
@ -90,7 +90,7 @@ def dns():
else: else:
if ip is not None: if ip is not None:
logger.info ('Adding ' + ip + ' to knownNodes based on SOCKS DNS bootstrap method') logger.info ('Adding ' + ip + ' to knownNodes based on SOCKS DNS bootstrap method')
shared.knownNodes[1][shared.Peer(ip, port)] = time.time() shared.knownNodes[1][state.Peer(ip, port)] = time.time()
else: else:
logger.info('DNS bootstrap skipped because the proxy type does not support DNS resolution.') logger.info('DNS bootstrap skipped because the proxy type does not support DNS resolution.')

View File

@ -25,7 +25,7 @@ def _loadTrustedPeer():
return return
host, port = trustedPeer.split(':') host, port = trustedPeer.split(':')
shared.trustedPeer = shared.Peer(host, int(port)) state.trustedPeer = state.Peer(host, int(port))
def loadConfig(): def loadConfig():
if state.appdata: if state.appdata:

View File

@ -93,18 +93,6 @@ ridiculousDifficulty = 20000000
# namecoin integration to "namecoind". # namecoin integration to "namecoind".
namecoinDefaultRpcPort = "8336" namecoinDefaultRpcPort = "8336"
# If the trustedpeer option is specified in keys.dat then this will
# contain a Peer which will be connected to instead of using the
# addresses advertised by other peers. The client will only connect to
# this peer and the timing attack mitigation will be disabled in order
# to download data faster. The expected use case is where the user has
# a fast connection to a trusted server where they run a BitMessage
# daemon permanently. If they then run a second instance of the client
# on a local machine periodically when they want to check for messages
# it will sync with the network a lot faster without compromising
# security.
trustedPeer = None
def isAddressInMyAddressBook(address): def isAddressInMyAddressBook(address):
queryreturn = sqlQuery( queryreturn = sqlQuery(
'''select address from addressbook where address=?''', '''select address from addressbook where address=?''',
@ -442,8 +430,6 @@ def decryptAndCheckPubkeyPayload(data, address):
logger.critical('Pubkey decryption was UNsuccessful because of an unhandled exception! This is definitely a bug! \n%s' % traceback.format_exc()) logger.critical('Pubkey decryption was UNsuccessful because of an unhandled exception! This is definitely a bug! \n%s' % traceback.format_exc())
return 'failed' return 'failed'
Peer = collections.namedtuple('Peer', ['host', 'port'])
def checkAndShareObjectWithPeers(data): def checkAndShareObjectWithPeers(data):
""" """
This function is called after either receiving an object off of the wire This function is called after either receiving an object off of the wire

View File

@ -1,3 +1,5 @@
import collections
neededPubkeys = {} neededPubkeys = {}
streamsInWhichIAmParticipating = {} streamsInWhichIAmParticipating = {}
sendDataQueues = [] #each sendData thread puts its queue in this list. sendDataQueues = [] #each sendData thread puts its queue in this list.
@ -12,3 +14,17 @@ socksIP = None
networkProtocolLastFailed = {'IPv4': 0, 'IPv6': 0, 'onion': 0} networkProtocolLastFailed = {'IPv4': 0, 'IPv6': 0, 'onion': 0}
appdata = '' #holds the location of the application data storage directory appdata = '' #holds the location of the application data storage directory
# If the trustedpeer option is specified in keys.dat then this will
# contain a Peer which will be connected to instead of using the
# addresses advertised by other peers. The client will only connect to
# this peer and the timing attack mitigation will be disabled in order
# to download data faster. The expected use case is where the user has
# a fast connection to a trusted server where they run a BitMessage
# daemon permanently. If they then run a second instance of the client
# on a local machine periodically when they want to check for messages
# it will sync with the network a lot faster without compromising
# security.
trustedPeer = None
Peer = collections.namedtuple('Peer', ['host', 'port'])