diff --git a/minode/advertiser.py b/minode/advertiser.py index 16d15da..5f385c8 100644 --- a/minode/advertiser.py +++ b/minode/advertiser.py @@ -34,7 +34,8 @@ class Advertiser(threading.Thread): while not shared.address_advertise_queue.empty(): addr = shared.address_advertise_queue.get() if addr.port == 'i2p': - # We should not try to construct Addr messages with I2P destinations (yet) + # We should not try to construct Addr messages + # with I2P destinations (yet) continue addresses_to_advertise.add(addr) if len(addresses_to_advertise) > 0: diff --git a/minode/connection.py b/minode/connection.py index 233d801..6da24f8 100644 --- a/minode/connection.py +++ b/minode/connection.py @@ -14,7 +14,10 @@ from . import message, shared, structure class Connection(threading.Thread): - def __init__(self, host, port, s=None, network='ip', server=False, i2p_remote_dest=b''): + def __init__( + self, host, port, s=None, network='ip', server=False, + i2p_remote_dest=b'' + ): self.host = host self.port = port self.network = network @@ -72,17 +75,21 @@ class Connection(threading.Thread): else: self.send_queue.put(message.Version('127.0.0.1', 7656)) while True: - if self.on_connection_fully_established_scheduled and not (self.buffer_send or self.buffer_receive): + if ( + self.on_connection_fully_established_scheduled + and not (self.buffer_send or self.buffer_receive) + ): self._on_connection_fully_established() data = True try: if self.status == 'fully_established': - data = self.s.recv(4096) - self.buffer_receive += data - if data and len(self.buffer_receive) < 4000000: - continue + data = self.s.recv(4096) + self.buffer_receive += data + if data and len(self.buffer_receive) < 4000000: + continue else: - data = self.s.recv(self.next_message_size - len(self.buffer_receive)) + data = self.s.recv( + self.next_message_size - len(self.buffer_receive)) self.buffer_receive += data except ssl.SSLWantReadError: if self.status == 'fully_established': @@ -90,49 +97,68 @@ class Connection(threading.Thread): self._send_objects() except socket.error as e: err = e.args[0] - if err == errno.EAGAIN or err == errno.EWOULDBLOCK: + if err in (errno.EAGAIN, errno.EWOULDBLOCK): if self.status == 'fully_established': self._request_objects() self._send_objects() else: - logging.debug('Disconnecting from {}:{}. Reason: {}'.format(self.host_print, self.port, e)) + logging.debug( + 'Disconnecting from %s:%s. Reason: %s', + self.host_print, self.port, e) data = None except ConnectionResetError: - logging.debug('Disconnecting from {}:{}. Reason: ConnectionResetError'.format(self.host_print, self.port)) + logging.debug( + 'Disconnecting from %s:%s. Reason: ConnectionResetError', + self.host_print, self.port) self.status = 'disconnecting' self._process_buffer_receive() self._process_queue() self._send_data() if time.time() - self.last_message_received > shared.timeout: logging.debug( - 'Disconnecting from {}:{}. Reason: time.time() - self.last_message_received > shared.timeout'.format( - self.host_print, self.port)) + 'Disconnecting from %s:%s. Reason:' + ' time.time() - self.last_message_received' + ' > shared.timeout', self.host_print, self.port) self.status = 'disconnecting' - if time.time() - self.last_message_received > 30 and self.status != 'fully_established'and self.status != 'disconnecting': + if ( + time.time() - self.last_message_received > 30 + and self.status != 'fully_established' + and self.status != 'disconnecting' + ): logging.debug( - 'Disconnecting from {}:{}. Reason: time.time() - self.last_message_received > 30 and self.status != \'fully_established\''.format( - self.host_print, self.port)) + 'Disconnecting from %s:%s. Reason:' + ' time.time() - self.last_message_received > 30' + ' and self.status != "fully_established"', + self.host_print, self.port) self.status = 'disconnecting' - if time.time() - self.last_message_sent > 300 and self.status == 'fully_established': + if ( + time.time() - self.last_message_sent > 300 + and self.status == 'fully_established' + ): self.send_queue.put(message.Message(b'pong', b'')) if self.status == 'disconnecting' or shared.shutting_down: data = None if not data: self.status = 'disconnected' self.s.close() - logging.info('Disconnected from {}:{}'.format(self.host_print, self.port)) + logging.info( + 'Disconnected from %s:%s', self.host_print, self.port) break time.sleep(0.2) def _connect(self): - logging.debug('Connecting to {}:{}'.format(self.host_print, self.port)) + logging.debug('Connecting to %s:%s', self.host_print, self.port) try: self.s = socket.create_connection((self.host, self.port), 10) self.status = 'connected' - logging.info('Established TCP connection to {}:{}'.format(self.host_print, self.port)) + logging.info( + 'Established TCP connection to %s:%s', + self.host_print, self.port) except Exception as e: - logging.warning('Connection to {}:{} failed. Reason: {}'.format(self.host_print, self.port, e)) + logging.warning( + 'Connection to %s:%s failed. Reason: %s', + self.host_print, self.port, e) self.status = 'failed' def _send_data(self): @@ -142,27 +168,38 @@ class Connection(threading.Thread): self.buffer_send = self.buffer_send[amount:] except (BlockingIOError, ssl.SSLWantWriteError): pass - except (BrokenPipeError, ConnectionResetError, ssl.SSLError, OSError) as e: - logging.debug('Disconnecting from {}:{}. Reason: {}'.format(self.host_print, self.port, e)) + except ( + BrokenPipeError, ConnectionResetError, ssl.SSLError, OSError + ) as e: + logging.debug( + 'Disconnecting from %s:%s. Reason: %s', + self.host_print, self.port, e) self.status = 'disconnecting' def _do_tls_handshake(self): - logging.debug('Initializing TLS connection with {}:{}'.format(self.host_print, self.port)) + logging.debug( + 'Initializing TLS connection with %s:%s', + self.host_print, self.port) context = ssl.create_default_context() context.check_hostname = False context.verify_mode = ssl.CERT_NONE - if ssl.OPENSSL_VERSION_NUMBER >= 0x10100000 and not ssl.OPENSSL_VERSION.startswith("LibreSSL"): - # OpenSSL>=1.1 + if ( + ssl.OPENSSL_VERSION_NUMBER >= 0x10100000 + and not ssl.OPENSSL_VERSION.startswith("LibreSSL") + ): # OpenSSL>=1.1 context.set_ciphers('AECDH-AES256-SHA@SECLEVEL=0') else: context.set_ciphers('AECDH-AES256-SHA') context.set_ecdh_curve("secp256k1") - context.options = ssl.OP_ALL | ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3 | ssl.OP_SINGLE_ECDH_USE | ssl.OP_CIPHER_SERVER_PREFERENCE + context.options = ( + ssl.OP_ALL | ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3 + | ssl.OP_SINGLE_ECDH_USE | ssl.OP_CIPHER_SERVER_PREFERENCE) - self.s = context.wrap_socket(self.s, server_side=self.server, do_handshake_on_connect=False) + self.s = context.wrap_socket( + self.s, server_side=self.server, do_handshake_on_connect=False) while True: try: @@ -173,39 +210,57 @@ class Connection(threading.Thread): except ssl.SSLWantWriteError: select.select([], [self.s], []) except Exception as e: - logging.debug('Disconnecting from {}:{}. Reason: {}'.format(self.host_print, self.port, e)) + logging.debug( + 'Disconnecting from %s:%s. Reason: %s', + self.host_print, self.port, e) self.status = 'disconnecting' break self.tls = True - logging.debug('Established TLS connection with {}:{}'.format(self.host_print, self.port)) + logging.debug( + 'Established TLS connection with %s:%s', + self.host_print, self.port) def _send_message(self, m): - if type(m) == message.Message and m.command == b'object': - logging.debug('{}:{} <- {}'.format(self.host_print, self.port, structure.Object.from_message(m))) + if isinstance(m, message.Message) and m.command == b'object': + logging.debug( + '%s:%s <- %s', + self.host_print, self.port, structure.Object.from_message(m)) else: - logging.debug('{}:{} <- {}'.format(self.host_print, self.port, m)) + logging.debug('%s:%s <- %s', self.host_print, self.port, m) self.buffer_send += m.to_bytes() def _on_connection_fully_established(self): - logging.info('Established Bitmessage protocol connection to {}:{}'.format(self.host_print, self.port)) + logging.info( + 'Established Bitmessage protocol connection to %s:%s', + self.host_print, self.port) self.on_connection_fully_established_scheduled = False - if self.remote_version.services & 2 and self.network == 'ip': # NODE_SSL - self._do_tls_handshake() + if self.remote_version.services & 2 and self.network == 'ip': + self._do_tls_handshake() # NODE_SSL - addr = {structure.NetAddr(c.remote_version.services, c.host, c.port) for c in shared.connections if c.network != 'i2p' and c.server is False and c.status == 'fully_established'} + addr = { + structure.NetAddr(c.remote_version.services, c.host, c.port) + for c in shared.connections if c.network != 'i2p' + and c.server is False and c.status == 'fully_established'} if len(shared.node_pool) > 10: - addr.update({structure.NetAddr(1, a[0], a[1]) for a in random.sample(shared.node_pool, 10)}) + addr.update({ + structure.NetAddr(1, a[0], a[1]) + for a in random.sample(shared.node_pool, 10)}) if len(shared.unchecked_node_pool) > 10: - addr.update({structure.NetAddr(1, a[0], a[1]) for a in random.sample(shared.unchecked_node_pool, 10)}) + addr.update({ + structure.NetAddr(1, a[0], a[1]) + for a in random.sample(shared.unchecked_node_pool, 10)}) if len(addr) != 0: self.send_queue.put(message.Addr(addr)) with shared.objects_lock: if len(shared.objects) > 0: - to_send = {vector for vector in shared.objects.keys() if shared.objects[vector].expires_time > time.time()} + to_send = { + vector for vector in shared.objects.keys() + if shared.objects[vector].expires_time > time.time()} while len(to_send) > 0: if len(to_send) > 10000: - # We limit size of inv messaged to 10000 entries because they might time out in very slow networks (I2P) + # We limit size of inv messaged to 10000 entries + # because they might time out in very slow networks (I2P) pack = random.sample(to_send, 10000) self.send_queue.put(message.Inv(pack)) to_send.difference_update(pack) @@ -232,35 +287,47 @@ class Connection(threading.Thread): if self.next_header: self.next_header = False try: - h = message.Header.from_bytes(self.buffer_receive[:shared.header_length]) + h = message.Header.from_bytes( + self.buffer_receive[:shared.header_length]) except ValueError as e: self.status = 'disconnecting' - logging.warning('Received malformed message from {}:{}: {}'.format(self.host_print, self.port, e)) + logging.warning( + 'Received malformed message from %s:%s: %s', + self.host_print, self.port, e) break self.next_message_size += h.payload_length else: try: - m = message.Message.from_bytes(self.buffer_receive[:self.next_message_size]) + m = message.Message.from_bytes( + self.buffer_receive[:self.next_message_size]) except ValueError as e: self.status = 'disconnecting' - logging.warning('Received malformed message from {}:{}, {}'.format(self.host_print, self.port, e)) + logging.warning( + 'Received malformed message from %s:%s, %s', + self.host_print, self.port, e) break self.next_header = True - self.buffer_receive = self.buffer_receive[self.next_message_size:] + self.buffer_receive = self.buffer_receive[ + self.next_message_size:] self.next_message_size = shared.header_length self.last_message_received = time.time() try: self._process_message(m) except ValueError as e: self.status = 'disconnecting' - logging.warning('Received malformed message from {}:{}: {}'.format(self.host_print, self.port, e)) + logging.warning( + 'Received malformed message from %s:%s: %s', + self.host_print, self.port, e) break def _process_message(self, m): if m.command == b'version': version = message.Version.from_bytes(m.to_bytes()) - logging.debug('{}:{} -> {}'.format(self.host_print, self.port, str(version))) - if version.protocol_version != shared.protocol_version or version.nonce == shared.nonce: + logging.debug('%s:%s -> %s', self.host_print, self.port, version) + if ( + version.protocol_version != shared.protocol_version + or version.nonce == shared.nonce + ): self.status = 'disconnecting' self.send_queue.put(None) else: @@ -270,27 +337,31 @@ class Connection(threading.Thread): if not self.server: self.send_queue.put('fully_established') if self.network == 'ip': - shared.address_advertise_queue.put(structure.NetAddr(version.services, self.host, self.port)) + shared.address_advertise_queue.put(structure.NetAddr( + version.services, self.host, self.port)) shared.node_pool.add((self.host, self.port)) elif self.network == 'i2p': shared.i2p_node_pool.add((self.host, 'i2p')) if self.network == 'ip': - shared.address_advertise_queue.put(structure.NetAddr(shared.services, version.host, shared.listening_port)) + shared.address_advertise_queue.put(structure.NetAddr( + shared.services, version.host, shared.listening_port)) if self.server: if self.network == 'ip': - self.send_queue.put(message.Version(self.host, self.port)) + self.send_queue.put( + message.Version(self.host, self.port)) else: self.send_queue.put(message.Version('127.0.0.1', 7656)) elif m.command == b'verack': self.verack_received = True - logging.debug('{}:{} -> {}'.format(self.host_print, self.port, 'verack')) + logging.debug( + '%s:%s -> %s', self.host_print, self.port, 'verack') if self.server: self.send_queue.put('fully_established') elif m.command == b'inv': inv = message.Inv.from_message(m) - logging.debug('{}:{} -> {}'.format(self.host_print, self.port, inv)) + logging.debug('%s:%s -> %s', self.host_print, self.port, inv) to_get = inv.vectors.copy() to_get.difference_update(shared.objects.keys()) self.vectors_to_get.update(to_get) @@ -299,39 +370,45 @@ class Connection(threading.Thread): elif m.command == b'object': obj = structure.Object.from_message(m) - logging.debug('{}:{} -> {}'.format(self.host_print, self.port, obj)) + logging.debug('%s:%s -> %s', self.host_print, self.port, obj) self.vectors_requested.pop(obj.vector, None) self.vectors_to_get.discard(obj.vector) if obj.is_valid() and obj.vector not in shared.objects: with shared.objects_lock: shared.objects[obj.vector] = obj - if obj.object_type == shared.i2p_dest_obj_type and obj.version == shared.i2p_dest_obj_version: + if ( + obj.object_type == shared.i2p_dest_obj_type + and obj.version == shared.i2p_dest_obj_version + ): dest = base64.b64encode(obj.object_payload, altchars=b'-~') - logging.debug('Received I2P destination object, adding to i2p_unchecked_node_pool') + logging.debug( + 'Received I2P destination object,' + ' adding to i2p_unchecked_node_pool') logging.debug(dest) shared.i2p_unchecked_node_pool.add((dest, 'i2p')) shared.vector_advertise_queue.put(obj.vector) elif m.command == b'getdata': getdata = message.GetData.from_message(m) - logging.debug('{}:{} -> {}'.format(self.host_print, self.port, getdata)) + logging.debug('%s:%s -> %s', self.host_print, self.port, getdata) self.vectors_to_send.update(getdata.vectors) elif m.command == b'addr': addr = message.Addr.from_message(m) - logging.debug('{}:{} -> {}'.format(self.host_print, self.port, addr)) + logging.debug('%s:%s -> %s', self.host_print, self.port, addr) for a in addr.addresses: shared.unchecked_node_pool.add((a.host, a.port)) elif m.command == b'ping': - logging.debug('{}:{} -> ping'.format(self.host_print, self.port)) + logging.debug('%s:%s -> ping', self.host_print, self.port) self.send_queue.put(message.Message(b'pong', b'')) elif m.command == b'error': - logging.error('{}:{} -> error: {}'.format(self.host_print, self.port, m.payload)) + logging.error( + '%s:%s -> error: %s', self.host_print, self.port, m.payload) else: - logging.debug('{}:{} -> {}'.format(self.host_print, self.port, m)) + logging.debug('%s:%s -> %s', self.host_print, self.port, m) def _request_objects(self): if self.vectors_to_get and len(self.vectors_requested) < 100: @@ -340,18 +417,28 @@ class Connection(threading.Thread): if len(self.vectors_to_get) > 64: pack = random.sample(self.vectors_to_get, 64) self.send_queue.put(message.GetData(pack)) - self.vectors_requested.update({vector: time.time() for vector in pack if vector not in self.vectors_requested}) + self.vectors_requested.update({ + vector: time.time() for vector in pack + if vector not in self.vectors_requested}) self.vectors_to_get.difference_update(pack) else: self.send_queue.put(message.GetData(self.vectors_to_get)) - self.vectors_requested.update({vector: time.time() for vector in self.vectors_to_get if vector not in self.vectors_requested}) + self.vectors_requested.update({ + vector: time.time() for vector in self.vectors_to_get + if vector not in self.vectors_requested}) self.vectors_to_get.clear() if self.vectors_requested: - self.vectors_requested = {vector: t for vector, t in self.vectors_requested.items() if vector not in shared.objects and t > time.time() - 15 * 60} - to_re_request = {vector for vector, t in self.vectors_requested.items() if t < time.time() - 10 * 60} + self.vectors_requested = { + vector: t for vector, t in self.vectors_requested.items() + if vector not in shared.objects and t > time.time() - 15 * 60} + to_re_request = { + vector for vector, t in self.vectors_requested.items() + if t < time.time() - 10 * 60} if to_re_request: self.vectors_to_get.update(to_re_request) - logging.debug('Re-requesting {} objects from {}:{}'.format(len(to_re_request), self.host_print, self.port)) + logging.debug( + 'Re-requesting %i objects from %s:%s', + len(to_re_request), self.host_print, self.port) def _send_objects(self): if self.vectors_to_send: diff --git a/minode/i2p/controller.py b/minode/i2p/controller.py index 0f2c89c..25f0997 100644 --- a/minode/i2p/controller.py +++ b/minode/i2p/controller.py @@ -23,7 +23,8 @@ class I2PController(threading.Thread): self.s = socket.create_connection((self.host, self.port)) break except ConnectionRefusedError: - logging.error("Error while connecting to I2P SAM bridge. Retrying.") + logging.error( + 'Error while connecting to I2P SAM bridge. Retrying.') time.sleep(10) self.version_reply = [] @@ -42,11 +43,11 @@ class I2PController(threading.Thread): def _receive_line(self): line = receive_line(self.s) - # logging.debug('I2PController <- ' + str(line)) + # logging.debug('I2PController <- %s', line) return line def _send(self, command): - # logging.debug('I2PController -> ' + str(command)) + # logging.debug('I2PController -> %s', command) self.s.sendall(command) def init_connection(self): @@ -78,7 +79,8 @@ class I2PController(threading.Thread): reply = self._receive_line().split() if b'RESULT=OK' not in reply: logging.warning(reply) - logging.warning('We could not create I2P session, retrying in 5 seconds.') + logging.warning( + 'We could not create I2P session, retrying in 5 seconds.') time.sleep(5) self.create_session() diff --git a/minode/i2p/dialer.py b/minode/i2p/dialer.py index 2afe84c..67cec5c 100644 --- a/minode/i2p/dialer.py +++ b/minode/i2p/dialer.py @@ -25,7 +25,7 @@ class I2PDialer(threading.Thread): self.success = True def run(self): - logging.debug('Connecting to {}'.format(self.destination)) + logging.debug('Connecting to %s', self.destination) self._connect() if not self.state.shutting_down and self.success: c = self.state.connection( @@ -36,22 +36,25 @@ class I2PDialer(threading.Thread): def _receive_line(self): line = receive_line(self.s) - # logging.debug('I2PDialer <- ' + str(line)) + # logging.debug('I2PDialer <- %s', line) return line def _send(self, command): - # logging.debug('I2PDialer -> ' + str(command)) + # logging.debug('I2PDialer -> %s', command) self.s.sendall(command) def _connect(self): self._send(b'HELLO VERSION MIN=3.0 MAX=3.3\n') self.version_reply = self._receive_line().split() if b'RESULT=OK' not in self.version_reply: - logging.warning('Error while connecting to {}'.format(self.destination)) + logging.warning('Error while connecting to %s', self.destination) self.success = False - self._send(b'STREAM CONNECT ID=' + self.nick + b' DESTINATION=' + self.destination + b'\n') + self._send( + b'STREAM CONNECT ID=' + self.nick + b' DESTINATION=' + + self.destination + b'\n') reply = self._receive_line().split(b' ') if b'RESULT=OK' not in reply: - logging.warning('Error while connecting to {}'.format(self.destination)) + logging.warning( + 'Error while connecting to %s', self.destination) self.success = False diff --git a/minode/i2p/listener.py b/minode/i2p/listener.py index f8e6127..3737797 100644 --- a/minode/i2p/listener.py +++ b/minode/i2p/listener.py @@ -23,11 +23,11 @@ class I2PListener(threading.Thread): def _receive_line(self): line = receive_line(self.s) - # logging.debug('I2PListener <- ' + str(line)) + # logging.debug('I2PListener <- %s', line) return line def _send(self, command): - # logging.debug('I2PListener -> ' + str(command)) + # logging.debug('I2PListener -> %s', command) self.s.sendall(command) def new_socket(self): @@ -46,7 +46,8 @@ class I2PListener(threading.Thread): while not self.state.shutting_down: try: destination = self._receive_line().split()[0] - logging.info('Incoming I2P connection from: {}'.format(destination.decode())) + logging.info( + 'Incoming I2P connection from: %s', destination.decode()) hosts = set() for c in self.state.connections.copy(): diff --git a/minode/i2p/util.py b/minode/i2p/util.py index a1cdab2..f5fa651 100644 --- a/minode/i2p/util.py +++ b/minode/i2p/util.py @@ -16,11 +16,14 @@ def receive_line(s): def pub_from_priv(priv): priv = base64.b64decode(priv, altchars=b'-~') - # 256 for public key + 128 for signing key + 3 for certificate header + value of bytes priv[385:387] + # 256 for public key + 128 for signing key + 3 for certificate header + # + value of bytes priv[385:387] pub = priv[:387 + int.from_bytes(priv[385:387], byteorder='big')] pub = base64.b64encode(pub, altchars=b'-~') return pub def b32_from_pub(pub): - return base64.b32encode(hashlib.sha256(base64.b64decode(pub, b'-~')).digest()).replace(b"=", b"").lower() + b'.b32.i2p' + return base64.b32encode( + hashlib.sha256(base64.b64decode(pub, b'-~')).digest() + ).replace(b"=", b"").lower() + b'.b32.i2p' diff --git a/minode/listener.py b/minode/listener.py index cb19c04..a413933 100644 --- a/minode/listener.py +++ b/minode/listener.py @@ -26,12 +26,12 @@ class Listener(threading.Thread): break try: conn, addr = self.s.accept() - logging.info('Incoming connection from: {}:{}'.format(addr[0], addr[1])) + logging.info('Incoming connection from: %s:%s', *addr) with shared.connections_lock: if len(shared.connections) > shared.connection_limit: conn.close() else: - c = Connection(addr[0], addr[1], conn, 'ip', True) + c = Connection(*addr, conn, server=True) c.start() shared.connections.add(c) except socket.timeout: diff --git a/minode/main.py b/minode/main.py index 407ae67..23287fb 100644 --- a/minode/main.py +++ b/minode/main.py @@ -24,18 +24,32 @@ def parse_arguments(): parser = argparse.ArgumentParser() parser.add_argument('-p', '--port', help='Port to listen on', type=int) parser.add_argument('--host', help='Listening host') - parser.add_argument('--debug', help='Enable debug logging', action='store_true') + parser.add_argument( + '--debug', action='store_true', help='Enable debug logging') parser.add_argument('--data-dir', help='Path to data directory') - parser.add_argument('--no-incoming', help='Do not listen for incoming connections', action='store_true') - parser.add_argument('--no-outgoing', help='Do not send outgoing connections', action='store_true') - parser.add_argument('--no-ip', help='Do not use IP network', action='store_true') - parser.add_argument('--trusted-peer', help='Specify a trusted peer we should connect to') - parser.add_argument('--connection-limit', help='Maximum number of connections', type=int) - parser.add_argument('--i2p', help='Enable I2P support (uses SAMv3)', action='store_true') - parser.add_argument('--i2p-tunnel-length', help='Length of I2P tunnels', type=int) - parser.add_argument('--i2p-sam-host', help='Host of I2P SAMv3 bridge') - parser.add_argument('--i2p-sam-port', help='Port of I2P SAMv3 bridge', type=int) - parser.add_argument('--i2p-transient', help='Generate new I2P destination on start', action='store_true') + parser.add_argument( + '--no-incoming', action='store_true', + help='Do not listen for incoming connections') + parser.add_argument( + '--no-outgoing', action='store_true', + help='Do not send outgoing connections') + parser.add_argument( + '--no-ip', action='store_true', help='Do not use IP network') + parser.add_argument( + '--trusted-peer', help='Specify a trusted peer we should connect to') + parser.add_argument( + '--connection-limit', type=int, help='Maximum number of connections') + parser.add_argument( + '--i2p', action='store_true', help='Enable I2P support (uses SAMv3)') + parser.add_argument( + '--i2p-tunnel-length', type=int, help='Length of I2P tunnels') + parser.add_argument( + '--i2p-sam-host', help='Host of I2P SAMv3 bridge') + parser.add_argument( + '--i2p-sam-port', type=int, help='Port of I2P SAMv3 bridge') + parser.add_argument( + '--i2p-transient', action='store_true', + help='Generate new I2P destination on start') args = parser.parse_args() if args.port: @@ -87,32 +101,44 @@ def parse_arguments(): def load_data(): try: - with open(shared.data_directory + 'objects.pickle', mode='br') as file: - shared.objects = pickle.load(file) + with open( + os.path.join(shared.data_directory, 'objects.pickle'), 'br' + ) as src: + shared.objects = pickle.load(src) except Exception as e: logging.warning('Error while loading objects from disk.') logging.warning(e) try: - with open(shared.data_directory + 'nodes.pickle', mode='br') as file: - shared.node_pool = pickle.load(file) + with open( + os.path.join(shared.data_directory, 'nodes.pickle'), 'br' + ) as src: + shared.node_pool = pickle.load(src) except Exception as e: logging.warning('Error while loading nodes from disk.') logging.warning(e) try: - with open(shared.data_directory + 'i2p_nodes.pickle', mode='br') as file: - shared.i2p_node_pool = pickle.load(file) + with open( + os.path.join(shared.data_directory, 'i2p_nodes.pickle'), 'br' + ) as src: + shared.i2p_node_pool = pickle.load(src) except Exception as e: logging.warning('Error while loading nodes from disk.') logging.warning(e) - with open(os.path.join(shared.source_directory, 'core_nodes.csv'), mode='r', newline='') as f: - reader = csv.reader(f) + with open( + os.path.join(shared.source_directory, 'core_nodes.csv'), + 'r', newline='' + ) as src: + reader = csv.reader(src) shared.core_nodes = {tuple(row) for row in reader} shared.node_pool.update(shared.core_nodes) - with open(os.path.join(shared.source_directory, 'i2p_core_nodes.csv'), mode='r', newline='') as f: + with open( + os.path.join(shared.source_directory, 'i2p_core_nodes.csv'), + 'r', newline='' + ) as f: reader = csv.reader(f) shared.i2p_core_nodes = {(row[0].encode(), 'i2p') for row in reader} shared.i2p_node_pool.update(shared.i2p_core_nodes) @@ -122,13 +148,16 @@ def bootstrap_from_dns(): try: for item in socket.getaddrinfo('bootstrap8080.bitmessage.org', 80): shared.unchecked_node_pool.add((item[4][0], 8080)) - logging.debug('Adding ' + item[4][0] + ' to unchecked_node_pool based on DNS bootstrap method') + logging.debug( + 'Adding %s to unchecked_node_pool' + ' based on DNS bootstrap method', item[4][0]) for item in socket.getaddrinfo('bootstrap8444.bitmessage.org', 80): shared.unchecked_node_pool.add((item[4][0], 8444)) - logging.debug('Adding ' + item[4][0] + ' to unchecked_node_pool based on DNS bootstrap method') - except Exception as e: - logging.error('Error during DNS bootstrap') - logging.error(e) + logging.debug( + 'Adding %s to unchecked_node_pool' + ' based on DNS bootstrap method', item[4][0]) + except Exception: + logging.error('Error during DNS bootstrap', exc_info=True) def start_ip_listener(): @@ -137,37 +166,48 @@ def start_ip_listener(): if socket.has_ipv6: try: - listener_ipv6 = Listener(shared.listening_host, shared.listening_port, family=socket.AF_INET6) + listener_ipv6 = Listener( + shared.listening_host, + shared.listening_port, family=socket.AF_INET6) listener_ipv6.start() - except Exception as e: - logging.warning('Error while starting IPv6 listener on port {}'.format(shared.listening_port)) - logging.warning(e) + except Exception: + logging.warning( + 'Error while starting IPv6 listener on port %s', + shared.listening_port, exc_info=True) try: listener_ipv4 = Listener(shared.listening_host, shared.listening_port) listener_ipv4.start() - except Exception as e: + except Exception: if listener_ipv6: - logging.warning('Error while starting IPv4 listener on port {}. '.format(shared.listening_port) + - 'However the IPv6 one seems to be working and will probably accept IPv4 connections.') + logging.warning( + 'Error while starting IPv4 listener on port %s.' + ' However the IPv6 one seems to be working' + ' and will probably accept IPv4 connections.', + shared.listening_port) else: - logging.error('Error while starting IPv4 listener on port {}. '.format(shared.listening_port) + - 'You will not receive incoming connections. Please check your port configuration') - logging.error(e) + logging.error( + 'Error while starting IPv4 listener on port %s.' + 'You will not receive incoming connections.' + ' Please check your port configuration', + shared.listening_port, exc_info=True) def start_i2p_listener(): # Grab I2P destinations from old object file for obj in shared.objects.values(): if obj.object_type == shared.i2p_dest_obj_type: - shared.i2p_unchecked_node_pool.add((base64.b64encode(obj.object_payload, altchars=b'-~'), 'i2p')) + shared.i2p_unchecked_node_pool.add(( + base64.b64encode(obj.object_payload, altchars=b'-~'), 'i2p')) dest_priv = b'' if not shared.i2p_transient: try: - with open(shared.data_directory + 'i2p_dest_priv.key', mode='br') as file: - dest_priv = file.read() + with open( + os.path.join(shared.data_directory, 'i2p_dest_priv.key'), 'br' + ) as src: + dest_priv = src.read() logging.debug('Loaded I2P destination private key.') except Exception: logging.warning( @@ -183,8 +223,8 @@ def start_i2p_listener(): shared.i2p_dest_pub = i2p_controller.dest_pub shared.i2p_session_nick = i2p_controller.nick - logging.info('Local I2P destination: {}'.format(shared.i2p_dest_pub.decode())) - logging.info('I2P session nick: {}'.format(shared.i2p_session_nick.decode())) + logging.info('Local I2P destination: %s', shared.i2p_dest_pub.decode()) + logging.info('I2P session nick: %s', shared.i2p_session_nick.decode()) logging.info('Starting I2P Listener') i2p_listener = i2p.I2PListener(shared, i2p_controller.nick) @@ -192,20 +232,25 @@ def start_i2p_listener(): if not shared.i2p_transient: try: - with open(shared.data_directory + 'i2p_dest_priv.key', mode='bw') as file: - file.write(i2p_controller.dest_priv) + with open( + os.path.join(shared.data_directory, 'i2p_dest_priv.key'), 'bw' + ) as src: + src.write(i2p_controller.dest_priv) logging.debug('Saved I2P destination private key.') - except Exception as e: - logging.warning('Error while saving I2P destination private key.') - logging.warning(e) + except Exception: + logging.warning( + 'Error while saving I2P destination private key.', + exc_info=True) try: - with open(shared.data_directory + 'i2p_dest.pub', mode='bw') as file: - file.write(shared.i2p_dest_pub) + with open( + os.path.join(shared.data_directory, 'i2p_dest.pub'), 'bw' + ) as src: + src.write(shared.i2p_dest_pub) logging.debug('Saved I2P destination public key.') - except Exception as e: - logging.warning('Error while saving I2P destination public key.') - logging.warning(e) + except Exception: + logging.warning( + 'Error while saving I2P destination public key.', exc_info=True) def main(): @@ -214,16 +259,19 @@ def main(): parse_arguments() - logging.basicConfig(level=shared.log_level, format='[%(asctime)s] [%(levelname)s] %(message)s') + logging.basicConfig( + level=shared.log_level, + format='[%(asctime)s] [%(levelname)s] %(message)s') logging.info('Starting MiNode') - logging.info('Data directory: {}'.format(shared.data_directory)) + logging.info('Data directory: %s', shared.data_directory) if not os.path.exists(shared.data_directory): try: os.makedirs(shared.data_directory) - except Exception as e: - logging.warning('Error while creating data directory in: {}'.format(shared.data_directory)) - logging.warning(e) + except Exception: + logging.warning( + 'Error while creating data directory in: %s', + shared.data_directory, exc_info=True) load_data() @@ -231,15 +279,20 @@ def main(): bootstrap_from_dns() if shared.i2p_enabled: - # We are starting it before cleaning expired objects so we can collect I2P destination objects + # We are starting it before cleaning expired objects + # so we can collect I2P destination objects start_i2p_listener() for vector in set(shared.objects): if not shared.objects[vector].is_valid(): if shared.objects[vector].is_expired(): - logging.debug('Deleted expired object: {}'.format(base64.b16encode(vector).decode())) + logging.debug( + 'Deleted expired object: %s', + base64.b16encode(vector).decode()) else: - logging.warning('Deleted invalid object: {}'.format(base64.b16encode(vector).decode())) + logging.warning( + 'Deleted invalid object: %s', + base64.b16encode(vector).decode()) del shared.objects[vector] manager = Manager() diff --git a/minode/manager.py b/minode/manager.py index ad6c701..959c8a3 100644 --- a/minode/manager.py +++ b/minode/manager.py @@ -1,6 +1,7 @@ # -*- coding: utf-8 -*- import base64 import logging +import os import pickle import queue import random @@ -20,7 +21,9 @@ class Manager(threading.Thread): self.last_cleaned_connections = time.time() self.last_pickled_objects = time.time() self.last_pickled_nodes = time.time() - self.last_published_i2p_destination = time.time() - 50 * 60 + random.uniform(-1, 1) * 300 # Publish destination 5-15 minutes after start + # Publish destination 5-15 minutes after start + self.last_published_i2p_destination = \ + time.time() - 50 * 60 + random.uniform(-1, 1) * 300 def run(self): while True: @@ -51,7 +54,9 @@ class Manager(threading.Thread): if shared.objects[vector].is_expired(): with shared.objects_lock: del shared.objects[vector] - logging.debug('Deleted expired object: {}'.format(base64.b16encode(vector).decode())) + logging.debug( + 'Deleted expired object: %s', + base64.b16encode(vector).decode()) @staticmethod def manage_connections(): @@ -75,11 +80,15 @@ class Manager(threading.Thread): if shared.trusted_peer: to_connect.add(shared.trusted_peer) - if outgoing_connections < shared.outgoing_connections and shared.send_outgoing_connections and not shared.trusted_peer: + if ( + outgoing_connections < shared.outgoing_connections + and shared.send_outgoing_connections and not shared.trusted_peer + ): if shared.ip_enabled: if len(shared.unchecked_node_pool) > 16: - to_connect.update(random.sample(shared.unchecked_node_pool, 16)) + to_connect.update(random.sample( + shared.unchecked_node_pool, 16)) else: to_connect.update(shared.unchecked_node_pool) shared.unchecked_node_pool.difference_update(to_connect) @@ -90,7 +99,8 @@ class Manager(threading.Thread): if shared.i2p_enabled: if len(shared.i2p_unchecked_node_pool) > 16: - to_connect.update(random.sample(shared.i2p_unchecked_node_pool, 16)) + to_connect.update( + random.sample(shared.i2p_unchecked_node_pool, 16)) else: to_connect.update(shared.i2p_unchecked_node_pool) shared.i2p_unchecked_node_pool.difference_update(to_connect) @@ -112,9 +122,10 @@ class Manager(threading.Thread): d.start() hosts.add(d.destination) shared.i2p_dialers.add(d) - except Exception as e: - logging.warning('Exception while trying to establish an I2P connection') - logging.warning(e) + except Exception: + logging.warning( + 'Exception while trying to establish' + ' an I2P connection', exc_info=True) else: continue else: @@ -128,9 +139,11 @@ class Manager(threading.Thread): @staticmethod def pickle_objects(): try: - with open(shared.data_directory + 'objects.pickle', mode='bw') as file: + with open( + os.path.join(shared.data_directory, 'objects.pickle'), 'bw' + ) as dst: with shared.objects_lock: - pickle.dump(shared.objects, file, protocol=3) + pickle.dump(shared.objects, dst, protocol=3) logging.debug('Saved objects') except Exception as e: logging.warning('Error while saving objects') @@ -141,27 +154,37 @@ class Manager(threading.Thread): if len(shared.node_pool) > 10000: shared.node_pool = set(random.sample(shared.node_pool, 10000)) if len(shared.unchecked_node_pool) > 1000: - shared.unchecked_node_pool = set(random.sample(shared.unchecked_node_pool, 1000)) + shared.unchecked_node_pool = set( + random.sample(shared.unchecked_node_pool, 1000)) if len(shared.i2p_node_pool) > 1000: - shared.i2p_node_pool = set(random.sample(shared.i2p_node_pool, 1000)) + shared.i2p_node_pool = set( + random.sample(shared.i2p_node_pool, 1000)) if len(shared.i2p_unchecked_node_pool) > 100: - shared.i2p_unchecked_node_pool = set(random.sample(shared.i2p_unchecked_node_pool, 100)) + shared.i2p_unchecked_node_pool = set( + random.sample(shared.i2p_unchecked_node_pool, 100)) try: - with open(shared.data_directory + 'nodes.pickle', mode='bw') as file: - pickle.dump(shared.node_pool, file, protocol=3) - with open(shared.data_directory + 'i2p_nodes.pickle', mode='bw') as file: - pickle.dump(shared.i2p_node_pool, file, protocol=3) + with open( + os.path.join(shared.data_directory, 'nodes.pickle'), 'bw' + ) as dst: + pickle.dump(shared.node_pool, dst, protocol=3) + with open( + os.path.join(shared.data_directory, 'i2p_nodes.pickle'), 'bw' + ) as dst: + pickle.dump(shared.i2p_node_pool, dst, protocol=3) logging.debug('Saved nodes') - except Exception as e: - logging.warning('Error while saving nodes') - logging.warning(e) + except Exception: + logging.warning('Error while saving nodes', exc_info=True) @staticmethod def publish_i2p_destination(): if shared.i2p_session_nick and not shared.i2p_transient: logging.info('Publishing our I2P destination') - dest_pub_raw = base64.b64decode(shared.i2p_dest_pub, altchars=b'-~') - obj = structure.Object(b'\x00' * 8, int(time.time() + 2 * 3600), shared.i2p_dest_obj_type, shared.i2p_dest_obj_version, 1, dest_pub_raw) + dest_pub_raw = base64.b64decode( + shared.i2p_dest_pub, altchars=b'-~') + obj = structure.Object( + b'\x00' * 8, int(time.time() + 2 * 3600), + shared.i2p_dest_obj_type, shared.i2p_dest_obj_version, + 1, dest_pub_raw) pow.do_pow_and_publish(obj) diff --git a/minode/message.py b/minode/message.py index 33e723c..1afab2b 100644 --- a/minode/message.py +++ b/minode/message.py @@ -14,8 +14,12 @@ class Header(object): self.payload_checksum = payload_checksum def __repr__(self): - return 'type: header, command: "{}", payload_length: {}, payload_checksum: {}'\ - .format(self.command.decode(), self.payload_length, base64.b16encode(self.payload_checksum).decode()) + return ( + 'type: header, command: "{}", payload_length: {},' + ' payload_checksum: {}' + ).format( + self.command.decode(), self.payload_length, + base64.b16encode(self.payload_checksum).decode()) def to_bytes(self): b = b'' @@ -27,7 +31,8 @@ class Header(object): @classmethod def from_bytes(cls, b): - magic_bytes, command, payload_length, payload_checksum = struct.unpack('>4s12sL4s', b) + magic_bytes, command, payload_length, payload_checksum = struct.unpack( + '>4s12sL4s', b) if magic_bytes != shared.magic_bytes: raise ValueError('magic_bytes do not match') @@ -46,11 +51,14 @@ class Message(object): self.payload_checksum = hashlib.sha512(payload).digest()[:4] def __repr__(self): - return '{}, payload_length: {}, payload_checksum: {}'\ - .format(self.command.decode(), self.payload_length, base64.b16encode(self.payload_checksum).decode()) + return '{}, payload_length: {}, payload_checksum: {}'.format( + self.command.decode(), self.payload_length, + base64.b16encode(self.payload_checksum).decode()) def to_bytes(self): - b = Header(self.command, self.payload_length, self.payload_checksum).to_bytes() + b = Header( + self.command, self.payload_length, self.payload_checksum + ).to_bytes() b += self.payload return b @@ -62,19 +70,26 @@ class Message(object): payload_length = len(payload) if payload_length != h.payload_length: - raise ValueError('wrong payload length, expected {}, got {}'.format(h.payload_length, payload_length)) + raise ValueError( + 'wrong payload length, expected {}, got {}'.format( + h.payload_length, payload_length)) payload_checksum = hashlib.sha512(payload).digest()[:4] if payload_checksum != h.payload_checksum: - raise ValueError('wrong payload checksum, expected {}, got {}'.format(h.payload_checksum, payload_checksum)) + raise ValueError( + 'wrong payload checksum, expected {}, got {}'.format( + h.payload_checksum, payload_checksum)) return cls(h.command, payload) class Version(object): - def __init__(self, host, port, protocol_version=shared.protocol_version, services=shared.services, - nonce=shared.nonce, user_agent=shared.user_agent): + def __init__( + self, host, port, protocol_version=shared.protocol_version, + services=shared.services, nonce=shared.nonce, + user_agent=shared.user_agent + ): self.host = host self.port = port @@ -84,16 +99,21 @@ class Version(object): self.user_agent = user_agent def __repr__(self): - return 'version, protocol_version: {}, services: {}, host: {}, port: {}, nonce: {}, user_agent: {}'\ - .format(self.protocol_version, self.services, self.host, self.port, base64.b16encode(self.nonce).decode(), self.user_agent) + return ( + 'version, protocol_version: {}, services: {}, host: {}, port: {},' + ' nonce: {}, user_agent: {}').format( + self.protocol_version, self.services, self.host, self.port, + base64.b16encode(self.nonce).decode(), self.user_agent) def to_bytes(self): payload = b'' payload += struct.pack('>I', self.protocol_version) payload += struct.pack('>Q', self.services) payload += struct.pack('>Q', int(time.time())) - payload += structure.NetAddrNoPrefix(shared.services, self.host, self.port).to_bytes() - payload += structure.NetAddrNoPrefix(shared.services, '127.0.0.1', 8444).to_bytes() + payload += structure.NetAddrNoPrefix( + shared.services, self.host, self.port).to_bytes() + payload += structure.NetAddrNoPrefix( + shared.services, '127.0.0.1', 8444).to_bytes() payload += self.nonce payload += structure.VarInt(len(shared.user_agent)).to_bytes() payload += shared.user_agent @@ -107,8 +127,9 @@ class Version(object): payload = m.payload - protocol_version, services, t, net_addr_remote, net_addr_local, nonce = \ - struct.unpack('>IQQ26s26s8s', payload[:80]) + ( # unused: net_addr_local + protocol_version, services, t, net_addr_remote, _, nonce + ) = struct.unpack('>IQQ26s26s8s', payload[:80]) net_addr_remote = structure.NetAddrNoPrefix.from_bytes(net_addr_remote) @@ -118,7 +139,8 @@ class Version(object): payload = payload[80:] user_agent_varint_length = structure.VarInt.length(payload[0]) - user_agent_length = structure.VarInt.from_bytes(payload[:user_agent_varint_length]).n + user_agent_length = structure.VarInt.from_bytes( + payload[:user_agent_varint_length]).n payload = payload[user_agent_varint_length:] @@ -140,14 +162,18 @@ class Inv(object): return 'inv, count: {}'.format(len(self.vectors)) def to_bytes(self): - return Message(b'inv', structure.VarInt(len(self.vectors)).to_bytes() + b''.join(self.vectors)).to_bytes() + return Message( + b'inv', structure.VarInt(len(self.vectors)).to_bytes() + + b''.join(self.vectors) + ).to_bytes() @classmethod def from_message(cls, m): payload = m.payload vector_count_varint_length = structure.VarInt.length(payload[0]) - vector_count = structure.VarInt.from_bytes(payload[:vector_count_varint_length]).n + vector_count = structure.VarInt.from_bytes( + payload[:vector_count_varint_length]).n payload = payload[vector_count_varint_length:] @@ -171,14 +197,18 @@ class GetData(object): return 'getdata, count: {}'.format(len(self.vectors)) def to_bytes(self): - return Message(b'getdata', structure.VarInt(len(self.vectors)).to_bytes() + b''.join(self.vectors)).to_bytes() + return Message( + b'getdata', structure.VarInt(len(self.vectors)).to_bytes() + + b''.join(self.vectors) + ).to_bytes() @classmethod def from_message(cls, m): payload = m.payload vector_count_varint_length = structure.VarInt.length(payload[0]) - vector_count = structure.VarInt.from_bytes(payload[:vector_count_varint_length]).n + vector_count = structure.VarInt.from_bytes( + payload[:vector_count_varint_length]).n payload = payload[vector_count_varint_length:] @@ -202,14 +232,18 @@ class Addr(object): return 'addr, count: {}'.format(len(self.addresses)) def to_bytes(self): - return Message(b'addr', structure.VarInt(len(self.addresses)).to_bytes() + b''.join({addr.to_bytes() for addr in self.addresses})).to_bytes() + return Message( + b'addr', structure.VarInt(len(self.addresses)).to_bytes() + + b''.join({addr.to_bytes() for addr in self.addresses}) + ).to_bytes() @classmethod def from_message(cls, m): payload = m.payload addr_count_varint_length = structure.VarInt.length(payload[0]) - addr_count = structure.VarInt.from_bytes(payload[:addr_count_varint_length]).n + # addr_count = structure.VarInt.from_bytes( + # payload[:addr_count_varint_length]).n payload = payload[addr_count_varint_length:] diff --git a/minode/pow.py b/minode/pow.py index 98fedd1..9ff3707 100644 --- a/minode/pow.py +++ b/minode/pow.py @@ -2,7 +2,6 @@ import base64 import hashlib import logging import multiprocessing -import shared import struct import threading import time @@ -12,29 +11,37 @@ from . import shared, structure def _pow_worker(target, initial_hash, q): nonce = 0 - logging.debug("target: {}, initial_hash: {}".format(target, base64.b16encode(initial_hash).decode())) + logging.debug( + 'target: %s, initial_hash: %s', + target, base64.b16encode(initial_hash).decode()) trial_value = target + 1 while trial_value > target: nonce += 1 - trial_value = struct.unpack('>Q', hashlib.sha512(hashlib.sha512(struct.pack('>Q', nonce) + initial_hash).digest()).digest()[:8])[0] + trial_value = struct.unpack('>Q', hashlib.sha512(hashlib.sha512( + struct.pack('>Q', nonce) + initial_hash).digest()).digest()[:8])[0] q.put(struct.pack('>Q', nonce)) def _worker(obj): q = multiprocessing.Queue() - p = multiprocessing.Process(target=_pow_worker, args=(obj.pow_target(), obj.pow_initial_hash(), q)) + p = multiprocessing.Process( + target=_pow_worker, args=(obj.pow_target(), obj.pow_initial_hash(), q)) - logging.debug("Starting POW process") + logging.debug('Starting POW process') t = time.time() p.start() nonce = q.get() p.join() - logging.debug("Finished doing POW, nonce: {}, time: {}s".format(nonce, time.time() - t)) - obj = structure.Object(nonce, obj.expires_time, obj.object_type, obj.version, obj.stream_number, obj.object_payload) - logging.debug("Object vector is {}".format(base64.b16encode(obj.vector).decode())) + logging.debug( + 'Finished doing POW, nonce: %s, time: %ss', nonce, time.time() - t) + obj = structure.Object( + nonce, obj.expires_time, obj.object_type, obj.version, + obj.stream_number, obj.object_payload) + logging.debug( + 'Object vector is %s', base64.b16encode(obj.vector).decode()) with shared.objects_lock: shared.objects[obj.vector] = obj diff --git a/minode/structure.py b/minode/structure.py index 68f47db..106a7c0 100644 --- a/minode/structure.py +++ b/minode/structure.py @@ -2,8 +2,8 @@ import base64 import hashlib import logging -import struct import socket +import struct import time from . import shared @@ -44,17 +44,22 @@ class VarInt(object): class Object(object): - def __init__(self, nonce, expires_time, object_type, version, stream_number, object_payload): + def __init__( + self, nonce, expires_time, object_type, version, + stream_number, object_payload + ): self.nonce = nonce self.expires_time = expires_time self.object_type = object_type self.version = version self.stream_number = stream_number self.object_payload = object_payload - self.vector = hashlib.sha512(hashlib.sha512(self.to_bytes()).digest()).digest()[:32] + self.vector = hashlib.sha512(hashlib.sha512( + self.to_bytes()).digest()).digest()[:32] def __repr__(self): - return 'object, vector: {}'.format(base64.b16encode(self.vector).decode()) + return 'object, vector: {}'.format( + base64.b16encode(self.vector).decode()) @classmethod def from_message(cls, m): @@ -65,15 +70,19 @@ class Object(object): version = VarInt.from_bytes(payload[:version_varint_length]).n payload = payload[version_varint_length:] stream_number_varint_length = VarInt.length(payload[0]) - stream_number = VarInt.from_bytes(payload[:stream_number_varint_length]).n + stream_number = VarInt.from_bytes( + payload[:stream_number_varint_length]).n payload = payload[stream_number_varint_length:] - return cls(nonce, expires_time, object_type, version, stream_number, payload) + return cls( + nonce, expires_time, object_type, version, stream_number, payload) def to_bytes(self): payload = b'' payload += self.nonce payload += struct.pack('>QL', self.expires_time, self.object_type) - payload += VarInt(self.version).to_bytes() + VarInt(self.stream_number).to_bytes() + payload += ( + VarInt(self.version).to_bytes() + + VarInt(self.stream_number).to_bytes()) payload += self.object_payload return payload @@ -82,25 +91,37 @@ class Object(object): def is_valid(self): if self.is_expired(): - logging.debug('Invalid object {}, reason: expired'.format(base64.b16encode(self.vector).decode())) + logging.debug( + 'Invalid object %s, reason: expired', + base64.b16encode(self.vector).decode()) return False if self.expires_time > time.time() + 28 * 24 * 3600 + 3 * 3600: - logging.warning('Invalid object {}, reason: end of life too far in the future'.format(base64.b16encode(self.vector).decode())) + logging.warning( + 'Invalid object %s, reason: end of life too far in the future', + base64.b16encode(self.vector).decode()) return False if len(self.object_payload) > 2**18: - logging.warning('Invalid object {}, reason: payload is too long'.format(base64.b16encode(self.vector).decode())) + logging.warning( + 'Invalid object %s, reason: payload is too long', + base64.b16encode(self.vector).decode()) return False if self.stream_number != 1: - logging.warning('Invalid object {}, reason: not in stream 1'.format(base64.b16encode(self.vector).decode())) + logging.warning( + 'Invalid object %s, reason: not in stream 1', + base64.b16encode(self.vector).decode()) return False data = self.to_bytes()[8:] - length = len(data) + 8 + shared.payload_length_extra_bytes - dt = max(self.expires_time - time.time(), 0) + # length = len(data) + 8 + shared.payload_length_extra_bytes + # dt = max(self.expires_time - time.time(), 0) h = hashlib.sha512(data).digest() - pow_value = int.from_bytes(hashlib.sha512(hashlib.sha512(self.nonce + h).digest()).digest()[:8], 'big') + pow_value = int.from_bytes( + hashlib.sha512(hashlib.sha512( + self.nonce + h).digest()).digest()[:8], 'big') target = self.pow_target() if target < pow_value: - logging.warning('Invalid object {}, reason: insufficient pow'.format(base64.b16encode(self.vector).decode())) + logging.warning( + 'Invalid object %s, reason: insufficient pow', + base64.b16encode(self.vector).decode()) return False return True @@ -108,7 +129,10 @@ class Object(object): data = self.to_bytes()[8:] length = len(data) + 8 + shared.payload_length_extra_bytes dt = max(self.expires_time - time.time(), 0) - return int(2 ** 64 / (shared.nonce_trials_per_byte * (length + (dt * length) / (2 ** 16)))) + return int( + 2 ** 64 / ( + shared.nonce_trials_per_byte * ( + length + (dt * length) / (2 ** 16)))) def pow_initial_hash(self): return hashlib.sha512(self.to_bytes()[8:]).digest() @@ -121,7 +145,8 @@ class NetAddrNoPrefix(object): self.port = port def __repr__(self): - return 'net_addr_no_prefix, services: {}, host: {}, port {}'.format(self.services, self.host, self.port) + return 'net_addr_no_prefix, services: {}, host: {}, port {}'.format( + self.services, self.host, self.port) def to_bytes(self): b = b'' @@ -137,7 +162,8 @@ class NetAddrNoPrefix(object): @classmethod def from_bytes(cls, b): services, host, port = struct.unpack('>Q16sH', b) - if host.startswith(b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF'): + if host.startswith( + b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF'): host = socket.inet_ntop(socket.AF_INET, host[-4:]) else: host = socket.inet_ntop(socket.AF_INET6, host) @@ -152,8 +178,8 @@ class NetAddr(object): self.port = port def __repr__(self): - return 'net_addr, stream: {}, services: {}, host: {}, port {}'\ - .format(self.stream, self.services, self.host, self.port) + return 'net_addr, stream: {}, services: {}, host: {}, port {}'.format( + self.stream, self.services, self.host, self.port) def to_bytes(self): b = b'' @@ -164,6 +190,6 @@ class NetAddr(object): @classmethod def from_bytes(cls, b): - t, stream, net_addr = struct.unpack('>QI26s', b) + stream, net_addr = struct.unpack('>QI26s', b)[1:] n = NetAddrNoPrefix.from_bytes(net_addr) return cls(n.services, n.host, n.port, stream)