From a98b8690d3e5276f47299366b44f1faaad6319f3 Mon Sep 17 00:00:00 2001 From: Peter Surda Date: Fri, 7 Jul 2017 07:55:29 +0200 Subject: [PATCH] Asyncore fixes - fix broken loops - optimise I/O tests --- src/network/advanceddispatcher.py | 54 ++++++++++++++----------------- src/network/receivequeuethread.py | 6 +--- 2 files changed, 26 insertions(+), 34 deletions(-) diff --git a/src/network/advanceddispatcher.py b/src/network/advanceddispatcher.py index 57bd4f41..338e3bba 100644 --- a/src/network/advanceddispatcher.py +++ b/src/network/advanceddispatcher.py @@ -40,18 +40,14 @@ class AdvancedDispatcher(asyncore.dispatcher): def process(self): if not self.connected: - return - loop = 0 + return False while len(self.read_buf) >= self.expectBytes: - loop += 1 - if loop > 1000: - logger.error("Stuck at state %s, report this bug please", self.state) - break try: if getattr(self, "state_" + str(self.state))() is False: break except AttributeError: raise + return False def set_state(self, state, length=0, expectBytes=0): self.expectBytes = expectBytes @@ -59,39 +55,39 @@ class AdvancedDispatcher(asyncore.dispatcher): self.state = state def writable(self): + self.uploadChunk = AdvancedDispatcher._buf_len + if asyncore.maxUploadRate > 0: + self.uploadChunk = asyncore.uploadBucket + self.uploadChunk = min(self.uploadChunk, len(self.write_buf)) return asyncore.dispatcher.writable(self) and \ - (self.connecting or self.write_buf) + (self.connecting or self.uploadChunk > 0) def readable(self): + self.downloadChunk = AdvancedDispatcher._buf_len + if asyncore.maxDownloadRate > 0: + self.downloadChunk = asyncore.downloadBucket + try: + if self.expectBytes > 0 and not self.fullyEstablished: + self.downloadChunk = min(self.downloadChunk, self.expectBytes - len(self.read_buf)) + except AttributeError: + pass return asyncore.dispatcher.readable(self) and \ - (self.connecting or len(self.read_buf) < AdvancedDispatcher._buf_len) + (self.connecting or self.downloadChunk > 0) def handle_read(self): self.lastTx = time.time() - downloadBytes = AdvancedDispatcher._buf_len - if asyncore.maxDownloadRate > 0: - downloadBytes = asyncore.downloadBucket - if self.expectBytes > 0 and downloadBytes > self.expectBytes - len(self.read_buf): - downloadBytes = self.expectBytes - len(self.read_buf) - if downloadBytes > 0: - newData = self.recv(downloadBytes) - self.receivedBytes += len(newData) - asyncore.update_received(len(newData)) - with self.readLock: - self.read_buf += newData + newData = self.recv(self.downloadChunk) + self.receivedBytes += len(newData) + asyncore.update_received(len(newData)) + with self.readLock: + self.read_buf += newData def handle_write(self): self.lastTx = time.time() - bufSize = AdvancedDispatcher._buf_len - if asyncore.maxUploadRate > 0: - bufSize = asyncore.uploadBucket - if bufSize <= 0: - return - if self.write_buf: - written = self.send(self.write_buf[0:bufSize]) - asyncore.update_sent(written) - self.sentBytes += written - self.slice_write_buf(written) + written = self.send(self.write_buf[0:self.uploadChunk]) + asyncore.update_sent(written) + self.sentBytes += written + self.slice_write_buf(written) def handle_connect_event(self): try: diff --git a/src/network/receivequeuethread.py b/src/network/receivequeuethread.py index 120d15e2..f1e81a0d 100644 --- a/src/network/receivequeuethread.py +++ b/src/network/receivequeuethread.py @@ -34,11 +34,7 @@ class ReceiveQueueThread(threading.Thread, StoppableThread): # cycle as long as there is data # methods should return False if there isn't enough data, or the connection is to be aborted try: - while connection.process(): - pass + connection.process() except AttributeError: # missing command logger.error("Unknown state %s, ignoring", connection.state) - - def stopThread(self): - super(ReceiveQueueThread, self).stopThread()