From 89baf6258b6c1a56042cae57f4c8ff2fe5430a9d Mon Sep 17 00:00:00 2001 From: TheKysek Date: Tue, 19 Jul 2016 12:48:04 +0200 Subject: [PATCH] Tweaks to connection logic --- src/connection.py | 34 +++++++++++++++++++++------------- 1 file changed, 21 insertions(+), 13 deletions(-) diff --git a/src/connection.py b/src/connection.py index 36f5fce..7013d4e 100644 --- a/src/connection.py +++ b/src/connection.py @@ -23,8 +23,11 @@ class Connection(threading.Thread): self.vectors_to_get = set() self.status = 'ready' - self.sent_verack = False - self.sent_big_inv_message = False + + self.tls = False + + self.verack_received = False + self.verack_sent = False self.host = host self.port = int(port) @@ -67,9 +70,8 @@ class Connection(threading.Thread): data = None if time.time() - self.last_message_sent > 300 and self.status == 'fully_established': self.send_queue.put(message.Message(b'pong', b'')) - if not self.sent_big_inv_message and self.status == 'verack_received' and self.sent_verack: - self._on_connection_fully_established() except ConnectionResetError: + logging.debug('Disconnecting from {};{}. Reason: ConnectionResetError'.format(self.host, self.port)) data = None self._process_buffer() self._request_objects() @@ -117,6 +119,7 @@ class Connection(threading.Thread): print(e) break self.s.settimeout(0.5) + self.tls = True logging.debug('Established TLS connection with {}:{}'.format(self.host, self.port)) def _send_message(self, m): @@ -130,20 +133,24 @@ class Connection(threading.Thread): def _on_connection_fully_established(self): self.status = 'fully_established' - time.sleep(2) + logging.info('Established Bitmessage protocol connection to {}:{}'.format(self.host, self.port)) + if self.remote_version.services & 2: # NODE_SSL + self._do_tls_handshake() with shared.objects_lock: self.send_queue.put(message.Inv({vector for vector in shared.objects.keys() if shared.objects[vector].expires_time > time.time()})) addr = {structure.NetAddr(1, c.host, c.port) for c in shared.connections.copy() if not c.server and c.status == 'fully_established'} if len(addr) != 0: self.send_queue.put(message.Addr(addr)) - self.sent_big_inv_message = True def _process_queue(self): while not self.send_queue.empty(): m = self.send_queue.get() if m: - self._send_message(m) - self.last_message_sent = time.time() + if m == 'fully_established': + self._on_connection_fully_established() + else: + self._send_message(m) + self.last_message_sent = time.time() else: self.status = 'disconnecting' break @@ -172,20 +179,21 @@ class Connection(threading.Thread): self.send_queue.put(None) else: self.send_queue.put(message.Message(b'verack', b'')) - self.sent_verack = True + self.verack_sent = True self.remote_version = version if not self.server: - self._do_tls_handshake() + self.send_queue.put('fully_established') shared.address_advertise_queue.put(structure.NetAddr(version.services, self.host, self.port)) shared.node_pool.add((self.host, self.port)) shared.address_advertise_queue.put(structure.NetAddr(shared.services, version.host, shared.listening_port)) if self.server: self.send_queue.put(message.Version(self.host, self.port)) - self._do_tls_handshake() elif m.command == b'verack': - self.status = 'verack_received' + self.verack_received = True logging.debug('{}:{} -> {}'.format(self.host, self.port, 'verack')) - logging.info('Established Bitmessage protocol connection to {}:{}'.format(self.host, self.port)) + if self.server: + self.send_queue.put('fully_established') + elif m.command == b'inv': inv = message.Inv.from_message(m) logging.debug('{}:{} -> {}'.format(self.host, self.port, inv))