Network subsystem freezing fixes

- queues were too short
- some error handling was missing
- remove nonblocking repeats in receive data thread
- singleCleaner shouldn't wait unnecessarily
This commit is contained in:
Peter Šurda 2017-02-02 15:52:32 +01:00
parent 4f70eaa01f
commit ba130e03e5
Signed by untrusted user: PeterSurda
GPG Key ID: 0C5F50C0B5F37D87
5 changed files with 14 additions and 11 deletions

View File

@ -195,7 +195,7 @@ class outgoingSynSender(threading.Thread, StoppableThread):
self.sock.close() self.sock.close()
return return
someObjectsOfWhichThisRemoteNodeIsAlreadyAware = {} # This is not necessairly a complete list; we clear it from time to time to save memory. someObjectsOfWhichThisRemoteNodeIsAlreadyAware = {} # This is not necessairly a complete list; we clear it from time to time to save memory.
sendDataThreadQueue = Queue.Queue(100) # Used to submit information to the send data thread for this connection. sendDataThreadQueue = Queue.Queue() # Used to submit information to the send data thread for this connection.
sd = sendDataThread(sendDataThreadQueue) sd = sendDataThread(sendDataThreadQueue)
sd.setup(self.sock, peer.host, peer.port, self.streamNumber, sd.setup(self.sock, peer.host, peer.port, self.streamNumber,

View File

@ -82,7 +82,7 @@ class receiveDataThread(threading.Thread):
def run(self): def run(self):
logger.debug('receiveDataThread starting. ID ' + str(id(self)) + '. The size of the shared.connectedHostsList is now ' + str(len(shared.connectedHostsList))) logger.debug('receiveDataThread starting. ID ' + str(id(self)) + '. The size of the shared.connectedHostsList is now ' + str(len(shared.connectedHostsList)))
while True: while state.shutdown == 0:
dataLen = len(self.data) dataLen = len(self.data)
try: try:
ssl = False ssl = False
@ -99,12 +99,13 @@ class receiveDataThread(threading.Thread):
logger.error ('Timeout occurred waiting for data from ' + str(self.peer) + '. Closing receiveData thread. (ID: ' + str(id(self)) + ')') logger.error ('Timeout occurred waiting for data from ' + str(self.peer) + '. Closing receiveData thread. (ID: ' + str(id(self)) + ')')
break break
except socket.error as err: except socket.error as err:
if err.errno == 2 or (sys.platform == 'win32' and err.errno == 10035) or (sys.platform != 'win32' and err.errno == errno.EWOULDBLOCK): # if err.errno == 2 or (sys.platform == 'win32' and err.errno == 10035) or (sys.platform != 'win32' and err.errno == errno.EWOULDBLOCK):
if ssl: # if ssl:
select.select([self.sslSock], [], []) # select.select([self.sslSock], [], [])
else: # else:
select.select([self.sock], [], []) # select.select([self.sock], [], [])
continue # logger.error('sock.recv retriable error')
# continue
logger.error('sock.recv error. Closing receiveData thread (' + str(self.peer) + ', Thread ID: ' + str(id(self)) + ').' + str(err.errno) + "/" + str(err)) logger.error('sock.recv error. Closing receiveData thread (' + str(self.peer) + ', Thread ID: ' + str(id(self)) + ').' + str(err.errno) + "/" + str(err))
if self.initiatedConnection and not self.connectionIsOrWasFullyEstablished: if self.initiatedConnection and not self.connectionIsOrWasFullyEstablished:
shared.timeOffsetWrongCount += 1 shared.timeOffsetWrongCount += 1

View File

@ -111,6 +111,7 @@ class singleCleaner(threading.Thread, StoppableThread):
os._exit(0) os._exit(0)
shared.knownNodesLock.release() shared.knownNodesLock.release()
shared.needToWriteKnownNodesToDisk = False shared.needToWriteKnownNodesToDisk = False
if state.shutdown == 0:
self.stop.wait(300) self.stop.wait(300)

View File

@ -134,7 +134,7 @@ class singleListener(threading.Thread, StoppableThread):
break break
someObjectsOfWhichThisRemoteNodeIsAlreadyAware = {} # This is not necessairly a complete list; we clear it from time to time to save memory. someObjectsOfWhichThisRemoteNodeIsAlreadyAware = {} # This is not necessairly a complete list; we clear it from time to time to save memory.
sendDataThreadQueue = Queue.Queue(100) # Used to submit information to the send data thread for this connection. sendDataThreadQueue = Queue.Queue() # Used to submit information to the send data thread for this connection.
socketObject.settimeout(20) socketObject.settimeout(20)
sd = sendDataThread(sendDataThreadQueue) sd = sendDataThread(sendDataThreadQueue)

View File

@ -171,6 +171,7 @@ class PendingDownload(object):
pass pass
for objectHash in unreachableObjects: for objectHash in unreachableObjects:
with self.lock: with self.lock:
if objectHash in self.hashes:
del self.hashes[objectHash] del self.hashes[objectHash]
# logger.debug("Pull took %.3f seconds", time.time() - start) # logger.debug("Pull took %.3f seconds", time.time() - start)
return objectHashes return objectHashes