cloud-init-cherrypy/main.py

330 lines
9.3 KiB
Python

#!/usr/bin/env python3
"""
Serve cloud init files
"""
import configparser
import os
import socket
import sys
import uuid as uuid_module
from ipaddress import AddressValueError, IPv4Address, IPv6Address
from os import access, R_OK
import cherrypy
# from cherrypy.lib.static import serve_file
from jinja2 import Template
import yaml
PATH = os.path.dirname(os.path.abspath(__file__))
CONFIG = configparser.ConfigParser()
CONFIG.read(os.path.join(PATH, "config.ini"))
USER_DATA_FILENAME = CONFIG["app"].get("user_data", "user-data")
META_DATA_FILENAME = CONFIG["app"].get("meta_data", "meta-data")
REDIRECT_FILENAME = CONFIG["app"].get("redirect", "redirect")
class CloudInitRequest:
"""
Request data for persistence across methods
"""
def __init__(self, request, uuid=None):
self.remoteip = None
self.hostinfo = ('localhost', )
self.request = request
self.meta_data = None
self.meta_data_loaded = False
self.user_data = None
try:
self.uuid = str(uuid_module.UUID('{' + uuid + '}'))
# ValueError is wrong UUID syntax
# TypeError is None
except (ValueError, TypeError):
self.uuid = None
self._init_ip()
self._generate_default_meta_data()
def _can_ip_be_proxy(self):
"""
Assuming the connection is through a proxy, is this proxy permitted?
Can't proxy from a publicly reachable IP.
"""
self.remoteip = self.request.remote.ip
try:
ipobj = IPv4Address(self.remoteip)
except AddressValueError:
try:
ipobj = IPv6Address(self.remoteip)
except AddressValueError:
return False
return not ipobj.is_global
def _init_ip(self):
"""
Get remote IP
"""
if self._can_ip_be_proxy():
try:
self.remoteip = self.request.headers.get(
'X-Real-Ip',
self.request.remote.ip
)
except KeyError:
pass
try:
self.hostinfo = socket.gethostbyaddr(self.remoteip)
forward_lookup = socket.gethostbyname(self.hostinfo[0])
if forward_lookup != self.remoteip:
self.hostinfo = ('localhost', )
except socket.herror:
self.hostinfo = ('localhost', )
except socket.gaierror:
self.hostinfo = (self.remoteip, )
def _generate_default_meta_data(self):
"""
Build default meta-data based on the requesting IP
"""
hostname = self.hostinfo[0]
base = self.request.base
self.meta_data = {
"instance-id": hostname.split(".")[0],
"local-hostname": hostname,
"baseurl": base
}
def _update_meta_data_from_file(self, filename):
"""
Load meta-data from a file
"""
with open(filename, "r") as metadata:
self.meta_data.update(yaml.safe_load(metadata))
def _get_filename(self, filetype):
"""
Get the best available filename for a given type
"""
check_dirs = ['localhost', ]
if self.uuid:
check_dirs.insert(0, self.uuid)
if self.meta_data['local-hostname'] != 'localhost':
check_dirs.insert(0, self.meta_data['local-hostname'])
for subdir in check_dirs:
filepath = os.path.join(PATH, "data",
subdir,
filetype)
if not os.path.isfile(filepath) \
or not access(filepath, R_OK):
continue
return filepath
return None
def get_meta_data(self):
"""
Return the appropriate meta-data for this request
"""
if self.meta_data_loaded:
return self.meta_data
self._generate_default_meta_data()
filename = self._get_filename(META_DATA_FILENAME)
if filename:
self._update_meta_data_from_file(
filename
)
self.meta_data_loaded = True
return self.meta_data
def save_meta_data(self, data):
"""
Save metadata
"""
filename = self._get_filename(META_DATA_FILENAME)
with open(filename, "wt") as filehandle:
filehandle.write(data)
def get_user_data(self):
"""
Return the appropriate meta-data for this request
"""
if self.user_data:
return self.user_data
filename = self._get_filename(USER_DATA_FILENAME)
if not filename:
return None
with open(filename, "r") as userdata:
self.user_data = userdata.read()
return self.user_data
return None
def optional_redirect(self):
"""
Return optional redirect, or None
"""
filename = self._get_filename(REDIRECT_FILENAME)
if not (filename and os.path.exists(filename)):
return None
try:
with open(filename) as redirect:
content = redirect.read().splitlines()
return content[0]
except IOError:
return None
return None
class CloudInitApp:
"""
Serve cloud init files
"""
@staticmethod
def _redirect_if_needed(request):
redirect = request.optional_redirect()
if redirect:
raise cherrypy.HTTPRedirect(redirect, 301)
return False
@staticmethod
def _content_type(data):
if not data:
return "text/cloud-config"
if data.startswith("#include"):
return "text/x-include-url"
if data.startswith("## template: jinja"):
return "text/jinja2"
return "text/cloud-config"
@staticmethod
def _wrap_metadata(metadata):
return {'ds': {
'meta_data': metadata
}}
def _user_data(self, uuid=None, SYSUUID=None):
"""
Serves a static file
But can process a template if x-include-url
"""
if SYSUUID: # SYSLINUX support
uuid = SYSUUID
request = CloudInitRequest(cherrypy.request, uuid)
self._redirect_if_needed(request)
user_data = request.get_user_data()
c__ = self._content_type(user_data)
cherrypy.response.headers['Content-Type'] = c__
cherrypy.response.headers['Content-Disposition'] = \
'attachment; filename="user-data"'
if c__ == "text/x-include-url":
t__ = Template(user_data)
metadata = request.get_meta_data()
wrapped = CloudInitApp._wrap_metadata(metadata)
return t__.render(**wrapped)
return user_data
def _meta_data(self, uuid=None):
"""
Return meta-data in YAML
"""
request = CloudInitRequest(cherrypy.request, uuid)
self._redirect_if_needed(request)
cherrypy.response.headers['Content-Type'] = \
'text/yaml'
cherrypy.response.headers['Content-Disposition'] = \
'attachment; filename="meta-data"'
return yaml.dump(request.get_meta_data())
@staticmethod
def _vendor_data():
return ""
@cherrypy.expose
def user_data(self):
"""
v1 api endpoint user-data
"""
return self._user_data()
@cherrypy.expose
def meta_data(self):
"""
v1 api endpoint meta-data
"""
return self._meta_data()
@cherrypy.expose
def vendor_data(self):
"""
v1 api endpoint vendor-data
"""
return self._vendor_data()
@cherrypy.expose
def apiv2(self, uuid=None, filetype="user-data"):
"""
API endpoint v2
extract command from arguments
"""
if filetype == "user-data":
return self._user_data(uuid)
if filetype == "meta-data":
return self._meta_data(uuid)
if filetype == "vendor-data":
return self._vendor_data()
return ""
# unused
# @cherrypy.expose
def finished(self, data):
"""
Saves additional meta-data
:param data: meta-data to be added
"""
request = CloudInitRequest(cherrypy.request)
self._redirect_if_needed(request)
request.save_meta_data(data)
ROOT = CloudInitApp()
if __name__ == "__main__":
cherrypy.server.socket_host = \
CONFIG["server"].get("server_host", "127.0.0.1")
cherrypy.server.socket_port = \
CONFIG["server"].getint("server_port", 8081)
ENGINE = cherrypy.engine
CURRENT_DIR = os.path.dirname(os.path.abspath(__file__))
CONFIG = {
'/include': {
'tools.staticdir.on': True,
'tools.staticdir.dir': os.path.join(CURRENT_DIR,
'data',
'include'),
'tools.staticdir.content_types': {
'yml': 'text/yaml'
}
}
}
cherrypy.tree.mount(ROOT, config=CONFIG)
if hasattr(ENGINE, "signal_handler"):
ENGINE.signal_handler.subscribe()
if hasattr(ENGINE, "console_control_handler"):
ENGINE.console_control_handler.subscribe()
try:
ENGINE.start()
except Exception: # pylint: disable=broad-except
sys.exit(1)
else:
ENGINE.block()