refactor: Single file code - remove external depends

This commit is contained in:
Swapnil 2024-06-05 15:52:17 +05:30
parent 305f001d42
commit b1c89e62e3
Signed by untrusted user: swapnil
GPG Key ID: 58029C48BB100574
5 changed files with 127 additions and 178 deletions

127
agent.py Normal file
View File

@ -0,0 +1,127 @@
import os
import urllib.request
import logging
import json
import http.client
class ServerData:
def __init__(self):
self.hostname = os.uname().nodename
self.public_ip = self.get_public_ip()
logging.basicConfig(level=logging.INFO)
def get_ram_and_disk(self):
with open('/proc/meminfo', 'r') as f:
meminfo = f.read()
ram = int([x for x in meminfo.split('\n') if 'MemTotal' in x][0].split()[1]) // 1024
with open('/proc/diskstats', 'r') as f:
diskstats = f.read()
disk = sum(int(x.split()[9]) for x in diskstats.split('\n') if x) * 512 // 10**9
logging.info(f"RAM: {ram}MB, Disk: {disk}GB")
return ram, disk
def get_cpu_count(self):
with open('/proc/cpuinfo', 'r') as f:
cpuinfo = f.read()
cpu_count = cpuinfo.count('processor')
logging.info(f"CPU Count: {cpu_count}")
return cpu_count
def get_bandwidth(self):
bandwidth = 2000
logging.info(f"Bandwidth: {bandwidth}")
return bandwidth
def get_public_ip(self):
try:
response = urllib.request.urlopen('https://api.ipify.org')
return response.read().decode()
except Exception as e:
logging.error(f"Failed to get public IP: {e}")
return '127.0.0.1'
def get_os(self):
os_id = 27
logging.info(f"OS ID: {os_id}")
return os_id
def create_post_data(self):
class ServerManager:
def __init__(self, host, api_key):
self.host = host
self.api_key = api_key
self.headers = {
'Content-Type': 'application/json',
'Authorization': 'Bearer ' + self.api_key,
}
logging.basicConfig(level=logging.INFO)
def send_request(self, method, endpoint, data=None):
url = self.host + endpoint
payload = json.dumps(data).encode('utf-8') if data else None
req = urllib.request.Request(url, data=payload, headers=self.headers, method=method)
try:
with urllib.request.urlopen(req) as response:
if method == 'GET':
return json.loads(response.read().decode())
else:
return response.read().decode()
except urllib.error.HTTPError as e:
logging.error(f"Request failed with {e}")
raise
def get_existing_servers(self):
return self.send_request('GET', '/api/servers')
def create_server(self, post_data):
logging.info("Creating server...")
return self.send_request('POST', '/api/servers', post_data)
def update_server(self, post_data, server_id):
# remove following keys from post_data
for key in ['ssh_port', 'ip1', 'currency', 'price', 'payment_term', 'next_due_date']:
post_data.pop(key, None)
logging.info(f"Updating server with id {server_id}...")
return self.send_request('PUT', '/api/servers/' + str(server_id), post_data)
def existing_server_id(self, post_data):
existing_servers = self.get_existing_servers()
for server in existing_servers:
if server['hostname'] == post_data['hostname']:
return server['id']
return None
def validate_env_vars():
api_key = os.getenv('AGENT_API')
host = os.getenv('HOST')
if not api_key:
raise Exception('AGENT_API not found in environment variables')
if not host:
raise Exception('HOST not found in environment variables')
return host, api_key
def main():
logging.basicConfig(level=logging.INFO)
host, api_key = validate_env_vars()
server_data = ServerData()
post_data = server_data.create_post_data()
server_manager = ServerManager(host, api_key)
# Check if the server already exists
server_id = server_manager.existing_server_id(post_data)
# If the server exists, update it
if server_id:
logging.info('Server already exists with id: {}, Updating...'.format(server_id))
logging.info(server_manager.update_server(post_data, server_id))
else:
logging.info('Server does not exist, Creating...')
logging.info(server_manager.create_server(post_data))
if __name__ == '__main__':
main()

38
main.py
View File

@ -1,38 +0,0 @@
import os
import logging
from server_manager import ServerManager
from server_data import ServerData
def validate_env_vars():
api_key = os.getenv('API_KEY')
host = os.getenv('HOST')
if not api_key:
raise Exception('API_KEY not found in environment variables')
if not host:
raise Exception('HOST not found in environment variables')
return host, api_key
def main():
logging.basicConfig(level=logging.INFO)
host, api_key = validate_env_vars()
server_data = ServerData()
post_data = server_data.create_post_data()
server_manager = ServerManager(host, api_key)
# Check if the server already exists
server_id = server_manager.existing_server_id(post_data)
# If the server exists, update it
if server_id:
logging.info('Server already exists with id: {}, Updating...'.format(server_id))
logging.info(server_manager.update_server(post_data, server_id))
else:
logging.info('Server does not exist, Creating...')
logging.info(server_manager.create_server(post_data))
if __name__ == '__main__':
main()

View File

@ -1,4 +0,0 @@
requests
psutil
tldextract
dnspython

View File

@ -1,90 +0,0 @@
import os
import requests
import psutil
import tldextract
import dns.resolver
import logging
class ServerData:
def __init__(self):
self.hostname = os.uname().nodename
self.public_ip = self.get_public_ip()
self.domain = self.get_domain()
self.nameservers = self.get_nameservers()
logging.basicConfig(level=logging.INFO)
def get_ram_and_disk(self):
ram = psutil.virtual_memory().total >> 20
disk = psutil.disk_usage('/').total >> 30
logging.info(f"RAM: {ram}MB, Disk: {disk}GB")
return ram, disk
def get_cpu_count(self):
cpu_count = psutil.cpu_count()
logging.info(f"CPU Count: {cpu_count}")
return cpu_count
def get_bandwidth(self):
bandwidth = 2000
logging.info(f"Bandwidth: {bandwidth}")
return bandwidth
def get_public_ip(self):
try:
response = requests.get('https://api.ipify.org')
response.raise_for_status()
return response.text
except requests.exceptions.RequestException as e:
logging.error(f"Failed to get public IP: {e}")
return '127.0.0.1'
def get_domain(self):
extracted = tldextract.extract(self.hostname)
domain = "{}.{}".format(extracted.domain, extracted.suffix)
logging.info(f"Domain: {domain}")
return domain
def get_nameservers(self):
try:
answers = dns.resolver.resolve(self.domain, 'NS')
return [str(rdata) for rdata in answers]
except Exception as e:
logging.error(f"Failed to get nameservers: {e}")
return []
def get_os(self):
os_id = 27
logging.info(f"OS ID: {os_id}")
return os_id
def create_post_data(self):
ram, disk = self.get_ram_and_disk()
post_data = {
"server_type": 1,
"os_id": self.get_os(),
"provider_id": 10,
"location_id": 15,
"ssh_port": 22,
"ram": ram >> 10, # convert to GB
"ram_as_mb": ram, # in MB
"disk": disk, # in GB
"disk_as_gb": disk, # in GB
"cpu": self.get_cpu_count(),
"bandwidth": self.get_bandwidth(),
"was_promo": 1,
"active": 1,
"show_public": 0,
"owned_since": "2022-01-01",
"ram_type": "GB",
"disk_type": "GB",
"currency": "USD",
"price": 4,
"payment_term": 1,
"hostname": self.hostname,
"next_due_date": "2022-02-01",
"ns1": self.nameservers[0] if self.nameservers else "",
"ns2": self.nameservers[1] if len(self.nameservers) > 1 else "",
"ip1": self.public_ip,
}
logging.info("Post data created")
return post_data

View File

@ -1,46 +0,0 @@
import json
import requests
import logging
class ServerManager:
def __init__(self, host, api_key):
self.host = host
self.api_key = api_key
self.headers = {
'Content-Type': 'application/json',
'Authorization': 'Bearer ' + self.api_key,
}
logging.basicConfig(level=logging.INFO)
def send_request(self, method, endpoint, data=None):
url = self.host + endpoint
payload = json.dumps(data) if data else None
try:
response = requests.request(method, url, headers=self.headers, data=payload)
response.raise_for_status()
return response.json() if method == 'GET' else response.text
except requests.exceptions.RequestException as e:
logging.error(f"Request failed with {e}")
raise
def get_existing_servers(self):
return self.send_request('GET', '/api/servers')
def create_server(self, post_data):
logging.info("Creating server...")
return self.send_request('POST', '/api/servers', post_data)
def update_server(self, post_data, server_id):
# remove following keys from post_data
for key in ['ssh_port', 'ip1', 'currency', 'price', 'payment_term', 'next_due_date']:
post_data.pop(key, None)
logging.info(f"Updating server with id {server_id}...")
return self.send_request('PUT', '/api/servers/' + str(server_id), post_data)
def existing_server_id(self, post_data):
existing_servers = self.get_existing_servers()
for server in existing_servers:
if server['hostname'] == post_data['hostname']:
return server['id']
return None