Node negotiation error handling

- complete the version and SSL handshake first, and only then feed
  errors into the stream and close connection
- this allows more accurate error handling on both sides
- also the timeOffset error trigger is now more accurate, but requires
  more nodes to upgrade
This commit is contained in:
Peter Šurda 2017-02-07 19:38:52 +01:00
parent 8515f9a9fc
commit 07722fb606
Signed by: PeterSurda
GPG Key ID: 0C5F50C0B5F37D87

View File

@ -123,14 +123,10 @@ class receiveDataThread(threading.Thread):
logger.debug('sock.recv retriable error') logger.debug('sock.recv retriable error')
continue continue
logger.error('sock.recv error. Closing receiveData thread (' + str(self.peer) + ', Thread ID: ' + str(id(self)) + ').' + str(err.errno) + "/" + str(err)) logger.error('sock.recv error. Closing receiveData thread (' + str(self.peer) + ', Thread ID: ' + str(id(self)) + ').' + str(err.errno) + "/" + str(err))
if self.initiatedConnection and not self.connectionIsOrWasFullyEstablished:
shared.timeOffsetWrongCount += 1
break break
# print 'Received', repr(self.data) # print 'Received', repr(self.data)
if len(self.data) == dataLen: # If self.sock.recv returned no data: if len(self.data) == dataLen: # If self.sock.recv returned no data:
logger.debug('Connection to ' + str(self.peer) + ' closed. Closing receiveData thread. (ID: ' + str(id(self)) + ')') logger.debug('Connection to ' + str(self.peer) + ' closed. Closing receiveData thread. (ID: ' + str(id(self)) + ')')
if self.initiatedConnection and not self.connectionIsOrWasFullyEstablished:
shared.timeOffsetWrongCount += 1
break break
else: else:
self.processData() self.processData()
@ -264,13 +260,7 @@ class receiveDataThread(threading.Thread):
# We have thus both sent and received a verack. # We have thus both sent and received a verack.
self.connectionFullyEstablished() self.connectionFullyEstablished()
def connectionFullyEstablished(self): def sslHandshake(self):
if self.connectionIsOrWasFullyEstablished:
# there is no reason to run this function a second time
return
self.connectionIsOrWasFullyEstablished = True
shared.timeOffsetWrongCount = 0
self.sslSock = self.sock self.sslSock = self.sock
if ((self.services & protocol.NODE_SSL == protocol.NODE_SSL) and if ((self.services & protocol.NODE_SSL == protocol.NODE_SSL) and
protocol.haveSSL(not self.initiatedConnection)): protocol.haveSSL(not self.initiatedConnection)):
@ -320,6 +310,46 @@ class receiveDataThread(threading.Thread):
return return
# SSL in the background should be blocking, otherwise the error handling is difficult # SSL in the background should be blocking, otherwise the error handling is difficult
self.sslSock.settimeout(None) self.sslSock.settimeout(None)
def peerValidityChecks(self):
if self.remoteProtocolVersion < 3:
self.sendDataThreadQueue.put((0, 'sendRawData', protocol.assembleErrorMessage(fatal=2, errorText="Your is using an old protocol. Closing connection.")))
logger.debug ('Closing connection to old protocol version ' + str(self.remoteProtocolVersion) + ' node: ' + str(self.peer))
return False
if self.timeOffset > 3600:
self.sendDataThreadQueue.put((0, 'sendRawData', protocol.assembleErrorMessage(fatal=2, errorText="Your time is too far in the future compared to mine. Closing connection.")))
logger.info("%s's time is too far in the future (%s seconds). Closing connection to it." % (self.peer, self.timeOffset))
shared.timeOffsetWrongCount += 1
time.sleep(2)
return False
elif self.timeOffset < -3600:
self.sendDataThreadQueue.put((0, 'sendRawData', protocol.assembleErrorMessage(fatal=2, errorText="Your time is too far in the past compared to mine. Closing connection.")))
logger.info("%s's time is too far in the past (timeOffset %s seconds). Closing connection to it." % (self.peer, self.timeOffset))
shared.timeOffsetWrongCount += 1
return False
else:
shared.timeOffsetWrongCount = 0
if len(self.streamNumber) == 0:
self.sendDataThreadQueue.put((0, 'sendRawData', protocol.assembleErrorMessage(fatal=2, errorText="We don't have shared stream interests. Closing connection.")))
logger.debug ('Closed connection to ' + str(self.peer) + ' because there is no overlapping interest in streams.')
return False
return True
def connectionFullyEstablished(self):
if self.connectionIsOrWasFullyEstablished:
# there is no reason to run this function a second time
return
self.sslHandshake()
if self.peerValidityChecks() == False:
time.sleep(2)
self.sendDataThreadQueue.put((0, 'shutdown','no data'))
self.checkTimeOffsetNotification()
return
self.connectionIsOrWasFullyEstablished = True
shared.timeOffsetWrongCount = 0
# Command the corresponding sendDataThread to set its own connectionIsOrWasFullyEstablished variable to True also # Command the corresponding sendDataThread to set its own connectionIsOrWasFullyEstablished variable to True also
self.sendDataThreadQueue.put((0, 'connectionIsOrWasFullyEstablished', (self.services, self.sslSock))) self.sendDataThreadQueue.put((0, 'connectionIsOrWasFullyEstablished', (self.services, self.sslSock)))
@ -349,7 +379,7 @@ class receiveDataThread(threading.Thread):
self.sendaddr() # This is one large addr message to this one peer. self.sendaddr() # This is one large addr message to this one peer.
if not self.initiatedConnection and len(shared.connectedHostsList) > 200: if not self.initiatedConnection and len(shared.connectedHostsList) > 200:
logger.info ('We are connected to too many people. Closing connection.') logger.info ('We are connected to too many people. Closing connection.')
self.sendDataThreadQueue.put((0, 'sendRawData', protocol.assembleErrorMessage(fatal=2, errorText="Server full, please try again later.")))
self.sendDataThreadQueue.put((0, 'shutdown','no data')) self.sendDataThreadQueue.put((0, 'shutdown','no data'))
return return
self.sendBigInv() self.sendBigInv()
@ -706,31 +736,12 @@ class receiveDataThread(threading.Thread):
ignore this version message ignore this version message
""" """
return return
self.remoteProtocolVersion, = unpack('>L', data[:4]) self.remoteProtocolVersion, = unpack('>L', data[:4])
self.services, = unpack('>q', data[4:12]) self.services, = unpack('>q', data[4:12])
if self.remoteProtocolVersion < 3:
self.sendDataThreadQueue.put((0, 'shutdown','no data'))
logger.debug ('Closing connection to old protocol version ' + str(self.remoteProtocolVersion) + ' node: ' + str(self.peer))
return
timestamp, = unpack('>Q', data[12:20]) timestamp, = unpack('>Q', data[12:20])
timeOffset = timestamp - int(time.time()) self.timeOffset = timestamp - int(time.time())
if timeOffset > 3600:
self.sendDataThreadQueue.put((0, 'sendRawData', protocol.assembleErrorMessage(fatal=2, errorText="Your time is too far in the future compared to mine. Closing connection.")))
logger.info("%s's time is too far in the future (%s seconds). Closing connection to it." % (self.peer, timeOffset))
shared.timeOffsetWrongCount += 1
time.sleep(2)
self.sendDataThreadQueue.put((0, 'shutdown','no data'))
return
elif timeOffset < -3600:
self.sendDataThreadQueue.put((0, 'sendRawData', protocol.assembleErrorMessage(fatal=2, errorText="Your time is too far in the past compared to mine. Closing connection.")))
logger.info("%s's time is too far in the past (timeOffset %s seconds). Closing connection to it." % (self.peer, timeOffset))
shared.timeOffsetWrongCount += 1
time.sleep(2)
self.sendDataThreadQueue.put((0, 'shutdown','no data'))
return
else:
shared.timeOffsetWrongCount = 0
self.checkTimeOffsetNotification()
self.myExternalIP = socket.inet_ntoa(data[40:44]) self.myExternalIP = socket.inet_ntoa(data[40:44])
# print 'myExternalIP', self.myExternalIP # print 'myExternalIP', self.myExternalIP
@ -739,13 +750,13 @@ class receiveDataThread(threading.Thread):
useragentLength, lengthOfUseragentVarint = decodeVarint( useragentLength, lengthOfUseragentVarint = decodeVarint(
data[80:84]) data[80:84])
readPosition = 80 + lengthOfUseragentVarint readPosition = 80 + lengthOfUseragentVarint
useragent = data[readPosition:readPosition + useragentLength] self.userAgent = data[readPosition:readPosition + useragentLength]
# version check # version check
try: try:
userAgentName, userAgentVersion = useragent[1:-1].split(":", 2) userAgentName, userAgentVersion = userAgent[1:-1].split(":", 2)
except: except:
userAgentName = useragent userAgentName = self.userAgent
userAgentVersion = "0.0.0" userAgentVersion = "0.0.0"
if userAgentName == "PyBitmessage": if userAgentName == "PyBitmessage":
myVersion = [int(n) for n in softwareVersion.split(".")] myVersion = [int(n) for n in softwareVersion.split(".")]
@ -770,16 +781,11 @@ class receiveDataThread(threading.Thread):
newStreamNumber, lengthOfRemoteStreamNumber = decodeVarint(data[readPosition:]) newStreamNumber, lengthOfRemoteStreamNumber = decodeVarint(data[readPosition:])
readPosition += lengthOfRemoteStreamNumber readPosition += lengthOfRemoteStreamNumber
self.remoteStreams.append(newStreamNumber) self.remoteStreams.append(newStreamNumber)
logger.debug('Remote node useragent: %s, streams: (%s), time offset: %is.', useragent, ', '.join(str(x) for x in self.remoteStreams), timeOffset) logger.debug('Remote node useragent: %s, streams: (%s), time offset: %is.', self.userAgent, ', '.join(str(x) for x in self.remoteStreams), self.timeOffset)
# find shared streams # find shared streams
self.streamNumber = sorted(set(state.streamsInWhichIAmParticipating).intersection(self.remoteStreams)) self.streamNumber = sorted(set(state.streamsInWhichIAmParticipating).intersection(self.remoteStreams))
if len(self.streamNumber) == 0:
self.sendDataThreadQueue.put((0, 'shutdown','no data'))
logger.debug ('Closed connection to ' + str(self.peer) + ' because there is no overlapping interest in streams.')
return
shared.connectedHostsList[ shared.connectedHostsList[
self.hostIdent] = 1 # We use this data structure to not only keep track of what hosts we are connected to so that we don't try to connect to them again, but also to list the connections count on the Network Status tab. self.hostIdent] = 1 # We use this data structure to not only keep track of what hosts we are connected to so that we don't try to connect to them again, but also to list the connections count on the Network Status tab.
self.sendDataThreadQueue.put((0, 'setStreamNumber', self.remoteStreams)) self.sendDataThreadQueue.put((0, 'setStreamNumber', self.remoteStreams))
@ -810,7 +816,6 @@ class receiveDataThread(threading.Thread):
self.sendDataThreadQueue.put((0, 'sendRawData', protocol.assembleVersionMessage( self.sendDataThreadQueue.put((0, 'sendRawData', protocol.assembleVersionMessage(
self.peer.host, self.peer.port, state.streamsInWhichIAmParticipating, not self.initiatedConnection))) self.peer.host, self.peer.port, state.streamsInWhichIAmParticipating, not self.initiatedConnection)))
# Sends a verack message # Sends a verack message
def sendverack(self): def sendverack(self):
logger.debug('Sending verack') logger.debug('Sending verack')