diff --git a/src/class_receiveDataThread.py b/src/class_receiveDataThread.py index 6e592304..6129aa7f 100644 --- a/src/class_receiveDataThread.py +++ b/src/class_receiveDataThread.py @@ -123,14 +123,10 @@ class receiveDataThread(threading.Thread): logger.debug('sock.recv retriable error') continue logger.error('sock.recv error. Closing receiveData thread (' + str(self.peer) + ', Thread ID: ' + str(id(self)) + ').' + str(err.errno) + "/" + str(err)) - if self.initiatedConnection and not self.connectionIsOrWasFullyEstablished: - shared.timeOffsetWrongCount += 1 break # print 'Received', repr(self.data) if len(self.data) == dataLen: # If self.sock.recv returned no data: logger.debug('Connection to ' + str(self.peer) + ' closed. Closing receiveData thread. (ID: ' + str(id(self)) + ')') - if self.initiatedConnection and not self.connectionIsOrWasFullyEstablished: - shared.timeOffsetWrongCount += 1 break else: self.processData() @@ -264,13 +260,7 @@ class receiveDataThread(threading.Thread): # We have thus both sent and received a verack. self.connectionFullyEstablished() - def connectionFullyEstablished(self): - if self.connectionIsOrWasFullyEstablished: - # there is no reason to run this function a second time - return - self.connectionIsOrWasFullyEstablished = True - shared.timeOffsetWrongCount = 0 - + def sslHandshake(self): self.sslSock = self.sock if ((self.services & protocol.NODE_SSL == protocol.NODE_SSL) and protocol.haveSSL(not self.initiatedConnection)): @@ -320,6 +310,46 @@ class receiveDataThread(threading.Thread): return # SSL in the background should be blocking, otherwise the error handling is difficult self.sslSock.settimeout(None) + + def peerValidityChecks(self): + if self.remoteProtocolVersion < 3: + self.sendDataThreadQueue.put((0, 'sendRawData', protocol.assembleErrorMessage(fatal=2, errorText="Your is using an old protocol. Closing connection."))) + logger.debug ('Closing connection to old protocol version ' + str(self.remoteProtocolVersion) + ' node: ' + str(self.peer)) + return False + if self.timeOffset > 3600: + self.sendDataThreadQueue.put((0, 'sendRawData', protocol.assembleErrorMessage(fatal=2, errorText="Your time is too far in the future compared to mine. Closing connection."))) + logger.info("%s's time is too far in the future (%s seconds). Closing connection to it." % (self.peer, self.timeOffset)) + shared.timeOffsetWrongCount += 1 + time.sleep(2) + return False + elif self.timeOffset < -3600: + self.sendDataThreadQueue.put((0, 'sendRawData', protocol.assembleErrorMessage(fatal=2, errorText="Your time is too far in the past compared to mine. Closing connection."))) + logger.info("%s's time is too far in the past (timeOffset %s seconds). Closing connection to it." % (self.peer, self.timeOffset)) + shared.timeOffsetWrongCount += 1 + return False + else: + shared.timeOffsetWrongCount = 0 + if len(self.streamNumber) == 0: + self.sendDataThreadQueue.put((0, 'sendRawData', protocol.assembleErrorMessage(fatal=2, errorText="We don't have shared stream interests. Closing connection."))) + logger.debug ('Closed connection to ' + str(self.peer) + ' because there is no overlapping interest in streams.') + return False + return True + + def connectionFullyEstablished(self): + if self.connectionIsOrWasFullyEstablished: + # there is no reason to run this function a second time + return + + self.sslHandshake() + if self.peerValidityChecks() == False: + time.sleep(2) + self.sendDataThreadQueue.put((0, 'shutdown','no data')) + self.checkTimeOffsetNotification() + return + + self.connectionIsOrWasFullyEstablished = True + shared.timeOffsetWrongCount = 0 + # Command the corresponding sendDataThread to set its own connectionIsOrWasFullyEstablished variable to True also self.sendDataThreadQueue.put((0, 'connectionIsOrWasFullyEstablished', (self.services, self.sslSock))) @@ -349,7 +379,7 @@ class receiveDataThread(threading.Thread): self.sendaddr() # This is one large addr message to this one peer. if not self.initiatedConnection and len(shared.connectedHostsList) > 200: logger.info ('We are connected to too many people. Closing connection.') - + self.sendDataThreadQueue.put((0, 'sendRawData', protocol.assembleErrorMessage(fatal=2, errorText="Server full, please try again later."))) self.sendDataThreadQueue.put((0, 'shutdown','no data')) return self.sendBigInv() @@ -706,31 +736,12 @@ class receiveDataThread(threading.Thread): ignore this version message """ return + self.remoteProtocolVersion, = unpack('>L', data[:4]) self.services, = unpack('>q', data[4:12]) - if self.remoteProtocolVersion < 3: - self.sendDataThreadQueue.put((0, 'shutdown','no data')) - logger.debug ('Closing connection to old protocol version ' + str(self.remoteProtocolVersion) + ' node: ' + str(self.peer)) - return + timestamp, = unpack('>Q', data[12:20]) - timeOffset = timestamp - int(time.time()) - if timeOffset > 3600: - self.sendDataThreadQueue.put((0, 'sendRawData', protocol.assembleErrorMessage(fatal=2, errorText="Your time is too far in the future compared to mine. Closing connection."))) - logger.info("%s's time is too far in the future (%s seconds). Closing connection to it." % (self.peer, timeOffset)) - shared.timeOffsetWrongCount += 1 - time.sleep(2) - self.sendDataThreadQueue.put((0, 'shutdown','no data')) - return - elif timeOffset < -3600: - self.sendDataThreadQueue.put((0, 'sendRawData', protocol.assembleErrorMessage(fatal=2, errorText="Your time is too far in the past compared to mine. Closing connection."))) - logger.info("%s's time is too far in the past (timeOffset %s seconds). Closing connection to it." % (self.peer, timeOffset)) - shared.timeOffsetWrongCount += 1 - time.sleep(2) - self.sendDataThreadQueue.put((0, 'shutdown','no data')) - return - else: - shared.timeOffsetWrongCount = 0 - self.checkTimeOffsetNotification() + self.timeOffset = timestamp - int(time.time()) self.myExternalIP = socket.inet_ntoa(data[40:44]) # print 'myExternalIP', self.myExternalIP @@ -739,13 +750,13 @@ class receiveDataThread(threading.Thread): useragentLength, lengthOfUseragentVarint = decodeVarint( data[80:84]) readPosition = 80 + lengthOfUseragentVarint - useragent = data[readPosition:readPosition + useragentLength] + self.userAgent = data[readPosition:readPosition + useragentLength] # version check try: - userAgentName, userAgentVersion = useragent[1:-1].split(":", 2) + userAgentName, userAgentVersion = userAgent[1:-1].split(":", 2) except: - userAgentName = useragent + userAgentName = self.userAgent userAgentVersion = "0.0.0" if userAgentName == "PyBitmessage": myVersion = [int(n) for n in softwareVersion.split(".")] @@ -770,16 +781,11 @@ class receiveDataThread(threading.Thread): newStreamNumber, lengthOfRemoteStreamNumber = decodeVarint(data[readPosition:]) readPosition += lengthOfRemoteStreamNumber self.remoteStreams.append(newStreamNumber) - logger.debug('Remote node useragent: %s, streams: (%s), time offset: %is.', useragent, ', '.join(str(x) for x in self.remoteStreams), timeOffset) + logger.debug('Remote node useragent: %s, streams: (%s), time offset: %is.', self.userAgent, ', '.join(str(x) for x in self.remoteStreams), self.timeOffset) # find shared streams self.streamNumber = sorted(set(state.streamsInWhichIAmParticipating).intersection(self.remoteStreams)) - if len(self.streamNumber) == 0: - self.sendDataThreadQueue.put((0, 'shutdown','no data')) - logger.debug ('Closed connection to ' + str(self.peer) + ' because there is no overlapping interest in streams.') - return - shared.connectedHostsList[ self.hostIdent] = 1 # We use this data structure to not only keep track of what hosts we are connected to so that we don't try to connect to them again, but also to list the connections count on the Network Status tab. self.sendDataThreadQueue.put((0, 'setStreamNumber', self.remoteStreams)) @@ -810,7 +816,6 @@ class receiveDataThread(threading.Thread): self.sendDataThreadQueue.put((0, 'sendRawData', protocol.assembleVersionMessage( self.peer.host, self.peer.port, state.streamsInWhichIAmParticipating, not self.initiatedConnection))) - # Sends a verack message def sendverack(self): logger.debug('Sending verack')