Make some network parameters configurable

- maxtotalconnections = maximum number of total full connections
  (incoming + outgoing) the node will allow. Default 200 as it was.
- maxbootstrapconnections = number of additional (to total) connection
  that will act in bootstrap mode, closing after sending the list of
  addresses. Default 20 as it was.
- maxaddrperstreamsend = initial address list maximum size, per
  participating stream. Default 500. Child streams get half. The
  response is chunked into pieces of max. 1000 addresses as that's the
  protocol limit.
This commit is contained in:
Peter Šurda 2017-02-26 17:46:02 +01:00
parent 0fa0599cd4
commit 7ebe837eb0
Signed by untrusted user: PeterSurda
GPG Key ID: 0C5F50C0B5F37D87
3 changed files with 45 additions and 16 deletions

View File

@ -31,13 +31,13 @@ class BMConfigParser(ConfigParser.SafeConfigParser):
return False return False
return False return False
def safeGetInt(self, section, field): def safeGetInt(self, section, field, default=0):
if self.has_option(section, field): if self.has_option(section, field):
try: try:
return self.getint(section, field) return self.getint(section, field)
except ValueError: except ValueError:
return 0 return default
return 0 return default
def safeGet(self, section, option, default = None): def safeGet(self, section, option, default = None):
if self.has_option(section, option): if self.has_option(section, option):

View File

@ -386,7 +386,8 @@ class receiveDataThread(threading.Thread):
stream, 'advertisepeer', dataToSend)) stream, 'advertisepeer', dataToSend))
self.sendaddr() # This is one large addr message to this one peer. self.sendaddr() # This is one large addr message to this one peer.
if not self.initiatedConnection and len(shared.connectedHostsList) > 200: if not self.initiatedConnection and len(shared.connectedHostsList) > \
BMConfigParser().safeGetInt("bitmessagesettings", "maxtotalconnections", 200):
logger.info ('We are connected to too many people. Closing connection.') logger.info ('We are connected to too many people. Closing connection.')
self.sendDataThreadQueue.put((0, 'sendRawData', protocol.assembleErrorMessage(fatal=2, errorText="Server full, please try again later."))) self.sendDataThreadQueue.put((0, 'sendRawData', protocol.assembleErrorMessage(fatal=2, errorText="Server full, please try again later.")))
self.sendDataThreadQueue.put((0, 'shutdown','no data')) self.sendDataThreadQueue.put((0, 'shutdown','no data'))
@ -648,9 +649,25 @@ class receiveDataThread(threading.Thread):
# peer (with the full exchange of version and verack # peer (with the full exchange of version and verack
# messages). # messages).
def sendaddr(self): def sendaddr(self):
def sendChunk():
if numberOfAddressesInAddrMessage == 0:
return
self.sendDataThreadQueue.put((0, 'sendRawData', \
protocol.CreatePacket('addr', \
encodeVarint(numberOfAddressesInAddrMessage) + payload)))
# We are going to share a maximum number of 1000 addrs (per overlapping # We are going to share a maximum number of 1000 addrs (per overlapping
# stream) with our peer. 500 from overlapping streams, 250 from the # stream) with our peer. 500 from overlapping streams, 250 from the
# left child stream, and 250 from the right child stream. # left child stream, and 250 from the right child stream.
maxAddrCount = BMConfigParser().safeGetInt("bitmessagesettings", "maxaddrperstreamsend", 500)
# protocol defines this as a maximum in one chunk
protocolAddrLimit = 1000
# init
numberOfAddressesInAddrMessage = 0
payload = ''
for stream in self.streamNumber: for stream in self.streamNumber:
addrsInMyStream = {} addrsInMyStream = {}
addrsInChildStreamLeft = {} addrsInChildStreamLeft = {}
@ -661,8 +678,8 @@ class receiveDataThread(threading.Thread):
filtered = {k: v for k, v in knownnodes.knownNodes[stream].items() filtered = {k: v for k, v in knownnodes.knownNodes[stream].items()
if v > (int(time.time()) - shared.maximumAgeOfNodesThatIAdvertiseToOthers)} if v > (int(time.time()) - shared.maximumAgeOfNodesThatIAdvertiseToOthers)}
elemCount = len(filtered) elemCount = len(filtered)
if elemCount > 500: if elemCount > maxAddrCount:
elemCount = 500 elemCount = maxAddrCount
# only if more recent than 3 hours # only if more recent than 3 hours
addrsInMyStream = random.sample(filtered.items(), elemCount) addrsInMyStream = random.sample(filtered.items(), elemCount)
# sent 250 only if the remote isn't interested in it # sent 250 only if the remote isn't interested in it
@ -670,18 +687,16 @@ class receiveDataThread(threading.Thread):
filtered = {k: v for k, v in knownnodes.knownNodes[stream*2].items() filtered = {k: v for k, v in knownnodes.knownNodes[stream*2].items()
if v > (int(time.time()) - shared.maximumAgeOfNodesThatIAdvertiseToOthers)} if v > (int(time.time()) - shared.maximumAgeOfNodesThatIAdvertiseToOthers)}
elemCount = len(filtered) elemCount = len(filtered)
if elemCount > 250: if elemCount > maxAddrCount / 2:
elemCount = 250 elemCount = int(maxAddrCount / 2)
addrsInChildStreamLeft = random.sample(filtered.items(), elemCount) addrsInChildStreamLeft = random.sample(filtered.items(), elemCount)
if len(knownnodes.knownNodes[(stream * 2) + 1]) > 0 and stream not in self.streamNumber: if len(knownnodes.knownNodes[(stream * 2) + 1]) > 0 and stream not in self.streamNumber:
filtered = {k: v for k, v in knownnodes.knownNodes[stream*2+1].items() filtered = {k: v for k, v in knownnodes.knownNodes[stream*2+1].items()
if v > (int(time.time()) - shared.maximumAgeOfNodesThatIAdvertiseToOthers)} if v > (int(time.time()) - shared.maximumAgeOfNodesThatIAdvertiseToOthers)}
elemCount = len(filtered) elemCount = len(filtered)
if elemCount > 250: if elemCount > maxAddrCount / 2:
elemCount = 250 elemCount = int(maxAddrCount / 2)
addrsInChildStreamRight = random.sample(filtered.items(), elemCount) addrsInChildStreamRight = random.sample(filtered.items(), elemCount)
numberOfAddressesInAddrMessage = 0
payload = ''
for (HOST, PORT), timeLastReceivedMessageFromThisNode in addrsInMyStream: for (HOST, PORT), timeLastReceivedMessageFromThisNode in addrsInMyStream:
numberOfAddressesInAddrMessage += 1 numberOfAddressesInAddrMessage += 1
payload += pack( payload += pack(
@ -691,6 +706,10 @@ class receiveDataThread(threading.Thread):
'>q', 1) # service bit flags offered by this node '>q', 1) # service bit flags offered by this node
payload += protocol.encodeHost(HOST) payload += protocol.encodeHost(HOST)
payload += pack('>H', PORT) # remote port payload += pack('>H', PORT) # remote port
if numberOfAddressesInAddrMessage >= protocolAddrLimit:
sendChunk()
payload = ''
numberOfAddressesInAddrMessage = 0
for (HOST, PORT), timeLastReceivedMessageFromThisNode in addrsInChildStreamLeft: for (HOST, PORT), timeLastReceivedMessageFromThisNode in addrsInChildStreamLeft:
numberOfAddressesInAddrMessage += 1 numberOfAddressesInAddrMessage += 1
payload += pack( payload += pack(
@ -700,6 +719,10 @@ class receiveDataThread(threading.Thread):
'>q', 1) # service bit flags offered by this node '>q', 1) # service bit flags offered by this node
payload += protocol.encodeHost(HOST) payload += protocol.encodeHost(HOST)
payload += pack('>H', PORT) # remote port payload += pack('>H', PORT) # remote port
if numberOfAddressesInAddrMessage >= protocolAddrLimit:
sendChunk()
payload = ''
numberOfAddressesInAddrMessage = 0
for (HOST, PORT), timeLastReceivedMessageFromThisNode in addrsInChildStreamRight: for (HOST, PORT), timeLastReceivedMessageFromThisNode in addrsInChildStreamRight:
numberOfAddressesInAddrMessage += 1 numberOfAddressesInAddrMessage += 1
payload += pack( payload += pack(
@ -709,10 +732,13 @@ class receiveDataThread(threading.Thread):
'>q', 1) # service bit flags offered by this node '>q', 1) # service bit flags offered by this node
payload += protocol.encodeHost(HOST) payload += protocol.encodeHost(HOST)
payload += pack('>H', PORT) # remote port payload += pack('>H', PORT) # remote port
if numberOfAddressesInAddrMessage >= protocolAddrLimit:
sendChunk()
payload = ''
numberOfAddressesInAddrMessage = 0
payload = encodeVarint(numberOfAddressesInAddrMessage) + payload # flush
self.sendDataThreadQueue.put((0, 'sendRawData', protocol.CreatePacket('addr', payload))) sendChunk()
# We have received a version message # We have received a version message
def recversion(self, data): def recversion(self, data):

View File

@ -107,7 +107,10 @@ class singleListener(threading.Thread, StoppableThread):
# connections. # connections.
while BMConfigParser().get('bitmessagesettings', 'socksproxytype')[0:5] == 'SOCKS' and not BMConfigParser().getboolean('bitmessagesettings', 'sockslisten') and ".onion" not in BMConfigParser().get('bitmessagesettings', 'onionhostname') and state.shutdown == 0: while BMConfigParser().get('bitmessagesettings', 'socksproxytype')[0:5] == 'SOCKS' and not BMConfigParser().getboolean('bitmessagesettings', 'sockslisten') and ".onion" not in BMConfigParser().get('bitmessagesettings', 'onionhostname') and state.shutdown == 0:
self.stop.wait(10) self.stop.wait(10)
while len(shared.connectedHostsList) > 220 and state.shutdown == 0: while len(shared.connectedHostsList) > \
BMConfigParser().safeGetInt("bitmessagesettings", "maxtotalconnections", 200) + \
BMConfigParser().safeGetInt("bitmessagesettings", "maxbootstrapconnections", 20) \
and state.shutdown == 0:
logger.info('We are connected to too many people. Not accepting further incoming connections for ten seconds.') logger.info('We are connected to too many people. Not accepting further incoming connections for ten seconds.')
self.stop.wait(10) self.stop.wait(10)