"""A library of helper functions for the Cheroot test suite.""" from __future__ import absolute_import, division, print_function __metaclass__ = type import datetime import logging import os import sys import time import threading import types from six.moves import http_client import six import cheroot.server import cheroot.wsgi from cheroot.test import webtest log = logging.getLogger(__name__) thisdir = os.path.abspath(os.path.dirname(__file__)) config = { 'bind_addr': ('', 54583), 'server': 'wsgi', 'wsgi_app': None, } class CherootWebCase(webtest.WebCase): """Helper class for a web app test suite.""" script_name = '' scheme = 'http' available_servers = { 'wsgi': cheroot.wsgi.Server, 'native': cheroot.server.HTTPServer, } @classmethod def setup_class(cls): """Create and run one HTTP server per class.""" conf = config.copy() conf.update(getattr(cls, 'config', {})) s_class = conf.pop('server', 'wsgi') server_factory = cls.available_servers.get(s_class) if server_factory is None: raise RuntimeError('Unknown server in config: %s' % conf['server']) cls.httpserver = server_factory(**conf) cls.HOST, cls.PORT = cls.httpserver.bind_addr if cls.httpserver.ssl_adapter is None: ssl = '' cls.scheme = 'http' else: ssl = ' (ssl)' cls.HTTP_CONN = http_client.HTTPSConnection cls.scheme = 'https' v = sys.version.split()[0] log.info('Python version used to run this test script: %s' % v) log.info('Cheroot version: %s' % cheroot.__version__) log.info('HTTP server version: %s%s' % (cls.httpserver.protocol, ssl)) log.info('PID: %s' % os.getpid()) if hasattr(cls, 'setup_server'): # Clear the wsgi server so that # it can be updated with the new root cls.setup_server() cls.start() @classmethod def teardown_class(cls): """Cleanup HTTP server.""" if hasattr(cls, 'setup_server'): cls.stop() @classmethod def start(cls): """Load and start the HTTP server.""" threading.Thread(target=cls.httpserver.safe_start).start() while not cls.httpserver.ready: time.sleep(0.1) @classmethod def stop(cls): """Terminate HTTP server.""" cls.httpserver.stop() td = getattr(cls, 'teardown', None) if td: td() date_tolerance = 2 def assertEqualDates(self, dt1, dt2, seconds=None): """Assert ``abs(dt1 - dt2)`` is within ``Y`` seconds.""" if seconds is None: seconds = self.date_tolerance if dt1 > dt2: diff = dt1 - dt2 else: diff = dt2 - dt1 if not diff < datetime.timedelta(seconds=seconds): raise AssertionError( '%r and %r are not within %r seconds.' % (dt1, dt2, seconds), ) class Request: """HTTP request container.""" def __init__(self, environ): """Initialize HTTP request.""" self.environ = environ class Response: """HTTP response container.""" def __init__(self): """Initialize HTTP response.""" self.status = '200 OK' self.headers = {'Content-Type': 'text/html'} self.body = None def output(self): """Generate iterable response body object.""" if self.body is None: return [] elif isinstance(self.body, six.text_type): return [self.body.encode('iso-8859-1')] elif isinstance(self.body, six.binary_type): return [self.body] else: return [x.encode('iso-8859-1') for x in self.body] class Controller: """WSGI app for tests.""" def __call__(self, environ, start_response): """WSGI request handler.""" req, resp = Request(environ), Response() try: # Python 3 supports unicode attribute names # Python 2 encodes them handler = self.handlers[environ['PATH_INFO']] except KeyError: resp.status = '404 Not Found' else: output = handler(req, resp) if ( output is not None and not any( resp.status.startswith(status_code) for status_code in ('204', '304') ) ): resp.body = output try: resp.headers.setdefault('Content-Length', str(len(output))) except TypeError: if not isinstance(output, types.GeneratorType): raise start_response(resp.status, resp.headers.items()) return resp.output()