Code cleanup

parent ae799c295c
commit 2085f01281
@@ -283,6 +283,7 @@ class Connection(threading.Thread):
                 self.send_queue.put(message.Version(self.host, self.port))
             else:
                 self.send_queue.put(message.Version('127.0.0.1', 7656))
+
         elif m.command == b'verack':
             self.verack_received = True
             logging.debug('{}:{} -> {}'.format(self.host_print, self.port, 'verack'))

@@ -297,6 +298,7 @@ class Connection(threading.Thread):
             self.vectors_to_get.update(to_get)
             # Do not send objects they already have.
             self.vectors_to_send.difference_update(inv.vectors)
+
         elif m.command == b'object':
             obj = structure.Object.from_message(m)
             logging.debug('{}:{} -> {}'.format(self.host_print, self.port, obj))

@@ -311,20 +313,25 @@ class Connection(threading.Thread):
                 logging.debug(dest)
                 shared.i2p_unchecked_node_pool.add((dest, 'i2p'))
             shared.vector_advertise_queue.put(obj.vector)
+
         elif m.command == b'getdata':
             getdata = message.GetData.from_message(m)
             logging.debug('{}:{} -> {}'.format(self.host_print, self.port, getdata))
             self.vectors_to_send.update(getdata.vectors)
+
         elif m.command == b'addr':
             addr = message.Addr.from_message(m)
             logging.debug('{}:{} -> {}'.format(self.host_print, self.port, addr))
             for a in addr.addresses:
                 shared.unchecked_node_pool.add((a.host, a.port))
+
         elif m.command == b'ping':
             logging.debug('{}:{} -> ping'.format(self.host_print, self.port))
             self.send_queue.put(message.Message(b'pong', b''))
+
         elif m.command == b'error':
             logging.error('{}:{} -> error: {}'.format(self.host_print, self.port, m.payload))
+
         else:
             logging.debug('{}:{} -> {}'.format(self.host_print, self.port, m))

@@ -87,21 +87,7 @@ def parse_arguments():
         shared.i2p_transient = True


-def main():
-    signal.signal(signal.SIGINT, handler)
-    signal.signal(signal.SIGTERM, handler)
-
-    parse_arguments()
-
-    logging.basicConfig(level=shared.log_level, format='[%(asctime)s] [%(levelname)s] %(message)s')
-    logging.info('Starting MiNode')
-    logging.info('Data directory: {}'.format(shared.data_directory))
-    if not os.path.exists(shared.data_directory):
-        try:
-            os.makedirs(shared.data_directory)
-        except Exception as e:
-            logging.warning('Error while creating data directory in: {}'.format(shared.data_directory))
-            logging.warning(e)
+def load_data():
     try:
         with open(shared.data_directory + 'objects.pickle', mode='br') as file:
             shared.objects = pickle.load(file)

@@ -133,7 +119,8 @@ def main():
         shared.i2p_core_nodes = {(row[0].encode(), 'i2p') for row in reader}
         shared.i2p_node_pool.update(shared.i2p_core_nodes)

-    if shared.ip_enabled and not shared.trusted_peer:
+
+def bootstrap_from_dns():
     try:
         for item in socket.getaddrinfo('bootstrap8080.bitmessage.org', 80):
             shared.unchecked_node_pool.add((item[4][0], 8080))
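
Note on the loop above: socket.getaddrinfo() returns a list of 5-tuples (family, type, proto, canonname, sockaddr), and sockaddr[0] is the resolved address string, which is why the code reads item[4][0]. A minimal standalone sketch, with the print added only for illustration:

    import socket

    # Resolve the bootstrap host and show the addresses MiNode would pair with port 8080.
    for family, socktype, proto, canonname, sockaddr in socket.getaddrinfo('bootstrap8080.bitmessage.org', 80):
        print(sockaddr[0])
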
@@ -145,7 +132,33 @@ def main():
         logging.error('Error during DNS bootstrap')
         logging.error(e)

-    if shared.i2p_enabled:
+
+def start_ip_listener():
+    listener_ipv4 = None
+    listener_ipv6 = None
+
+    if socket.has_ipv6:
+        try:
+            listener_ipv6 = Listener(shared.listening_host, shared.listening_port, family=socket.AF_INET6)
+            listener_ipv6.start()
+        except Exception as e:
+            logging.warning('Error while starting IPv6 listener on port {}'.format(shared.listening_port))
+            logging.warning(e)
+
+    try:
+        listener_ipv4 = Listener(shared.listening_host, shared.listening_port)
+        listener_ipv4.start()
+    except Exception as e:
+        if listener_ipv6:
+            logging.warning('Error while starting IPv4 listener on port {}. '.format(shared.listening_port) +
+                            'However the IPv6 one seems to be working and will probably accept IPv4 connections.')
+        else:
+            logging.error('Error while starting IPv4 listener on port {}. '.format(shared.listening_port) +
+                          'You will not receive incoming connections. Please check your port configuration')
+            logging.error(e)
+
+
+def start_i2p_listener():
     # Grab I2P destinations from old object file
     for obj in shared.objects.values():
         if obj.object_type == shared.i2p_dest_obj_type:

@@ -193,37 +206,43 @@ def main():
         logging.warning('Error while saving I2P destination public key.')
         logging.warning(e)

+
+def main():
+    signal.signal(signal.SIGINT, handler)
+    signal.signal(signal.SIGTERM, handler)
+
+    parse_arguments()
+
+    logging.basicConfig(level=shared.log_level, format='[%(asctime)s] [%(levelname)s] %(message)s')
+    logging.info('Starting MiNode')
+
+    logging.info('Data directory: {}'.format(shared.data_directory))
+    if not os.path.exists(shared.data_directory):
+        try:
+            os.makedirs(shared.data_directory)
+        except Exception as e:
+            logging.warning('Error while creating data directory in: {}'.format(shared.data_directory))
+            logging.warning(e)
+
+    load_data()
+
+    if shared.ip_enabled and not shared.trusted_peer:
+        bootstrap_from_dns()
+
+    if shared.i2p_enabled:
+        # We are starting it before cleaning expired objects so we can collect I2P destination objects
+        start_i2p_listener()
+
     manager = Manager()
     manager.clean_objects()
-    manager.clean_connections()
     manager.start()

     advertiser = Advertiser()
     advertiser.start()

-    listener_ipv4 = None
-    listener_ipv6 = None
-
     if shared.listen_for_connections:
-        if socket.has_ipv6:
-            try:
-                listener_ipv6 = Listener(shared.listening_host, shared.listening_port, family=socket.AF_INET6)
-                listener_ipv6.start()
-            except Exception as e:
-                logging.warning('Error while starting IPv6 listener on port {}'.format(shared.listening_port))
-                logging.warning(e)
-
-        try:
-            listener_ipv4 = Listener(shared.listening_host, shared.listening_port)
-            listener_ipv4.start()
-        except Exception as e:
-            if listener_ipv6:
-                logging.warning('Error while starting IPv4 listener on port {}. '.format(shared.listening_port) +
-                                'However the IPv6 one seems to be working and will probably accept IPv4 connections.')
-            else:
-                logging.error('Error while starting IPv4 listener on port {}. '.format(shared.listening_port) +
-                              'You will not receive incoming connections. Please check your port configuration')
-                logging.error(e)
+        start_ip_listener()
+

 if __name__ == '__main__':
     multiprocessing.set_start_method('spawn')

@@ -22,7 +22,7 @@ class Manager(threading.Thread):
         self.last_cleaned_connections = time.time()
         self.last_pickled_objects = time.time()
         self.last_pickled_nodes = time.time()
-        self.last_published_i2p_destination = time.time() - 50 * 60  # Publish destination 10 minutes after start
+        self.last_published_i2p_destination = time.time() - 50 * 60 + random.uniform(-1, 1) * 300  # Publish destination 5-15 minutes after start

     def run(self):
         while True:
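
Note on the new initial value: assuming Manager.run() republishes the I2P destination roughly once an hour (the interval itself is not part of this hunk, so treat it as an assumption), seeding last_published_i2p_destination 50 minutes in the past left about 10 minutes until the first publish; the added random.uniform(-1, 1) * 300 term shifts that by up to 5 minutes either way. A rough sketch of the arithmetic:

    import random
    import time

    PUBLISH_INTERVAL = 60 * 60  # assumed hourly republish check in Manager.run()

    last_published = time.time() - 50 * 60 + random.uniform(-1, 1) * 300
    delay = PUBLISH_INTERVAL - (time.time() - last_published)
    print(delay / 60)  # roughly 10 +/- 5 minutes until the first publish
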
@@ -35,7 +35,7 @@ class Manager(threading.Thread):
                 self.clean_objects()
                 self.last_cleaned_objects = now
             if now - self.last_cleaned_connections > 2:
-                self.clean_connections()
+                self.manage_connections()
                 self.last_cleaned_connections = now
             if now - self.last_pickled_objects > 100:
                 self.pickle_objects()

@@ -56,7 +56,7 @@ class Manager(threading.Thread):
                 logging.debug('Deleted expired object: {}'.format(base64.b16encode(vector).decode()))

     @staticmethod
-    def clean_connections():
+    def manage_connections():
         hosts = set()
         outgoing_connections = 0
         for c in shared.connections.copy():

@@ -147,6 +147,7 @@ class Manager(threading.Thread):
             shared.i2p_node_pool = set(random.sample(shared.i2p_node_pool, 1000))
         if len(shared.i2p_unchecked_node_pool) > 100:
             shared.i2p_unchecked_node_pool = set(random.sample(shared.i2p_unchecked_node_pool, 100))
+
         try:
             with open(shared.data_directory + 'nodes.pickle', mode='bw') as file:
                 pickle.dump(shared.node_pool, file, protocol=3)
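
Note on the dump above: protocol 3 keeps nodes.pickle readable by any Python 3 interpreter. The matching read presumably lives in load_data() and is not shown in this diff, so the following restore is only a sketch mirroring the objects.pickle load earlier in this commit:

    import pickle

    import shared  # MiNode's shared state module, as used throughout this diff

    # Hypothetical restore of the persisted node pool at startup.
    with open(shared.data_directory + 'nodes.pickle', mode='br') as file:
        shared.node_pool = pickle.load(file)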