Isolate and optimize knownnodes processing #1579
|
@ -140,9 +140,28 @@ class singleCleaner(StoppableThread):
|
||||||
if state.thisapp.daemon or not state.enableGUI:
|
if state.thisapp.daemon or not state.enableGUI:
|
||||||
os._exit(1)
|
os._exit(1)
|
||||||
|
|
||||||
# inv/object tracking
|
|
||||||
for connection in BMConnectionPool().connections():
|
for connection in BMConnectionPool().connections():
|
||||||
connection.clean()
|
connection.clean() # inv/object tracking
|
||||||
|
if not connection.fullyEstablished:
|
||||||
|
continue
|
||||||
|
self.logger.warning(
|
||||||
|
'Cleaning up duplicate ports for host %s',
|
||||||
|
connection.destination.host)
|
||||||
|
# remove peers with same host and other ports from knownnodes
|
||||||
|
for stream in connection.streams:
|
||||||
|
if stream == 0: # FIXME: stream 0 is a protocol violation
|
||||||
|
self.logger.warning(
|
||||||
|
'Found stream 0 for node %s:%i',
|
||||||
|
connection.destination)
|
||||||
|
knownnodes.decreaseRating(connection.destination)
|
||||||
|
continue
|
||||||
|
for node in [
|
||||||
|
node for node in knownnodes.knownNodes[stream]
|
||||||
|
if node.host == connection.destination.host
|
||||||
|
and node.port != connection.destination.port
|
||||||
|
]:
|
||||||
|
self.logger.warning('port %s', node.port)
|
||||||
|
del knownnodes.knownNodes[stream][node]
|
||||||
|
|
||||||
# discovery tracking
|
# discovery tracking
|
||||||
exp = time.time() - singleCleaner.expireDiscoveredPeers
|
exp = time.time() - singleCleaner.expireDiscoveredPeers
|
||||||
|
|
|
@ -34,6 +34,9 @@ knownNodesForgetRating = -0.5
|
||||||
|
|
||||||
knownNodesActual = False
|
knownNodesActual = False
|
||||||
|
|
||||||
|
outages = {}
|
||||||
|
"""a dict with all hosts"""
|
||||||
|
|
||||||
logger = logging.getLogger('default')
|
logger = logging.getLogger('default')
|
||||||
|
|
||||||
DEFAULT_NODES = (
|
DEFAULT_NODES = (
|
||||||
|
@ -71,7 +74,13 @@ def json_deserialize_knownnodes(source):
|
||||||
for node in json.load(source):
|
for node in json.load(source):
|
||||||
peer = node['peer']
|
peer = node['peer']
|
||||||
info = node['info']
|
info = node['info']
|
||||||
peer = Peer(str(peer['host']), peer.get('port', 8444))
|
port = peer.get('port', 8444)
|
||||||
|
peer = Peer(str(peer['host']), port)
|
||||||
|
outages[peer.host] = {
|
||||||
|
'lastseen': info.get('lastseen', time.time()),
|
||||||
|
'port': port,
|
||||||
|
'stream': info.get('stream', 1)
|
||||||
|
}
|
||||||
knownNodes[node['stream']][peer] = info
|
knownNodes[node['stream']][peer] = info
|
||||||
if not (knownNodesActual
|
if not (knownNodesActual
|
||||||
or info.get('self')) and peer not in DEFAULT_NODES:
|
or info.get('self')) and peer not in DEFAULT_NODES:
|
||||||
|
@ -121,11 +130,18 @@ def addKnownNode(stream, peer, lastseen=None, is_self=False):
|
||||||
else:
|
else:
|
||||||
lastseen = int(lastseen)
|
lastseen = int(lastseen)
|
||||||
try:
|
try:
|
||||||
|
prev = outages[peer.host]
|
||||||
|
if peer.port == prev['port']:
|
||||||
|
outages[peer.host]['lastseen'] = lastseen
|
||||||
info = knownNodes[stream].get(peer)
|
info = knownNodes[stream].get(peer)
|
||||||
if lastseen > info['lastseen']:
|
|
||||||
info['lastseen'] = lastseen
|
info['lastseen'] = lastseen
|
||||||
except (KeyError, TypeError):
|
except KeyError:
|
||||||
pass
|
pass
|
||||||
|
except TypeError:
|
||||||
|
# don't update expired node if have enough nodes in that stream
|
||||||
|
if len(knownNodes[stream]) > 64:
|
||||||
|
return
|
||||||
|
rating = -0.2
|
||||||
else:
|
else:
|
||||||
return
|
return
|
||||||
|
|
||||||
|
@ -133,6 +149,25 @@ def addKnownNode(stream, peer, lastseen=None, is_self=False):
|
||||||
if len(knownNodes[stream]) > BMConfigParser().safeGetInt(
|
if len(knownNodes[stream]) > BMConfigParser().safeGetInt(
|
||||||
"knownnodes", "maxnodes"):
|
"knownnodes", "maxnodes"):
|
||||||
return
|
return
|
||||||
|
try:
|
||||||
|
prev = outages[peer.host]
|
||||||
|
except KeyError:
|
||||||
|
outages[peer.host] = {
|
||||||
|
'stream': stream,
|
||||||
|
'port': peer.port,
|
||||||
|
'lastseen': lastseen
|
||||||
|
}
|
||||||
|
else:
|
||||||
|
if stream == prev['stream']:
|
||||||
|
if lastseen - prev['lastseen'] > 3600 * 24:
|
||||||
|
# more than a day ago, this should be port change
|
||||||
|
try:
|
||||||
|
del knownNodes[stream][Peer(peer.host, prev['port'])]
|
||||||
|
except KeyError:
|
||||||
|
pass
|
||||||
|
outages[peer.host]['port'] = peer.port
|
||||||
|
else:
|
||||||
|
rating = -0.2
|
||||||
|
|
||||||
knownNodes[stream][peer] = {
|
knownNodes[stream][peer] = {
|
||||||
'lastseen': lastseen,
|
'lastseen': lastseen,
|
||||||
|
|
Reference in New Issue
Block a user