Add ratings to peers

- outbound peers now have a rating
- it's also shown in the network status tab
- currently it's between -1 to +1, changes by 0.1 steps and uses a
hyperbolic function 0.05/(1.0 - rating) to convert rating to
probability with which we should connect to that node when randomly
chosen
- it increases when we successfully establish a full outbound connection
to a node, and decreases when we fail to do that
- onion nodes have priority when using SOCKS
This commit is contained in:
Peter Šurda 2017-07-05 09:17:01 +02:00
parent 9d09f9f3ce
commit a0bbd21efc
Signed by untrusted user: PeterSurda
GPG Key ID: 0C5F50C0B5F37D87
10 changed files with 143 additions and 73 deletions

View File

@ -4,6 +4,7 @@ import shared
from tr import _translate from tr import _translate
from inventory import Inventory, PendingDownloadQueue, PendingUpload from inventory import Inventory, PendingDownloadQueue, PendingUpload
import knownnodes
import l10n import l10n
import network.stats import network.stats
from retranslateui import RetranslateMixin from retranslateui import RetranslateMixin
@ -87,15 +88,23 @@ class NetworkStatus(QtGui.QWidget, RetranslateMixin):
self.tableWidgetConnectionCount.setItem(0, 0, self.tableWidgetConnectionCount.setItem(0, 0,
QtGui.QTableWidgetItem("%s:%i" % (i.destination.host, i.destination.port)) QtGui.QTableWidgetItem("%s:%i" % (i.destination.host, i.destination.port))
) )
self.tableWidgetConnectionCount.setItem(0, 1, self.tableWidgetConnectionCount.setItem(0, 2,
QtGui.QTableWidgetItem("%s" % (i.userAgent)) QtGui.QTableWidgetItem("%s" % (i.userAgent))
) )
self.tableWidgetConnectionCount.setItem(0, 2, self.tableWidgetConnectionCount.setItem(0, 3,
QtGui.QTableWidgetItem("%s" % (i.tlsVersion)) QtGui.QTableWidgetItem("%s" % (i.tlsVersion))
) )
self.tableWidgetConnectionCount.setItem(0, 3, self.tableWidgetConnectionCount.setItem(0, 4,
QtGui.QTableWidgetItem("%s" % (",".join(map(str,i.streams)))) QtGui.QTableWidgetItem("%s" % (",".join(map(str,i.streams))))
) )
try:
# FIXME hard coded stream no
rating = knownnodes.knownNodes[1][i.destination]['rating']
except KeyError:
rating = "-"
self.tableWidgetConnectionCount.setItem(0, 1,
QtGui.QTableWidgetItem("%s" % (rating))
)
if i.isOutbound: if i.isOutbound:
brush = QtGui.QBrush(QtGui.QColor("yellow"), QtCore.Qt.SolidPattern) brush = QtGui.QBrush(QtGui.QColor("yellow"), QtCore.Qt.SolidPattern)
else: else:

View File

@ -117,6 +117,11 @@
<string>Peer</string> <string>Peer</string>
</property> </property>
</column> </column>
<column>
<property name="text">
<string>Rating</string>
</property>
</column>
<column> <column>
<property name="text"> <property name="text">
<string>User agent</string> <string>User agent</string>

View File

@ -98,9 +98,12 @@ class singleCleaner(threading.Thread, StoppableThread):
with knownnodes.knownNodesLock: with knownnodes.knownNodesLock:
for stream in knownnodes.knownNodes: for stream in knownnodes.knownNodes:
for node in knownnodes.knownNodes[stream].keys(): for node in knownnodes.knownNodes[stream].keys():
if now - knownnodes.knownNodes[stream][node] > 2419200: # 28 days try:
shared.needToWriteKownNodesToDisk = True if now - knownnodes.knownNodes[stream][node]["lastseen"] > 2419200: # 28 days
del knownnodes.knownNodes[stream][node] shared.needToWriteKownNodesToDisk = True
del knownnodes.knownNodes[stream][node]
except TypeError:
print "Error in %s" % (str(node))
# Let us write out the knowNodes to disk if there is anything new to write out. # Let us write out the knowNodes to disk if there is anything new to write out.
if shared.needToWriteKnownNodesToDisk: if shared.needToWriteKnownNodesToDisk:

View File

@ -12,15 +12,15 @@ def createDefaultKnownNodes(appdata):
stream1 = {} stream1 = {}
#stream1[state.Peer('2604:2000:1380:9f:82e:148b:2746:d0c7', 8080)] = int(time.time()) #stream1[state.Peer('2604:2000:1380:9f:82e:148b:2746:d0c7', 8080)] = int(time.time())
stream1[state.Peer('5.45.99.75', 8444)] = int(time.time()) stream1[state.Peer('5.45.99.75', 8444)] = {"lastseen": int(time.time()), "rating": 0, "self": False}
stream1[state.Peer('75.167.159.54', 8444)] = int(time.time()) stream1[state.Peer('75.167.159.54', 8444)] = {"lastseen": int(time.time()), "rating": 0, "self": False}
stream1[state.Peer('95.165.168.168', 8444)] = int(time.time()) stream1[state.Peer('95.165.168.168', 8444)] = {"lastseen": int(time.time()), "rating": 0, "self": False}
stream1[state.Peer('85.180.139.241', 8444)] = int(time.time()) stream1[state.Peer('85.180.139.241', 8444)] = {"lastseen": int(time.time()), "rating": 0, "self": False}
stream1[state.Peer('158.222.217.190', 8080)] = int(time.time()) stream1[state.Peer('158.222.217.190', 8080)] = {"lastseen": int(time.time()), "rating": 0, "self": False}
stream1[state.Peer('178.62.12.187', 8448)] = int(time.time()) stream1[state.Peer('178.62.12.187', 8448)] = {"lastseen": int(time.time()), "rating": 0, "self": False}
stream1[state.Peer('24.188.198.204', 8111)] = int(time.time()) stream1[state.Peer('24.188.198.204', 8111)] = {"lastseen": int(time.time()), "rating": 0, "self": False}
stream1[state.Peer('109.147.204.113', 1195)] = int(time.time()) stream1[state.Peer('109.147.204.113', 1195)] = {"lastseen": int(time.time()), "rating": 0, "self": False}
stream1[state.Peer('178.11.46.221', 8444)] = int(time.time()) stream1[state.Peer('178.11.46.221', 8444)] = {"lastseen": int(time.time()), "rating": 0, "self": False}
############# Stream 2 ################# ############# Stream 2 #################
stream2 = {} stream2 = {}

View File

@ -9,30 +9,31 @@ import knownnodes
import socks import socks
import state import state
def addKnownNode(stream, peer, lastseen=None, self=False):
if lastseen is None:
lastseen = time.time()
knownnodes.knownNodes[stream][peer] = {
"lastseen": lastseen,
"rating": 0,
"self": self,
}
def knownNodes(): def knownNodes():
try: try:
# We shouldn't have to use the knownnodes.knownNodesLock because this had with open(state.appdata + 'knownnodes.dat', 'rb') as pickleFile:
# better be the only thread accessing knownNodes right now. with knownnodes.knownNodesLock:
pickleFile = open(state.appdata + 'knownnodes.dat', 'rb') knownnodes.knownNodes = pickle.load(pickleFile)
loadedKnownNodes = pickle.load(pickleFile) # the old format was {Peer:lastseen, ...}
pickleFile.close() # the new format is {Peer:{"lastseen":i, "rating":f}}
# The old format of storing knownNodes was as a 'host: (port, time)' for stream in knownnodes.knownNodes.keys():
# mapping. The new format is as 'Peer: time' pairs. If we loaded for node, params in knownnodes.knownNodes[stream].items():
# data in the old format, transform it to the new style. if isinstance(params, (float, int)):
for stream, nodes in loadedKnownNodes.items(): addKnownNode(stream, node, params)
knownnodes.knownNodes[stream] = {}
for node_tuple in nodes.items():
try:
host, (port, lastseen) = node_tuple
peer = state.Peer(host, port)
except:
peer, lastseen = node_tuple
knownnodes.knownNodes[stream][peer] = lastseen
except: except:
knownnodes.knownNodes = defaultKnownNodes.createDefaultKnownNodes(state.appdata) knownnodes.knownNodes = defaultKnownNodes.createDefaultKnownNodes(state.appdata)
# your own onion address, if setup # your own onion address, if setup
if BMConfigParser().has_option('bitmessagesettings', 'onionhostname') and ".onion" in BMConfigParser().get('bitmessagesettings', 'onionhostname'): if BMConfigParser().has_option('bitmessagesettings', 'onionhostname') and ".onion" in BMConfigParser().get('bitmessagesettings', 'onionhostname'):
knownnodes.knownNodes[1][state.Peer(BMConfigParser().get('bitmessagesettings', 'onionhostname'), BMConfigParser().getint('bitmessagesettings', 'onionport'))] = int(time.time()) addKnownNode(1, state.Peer(BMConfigParser().get('bitmessagesettings', 'onionhostname'), BMConfigParser().getint('bitmessagesettings', 'onionport')), self=True)
if BMConfigParser().getint('bitmessagesettings', 'settingsversion') > 10: if BMConfigParser().getint('bitmessagesettings', 'settingsversion') > 10:
logger.error('Bitmessage cannot read future versions of the keys file (keys.dat). Run the newer version of Bitmessage.') logger.error('Bitmessage cannot read future versions of the keys file (keys.dat). Run the newer version of Bitmessage.')
raise SystemExit raise SystemExit
@ -47,17 +48,17 @@ def dns():
try: try:
for item in socket.getaddrinfo('bootstrap8080.bitmessage.org', 80): for item in socket.getaddrinfo('bootstrap8080.bitmessage.org', 80):
logger.info('Adding ' + item[4][0] + ' to knownNodes based on DNS bootstrap method') logger.info('Adding ' + item[4][0] + ' to knownNodes based on DNS bootstrap method')
knownnodes.knownNodes[1][state.Peer(item[4][0], 8080)] = int(time.time()) addKnownNode(1, state.Peer(item[4][0], 8080))
except: except:
logger.error('bootstrap8080.bitmessage.org DNS bootstrapping failed.') logger.error('bootstrap8080.bitmessage.org DNS bootstrapping failed.')
try: try:
for item in socket.getaddrinfo('bootstrap8444.bitmessage.org', 80): for item in socket.getaddrinfo('bootstrap8444.bitmessage.org', 80):
logger.info ('Adding ' + item[4][0] + ' to knownNodes based on DNS bootstrap method') logger.info ('Adding ' + item[4][0] + ' to knownNodes based on DNS bootstrap method')
knownnodes.knownNodes[1][state.Peer(item[4][0], 8444)] = int(time.time()) addKnownNode(1, state.Peer(item[4][0], 8444))
except: except:
logger.error('bootstrap8444.bitmessage.org DNS bootstrapping failed.') logger.error('bootstrap8444.bitmessage.org DNS bootstrapping failed.')
elif BMConfigParser().get('bitmessagesettings', 'socksproxytype') == 'SOCKS5': elif BMConfigParser().get('bitmessagesettings', 'socksproxytype') == 'SOCKS5':
knownnodes.knownNodes[1][state.Peer('quzwelsuziwqgpt2.onion', 8444)] = int(time.time()) addKnownNode(1, state.Peer('quzwelsuziwqgpt2.onion', 8444))
logger.debug("Adding quzwelsuziwqgpt2.onion:8444 to knownNodes.") logger.debug("Adding quzwelsuziwqgpt2.onion:8444 to knownNodes.")
for port in [8080, 8444]: for port in [8080, 8444]:
logger.debug("Resolving %i through SOCKS...", port) logger.debug("Resolving %i through SOCKS...", port)
@ -90,7 +91,6 @@ def dns():
else: else:
if ip is not None: if ip is not None:
logger.info ('Adding ' + ip + ' to knownNodes based on SOCKS DNS bootstrap method') logger.info ('Adding ' + ip + ' to knownNodes based on SOCKS DNS bootstrap method')
knownnodes.knownNodes[1][state.Peer(ip, port)] = time.time() addKnownNode(1, state.Peer(ip, port))
else: else:
logger.info('DNS bootstrap skipped because the proxy type does not support DNS resolution.') logger.info('DNS bootstrap skipped because the proxy type does not support DNS resolution.')

View File

@ -16,10 +16,30 @@ def saveKnownNodes(dirName = None):
with open(dirName + 'knownnodes.dat', 'wb') as output: with open(dirName + 'knownnodes.dat', 'wb') as output:
pickle.dump(knownNodes, output) pickle.dump(knownNodes, output)
def increaseRating(peer):
increaseAmount = 0.1
maxRating = 1
with knownNodesLock:
for stream in knownNodes.keys():
try:
knownNodes[stream][peer]["rating"] = min(knownNodes[stream][peer]["rating"] + increaseAmount, maxRating)
except KeyError:
pass
def decreaseRating(peer):
decreaseAmount = 0.1
minRating = -1
with knownNodesLock:
for stream in knownNodes.keys():
try:
knownNodes[stream][peer]["rating"] = max(knownNodes[stream][peer]["rating"] - decreaseAmount, minRating)
except KeyError:
pass
def trimKnownNodes(recAddrStream = 1): def trimKnownNodes(recAddrStream = 1):
if len(knownNodes[recAddrStream]) < BMConfigParser().get("knownnodes", "maxnodes"): if len(knownNodes[recAddrStream]) < BMConfigParser().get("knownnodes", "maxnodes"):
return return
with knownNodesLock: with knownNodesLock:
oldestList = sorted(knownNodes[recAddrStream], key=knownNodes[recAddrStream].get)[:knownNodesTrimAmount] oldestList = sorted(knownNodes[recAddrStream], key=lambda x: x['lastseen'])[:knownNodesTrimAmount]
for oldest in oldestList: for oldest in oldestList:
del knownNodes[recAddrStream][oldest] del knownNodes[recAddrStream][oldest]

View File

@ -328,16 +328,21 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
decodedIP = protocol.checkIPAddress(ip) decodedIP = protocol.checkIPAddress(ip)
if stream not in state.streamsInWhichIAmParticipating: if stream not in state.streamsInWhichIAmParticipating:
continue continue
#print "maybe adding %s in stream %i to knownnodes (%i)" % (decodedIP, stream, len(knownnodes.knownNodes[stream]))
if decodedIP is not False and seenTime > time.time() - BMProto.addressAlive: if decodedIP is not False and seenTime > time.time() - BMProto.addressAlive:
peer = state.Peer(decodedIP, port) peer = state.Peer(decodedIP, port)
if peer in knownnodes.knownNodes[stream] and knownnodes.knownNodes[stream][peer] > seenTime: if peer in knownnodes.knownNodes[stream] and knownnodes.knownNodes[stream][peer]["lastseen"] > seenTime:
continue continue
if len(knownnodes.knownNodes[stream]) < BMConfigParser().get("knownnodes", "maxnodes"): if len(knownnodes.knownNodes[stream]) < BMConfigParser().get("knownnodes", "maxnodes"):
with knownnodes.knownNodesLock: with knownnodes.knownNodesLock:
knownnodes.knownNodes[stream][peer] = seenTime try:
#knownnodes.knownNodes[stream][peer] = seenTime knownnodes.knownNodes[stream][peer]["lastseen"] = seenTime
#AddrUploadQueue().put((stream, peer)) except (TypeError, KeyError):
knownnodes.knownNodes[stream][peer] = {
"lastseen": seenTime,
"rating": 0,
"self": False,
}
addrQueue.put((stream, peer, self))
return True return True
def bm_command_portcheck(self): def bm_command_portcheck(self):
@ -449,18 +454,22 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
def assembleAddr(peerList): def assembleAddr(peerList):
if isinstance(peerList, state.Peer): if isinstance(peerList, state.Peer):
peerList = (peerList) peerList = (peerList)
# TODO handle max length, now it's done by upper layers if not peerList:
payload = addresses.encodeVarint(len(peerList)) return b''
for address in peerList: retval = b''
stream, peer, timestamp = address for i in range(0, len(peerList), BMProto.maxAddrCount):
payload += struct.pack( payload = addresses.encodeVarint(len(peerList[i:i + BMProto.maxAddrCount]))
'>Q', timestamp) # 64-bit time for address in peerList[i:i + BMProto.maxAddrCount]:
payload += struct.pack('>I', stream) stream, peer, timestamp = address
payload += struct.pack( payload += struct.pack(
'>q', 1) # service bit flags offered by this node '>Q', timestamp) # 64-bit time
payload += protocol.encodeHost(peer.host) payload += struct.pack('>I', stream)
payload += struct.pack('>H', peer.port) # remote port payload += struct.pack(
return protocol.CreatePacket('addr', payload) '>q', 1) # service bit flags offered by this node
payload += protocol.encodeHost(peer.host)
payload += struct.pack('>H', peer.port) # remote port
retval += protocol.CreatePacket('addr', payload)
return retval
def handle_close(self, reason=None): def handle_close(self, reason=None):
self.set_state("close") self.set_state("close")

View File

@ -7,6 +7,7 @@ from queues import portCheckerQueue, peerDiscoveryQueue
import state import state
def chooseConnection(stream): def chooseConnection(stream):
haveOnion = BMConfigParser().safeGet("bitmessagesettings", "socksproxytype")[0:5] == 'SOCKS'
if state.trustedPeer: if state.trustedPeer:
return state.trustedPeer return state.trustedPeer
try: try:
@ -17,5 +18,21 @@ def chooseConnection(stream):
retval = peerDiscoveryQueue.get(False) retval = peerDiscoveryQueue.get(False)
peerDiscoveryQueue.task_done() peerDiscoveryQueue.task_done()
except Queue.Empty: except Queue.Empty:
return random.choice(knownnodes.knownNodes[stream].keys()) for i in range(50):
peer = random.choice(knownnodes.knownNodes[stream].keys())
try:
rating = knownnodes.knownNodes[stream][peer]["rating"]
except TypeError:
print "Error in %s" % (peer)
rating = 0
if haveOnion and peer.host.endswith('.onion') and rating > 0:
rating *= 10
if rating > 1:
rating = 1
try:
if 0.05/(1.0-rating) > random.random():
return peer
except ZeroDivisionError:
return peer
raise ValueError
return retval return retval

View File

@ -135,7 +135,10 @@ class BMConnectionPool(object):
pending = len(self.outboundConnections) - established pending = len(self.outboundConnections) - established
if established < BMConfigParser().safeGetInt("bitmessagesettings", "maxoutboundconnections"): if established < BMConfigParser().safeGetInt("bitmessagesettings", "maxoutboundconnections"):
for i in range(state.maximumNumberOfHalfOpenConnections - pending): for i in range(state.maximumNumberOfHalfOpenConnections - pending):
chosen = chooseConnection(random.choice(self.streams)) try:
chosen = chooseConnection(random.choice(self.streams))
except ValueError:
continue
if chosen in self.outboundConnections: if chosen in self.outboundConnections:
continue continue
if chosen.host in self.inboundConnections: if chosen.host in self.inboundConnections:

View File

@ -98,6 +98,8 @@ class TCPConnection(BMProto, TLSDispatcher):
UISignalQueue.put(('updateNetworkStatusTab', 'no data')) UISignalQueue.put(('updateNetworkStatusTab', 'no data'))
self.antiIntersectionDelay(True) self.antiIntersectionDelay(True)
self.fullyEstablished = True self.fullyEstablished = True
if self.isOutbound:
knownnodes.increaseRating(self.destination)
self.sendAddr() self.sendAddr()
self.sendBigInv() self.sendBigInv()
@ -117,7 +119,7 @@ class TCPConnection(BMProto, TLSDispatcher):
with knownnodes.knownNodesLock: with knownnodes.knownNodesLock:
if len(knownnodes.knownNodes[stream]) > 0: if len(knownnodes.knownNodes[stream]) > 0:
filtered = {k: v for k, v in knownnodes.knownNodes[stream].items() filtered = {k: v for k, v in knownnodes.knownNodes[stream].items()
if v > (int(time.time()) - shared.maximumAgeOfNodesThatIAdvertiseToOthers)} if v["lastseen"] > (int(time.time()) - shared.maximumAgeOfNodesThatIAdvertiseToOthers)}
elemCount = len(filtered) elemCount = len(filtered)
if elemCount > maxAddrCount: if elemCount > maxAddrCount:
elemCount = maxAddrCount elemCount = maxAddrCount
@ -126,25 +128,21 @@ class TCPConnection(BMProto, TLSDispatcher):
# sent 250 only if the remote isn't interested in it # sent 250 only if the remote isn't interested in it
if len(knownnodes.knownNodes[stream * 2]) > 0 and stream not in self.streams: if len(knownnodes.knownNodes[stream * 2]) > 0 and stream not in self.streams:
filtered = {k: v for k, v in knownnodes.knownNodes[stream*2].items() filtered = {k: v for k, v in knownnodes.knownNodes[stream*2].items()
if v > (int(time.time()) - shared.maximumAgeOfNodesThatIAdvertiseToOthers)} if v["lastseen"] > (int(time.time()) - shared.maximumAgeOfNodesThatIAdvertiseToOthers)}
elemCount = len(filtered) elemCount = len(filtered)
if elemCount > maxAddrCount / 2: if elemCount > maxAddrCount / 2:
elemCount = int(maxAddrCount / 2) elemCount = int(maxAddrCount / 2)
addrs[stream * 2] = random.sample(filtered.items(), elemCount) addrs[stream * 2] = random.sample(filtered.items(), elemCount)
if len(knownnodes.knownNodes[(stream * 2) + 1]) > 0 and stream not in self.streams: if len(knownnodes.knownNodes[(stream * 2) + 1]) > 0 and stream not in self.streams:
filtered = {k: v for k, v in knownnodes.knownNodes[stream*2+1].items() filtered = {k: v for k, v in knownnodes.knownNodes[stream*2+1].items()
if v > (int(time.time()) - shared.maximumAgeOfNodesThatIAdvertiseToOthers)} if v["lastseen"] > (int(time.time()) - shared.maximumAgeOfNodesThatIAdvertiseToOthers)}
elemCount = len(filtered) elemCount = len(filtered)
if elemCount > maxAddrCount / 2: if elemCount > maxAddrCount / 2:
elemCount = int(maxAddrCount / 2) elemCount = int(maxAddrCount / 2)
addrs[stream * 2 + 1] = random.sample(filtered.items(), elemCount) addrs[stream * 2 + 1] = random.sample(filtered.items(), elemCount)
for substream in addrs.keys(): for substream in addrs.keys():
for peer, timestamp in addrs[substream]: for peer, params in addrs[substream]:
templist.append((substream, peer, timestamp)) templist.append((substream, peer, params["lastseen"]))
if len(templist) >= BMProto.maxAddrCount:
self.writeQueue.put(BMProto.assembleAddr(templist))
templist = []
# flush
if len(templist) > 0: if len(templist) > 0:
self.writeQueue.put(BMProto.assembleAddr(templist)) self.writeQueue.put(BMProto.assembleAddr(templist))
@ -163,16 +161,22 @@ class TCPConnection(BMProto, TLSDispatcher):
self.connectedAt = time.time() self.connectedAt = time.time()
def handle_read(self): def handle_read(self):
# try:
TLSDispatcher.handle_read(self) TLSDispatcher.handle_read(self)
# except socket.error as e: if self.isOutbound and self.fullyEstablished:
# logger.debug("%s:%i: Handle read fail: %s" % (self.destination.host, self.destination.port, str(e))) for s in self.streams:
try:
with knownnodes.knownNodesLock:
knownnodes.knownNodes[s][self.destination]["lastseen"] = time.time()
except KeyError:
pass
def handle_write(self): def handle_write(self):
# try:
TLSDispatcher.handle_write(self) TLSDispatcher.handle_write(self)
# except socket.error as e:
# logger.debug("%s:%i: Handle write fail: %s" % (self.destination.host, self.destination.port, str(e))) def handle_close(self, reason=None):
if self.isOutbound and not self.fullyEstablished:
knownnodes.decreaseRating(self.destination)
BMProto.handle_close(self, reason)
class Socks5BMConnection(Socks5Connection, TCPConnection): class Socks5BMConnection(Socks5Connection, TCPConnection):