From 4c9006a632761ff63902307eba1b96d8ad39ff58 Mon Sep 17 00:00:00 2001 From: Peter Surda Date: Mon, 16 Oct 2017 08:07:32 +0200 Subject: [PATCH] Asyncore performance optimisation - use bytearray instead of strings for buffers --- src/network/advanceddispatcher.py | 25 ++++++++++++++++--------- src/network/bmproto.py | 18 +++++++++--------- src/network/downloadthread.py | 5 ++++- src/network/udp.py | 4 ++-- 4 files changed, 31 insertions(+), 21 deletions(-) diff --git a/src/network/advanceddispatcher.py b/src/network/advanceddispatcher.py index eb636aed..576eed39 100644 --- a/src/network/advanceddispatcher.py +++ b/src/network/advanceddispatcher.py @@ -12,8 +12,8 @@ class AdvancedDispatcher(asyncore.dispatcher): def __init__(self, sock=None): if not hasattr(self, '_map'): asyncore.dispatcher.__init__(self, sock) - self.read_buf = b"" - self.write_buf = b"" + self.read_buf = bytearray() + self.write_buf = bytearray() self.state = "init" self.lastTx = time.time() self.sentBytes = 0 @@ -25,18 +25,23 @@ class AdvancedDispatcher(asyncore.dispatcher): def append_write_buf(self, data): if data: - with self.writeLock: - self.write_buf += data + if isinstance(data, list): + with self.writeLock: + for chunk in data: + self.write_buf.extend(chunk) + else: + with self.writeLock: + self.write_buf.extend(data) def slice_write_buf(self, length=0): if length > 0: with self.writeLock: - self.write_buf = self.write_buf[length:] + del self.write_buf[0:length] def slice_read_buf(self, length=0): if length > 0: with self.readLock: - self.read_buf = self.read_buf[length:] + del self.read_buf[0:length] def process(self): if not self.connected: @@ -85,7 +90,7 @@ class AdvancedDispatcher(asyncore.dispatcher): self.receivedBytes += len(newData) asyncore.update_received(len(newData)) with self.readLock: - self.read_buf += newData + self.read_buf.extend(newData) def handle_write(self): self.lastTx = time.time() @@ -108,7 +113,9 @@ class AdvancedDispatcher(asyncore.dispatcher): return False def handle_close(self): - self.read_buf = b"" - self.write_buf = b"" + with self.readLock: + self.read_buf = bytearray() + with self.writeLock: + self.write_buf = bytearray() self.state = "close" asyncore.dispatcher.close(self) diff --git a/src/network/bmproto.py b/src/network/bmproto.py index 8099c5fc..a086fde0 100644 --- a/src/network/bmproto.py +++ b/src/network/bmproto.py @@ -138,16 +138,16 @@ class BMProto(AdvancedDispatcher, ObjectTracker): def decode_payload_node(self): services, host, port = self.decode_payload_content("Q16sH") if host[0:12] == '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF': - host = socket.inet_ntop(socket.AF_INET, host[12:]) + host = socket.inet_ntop(socket.AF_INET, buffer(host, 12, 4)) elif host[0:6] == '\xfd\x87\xd8\x7e\xeb\x43': # Onion, based on BMD/bitcoind host = base64.b32encode(host[6:]).lower() + ".onion" else: - host = socket.inet_ntop(socket.AF_INET6, host) + host = socket.inet_ntop(socket.AF_INET6, buffer(host)) if host == "": # This can happen on Windows systems which are not 64-bit compatible # so let us drop the IPv6 address. - host = socket.inet_ntop(socket.AF_INET, host[12:]) + host = socket.inet_ntop(socket.AF_INET, buffer(host, 12, 4)) return Node(services, host, port) @@ -272,7 +272,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker): return True #TODO make this more asynchronous random.shuffle(items) - for i in items: + for i in map(str, items): if i in DandelionStems().stem and \ self != DandelionStems().stem[i]: self.antiIntersectionDelay() @@ -298,7 +298,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker): else: pass - for i in items: + for i in map(str, items): self.handleReceivedInventory(i) return True @@ -323,7 +323,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker): self.dandelionRoutes = BMConnectionPool.dandelionRouteSelector(self) self.dandelionRefresh = time.time() + REASSIGN_INTERVAL - for i in items: + for i in map(str, items): DandelionStems().add(i, self, self.dandelionRoutes) self.handleReceivedInventory(i) @@ -354,12 +354,12 @@ class BMProto(AdvancedDispatcher, ObjectTracker): try: self.object.checkObjectByType() - objectProcessorQueue.put((self.object.objectType, self.object.data)) + objectProcessorQueue.put((self.object.objectType, buffer(self.object.data))) except BMObjectInvalidError as e: BMProto.stopDownloadingObject(self.object.inventoryHash, True) Inventory()[self.object.inventoryHash] = ( - self.object.objectType, self.object.streamNumber, self.payload[objectOffset:], self.object.expiresTime, self.object.tag) + self.object.objectType, self.object.streamNumber, buffer(self.payload[objectOffset:]), self.object.expiresTime, buffer(self.object.tag)) invQueue.put((self.object.streamNumber, self.object.inventoryHash, self.destination)) return True @@ -370,7 +370,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker): addresses = self._decode_addr() for i in addresses: seenTime, stream, services, ip, port = i - decodedIP = protocol.checkIPAddress(ip) + decodedIP = protocol.checkIPAddress(buffer(ip)) if stream not in state.streamsInWhichIAmParticipating: continue if decodedIP is not False and seenTime > time.time() - BMProto.addressAlive: diff --git a/src/network/downloadthread.py b/src/network/downloadthread.py index 34f23eed..98b6df05 100644 --- a/src/network/downloadthread.py +++ b/src/network/downloadthread.py @@ -55,7 +55,10 @@ class DownloadThread(threading.Thread, StoppableThread): i.objectsNewToMe[k] = False self.pending[k] = now - payload = addresses.encodeVarint(len(request)) + ''.join(request) + payload = bytearray() + payload.extend(addresses.encodeVarint(len(request))) + for chunk in request: + payload.extend(chunk) i.append_write_buf(protocol.CreatePacket('getdata', payload)) logger.debug("%s:%i Requesting %i objects", i.destination.host, i.destination.port, len(request)) requested += len(request) diff --git a/src/network/udp.py b/src/network/udp.py index bd867125..3d238a5e 100644 --- a/src/network/udp.py +++ b/src/network/udp.py @@ -89,7 +89,7 @@ class UDPSocket(BMProto): remoteport = False for i in addresses: seenTime, stream, services, ip, port = i - decodedIP = protocol.checkIPAddress(ip) + decodedIP = protocol.checkIPAddress(buffer(ip)) if stream not in state.streamsInWhichIAmParticipating: continue if seenTime < time.time() - BMProto.maxTimeOffset or seenTime > time.time() + BMProto.maxTimeOffset: @@ -142,7 +142,7 @@ class UDPSocket(BMProto): else: self.local = False # overwrite the old buffer to avoid mixing data and so that self.local works correctly - self.read_buf = recdata + self.read_buf[0:] = recdata self.bm_proto_reset() receiveDataQueue.put(self.listening)