2016-06-30 10:11:33 +02:00
|
|
|
# -*- coding: utf-8 -*-
|
|
|
|
import base64
|
|
|
|
import logging
|
|
|
|
import pickle
|
|
|
|
import queue
|
|
|
|
import random
|
|
|
|
import threading
|
|
|
|
import time
|
|
|
|
|
|
|
|
from connection import Connection
|
|
|
|
import shared
|
|
|
|
|
|
|
|
|
|
|
|
class Manager(threading.Thread):
|
|
|
|
def __init__(self):
|
|
|
|
super().__init__(name='Manager')
|
|
|
|
self.q = queue.Queue()
|
|
|
|
self.last_cleaned_objects = time.time()
|
|
|
|
self.last_cleaned_connections = time.time()
|
|
|
|
self.last_pickled_objects = time.time()
|
|
|
|
self.last_pickled_nodes = time.time()
|
|
|
|
|
|
|
|
def run(self):
|
|
|
|
while True:
|
|
|
|
time.sleep(0.8)
|
|
|
|
now = time.time()
|
|
|
|
if now - self.last_cleaned_objects > 90:
|
|
|
|
self.clean_objects()
|
|
|
|
self.last_cleaned_objects = now
|
|
|
|
if now - self.last_cleaned_connections > 2:
|
|
|
|
self.clean_connections()
|
|
|
|
self.last_cleaned_connections = now
|
|
|
|
if now - self.last_pickled_objects > 100:
|
|
|
|
self.pickle_objects()
|
|
|
|
self.last_pickled_objects = now
|
|
|
|
if now - self.last_pickled_nodes > 60:
|
|
|
|
self.pickle_nodes()
|
|
|
|
self.last_pickled_nodes = now
|
|
|
|
|
|
|
|
if shared.shutting_down:
|
2016-10-15 16:12:09 +02:00
|
|
|
logging.debug('Shutting down connections')
|
2016-06-30 10:11:33 +02:00
|
|
|
self.shutdown_connections()
|
2016-10-15 16:12:09 +02:00
|
|
|
logging.debug('Shutting down Manager')
|
|
|
|
break
|
2016-06-30 10:11:33 +02:00
|
|
|
|
|
|
|
@staticmethod
|
|
|
|
def clean_objects():
|
|
|
|
for vector in set(shared.objects):
|
|
|
|
if shared.objects[vector].is_expired():
|
|
|
|
with shared.objects_lock:
|
|
|
|
del shared.objects[vector]
|
|
|
|
logging.debug('Deleted expired object: {}'.format(base64.b16encode(vector).decode()))
|
|
|
|
|
|
|
|
@staticmethod
|
|
|
|
def clean_connections():
|
|
|
|
hosts = set()
|
|
|
|
outgoing_connections = 0
|
2016-07-09 19:37:54 +02:00
|
|
|
for c in shared.connections.copy():
|
2016-06-30 10:11:33 +02:00
|
|
|
if not c.is_alive() or c.status == 'disconnected':
|
|
|
|
with shared.connections_lock:
|
|
|
|
shared.connections.remove(c)
|
|
|
|
else:
|
|
|
|
hosts.add(c.host)
|
|
|
|
if not c.server:
|
|
|
|
outgoing_connections += 1
|
2016-07-09 19:37:54 +02:00
|
|
|
if outgoing_connections < shared.outgoing_connections and shared.send_outgoing_connections:
|
2016-06-30 10:11:33 +02:00
|
|
|
to_connect = set()
|
2016-08-17 17:48:45 +02:00
|
|
|
if len(shared.unchecked_node_pool) > 16:
|
|
|
|
to_connect.update(random.sample(shared.unchecked_node_pool, 16))
|
2016-06-30 10:11:33 +02:00
|
|
|
else:
|
|
|
|
to_connect.update(shared.unchecked_node_pool)
|
|
|
|
shared.unchecked_node_pool.difference_update(to_connect)
|
2016-08-17 17:48:45 +02:00
|
|
|
if len(shared.node_pool) > 8:
|
|
|
|
to_connect.update(random.sample(shared.node_pool, 8))
|
2016-06-30 10:11:33 +02:00
|
|
|
else:
|
|
|
|
to_connect.update(shared.node_pool)
|
|
|
|
for addr in to_connect:
|
|
|
|
if addr[0] in hosts:
|
|
|
|
continue
|
|
|
|
c = Connection(addr[0], addr[1])
|
|
|
|
c.start()
|
2016-07-05 09:12:20 +02:00
|
|
|
hosts.add(c.host)
|
2016-06-30 10:11:33 +02:00
|
|
|
with shared.connections_lock:
|
|
|
|
shared.connections.add(c)
|
2016-07-09 19:37:54 +02:00
|
|
|
shared.hosts = hosts
|
2016-06-30 10:11:33 +02:00
|
|
|
|
|
|
|
@staticmethod
|
|
|
|
def shutdown_connections():
|
|
|
|
for c in shared.connections.copy():
|
|
|
|
c.send_queue.put(None)
|
|
|
|
|
|
|
|
@staticmethod
|
|
|
|
def pickle_objects():
|
|
|
|
try:
|
|
|
|
with open(shared.data_directory + 'objects.pickle', mode='bw') as file:
|
|
|
|
with shared.objects_lock:
|
|
|
|
pickle.dump(shared.objects, file, protocol=4)
|
|
|
|
logging.debug('Saved objects')
|
|
|
|
except Exception as e:
|
|
|
|
logging.warning('Error while saving objects')
|
|
|
|
logging.warning(e)
|
|
|
|
|
|
|
|
@staticmethod
|
|
|
|
def pickle_nodes():
|
|
|
|
if len(shared.node_pool) > 10000:
|
|
|
|
shared.node_pool = set(random.sample(shared.node_pool, 10000))
|
|
|
|
if len(shared.unchecked_node_pool) > 1000:
|
|
|
|
shared.node_pool = set(random.sample(shared.unchecked_node_pool, 1000))
|
|
|
|
try:
|
|
|
|
with open(shared.data_directory + 'nodes.pickle', mode='bw') as file:
|
|
|
|
pickle.dump(shared.node_pool, file, protocol=4)
|
|
|
|
logging.debug('Saved nodes')
|
|
|
|
except Exception as e:
|
|
|
|
logging.warning('Error while saving nodes')
|
|
|
|
logging.warning(e)
|