From a0bbd21efcd95a7bff9984f52a7cf0e98d8ecea8 Mon Sep 17 00:00:00 2001 From: Peter Surda Date: Wed, 5 Jul 2017 09:17:01 +0200 Subject: [PATCH] Add ratings to peers - outbound peers now have a rating - it's also shown in the network status tab - currently it's between -1 to +1, changes by 0.1 steps and uses a hyperbolic function 0.05/(1.0 - rating) to convert rating to probability with which we should connect to that node when randomly chosen - it increases when we successfully establish a full outbound connection to a node, and decreases when we fail to do that - onion nodes have priority when using SOCKS --- src/bitmessageqt/networkstatus.py | 15 ++++++++-- src/bitmessageqt/networkstatus.ui | 5 ++++ src/class_singleCleaner.py | 9 ++++-- src/defaultKnownNodes.py | 18 ++++++------ src/helper_bootstrap.py | 46 +++++++++++++++---------------- src/knownnodes.py | 22 ++++++++++++++- src/network/bmproto.py | 43 +++++++++++++++++------------ src/network/connectionchooser.py | 19 ++++++++++++- src/network/connectionpool.py | 5 +++- src/network/tcp.py | 34 +++++++++++++---------- 10 files changed, 143 insertions(+), 73 deletions(-) diff --git a/src/bitmessageqt/networkstatus.py b/src/bitmessageqt/networkstatus.py index 4ede68f7..2609a3c4 100644 --- a/src/bitmessageqt/networkstatus.py +++ b/src/bitmessageqt/networkstatus.py @@ -4,6 +4,7 @@ import shared from tr import _translate from inventory import Inventory, PendingDownloadQueue, PendingUpload +import knownnodes import l10n import network.stats from retranslateui import RetranslateMixin @@ -87,15 +88,23 @@ class NetworkStatus(QtGui.QWidget, RetranslateMixin): self.tableWidgetConnectionCount.setItem(0, 0, QtGui.QTableWidgetItem("%s:%i" % (i.destination.host, i.destination.port)) ) - self.tableWidgetConnectionCount.setItem(0, 1, + self.tableWidgetConnectionCount.setItem(0, 2, QtGui.QTableWidgetItem("%s" % (i.userAgent)) ) - self.tableWidgetConnectionCount.setItem(0, 2, + self.tableWidgetConnectionCount.setItem(0, 3, QtGui.QTableWidgetItem("%s" % (i.tlsVersion)) ) - self.tableWidgetConnectionCount.setItem(0, 3, + self.tableWidgetConnectionCount.setItem(0, 4, QtGui.QTableWidgetItem("%s" % (",".join(map(str,i.streams)))) ) + try: + # FIXME hard coded stream no + rating = knownnodes.knownNodes[1][i.destination]['rating'] + except KeyError: + rating = "-" + self.tableWidgetConnectionCount.setItem(0, 1, + QtGui.QTableWidgetItem("%s" % (rating)) + ) if i.isOutbound: brush = QtGui.QBrush(QtGui.QColor("yellow"), QtCore.Qt.SolidPattern) else: diff --git a/src/bitmessageqt/networkstatus.ui b/src/bitmessageqt/networkstatus.ui index 993cf1c6..f9cc57c4 100644 --- a/src/bitmessageqt/networkstatus.ui +++ b/src/bitmessageqt/networkstatus.ui @@ -117,6 +117,11 @@ Peer + + + Rating + + User agent diff --git a/src/class_singleCleaner.py b/src/class_singleCleaner.py index e6d98989..c21d8cb1 100644 --- a/src/class_singleCleaner.py +++ b/src/class_singleCleaner.py @@ -98,9 +98,12 @@ class singleCleaner(threading.Thread, StoppableThread): with knownnodes.knownNodesLock: for stream in knownnodes.knownNodes: for node in knownnodes.knownNodes[stream].keys(): - if now - knownnodes.knownNodes[stream][node] > 2419200: # 28 days - shared.needToWriteKownNodesToDisk = True - del knownnodes.knownNodes[stream][node] + try: + if now - knownnodes.knownNodes[stream][node]["lastseen"] > 2419200: # 28 days + shared.needToWriteKownNodesToDisk = True + del knownnodes.knownNodes[stream][node] + except TypeError: + print "Error in %s" % (str(node)) # Let us write out the knowNodes to disk if there is anything new to write out. if shared.needToWriteKnownNodesToDisk: diff --git a/src/defaultKnownNodes.py b/src/defaultKnownNodes.py index 5559f1a5..05b65014 100644 --- a/src/defaultKnownNodes.py +++ b/src/defaultKnownNodes.py @@ -12,15 +12,15 @@ def createDefaultKnownNodes(appdata): stream1 = {} #stream1[state.Peer('2604:2000:1380:9f:82e:148b:2746:d0c7', 8080)] = int(time.time()) - stream1[state.Peer('5.45.99.75', 8444)] = int(time.time()) - stream1[state.Peer('75.167.159.54', 8444)] = int(time.time()) - stream1[state.Peer('95.165.168.168', 8444)] = int(time.time()) - stream1[state.Peer('85.180.139.241', 8444)] = int(time.time()) - stream1[state.Peer('158.222.217.190', 8080)] = int(time.time()) - stream1[state.Peer('178.62.12.187', 8448)] = int(time.time()) - stream1[state.Peer('24.188.198.204', 8111)] = int(time.time()) - stream1[state.Peer('109.147.204.113', 1195)] = int(time.time()) - stream1[state.Peer('178.11.46.221', 8444)] = int(time.time()) + stream1[state.Peer('5.45.99.75', 8444)] = {"lastseen": int(time.time()), "rating": 0, "self": False} + stream1[state.Peer('75.167.159.54', 8444)] = {"lastseen": int(time.time()), "rating": 0, "self": False} + stream1[state.Peer('95.165.168.168', 8444)] = {"lastseen": int(time.time()), "rating": 0, "self": False} + stream1[state.Peer('85.180.139.241', 8444)] = {"lastseen": int(time.time()), "rating": 0, "self": False} + stream1[state.Peer('158.222.217.190', 8080)] = {"lastseen": int(time.time()), "rating": 0, "self": False} + stream1[state.Peer('178.62.12.187', 8448)] = {"lastseen": int(time.time()), "rating": 0, "self": False} + stream1[state.Peer('24.188.198.204', 8111)] = {"lastseen": int(time.time()), "rating": 0, "self": False} + stream1[state.Peer('109.147.204.113', 1195)] = {"lastseen": int(time.time()), "rating": 0, "self": False} + stream1[state.Peer('178.11.46.221', 8444)] = {"lastseen": int(time.time()), "rating": 0, "self": False} ############# Stream 2 ################# stream2 = {} diff --git a/src/helper_bootstrap.py b/src/helper_bootstrap.py index 33a533ca..7c7456e1 100644 --- a/src/helper_bootstrap.py +++ b/src/helper_bootstrap.py @@ -9,30 +9,31 @@ import knownnodes import socks import state +def addKnownNode(stream, peer, lastseen=None, self=False): + if lastseen is None: + lastseen = time.time() + knownnodes.knownNodes[stream][peer] = { + "lastseen": lastseen, + "rating": 0, + "self": self, + } + def knownNodes(): try: - # We shouldn't have to use the knownnodes.knownNodesLock because this had - # better be the only thread accessing knownNodes right now. - pickleFile = open(state.appdata + 'knownnodes.dat', 'rb') - loadedKnownNodes = pickle.load(pickleFile) - pickleFile.close() - # The old format of storing knownNodes was as a 'host: (port, time)' - # mapping. The new format is as 'Peer: time' pairs. If we loaded - # data in the old format, transform it to the new style. - for stream, nodes in loadedKnownNodes.items(): - knownnodes.knownNodes[stream] = {} - for node_tuple in nodes.items(): - try: - host, (port, lastseen) = node_tuple - peer = state.Peer(host, port) - except: - peer, lastseen = node_tuple - knownnodes.knownNodes[stream][peer] = lastseen + with open(state.appdata + 'knownnodes.dat', 'rb') as pickleFile: + with knownnodes.knownNodesLock: + knownnodes.knownNodes = pickle.load(pickleFile) + # the old format was {Peer:lastseen, ...} + # the new format is {Peer:{"lastseen":i, "rating":f}} + for stream in knownnodes.knownNodes.keys(): + for node, params in knownnodes.knownNodes[stream].items(): + if isinstance(params, (float, int)): + addKnownNode(stream, node, params) except: knownnodes.knownNodes = defaultKnownNodes.createDefaultKnownNodes(state.appdata) # your own onion address, if setup if BMConfigParser().has_option('bitmessagesettings', 'onionhostname') and ".onion" in BMConfigParser().get('bitmessagesettings', 'onionhostname'): - knownnodes.knownNodes[1][state.Peer(BMConfigParser().get('bitmessagesettings', 'onionhostname'), BMConfigParser().getint('bitmessagesettings', 'onionport'))] = int(time.time()) + addKnownNode(1, state.Peer(BMConfigParser().get('bitmessagesettings', 'onionhostname'), BMConfigParser().getint('bitmessagesettings', 'onionport')), self=True) if BMConfigParser().getint('bitmessagesettings', 'settingsversion') > 10: logger.error('Bitmessage cannot read future versions of the keys file (keys.dat). Run the newer version of Bitmessage.') raise SystemExit @@ -47,17 +48,17 @@ def dns(): try: for item in socket.getaddrinfo('bootstrap8080.bitmessage.org', 80): logger.info('Adding ' + item[4][0] + ' to knownNodes based on DNS bootstrap method') - knownnodes.knownNodes[1][state.Peer(item[4][0], 8080)] = int(time.time()) + addKnownNode(1, state.Peer(item[4][0], 8080)) except: logger.error('bootstrap8080.bitmessage.org DNS bootstrapping failed.') try: for item in socket.getaddrinfo('bootstrap8444.bitmessage.org', 80): logger.info ('Adding ' + item[4][0] + ' to knownNodes based on DNS bootstrap method') - knownnodes.knownNodes[1][state.Peer(item[4][0], 8444)] = int(time.time()) + addKnownNode(1, state.Peer(item[4][0], 8444)) except: logger.error('bootstrap8444.bitmessage.org DNS bootstrapping failed.') elif BMConfigParser().get('bitmessagesettings', 'socksproxytype') == 'SOCKS5': - knownnodes.knownNodes[1][state.Peer('quzwelsuziwqgpt2.onion', 8444)] = int(time.time()) + addKnownNode(1, state.Peer('quzwelsuziwqgpt2.onion', 8444)) logger.debug("Adding quzwelsuziwqgpt2.onion:8444 to knownNodes.") for port in [8080, 8444]: logger.debug("Resolving %i through SOCKS...", port) @@ -90,7 +91,6 @@ def dns(): else: if ip is not None: logger.info ('Adding ' + ip + ' to knownNodes based on SOCKS DNS bootstrap method') - knownnodes.knownNodes[1][state.Peer(ip, port)] = time.time() + addKnownNode(1, state.Peer(ip, port)) else: logger.info('DNS bootstrap skipped because the proxy type does not support DNS resolution.') - diff --git a/src/knownnodes.py b/src/knownnodes.py index 8f566d43..86d39cbe 100644 --- a/src/knownnodes.py +++ b/src/knownnodes.py @@ -16,10 +16,30 @@ def saveKnownNodes(dirName = None): with open(dirName + 'knownnodes.dat', 'wb') as output: pickle.dump(knownNodes, output) +def increaseRating(peer): + increaseAmount = 0.1 + maxRating = 1 + with knownNodesLock: + for stream in knownNodes.keys(): + try: + knownNodes[stream][peer]["rating"] = min(knownNodes[stream][peer]["rating"] + increaseAmount, maxRating) + except KeyError: + pass + +def decreaseRating(peer): + decreaseAmount = 0.1 + minRating = -1 + with knownNodesLock: + for stream in knownNodes.keys(): + try: + knownNodes[stream][peer]["rating"] = max(knownNodes[stream][peer]["rating"] - decreaseAmount, minRating) + except KeyError: + pass + def trimKnownNodes(recAddrStream = 1): if len(knownNodes[recAddrStream]) < BMConfigParser().get("knownnodes", "maxnodes"): return with knownNodesLock: - oldestList = sorted(knownNodes[recAddrStream], key=knownNodes[recAddrStream].get)[:knownNodesTrimAmount] + oldestList = sorted(knownNodes[recAddrStream], key=lambda x: x['lastseen'])[:knownNodesTrimAmount] for oldest in oldestList: del knownNodes[recAddrStream][oldest] diff --git a/src/network/bmproto.py b/src/network/bmproto.py index 0c70f12e..c45f9ca3 100644 --- a/src/network/bmproto.py +++ b/src/network/bmproto.py @@ -328,16 +328,21 @@ class BMProto(AdvancedDispatcher, ObjectTracker): decodedIP = protocol.checkIPAddress(ip) if stream not in state.streamsInWhichIAmParticipating: continue - #print "maybe adding %s in stream %i to knownnodes (%i)" % (decodedIP, stream, len(knownnodes.knownNodes[stream])) if decodedIP is not False and seenTime > time.time() - BMProto.addressAlive: peer = state.Peer(decodedIP, port) - if peer in knownnodes.knownNodes[stream] and knownnodes.knownNodes[stream][peer] > seenTime: + if peer in knownnodes.knownNodes[stream] and knownnodes.knownNodes[stream][peer]["lastseen"] > seenTime: continue if len(knownnodes.knownNodes[stream]) < BMConfigParser().get("knownnodes", "maxnodes"): with knownnodes.knownNodesLock: - knownnodes.knownNodes[stream][peer] = seenTime - #knownnodes.knownNodes[stream][peer] = seenTime - #AddrUploadQueue().put((stream, peer)) + try: + knownnodes.knownNodes[stream][peer]["lastseen"] = seenTime + except (TypeError, KeyError): + knownnodes.knownNodes[stream][peer] = { + "lastseen": seenTime, + "rating": 0, + "self": False, + } + addrQueue.put((stream, peer, self)) return True def bm_command_portcheck(self): @@ -449,18 +454,22 @@ class BMProto(AdvancedDispatcher, ObjectTracker): def assembleAddr(peerList): if isinstance(peerList, state.Peer): peerList = (peerList) - # TODO handle max length, now it's done by upper layers - payload = addresses.encodeVarint(len(peerList)) - for address in peerList: - stream, peer, timestamp = address - payload += struct.pack( - '>Q', timestamp) # 64-bit time - payload += struct.pack('>I', stream) - payload += struct.pack( - '>q', 1) # service bit flags offered by this node - payload += protocol.encodeHost(peer.host) - payload += struct.pack('>H', peer.port) # remote port - return protocol.CreatePacket('addr', payload) + if not peerList: + return b'' + retval = b'' + for i in range(0, len(peerList), BMProto.maxAddrCount): + payload = addresses.encodeVarint(len(peerList[i:i + BMProto.maxAddrCount])) + for address in peerList[i:i + BMProto.maxAddrCount]: + stream, peer, timestamp = address + payload += struct.pack( + '>Q', timestamp) # 64-bit time + payload += struct.pack('>I', stream) + payload += struct.pack( + '>q', 1) # service bit flags offered by this node + payload += protocol.encodeHost(peer.host) + payload += struct.pack('>H', peer.port) # remote port + retval += protocol.CreatePacket('addr', payload) + return retval def handle_close(self, reason=None): self.set_state("close") diff --git a/src/network/connectionchooser.py b/src/network/connectionchooser.py index 0770bfa6..2cce5036 100644 --- a/src/network/connectionchooser.py +++ b/src/network/connectionchooser.py @@ -7,6 +7,7 @@ from queues import portCheckerQueue, peerDiscoveryQueue import state def chooseConnection(stream): + haveOnion = BMConfigParser().safeGet("bitmessagesettings", "socksproxytype")[0:5] == 'SOCKS' if state.trustedPeer: return state.trustedPeer try: @@ -17,5 +18,21 @@ def chooseConnection(stream): retval = peerDiscoveryQueue.get(False) peerDiscoveryQueue.task_done() except Queue.Empty: - return random.choice(knownnodes.knownNodes[stream].keys()) + for i in range(50): + peer = random.choice(knownnodes.knownNodes[stream].keys()) + try: + rating = knownnodes.knownNodes[stream][peer]["rating"] + except TypeError: + print "Error in %s" % (peer) + rating = 0 + if haveOnion and peer.host.endswith('.onion') and rating > 0: + rating *= 10 + if rating > 1: + rating = 1 + try: + if 0.05/(1.0-rating) > random.random(): + return peer + except ZeroDivisionError: + return peer + raise ValueError return retval diff --git a/src/network/connectionpool.py b/src/network/connectionpool.py index ae76c318..9328720f 100644 --- a/src/network/connectionpool.py +++ b/src/network/connectionpool.py @@ -135,7 +135,10 @@ class BMConnectionPool(object): pending = len(self.outboundConnections) - established if established < BMConfigParser().safeGetInt("bitmessagesettings", "maxoutboundconnections"): for i in range(state.maximumNumberOfHalfOpenConnections - pending): - chosen = chooseConnection(random.choice(self.streams)) + try: + chosen = chooseConnection(random.choice(self.streams)) + except ValueError: + continue if chosen in self.outboundConnections: continue if chosen.host in self.inboundConnections: diff --git a/src/network/tcp.py b/src/network/tcp.py index bd925063..120cfdce 100644 --- a/src/network/tcp.py +++ b/src/network/tcp.py @@ -98,6 +98,8 @@ class TCPConnection(BMProto, TLSDispatcher): UISignalQueue.put(('updateNetworkStatusTab', 'no data')) self.antiIntersectionDelay(True) self.fullyEstablished = True + if self.isOutbound: + knownnodes.increaseRating(self.destination) self.sendAddr() self.sendBigInv() @@ -117,7 +119,7 @@ class TCPConnection(BMProto, TLSDispatcher): with knownnodes.knownNodesLock: if len(knownnodes.knownNodes[stream]) > 0: filtered = {k: v for k, v in knownnodes.knownNodes[stream].items() - if v > (int(time.time()) - shared.maximumAgeOfNodesThatIAdvertiseToOthers)} + if v["lastseen"] > (int(time.time()) - shared.maximumAgeOfNodesThatIAdvertiseToOthers)} elemCount = len(filtered) if elemCount > maxAddrCount: elemCount = maxAddrCount @@ -126,25 +128,21 @@ class TCPConnection(BMProto, TLSDispatcher): # sent 250 only if the remote isn't interested in it if len(knownnodes.knownNodes[stream * 2]) > 0 and stream not in self.streams: filtered = {k: v for k, v in knownnodes.knownNodes[stream*2].items() - if v > (int(time.time()) - shared.maximumAgeOfNodesThatIAdvertiseToOthers)} + if v["lastseen"] > (int(time.time()) - shared.maximumAgeOfNodesThatIAdvertiseToOthers)} elemCount = len(filtered) if elemCount > maxAddrCount / 2: elemCount = int(maxAddrCount / 2) addrs[stream * 2] = random.sample(filtered.items(), elemCount) if len(knownnodes.knownNodes[(stream * 2) + 1]) > 0 and stream not in self.streams: filtered = {k: v for k, v in knownnodes.knownNodes[stream*2+1].items() - if v > (int(time.time()) - shared.maximumAgeOfNodesThatIAdvertiseToOthers)} + if v["lastseen"] > (int(time.time()) - shared.maximumAgeOfNodesThatIAdvertiseToOthers)} elemCount = len(filtered) if elemCount > maxAddrCount / 2: elemCount = int(maxAddrCount / 2) addrs[stream * 2 + 1] = random.sample(filtered.items(), elemCount) for substream in addrs.keys(): - for peer, timestamp in addrs[substream]: - templist.append((substream, peer, timestamp)) - if len(templist) >= BMProto.maxAddrCount: - self.writeQueue.put(BMProto.assembleAddr(templist)) - templist = [] - # flush + for peer, params in addrs[substream]: + templist.append((substream, peer, params["lastseen"])) if len(templist) > 0: self.writeQueue.put(BMProto.assembleAddr(templist)) @@ -163,16 +161,22 @@ class TCPConnection(BMProto, TLSDispatcher): self.connectedAt = time.time() def handle_read(self): -# try: TLSDispatcher.handle_read(self) -# except socket.error as e: -# logger.debug("%s:%i: Handle read fail: %s" % (self.destination.host, self.destination.port, str(e))) + if self.isOutbound and self.fullyEstablished: + for s in self.streams: + try: + with knownnodes.knownNodesLock: + knownnodes.knownNodes[s][self.destination]["lastseen"] = time.time() + except KeyError: + pass def handle_write(self): -# try: TLSDispatcher.handle_write(self) -# except socket.error as e: -# logger.debug("%s:%i: Handle write fail: %s" % (self.destination.host, self.destination.port, str(e))) + + def handle_close(self, reason=None): + if self.isOutbound and not self.fullyEstablished: + knownnodes.decreaseRating(self.destination) + BMProto.handle_close(self, reason) class Socks5BMConnection(Socks5Connection, TCPConnection):