From 61c8e0d0244b37a90bacfa027387eeb502f7e626 Mon Sep 17 00:00:00 2001 From: TheKysek Date: Thu, 30 Jun 2016 10:11:33 +0200 Subject: [PATCH] Add project files --- src/advertiser.py | 36 ++++++++ src/connection.py | 198 +++++++++++++++++++++++++++++++++++++++++ src/core_nodes.csv | 14 +++ src/listener.py | 31 +++++++ src/main.py | 67 ++++++++++++++ src/manager.py | 121 +++++++++++++++++++++++++ src/message.py | 217 +++++++++++++++++++++++++++++++++++++++++++++ src/shared.py | 48 ++++++++++ src/structure.py | 151 +++++++++++++++++++++++++++++++ start.sh | 2 + 10 files changed, 885 insertions(+) create mode 100644 src/advertiser.py create mode 100644 src/connection.py create mode 100644 src/core_nodes.csv create mode 100644 src/listener.py create mode 100644 src/main.py create mode 100644 src/manager.py create mode 100644 src/message.py create mode 100644 src/shared.py create mode 100644 src/structure.py create mode 100644 start.sh diff --git a/src/advertiser.py b/src/advertiser.py new file mode 100644 index 0000000..60a8a0d --- /dev/null +++ b/src/advertiser.py @@ -0,0 +1,36 @@ +import threading +import time + +import message +import shared + + +class Advertiser(threading.Thread): + def __init__(self): + super().__init__(name='Advertiser') + + def run(self): + while True: + time.sleep(0.6) + self._advertise_vectors() + self._advertise_addresses() + + @staticmethod + def _advertise_vectors(): + vectors_to_advertise = set() + while not shared.vector_advertise_queue.empty(): + vectors_to_advertise.add(shared.vector_advertise_queue.get()) + if len(vectors_to_advertise) > 0: + for c in shared.connections.copy(): + if c.status == 'verack_received': + c.send_queue.put(message.Inv(vectors_to_advertise)) + + @staticmethod + def _advertise_addresses(): + addresses_to_advertise = set() + while not shared.address_advertise_queue.empty(): + addresses_to_advertise.add(shared.address_advertise_queue.get()) + if len(addresses_to_advertise) > 0: + for c in shared.connections.copy(): + if c.status == 'verack_received': + c.send_queue.put(message.Addr(addresses_to_advertise)) diff --git a/src/connection.py b/src/connection.py new file mode 100644 index 0000000..76d7731 --- /dev/null +++ b/src/connection.py @@ -0,0 +1,198 @@ +# -*- coding: utf-8 -*- +import logging +import random +import socket +import threading +import queue +import time + +import message +import shared +import structure + + +class Connection(threading.Thread): + def __init__(self, host, port, s=None): + super().__init__(name='Connection to {}:{}'.format(host, port)) + + self.send_queue = queue.Queue() + + self.vectors_to_get = set() + + self.status = 'ready' + self.sent_verack = False + self.sent_big_inv_message = False + + self.host = host + self.port = int(port) + + self.s = s + + self.remote_version = None + + self.server = bool(s) + + if self.server: + self.status = 'connected' + + self.buffer = b'' + self.next_message_size = shared.header_length + self.next_header = True + + self.last_message_received = time.time() + self.last_message_sent = time.time() + + def run(self): + if self.s is None: + self._connect() + if self.status != 'connected': + return + self.s.settimeout(1) + if not self.server: + self.send_queue.put(message.Version(self.host, self.port)) + while True: + data = True + try: + data = self.s.recv(1014) + self.buffer += data + except socket.timeout: + if time.time() - self.last_message_received > shared.timeout: + data = None + if time.time() - self.last_message_received > 20 and self.status != 'verack_received': + data = None + if time.time() - self.last_message_sent > 60 and self.status == 'verack_received': + self.send_queue.put(message.Message(b'alive', b'')) + if not self.sent_big_inv_message and self.status == 'verack_received' and self.sent_verack: + self._send_big_inv() + except ConnectionResetError: + data = None + self._process_buffer() + self._request_objects() + self._process_queue() + if self.status == 'disconnecting': + data = None + if not data: + self.status = 'disconnected' + self.s.close() + logging.info('Disconnected from {}:{}'.format(self.host, self.port)) + break + + def _connect(self): + logging.info('Connecting to {}:{}'.format(self.host, self.port)) + + try: + self.s = socket.create_connection((self.host, self.port)) + self.status = 'connected' + logging.debug('Established TCP connection to {}:{}'.format(self.host, self.port)) + except Exception as e: + logging.warning('Connection to {}:{} failed'.format(self.host, self.port)) + logging.warning(e) + + self.status = 'failed' + + def _send_message(self, m): + if type(m) == message.Message and m.command == b'object': + logging.debug('{}:{} <- {}'.format(self.host, self.port, structure.Object.from_message(m))) + else: + logging.debug('{}:{} <- {}'.format(self.host, self.port, m)) + self.s.settimeout(60) + self.s.sendall(m.to_bytes()) + self.s.settimeout(1) + + def _send_big_inv(self): + with shared.objects_lock: + self.send_queue.put(message.Inv({vector for vector in shared.objects.keys() if shared.objects[vector].expires_time > time.time()})) + addr = {structure.NetAddr(1, c.host, c.port) for c in shared.connections.copy() if not c.server} + if len(addr) != 0: + self.send_queue.put(message.Addr(addr)) + self.sent_big_inv_message = True + + def _process_queue(self): + while not self.send_queue.empty(): + m = self.send_queue.get() + if m: + self._send_message(m) + self.last_message_sent = time.time() + else: + self.status = 'disconnecting' + break + + def _process_buffer(self): + while len(self.buffer) >= self.next_message_size: + if self.next_header: + self.next_header = False + h = message.Header.from_bytes(self.buffer[:shared.header_length]) + self.next_message_size += h.payload_length + else: + m = message.Message.from_bytes(self.buffer[:self.next_message_size]) + + self.next_header = True + self.buffer = self.buffer[self.next_message_size:] + self.next_message_size = shared.header_length + self.last_message_received = time.time() + self._process_message(m) + + def _process_message(self, m): + if m.command == b'version': + version = message.Version.from_bytes(m.to_bytes()) + logging.debug('{}:{} -> {}'.format(self.host, self.port, str(version))) + if version.protocol_version != shared.protocol_version or version.nonce == shared.nonce: + self.status = 'disconnecting' + self.send_queue.put(None) + else: + self.send_queue.put(message.Message(b'verack', b'')) + self.sent_verack = True + self.remote_version = version + if not self.server: + shared.address_advertise_queue.put(structure.NetAddr(version.services, self.host, self.port)) + shared.node_pool.add((self.host, self.port)) + shared.address_advertise_queue.put(structure.NetAddr(shared.services, version.host, shared.listening_port)) + if self.server: + self.send_queue.put(message.Version(self.host, self.port)) + elif m.command == b'verack': + self.status = 'verack_received' + logging.debug('{}:{} -> {}'.format(self.host, self.port, 'verack')) + logging.info('Established Bitmessage protocol connection to {}:{}'.format(self.host, self.port)) + elif m.command == b'inv': + inv = message.Inv.from_message(m) + logging.debug('{}:{} -> {}'.format(self.host, self.port, inv)) + to_get = inv.vectors.copy() + to_get.difference_update(shared.objects.keys()) + to_get.difference_update(shared.requested_objects) + self.vectors_to_get.update(to_get) + elif m.command == b'object': + obj = structure.Object.from_message(m) + logging.debug('{}:{} -> {}'.format(self.host, self.port, obj)) + if obj.is_valid() and obj.vector not in shared.objects: + with shared.objects_lock: + shared.objects[obj.vector] = obj + shared.vector_advertise_queue.put(obj.vector) + elif m.command == b'getdata': + getdata = message.GetData.from_message(m) + logging.debug('{}:{} -> {}'.format(self.host, self.port, getdata)) + for vector in getdata.vectors: + if vector in shared.objects: + self.send_queue.put(message.Message(b'object', shared.objects[vector].to_bytes())) + elif m.command == b'addr': # TODO fix ;P + addr = message.Addr.from_message(m) + logging.debug('{}:{} -> {}'.format(self.host, self.port, addr)) + for a in addr.addresses: + shared.unchecked_node_pool.add((a.host, a.port)) + else: + logging.debug('{}:{} -> {}'.format(self.host, self.port, m)) + + def _request_objects(self): + if self.vectors_to_get: + if len(self.vectors_to_get) > 50000: + pack = random.sample(self.vectors_to_get, 50000) + self.send_queue.put(message.GetData(pack)) + self.vectors_to_get.difference_update(pack) + if shared.conserve_bandwidth: + with shared.requested_objects_lock: + shared.requested_objects.update(pack) + else: + self.send_queue.put(message.GetData(self.vectors_to_get)) + if shared.conserve_bandwidth: + with shared.requested_objects_lock: + shared.requested_objects.update(self.vectors_to_get) + self.vectors_to_get.clear() diff --git a/src/core_nodes.csv b/src/core_nodes.csv new file mode 100644 index 0000000..9c31bcf --- /dev/null +++ b/src/core_nodes.csv @@ -0,0 +1,14 @@ +89.36.218.202,8444 +5.45.99.75,8444 +158.222.217.190,8444 +109.147.204.113,1195 +92.78.49.39,8444 +75.167.159.54,8444 +85.180.139.241,8444 +95.165.168.168,8444 +78.55.217.23,8844 +158.222.211.81,8080 +95.247.239.194,8484 +178.62.12.187,8448 +24.188.198.204,8111 +178.11.46.221,8444 diff --git a/src/listener.py b/src/listener.py new file mode 100644 index 0000000..9c12c6f --- /dev/null +++ b/src/listener.py @@ -0,0 +1,31 @@ +# -*- coding: utf-8 -*- +import logging +import socket +import threading + +from connection import Connection +import shared + + +class Listener(threading.Thread): + def __init__(self, host, port, family=socket.AF_INET): + super().__init__(name='Listener') + self.host = host + self.port = port + self.family = family + self.s = socket.socket(self.family, socket.SOCK_STREAM) + self.s.bind((self.host, self.port)) + + def run(self): + self.s.listen(1) + self.s.settimeout(1) + while True: + try: + conn, addr = self.s.accept() + logging.info('Incoming connection from: {}:{}'.format(addr[0], addr[1])) + with shared.connections_lock: + c = Connection(addr[0], addr[1], conn) + c.start() + shared.connections.add(c) + except socket.timeout: + pass diff --git a/src/main.py b/src/main.py new file mode 100644 index 0000000..50547e4 --- /dev/null +++ b/src/main.py @@ -0,0 +1,67 @@ +# -*- coding: utf-8 -*- +import csv +import logging +import pickle +import socket + +from advertiser import Advertiser +from manager import Manager +from listener import Listener +import shared + + +def main(): + logging.basicConfig(level=shared.log_level, format='[%(asctime)s] [%(levelname)s] %(message)s') + logging.info('Starting MiNode') + try: + with open(shared.data_directory + 'objects.pickle', mode='br') as file: + shared.objects = pickle.load(file) + except Exception as e: + logging.warning('Error while loading objects from disk.') + logging.warning(e) + + try: + with open(shared.data_directory + 'nodes.pickle', mode='br') as file: + shared.nodes = pickle.load(file) + except Exception as e: + logging.warning('Error while loading nodes from disk.') + logging.warning(e) + + with open('core_nodes.csv', mode='r', newline='') as f: + reader = csv.reader(f) + shared.core_nodes = {tuple(row) for row in reader} + shared.node_pool.update(shared.core_nodes) + + try: + for item in socket.getaddrinfo('bootstrap8080.bitmessage.org', 80): + shared.unchecked_node_pool.add((item[4][0], 8444)) + shared.unchecked_node_pool.add((item[4][0], 8080)) + logging.debug('Adding ' + item[4][0] + ' to unchecked_node_pool based on DNS bootstrap method') + except Exception as e: + logging.error('Error during DNS bootstrap') + logging.error(e) + + manager = Manager() + manager.clean_objects() + manager.clean_connections() + manager.start() + + advertiser = Advertiser() + advertiser.start() + + try: + listener_ipv4 = Listener('0.0.0.0', shared.listening_port) + listener_ipv4.start() + except Exception as e: + logging.error('Error while starting IPv4 listener') + logging.error(e) + + try: + listener_ipv6 = Listener('::', shared.listening_port, family=socket.AF_INET6) + listener_ipv6.start() + except Exception as e: + logging.error('Error while starting IPv6 listener') + logging.error(e) + +if __name__ == '__main__': + main() diff --git a/src/manager.py b/src/manager.py new file mode 100644 index 0000000..fc7457e --- /dev/null +++ b/src/manager.py @@ -0,0 +1,121 @@ +# -*- coding: utf-8 -*- +import base64 +import logging +import pickle +import queue +import random +import threading +import time + +from connection import Connection +import shared + + +class Manager(threading.Thread): + def __init__(self): + super().__init__(name='Manager') + self.q = queue.Queue() + self.last_cleaned_objects = time.time() + self.last_cleaned_requested_objects = time.time() + self.last_cleaned_connections = time.time() + self.last_pickled_objects = time.time() + self.last_pickled_nodes = time.time() + + def run(self): + while True: + time.sleep(0.8) + now = time.time() + if now - self.last_cleaned_objects > 90: + self.clean_objects() + self.last_cleaned_objects = now + if now - self.last_cleaned_requested_objects > 3: + self.clean_requested_objects() + self.last_cleaned_requested_objects = now + if now - self.last_cleaned_connections > 2: + self.clean_connections() + self.last_cleaned_connections = now + if now - self.last_pickled_objects > 100: + self.pickle_objects() + self.last_pickled_objects = now + if now - self.last_pickled_nodes > 60: + self.pickle_nodes() + self.last_pickled_nodes = now + + if shared.shutting_down: + self.shutdown_connections() + + @staticmethod + def clean_objects(): + for vector in set(shared.objects): + if shared.objects[vector].is_expired(): + with shared.objects_lock: + del shared.objects[vector] + logging.debug('Deleted expired object: {}'.format(base64.b16encode(vector).decode())) + + @staticmethod + def clean_requested_objects(): + with shared.requested_objects_lock: + shared.requested_objects.difference_update(shared.objects.keys()) + + @staticmethod + def clean_connections(): + conns = shared.connections.copy() + hosts = set() + outgoing_connections = 0 + for c in conns: + if not c.is_alive() or c.status == 'disconnected': + with shared.connections_lock: + shared.connections.remove(c) + else: + hosts.add(c.host) + if not c.server: + outgoing_connections += 1 + shared.hosts = hosts + if outgoing_connections < shared.outgoing_connections: + to_connect = set() + if len(shared.unchecked_node_pool) > 12: + to_connect.update(random.sample(shared.unchecked_node_pool, 12)) + else: + to_connect.update(shared.unchecked_node_pool) + shared.unchecked_node_pool.difference_update(to_connect) + if len(shared.node_pool) > 6: + to_connect.update(random.sample(shared.node_pool, 6)) + else: + to_connect.update(shared.node_pool) + for addr in to_connect: + if addr[0] in hosts: + continue + c = Connection(addr[0], addr[1]) + c.start() + with shared.connections_lock: + shared.connections.add(c) + + @staticmethod + def shutdown_connections(): + for c in shared.connections.copy(): + c.send_queue.put(None) + + @staticmethod + def pickle_objects(): + try: + with open(shared.data_directory + 'objects.pickle', mode='bw') as file: + with shared.objects_lock: + pickle.dump(shared.objects, file, protocol=4) + logging.debug('Saved objects') + except Exception as e: + logging.warning('Error while saving objects') + logging.warning(e) + + @staticmethod + def pickle_nodes(): + if len(shared.node_pool) > 10000: + shared.node_pool = set(random.sample(shared.node_pool, 10000)) + if len(shared.unchecked_node_pool) > 1000: + shared.node_pool = set(random.sample(shared.unchecked_node_pool, 1000)) + try: + with open(shared.data_directory + 'nodes.pickle', mode='bw') as file: + pickle.dump(shared.node_pool, file, protocol=4) + logging.debug('Saved nodes') + except Exception as e: + logging.warning('Error while saving nodes') + logging.warning(e) diff --git a/src/message.py b/src/message.py new file mode 100644 index 0000000..a3dbc99 --- /dev/null +++ b/src/message.py @@ -0,0 +1,217 @@ +# -*- coding: utf-8 -*- +import base64 +import hashlib +import struct +import time + +import shared +import structure + + +class Header(object): + def __init__(self, command, payload_length, payload_checksum): + self.command = command + self.payload_length = payload_length + self.payload_checksum = payload_checksum + + def __repr__(self): + return 'type: header, command: "{}", payload_length: {}, payload_checksum: {}'\ + .format(self.command.decode(), self.payload_length, base64.b16encode(self.payload_checksum).decode()) + + def to_bytes(self): + b = b'' + b += shared.magic_bytes + b += self.command.ljust(12, b'\x00') + b += struct.pack('>L', self.payload_length) + b += self.payload_checksum + return b + + @classmethod + def from_bytes(cls, b): + magic_bytes, command, payload_length, payload_checksum = struct.unpack('>4s12sL4s', b) + + if magic_bytes != shared.magic_bytes: + raise IOError('magic_bytes do not match') + + command = command.rstrip(b'\x00') + + return cls(command, payload_length, payload_checksum) + + +class Message(object): + def __init__(self, command, payload): + self.command = command + self.payload = payload + + self.payload_length = len(payload) + self.payload_checksum = hashlib.sha512(payload).digest()[:4] + + def __repr__(self): + return '{}, payload_length: {}, payload_checksum: {}'\ + .format(self.command.decode(), self.payload_length, base64.b16encode(self.payload_checksum).decode()) + + def to_bytes(self): + b = Header(self.command, self.payload_length, self.payload_checksum).to_bytes() + b += self.payload + return b + + @classmethod + def from_bytes(cls, b): + h = Header.from_bytes(b[:24]) + + payload = b[24:] + payload_length = len(payload) + + if payload_length != h.payload_length: + raise Exception('wrong payload length, expected {}, got {}'.format(h.payload_length, payload_length)) + + payload_checksum = hashlib.sha512(payload).digest()[:4] + + if payload_checksum != h.payload_checksum: + raise Exception('wrong payload checksum, expected {}, got {}'.format(h.payload_checksum, payload_checksum)) + + return cls(h.command, payload) + + +class Version(object): + def __init__(self, host, port, protocol_version=shared.protocol_version, services=shared.services, + nonce=shared.nonce, user_agent=shared.user_agent): + self.host = host + self.port = port + + self.protocol_version = protocol_version + self.services = services + self.nonce = nonce + self.user_agent = user_agent + + def __repr__(self): + return 'version, protocol_version: {}, services: {}, host: {}, port: {}, nonce: {}, user_agent: {}'\ + .format(self.protocol_version, self.services, self.host, self.port, base64.b16encode(self.nonce).decode(), self.user_agent) + + def to_bytes(self): + payload = b'' + payload += struct.pack('>I', self.protocol_version) + payload += struct.pack('>Q', self.services) + payload += struct.pack('>Q', int(time.time())) + payload += structure.NetAddrNoPrefix(shared.services, self.host, self.port).to_bytes() + payload += structure.NetAddrNoPrefix(shared.services, '127.0.0.1', 8444).to_bytes() + payload += self.nonce + payload += structure.VarInt(len(shared.user_agent)).to_bytes() + payload += shared.user_agent + payload += 2 * structure.VarInt(1).to_bytes() + + return Message(b'version', payload).to_bytes() + + @classmethod + def from_bytes(cls, b): + m = Message.from_bytes(b) + + payload = m.payload + + protocol_version, services, t, net_addr_remote, net_addr_local, nonce = \ + struct.unpack('>IQQ26s26s8s', payload[:80]) + + net_addr_remote = structure.NetAddrNoPrefix.from_bytes(net_addr_remote) + + host = net_addr_remote.host + port = net_addr_remote.port + + payload = payload[80:] + + user_agent_varint_length = structure.VarInt.length(payload[0]) + user_agent_length = structure.VarInt.from_bytes(payload[:user_agent_varint_length]).n + + payload = payload[user_agent_varint_length:] + + user_agent = payload[:user_agent_length] + + payload = payload[user_agent_length:] + + # Assume it is stream 1 + assert payload == b'\x01\x01' + + return cls(host, port, protocol_version, services, nonce, user_agent) + + +class Inv(object): + def __init__(self, vectors): + self.vectors = set(vectors) + + def __repr__(self): + return 'inv, count: {}'.format(len(self.vectors)) + + def to_bytes(self): + return Message(b'inv', structure.VarInt(len(self.vectors)).to_bytes() + b''.join(self.vectors)).to_bytes() + + @classmethod + def from_message(cls, m): + payload = m.payload + + vector_count_varint_length = structure.VarInt.length(payload[0]) + vector_count = structure.VarInt.from_bytes(payload[:vector_count_varint_length]).n + + payload = payload[vector_count_varint_length:] + + vectors = set() + + while payload: + vectors.add(payload[:32]) + payload = payload[32:] + + return cls(vectors) + + +class GetData(object): + def __init__(self, vectors): + self.vectors = set(vectors) + + def __repr__(self): + return 'getdata, count: {}'.format(len(self.vectors)) + + def to_bytes(self): + return Message(b'getdata', structure.VarInt(len(self.vectors)).to_bytes() + b''.join(self.vectors)).to_bytes() + + @classmethod + def from_message(cls, m): + payload = m.payload + + vector_count_varint_length = structure.VarInt.length(payload[0]) + vector_count = structure.VarInt.from_bytes(payload[:vector_count_varint_length]).n + + payload = payload[vector_count_varint_length:] + + vectors = set() + + while payload: + vectors.add(payload[:32]) + payload = payload[32:] + + return cls(vectors) + + +class Addr(object): + def __init__(self, addresses): + self.addresses = addresses + + def __repr__(self): + return 'addr, count: {}'.format(len(self.addresses)) + + def to_bytes(self): + return Message(b'addr', structure.VarInt(len(self.addresses)).to_bytes() + b''.join({addr.to_bytes() for addr in self.addresses})).to_bytes() + + @classmethod + def from_message(cls, m): + payload = m.payload + + addr_count_varint_length = structure.VarInt.length(payload[0]) + addr_count = structure.VarInt.from_bytes(payload[:addr_count_varint_length]).n + + payload = payload[addr_count_varint_length:] + + addresses = set() + + while payload: + addresses.add(structure.NetAddr.from_bytes(payload[:38])) + payload = payload[38:] + + return cls(addresses) diff --git a/src/shared.py b/src/shared.py new file mode 100644 index 0000000..1894ff4 --- /dev/null +++ b/src/shared.py @@ -0,0 +1,48 @@ +# -*- coding: utf-8 -*- +import logging +import os +import queue +import threading + +listening_port = 8444 +data_directory = 'minode_data/' + +log_level = logging.DEBUG + +magic_bytes = b'\xe9\xbe\xb4\xd9' +protocol_version = 3 +services = 1 # NODE_NETWORK +stream = 1 +nonce = os.urandom(8) +user_agent = b'MiNode-v0.0.1' +timeout = 600 +header_length = 24 + +nonce_trials_per_byte = 1000 +payload_length_extra_bytes = 1000 + +# Longer synchronization, lower bandwidth usage, not really working +conserve_bandwidth = False +requested_objects = set() +requested_objects_lock = threading.Lock() + +shutting_down = False + +vector_advertise_queue = queue.Queue() +address_advertise_queue = queue.Queue() + +connections = set() +connections_lock = threading.Lock() + +hosts = set() + +core_nodes = set() + +node_pool = set() +unchecked_node_pool = set() + +outgoing_connections = 8 + +objects = {} +objects_lock = threading.Lock() + diff --git a/src/structure.py b/src/structure.py new file mode 100644 index 0000000..a5c8d6c --- /dev/null +++ b/src/structure.py @@ -0,0 +1,151 @@ +# -*- coding: utf-8 -*- +import base64 +import hashlib +import struct +import socket +import time + +import shared + + +class VarInt(object): + def __init__(self, n): + self.n = n + + def to_bytes(self): + if self.n < 0xfd: + return struct.pack('>B', self.n) + + if self.n <= 0xffff: + return b'\xfd' + struct.pack('>H', self.n) + + if self.n <= 0xffffffff: + return b'\xfe' + struct.pack('>I', self.n) + + return b'\xff' + struct.pack('>Q', self.n) + + @staticmethod + def length(b): + if b == 0xfd: + return 3 + if b == 0xfe: + return 5 + if b == 0xff: + return 9 + return 1 + + @classmethod + def from_bytes(cls, b): + if cls.length(b[0]) > 1: + b = b[1:] + n = int.from_bytes(b, 'big') + return cls(n) + + +class Object(object): + def __init__(self, nonce, expires_time, object_type, version, stream_number, object_payload): + self.nonce = nonce + self.expires_time = expires_time + self.object_type = object_type + self.version = version + self.stream_number = stream_number + self.object_payload = object_payload + self.vector = hashlib.sha512(hashlib.sha512(self.to_bytes()).digest()).digest()[:32] + + def __repr__(self): + return 'object, vector: {}'.format(base64.b16encode(self.vector).decode()) + + @classmethod + def from_message(cls, m): + payload = m.payload + nonce, expires_time, object_type = struct.unpack('>8sQL', payload[:20]) + payload = payload[20:] + version_varint_length = VarInt.length(payload[0]) + version = VarInt.from_bytes(payload[:version_varint_length]).n + payload = payload[version_varint_length:] + stream_number_varint_length = VarInt.length(payload[0]) + stream_number = VarInt.from_bytes(payload[:stream_number_varint_length]).n + payload = payload[stream_number_varint_length:] + return cls(nonce, expires_time, object_type, version, stream_number, payload) + + def to_bytes(self): + payload = b'' + payload += self.nonce + payload += struct.pack('>QL', self.expires_time, self.object_type) + payload += VarInt(self.version).to_bytes() + VarInt(self.stream_number).to_bytes() + payload += self.object_payload + return payload + + def is_expired(self): + return self.expires_time + 3 * 3600 < time.time() + + def is_valid(self): + if self.is_expired(): + return False + if len(self.object_payload) > 2**18: + return False + data = self.to_bytes()[8:] + length = len(data) + 8 + shared.payload_length_extra_bytes + dt = max(self.expires_time - time.time(), 0) + h = hashlib.sha512(data).digest() + pow_value = int.from_bytes(hashlib.sha512(hashlib.sha512(self.nonce + h).digest()).digest()[:8], 'big') + target = int(2**64/(shared.nonce_trials_per_byte*(length+(dt*length)/(2**16)))) + if target < pow_value: + print('insufficient pow') + return False + return True + + +class NetAddrNoPrefix(object): + def __init__(self, services, host, port): + self.services = services + self.host = host + self.port = port + + def __repr__(self): + return 'net_addr_no_prefix, services: {}, host: {}, port {}'.format(self.services, self.host, self.port) + + def to_bytes(self): + b = b'' + b += struct.pack('>Q', self.services) + try: + host = socket.inet_pton(socket.AF_INET, self.host) + b += b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF' + host + except socket.error: + b += socket.inet_pton(socket.AF_INET6, self.host) + b += struct.pack('>H', self.port) + return b + + @classmethod + def from_bytes(cls, b): + services, host, port = struct.unpack('>Q16sH', b) + if host.startswith(b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF'): + host = socket.inet_ntop(socket.AF_INET, host[-4:]) + else: + host = socket.inet_ntop(socket.AF_INET6, host) + return cls(services, host, port) + + +class NetAddr(object): + def __init__(self, services, host, port, stream=shared.stream): + self.stream = stream + self.services = services + self.host = host + self.port = port + + def __repr__(self): + return 'net_addr, stream: {}, services: {}, host: {}, port {}'\ + .format(self.stream, self.services, self.host, self.port) + + def to_bytes(self): + b = b'' + b += struct.pack('>Q', int(time.time())) + b += struct.pack('>I', self.stream) + b += NetAddrNoPrefix(self.services, self.host, self.port).to_bytes() + return b + + @classmethod + def from_bytes(cls, b): + t, stream, net_addr = struct.unpack('>QI26s', b) + n = NetAddrNoPrefix.from_bytes(net_addr) + return cls(n.services, n.host, n.port, stream) diff --git a/start.sh b/start.sh new file mode 100644 index 0000000..b0463ce --- /dev/null +++ b/start.sh @@ -0,0 +1,2 @@ +#!/bin/sh +python3 ./src/main.py \ No newline at end of file