Formatted the code with flake8
parent 66b0f43f08
commit 3fa84d1f4b
@@ -34,7 +34,8 @@ class Advertiser(threading.Thread):
while not shared.address_advertise_queue.empty():
addr = shared.address_advertise_queue.get()
if addr.port == 'i2p':
# We should not try to construct Addr messages with I2P destinations (yet)
# We should not try to construct Addr messages
# with I2P destinations (yet)
continue
addresses_to_advertise.add(addr)
if len(addresses_to_advertise) > 0:
@@ -14,7 +14,10 @@ from . import message, shared, structure

class Connection(threading.Thread):
def __init__(self, host, port, s=None, network='ip', server=False, i2p_remote_dest=b''):
def __init__(
self, host, port, s=None, network='ip', server=False,
i2p_remote_dest=b''
):
self.host = host
self.port = port
self.network = network
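Note: the change above is the usual PEP 8 fix for over-long `def` signatures, wrapping the parameter list inside the parentheses so every line stays under the 79-character limit that flake8 enforces by default. A minimal illustrative sketch of the style, not code from MiNode (the function and argument names here are made up):

    # before: a single line longer than 79 characters
    def connect(self, host, port, s=None, network='ip', server=False, timeout=10):
        ...

    # after: parameters wrapped inside the parentheses,
    # closing bracket on its own line
    def connect(
        self, host, port, s=None, network='ip', server=False,
        timeout=10
    ):
        ...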
@@ -72,17 +75,21 @@ class Connection(threading.Thread):
else:
self.send_queue.put(message.Version('127.0.0.1', 7656))
while True:
if self.on_connection_fully_established_scheduled and not (self.buffer_send or self.buffer_receive):
if (
self.on_connection_fully_established_scheduled
and not (self.buffer_send or self.buffer_receive)
):
self._on_connection_fully_established()
data = True
try:
if self.status == 'fully_established':
data = self.s.recv(4096)
self.buffer_receive += data
if data and len(self.buffer_receive) < 4000000:
continue
data = self.s.recv(4096)
self.buffer_receive += data
if data and len(self.buffer_receive) < 4000000:
continue
else:
data = self.s.recv(self.next_message_size - len(self.buffer_receive))
data = self.s.recv(
self.next_message_size - len(self.buffer_receive))
self.buffer_receive += data
except ssl.SSLWantReadError:
if self.status == 'fully_established':
@@ -90,49 +97,68 @@ class Connection(threading.Thread):
self._send_objects()
except socket.error as e:
err = e.args[0]
if err == errno.EAGAIN or err == errno.EWOULDBLOCK:
if err in (errno.EAGAIN, errno.EWOULDBLOCK):
if self.status == 'fully_established':
self._request_objects()
self._send_objects()
else:
logging.debug('Disconnecting from {}:{}. Reason: {}'.format(self.host_print, self.port, e))
logging.debug(
'Disconnecting from %s:%s. Reason: %s',
self.host_print, self.port, e)
data = None
except ConnectionResetError:
logging.debug('Disconnecting from {}:{}. Reason: ConnectionResetError'.format(self.host_print, self.port))
logging.debug(
'Disconnecting from %s:%s. Reason: ConnectionResetError',
self.host_print, self.port)
self.status = 'disconnecting'
self._process_buffer_receive()
self._process_queue()
self._send_data()
if time.time() - self.last_message_received > shared.timeout:
logging.debug(
'Disconnecting from {}:{}. Reason: time.time() - self.last_message_received > shared.timeout'.format(
self.host_print, self.port))
'Disconnecting from %s:%s. Reason:'
' time.time() - self.last_message_received'
' > shared.timeout', self.host_print, self.port)
self.status = 'disconnecting'
if time.time() - self.last_message_received > 30 and self.status != 'fully_established' and self.status != 'disconnecting':
if (
time.time() - self.last_message_received > 30
and self.status != 'fully_established'
and self.status != 'disconnecting'
):
logging.debug(
'Disconnecting from {}:{}. Reason: time.time() - self.last_message_received > 30 and self.status != \'fully_established\''.format(
self.host_print, self.port))
'Disconnecting from %s:%s. Reason:'
' time.time() - self.last_message_received > 30'
' and self.status != "fully_established"',
self.host_print, self.port)
self.status = 'disconnecting'
if time.time() - self.last_message_sent > 300 and self.status == 'fully_established':
if (
time.time() - self.last_message_sent > 300
and self.status == 'fully_established'
):
self.send_queue.put(message.Message(b'pong', b''))
if self.status == 'disconnecting' or shared.shutting_down:
data = None
if not data:
self.status = 'disconnected'
self.s.close()
logging.info('Disconnected from {}:{}'.format(self.host_print, self.port))
logging.info(
'Disconnected from %s:%s', self.host_print, self.port)
break
time.sleep(0.2)

def _connect(self):
logging.debug('Connecting to {}:{}'.format(self.host_print, self.port))
logging.debug('Connecting to %s:%s', self.host_print, self.port)

try:
self.s = socket.create_connection((self.host, self.port), 10)
self.status = 'connected'
logging.info('Established TCP connection to {}:{}'.format(self.host_print, self.port))
logging.info(
'Established TCP connection to %s:%s',
self.host_print, self.port)
except Exception as e:
logging.warning('Connection to {}:{} failed. Reason: {}'.format(self.host_print, self.port, e))
logging.warning(
'Connection to %s:%s failed. Reason: %s',
self.host_print, self.port, e)
self.status = 'failed'

def _send_data(self):
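Note: most hunks in this commit replace eager `str.format()` calls inside logging statements with lazy %-style arguments, which flake8 plugins commonly flag and which also avoids formatting the string when the record is filtered out by the log level. A small self-contained sketch of the two styles (the logger name and values below are illustrative, not taken from the commit):

    import logging

    log = logging.getLogger('example')
    host, port = '127.0.0.1', 8444

    # before: the message is formatted even if DEBUG is disabled
    log.debug('Disconnecting from {}:{}'.format(host, port))

    # after: formatting is deferred until the record is actually emitted
    log.debug('Disconnecting from %s:%s', host, port)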
@@ -142,27 +168,38 @@ class Connection(threading.Thread):
self.buffer_send = self.buffer_send[amount:]
except (BlockingIOError, ssl.SSLWantWriteError):
pass
except (BrokenPipeError, ConnectionResetError, ssl.SSLError, OSError) as e:
logging.debug('Disconnecting from {}:{}. Reason: {}'.format(self.host_print, self.port, e))
except (
BrokenPipeError, ConnectionResetError, ssl.SSLError, OSError
) as e:
logging.debug(
'Disconnecting from %s:%s. Reason: %s',
self.host_print, self.port, e)
self.status = 'disconnecting'

def _do_tls_handshake(self):
logging.debug('Initializing TLS connection with {}:{}'.format(self.host_print, self.port))
logging.debug(
'Initializing TLS connection with %s:%s',
self.host_print, self.port)

context = ssl.create_default_context()
context.check_hostname = False
context.verify_mode = ssl.CERT_NONE

if ssl.OPENSSL_VERSION_NUMBER >= 0x10100000 and not ssl.OPENSSL_VERSION.startswith("LibreSSL"):
# OpenSSL>=1.1
if (
ssl.OPENSSL_VERSION_NUMBER >= 0x10100000
and not ssl.OPENSSL_VERSION.startswith("LibreSSL")
):  # OpenSSL>=1.1
context.set_ciphers('AECDH-AES256-SHA@SECLEVEL=0')
else:
context.set_ciphers('AECDH-AES256-SHA')

context.set_ecdh_curve("secp256k1")
context.options = ssl.OP_ALL | ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3 | ssl.OP_SINGLE_ECDH_USE | ssl.OP_CIPHER_SERVER_PREFERENCE
context.options = (
ssl.OP_ALL | ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3
| ssl.OP_SINGLE_ECDH_USE | ssl.OP_CIPHER_SERVER_PREFERENCE)

self.s = context.wrap_socket(self.s, server_side=self.server, do_handshake_on_connect=False)
self.s = context.wrap_socket(
self.s, server_side=self.server, do_handshake_on_connect=False)

while True:
try:
@@ -173,39 +210,57 @@ class Connection(threading.Thread):
except ssl.SSLWantWriteError:
select.select([], [self.s], [])
except Exception as e:
logging.debug('Disconnecting from {}:{}. Reason: {}'.format(self.host_print, self.port, e))
logging.debug(
'Disconnecting from %s:%s. Reason: %s',
self.host_print, self.port, e)
self.status = 'disconnecting'
break
self.tls = True
logging.debug('Established TLS connection with {}:{}'.format(self.host_print, self.port))
logging.debug(
'Established TLS connection with %s:%s',
self.host_print, self.port)

def _send_message(self, m):
if type(m) == message.Message and m.command == b'object':
logging.debug('{}:{} <- {}'.format(self.host_print, self.port, structure.Object.from_message(m)))
if isinstance(m, message.Message) and m.command == b'object':
logging.debug(
'%s:%s <- %s',
self.host_print, self.port, structure.Object.from_message(m))
else:
logging.debug('{}:{} <- {}'.format(self.host_print, self.port, m))
logging.debug('%s:%s <- %s', self.host_print, self.port, m)
self.buffer_send += m.to_bytes()

def _on_connection_fully_established(self):
logging.info('Established Bitmessage protocol connection to {}:{}'.format(self.host_print, self.port))
logging.info(
'Established Bitmessage protocol connection to %s:%s',
self.host_print, self.port)
self.on_connection_fully_established_scheduled = False
if self.remote_version.services & 2 and self.network == 'ip':  # NODE_SSL
self._do_tls_handshake()
if self.remote_version.services & 2 and self.network == 'ip':
self._do_tls_handshake()  # NODE_SSL

addr = {structure.NetAddr(c.remote_version.services, c.host, c.port) for c in shared.connections if c.network != 'i2p' and c.server is False and c.status == 'fully_established'}
addr = {
structure.NetAddr(c.remote_version.services, c.host, c.port)
for c in shared.connections if c.network != 'i2p'
and c.server is False and c.status == 'fully_established'}
if len(shared.node_pool) > 10:
addr.update({structure.NetAddr(1, a[0], a[1]) for a in random.sample(shared.node_pool, 10)})
addr.update({
structure.NetAddr(1, a[0], a[1])
for a in random.sample(shared.node_pool, 10)})
if len(shared.unchecked_node_pool) > 10:
addr.update({structure.NetAddr(1, a[0], a[1]) for a in random.sample(shared.unchecked_node_pool, 10)})
addr.update({
structure.NetAddr(1, a[0], a[1])
for a in random.sample(shared.unchecked_node_pool, 10)})
if len(addr) != 0:
self.send_queue.put(message.Addr(addr))

with shared.objects_lock:
if len(shared.objects) > 0:
to_send = {vector for vector in shared.objects.keys() if shared.objects[vector].expires_time > time.time()}
to_send = {
vector for vector in shared.objects.keys()
if shared.objects[vector].expires_time > time.time()}
while len(to_send) > 0:
if len(to_send) > 10000:
# We limit size of inv messaged to 10000 entries because they might time out in very slow networks (I2P)
# We limit size of inv messaged to 10000 entries
# because they might time out in very slow networks (I2P)
pack = random.sample(to_send, 10000)
self.send_queue.put(message.Inv(pack))
to_send.difference_update(pack)
@@ -232,35 +287,47 @@ class Connection(threading.Thread):
if self.next_header:
self.next_header = False
try:
h = message.Header.from_bytes(self.buffer_receive[:shared.header_length])
h = message.Header.from_bytes(
self.buffer_receive[:shared.header_length])
except ValueError as e:
self.status = 'disconnecting'
logging.warning('Received malformed message from {}:{}: {}'.format(self.host_print, self.port, e))
logging.warning(
'Received malformed message from %s:%s: %s',
self.host_print, self.port, e)
break
self.next_message_size += h.payload_length
else:
try:
m = message.Message.from_bytes(self.buffer_receive[:self.next_message_size])
m = message.Message.from_bytes(
self.buffer_receive[:self.next_message_size])
except ValueError as e:
self.status = 'disconnecting'
logging.warning('Received malformed message from {}:{}, {}'.format(self.host_print, self.port, e))
logging.warning(
'Received malformed message from %s:%s, %s',
self.host_print, self.port, e)
break
self.next_header = True
self.buffer_receive = self.buffer_receive[self.next_message_size:]
self.buffer_receive = self.buffer_receive[
self.next_message_size:]
self.next_message_size = shared.header_length
self.last_message_received = time.time()
try:
self._process_message(m)
except ValueError as e:
self.status = 'disconnecting'
logging.warning('Received malformed message from {}:{}: {}'.format(self.host_print, self.port, e))
logging.warning(
'Received malformed message from %s:%s: %s',
self.host_print, self.port, e)
break

def _process_message(self, m):
if m.command == b'version':
version = message.Version.from_bytes(m.to_bytes())
logging.debug('{}:{} -> {}'.format(self.host_print, self.port, str(version)))
if version.protocol_version != shared.protocol_version or version.nonce == shared.nonce:
logging.debug('%s:%s -> %s', self.host_print, self.port, version)
if (
version.protocol_version != shared.protocol_version
or version.nonce == shared.nonce
):
self.status = 'disconnecting'
self.send_queue.put(None)
else:
@@ -270,27 +337,31 @@ class Connection(threading.Thread):
if not self.server:
self.send_queue.put('fully_established')
if self.network == 'ip':
shared.address_advertise_queue.put(structure.NetAddr(version.services, self.host, self.port))
shared.address_advertise_queue.put(structure.NetAddr(
version.services, self.host, self.port))
shared.node_pool.add((self.host, self.port))
elif self.network == 'i2p':
shared.i2p_node_pool.add((self.host, 'i2p'))
if self.network == 'ip':
shared.address_advertise_queue.put(structure.NetAddr(shared.services, version.host, shared.listening_port))
shared.address_advertise_queue.put(structure.NetAddr(
shared.services, version.host, shared.listening_port))
if self.server:
if self.network == 'ip':
self.send_queue.put(message.Version(self.host, self.port))
self.send_queue.put(
message.Version(self.host, self.port))
else:
self.send_queue.put(message.Version('127.0.0.1', 7656))

elif m.command == b'verack':
self.verack_received = True
logging.debug('{}:{} -> {}'.format(self.host_print, self.port, 'verack'))
logging.debug(
'%s:%s -> %s', self.host_print, self.port, 'verack')
if self.server:
self.send_queue.put('fully_established')

elif m.command == b'inv':
inv = message.Inv.from_message(m)
logging.debug('{}:{} -> {}'.format(self.host_print, self.port, inv))
logging.debug('%s:%s -> %s', self.host_print, self.port, inv)
to_get = inv.vectors.copy()
to_get.difference_update(shared.objects.keys())
self.vectors_to_get.update(to_get)
@@ -299,39 +370,45 @@ class Connection(threading.Thread):

elif m.command == b'object':
obj = structure.Object.from_message(m)
logging.debug('{}:{} -> {}'.format(self.host_print, self.port, obj))
logging.debug('%s:%s -> %s', self.host_print, self.port, obj)
self.vectors_requested.pop(obj.vector, None)
self.vectors_to_get.discard(obj.vector)
if obj.is_valid() and obj.vector not in shared.objects:
with shared.objects_lock:
shared.objects[obj.vector] = obj
if obj.object_type == shared.i2p_dest_obj_type and obj.version == shared.i2p_dest_obj_version:
if (
obj.object_type == shared.i2p_dest_obj_type
and obj.version == shared.i2p_dest_obj_version
):
dest = base64.b64encode(obj.object_payload, altchars=b'-~')
logging.debug('Received I2P destination object, adding to i2p_unchecked_node_pool')
logging.debug(
'Received I2P destination object,'
' adding to i2p_unchecked_node_pool')
logging.debug(dest)
shared.i2p_unchecked_node_pool.add((dest, 'i2p'))
shared.vector_advertise_queue.put(obj.vector)

elif m.command == b'getdata':
getdata = message.GetData.from_message(m)
logging.debug('{}:{} -> {}'.format(self.host_print, self.port, getdata))
logging.debug('%s:%s -> %s', self.host_print, self.port, getdata)
self.vectors_to_send.update(getdata.vectors)

elif m.command == b'addr':
addr = message.Addr.from_message(m)
logging.debug('{}:{} -> {}'.format(self.host_print, self.port, addr))
logging.debug('%s:%s -> %s', self.host_print, self.port, addr)
for a in addr.addresses:
shared.unchecked_node_pool.add((a.host, a.port))

elif m.command == b'ping':
logging.debug('{}:{} -> ping'.format(self.host_print, self.port))
logging.debug('%s:%s -> ping', self.host_print, self.port)
self.send_queue.put(message.Message(b'pong', b''))

elif m.command == b'error':
logging.error('{}:{} -> error: {}'.format(self.host_print, self.port, m.payload))
logging.error(
'%s:%s -> error: %s', self.host_print, self.port, m.payload)

else:
logging.debug('{}:{} -> {}'.format(self.host_print, self.port, m))
logging.debug('%s:%s -> %s', self.host_print, self.port, m)

def _request_objects(self):
if self.vectors_to_get and len(self.vectors_requested) < 100:
@@ -340,18 +417,28 @@ class Connection(threading.Thread):
if len(self.vectors_to_get) > 64:
pack = random.sample(self.vectors_to_get, 64)
self.send_queue.put(message.GetData(pack))
self.vectors_requested.update({vector: time.time() for vector in pack if vector not in self.vectors_requested})
self.vectors_requested.update({
vector: time.time() for vector in pack
if vector not in self.vectors_requested})
self.vectors_to_get.difference_update(pack)
else:
self.send_queue.put(message.GetData(self.vectors_to_get))
self.vectors_requested.update({vector: time.time() for vector in self.vectors_to_get if vector not in self.vectors_requested})
self.vectors_requested.update({
vector: time.time() for vector in self.vectors_to_get
if vector not in self.vectors_requested})
self.vectors_to_get.clear()
if self.vectors_requested:
self.vectors_requested = {vector: t for vector, t in self.vectors_requested.items() if vector not in shared.objects and t > time.time() - 15 * 60}
to_re_request = {vector for vector, t in self.vectors_requested.items() if t < time.time() - 10 * 60}
self.vectors_requested = {
vector: t for vector, t in self.vectors_requested.items()
if vector not in shared.objects and t > time.time() - 15 * 60}
to_re_request = {
vector for vector, t in self.vectors_requested.items()
if t < time.time() - 10 * 60}
if to_re_request:
self.vectors_to_get.update(to_re_request)
logging.debug('Re-requesting {} objects from {}:{}'.format(len(to_re_request), self.host_print, self.port))
logging.debug(
'Re-requesting %i objects from %s:%s',
len(to_re_request), self.host_print, self.port)

def _send_objects(self):
if self.vectors_to_send:
@@ -23,7 +23,8 @@ class I2PController(threading.Thread):
self.s = socket.create_connection((self.host, self.port))
break
except ConnectionRefusedError:
logging.error("Error while connecting to I2P SAM bridge. Retrying.")
logging.error(
'Error while connecting to I2P SAM bridge. Retrying.')
time.sleep(10)

self.version_reply = []
@@ -42,11 +43,11 @@ class I2PController(threading.Thread):

def _receive_line(self):
line = receive_line(self.s)
# logging.debug('I2PController <- ' + str(line))
# logging.debug('I2PController <- %s', line)
return line

def _send(self, command):
# logging.debug('I2PController -> ' + str(command))
# logging.debug('I2PController -> %s', command)
self.s.sendall(command)

def init_connection(self):
@@ -78,7 +79,8 @@ class I2PController(threading.Thread):
reply = self._receive_line().split()
if b'RESULT=OK' not in reply:
logging.warning(reply)
logging.warning('We could not create I2P session, retrying in 5 seconds.')
logging.warning(
'We could not create I2P session, retrying in 5 seconds.')
time.sleep(5)
self.create_session()
@@ -25,7 +25,7 @@ class I2PDialer(threading.Thread):
self.success = True

def run(self):
logging.debug('Connecting to {}'.format(self.destination))
logging.debug('Connecting to %s', self.destination)
self._connect()
if not self.state.shutting_down and self.success:
c = self.state.connection(
@@ -36,22 +36,25 @@ class I2PDialer(threading.Thread):

def _receive_line(self):
line = receive_line(self.s)
# logging.debug('I2PDialer <- ' + str(line))
# logging.debug('I2PDialer <- %s', line)
return line

def _send(self, command):
# logging.debug('I2PDialer -> ' + str(command))
# logging.debug('I2PDialer -> %s', command)
self.s.sendall(command)

def _connect(self):
self._send(b'HELLO VERSION MIN=3.0 MAX=3.3\n')
self.version_reply = self._receive_line().split()
if b'RESULT=OK' not in self.version_reply:
logging.warning('Error while connecting to {}'.format(self.destination))
logging.warning('Error while connecting to %s', self.destination)
self.success = False

self._send(b'STREAM CONNECT ID=' + self.nick + b' DESTINATION=' + self.destination + b'\n')
self._send(
b'STREAM CONNECT ID=' + self.nick + b' DESTINATION='
+ self.destination + b'\n')
reply = self._receive_line().split(b' ')
if b'RESULT=OK' not in reply:
logging.warning('Error while connecting to {}'.format(self.destination))
logging.warning(
'Error while connecting to %s', self.destination)
self.success = False
@@ -23,11 +23,11 @@ class I2PListener(threading.Thread):

def _receive_line(self):
line = receive_line(self.s)
# logging.debug('I2PListener <- ' + str(line))
# logging.debug('I2PListener <- %s', line)
return line

def _send(self, command):
# logging.debug('I2PListener -> ' + str(command))
# logging.debug('I2PListener -> %s', command)
self.s.sendall(command)

def new_socket(self):
@@ -46,7 +46,8 @@ class I2PListener(threading.Thread):
while not self.state.shutting_down:
try:
destination = self._receive_line().split()[0]
logging.info('Incoming I2P connection from: {}'.format(destination.decode()))
logging.info(
'Incoming I2P connection from: %s', destination.decode())

hosts = set()
for c in self.state.connections.copy():
@@ -16,11 +16,14 @@ def receive_line(s):

def pub_from_priv(priv):
priv = base64.b64decode(priv, altchars=b'-~')
# 256 for public key + 128 for signing key + 3 for certificate header + value of bytes priv[385:387]
# 256 for public key + 128 for signing key + 3 for certificate header
# + value of bytes priv[385:387]
pub = priv[:387 + int.from_bytes(priv[385:387], byteorder='big')]
pub = base64.b64encode(pub, altchars=b'-~')
return pub


def b32_from_pub(pub):
return base64.b32encode(hashlib.sha256(base64.b64decode(pub, b'-~')).digest()).replace(b"=", b"").lower() + b'.b32.i2p'
return base64.b32encode(
hashlib.sha256(base64.b64decode(pub, b'-~')).digest()
).replace(b"=", b"").lower() + b'.b32.i2p'
@@ -26,12 +26,12 @@ class Listener(threading.Thread):
break
try:
conn, addr = self.s.accept()
logging.info('Incoming connection from: {}:{}'.format(addr[0], addr[1]))
logging.info('Incoming connection from: %s:%s', *addr)
with shared.connections_lock:
if len(shared.connections) > shared.connection_limit:
conn.close()
else:
c = Connection(addr[0], addr[1], conn, 'ip', True)
c = Connection(*addr, conn, server=True)
c.start()
shared.connections.add(c)
except socket.timeout:
minode/main.py
@@ -24,18 +24,32 @@ def parse_arguments():
parser = argparse.ArgumentParser()
parser.add_argument('-p', '--port', help='Port to listen on', type=int)
parser.add_argument('--host', help='Listening host')
parser.add_argument('--debug', help='Enable debug logging', action='store_true')
parser.add_argument(
'--debug', action='store_true', help='Enable debug logging')
parser.add_argument('--data-dir', help='Path to data directory')
parser.add_argument('--no-incoming', help='Do not listen for incoming connections', action='store_true')
parser.add_argument('--no-outgoing', help='Do not send outgoing connections', action='store_true')
parser.add_argument('--no-ip', help='Do not use IP network', action='store_true')
parser.add_argument('--trusted-peer', help='Specify a trusted peer we should connect to')
parser.add_argument('--connection-limit', help='Maximum number of connections', type=int)
parser.add_argument('--i2p', help='Enable I2P support (uses SAMv3)', action='store_true')
parser.add_argument('--i2p-tunnel-length', help='Length of I2P tunnels', type=int)
parser.add_argument('--i2p-sam-host', help='Host of I2P SAMv3 bridge')
parser.add_argument('--i2p-sam-port', help='Port of I2P SAMv3 bridge', type=int)
parser.add_argument('--i2p-transient', help='Generate new I2P destination on start', action='store_true')
parser.add_argument(
'--no-incoming', action='store_true',
help='Do not listen for incoming connections')
parser.add_argument(
'--no-outgoing', action='store_true',
help='Do not send outgoing connections')
parser.add_argument(
'--no-ip', action='store_true', help='Do not use IP network')
parser.add_argument(
'--trusted-peer', help='Specify a trusted peer we should connect to')
parser.add_argument(
'--connection-limit', type=int, help='Maximum number of connections')
parser.add_argument(
'--i2p', action='store_true', help='Enable I2P support (uses SAMv3)')
parser.add_argument(
'--i2p-tunnel-length', type=int, help='Length of I2P tunnels')
parser.add_argument(
'--i2p-sam-host', help='Host of I2P SAMv3 bridge')
parser.add_argument(
'--i2p-sam-port', type=int, help='Port of I2P SAMv3 bridge')
parser.add_argument(
'--i2p-transient', action='store_true',
help='Generate new I2P destination on start')

args = parser.parse_args()
if args.port:
@@ -87,32 +101,44 @@ def parse_arguments():

def load_data():
try:
with open(shared.data_directory + 'objects.pickle', mode='br') as file:
shared.objects = pickle.load(file)
with open(
os.path.join(shared.data_directory, 'objects.pickle'), 'br'
) as src:
shared.objects = pickle.load(src)
except Exception as e:
logging.warning('Error while loading objects from disk.')
logging.warning(e)

try:
with open(shared.data_directory + 'nodes.pickle', mode='br') as file:
shared.node_pool = pickle.load(file)
with open(
os.path.join(shared.data_directory, 'nodes.pickle'), 'br'
) as src:
shared.node_pool = pickle.load(src)
except Exception as e:
logging.warning('Error while loading nodes from disk.')
logging.warning(e)

try:
with open(shared.data_directory + 'i2p_nodes.pickle', mode='br') as file:
shared.i2p_node_pool = pickle.load(file)
with open(
os.path.join(shared.data_directory, 'i2p_nodes.pickle'), 'br'
) as src:
shared.i2p_node_pool = pickle.load(src)
except Exception as e:
logging.warning('Error while loading nodes from disk.')
logging.warning(e)

with open(os.path.join(shared.source_directory, 'core_nodes.csv'), mode='r', newline='') as f:
reader = csv.reader(f)
with open(
os.path.join(shared.source_directory, 'core_nodes.csv'),
'r', newline=''
) as src:
reader = csv.reader(src)
shared.core_nodes = {tuple(row) for row in reader}
shared.node_pool.update(shared.core_nodes)

with open(os.path.join(shared.source_directory, 'i2p_core_nodes.csv'), mode='r', newline='') as f:
with open(
os.path.join(shared.source_directory, 'i2p_core_nodes.csv'),
'r', newline=''
) as f:
reader = csv.reader(f)
shared.i2p_core_nodes = {(row[0].encode(), 'i2p') for row in reader}
shared.i2p_node_pool.update(shared.i2p_core_nodes)
@@ -122,13 +148,16 @@ def bootstrap_from_dns():
try:
for item in socket.getaddrinfo('bootstrap8080.bitmessage.org', 80):
shared.unchecked_node_pool.add((item[4][0], 8080))
logging.debug('Adding ' + item[4][0] + ' to unchecked_node_pool based on DNS bootstrap method')
logging.debug(
'Adding %s to unchecked_node_pool'
' based on DNS bootstrap method', item[4][0])
for item in socket.getaddrinfo('bootstrap8444.bitmessage.org', 80):
shared.unchecked_node_pool.add((item[4][0], 8444))
logging.debug('Adding ' + item[4][0] + ' to unchecked_node_pool based on DNS bootstrap method')
except Exception as e:
logging.error('Error during DNS bootstrap')
logging.error(e)
logging.debug(
'Adding %s to unchecked_node_pool'
' based on DNS bootstrap method', item[4][0])
except Exception:
logging.error('Error during DNS bootstrap', exc_info=True)


def start_ip_listener():
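Note: the hunk above also replaces logging the caught exception object on a separate line with a single call that passes `exc_info=True`, which attaches the full traceback to one log record. A minimal sketch of the pattern (the operation inside the `try` block is illustrative):

    import logging
    import socket

    try:
        socket.getaddrinfo('bootstrap8080.bitmessage.org', 80)
    except Exception:
        # one record that carries both the message and the traceback
        logging.error('Error during DNS bootstrap', exc_info=True)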
@@ -137,37 +166,48 @@ def start_ip_listener():

if socket.has_ipv6:
try:
listener_ipv6 = Listener(shared.listening_host, shared.listening_port, family=socket.AF_INET6)
listener_ipv6 = Listener(
shared.listening_host,
shared.listening_port, family=socket.AF_INET6)
listener_ipv6.start()
except Exception as e:
logging.warning('Error while starting IPv6 listener on port {}'.format(shared.listening_port))
logging.warning(e)
except Exception:
logging.warning(
'Error while starting IPv6 listener on port %s',
shared.listening_port, exc_info=True)

try:
listener_ipv4 = Listener(shared.listening_host, shared.listening_port)
listener_ipv4.start()
except Exception as e:
except Exception:
if listener_ipv6:
logging.warning('Error while starting IPv4 listener on port {}. '.format(shared.listening_port) +
'However the IPv6 one seems to be working and will probably accept IPv4 connections.')
logging.warning(
'Error while starting IPv4 listener on port %s.'
' However the IPv6 one seems to be working'
' and will probably accept IPv4 connections.',
shared.listening_port)
else:
logging.error('Error while starting IPv4 listener on port {}. '.format(shared.listening_port) +
'You will not receive incoming connections. Please check your port configuration')
logging.error(e)
logging.error(
'Error while starting IPv4 listener on port %s.'
'You will not receive incoming connections.'
' Please check your port configuration',
shared.listening_port, exc_info=True)


def start_i2p_listener():
# Grab I2P destinations from old object file
for obj in shared.objects.values():
if obj.object_type == shared.i2p_dest_obj_type:
shared.i2p_unchecked_node_pool.add((base64.b64encode(obj.object_payload, altchars=b'-~'), 'i2p'))
shared.i2p_unchecked_node_pool.add((
base64.b64encode(obj.object_payload, altchars=b'-~'), 'i2p'))

dest_priv = b''

if not shared.i2p_transient:
try:
with open(shared.data_directory + 'i2p_dest_priv.key', mode='br') as file:
dest_priv = file.read()
with open(
os.path.join(shared.data_directory, 'i2p_dest_priv.key'), 'br'
) as src:
dest_priv = src.read()
logging.debug('Loaded I2P destination private key.')
except Exception:
logging.warning(
@@ -183,8 +223,8 @@ def start_i2p_listener():
shared.i2p_dest_pub = i2p_controller.dest_pub
shared.i2p_session_nick = i2p_controller.nick

logging.info('Local I2P destination: {}'.format(shared.i2p_dest_pub.decode()))
logging.info('I2P session nick: {}'.format(shared.i2p_session_nick.decode()))
logging.info('Local I2P destination: %s', shared.i2p_dest_pub.decode())
logging.info('I2P session nick: %s', shared.i2p_session_nick.decode())

logging.info('Starting I2P Listener')
i2p_listener = i2p.I2PListener(shared, i2p_controller.nick)
@@ -192,20 +232,25 @@ def start_i2p_listener():

if not shared.i2p_transient:
try:
with open(shared.data_directory + 'i2p_dest_priv.key', mode='bw') as file:
file.write(i2p_controller.dest_priv)
with open(
os.path.join(shared.data_directory, 'i2p_dest_priv.key'), 'bw'
) as src:
src.write(i2p_controller.dest_priv)
logging.debug('Saved I2P destination private key.')
except Exception as e:
logging.warning('Error while saving I2P destination private key.')
logging.warning(e)
except Exception:
logging.warning(
'Error while saving I2P destination private key.',
exc_info=True)

try:
with open(shared.data_directory + 'i2p_dest.pub', mode='bw') as file:
file.write(shared.i2p_dest_pub)
with open(
os.path.join(shared.data_directory, 'i2p_dest.pub'), 'bw'
) as src:
src.write(shared.i2p_dest_pub)
logging.debug('Saved I2P destination public key.')
except Exception as e:
logging.warning('Error while saving I2P destination public key.')
logging.warning(e)
except Exception:
logging.warning(
'Error while saving I2P destination public key.', exc_info=True)


def main():
@@ -214,16 +259,19 @@ def main():

parse_arguments()

logging.basicConfig(level=shared.log_level, format='[%(asctime)s] [%(levelname)s] %(message)s')
logging.basicConfig(
level=shared.log_level,
format='[%(asctime)s] [%(levelname)s] %(message)s')
logging.info('Starting MiNode')

logging.info('Data directory: {}'.format(shared.data_directory))
logging.info('Data directory: %s', shared.data_directory)
if not os.path.exists(shared.data_directory):
try:
os.makedirs(shared.data_directory)
except Exception as e:
logging.warning('Error while creating data directory in: {}'.format(shared.data_directory))
logging.warning(e)
except Exception:
logging.warning(
'Error while creating data directory in: %s',
shared.data_directory, exc_info=True)

load_data()
@@ -231,15 +279,20 @@ def main():
bootstrap_from_dns()

if shared.i2p_enabled:
# We are starting it before cleaning expired objects so we can collect I2P destination objects
# We are starting it before cleaning expired objects
# so we can collect I2P destination objects
start_i2p_listener()

for vector in set(shared.objects):
if not shared.objects[vector].is_valid():
if shared.objects[vector].is_expired():
logging.debug('Deleted expired object: {}'.format(base64.b16encode(vector).decode()))
logging.debug(
'Deleted expired object: %s',
base64.b16encode(vector).decode())
else:
logging.warning('Deleted invalid object: {}'.format(base64.b16encode(vector).decode()))
logging.warning(
'Deleted invalid object: %s',
base64.b16encode(vector).decode())
del shared.objects[vector]

manager = Manager()
@@ -1,6 +1,7 @@
# -*- coding: utf-8 -*-
import base64
import logging
import os
import pickle
import queue
import random
@@ -20,7 +21,9 @@ class Manager(threading.Thread):
self.last_cleaned_connections = time.time()
self.last_pickled_objects = time.time()
self.last_pickled_nodes = time.time()
self.last_published_i2p_destination = time.time() - 50 * 60 + random.uniform(-1, 1) * 300  # Publish destination 5-15 minutes after start
# Publish destination 5-15 minutes after start
self.last_published_i2p_destination = \
time.time() - 50 * 60 + random.uniform(-1, 1) * 300

def run(self):
while True:
@@ -51,7 +54,9 @@ class Manager(threading.Thread):
if shared.objects[vector].is_expired():
with shared.objects_lock:
del shared.objects[vector]
logging.debug('Deleted expired object: {}'.format(base64.b16encode(vector).decode()))
logging.debug(
'Deleted expired object: %s',
base64.b16encode(vector).decode())

@staticmethod
def manage_connections():
@@ -75,11 +80,15 @@ class Manager(threading.Thread):
if shared.trusted_peer:
to_connect.add(shared.trusted_peer)

if outgoing_connections < shared.outgoing_connections and shared.send_outgoing_connections and not shared.trusted_peer:
if (
outgoing_connections < shared.outgoing_connections
and shared.send_outgoing_connections and not shared.trusted_peer
):

if shared.ip_enabled:
if len(shared.unchecked_node_pool) > 16:
to_connect.update(random.sample(shared.unchecked_node_pool, 16))
to_connect.update(random.sample(
shared.unchecked_node_pool, 16))
else:
to_connect.update(shared.unchecked_node_pool)
shared.unchecked_node_pool.difference_update(to_connect)
@@ -90,7 +99,8 @@ class Manager(threading.Thread):

if shared.i2p_enabled:
if len(shared.i2p_unchecked_node_pool) > 16:
to_connect.update(random.sample(shared.i2p_unchecked_node_pool, 16))
to_connect.update(
random.sample(shared.i2p_unchecked_node_pool, 16))
else:
to_connect.update(shared.i2p_unchecked_node_pool)
shared.i2p_unchecked_node_pool.difference_update(to_connect)
@@ -112,9 +122,10 @@ class Manager(threading.Thread):
d.start()
hosts.add(d.destination)
shared.i2p_dialers.add(d)
except Exception as e:
logging.warning('Exception while trying to establish an I2P connection')
logging.warning(e)
except Exception:
logging.warning(
'Exception while trying to establish'
' an I2P connection', exc_info=True)
else:
continue
else:
@@ -128,9 +139,11 @@ class Manager(threading.Thread):
@staticmethod
def pickle_objects():
try:
with open(shared.data_directory + 'objects.pickle', mode='bw') as file:
with open(
os.path.join(shared.data_directory, 'objects.pickle'), 'bw'
) as dst:
with shared.objects_lock:
pickle.dump(shared.objects, file, protocol=3)
pickle.dump(shared.objects, dst, protocol=3)
logging.debug('Saved objects')
except Exception as e:
logging.warning('Error while saving objects')
@@ -141,27 +154,37 @@ class Manager(threading.Thread):
if len(shared.node_pool) > 10000:
shared.node_pool = set(random.sample(shared.node_pool, 10000))
if len(shared.unchecked_node_pool) > 1000:
shared.unchecked_node_pool = set(random.sample(shared.unchecked_node_pool, 1000))
shared.unchecked_node_pool = set(
random.sample(shared.unchecked_node_pool, 1000))

if len(shared.i2p_node_pool) > 1000:
shared.i2p_node_pool = set(random.sample(shared.i2p_node_pool, 1000))
shared.i2p_node_pool = set(
random.sample(shared.i2p_node_pool, 1000))
if len(shared.i2p_unchecked_node_pool) > 100:
shared.i2p_unchecked_node_pool = set(random.sample(shared.i2p_unchecked_node_pool, 100))
shared.i2p_unchecked_node_pool = set(
random.sample(shared.i2p_unchecked_node_pool, 100))

try:
with open(shared.data_directory + 'nodes.pickle', mode='bw') as file:
pickle.dump(shared.node_pool, file, protocol=3)
with open(shared.data_directory + 'i2p_nodes.pickle', mode='bw') as file:
pickle.dump(shared.i2p_node_pool, file, protocol=3)
with open(
os.path.join(shared.data_directory, 'nodes.pickle'), 'bw'
) as dst:
pickle.dump(shared.node_pool, dst, protocol=3)
with open(
os.path.join(shared.data_directory, 'i2p_nodes.pickle'), 'bw'
) as dst:
pickle.dump(shared.i2p_node_pool, dst, protocol=3)
logging.debug('Saved nodes')
except Exception as e:
logging.warning('Error while saving nodes')
logging.warning(e)
except Exception:
logging.warning('Error while saving nodes', exc_info=True)

@staticmethod
def publish_i2p_destination():
if shared.i2p_session_nick and not shared.i2p_transient:
logging.info('Publishing our I2P destination')
dest_pub_raw = base64.b64decode(shared.i2p_dest_pub, altchars=b'-~')
obj = structure.Object(b'\x00' * 8, int(time.time() + 2 * 3600), shared.i2p_dest_obj_type, shared.i2p_dest_obj_version, 1, dest_pub_raw)
dest_pub_raw = base64.b64decode(
shared.i2p_dest_pub, altchars=b'-~')
obj = structure.Object(
b'\x00' * 8, int(time.time() + 2 * 3600),
shared.i2p_dest_obj_type, shared.i2p_dest_obj_version,
1, dest_pub_raw)
pow.do_pow_and_publish(obj)
@@ -14,8 +14,12 @@ class Header(object):
self.payload_checksum = payload_checksum

def __repr__(self):
return 'type: header, command: "{}", payload_length: {}, payload_checksum: {}'\
.format(self.command.decode(), self.payload_length, base64.b16encode(self.payload_checksum).decode())
return (
'type: header, command: "{}", payload_length: {},'
' payload_checksum: {}'
).format(
self.command.decode(), self.payload_length,
base64.b16encode(self.payload_checksum).decode())

def to_bytes(self):
b = b''
@@ -27,7 +31,8 @@ class Header(object):

@classmethod
def from_bytes(cls, b):
magic_bytes, command, payload_length, payload_checksum = struct.unpack('>4s12sL4s', b)
magic_bytes, command, payload_length, payload_checksum = struct.unpack(
'>4s12sL4s', b)

if magic_bytes != shared.magic_bytes:
raise ValueError('magic_bytes do not match')
@@ -46,11 +51,14 @@ class Message(object):
self.payload_checksum = hashlib.sha512(payload).digest()[:4]

def __repr__(self):
return '{}, payload_length: {}, payload_checksum: {}'\
.format(self.command.decode(), self.payload_length, base64.b16encode(self.payload_checksum).decode())
return '{}, payload_length: {}, payload_checksum: {}'.format(
self.command.decode(), self.payload_length,
base64.b16encode(self.payload_checksum).decode())

def to_bytes(self):
b = Header(self.command, self.payload_length, self.payload_checksum).to_bytes()
b = Header(
self.command, self.payload_length, self.payload_checksum
).to_bytes()
b += self.payload
return b
@@ -62,19 +70,26 @@ class Message(object):
payload_length = len(payload)

if payload_length != h.payload_length:
raise ValueError('wrong payload length, expected {}, got {}'.format(h.payload_length, payload_length))
raise ValueError(
'wrong payload length, expected {}, got {}'.format(
h.payload_length, payload_length))

payload_checksum = hashlib.sha512(payload).digest()[:4]

if payload_checksum != h.payload_checksum:
raise ValueError('wrong payload checksum, expected {}, got {}'.format(h.payload_checksum, payload_checksum))
raise ValueError(
'wrong payload checksum, expected {}, got {}'.format(
h.payload_checksum, payload_checksum))

return cls(h.command, payload)


class Version(object):
def __init__(self, host, port, protocol_version=shared.protocol_version, services=shared.services,
nonce=shared.nonce, user_agent=shared.user_agent):
def __init__(
self, host, port, protocol_version=shared.protocol_version,
services=shared.services, nonce=shared.nonce,
user_agent=shared.user_agent
):
self.host = host
self.port = port
@@ -84,16 +99,21 @@ class Version(object):
self.user_agent = user_agent

def __repr__(self):
return 'version, protocol_version: {}, services: {}, host: {}, port: {}, nonce: {}, user_agent: {}'\
.format(self.protocol_version, self.services, self.host, self.port, base64.b16encode(self.nonce).decode(), self.user_agent)
return (
'version, protocol_version: {}, services: {}, host: {}, port: {},'
' nonce: {}, user_agent: {}').format(
self.protocol_version, self.services, self.host, self.port,
base64.b16encode(self.nonce).decode(), self.user_agent)

def to_bytes(self):
payload = b''
payload += struct.pack('>I', self.protocol_version)
payload += struct.pack('>Q', self.services)
payload += struct.pack('>Q', int(time.time()))
payload += structure.NetAddrNoPrefix(shared.services, self.host, self.port).to_bytes()
payload += structure.NetAddrNoPrefix(shared.services, '127.0.0.1', 8444).to_bytes()
payload += structure.NetAddrNoPrefix(
shared.services, self.host, self.port).to_bytes()
payload += structure.NetAddrNoPrefix(
shared.services, '127.0.0.1', 8444).to_bytes()
payload += self.nonce
payload += structure.VarInt(len(shared.user_agent)).to_bytes()
payload += shared.user_agent
@@ -107,8 +127,9 @@ class Version(object):

payload = m.payload

protocol_version, services, t, net_addr_remote, net_addr_local, nonce = \
struct.unpack('>IQQ26s26s8s', payload[:80])
(  # unused: net_addr_local
protocol_version, services, t, net_addr_remote, _, nonce
) = struct.unpack('>IQQ26s26s8s', payload[:80])

net_addr_remote = structure.NetAddrNoPrefix.from_bytes(net_addr_remote)
@@ -118,7 +139,8 @@ class Version(object):
payload = payload[80:]

user_agent_varint_length = structure.VarInt.length(payload[0])
user_agent_length = structure.VarInt.from_bytes(payload[:user_agent_varint_length]).n
user_agent_length = structure.VarInt.from_bytes(
payload[:user_agent_varint_length]).n

payload = payload[user_agent_varint_length:]
@@ -140,14 +162,18 @@ class Inv(object):
return 'inv, count: {}'.format(len(self.vectors))

def to_bytes(self):
return Message(b'inv', structure.VarInt(len(self.vectors)).to_bytes() + b''.join(self.vectors)).to_bytes()
return Message(
b'inv', structure.VarInt(len(self.vectors)).to_bytes()
+ b''.join(self.vectors)
).to_bytes()

@classmethod
def from_message(cls, m):
payload = m.payload

vector_count_varint_length = structure.VarInt.length(payload[0])
vector_count = structure.VarInt.from_bytes(payload[:vector_count_varint_length]).n
vector_count = structure.VarInt.from_bytes(
payload[:vector_count_varint_length]).n

payload = payload[vector_count_varint_length:]
@@ -171,14 +197,18 @@ class GetData(object):
return 'getdata, count: {}'.format(len(self.vectors))

def to_bytes(self):
return Message(b'getdata', structure.VarInt(len(self.vectors)).to_bytes() + b''.join(self.vectors)).to_bytes()
return Message(
b'getdata', structure.VarInt(len(self.vectors)).to_bytes()
+ b''.join(self.vectors)
).to_bytes()

@classmethod
def from_message(cls, m):
payload = m.payload

vector_count_varint_length = structure.VarInt.length(payload[0])
vector_count = structure.VarInt.from_bytes(payload[:vector_count_varint_length]).n
vector_count = structure.VarInt.from_bytes(
payload[:vector_count_varint_length]).n

payload = payload[vector_count_varint_length:]
@@ -202,14 +232,18 @@ class Addr(object):
return 'addr, count: {}'.format(len(self.addresses))

def to_bytes(self):
return Message(b'addr', structure.VarInt(len(self.addresses)).to_bytes() + b''.join({addr.to_bytes() for addr in self.addresses})).to_bytes()
return Message(
b'addr', structure.VarInt(len(self.addresses)).to_bytes()
+ b''.join({addr.to_bytes() for addr in self.addresses})
).to_bytes()

@classmethod
def from_message(cls, m):
payload = m.payload

addr_count_varint_length = structure.VarInt.length(payload[0])
addr_count = structure.VarInt.from_bytes(payload[:addr_count_varint_length]).n
# addr_count = structure.VarInt.from_bytes(
#     payload[:addr_count_varint_length]).n

payload = payload[addr_count_varint_length:]
@@ -2,7 +2,6 @@ import base64
import hashlib
import logging
import multiprocessing
import shared
import struct
import threading
import time
@@ -12,29 +11,37 @@ from . import shared, structure

def _pow_worker(target, initial_hash, q):
nonce = 0
logging.debug("target: {}, initial_hash: {}".format(target, base64.b16encode(initial_hash).decode()))
logging.debug(
'target: %s, initial_hash: %s',
target, base64.b16encode(initial_hash).decode())
trial_value = target + 1

while trial_value > target:
nonce += 1
trial_value = struct.unpack('>Q', hashlib.sha512(hashlib.sha512(struct.pack('>Q', nonce) + initial_hash).digest()).digest()[:8])[0]
trial_value = struct.unpack('>Q', hashlib.sha512(hashlib.sha512(
struct.pack('>Q', nonce) + initial_hash).digest()).digest()[:8])[0]

q.put(struct.pack('>Q', nonce))


def _worker(obj):
q = multiprocessing.Queue()
p = multiprocessing.Process(target=_pow_worker, args=(obj.pow_target(), obj.pow_initial_hash(), q))
p = multiprocessing.Process(
target=_pow_worker, args=(obj.pow_target(), obj.pow_initial_hash(), q))

logging.debug("Starting POW process")
logging.debug('Starting POW process')
t = time.time()
p.start()
nonce = q.get()
p.join()

logging.debug("Finished doing POW, nonce: {}, time: {}s".format(nonce, time.time() - t))
obj = structure.Object(nonce, obj.expires_time, obj.object_type, obj.version, obj.stream_number, obj.object_payload)
logging.debug("Object vector is {}".format(base64.b16encode(obj.vector).decode()))
logging.debug(
'Finished doing POW, nonce: %s, time: %ss', nonce, time.time() - t)
obj = structure.Object(
nonce, obj.expires_time, obj.object_type, obj.version,
obj.stream_number, obj.object_payload)
logging.debug(
'Object vector is %s', base64.b16encode(obj.vector).decode())

with shared.objects_lock:
shared.objects[obj.vector] = obj
@@ -2,8 +2,8 @@
import base64
import hashlib
import logging
import struct
import socket
import struct
import time

from . import shared
@@ -44,17 +44,22 @@ class VarInt(object):


class Object(object):
def __init__(self, nonce, expires_time, object_type, version, stream_number, object_payload):
def __init__(
self, nonce, expires_time, object_type, version,
stream_number, object_payload
):
self.nonce = nonce
self.expires_time = expires_time
self.object_type = object_type
self.version = version
self.stream_number = stream_number
self.object_payload = object_payload
self.vector = hashlib.sha512(hashlib.sha512(self.to_bytes()).digest()).digest()[:32]
self.vector = hashlib.sha512(hashlib.sha512(
self.to_bytes()).digest()).digest()[:32]

def __repr__(self):
return 'object, vector: {}'.format(base64.b16encode(self.vector).decode())
return 'object, vector: {}'.format(
base64.b16encode(self.vector).decode())

@classmethod
def from_message(cls, m):
@@ -65,15 +70,19 @@ class Object(object):
version = VarInt.from_bytes(payload[:version_varint_length]).n
payload = payload[version_varint_length:]
stream_number_varint_length = VarInt.length(payload[0])
stream_number = VarInt.from_bytes(payload[:stream_number_varint_length]).n
stream_number = VarInt.from_bytes(
payload[:stream_number_varint_length]).n
payload = payload[stream_number_varint_length:]
return cls(nonce, expires_time, object_type, version, stream_number, payload)
return cls(
nonce, expires_time, object_type, version, stream_number, payload)

def to_bytes(self):
payload = b''
payload += self.nonce
payload += struct.pack('>QL', self.expires_time, self.object_type)
payload += VarInt(self.version).to_bytes() + VarInt(self.stream_number).to_bytes()
payload += (
VarInt(self.version).to_bytes()
+ VarInt(self.stream_number).to_bytes())
payload += self.object_payload
return payload
@@ -82,25 +91,37 @@ class Object(object):

def is_valid(self):
if self.is_expired():
logging.debug('Invalid object {}, reason: expired'.format(base64.b16encode(self.vector).decode()))
logging.debug(
'Invalid object %s, reason: expired',
base64.b16encode(self.vector).decode())
return False
if self.expires_time > time.time() + 28 * 24 * 3600 + 3 * 3600:
logging.warning('Invalid object {}, reason: end of life too far in the future'.format(base64.b16encode(self.vector).decode()))
logging.warning(
'Invalid object %s, reason: end of life too far in the future',
base64.b16encode(self.vector).decode())
return False
if len(self.object_payload) > 2**18:
logging.warning('Invalid object {}, reason: payload is too long'.format(base64.b16encode(self.vector).decode()))
logging.warning(
'Invalid object %s, reason: payload is too long',
base64.b16encode(self.vector).decode())
return False
if self.stream_number != 1:
logging.warning('Invalid object {}, reason: not in stream 1'.format(base64.b16encode(self.vector).decode()))
logging.warning(
'Invalid object %s, reason: not in stream 1',
base64.b16encode(self.vector).decode())
return False
data = self.to_bytes()[8:]
length = len(data) + 8 + shared.payload_length_extra_bytes
dt = max(self.expires_time - time.time(), 0)
# length = len(data) + 8 + shared.payload_length_extra_bytes
# dt = max(self.expires_time - time.time(), 0)
h = hashlib.sha512(data).digest()
pow_value = int.from_bytes(hashlib.sha512(hashlib.sha512(self.nonce + h).digest()).digest()[:8], 'big')
pow_value = int.from_bytes(
hashlib.sha512(hashlib.sha512(
self.nonce + h).digest()).digest()[:8], 'big')
target = self.pow_target()
if target < pow_value:
logging.warning('Invalid object {}, reason: insufficient pow'.format(base64.b16encode(self.vector).decode()))
logging.warning(
'Invalid object %s, reason: insufficient pow',
base64.b16encode(self.vector).decode())
return False
return True
@@ -108,7 +129,10 @@ class Object(object):
data = self.to_bytes()[8:]
length = len(data) + 8 + shared.payload_length_extra_bytes
dt = max(self.expires_time - time.time(), 0)
return int(2 ** 64 / (shared.nonce_trials_per_byte * (length + (dt * length) / (2 ** 16))))
return int(
2 ** 64 / (
shared.nonce_trials_per_byte * (
length + (dt * length) / (2 ** 16))))

def pow_initial_hash(self):
return hashlib.sha512(self.to_bytes()[8:]).digest()
@@ -121,7 +145,8 @@ class NetAddrNoPrefix(object):
self.port = port

def __repr__(self):
return 'net_addr_no_prefix, services: {}, host: {}, port {}'.format(self.services, self.host, self.port)
return 'net_addr_no_prefix, services: {}, host: {}, port {}'.format(
self.services, self.host, self.port)

def to_bytes(self):
b = b''
@@ -137,7 +162,8 @@ class NetAddrNoPrefix(object):
@classmethod
def from_bytes(cls, b):
services, host, port = struct.unpack('>Q16sH', b)
if host.startswith(b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF'):
if host.startswith(
b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF'):
host = socket.inet_ntop(socket.AF_INET, host[-4:])
else:
host = socket.inet_ntop(socket.AF_INET6, host)
@@ -152,8 +178,8 @@ class NetAddr(object):
self.port = port

def __repr__(self):
return 'net_addr, stream: {}, services: {}, host: {}, port {}'\
.format(self.stream, self.services, self.host, self.port)
return 'net_addr, stream: {}, services: {}, host: {}, port {}'.format(
self.stream, self.services, self.host, self.port)

def to_bytes(self):
b = b''
@@ -164,6 +190,6 @@ class NetAddr(object):

@classmethod
def from_bytes(cls, b):
t, stream, net_addr = struct.unpack('>QI26s', b)
stream, net_addr = struct.unpack('>QI26s', b)[1:]
n = NetAddrNoPrefix.from_bytes(net_addr)
return cls(n.services, n.host, n.port, stream)