Compare commits

..

2 Commits

Author SHA1 Message Date
Lee Miller 44aaccb7a4
Make a session-wide set of junk vectors to not request repeatedly 2023-08-17 02:32:12 +03:00
Lee Miller 46ea5e0744
Define an additional object validation method is_junk(),
checking curve number and key length in encrypted payload.
2023-08-17 02:32:06 +03:00
12 changed files with 164 additions and 302 deletions

View File

@ -1,7 +1,6 @@
The MIT License (MIT)
Copyright (c) 2016-2017 Krzysztof Oziomek
Copyright (c) 2020-2023 The Bitmessage Developers
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal

View File

@ -86,4 +86,4 @@ will allow you to use it anonymously over I2P with MiNode acting as a bridge.
## Links
- [Bitmessage project website](https://bitmessage.org)
- [Protocol specification](https://pybitmessage.rtfd.io/en/v0.6/protocol.html)
- [Protocol specification](https://bitmessage.org/wiki/Protocol_specification)

View File

@ -110,7 +110,11 @@ class Connection(threading.Thread):
'Disconnecting from %s:%s. Reason: %s',
self.host_print, self.port, e)
data = None
except ConnectionResetError:
logging.debug(
'Disconnecting from %s:%s. Reason: ConnectionResetError',
self.host_print, self.port)
self.status = 'disconnecting'
self._process_buffer_receive()
self._process_queue()
self._send_data()
@ -135,13 +139,13 @@ class Connection(threading.Thread):
time.time() - self.last_message_sent > 300
and self.status == 'fully_established'
):
self.send_queue.put(message.Message(b'ping', b''))
self.send_queue.put(message.Message(b'pong', b''))
if self.status == 'disconnecting' or shared.shutting_down:
data = None
if not data:
self.status = 'disconnected'
self.s.close()
logging.info(
logging.debug(
'Disconnected from %s:%s', self.host_print, self.port)
break
time.sleep(0.2)
@ -335,9 +339,7 @@ class Connection(threading.Thread):
def _process_message(self, m):
if m.command == b'version':
version = message.Version.from_message(m)
if shared.stream not in version.streams:
raise ValueError('message not for stream %i' % shared.stream)
version = message.Version.from_bytes(m.to_bytes())
logging.debug('%s:%s -> %s', self.host_print, self.port, version)
if (
version.protocol_version != shared.protocol_version
@ -382,6 +384,7 @@ class Connection(threading.Thread):
logging.debug('%s:%s -> %s', self.host_print, self.port, inv)
to_get = inv.vectors.copy()
to_get.difference_update(shared.objects.keys())
to_get.difference_update(shared.junk_vectors)
self.vectors_to_get.update(to_get)
# Do not send objects they already have.
self.vectors_to_send.difference_update(inv.vectors)
@ -393,6 +396,8 @@ class Connection(threading.Thread):
self.vectors_to_get.discard(obj.vector)
if obj.is_valid() and obj.vector not in shared.objects:
with shared.objects_lock:
if obj.is_junk():
return shared.junk_vectors.add(obj.vector)
shared.objects[obj.vector] = obj
if (
obj.object_type == shared.i2p_dest_obj_type
@ -422,12 +427,8 @@ class Connection(threading.Thread):
self.send_queue.put(message.Message(b'pong', b''))
elif m.command == b'error':
error = message.Error.from_message(m)
logging.warning(
'%s:%s -> %s', self.host_print, self.port, error)
if error.fatal == 2:
# reduce probability to connect soon
shared.unchecked_node_pool.discard((self.host, self.port))
'%s:%s -> error: %s', self.host_print, self.port, m.payload)
else:
logging.debug('%s:%s -> %s', self.host_print, self.port, m)

View File

@ -37,7 +37,7 @@ class I2PDialer(I2PThread):
self._send(b'HELLO VERSION MIN=3.0 MAX=3.3\n')
self.version_reply = self._receive_line().split()
if b'RESULT=OK' not in self.version_reply:
logging.debug('Error while connecting to %s', self.destination)
logging.warning('Error while connecting to %s', self.destination)
self.success = False
self._send(
@ -45,5 +45,6 @@ class I2PDialer(I2PThread):
+ self.destination + b'\n')
reply = self._receive_line().split(b' ')
if b'RESULT=OK' not in reply:
logging.debug('Error while connecting to %s', self.destination)
logging.warning(
'Error while connecting to %s', self.destination)
self.success = False

View File

@ -2,9 +2,11 @@
"""Functions for starting the program"""
import argparse
import base64
import csv
import logging
import multiprocessing
import os
import pickle
import signal
import socket
@ -20,7 +22,7 @@ def handler(s, f): # pylint: disable=unused-argument
shared.shutting_down = True
def parse_arguments(): # pylint: disable=too-many-branches,too-many-statements
def parse_arguments():
"""Parsing arguments"""
parser = argparse.ArgumentParser()
parser.add_argument('-p', '--port', help='Port to listen on', type=int)
@ -100,6 +102,56 @@ def parse_arguments(): # pylint: disable=too-many-branches,too-many-statements
shared.i2p_transient = True
def load_data():
"""Loads initial nodes and data, stored in files between sessions"""
try:
with open(
os.path.join(shared.data_directory, 'objects.pickle'), 'br'
) as src:
shared.objects = pickle.load(src)
except FileNotFoundError:
pass # first start
except Exception:
logging.warning(
'Error while loading objects from disk.', exc_info=True)
try:
with open(
os.path.join(shared.data_directory, 'nodes.pickle'), 'br'
) as src:
shared.node_pool = pickle.load(src)
except FileNotFoundError:
pass
except Exception:
logging.warning('Error while loading nodes from disk.', exc_info=True)
try:
with open(
os.path.join(shared.data_directory, 'i2p_nodes.pickle'), 'br'
) as src:
shared.i2p_node_pool = pickle.load(src)
except FileNotFoundError:
pass
except Exception:
logging.warning('Error while loading nodes from disk.', exc_info=True)
with open(
os.path.join(shared.source_directory, 'core_nodes.csv'),
'r', newline=''
) as src:
reader = csv.reader(src)
shared.core_nodes = {tuple(row) for row in reader}
shared.node_pool.update(shared.core_nodes)
with open(
os.path.join(shared.source_directory, 'i2p_core_nodes.csv'),
'r', newline=''
) as f:
reader = csv.reader(f)
shared.i2p_core_nodes = {(row[0].encode(), 'i2p') for row in reader}
shared.i2p_node_pool.update(shared.i2p_core_nodes)
def bootstrap_from_dns():
"""Addes addresses of bootstrap servers to known nodes"""
try:
@ -238,6 +290,8 @@ def main():
'Error while creating data directory in: %s',
shared.data_directory, exc_info=True)
load_data()
if shared.ip_enabled and not shared.trusted_peer:
bootstrap_from_dns()
@ -246,6 +300,18 @@ def main():
# so we can collect I2P destination objects
start_i2p_listener()
for vector in set(shared.objects):
if not shared.objects[vector].is_valid():
if shared.objects[vector].is_expired():
logging.debug(
'Deleted expired object: %s',
base64.b16encode(vector).decode())
else:
logging.warning(
'Deleted invalid object: %s',
base64.b16encode(vector).decode())
del shared.objects[vector]
manager = Manager()
manager.start()

View File

@ -1,7 +1,6 @@
# -*- coding: utf-8 -*-
"""The main thread, managing connections, nodes and objects"""
import base64
import csv
import logging
import os
import pickle
@ -26,11 +25,9 @@ class Manager(threading.Thread):
self.last_pickled_nodes = time.time()
# Publish destination 5-15 minutes after start
self.last_published_i2p_destination = \
time.time() - 50 * 60 + random.uniform(-1, 1) * 300 # nosec B311
time.time() - 50 * 60 + random.uniform(-1, 1) * 300 # nosec
def run(self):
self.load_data()
self.clean_objects()
while True:
time.sleep(0.8)
now = time.time()
@ -56,17 +53,12 @@ class Manager(threading.Thread):
@staticmethod
def clean_objects():
for vector in set(shared.objects):
if not shared.objects[vector].is_valid():
if shared.objects[vector].is_expired():
logging.debug(
'Deleted expired object: %s',
base64.b16encode(vector).decode())
else:
logging.warning(
'Deleted invalid object: %s',
base64.b16encode(vector).decode())
if shared.objects[vector].is_expired():
with shared.objects_lock:
del shared.objects[vector]
logging.debug(
'Deleted expired object: %s',
base64.b16encode(vector).decode())
@staticmethod
def manage_connections():
@ -146,59 +138,6 @@ class Manager(threading.Thread):
shared.connections.add(c)
shared.hosts = hosts
@staticmethod
def load_data():
"""Loads initial nodes and data, stored in files between sessions"""
try:
with open(
os.path.join(shared.data_directory, 'objects.pickle'), 'br'
) as src:
shared.objects = pickle.load(src)
except FileNotFoundError:
pass # first start
except Exception:
logging.warning(
'Error while loading objects from disk.', exc_info=True)
try:
with open(
os.path.join(shared.data_directory, 'nodes.pickle'), 'br'
) as src:
shared.node_pool = pickle.load(src)
except FileNotFoundError:
pass
except Exception:
logging.warning(
'Error while loading nodes from disk.', exc_info=True)
try:
with open(
os.path.join(shared.data_directory, 'i2p_nodes.pickle'), 'br'
) as src:
shared.i2p_node_pool = pickle.load(src)
except FileNotFoundError:
pass
except Exception:
logging.warning(
'Error while loading nodes from disk.', exc_info=True)
with open(
os.path.join(shared.source_directory, 'core_nodes.csv'),
'r', newline='', encoding='ascii'
) as src:
reader = csv.reader(src)
shared.core_nodes = {tuple(row) for row in reader}
shared.node_pool.update(shared.core_nodes)
with open(
os.path.join(shared.source_directory, 'i2p_core_nodes.csv'),
'r', newline='', encoding='ascii'
) as f:
reader = csv.reader(f)
shared.i2p_core_nodes = {
(row[0].encode(), 'i2p') for row in reader}
shared.i2p_node_pool.update(shared.i2p_core_nodes)
@staticmethod
def pickle_objects():
try:
@ -248,5 +187,5 @@ class Manager(threading.Thread):
obj = structure.Object(
b'\x00' * 8, int(time.time() + 2 * 3600),
shared.i2p_dest_obj_type, shared.i2p_dest_obj_version,
shared.stream, dest_pub_raw)
1, dest_pub_raw)
proofofwork.do_pow_and_publish(obj)

View File

@ -91,19 +91,12 @@ class Message():
return cls(h.command, payload)
def _payload_read_int(data):
varint_length = structure.VarInt.length(data[0])
return (
structure.VarInt.from_bytes(data[:varint_length]).n,
data[varint_length:])
class Version():
"""The version message payload"""
def __init__(
self, host, port, protocol_version=shared.protocol_version,
services=shared.services, nonce=shared.nonce,
user_agent=shared.user_agent, streams=None
user_agent=shared.user_agent
):
self.host = host
self.port = port
@ -112,9 +105,6 @@ class Version():
self.services = services
self.nonce = nonce
self.user_agent = user_agent
self.streams = streams or [shared.stream]
if len(self.streams) > 160000:
self.streams = self.streams[:160000]
def __repr__(self):
return (
@ -129,20 +119,20 @@ class Version():
payload += struct.pack('>Q', self.services)
payload += struct.pack('>Q', int(time.time()))
payload += structure.NetAddrNoPrefix(
1, self.host, self.port).to_bytes()
shared.services, self.host, self.port).to_bytes()
payload += structure.NetAddrNoPrefix(
self.services, '127.0.0.1', 8444).to_bytes()
shared.services, '127.0.0.1', 8444).to_bytes()
payload += self.nonce
payload += structure.VarInt(len(self.user_agent)).to_bytes()
payload += self.user_agent
payload += structure.VarInt(len(self.streams)).to_bytes()
for stream in self.streams:
payload += structure.VarInt(stream).to_bytes()
payload += structure.VarInt(len(shared.user_agent)).to_bytes()
payload += shared.user_agent
payload += 2 * structure.VarInt(1).to_bytes()
return Message(b'version', payload).to_bytes()
@classmethod
def from_message(cls, m):
def from_bytes(cls, b):
m = Message.from_bytes(b)
payload = m.payload
( # unused: timestamp, net_addr_local
@ -156,24 +146,20 @@ class Version():
payload = payload[80:]
user_agent_length, payload = _payload_read_int(payload)
user_agent_varint_length = structure.VarInt.length(payload[0])
user_agent_length = structure.VarInt.from_bytes(
payload[:user_agent_varint_length]).n
payload = payload[user_agent_varint_length:]
user_agent = payload[:user_agent_length]
payload = payload[user_agent_length:]
streams_count, payload = _payload_read_int(payload)
if streams_count > 160000:
raise ValueError('malformed Version message, to many streams')
streams = []
if payload != b'\x01\x01':
raise ValueError('message not for stream 1')
while payload:
stream, payload = _payload_read_int(payload)
streams.append(stream)
if streams_count != len(streams):
raise ValueError('malformed Version message, wrong streams_count')
return cls(
host, port, protocol_version, services, nonce, user_agent, streams)
return cls(host, port, protocol_version, services, nonce, user_agent)
class Inv():
@ -194,7 +180,11 @@ class Inv():
def from_message(cls, m):
payload = m.payload
vector_count, payload = _payload_read_int(payload)
vector_count_varint_length = structure.VarInt.length(payload[0])
vector_count = structure.VarInt.from_bytes(
payload[:vector_count_varint_length]).n
payload = payload[vector_count_varint_length:]
vectors = set()
@ -226,7 +216,11 @@ class GetData():
def from_message(cls, m):
payload = m.payload
vector_count, payload = _payload_read_int(payload)
vector_count_varint_length = structure.VarInt.length(payload[0])
vector_count = structure.VarInt.from_bytes(
payload[:vector_count_varint_length]).n
payload = payload[vector_count_varint_length:]
vectors = set()
@ -258,8 +252,11 @@ class Addr():
def from_message(cls, m):
payload = m.payload
# not validating addr_count
_, payload = _payload_read_int(payload)
addr_count_varint_length = structure.VarInt.length(payload[0])
# addr_count = structure.VarInt.from_bytes(
# payload[:addr_count_varint_length]).n
payload = payload[addr_count_varint_length:]
addresses = set()
@ -268,37 +265,3 @@ class Addr():
payload = payload[38:]
return cls(addresses)
class Error():
"""The error message payload"""
def __init__(self, error_text=b'', fatal=0, ban_time=0, vector=b''):
self.error_text = error_text
self.fatal = fatal
self.ban_time = ban_time
self.vector = vector
def __repr__(self):
return 'error, text: {}'.format(self.error_text)
def to_bytes(self):
return Message(
b'error', structure.VarInt(self.fatal).to_bytes()
+ structure.VarInt(self.ban_time).to_bytes()
+ structure.VarInt(len(self.vector)).to_bytes() + self.vector
+ structure.VarInt(len(self.error_text)).to_bytes()
+ self.error_text
).to_bytes()
@classmethod
def from_message(cls, m):
payload = m.payload
fatal, payload = _payload_read_int(payload)
ban_time, payload = _payload_read_int(payload)
vector_length, payload = _payload_read_int(payload)
vector = payload[:vector_length]
payload = payload[vector_length:]
error_text_length, payload = _payload_read_int(payload)
error_text = payload[:error_text_length]
return cls(error_text, fatal, ban_time, vector)

View File

@ -16,6 +16,8 @@ ip_enabled = True
log_level = logging.INFO
curve = 714 # secp256k1
key_length = 32
magic_bytes = b'\xe9\xbe\xb4\xd9'
protocol_version = 3
services = 3 # NODE_NETWORK, NODE_SSL
@ -63,4 +65,5 @@ outgoing_connections = 8
connection_limit = 250
objects = {}
junk_vectors = set()
objects_lock = threading.Lock()

View File

@ -60,12 +60,6 @@ class Object():
self.vector = hashlib.sha512(hashlib.sha512(
self.to_bytes()).digest()).digest()[:32]
self.tag = (
# broadcast from version 5 and pubkey/getpukey from version 4
self.object_payload[:32] if object_type == 3 and version == 5
or (object_type in (0, 1) and version == 4)
else None)
def __repr__(self):
return 'object, vector: {}'.format(
base64.b16encode(self.vector).decode())
@ -118,16 +112,18 @@ class Object():
'Invalid object %s, reason: payload is too long',
base64.b16encode(self.vector).decode())
return False
if self.stream_number != shared.stream:
if self.stream_number != 1:
logging.warning(
'Invalid object %s, reason: not in stream %i',
base64.b16encode(self.vector).decode(), shared.stream)
'Invalid object %s, reason: not in stream 1',
base64.b16encode(self.vector).decode())
return False
data = self.to_bytes()[8:]
# length = len(data) + 8 + shared.payload_length_extra_bytes
# dt = max(self.expires_time - time.time(), 0)
h = hashlib.sha512(data).digest()
pow_value = int.from_bytes(
hashlib.sha512(hashlib.sha512(
self.nonce + self.pow_initial_hash()
).digest()).digest()[:8], 'big')
self.nonce + h).digest()).digest()[:8], 'big')
target = self.pow_target()
if target < pow_value:
logging.warning(
@ -136,6 +132,31 @@ class Object():
return False
return True
def is_junk(self):
"""
Returns True if an object with encrypted payload has
curve number or key length different from those defined in shared.
"""
if self.object_type not in (1, 2, 3):
return False
if self.object_type == 2:
sp = 0
elif self.object_type == 1 and self.version != 4:
return False
else:
sp = 32
sp += 16
curve = struct.unpack('!H', self.object_payload[sp:sp + 2])[0]
if curve != shared.curve:
return True
length = struct.unpack('!H', self.object_payload[sp + 2:sp + 4])[0]
if length > shared.key_length:
return True
length = struct.unpack('!H', self.object_payload[sp + 36:sp + 38])[0]
if length > shared.key_length:
return True
return False
def pow_target(self):
"""Compute PoW target"""
data = self.to_bytes()[8:]

View File

@ -13,7 +13,7 @@ from minode.shared import magic_bytes
# 1626611891, 1, 1, net.ipv6(ipaddress.ip_address('127.0.0.1')).packed,
# 8444
# ) for _ in range(1000)]
sample_addr_data = unhexlify(
sample_data = unhexlify(
'fd01f4' + (
'0000000060f420b30000000'
'1000000000000000100000000000000000000ffff7f00000120fc'
@ -24,21 +24,6 @@ sample_addr_data = unhexlify(
sample_ping_msg = unhexlify(
'e9beb4d970696e67000000000000000000000004ee26b0dd74657374')
# from pybitmessage import pathmagic
# pathmagic.setup()
# import protocol
# msg = protocol.assembleVersionMessage('127.0.0.1', 8444, [1, 2, 3])
sample_version_msg = unhexlify(
'e9beb4d976657273696f6e00000000000000006b1b06b182000000030000000000000003'
'0000000064fdd3e1000000000000000100000000000000000000ffff7f00000120fc0000'
'00000000000300000000000000000000ffff7f00000120fc00c0b6c3eefb2adf162f5079'
'4269746d6573736167653a302e362e332e322f03010203'
)
#
sample_error_data = \
b'\x02\x00\x006Too many connections from your IP. Closing connection.'
class TestMessage(unittest.TestCase):
"""Test assembling and disassembling of network mesages"""
@ -62,7 +47,7 @@ class TestMessage(unittest.TestCase):
def test_addr(self):
"""Test addr messages"""
msg = message.Message(b'addr', sample_addr_data)
msg = message.Message(b'addr', sample_data)
addr_packet = message.Addr.from_message(msg)
self.assertEqual(len(addr_packet.addresses), 500)
address = addr_packet.addresses.pop()
@ -70,32 +55,3 @@ class TestMessage(unittest.TestCase):
self.assertEqual(address.services, 1)
self.assertEqual(address.port, 8444)
self.assertEqual(address.host, '127.0.0.1')
def test_version(self):
"""Test version message"""
msg = message.Message.from_bytes(sample_version_msg)
self.assertEqual(msg.command, b'version')
version_packet = message.Version.from_message(msg)
self.assertEqual(version_packet.host, '127.0.0.1')
self.assertEqual(version_packet.port, 8444)
self.assertEqual(version_packet.protocol_version, 3)
self.assertEqual(version_packet.services, 3)
self.assertEqual(version_packet.user_agent, b'/PyBitmessage:0.6.3.2/')
self.assertEqual(version_packet.streams, [1, 2, 3])
msg = version_packet.to_bytes()
# omit header and timestamp
self.assertEqual(msg[24:36], sample_version_msg[24:36])
self.assertEqual(msg[44:], sample_version_msg[44:])
def test_error(self):
"""Test error message"""
msg = message.Error.from_message(
message.Message(b'error', sample_error_data))
self.assertEqual(msg.fatal, 2)
self.assertEqual(msg.ban_time, 0)
self.assertEqual(msg.vector, b'')
msg = message.Error(
b'Too many connections from your IP. Closing connection.', 2)
self.assertEqual(msg.to_bytes()[24:], sample_error_data)

View File

@ -1,13 +1,9 @@
"""Tests for structures"""
import base64
import logging
import queue
import struct
import time
import unittest
import struct
from binascii import unhexlify
from minode import message, proofofwork, shared, structure
from minode import structure
# host pregenerated by pybitmessage.protocol.encodeHost()
@ -17,24 +13,10 @@ sample_addr_data = unhexlify(
'0000000060f420b3000000010000000000000001'
'260753000201300000000000000057ae1f90')
# data for an object with expires_time 1697063939
# structure.Object(
# b'\x00' * 8, expires_time, 42, 1, 2, b'HELLO').to_bytes()
sample_object_data = unhexlify(
'000000000000000000000000652724030000002a010248454c4c4f')
logging.basicConfig(
level=shared.log_level,
format='[%(asctime)s] [%(levelname)s] %(message)s')
class TestStructure(unittest.TestCase):
"""Testing structures serializing and deserializing"""
@classmethod
def setUpClass(cls):
shared.objects = {}
def test_varint(self):
"""Test varint serializing and deserializing"""
s = structure.VarInt(0)
@ -103,71 +85,3 @@ class TestStructure(unittest.TestCase):
addr = structure.NetAddr(1, '2607:5300:201:3000::57ae', 8080, 1)
self.assertEqual(addr.to_bytes()[8:], sample_addr_data[8:])
def test_object(self):
"""Create and check objects"""
obj = structure.Object.from_message(
message.Message(b'object', sample_object_data))
self.assertEqual(obj.object_type, 42)
self.assertEqual(obj.stream_number, 2)
self.assertEqual(obj.expires_time, 1697063939)
self.assertEqual(obj.object_payload, b'HELLO')
obj = structure.Object(
b'\x00' * 8, int(time.time() + 3000000), 42, 1, 1, b'HELLO')
self.assertFalse(obj.is_valid())
obj.expires_time = int(time.time() - 11000)
self.assertFalse(obj.is_valid())
obj = structure.Object(
b'\x00' * 8, int(time.time() + 300), 42, 1, 2, b'HELLO')
vector = obj.vector
proofofwork._worker(obj) # pylint: disable=protected-access
obj = shared.objects.popitem()[1]
self.assertNotEqual(obj.vector, vector)
self.assertFalse(obj.is_expired())
self.assertFalse(obj.is_valid())
shared.stream = 2
self.assertTrue(obj.is_valid())
obj.object_payload = \
b'TIGER, tiger, burning bright. In the forests of the night'
self.assertFalse(obj.is_valid())
def test_proofofwork(self):
"""Check the main proofofwork call and worker"""
shared.vector_advertise_queue = queue.Queue()
obj = structure.Object(
b'\x00' * 8, int(time.time() + 300), 42, 1,
shared.stream, b'HELLO')
start_time = time.time()
proofofwork.do_pow_and_publish(obj)
try:
vector = shared.vector_advertise_queue.get(timeout=300)
except queue.Empty:
self.fail("Couldn't make work in 300 sec")
else:
time.sleep(1)
try:
result = shared.objects[vector]
except KeyError:
self.fail(
"Couldn't found object with vector %s"
" %s sec after pow start" % (
base64.b16encode(vector), time.time() - start_time))
self.assertTrue(result.is_valid())
self.assertEqual(result.object_type, 42)
self.assertEqual(result.object_payload, b'HELLO')
q = queue.Queue()
# pylint: disable=protected-access
proofofwork._pow_worker(obj.pow_target(), obj.pow_initial_hash(), q)
try:
nonce = q.get(timeout=5)
except queue.Empty:
self.fail("No nonce found in the queue")
obj = structure.Object(
nonce, obj.expires_time, obj.object_type, obj.version,
obj.stream_number, obj.object_payload)
self.assertTrue(obj.is_valid())

View File

@ -41,5 +41,4 @@ ignore_errors = true
[pylint.main]
disable = invalid-name,consider-using-f-string,fixme
max-args = 8
max-attributes = 8
max-args = 7