From d4f5d370d46b2a069bf6d50c912affd3851e2b90 Mon Sep 17 00:00:00 2001 From: TheKysek Date: Mon, 20 Mar 2017 20:52:53 +0100 Subject: [PATCH] Rework object sending logic --- src/connection.py | 21 ++++++++++++++++++--- 1 file changed, 18 insertions(+), 3 deletions(-) diff --git a/src/connection.py b/src/connection.py index 8ffd045..4089430 100644 --- a/src/connection.py +++ b/src/connection.py @@ -22,6 +22,7 @@ class Connection(threading.Thread): self.send_queue = queue.Queue() self.vectors_to_get = set() + self.vectors_to_send = set() self.status = 'ready' @@ -76,11 +77,13 @@ class Connection(threading.Thread): except ssl.SSLWantReadError: if self.status == 'fully_established': self._request_objects() + self._send_objects() except socket.error as e: err = e.args[0] if err == errno.EAGAIN or err == errno.EWOULDBLOCK: if self.status == 'fully_established': self._request_objects() + self._send_objects() else: logging.debug('Disconnecting from {}:{}. Reason: {}'.format(self.host, self.port, e)) data = None @@ -270,9 +273,7 @@ class Connection(threading.Thread): elif m.command == b'getdata': getdata = message.GetData.from_message(m) logging.debug('{}:{} -> {}'.format(self.host, self.port, getdata)) - for vector in getdata.vectors: - if vector in shared.objects: - self.send_queue.put(message.Message(b'object', shared.objects[vector].to_bytes())) + self.vectors_to_send.update(getdata.vectors) elif m.command == b'addr': addr = message.Addr.from_message(m) logging.debug('{}:{} -> {}'.format(self.host, self.port, addr)) @@ -295,3 +296,17 @@ class Connection(threading.Thread): else: self.send_queue.put(message.GetData(self.vectors_to_get)) self.vectors_to_get.clear() + + def _send_objects(self): + if self.vectors_to_send: + if len(self.vectors_to_send) > 16: + to_send = random.sample(self.vectors_to_send, 16) + self.vectors_to_send.difference_update(to_send) + else: + to_send = self.vectors_to_send.copy() + self.vectors_to_send.clear() + with shared.objects_lock: + for vector in to_send: + obj = shared.objects.get(vector, None) + if obj: + self.send_queue.put(message.Message(b'object', obj.to_bytes()))