From 2d7d9c2f929a230bd9af18721aa52ba902d7e0be Mon Sep 17 00:00:00 2001 From: Peter Surda Date: Sat, 8 Jul 2017 06:54:25 +0200 Subject: [PATCH] Asyncore update - request downloads in bigger chunks - don't put whole objects into the receiveDataQueue --- src/network/downloadthread.py | 4 ++-- src/network/receivequeuethread.py | 26 +++++++++++++++++++++++--- src/network/tcp.py | 4 ++-- src/network/tls.py | 2 +- src/network/udp.py | 2 +- 5 files changed, 29 insertions(+), 9 deletions(-) diff --git a/src/network/downloadthread.py b/src/network/downloadthread.py index c42d7e1c..d1c7b0f4 100644 --- a/src/network/downloadthread.py +++ b/src/network/downloadthread.py @@ -11,8 +11,8 @@ from network.connectionpool import BMConnectionPool import protocol class DownloadThread(threading.Thread, StoppableThread): - maxPending = 50 - requestChunk = 100 + maxPending = 200 + requestChunk = 1000 requestTimeout = 60 cleanInterval = 60 requestExpires = 600 diff --git a/src/network/receivequeuethread.py b/src/network/receivequeuethread.py index f1e81a0d..5e707398 100644 --- a/src/network/receivequeuethread.py +++ b/src/network/receivequeuethread.py @@ -24,17 +24,37 @@ class ReceiveQueueThread(threading.Thread, StoppableThread): def run(self): while not self._stopped and state.shutdown == 0: try: - connection = receiveDataQueue.get(block=True, timeout=1) + dest = receiveDataQueue.get(block=True, timeout=1) receiveDataQueue.task_done() except Queue.Empty: continue if self._stopped: break + # cycle as long as there is data # methods should return False if there isn't enough data, or the connection is to be aborted + + # state_* methods should return False if there isn't enough data, + # or the connection is to be aborted + try: - connection.process() + BMConnectionPool().inboundConnections[dest].process() + except KeyError: + pass + except AttributeError: + logger.error("Unknown state %s, ignoring", connection.state) + + try: + BMConnectionPool().outboundConnections[dest].process() + except KeyError: + pass + except AttributeError: + logger.error("Unknown state %s, ignoring", connection.state) + + try: + BMConnectionPool().udpSockets[dest].process() + except KeyError: + pass except AttributeError: - # missing command logger.error("Unknown state %s, ignoring", connection.state) diff --git a/src/network/tcp.py b/src/network/tcp.py index 1a4a906c..75ddea1c 100644 --- a/src/network/tcp.py +++ b/src/network/tcp.py @@ -190,7 +190,7 @@ class TCPConnection(BMProto, TLSDispatcher): self.append_write_buf(protocol.assembleVersionMessage(self.destination.host, self.destination.port, network.connectionpool.BMConnectionPool().streams, False)) #print "%s:%i: Sending version" % (self.destination.host, self.destination.port) self.connectedAt = time.time() - receiveDataQueue.put(self) + receiveDataQueue.put(self.destination) def handle_read(self): TLSDispatcher.handle_read(self) @@ -201,7 +201,7 @@ class TCPConnection(BMProto, TLSDispatcher): knownnodes.knownNodes[s][self.destination]["lastseen"] = time.time() except KeyError: pass - receiveDataQueue.put(self) + receiveDataQueue.put(self.destination) def handle_write(self): TLSDispatcher.handle_write(self) diff --git a/src/network/tls.py b/src/network/tls.py index 69fc2c20..379dae99 100644 --- a/src/network/tls.py +++ b/src/network/tls.py @@ -169,5 +169,5 @@ class TLSDispatcher(AdvancedDispatcher): self.bm_proto_reset() self.set_state("connection_fully_established") - receiveDataQueue.put(self) + receiveDataQueue.put(self.destination) return False diff --git a/src/network/udp.py b/src/network/udp.py index 824c9bfa..5c19fa69 100644 --- a/src/network/udp.py +++ b/src/network/udp.py @@ -139,7 +139,7 @@ class UDPSocket(BMProto): # overwrite the old buffer to avoid mixing data and so that self.local works correctly self.read_buf = recdata self.bm_proto_reset() - receiveDataQueue.put(self) + receiveDataQueue.put(self.destination) def handle_write(self): try: