import os import urllib.error import urllib.request import logging import json import subprocess import re import shutil NON_UPDATABLE_KEYS = [ 'server_type', 'os_id', 'provider_id', 'location_id', 'ssh_port', 'bandwidth', 'was_promo', 'active', 'owned_since', 'currency', 'price', 'payment_term', 'next_due_date', 'ip1' ] class ServerData: def __init__(self): self.hostname = os.uname().nodename self.public_ip = self.get_public_ip() self.dmidecode_data = self.parse_dmidecode_output() self.hdparm_data = self.parse_hdparm_output() self.nvme_data = self.parse_nvme_devices() logging.basicConfig(level=logging.INFO) def parse_dmidecode_output(self): ''' Example dmidecode output: Handle 0x0069, DMI type 20, 35 bytes Memory Device Mapped Address Starting Address: 0x00600000000 Ending Address: 0x007FFFFFFFF Range Size: 8 GB Physical Device Handle: 0x0067 Memory Array Mapped Address Handle: 0x006A Partition Row Position: Unknown Interleave Position: 2 Interleaved Data Depth: 2 Handle 0x006A, DMI type 19, 31 bytes Memory Array Mapped Address Starting Address: 0x00000000000 Ending Address: 0x007FFFFFFFF Range Size: 32 GB Physical Array Handle: 0x005E Partition Width: 4 ''' if os.path.isfile('/usr/sbin/dmidecode'): try: # ignore error messages produced by the command output = subprocess.check_output(['sudo', '/usr/sbin/dmidecode'], stderr=subprocess.DEVNULL).decode('utf-8') # each section is separated by two newlines sections = output.split('\n\n') parsed_sections = [] for section in sections: # each line in a section is separated by a newline lines = section.split('\n') # first line contains the DMI type - only need number match = re.search(r'DMI type (\d+)', lines[0]) if match: dmi_type = int(match.group(1)) else: # skip if no DMI type in this section continue # each section is a dictionary with DMIType as key section_dict = {'DMIType': dmi_type} if len(lines) > 1: section_dict['description'] = lines[1].strip() for line in lines[2:]: # skip first two lines - already processed # each line has tabs at the beginning and space(maybe optional) between key and value match = re.match(r'\t(.+):\s*(.*)', line) if match: key, value = match.groups() section_dict[key] = value parsed_sections.append(section_dict) return parsed_sections except subprocess.CalledProcessError: pass return [] def parse_hdparm_output(self): devices = {} # Check if hdparm exists if shutil.which('hdparm') is None: logging.error("hdparm not found") return devices # Get the list of devices try: device_list = [device for device in os.listdir('/sys/block') if device.startswith('sd')] except OSError as e: logging.error("Failed to get device list: {}".format(e)) return devices for device in device_list: # Only get details of devices that start with 'sd': sda, sdb, etc. if not device.startswith('sd'): continue device = '/dev/' + device try: # Get the output output = subprocess.check_output(['hdparm', '-I', device], stderr=subprocess.STDOUT, universal_newlines=True) except subprocess.CalledProcessError as e: logging.error("Failed to get hdparm output for {}: {}".format(device, e)) continue # Parse the output details = {} model_number_match = re.search(r'Model Number:\s*(.*)', output) if model_number_match is None: logging.warning("Skipping device {} as it does not have a model number".format(device)) continue details['model_number'] = model_number_match.group(1) details['serial_number'] = re.search(r'Serial Number:\s*(.*)', output).group(1) details['firmware_revision'] = re.search(r'Firmware Revision:\s*(.*)', output).group(1) details['transport'] = re.search(r'Transport:\s*(.*)', output).group(1) details['checksum'] = re.search(r'Checksum:\s*(.*)', output).group(1) details['buffer_size'] = re.search(r'cache/buffer size\s*=\s*(.*)', output).group(1) try: details['form_factor'] = re.search(r'Form Factor:\s*(.*)', output).group(1) except AttributeError: details['form_factor'] = "N/A" devices[device] = details return devices def parse_nvme_devices(self): ''' Example nvme id-ctrl output: NVME Identify Controller: vid : 0x144d ssvid : 0x144d sn : S3RVNA0K502408F mn : Samsung SSD 970 EVO Plus 1TB fr : 2B2QEXM7 rab : 2 .... ''' devices = {} # Check if nvme exists if shutil.which('nvme') is None: print("nvme not found") return devices # Get the list of nvme devices starting with 'nvme' nvme_devices = [device for device in os.listdir('/sys/block') if device.startswith('nvme')] for device in nvme_devices: device_path = '/dev/' + device try: # Get the output of nvme id-ctrl command output = subprocess.check_output(['nvme', 'id-ctrl', device_path], stderr=subprocess.STDOUT, universal_newlines=True) except subprocess.CalledProcessError as e: print("Failed to get nvme id-ctrl output for {}: {}".format(device_path, e)) continue # Split the output into lines lines = output.split('\n') # Parse each line device_info = {} for line in lines: if ':' in line: key, value = line.split(':', 1) device_info[key.strip()] = value.strip() devices[device_path] = device_info return devices def get_ram_and_disk(self): # RAM information with open('/proc/meminfo', 'r') as f: meminfo = f.read() ram = int([x for x in meminfo.split('\n') if 'MemTotal' in x][0].split()[1]) // 1024 # Disk space information disk = 0 for device in os.listdir('/sys/block'): device_path = '/sys/block/{}/device'.format(device) size_path = '/sys/block/{}/size'.format(device) if not os.path.islink(device_path): continue try: with open(size_path, 'r') as f: size = int(f.read().strip()) disk += size except Exception: pass disk = disk * 512 // (1024**3) # convert to GB logging.info("RAM: {}MB, Disk: {}GB".format(ram, disk)) return ram, disk def get_cpu_count(self): cpu_count = 0 for section in self.dmidecode_data: if section['DMIType'] == 4: # 4 corresponds to processor core_count = int(section.get('Core Count', '0')) thread_count = int(section.get('Thread Count', '0')) if thread_count: cpu_count += thread_count else: cpu_count += core_count if cpu_count == 0: with open('/proc/cpuinfo', 'r') as f: cpuinfo = f.read() cpu_count = cpuinfo.count('processor') logging.info("CPU Count: {}".format(cpu_count)) return cpu_count def get_bandwidth(self): bandwidth = 2000 logging.info("Bandwidth: {}".format(bandwidth)) return bandwidth def get_public_ip(self): try: response = urllib.request.urlopen('https://api.ipify.org') return response.read().decode() except Exception as e: logging.error("Failed to get public IP: {}".format(e)) return '127.0.0.1' def get_os_release_info(self): os_release_info = {} # reading from /etc/os-release try: with open('/etc/os-release') as f: lines = f.read().splitlines() os_release_info = {line.split('=')[0]: line.split('=')[1].strip('"') for line in lines if '=' in line} except FileNotFoundError: logging.warning("/etc/os-release not found, trying /etc/VERSION") except Exception as e: logging.error("Failed to read /etc/os-release: {}".format(e)) return {} # If /etc/os-release is not found or empty, fallback to /etc/VERSION if not os_release_info: try: with open('/etc/VERSION') as f: lines = f.read().splitlines() for line in lines: if '=' in line: key, value = line.split('=', 1) os_release_info[key.strip()] = value.strip() except Exception as e: logging.error("Failed to read /etc/VERSION: {}".format(e)) return {} return os_release_info def get_os_id(self, os_list): os_info = self.get_os_release_info() if not os_info: logging.error("No OS release info found.") for os_entry in os_list: if os_entry['name'].lower() in ["other", "custom"]: return os_entry['id'] return 1 if 'ID' in os_info and 'VERSION_ID' in os_info: current_os = (os_info.get('ID', 'Unknown') + " " + os_info.get('VERSION_ID', '').strip()).strip() else: current_os = (os_info.get('os_name', 'Unknown') +" "+ os_info.get('productversion', '').strip()).strip().lower() for os_entry in os_list: if current_os in os_entry['name'].lower(): return os_entry['id'] # Fallback checks for common OS names if full name doesn't match for os_entry in os_list: if 'ubuntu' in current_os and 'ubuntu' in os_entry['name'].lower(): return os_entry['id'] elif 'centos' in current_os and 'centos' in os_entry['name'].lower(): return os_entry['id'] elif 'fedora' in current_os and 'fedora' in os_entry['name'].lower(): return os_entry['id'] # Default to 'other' or 'custom' if no match found for os_entry in os_list: if os_entry['name'].lower() in ["other", "custom"]: return os_entry['id'] return 1 def create_post_data(self, os_list): ram, disk = self.get_ram_and_disk() post_data = { "server_type": 1, "os_id": self.get_os_id(os_list), "provider_id": 10, "location_id": 15, "ssh_port": 22, "ram": ram >> 10, # convert to GB "ram_as_mb": ram, # in MB "disk": disk, # in GB "disk_as_gb": disk, # in GB "cpu": self.get_cpu_count(), "bandwidth": self.get_bandwidth(), "was_promo": 1, "active": 1, "show_public": 0, "owned_since": "2022-01-01", "ram_type": "GB", "disk_type": "GB", "currency": "USD", "price": 4, "payment_term": 1, "hostname": self.hostname, "next_due_date": "2022-02-01", "ip1": self.public_ip, } logging.info("Post data created") return post_data def create_note_data(self): chassis_info = None for section in self.dmidecode_data: if section['DMIType'] == 1: chassis_info = section break if chassis_info: chassis_model = chassis_info.get('Product Name', 'Unknown') chassis_serial = chassis_info.get('Serial Number', 'Unknown') else: chassis_model = chassis_serial = 'Unknown' processor_info = [section for section in self.dmidecode_data if section['DMIType'] == 4] processor_model = processor_info[0].get('Version', 'Unknown') if processor_info else 'Unknown' processor_count = len(processor_info) ram_info = [section for section in self.dmidecode_data if section['DMIType'] == 17] ram_details = [] for ram in ram_info: size = ram.get('Size', 'Unknown') speed = ram.get('Speed', 'Unknown') configured_speed = ram.get('Configured Memory Speed', 'Unknown') try: total_width = int(ram.get('Total Width', "0").split()[0]) except ValueError: total_width = 0 try: data_width = int(ram.get('Data Width', "0").split()[0]) except ValueError: data_width = 0 ecc = 'Yes' if total_width > data_width else 'No' serial_number = ram.get('Serial Number', 'Unknown') ram_type = ram.get('Type', 'Unknown') ram_details.append("Size: {}, Speed: {} @ {}, ECC: {}, Serial Number: {}, Type: {}".format(size, speed, configured_speed, ecc, serial_number, ram_type)) # SATA Storage media sata_details = [] for device, details in self.hdparm_data.items(): sata_details.append("Device: {}, Model: {}, Serial: {}, Checksum: {}, Buffer Size: {}, Form Factor: {}".format( device, details['model_number'], details['serial_number'], details['checksum'], details['buffer_size'], details['form_factor'])) # NVMe Storage media nvme_details = [] for device, details in self.nvme_data.items(): nvme_details.append("Device: {}, Model: {}, Serial: {}, Firmware: {}".format( device, details.get('mn', 'Unknown'), details.get('sn', 'Unknown'), details.get('fr', 'Unknown'))) note = "Chassis Model: {} | Serial Number: {} ||| Processor Model: {} | Count: {} ||| RAM Details: {} ||| SATA Details {} ||| NVME Details {}".format( chassis_model, chassis_serial, processor_model, processor_count, ' | '.join(ram_details), ' | '.join(sata_details), ' | '.join(nvme_details)) note_data = { 'note': note, } return note_data class ServerManager: def __init__(self, host, api_key): self.host = host self.api_key = api_key self.headers = { 'Content-Type': 'application/json', 'Authorization': 'Bearer ' + self.api_key, } logging.basicConfig(level=logging.INFO) def send_request(self, method, endpoint, data=None): url = self.host + endpoint payload = json.dumps(data).encode('utf-8') if data else None req = urllib.request.Request(url, data=payload, headers=self.headers, method=method) try: with urllib.request.urlopen(req) as response: if method == 'GET': return json.loads(response.read().decode()) else: return response.read().decode() except urllib.error.HTTPError as e: logging.error("Request failed with {}".format(e)) raise def get_existing_servers(self): return self.send_request('GET', '/api/servers') def get_note(self, service_id): return self.send_request('GET', '/api/notes/' + str(service_id)) def create_note(self, post_data): logging.info("Creating note...") return self.send_request('POST', '/api/notes', post_data) def update_note(self, post_data, service_id): logging.info("Updating note with id {}...".format(service_id)) return self.send_request('PUT', '/api/notes/' + str(service_id), post_data) def create_server(self, post_data): logging.info("Creating server...") return self.send_request('POST', '/api/servers', post_data) def update_server(self, post_data, server_id): # remove following keys from post_data for key in NON_UPDATABLE_KEYS: post_data.pop(key, None) logging.info("Updating server with id {}...".format(server_id)) return self.send_request('PUT', '/api/servers/' + str(server_id), post_data) def existing_server_id(self, post_data): existing_servers = self.get_existing_servers() for server in existing_servers: if server['hostname'] == post_data['hostname']: return server['id'] return None def upsert_server(self, post_data): server_id = self.existing_server_id(post_data) if server_id: logging.info('Server already exists with id: {}, Updating...'.format(server_id)) response = self.update_server(post_data, server_id) else: logging.info('Server does not exist, Creating...') response = self.create_server(post_data) # Extract the server_id from the response server_id = json.loads(response).get('server_id', None) if server_id is None: logging.error('Failed to get server_id from response: {}'.format(response)) raise ValueError('Failed to get server_id from response') return server_id def upsert_note(self, note_data, server_id): note_data['service_id'] = server_id try: note = self.get_note(server_id) except urllib.error.HTTPError: note = None if note: return self.update_note(note_data, server_id) else: return self.create_note(note_data) def get_os_list(self): os_list = self.send_request('GET', '/api/os') logging.info("OS list fetched successfully") if os_list else logging.error("Failed to fetch OS list") return os_list or [] def validate_env_vars(): api_key = os.getenv('AGENT_API') host = os.getenv('HOST') if not api_key: raise Exception('AGENT_API not found in environment variables') if not host: raise Exception('HOST not found in environment variables') return host, api_key def main(): logging.basicConfig(level=logging.INFO) host, api_key = validate_env_vars() server_manager = ServerManager(host, api_key) server_data = ServerData() os_list = server_manager.get_os_list() post_data = server_data.create_post_data(os_list) server_id = server_manager.upsert_server(post_data) logging.info('Server id: {}'.format(server_id)) note_data = server_data.create_note_data() server_manager.upsert_note(note_data, server_id) if __name__ == '__main__': main()