Add project files
This commit is contained in:
parent
e58bc9c76f
commit
61c8e0d024
36
src/advertiser.py
Normal file
36
src/advertiser.py
Normal file
|
@ -0,0 +1,36 @@
|
||||||
|
import threading
|
||||||
|
import time
|
||||||
|
|
||||||
|
import message
|
||||||
|
import shared
|
||||||
|
|
||||||
|
|
||||||
|
class Advertiser(threading.Thread):
|
||||||
|
def __init__(self):
|
||||||
|
super().__init__(name='Advertiser')
|
||||||
|
|
||||||
|
def run(self):
|
||||||
|
while True:
|
||||||
|
time.sleep(0.6)
|
||||||
|
self._advertise_vectors()
|
||||||
|
self._advertise_addresses()
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _advertise_vectors():
|
||||||
|
vectors_to_advertise = set()
|
||||||
|
while not shared.vector_advertise_queue.empty():
|
||||||
|
vectors_to_advertise.add(shared.vector_advertise_queue.get())
|
||||||
|
if len(vectors_to_advertise) > 0:
|
||||||
|
for c in shared.connections.copy():
|
||||||
|
if c.status == 'verack_received':
|
||||||
|
c.send_queue.put(message.Inv(vectors_to_advertise))
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _advertise_addresses():
|
||||||
|
addresses_to_advertise = set()
|
||||||
|
while not shared.address_advertise_queue.empty():
|
||||||
|
addresses_to_advertise.add(shared.address_advertise_queue.get())
|
||||||
|
if len(addresses_to_advertise) > 0:
|
||||||
|
for c in shared.connections.copy():
|
||||||
|
if c.status == 'verack_received':
|
||||||
|
c.send_queue.put(message.Addr(addresses_to_advertise))
|
198
src/connection.py
Normal file
198
src/connection.py
Normal file
|
@ -0,0 +1,198 @@
|
||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
import logging
|
||||||
|
import random
|
||||||
|
import socket
|
||||||
|
import threading
|
||||||
|
import queue
|
||||||
|
import time
|
||||||
|
|
||||||
|
import message
|
||||||
|
import shared
|
||||||
|
import structure
|
||||||
|
|
||||||
|
|
||||||
|
class Connection(threading.Thread):
|
||||||
|
def __init__(self, host, port, s=None):
|
||||||
|
super().__init__(name='Connection to {}:{}'.format(host, port))
|
||||||
|
|
||||||
|
self.send_queue = queue.Queue()
|
||||||
|
|
||||||
|
self.vectors_to_get = set()
|
||||||
|
|
||||||
|
self.status = 'ready'
|
||||||
|
self.sent_verack = False
|
||||||
|
self.sent_big_inv_message = False
|
||||||
|
|
||||||
|
self.host = host
|
||||||
|
self.port = int(port)
|
||||||
|
|
||||||
|
self.s = s
|
||||||
|
|
||||||
|
self.remote_version = None
|
||||||
|
|
||||||
|
self.server = bool(s)
|
||||||
|
|
||||||
|
if self.server:
|
||||||
|
self.status = 'connected'
|
||||||
|
|
||||||
|
self.buffer = b''
|
||||||
|
self.next_message_size = shared.header_length
|
||||||
|
self.next_header = True
|
||||||
|
|
||||||
|
self.last_message_received = time.time()
|
||||||
|
self.last_message_sent = time.time()
|
||||||
|
|
||||||
|
def run(self):
|
||||||
|
if self.s is None:
|
||||||
|
self._connect()
|
||||||
|
if self.status != 'connected':
|
||||||
|
return
|
||||||
|
self.s.settimeout(1)
|
||||||
|
if not self.server:
|
||||||
|
self.send_queue.put(message.Version(self.host, self.port))
|
||||||
|
while True:
|
||||||
|
data = True
|
||||||
|
try:
|
||||||
|
data = self.s.recv(1014)
|
||||||
|
self.buffer += data
|
||||||
|
except socket.timeout:
|
||||||
|
if time.time() - self.last_message_received > shared.timeout:
|
||||||
|
data = None
|
||||||
|
if time.time() - self.last_message_received > 20 and self.status != 'verack_received':
|
||||||
|
data = None
|
||||||
|
if time.time() - self.last_message_sent > 60 and self.status == 'verack_received':
|
||||||
|
self.send_queue.put(message.Message(b'alive', b''))
|
||||||
|
if not self.sent_big_inv_message and self.status == 'verack_received' and self.sent_verack:
|
||||||
|
self._send_big_inv()
|
||||||
|
except ConnectionResetError:
|
||||||
|
data = None
|
||||||
|
self._process_buffer()
|
||||||
|
self._request_objects()
|
||||||
|
self._process_queue()
|
||||||
|
if self.status == 'disconnecting':
|
||||||
|
data = None
|
||||||
|
if not data:
|
||||||
|
self.status = 'disconnected'
|
||||||
|
self.s.close()
|
||||||
|
logging.info('Disconnected from {}:{}'.format(self.host, self.port))
|
||||||
|
break
|
||||||
|
|
||||||
|
def _connect(self):
|
||||||
|
logging.info('Connecting to {}:{}'.format(self.host, self.port))
|
||||||
|
|
||||||
|
try:
|
||||||
|
self.s = socket.create_connection((self.host, self.port))
|
||||||
|
self.status = 'connected'
|
||||||
|
logging.debug('Established TCP connection to {}:{}'.format(self.host, self.port))
|
||||||
|
except Exception as e:
|
||||||
|
logging.warning('Connection to {}:{} failed'.format(self.host, self.port))
|
||||||
|
logging.warning(e)
|
||||||
|
|
||||||
|
self.status = 'failed'
|
||||||
|
|
||||||
|
def _send_message(self, m):
|
||||||
|
if type(m) == message.Message and m.command == b'object':
|
||||||
|
logging.debug('{}:{} <- {}'.format(self.host, self.port, structure.Object.from_message(m)))
|
||||||
|
else:
|
||||||
|
logging.debug('{}:{} <- {}'.format(self.host, self.port, m))
|
||||||
|
self.s.settimeout(60)
|
||||||
|
self.s.sendall(m.to_bytes())
|
||||||
|
self.s.settimeout(1)
|
||||||
|
|
||||||
|
def _send_big_inv(self):
|
||||||
|
with shared.objects_lock:
|
||||||
|
self.send_queue.put(message.Inv({vector for vector in shared.objects.keys() if shared.objects[vector].expires_time > time.time()}))
|
||||||
|
addr = {structure.NetAddr(1, c.host, c.port) for c in shared.connections.copy() if not c.server}
|
||||||
|
if len(addr) != 0:
|
||||||
|
self.send_queue.put(message.Addr(addr))
|
||||||
|
self.sent_big_inv_message = True
|
||||||
|
|
||||||
|
def _process_queue(self):
|
||||||
|
while not self.send_queue.empty():
|
||||||
|
m = self.send_queue.get()
|
||||||
|
if m:
|
||||||
|
self._send_message(m)
|
||||||
|
self.last_message_sent = time.time()
|
||||||
|
else:
|
||||||
|
self.status = 'disconnecting'
|
||||||
|
break
|
||||||
|
|
||||||
|
def _process_buffer(self):
|
||||||
|
while len(self.buffer) >= self.next_message_size:
|
||||||
|
if self.next_header:
|
||||||
|
self.next_header = False
|
||||||
|
h = message.Header.from_bytes(self.buffer[:shared.header_length])
|
||||||
|
self.next_message_size += h.payload_length
|
||||||
|
else:
|
||||||
|
m = message.Message.from_bytes(self.buffer[:self.next_message_size])
|
||||||
|
|
||||||
|
self.next_header = True
|
||||||
|
self.buffer = self.buffer[self.next_message_size:]
|
||||||
|
self.next_message_size = shared.header_length
|
||||||
|
self.last_message_received = time.time()
|
||||||
|
self._process_message(m)
|
||||||
|
|
||||||
|
def _process_message(self, m):
|
||||||
|
if m.command == b'version':
|
||||||
|
version = message.Version.from_bytes(m.to_bytes())
|
||||||
|
logging.debug('{}:{} -> {}'.format(self.host, self.port, str(version)))
|
||||||
|
if version.protocol_version != shared.protocol_version or version.nonce == shared.nonce:
|
||||||
|
self.status = 'disconnecting'
|
||||||
|
self.send_queue.put(None)
|
||||||
|
else:
|
||||||
|
self.send_queue.put(message.Message(b'verack', b''))
|
||||||
|
self.sent_verack = True
|
||||||
|
self.remote_version = version
|
||||||
|
if not self.server:
|
||||||
|
shared.address_advertise_queue.put(structure.NetAddr(version.services, self.host, self.port))
|
||||||
|
shared.node_pool.add((self.host, self.port))
|
||||||
|
shared.address_advertise_queue.put(structure.NetAddr(shared.services, version.host, shared.listening_port))
|
||||||
|
if self.server:
|
||||||
|
self.send_queue.put(message.Version(self.host, self.port))
|
||||||
|
elif m.command == b'verack':
|
||||||
|
self.status = 'verack_received'
|
||||||
|
logging.debug('{}:{} -> {}'.format(self.host, self.port, 'verack'))
|
||||||
|
logging.info('Established Bitmessage protocol connection to {}:{}'.format(self.host, self.port))
|
||||||
|
elif m.command == b'inv':
|
||||||
|
inv = message.Inv.from_message(m)
|
||||||
|
logging.debug('{}:{} -> {}'.format(self.host, self.port, inv))
|
||||||
|
to_get = inv.vectors.copy()
|
||||||
|
to_get.difference_update(shared.objects.keys())
|
||||||
|
to_get.difference_update(shared.requested_objects)
|
||||||
|
self.vectors_to_get.update(to_get)
|
||||||
|
elif m.command == b'object':
|
||||||
|
obj = structure.Object.from_message(m)
|
||||||
|
logging.debug('{}:{} -> {}'.format(self.host, self.port, obj))
|
||||||
|
if obj.is_valid() and obj.vector not in shared.objects:
|
||||||
|
with shared.objects_lock:
|
||||||
|
shared.objects[obj.vector] = obj
|
||||||
|
shared.vector_advertise_queue.put(obj.vector)
|
||||||
|
elif m.command == b'getdata':
|
||||||
|
getdata = message.GetData.from_message(m)
|
||||||
|
logging.debug('{}:{} -> {}'.format(self.host, self.port, getdata))
|
||||||
|
for vector in getdata.vectors:
|
||||||
|
if vector in shared.objects:
|
||||||
|
self.send_queue.put(message.Message(b'object', shared.objects[vector].to_bytes()))
|
||||||
|
elif m.command == b'addr': # TODO fix ;P
|
||||||
|
addr = message.Addr.from_message(m)
|
||||||
|
logging.debug('{}:{} -> {}'.format(self.host, self.port, addr))
|
||||||
|
for a in addr.addresses:
|
||||||
|
shared.unchecked_node_pool.add((a.host, a.port))
|
||||||
|
else:
|
||||||
|
logging.debug('{}:{} -> {}'.format(self.host, self.port, m))
|
||||||
|
|
||||||
|
def _request_objects(self):
|
||||||
|
if self.vectors_to_get:
|
||||||
|
if len(self.vectors_to_get) > 50000:
|
||||||
|
pack = random.sample(self.vectors_to_get, 50000)
|
||||||
|
self.send_queue.put(message.GetData(pack))
|
||||||
|
self.vectors_to_get.difference_update(pack)
|
||||||
|
if shared.conserve_bandwidth:
|
||||||
|
with shared.requested_objects_lock:
|
||||||
|
shared.requested_objects.update(pack)
|
||||||
|
else:
|
||||||
|
self.send_queue.put(message.GetData(self.vectors_to_get))
|
||||||
|
if shared.conserve_bandwidth:
|
||||||
|
with shared.requested_objects_lock:
|
||||||
|
shared.requested_objects.update(self.vectors_to_get)
|
||||||
|
self.vectors_to_get.clear()
|
14
src/core_nodes.csv
Normal file
14
src/core_nodes.csv
Normal file
|
@ -0,0 +1,14 @@
|
||||||
|
89.36.218.202,8444
|
||||||
|
5.45.99.75,8444
|
||||||
|
158.222.217.190,8444
|
||||||
|
109.147.204.113,1195
|
||||||
|
92.78.49.39,8444
|
||||||
|
75.167.159.54,8444
|
||||||
|
85.180.139.241,8444
|
||||||
|
95.165.168.168,8444
|
||||||
|
78.55.217.23,8844
|
||||||
|
158.222.211.81,8080
|
||||||
|
95.247.239.194,8484
|
||||||
|
178.62.12.187,8448
|
||||||
|
24.188.198.204,8111
|
||||||
|
178.11.46.221,8444
|
|
31
src/listener.py
Normal file
31
src/listener.py
Normal file
|
@ -0,0 +1,31 @@
|
||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
import logging
|
||||||
|
import socket
|
||||||
|
import threading
|
||||||
|
|
||||||
|
from connection import Connection
|
||||||
|
import shared
|
||||||
|
|
||||||
|
|
||||||
|
class Listener(threading.Thread):
|
||||||
|
def __init__(self, host, port, family=socket.AF_INET):
|
||||||
|
super().__init__(name='Listener')
|
||||||
|
self.host = host
|
||||||
|
self.port = port
|
||||||
|
self.family = family
|
||||||
|
self.s = socket.socket(self.family, socket.SOCK_STREAM)
|
||||||
|
self.s.bind((self.host, self.port))
|
||||||
|
|
||||||
|
def run(self):
|
||||||
|
self.s.listen(1)
|
||||||
|
self.s.settimeout(1)
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
conn, addr = self.s.accept()
|
||||||
|
logging.info('Incoming connection from: {}:{}'.format(addr[0], addr[1]))
|
||||||
|
with shared.connections_lock:
|
||||||
|
c = Connection(addr[0], addr[1], conn)
|
||||||
|
c.start()
|
||||||
|
shared.connections.add(c)
|
||||||
|
except socket.timeout:
|
||||||
|
pass
|
67
src/main.py
Normal file
67
src/main.py
Normal file
|
@ -0,0 +1,67 @@
|
||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
import csv
|
||||||
|
import logging
|
||||||
|
import pickle
|
||||||
|
import socket
|
||||||
|
|
||||||
|
from advertiser import Advertiser
|
||||||
|
from manager import Manager
|
||||||
|
from listener import Listener
|
||||||
|
import shared
|
||||||
|
|
||||||
|
|
||||||
|
def main():
|
||||||
|
logging.basicConfig(level=shared.log_level, format='[%(asctime)s] [%(levelname)s] %(message)s')
|
||||||
|
logging.info('Starting MiNode')
|
||||||
|
try:
|
||||||
|
with open(shared.data_directory + 'objects.pickle', mode='br') as file:
|
||||||
|
shared.objects = pickle.load(file)
|
||||||
|
except Exception as e:
|
||||||
|
logging.warning('Error while loading objects from disk.')
|
||||||
|
logging.warning(e)
|
||||||
|
|
||||||
|
try:
|
||||||
|
with open(shared.data_directory + 'nodes.pickle', mode='br') as file:
|
||||||
|
shared.nodes = pickle.load(file)
|
||||||
|
except Exception as e:
|
||||||
|
logging.warning('Error while loading nodes from disk.')
|
||||||
|
logging.warning(e)
|
||||||
|
|
||||||
|
with open('core_nodes.csv', mode='r', newline='') as f:
|
||||||
|
reader = csv.reader(f)
|
||||||
|
shared.core_nodes = {tuple(row) for row in reader}
|
||||||
|
shared.node_pool.update(shared.core_nodes)
|
||||||
|
|
||||||
|
try:
|
||||||
|
for item in socket.getaddrinfo('bootstrap8080.bitmessage.org', 80):
|
||||||
|
shared.unchecked_node_pool.add((item[4][0], 8444))
|
||||||
|
shared.unchecked_node_pool.add((item[4][0], 8080))
|
||||||
|
logging.debug('Adding ' + item[4][0] + ' to unchecked_node_pool based on DNS bootstrap method')
|
||||||
|
except Exception as e:
|
||||||
|
logging.error('Error during DNS bootstrap')
|
||||||
|
logging.error(e)
|
||||||
|
|
||||||
|
manager = Manager()
|
||||||
|
manager.clean_objects()
|
||||||
|
manager.clean_connections()
|
||||||
|
manager.start()
|
||||||
|
|
||||||
|
advertiser = Advertiser()
|
||||||
|
advertiser.start()
|
||||||
|
|
||||||
|
try:
|
||||||
|
listener_ipv4 = Listener('0.0.0.0', shared.listening_port)
|
||||||
|
listener_ipv4.start()
|
||||||
|
except Exception as e:
|
||||||
|
logging.error('Error while starting IPv4 listener')
|
||||||
|
logging.error(e)
|
||||||
|
|
||||||
|
try:
|
||||||
|
listener_ipv6 = Listener('::', shared.listening_port, family=socket.AF_INET6)
|
||||||
|
listener_ipv6.start()
|
||||||
|
except Exception as e:
|
||||||
|
logging.error('Error while starting IPv6 listener')
|
||||||
|
logging.error(e)
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
main()
|
121
src/manager.py
Normal file
121
src/manager.py
Normal file
|
@ -0,0 +1,121 @@
|
||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
import base64
|
||||||
|
import logging
|
||||||
|
import pickle
|
||||||
|
import queue
|
||||||
|
import random
|
||||||
|
import threading
|
||||||
|
import time
|
||||||
|
|
||||||
|
from connection import Connection
|
||||||
|
import shared
|
||||||
|
|
||||||
|
|
||||||
|
class Manager(threading.Thread):
|
||||||
|
def __init__(self):
|
||||||
|
super().__init__(name='Manager')
|
||||||
|
self.q = queue.Queue()
|
||||||
|
self.last_cleaned_objects = time.time()
|
||||||
|
self.last_cleaned_requested_objects = time.time()
|
||||||
|
self.last_cleaned_connections = time.time()
|
||||||
|
self.last_pickled_objects = time.time()
|
||||||
|
self.last_pickled_nodes = time.time()
|
||||||
|
|
||||||
|
def run(self):
|
||||||
|
while True:
|
||||||
|
time.sleep(0.8)
|
||||||
|
now = time.time()
|
||||||
|
if now - self.last_cleaned_objects > 90:
|
||||||
|
self.clean_objects()
|
||||||
|
self.last_cleaned_objects = now
|
||||||
|
if now - self.last_cleaned_requested_objects > 3:
|
||||||
|
self.clean_requested_objects()
|
||||||
|
self.last_cleaned_requested_objects = now
|
||||||
|
if now - self.last_cleaned_connections > 2:
|
||||||
|
self.clean_connections()
|
||||||
|
self.last_cleaned_connections = now
|
||||||
|
if now - self.last_pickled_objects > 100:
|
||||||
|
self.pickle_objects()
|
||||||
|
self.last_pickled_objects = now
|
||||||
|
if now - self.last_pickled_nodes > 60:
|
||||||
|
self.pickle_nodes()
|
||||||
|
self.last_pickled_nodes = now
|
||||||
|
|
||||||
|
if shared.shutting_down:
|
||||||
|
self.shutdown_connections()
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def clean_objects():
|
||||||
|
for vector in set(shared.objects):
|
||||||
|
if shared.objects[vector].is_expired():
|
||||||
|
with shared.objects_lock:
|
||||||
|
del shared.objects[vector]
|
||||||
|
logging.debug('Deleted expired object: {}'.format(base64.b16encode(vector).decode()))
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def clean_requested_objects():
|
||||||
|
with shared.requested_objects_lock:
|
||||||
|
shared.requested_objects.difference_update(shared.objects.keys())
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def clean_connections():
|
||||||
|
conns = shared.connections.copy()
|
||||||
|
hosts = set()
|
||||||
|
outgoing_connections = 0
|
||||||
|
for c in conns:
|
||||||
|
if not c.is_alive() or c.status == 'disconnected':
|
||||||
|
with shared.connections_lock:
|
||||||
|
shared.connections.remove(c)
|
||||||
|
else:
|
||||||
|
hosts.add(c.host)
|
||||||
|
if not c.server:
|
||||||
|
outgoing_connections += 1
|
||||||
|
shared.hosts = hosts
|
||||||
|
if outgoing_connections < shared.outgoing_connections:
|
||||||
|
to_connect = set()
|
||||||
|
if len(shared.unchecked_node_pool) > 12:
|
||||||
|
to_connect.update(random.sample(shared.unchecked_node_pool, 12))
|
||||||
|
else:
|
||||||
|
to_connect.update(shared.unchecked_node_pool)
|
||||||
|
shared.unchecked_node_pool.difference_update(to_connect)
|
||||||
|
if len(shared.node_pool) > 6:
|
||||||
|
to_connect.update(random.sample(shared.node_pool, 6))
|
||||||
|
else:
|
||||||
|
to_connect.update(shared.node_pool)
|
||||||
|
for addr in to_connect:
|
||||||
|
if addr[0] in hosts:
|
||||||
|
continue
|
||||||
|
c = Connection(addr[0], addr[1])
|
||||||
|
c.start()
|
||||||
|
with shared.connections_lock:
|
||||||
|
shared.connections.add(c)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def shutdown_connections():
|
||||||
|
for c in shared.connections.copy():
|
||||||
|
c.send_queue.put(None)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def pickle_objects():
|
||||||
|
try:
|
||||||
|
with open(shared.data_directory + 'objects.pickle', mode='bw') as file:
|
||||||
|
with shared.objects_lock:
|
||||||
|
pickle.dump(shared.objects, file, protocol=4)
|
||||||
|
logging.debug('Saved objects')
|
||||||
|
except Exception as e:
|
||||||
|
logging.warning('Error while saving objects')
|
||||||
|
logging.warning(e)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def pickle_nodes():
|
||||||
|
if len(shared.node_pool) > 10000:
|
||||||
|
shared.node_pool = set(random.sample(shared.node_pool, 10000))
|
||||||
|
if len(shared.unchecked_node_pool) > 1000:
|
||||||
|
shared.node_pool = set(random.sample(shared.unchecked_node_pool, 1000))
|
||||||
|
try:
|
||||||
|
with open(shared.data_directory + 'nodes.pickle', mode='bw') as file:
|
||||||
|
pickle.dump(shared.node_pool, file, protocol=4)
|
||||||
|
logging.debug('Saved nodes')
|
||||||
|
except Exception as e:
|
||||||
|
logging.warning('Error while saving nodes')
|
||||||
|
logging.warning(e)
|
217
src/message.py
Normal file
217
src/message.py
Normal file
|
@ -0,0 +1,217 @@
|
||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
import base64
|
||||||
|
import hashlib
|
||||||
|
import struct
|
||||||
|
import time
|
||||||
|
|
||||||
|
import shared
|
||||||
|
import structure
|
||||||
|
|
||||||
|
|
||||||
|
class Header(object):
|
||||||
|
def __init__(self, command, payload_length, payload_checksum):
|
||||||
|
self.command = command
|
||||||
|
self.payload_length = payload_length
|
||||||
|
self.payload_checksum = payload_checksum
|
||||||
|
|
||||||
|
def __repr__(self):
|
||||||
|
return 'type: header, command: "{}", payload_length: {}, payload_checksum: {}'\
|
||||||
|
.format(self.command.decode(), self.payload_length, base64.b16encode(self.payload_checksum).decode())
|
||||||
|
|
||||||
|
def to_bytes(self):
|
||||||
|
b = b''
|
||||||
|
b += shared.magic_bytes
|
||||||
|
b += self.command.ljust(12, b'\x00')
|
||||||
|
b += struct.pack('>L', self.payload_length)
|
||||||
|
b += self.payload_checksum
|
||||||
|
return b
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def from_bytes(cls, b):
|
||||||
|
magic_bytes, command, payload_length, payload_checksum = struct.unpack('>4s12sL4s', b)
|
||||||
|
|
||||||
|
if magic_bytes != shared.magic_bytes:
|
||||||
|
raise IOError('magic_bytes do not match')
|
||||||
|
|
||||||
|
command = command.rstrip(b'\x00')
|
||||||
|
|
||||||
|
return cls(command, payload_length, payload_checksum)
|
||||||
|
|
||||||
|
|
||||||
|
class Message(object):
|
||||||
|
def __init__(self, command, payload):
|
||||||
|
self.command = command
|
||||||
|
self.payload = payload
|
||||||
|
|
||||||
|
self.payload_length = len(payload)
|
||||||
|
self.payload_checksum = hashlib.sha512(payload).digest()[:4]
|
||||||
|
|
||||||
|
def __repr__(self):
|
||||||
|
return '{}, payload_length: {}, payload_checksum: {}'\
|
||||||
|
.format(self.command.decode(), self.payload_length, base64.b16encode(self.payload_checksum).decode())
|
||||||
|
|
||||||
|
def to_bytes(self):
|
||||||
|
b = Header(self.command, self.payload_length, self.payload_checksum).to_bytes()
|
||||||
|
b += self.payload
|
||||||
|
return b
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def from_bytes(cls, b):
|
||||||
|
h = Header.from_bytes(b[:24])
|
||||||
|
|
||||||
|
payload = b[24:]
|
||||||
|
payload_length = len(payload)
|
||||||
|
|
||||||
|
if payload_length != h.payload_length:
|
||||||
|
raise Exception('wrong payload length, expected {}, got {}'.format(h.payload_length, payload_length))
|
||||||
|
|
||||||
|
payload_checksum = hashlib.sha512(payload).digest()[:4]
|
||||||
|
|
||||||
|
if payload_checksum != h.payload_checksum:
|
||||||
|
raise Exception('wrong payload checksum, expected {}, got {}'.format(h.payload_checksum, payload_checksum))
|
||||||
|
|
||||||
|
return cls(h.command, payload)
|
||||||
|
|
||||||
|
|
||||||
|
class Version(object):
|
||||||
|
def __init__(self, host, port, protocol_version=shared.protocol_version, services=shared.services,
|
||||||
|
nonce=shared.nonce, user_agent=shared.user_agent):
|
||||||
|
self.host = host
|
||||||
|
self.port = port
|
||||||
|
|
||||||
|
self.protocol_version = protocol_version
|
||||||
|
self.services = services
|
||||||
|
self.nonce = nonce
|
||||||
|
self.user_agent = user_agent
|
||||||
|
|
||||||
|
def __repr__(self):
|
||||||
|
return 'version, protocol_version: {}, services: {}, host: {}, port: {}, nonce: {}, user_agent: {}'\
|
||||||
|
.format(self.protocol_version, self.services, self.host, self.port, base64.b16encode(self.nonce).decode(), self.user_agent)
|
||||||
|
|
||||||
|
def to_bytes(self):
|
||||||
|
payload = b''
|
||||||
|
payload += struct.pack('>I', self.protocol_version)
|
||||||
|
payload += struct.pack('>Q', self.services)
|
||||||
|
payload += struct.pack('>Q', int(time.time()))
|
||||||
|
payload += structure.NetAddrNoPrefix(shared.services, self.host, self.port).to_bytes()
|
||||||
|
payload += structure.NetAddrNoPrefix(shared.services, '127.0.0.1', 8444).to_bytes()
|
||||||
|
payload += self.nonce
|
||||||
|
payload += structure.VarInt(len(shared.user_agent)).to_bytes()
|
||||||
|
payload += shared.user_agent
|
||||||
|
payload += 2 * structure.VarInt(1).to_bytes()
|
||||||
|
|
||||||
|
return Message(b'version', payload).to_bytes()
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def from_bytes(cls, b):
|
||||||
|
m = Message.from_bytes(b)
|
||||||
|
|
||||||
|
payload = m.payload
|
||||||
|
|
||||||
|
protocol_version, services, t, net_addr_remote, net_addr_local, nonce = \
|
||||||
|
struct.unpack('>IQQ26s26s8s', payload[:80])
|
||||||
|
|
||||||
|
net_addr_remote = structure.NetAddrNoPrefix.from_bytes(net_addr_remote)
|
||||||
|
|
||||||
|
host = net_addr_remote.host
|
||||||
|
port = net_addr_remote.port
|
||||||
|
|
||||||
|
payload = payload[80:]
|
||||||
|
|
||||||
|
user_agent_varint_length = structure.VarInt.length(payload[0])
|
||||||
|
user_agent_length = structure.VarInt.from_bytes(payload[:user_agent_varint_length]).n
|
||||||
|
|
||||||
|
payload = payload[user_agent_varint_length:]
|
||||||
|
|
||||||
|
user_agent = payload[:user_agent_length]
|
||||||
|
|
||||||
|
payload = payload[user_agent_length:]
|
||||||
|
|
||||||
|
# Assume it is stream 1
|
||||||
|
assert payload == b'\x01\x01'
|
||||||
|
|
||||||
|
return cls(host, port, protocol_version, services, nonce, user_agent)
|
||||||
|
|
||||||
|
|
||||||
|
class Inv(object):
|
||||||
|
def __init__(self, vectors):
|
||||||
|
self.vectors = set(vectors)
|
||||||
|
|
||||||
|
def __repr__(self):
|
||||||
|
return 'inv, count: {}'.format(len(self.vectors))
|
||||||
|
|
||||||
|
def to_bytes(self):
|
||||||
|
return Message(b'inv', structure.VarInt(len(self.vectors)).to_bytes() + b''.join(self.vectors)).to_bytes()
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def from_message(cls, m):
|
||||||
|
payload = m.payload
|
||||||
|
|
||||||
|
vector_count_varint_length = structure.VarInt.length(payload[0])
|
||||||
|
vector_count = structure.VarInt.from_bytes(payload[:vector_count_varint_length]).n
|
||||||
|
|
||||||
|
payload = payload[vector_count_varint_length:]
|
||||||
|
|
||||||
|
vectors = set()
|
||||||
|
|
||||||
|
while payload:
|
||||||
|
vectors.add(payload[:32])
|
||||||
|
payload = payload[32:]
|
||||||
|
|
||||||
|
return cls(vectors)
|
||||||
|
|
||||||
|
|
||||||
|
class GetData(object):
|
||||||
|
def __init__(self, vectors):
|
||||||
|
self.vectors = set(vectors)
|
||||||
|
|
||||||
|
def __repr__(self):
|
||||||
|
return 'getdata, count: {}'.format(len(self.vectors))
|
||||||
|
|
||||||
|
def to_bytes(self):
|
||||||
|
return Message(b'getdata', structure.VarInt(len(self.vectors)).to_bytes() + b''.join(self.vectors)).to_bytes()
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def from_message(cls, m):
|
||||||
|
payload = m.payload
|
||||||
|
|
||||||
|
vector_count_varint_length = structure.VarInt.length(payload[0])
|
||||||
|
vector_count = structure.VarInt.from_bytes(payload[:vector_count_varint_length]).n
|
||||||
|
|
||||||
|
payload = payload[vector_count_varint_length:]
|
||||||
|
|
||||||
|
vectors = set()
|
||||||
|
|
||||||
|
while payload:
|
||||||
|
vectors.add(payload[:32])
|
||||||
|
payload = payload[32:]
|
||||||
|
|
||||||
|
return cls(vectors)
|
||||||
|
|
||||||
|
|
||||||
|
class Addr(object):
|
||||||
|
def __init__(self, addresses):
|
||||||
|
self.addresses = addresses
|
||||||
|
|
||||||
|
def __repr__(self):
|
||||||
|
return 'addr, count: {}'.format(len(self.addresses))
|
||||||
|
|
||||||
|
def to_bytes(self):
|
||||||
|
return Message(b'addr', structure.VarInt(len(self.addresses)).to_bytes() + b''.join({addr.to_bytes() for addr in self.addresses})).to_bytes()
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def from_message(cls, m):
|
||||||
|
payload = m.payload
|
||||||
|
|
||||||
|
addr_count_varint_length = structure.VarInt.length(payload[0])
|
||||||
|
addr_count = structure.VarInt.from_bytes(payload[:addr_count_varint_length]).n
|
||||||
|
|
||||||
|
payload = payload[addr_count_varint_length:]
|
||||||
|
|
||||||
|
addresses = set()
|
||||||
|
|
||||||
|
while payload:
|
||||||
|
addresses.add(structure.NetAddr.from_bytes(payload[:38]))
|
||||||
|
payload = payload[38:]
|
||||||
|
|
||||||
|
return cls(addresses)
|
48
src/shared.py
Normal file
48
src/shared.py
Normal file
|
@ -0,0 +1,48 @@
|
||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
import logging
|
||||||
|
import os
|
||||||
|
import queue
|
||||||
|
import threading
|
||||||
|
|
||||||
|
listening_port = 8444
|
||||||
|
data_directory = 'minode_data/'
|
||||||
|
|
||||||
|
log_level = logging.DEBUG
|
||||||
|
|
||||||
|
magic_bytes = b'\xe9\xbe\xb4\xd9'
|
||||||
|
protocol_version = 3
|
||||||
|
services = 1 # NODE_NETWORK
|
||||||
|
stream = 1
|
||||||
|
nonce = os.urandom(8)
|
||||||
|
user_agent = b'MiNode-v0.0.1'
|
||||||
|
timeout = 600
|
||||||
|
header_length = 24
|
||||||
|
|
||||||
|
nonce_trials_per_byte = 1000
|
||||||
|
payload_length_extra_bytes = 1000
|
||||||
|
|
||||||
|
# Longer synchronization, lower bandwidth usage, not really working
|
||||||
|
conserve_bandwidth = False
|
||||||
|
requested_objects = set()
|
||||||
|
requested_objects_lock = threading.Lock()
|
||||||
|
|
||||||
|
shutting_down = False
|
||||||
|
|
||||||
|
vector_advertise_queue = queue.Queue()
|
||||||
|
address_advertise_queue = queue.Queue()
|
||||||
|
|
||||||
|
connections = set()
|
||||||
|
connections_lock = threading.Lock()
|
||||||
|
|
||||||
|
hosts = set()
|
||||||
|
|
||||||
|
core_nodes = set()
|
||||||
|
|
||||||
|
node_pool = set()
|
||||||
|
unchecked_node_pool = set()
|
||||||
|
|
||||||
|
outgoing_connections = 8
|
||||||
|
|
||||||
|
objects = {}
|
||||||
|
objects_lock = threading.Lock()
|
||||||
|
|
151
src/structure.py
Normal file
151
src/structure.py
Normal file
|
@ -0,0 +1,151 @@
|
||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
import base64
|
||||||
|
import hashlib
|
||||||
|
import struct
|
||||||
|
import socket
|
||||||
|
import time
|
||||||
|
|
||||||
|
import shared
|
||||||
|
|
||||||
|
|
||||||
|
class VarInt(object):
|
||||||
|
def __init__(self, n):
|
||||||
|
self.n = n
|
||||||
|
|
||||||
|
def to_bytes(self):
|
||||||
|
if self.n < 0xfd:
|
||||||
|
return struct.pack('>B', self.n)
|
||||||
|
|
||||||
|
if self.n <= 0xffff:
|
||||||
|
return b'\xfd' + struct.pack('>H', self.n)
|
||||||
|
|
||||||
|
if self.n <= 0xffffffff:
|
||||||
|
return b'\xfe' + struct.pack('>I', self.n)
|
||||||
|
|
||||||
|
return b'\xff' + struct.pack('>Q', self.n)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def length(b):
|
||||||
|
if b == 0xfd:
|
||||||
|
return 3
|
||||||
|
if b == 0xfe:
|
||||||
|
return 5
|
||||||
|
if b == 0xff:
|
||||||
|
return 9
|
||||||
|
return 1
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def from_bytes(cls, b):
|
||||||
|
if cls.length(b[0]) > 1:
|
||||||
|
b = b[1:]
|
||||||
|
n = int.from_bytes(b, 'big')
|
||||||
|
return cls(n)
|
||||||
|
|
||||||
|
|
||||||
|
class Object(object):
|
||||||
|
def __init__(self, nonce, expires_time, object_type, version, stream_number, object_payload):
|
||||||
|
self.nonce = nonce
|
||||||
|
self.expires_time = expires_time
|
||||||
|
self.object_type = object_type
|
||||||
|
self.version = version
|
||||||
|
self.stream_number = stream_number
|
||||||
|
self.object_payload = object_payload
|
||||||
|
self.vector = hashlib.sha512(hashlib.sha512(self.to_bytes()).digest()).digest()[:32]
|
||||||
|
|
||||||
|
def __repr__(self):
|
||||||
|
return 'object, vector: {}'.format(base64.b16encode(self.vector).decode())
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def from_message(cls, m):
|
||||||
|
payload = m.payload
|
||||||
|
nonce, expires_time, object_type = struct.unpack('>8sQL', payload[:20])
|
||||||
|
payload = payload[20:]
|
||||||
|
version_varint_length = VarInt.length(payload[0])
|
||||||
|
version = VarInt.from_bytes(payload[:version_varint_length]).n
|
||||||
|
payload = payload[version_varint_length:]
|
||||||
|
stream_number_varint_length = VarInt.length(payload[0])
|
||||||
|
stream_number = VarInt.from_bytes(payload[:stream_number_varint_length]).n
|
||||||
|
payload = payload[stream_number_varint_length:]
|
||||||
|
return cls(nonce, expires_time, object_type, version, stream_number, payload)
|
||||||
|
|
||||||
|
def to_bytes(self):
|
||||||
|
payload = b''
|
||||||
|
payload += self.nonce
|
||||||
|
payload += struct.pack('>QL', self.expires_time, self.object_type)
|
||||||
|
payload += VarInt(self.version).to_bytes() + VarInt(self.stream_number).to_bytes()
|
||||||
|
payload += self.object_payload
|
||||||
|
return payload
|
||||||
|
|
||||||
|
def is_expired(self):
|
||||||
|
return self.expires_time + 3 * 3600 < time.time()
|
||||||
|
|
||||||
|
def is_valid(self):
|
||||||
|
if self.is_expired():
|
||||||
|
return False
|
||||||
|
if len(self.object_payload) > 2**18:
|
||||||
|
return False
|
||||||
|
data = self.to_bytes()[8:]
|
||||||
|
length = len(data) + 8 + shared.payload_length_extra_bytes
|
||||||
|
dt = max(self.expires_time - time.time(), 0)
|
||||||
|
h = hashlib.sha512(data).digest()
|
||||||
|
pow_value = int.from_bytes(hashlib.sha512(hashlib.sha512(self.nonce + h).digest()).digest()[:8], 'big')
|
||||||
|
target = int(2**64/(shared.nonce_trials_per_byte*(length+(dt*length)/(2**16))))
|
||||||
|
if target < pow_value:
|
||||||
|
print('insufficient pow')
|
||||||
|
return False
|
||||||
|
return True
|
||||||
|
|
||||||
|
|
||||||
|
class NetAddrNoPrefix(object):
|
||||||
|
def __init__(self, services, host, port):
|
||||||
|
self.services = services
|
||||||
|
self.host = host
|
||||||
|
self.port = port
|
||||||
|
|
||||||
|
def __repr__(self):
|
||||||
|
return 'net_addr_no_prefix, services: {}, host: {}, port {}'.format(self.services, self.host, self.port)
|
||||||
|
|
||||||
|
def to_bytes(self):
|
||||||
|
b = b''
|
||||||
|
b += struct.pack('>Q', self.services)
|
||||||
|
try:
|
||||||
|
host = socket.inet_pton(socket.AF_INET, self.host)
|
||||||
|
b += b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF' + host
|
||||||
|
except socket.error:
|
||||||
|
b += socket.inet_pton(socket.AF_INET6, self.host)
|
||||||
|
b += struct.pack('>H', self.port)
|
||||||
|
return b
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def from_bytes(cls, b):
|
||||||
|
services, host, port = struct.unpack('>Q16sH', b)
|
||||||
|
if host.startswith(b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF'):
|
||||||
|
host = socket.inet_ntop(socket.AF_INET, host[-4:])
|
||||||
|
else:
|
||||||
|
host = socket.inet_ntop(socket.AF_INET6, host)
|
||||||
|
return cls(services, host, port)
|
||||||
|
|
||||||
|
|
||||||
|
class NetAddr(object):
|
||||||
|
def __init__(self, services, host, port, stream=shared.stream):
|
||||||
|
self.stream = stream
|
||||||
|
self.services = services
|
||||||
|
self.host = host
|
||||||
|
self.port = port
|
||||||
|
|
||||||
|
def __repr__(self):
|
||||||
|
return 'net_addr, stream: {}, services: {}, host: {}, port {}'\
|
||||||
|
.format(self.stream, self.services, self.host, self.port)
|
||||||
|
|
||||||
|
def to_bytes(self):
|
||||||
|
b = b''
|
||||||
|
b += struct.pack('>Q', int(time.time()))
|
||||||
|
b += struct.pack('>I', self.stream)
|
||||||
|
b += NetAddrNoPrefix(self.services, self.host, self.port).to_bytes()
|
||||||
|
return b
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def from_bytes(cls, b):
|
||||||
|
t, stream, net_addr = struct.unpack('>QI26s', b)
|
||||||
|
n = NetAddrNoPrefix.from_bytes(net_addr)
|
||||||
|
return cls(n.services, n.host, n.port, stream)
|
Loading…
Reference in New Issue
Block a user