From fa9ad537a576588f2f5f49e316dee417ed5c4dae Mon Sep 17 00:00:00 2001 From: Peter Surda Date: Sat, 27 May 2017 19:39:19 +0200 Subject: [PATCH] Add task_done to asyncore-related queues --- src/network/advanceddispatcher.py | 3 +++ src/network/connectionchooser.py | 7 +++++-- src/network/receivequeuethread.py | 2 ++ src/network/udp.py | 5 +++-- 4 files changed, 13 insertions(+), 4 deletions(-) diff --git a/src/network/advanceddispatcher.py b/src/network/advanceddispatcher.py index 938eb11d..947824bd 100644 --- a/src/network/advanceddispatcher.py +++ b/src/network/advanceddispatcher.py @@ -80,6 +80,7 @@ class AdvancedDispatcher(asyncore.dispatcher): while len(self.write_buf) < bufSize: try: self.write_buf += self.writeQueue.get(False) + self.writeQueue.task_done() except Queue.Empty: break if len(self.write_buf) > 0: @@ -103,11 +104,13 @@ class AdvancedDispatcher(asyncore.dispatcher): while True: try: self.writeQueue.get(False) + self.writeQueue.task_done() except Queue.Empty: break while True: try: self.receiveQueue.get(False) + self.receiveQueue.task_done() except Queue.Empty: break asyncore.dispatcher.close(self) diff --git a/src/network/connectionchooser.py b/src/network/connectionchooser.py index 05ef47bd..1e26b994 100644 --- a/src/network/connectionchooser.py +++ b/src/network/connectionchooser.py @@ -11,9 +11,12 @@ def chooseConnection(stream): return state.trustedPeer else: try: - return portCheckerQueue.get(False) + retval = portCheckerQueue.get(False) + portCheckerQueue.task_done() except Queue.Empty: try: - return peerDiscoveryQueue.get(False) + retval = peerDiscoveryQueue.get(False) + peerDiscoveryQueue.task_done() except Queue.Empty: return random.choice(knownnodes.knownNodes[stream].keys()) + return retval diff --git a/src/network/receivequeuethread.py b/src/network/receivequeuethread.py index 27a01902..c5509b65 100644 --- a/src/network/receivequeuethread.py +++ b/src/network/receivequeuethread.py @@ -35,7 +35,9 @@ class ReceiveQueueThread(threading.Thread, StoppableThread): processed += 1 try: getattr(self, "command_" + str(command))(i, args) + i.receiveQueue.task_done() except AttributeError: + i.receiveQueue.task_done() # missing command raise if processed == 0: diff --git a/src/network/udp.py b/src/network/udp.py index 81bcc06a..4bb78823 100644 --- a/src/network/udp.py +++ b/src/network/udp.py @@ -137,7 +137,6 @@ class UDPSocket(BMProto): return len(self.read_buf) < AdvancedDispatcher._buf_len def handle_read(self): - print "read!" try: (addr, recdata) = self.socket.recvfrom(AdvancedDispatcher._buf_len) except socket.error as e: @@ -150,6 +149,7 @@ class UDPSocket(BMProto): self.local = True else: self.local = False + print "read %ib" % (len(recdata)) # overwrite the old buffer to avoid mixing data and so that self.local works correctly self.read_buf = data self.process() @@ -162,9 +162,10 @@ class UDPSocket(BMProto): return try: retval = self.socket.sendto(data, ('', UDPSocket.port)) -# print "broadcasted %ib" % (retval) + print "broadcasted %ib" % (retval) except socket.error as e: print "socket error on sendato: %s" % (e) + self.writeQueue.task_done() def close(self, reason=None): self.set_state("close")