# Here be Dragons. OpenAPI and especially JSONSchema are not very
# good specs, but at least they're popular!
import ast
import inspect
from collections import defaultdict
from operator import itemgetter
from textwrap import dedent
from types import FunctionType, MethodType
from typing import (
Any, Callable, Dict, List, Optional, Set, Tuple, Union, get_type_hints, no_type_check
from typing_inspect import get_origin, is_generic_type, is_typevar, is_union_type
from ..app import BaseApp
from ..http import UploadedFile
from ..router import get_route_parameters
from ..typing import Header, QueryParam, extract_optional_annotation, get_args
from ..validation import Field, dump_schema, field, is_schema, schema
class License:
"""License information for the exposed API.
name: str
url: Optional[str] = None
class Schema:
"""Describes the type and attributes of a value.
type: Optional[str] = None
format: Optional[str] = None # noqa
description: Optional[str] = None
required: Optional[List[str]] = None
properties: Optional[Dict[str, Any]] = None
nullable: Optional[bool] = None
all_of: Optional[List[Dict[str, str]]] = field(response_name="allOf", default=None)
any_of: Optional[List[Dict[str, str]]] = field(response_name="anyOf", default=None)
items: Optional[Dict[str, str]] = None
choices: Optional[List[str]] = field(response_name="enum", default=None)
pattern: Optional[str] = None
minimum: Optional[Union[int, float]] = None
maximum: Optional[Union[int, float]] = None
multiple_of: Optional[Union[int, float]] = field(response_name="multipleOf", default=None)
min_length: Optional[int] = field(response_name="minLength", default=None)
max_length: Optional[int] = field(response_name="maxLength", default=None)
read_only: Optional[bool] = field(response_name="readOnly", default=None)
write_only: Optional[bool] = field(response_name="writeOnly", default=None)
ref: Optional[str] = field(response_name="$ref", default=None)
class Parameter:
"""Describes a single handler parameter.
name: str
in_: str = field(response_name="in", choices=["query", "header", "path", "cookie"])
description: Optional[str] = None
required: Optional[bool] = None
deprecated: bool = False
schema: Optional[Schema] = None
class APIKeySecurityScheme:
"""Describes an API key-based security scheme.
name: str = field(request_only=True)
param_name: str = field(response_name="name")
in_: str = field(response_name="in", choices=["query", "header", "cookie"])
type: str = field(response_only=True, choices=["apiKey"], default="apiKey")
class HTTPSecurityScheme:
"""Describes an HTTP-based security scheme.
name: str
scheme: str = field(choices=["basic", "bearer"])
type: str = field(response_only=True, choices=["http"], default="http")
#: The union of acceptable security schemes.
SecurityScheme = Union[
def generate_openapi_document(
app: BaseApp,
metadata: Metadata,
security_schemes: List[SecurityScheme],
default_security_scheme: Optional[str] = None,
) -> Dict[str, Any]:
"""Generate an OpenAPI v3 document from an application object and
some metadata.
request_mime_types = {parser.mime_type for parser in app.parsers}
response_mime_types = {renderer.mime_type for renderer in app.renderers}
schemas: Dict[str, Schema] = {}
paths: Dict[str, Dict[str, Any]] = defaultdict(dict)
for method, routes in app.router._routes_by_method.items():
method = method.lower()
for route in routes:
handler = route.handler
if not isinstance(handler, (FunctionType, MethodType)):
handler = handler.__call__ # type: ignore
parameters = []
request_schema_name = None
annotations = get_type_hints(handler)
route_template_parameters = get_route_parameters(route.template)
for name, annotation in annotations.items():
if name == "return":
is_optional, annotation = extract_optional_annotation(annotation)
if name in route_template_parameters:
name, "path",
description=_get_annotation(handler, f"param_{name}_description"),
elif annotation is QueryParam:
name, "query",
description=_get_annotation(handler, f"param_{name}_description"),
required=not is_optional,
elif annotation is Header:
name.replace("_", "-"), "header",
description=_get_annotation(handler, f"param_{name}_description"),
required=not is_optional,
elif is_schema(annotation):
request_schema_name = _generate_schema(annotation, schemas)
operation = paths[route.template][method] = {
"tags": _get_annotation(handler, "tags", []),
"operationId": route.name,
"description": dedent(handler.__doc__ or "").rstrip(),
"parameters": [dump_schema(param, sparse=True) for param in parameters],
"deprecated": _get_annotation(handler, "deprecated", False),
"responses": {"200": {"description": "A successful response.", "content": {}}},
if request_schema_name is not None:
operation["requestBody"] = {"content": {}}
for media_type in request_mime_types:
operation["requestBody"]["content"][media_type] = {
"schema": _make_schema_ref(request_schema_name),
# Sort the media types for tools like the Swagger UI.
operation["requestBody"]["content"] = _sort_dict(operation["requestBody"]["content"])
# When the response annotation is a tuple of the form
# [str, ...] then we assume that the responses will
# contain custom status codes followed by the response
# objects themselves.
response_annotation = annotations.get("return")
response_annotation_origin = _get_origin(response_annotation)
if response_annotation is not None and response_annotation_origin in _TUPLE_TYPES:
arguments = get_args(response_annotation)
if len(arguments) == 2 and arguments[0] is str and is_schema(arguments[1]):
response_annotation = arguments[1]
if response_annotation is not None:
if is_schema(response_annotation):
response_schema_name = _generate_schema(response_annotation, schemas)
for media_type in response_mime_types:
operation["responses"]["200"]["content"][media_type] = {
"schema": _make_schema_ref(response_schema_name),
elif response_annotation_origin in _LIST_TYPES:
arguments = get_args(response_annotation)
if is_schema(arguments[0]):
response_schema_name = _generate_schema(arguments[0], schemas)
for media_type in response_mime_types:
operation["responses"]["200"]["content"][media_type] = {
"schema": {
"type": "array",
"items": _make_schema_ref(response_schema_name),
status_codes = _extract_status_codes(handler)
for status_code in status_codes:
status_code_ob = operation["responses"][str(status_code)] = {}
if status_code < 300 and status_code != 204:
description = _get_annotation(handler, f"response_{status_code}_description")
# If the declared return type is a response-code-tuple and
# the status code finder couldn't find a 200 status code,
# then it should be safe to drop that code from the
# responses object.
if _get_origin(annotations.get("return")) in _TUPLE_TYPES and 200 not in status_codes:
del operation["responses"]["200"]
# TODO: Add support for OAuth2 security scheme.
# TODO: Make it possible to annotate that a handler
# doesn't require a security scheme.
if default_security_scheme is not None:
operation["security"] = [{default_security_scheme: []}]
return {
"openapi": "3.0.1",
"info": dump_schema(metadata, sparse=True),
"paths": _sort_dict(paths),
"components": {
"schemas": _sort_dict({name: dump_schema(schema, sparse=True) for name, schema in schemas.items()}),
"securitySchemes": _sort_dict({scheme.name: dump_schema(scheme, sparse=True) for scheme in security_schemes}),
def _generate_schema(schema: Any, schemas: Dict[str, Schema]) -> str:
name = f"{schema.__module__}.{schema.__name__}"
if name in schemas:
return name
definition = Schema(
for field in schema._FIELDS.values(): # noqa
if field.request_name == field.response_name:
field_names = [field.request_name]
field_names = [field.request_name, field.response_name]
for field_name in field_names:
is_optional, field_schema = _generate_field_schema(field_name, field, schemas)
if field_schema is not None:
definition.properties[field_name] = field_schema
if not is_optional:
schemas[name] = definition
return name
def _get_origin(annotation: Any) -> Any:
return get_origin(annotation) or annotation
def _is_generic_type(annotation: Any) -> bool:
# On 3.9, typing_inspect doesn't clasify annotations without type
# parameters as generic so we add fallback cases here.
return is_generic_type(annotation) or \
annotation in _LIST_TYPES or \
annotation in _DICT_TYPES
def _generate_field_schema(field_name: str, field: Field, schemas: Dict[str, Schema]) -> Tuple[bool, Schema]:
is_optional, annotation = extract_optional_annotation(field.annotation)
if is_schema(annotation):
field_schema_name = _generate_schema(annotation, schemas)
field_schema = Schema(ref=_make_ref_path(field_schema_name))
elif _is_generic_type(annotation):
origin = _get_origin(annotation)
if origin in _LIST_TYPES:
arguments = get_args(annotation)
if arguments and is_schema(arguments[0]):
item_schema_name = _generate_schema(arguments[0], schemas)
field_schema = Schema("array", items=_make_schema_ref(item_schema_name))
field_schema = _generate_primitive_schema(annotation)
elif origin in _DICT_TYPES:
# TODO: Add support for additionalFields.
field_schema = _generate_primitive_schema(dict)
else: # pragma: no cover
raise ValueError(f"Unsupported type {origin} for field {field.name!r}.")
elif is_union_type(annotation):
sub_schemas = []
for arg in get_args(annotation):
_, sub_schema = _generate_field_schema("", Field(annotation=arg), schemas)
sub_schemas.append(dump_schema(sub_schema, sparse=True))
field_schema = Schema(any_of=sub_schemas)
field_schema = _generate_primitive_schema(annotation)
if field_schema is not None:
field_schema.description = field.description
if field.request_name != field.response_name:
if field_name == field.request_name:
field_schema.write_only = True
field_schema.read_only = True
elif field.response_only:
field_schema.read_only = True
elif field.request_only:
field_schema.write_only = True
for option, value in field.validator_options.items():
if option in Schema._FIELDS:
setattr(field_schema, option, value)
return is_optional, field_schema
def _generate_primitive_schema(annotation: Any) -> Optional[Schema]:
arguments = _PRIMITIVE_ANNOTATION_MAP[annotation]
return Schema(*arguments)
except KeyError:
origin = _get_origin(annotation)
if origin in _LIST_TYPES:
arguments = get_args(annotation)
if not arguments or is_typevar(arguments[0]):
return Schema("array", items=_ANY_VALUE)
return Schema("array", items=_generate_primitive_schema(arguments[0]))
# TODO: Add support for additionalFields.
return Schema("string")
def _extract_status_codes(handler: Callable[..., Any]) -> List[int]:
source = inspect.getsource(handler)
finder = _StatusCodeFinder()
return finder.find(ast.parse(dedent(source)))
except OSError: # pragma: no cover
return []
def _make_ref_path(name: str) -> str:
return f"#/components/schemas/{name}"
def _make_schema_ref(name: str) -> Dict[str, str]:
return {"$ref": _make_ref_path(name)}
def _get_annotation(handler: Callable[..., Any], name: str, default: Any = None) -> Any:
return getattr(handler, f"openapi_{name}", default)
def _sort_dict(data: Dict[Any, Any]) -> Dict[Any, Any]:
# This relies on the ordered dict implementation in Py3.6+.
return dict(sorted(data.items(), key=itemgetter(0)))
class _StatusCodeFinder(ast.NodeVisitor):
"""Finds usages of HTTP_* in an AST.
def __init__(self) -> None:
self.status_codes: Set[int] = set()
def find(self, tree: Any) -> List[int]:
return sorted(list(self.status_codes))
def visit_Name(self, node: Any) -> None:
if node.id.startswith("HTTP_"):
except ValueError: # pragma: no cover
#: Maps primitive types to Schema parameters.
UploadedFile: ["string", "binary"],
int: ["integer", "int64"],
str: ["string"],
bool: ["boolean"],
dict: ["object"],
float: ["number", "double"],
bytes: ["string", "binary"],
#: A schema that accepts any value whatsoever. Used in generic types
#: w/o a type annotation (eg. List).
"description": "Can be any value, including null.",
"nullable": True,
_DICT_TYPES = {dict, Dict}
_LIST_TYPES = {list, List}
_TUPLE_TYPES = {tuple, Tuple}