From 7ebe837eb0bf7b1f6658c07a494117f8c8b09af2 Mon Sep 17 00:00:00 2001 From: Peter Surda Date: Sun, 26 Feb 2017 17:46:02 +0100 Subject: [PATCH] Make some network parameters configurable - maxtotalconnections = maximum number of total full connections (incoming + outgoing) the node will allow. Default 200 as it was. - maxbootstrapconnections = number of additional (to total) connection that will act in bootstrap mode, closing after sending the list of addresses. Default 20 as it was. - maxaddrperstreamsend = initial address list maximum size, per participating stream. Default 500. Child streams get half. The response is chunked into pieces of max. 1000 addresses as that's the protocol limit. --- src/bmconfigparser.py | 6 ++-- src/class_receiveDataThread.py | 50 ++++++++++++++++++++++++++-------- src/class_singleListener.py | 5 +++- 3 files changed, 45 insertions(+), 16 deletions(-) diff --git a/src/bmconfigparser.py b/src/bmconfigparser.py index bc16b05a..412d2fa5 100644 --- a/src/bmconfigparser.py +++ b/src/bmconfigparser.py @@ -31,13 +31,13 @@ class BMConfigParser(ConfigParser.SafeConfigParser): return False return False - def safeGetInt(self, section, field): + def safeGetInt(self, section, field, default=0): if self.has_option(section, field): try: return self.getint(section, field) except ValueError: - return 0 - return 0 + return default + return default def safeGet(self, section, option, default = None): if self.has_option(section, option): diff --git a/src/class_receiveDataThread.py b/src/class_receiveDataThread.py index 3875957e..83941612 100644 --- a/src/class_receiveDataThread.py +++ b/src/class_receiveDataThread.py @@ -386,7 +386,8 @@ class receiveDataThread(threading.Thread): stream, 'advertisepeer', dataToSend)) self.sendaddr() # This is one large addr message to this one peer. - if not self.initiatedConnection and len(shared.connectedHostsList) > 200: + if not self.initiatedConnection and len(shared.connectedHostsList) > \ + BMConfigParser().safeGetInt("bitmessagesettings", "maxtotalconnections", 200): logger.info ('We are connected to too many people. Closing connection.') self.sendDataThreadQueue.put((0, 'sendRawData', protocol.assembleErrorMessage(fatal=2, errorText="Server full, please try again later."))) self.sendDataThreadQueue.put((0, 'shutdown','no data')) @@ -648,9 +649,25 @@ class receiveDataThread(threading.Thread): # peer (with the full exchange of version and verack # messages). def sendaddr(self): + def sendChunk(): + if numberOfAddressesInAddrMessage == 0: + return + self.sendDataThreadQueue.put((0, 'sendRawData', \ + protocol.CreatePacket('addr', \ + encodeVarint(numberOfAddressesInAddrMessage) + payload))) + # We are going to share a maximum number of 1000 addrs (per overlapping # stream) with our peer. 500 from overlapping streams, 250 from the # left child stream, and 250 from the right child stream. + maxAddrCount = BMConfigParser().safeGetInt("bitmessagesettings", "maxaddrperstreamsend", 500) + + # protocol defines this as a maximum in one chunk + protocolAddrLimit = 1000 + + # init + numberOfAddressesInAddrMessage = 0 + payload = '' + for stream in self.streamNumber: addrsInMyStream = {} addrsInChildStreamLeft = {} @@ -661,8 +678,8 @@ class receiveDataThread(threading.Thread): filtered = {k: v for k, v in knownnodes.knownNodes[stream].items() if v > (int(time.time()) - shared.maximumAgeOfNodesThatIAdvertiseToOthers)} elemCount = len(filtered) - if elemCount > 500: - elemCount = 500 + if elemCount > maxAddrCount: + elemCount = maxAddrCount # only if more recent than 3 hours addrsInMyStream = random.sample(filtered.items(), elemCount) # sent 250 only if the remote isn't interested in it @@ -670,18 +687,16 @@ class receiveDataThread(threading.Thread): filtered = {k: v for k, v in knownnodes.knownNodes[stream*2].items() if v > (int(time.time()) - shared.maximumAgeOfNodesThatIAdvertiseToOthers)} elemCount = len(filtered) - if elemCount > 250: - elemCount = 250 + if elemCount > maxAddrCount / 2: + elemCount = int(maxAddrCount / 2) addrsInChildStreamLeft = random.sample(filtered.items(), elemCount) if len(knownnodes.knownNodes[(stream * 2) + 1]) > 0 and stream not in self.streamNumber: filtered = {k: v for k, v in knownnodes.knownNodes[stream*2+1].items() if v > (int(time.time()) - shared.maximumAgeOfNodesThatIAdvertiseToOthers)} elemCount = len(filtered) - if elemCount > 250: - elemCount = 250 + if elemCount > maxAddrCount / 2: + elemCount = int(maxAddrCount / 2) addrsInChildStreamRight = random.sample(filtered.items(), elemCount) - numberOfAddressesInAddrMessage = 0 - payload = '' for (HOST, PORT), timeLastReceivedMessageFromThisNode in addrsInMyStream: numberOfAddressesInAddrMessage += 1 payload += pack( @@ -691,6 +706,10 @@ class receiveDataThread(threading.Thread): '>q', 1) # service bit flags offered by this node payload += protocol.encodeHost(HOST) payload += pack('>H', PORT) # remote port + if numberOfAddressesInAddrMessage >= protocolAddrLimit: + sendChunk() + payload = '' + numberOfAddressesInAddrMessage = 0 for (HOST, PORT), timeLastReceivedMessageFromThisNode in addrsInChildStreamLeft: numberOfAddressesInAddrMessage += 1 payload += pack( @@ -700,6 +719,10 @@ class receiveDataThread(threading.Thread): '>q', 1) # service bit flags offered by this node payload += protocol.encodeHost(HOST) payload += pack('>H', PORT) # remote port + if numberOfAddressesInAddrMessage >= protocolAddrLimit: + sendChunk() + payload = '' + numberOfAddressesInAddrMessage = 0 for (HOST, PORT), timeLastReceivedMessageFromThisNode in addrsInChildStreamRight: numberOfAddressesInAddrMessage += 1 payload += pack( @@ -709,10 +732,13 @@ class receiveDataThread(threading.Thread): '>q', 1) # service bit flags offered by this node payload += protocol.encodeHost(HOST) payload += pack('>H', PORT) # remote port + if numberOfAddressesInAddrMessage >= protocolAddrLimit: + sendChunk() + payload = '' + numberOfAddressesInAddrMessage = 0 - payload = encodeVarint(numberOfAddressesInAddrMessage) + payload - self.sendDataThreadQueue.put((0, 'sendRawData', protocol.CreatePacket('addr', payload))) - + # flush + sendChunk() # We have received a version message def recversion(self, data): diff --git a/src/class_singleListener.py b/src/class_singleListener.py index fb736698..322f4b67 100644 --- a/src/class_singleListener.py +++ b/src/class_singleListener.py @@ -107,7 +107,10 @@ class singleListener(threading.Thread, StoppableThread): # connections. while BMConfigParser().get('bitmessagesettings', 'socksproxytype')[0:5] == 'SOCKS' and not BMConfigParser().getboolean('bitmessagesettings', 'sockslisten') and ".onion" not in BMConfigParser().get('bitmessagesettings', 'onionhostname') and state.shutdown == 0: self.stop.wait(10) - while len(shared.connectedHostsList) > 220 and state.shutdown == 0: + while len(shared.connectedHostsList) > \ + BMConfigParser().safeGetInt("bitmessagesettings", "maxtotalconnections", 200) + \ + BMConfigParser().safeGetInt("bitmessagesettings", "maxbootstrapconnections", 20) \ + and state.shutdown == 0: logger.info('We are connected to too many people. Not accepting further incoming connections for ten seconds.') self.stop.wait(10)