From ba130e03e5165dc0eee060b08112b0a4043bd698 Mon Sep 17 00:00:00 2001 From: Peter Surda Date: Thu, 2 Feb 2017 15:52:32 +0100 Subject: [PATCH] Network subsystem freezing fixes - queues were too short - some error handling was missing - remove nonblocking repeats in receive data thread - singleCleaner shouldn't wait unnecessarily --- src/class_outgoingSynSender.py | 2 +- src/class_receiveDataThread.py | 15 ++++++++------- src/class_singleCleaner.py | 3 ++- src/class_singleListener.py | 2 +- src/inventory.py | 3 ++- 5 files changed, 14 insertions(+), 11 deletions(-) diff --git a/src/class_outgoingSynSender.py b/src/class_outgoingSynSender.py index 11f755de..f154f455 100644 --- a/src/class_outgoingSynSender.py +++ b/src/class_outgoingSynSender.py @@ -195,7 +195,7 @@ class outgoingSynSender(threading.Thread, StoppableThread): self.sock.close() return someObjectsOfWhichThisRemoteNodeIsAlreadyAware = {} # This is not necessairly a complete list; we clear it from time to time to save memory. - sendDataThreadQueue = Queue.Queue(100) # Used to submit information to the send data thread for this connection. + sendDataThreadQueue = Queue.Queue() # Used to submit information to the send data thread for this connection. sd = sendDataThread(sendDataThreadQueue) sd.setup(self.sock, peer.host, peer.port, self.streamNumber, diff --git a/src/class_receiveDataThread.py b/src/class_receiveDataThread.py index 250ae355..6004de5c 100644 --- a/src/class_receiveDataThread.py +++ b/src/class_receiveDataThread.py @@ -82,7 +82,7 @@ class receiveDataThread(threading.Thread): def run(self): logger.debug('receiveDataThread starting. ID ' + str(id(self)) + '. The size of the shared.connectedHostsList is now ' + str(len(shared.connectedHostsList))) - while True: + while state.shutdown == 0: dataLen = len(self.data) try: ssl = False @@ -99,12 +99,13 @@ class receiveDataThread(threading.Thread): logger.error ('Timeout occurred waiting for data from ' + str(self.peer) + '. Closing receiveData thread. (ID: ' + str(id(self)) + ')') break except socket.error as err: - if err.errno == 2 or (sys.platform == 'win32' and err.errno == 10035) or (sys.platform != 'win32' and err.errno == errno.EWOULDBLOCK): - if ssl: - select.select([self.sslSock], [], []) - else: - select.select([self.sock], [], []) - continue +# if err.errno == 2 or (sys.platform == 'win32' and err.errno == 10035) or (sys.platform != 'win32' and err.errno == errno.EWOULDBLOCK): +# if ssl: +# select.select([self.sslSock], [], []) +# else: +# select.select([self.sock], [], []) +# logger.error('sock.recv retriable error') +# continue logger.error('sock.recv error. Closing receiveData thread (' + str(self.peer) + ', Thread ID: ' + str(id(self)) + ').' + str(err.errno) + "/" + str(err)) if self.initiatedConnection and not self.connectionIsOrWasFullyEstablished: shared.timeOffsetWrongCount += 1 diff --git a/src/class_singleCleaner.py b/src/class_singleCleaner.py index a4177986..ab16e985 100644 --- a/src/class_singleCleaner.py +++ b/src/class_singleCleaner.py @@ -111,7 +111,8 @@ class singleCleaner(threading.Thread, StoppableThread): os._exit(0) shared.knownNodesLock.release() shared.needToWriteKnownNodesToDisk = False - self.stop.wait(300) + if state.shutdown == 0: + self.stop.wait(300) def resendPubkeyRequest(address): diff --git a/src/class_singleListener.py b/src/class_singleListener.py index 65c0a8a8..49acc71c 100644 --- a/src/class_singleListener.py +++ b/src/class_singleListener.py @@ -134,7 +134,7 @@ class singleListener(threading.Thread, StoppableThread): break someObjectsOfWhichThisRemoteNodeIsAlreadyAware = {} # This is not necessairly a complete list; we clear it from time to time to save memory. - sendDataThreadQueue = Queue.Queue(100) # Used to submit information to the send data thread for this connection. + sendDataThreadQueue = Queue.Queue() # Used to submit information to the send data thread for this connection. socketObject.settimeout(20) sd = sendDataThread(sendDataThreadQueue) diff --git a/src/inventory.py b/src/inventory.py index d175035d..2bdb3629 100644 --- a/src/inventory.py +++ b/src/inventory.py @@ -171,7 +171,8 @@ class PendingDownload(object): pass for objectHash in unreachableObjects: with self.lock: - del self.hashes[objectHash] + if objectHash in self.hashes: + del self.hashes[objectHash] # logger.debug("Pull took %.3f seconds", time.time() - start) return objectHashes