From 07722fb606192b88fd76f6a0422e60cb224f100a Mon Sep 17 00:00:00 2001 From: Peter Surda Date: Tue, 7 Feb 2017 19:38:52 +0100 Subject: [PATCH] Node negotiation error handling - complete the version and SSL handshake first, and only then feed errors into the stream and close connection - this allows more accurate error handling on both sides - also the timeOffset error trigger is now more accurate, but requires more nodes to upgrade --- src/class_receiveDataThread.py | 93 ++++++++++++++++++---------------- 1 file changed, 49 insertions(+), 44 deletions(-) diff --git a/src/class_receiveDataThread.py b/src/class_receiveDataThread.py index 6e592304..6129aa7f 100644 --- a/src/class_receiveDataThread.py +++ b/src/class_receiveDataThread.py @@ -123,14 +123,10 @@ class receiveDataThread(threading.Thread): logger.debug('sock.recv retriable error') continue logger.error('sock.recv error. Closing receiveData thread (' + str(self.peer) + ', Thread ID: ' + str(id(self)) + ').' + str(err.errno) + "/" + str(err)) - if self.initiatedConnection and not self.connectionIsOrWasFullyEstablished: - shared.timeOffsetWrongCount += 1 break # print 'Received', repr(self.data) if len(self.data) == dataLen: # If self.sock.recv returned no data: logger.debug('Connection to ' + str(self.peer) + ' closed. Closing receiveData thread. (ID: ' + str(id(self)) + ')') - if self.initiatedConnection and not self.connectionIsOrWasFullyEstablished: - shared.timeOffsetWrongCount += 1 break else: self.processData() @@ -264,13 +260,7 @@ class receiveDataThread(threading.Thread): # We have thus both sent and received a verack. self.connectionFullyEstablished() - def connectionFullyEstablished(self): - if self.connectionIsOrWasFullyEstablished: - # there is no reason to run this function a second time - return - self.connectionIsOrWasFullyEstablished = True - shared.timeOffsetWrongCount = 0 - + def sslHandshake(self): self.sslSock = self.sock if ((self.services & protocol.NODE_SSL == protocol.NODE_SSL) and protocol.haveSSL(not self.initiatedConnection)): @@ -320,6 +310,46 @@ class receiveDataThread(threading.Thread): return # SSL in the background should be blocking, otherwise the error handling is difficult self.sslSock.settimeout(None) + + def peerValidityChecks(self): + if self.remoteProtocolVersion < 3: + self.sendDataThreadQueue.put((0, 'sendRawData', protocol.assembleErrorMessage(fatal=2, errorText="Your is using an old protocol. Closing connection."))) + logger.debug ('Closing connection to old protocol version ' + str(self.remoteProtocolVersion) + ' node: ' + str(self.peer)) + return False + if self.timeOffset > 3600: + self.sendDataThreadQueue.put((0, 'sendRawData', protocol.assembleErrorMessage(fatal=2, errorText="Your time is too far in the future compared to mine. Closing connection."))) + logger.info("%s's time is too far in the future (%s seconds). Closing connection to it." % (self.peer, self.timeOffset)) + shared.timeOffsetWrongCount += 1 + time.sleep(2) + return False + elif self.timeOffset < -3600: + self.sendDataThreadQueue.put((0, 'sendRawData', protocol.assembleErrorMessage(fatal=2, errorText="Your time is too far in the past compared to mine. Closing connection."))) + logger.info("%s's time is too far in the past (timeOffset %s seconds). Closing connection to it." % (self.peer, self.timeOffset)) + shared.timeOffsetWrongCount += 1 + return False + else: + shared.timeOffsetWrongCount = 0 + if len(self.streamNumber) == 0: + self.sendDataThreadQueue.put((0, 'sendRawData', protocol.assembleErrorMessage(fatal=2, errorText="We don't have shared stream interests. Closing connection."))) + logger.debug ('Closed connection to ' + str(self.peer) + ' because there is no overlapping interest in streams.') + return False + return True + + def connectionFullyEstablished(self): + if self.connectionIsOrWasFullyEstablished: + # there is no reason to run this function a second time + return + + self.sslHandshake() + if self.peerValidityChecks() == False: + time.sleep(2) + self.sendDataThreadQueue.put((0, 'shutdown','no data')) + self.checkTimeOffsetNotification() + return + + self.connectionIsOrWasFullyEstablished = True + shared.timeOffsetWrongCount = 0 + # Command the corresponding sendDataThread to set its own connectionIsOrWasFullyEstablished variable to True also self.sendDataThreadQueue.put((0, 'connectionIsOrWasFullyEstablished', (self.services, self.sslSock))) @@ -349,7 +379,7 @@ class receiveDataThread(threading.Thread): self.sendaddr() # This is one large addr message to this one peer. if not self.initiatedConnection and len(shared.connectedHostsList) > 200: logger.info ('We are connected to too many people. Closing connection.') - + self.sendDataThreadQueue.put((0, 'sendRawData', protocol.assembleErrorMessage(fatal=2, errorText="Server full, please try again later."))) self.sendDataThreadQueue.put((0, 'shutdown','no data')) return self.sendBigInv() @@ -706,31 +736,12 @@ class receiveDataThread(threading.Thread): ignore this version message """ return + self.remoteProtocolVersion, = unpack('>L', data[:4]) self.services, = unpack('>q', data[4:12]) - if self.remoteProtocolVersion < 3: - self.sendDataThreadQueue.put((0, 'shutdown','no data')) - logger.debug ('Closing connection to old protocol version ' + str(self.remoteProtocolVersion) + ' node: ' + str(self.peer)) - return + timestamp, = unpack('>Q', data[12:20]) - timeOffset = timestamp - int(time.time()) - if timeOffset > 3600: - self.sendDataThreadQueue.put((0, 'sendRawData', protocol.assembleErrorMessage(fatal=2, errorText="Your time is too far in the future compared to mine. Closing connection."))) - logger.info("%s's time is too far in the future (%s seconds). Closing connection to it." % (self.peer, timeOffset)) - shared.timeOffsetWrongCount += 1 - time.sleep(2) - self.sendDataThreadQueue.put((0, 'shutdown','no data')) - return - elif timeOffset < -3600: - self.sendDataThreadQueue.put((0, 'sendRawData', protocol.assembleErrorMessage(fatal=2, errorText="Your time is too far in the past compared to mine. Closing connection."))) - logger.info("%s's time is too far in the past (timeOffset %s seconds). Closing connection to it." % (self.peer, timeOffset)) - shared.timeOffsetWrongCount += 1 - time.sleep(2) - self.sendDataThreadQueue.put((0, 'shutdown','no data')) - return - else: - shared.timeOffsetWrongCount = 0 - self.checkTimeOffsetNotification() + self.timeOffset = timestamp - int(time.time()) self.myExternalIP = socket.inet_ntoa(data[40:44]) # print 'myExternalIP', self.myExternalIP @@ -739,13 +750,13 @@ class receiveDataThread(threading.Thread): useragentLength, lengthOfUseragentVarint = decodeVarint( data[80:84]) readPosition = 80 + lengthOfUseragentVarint - useragent = data[readPosition:readPosition + useragentLength] + self.userAgent = data[readPosition:readPosition + useragentLength] # version check try: - userAgentName, userAgentVersion = useragent[1:-1].split(":", 2) + userAgentName, userAgentVersion = userAgent[1:-1].split(":", 2) except: - userAgentName = useragent + userAgentName = self.userAgent userAgentVersion = "0.0.0" if userAgentName == "PyBitmessage": myVersion = [int(n) for n in softwareVersion.split(".")] @@ -770,16 +781,11 @@ class receiveDataThread(threading.Thread): newStreamNumber, lengthOfRemoteStreamNumber = decodeVarint(data[readPosition:]) readPosition += lengthOfRemoteStreamNumber self.remoteStreams.append(newStreamNumber) - logger.debug('Remote node useragent: %s, streams: (%s), time offset: %is.', useragent, ', '.join(str(x) for x in self.remoteStreams), timeOffset) + logger.debug('Remote node useragent: %s, streams: (%s), time offset: %is.', self.userAgent, ', '.join(str(x) for x in self.remoteStreams), self.timeOffset) # find shared streams self.streamNumber = sorted(set(state.streamsInWhichIAmParticipating).intersection(self.remoteStreams)) - if len(self.streamNumber) == 0: - self.sendDataThreadQueue.put((0, 'shutdown','no data')) - logger.debug ('Closed connection to ' + str(self.peer) + ' because there is no overlapping interest in streams.') - return - shared.connectedHostsList[ self.hostIdent] = 1 # We use this data structure to not only keep track of what hosts we are connected to so that we don't try to connect to them again, but also to list the connections count on the Network Status tab. self.sendDataThreadQueue.put((0, 'setStreamNumber', self.remoteStreams)) @@ -810,7 +816,6 @@ class receiveDataThread(threading.Thread): self.sendDataThreadQueue.put((0, 'sendRawData', protocol.assembleVersionMessage( self.peer.host, self.peer.port, state.streamsInWhichIAmParticipating, not self.initiatedConnection))) - # Sends a verack message def sendverack(self): logger.debug('Sending verack')