Bootstrap provider mode and minor knownNodes changes

- if knownNodes grows to 20000, instead of ignoring new nodes, forget
  the 1000 oldest ones
- drop connection after sendaddr if too many connections, even if it's
  an outbound one
- if maximum total connections are lower than maximum outbound
  connections, active bootstrap provider mode
- in this mode, check all addresses received before announcing them
- so basically it only annouces those addresses it successfully
  connected to
This commit is contained in:
Peter Šurda 2017-02-27 23:31:12 +01:00
parent 5d068ec84a
commit 339e375958
Signed by untrusted user: PeterSurda
GPG Key ID: 0C5F50C0B5F37D87
3 changed files with 48 additions and 14 deletions

View File

@ -386,9 +386,12 @@ class receiveDataThread(threading.Thread):
stream, 'advertisepeer', dataToSend)) stream, 'advertisepeer', dataToSend))
self.sendaddr() # This is one large addr message to this one peer. self.sendaddr() # This is one large addr message to this one peer.
if not self.initiatedConnection and len(shared.connectedHostsList) > \ if len(shared.connectedHostsList) > \
BMConfigParser().safeGetInt("bitmessagesettings", "maxtotalconnections", 200): BMConfigParser().safeGetInt("bitmessagesettings", "maxtotalconnections", 200):
logger.info ('We are connected to too many people. Closing connection.') logger.info ('We are connected to too many people. Closing connection.')
if self.initiatedConnection:
self.sendDataThreadQueue.put((0, 'sendRawData', protocol.assembleErrorMessage(fatal=2, errorText="Thank you for providing a listening node.")))
else:
self.sendDataThreadQueue.put((0, 'sendRawData', protocol.assembleErrorMessage(fatal=2, errorText="Server full, please try again later."))) self.sendDataThreadQueue.put((0, 'sendRawData', protocol.assembleErrorMessage(fatal=2, errorText="Server full, please try again later.")))
self.sendDataThreadQueue.put((0, 'shutdown','no data')) self.sendDataThreadQueue.put((0, 'shutdown','no data'))
return return
@ -622,18 +625,28 @@ class receiveDataThread(threading.Thread):
knownnodes.knownNodes[recaddrStream] = {} knownnodes.knownNodes[recaddrStream] = {}
peerFromAddrMessage = state.Peer(hostStandardFormat, recaddrPort) peerFromAddrMessage = state.Peer(hostStandardFormat, recaddrPort)
if peerFromAddrMessage not in knownnodes.knownNodes[recaddrStream]: if peerFromAddrMessage not in knownnodes.knownNodes[recaddrStream]:
if len(knownnodes.knownNodes[recaddrStream]) < 20000 and timeSomeoneElseReceivedMessageFromThisNode > (int(time.time()) - 10800) and timeSomeoneElseReceivedMessageFromThisNode < (int(time.time()) + 10800): # If we have more than 20000 nodes in our list already then just forget about adding more. Also, make sure that the time that someone else received a message from this node is within three hours from now. knownnodes.trimKnownNodes(recAddrStream)
# only if recent
if timeSomeoneElseReceivedMessageFromThisNode > (int(time.time()) - 10800) and timeSomeoneElseReceivedMessageFromThisNode < (int(time.time()) + 10800):
logger.debug('added new node ' + str(peerFromAddrMessage) + ' to knownNodes in stream ' + str(recaddrStream))
# bootstrap provider?
if BMConfigParser().safeGetInt('bitmessagesettings', 'maxoutboundconnections') >= \
BMConfigParser().safeGetInt('bitmessagesettings', 'maxtotalconnections', 200):
with knownnodes.knownNodesLock:
knownnodes.knownNodes[recaddrStream][peerFromAddrMessage] = int(time.time()) - 10800
# normal mode
else:
with knownnodes.knownNodesLock: with knownnodes.knownNodesLock:
knownnodes.knownNodes[recaddrStream][peerFromAddrMessage] = timeSomeoneElseReceivedMessageFromThisNode knownnodes.knownNodes[recaddrStream][peerFromAddrMessage] = timeSomeoneElseReceivedMessageFromThisNode
logger.debug('added new node ' + str(peerFromAddrMessage) + ' to knownNodes in stream ' + str(recaddrStream))
shared.needToWriteKnownNodesToDisk = True
hostDetails = ( hostDetails = (
timeSomeoneElseReceivedMessageFromThisNode, timeSomeoneElseReceivedMessageFromThisNode,
recaddrStream, recaddrServices, hostStandardFormat, recaddrPort) recaddrStream, recaddrServices, hostStandardFormat, recaddrPort)
protocol.broadcastToSendDataQueues(( protocol.broadcastToSendDataQueues((
recaddrStream, 'advertisepeer', hostDetails)) recaddrStream, 'advertisepeer', hostDetails))
else: shared.needToWriteKnownNodesToDisk = True
# only update if normal mode
elif BMConfigParser().safeGetInt('bitmessagesettings', 'maxoutboundconnections') < \
BMConfigParser().safeGetInt('bitmessagesettings', 'maxtotalconnections', 200):
timeLastReceivedMessageFromThisNode = knownnodes.knownNodes[recaddrStream][ timeLastReceivedMessageFromThisNode = knownnodes.knownNodes[recaddrStream][
peerFromAddrMessage] peerFromAddrMessage]
if (timeLastReceivedMessageFromThisNode < timeSomeoneElseReceivedMessageFromThisNode) and (timeSomeoneElseReceivedMessageFromThisNode < int(time.time())+900): # 900 seconds for wiggle-room in case other nodes' clocks aren't quite right. if (timeLastReceivedMessageFromThisNode < timeSomeoneElseReceivedMessageFromThisNode) and (timeSomeoneElseReceivedMessageFromThisNode < int(time.time())+900): # 900 seconds for wiggle-room in case other nodes' clocks aren't quite right.
@ -822,6 +835,11 @@ class receiveDataThread(threading.Thread):
for stream in self.remoteStreams: for stream in self.remoteStreams:
knownnodes.knownNodes[stream][state.Peer(self.peer.host, self.remoteNodeIncomingPort)] = int(time.time()) knownnodes.knownNodes[stream][state.Peer(self.peer.host, self.remoteNodeIncomingPort)] = int(time.time())
if not self.initiatedConnection: if not self.initiatedConnection:
# bootstrap provider?
if BMConfigParser().safeGetInt('bitmessagesettings', 'maxoutboundconnections') >= \
BMConfigParser().safeGetInt('bitmessagesettings', 'maxtotalconnections', 200):
knownnodes.knownNodes[stream][state.Peer(self.peer.host, self.remoteNodeIncomingPort)] -= 10800 # penalise inbound, 3 hours
else:
knownnodes.knownNodes[stream][state.Peer(self.peer.host, self.remoteNodeIncomingPort)] -= 7200 # penalise inbound, 2 hours knownnodes.knownNodes[stream][state.Peer(self.peer.host, self.remoteNodeIncomingPort)] -= 7200 # penalise inbound, 2 hours
shared.needToWriteKnownNodesToDisk = True shared.needToWriteKnownNodesToDisk = True

View File

@ -139,7 +139,12 @@ class singleListener(threading.Thread, StoppableThread):
# share the same external IP. This is here to prevent # share the same external IP. This is here to prevent
# connection flooding. # connection flooding.
# permit repeated connections from Tor # permit repeated connections from Tor
if HOST in shared.connectedHostsList and (".onion" not in BMConfigParser().get('bitmessagesettings', 'onionhostname') or not protocol.checkSocksIP(HOST)): if HOST in shared.connectedHostsList and \
(".onion" not in BMConfigParser().get('bitmessagesettings', 'onionhostname') or not protocol.checkSocksIP(HOST)):
# bootstrap provider? Then accept, we'll most likely drop it a little bit later
if BMConfigParser().safeGetInt('bitmessagesettings', 'maxoutboundconnections') >= \
BMConfigParser().safeGetInt('bitmessagesettings', 'maxtotalconnections', 200):
break
socketObject.close() socketObject.close()
logger.info('We are already connected to ' + str(HOST) + '. Ignoring connection.') logger.info('We are already connected to ' + str(HOST) + '. Ignoring connection.')
else: else:

View File

@ -6,9 +6,20 @@ import state
knownNodesLock = threading.Lock() knownNodesLock = threading.Lock()
knownNodes = {} knownNodes = {}
knownNodesMax = 20000
knownNodesTrimAmount = 2000
def saveKnownNodes(dirName = None): def saveKnownNodes(dirName = None):
if dirName is None: if dirName is None:
dirName = state.appdata dirName = state.appdata
with knownNodesLock: with knownNodesLock:
with open(dirName + 'knownnodes.dat', 'wb') as output: with open(dirName + 'knownnodes.dat', 'wb') as output:
pickle.dump(knownNodes, output) pickle.dump(knownNodes, output)
def trimKnownNodes(recAddrStream = 1):
if len(knownNodes[recAddrStream]) < knownNodesMax:
return
with knownNodesLock:
oldestList = sorted(knownNodes[recAddrStream], key=knownNodes[recAddrStream].get)[:knownNodeTrimAmount]
for oldest in oldestList:
del knownNodes[recAddrStream][oldest]