# This file is a part of molten.
#
# Copyright (C) 2018 CLEARTYPE SRL <[email protected]>
#
# molten is free software; you can redistribute it and/or modify it
# under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or (at
# your option) any later version.
#
# molten is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
# License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from typing import Any, Dict, List, Optional, Type, TypeVar, get_type_hints, no_type_check
from ..errors import FieldValidationError, ValidationError
from .common import Missing, is_schema
from .field import Field
# Type variable tying the schema class passed to ``schema``/``load_schema``
# to the class (or instance) those functions hand back.
_T = TypeVar("_T")
def schema(cls: Type[_T]) -> Type[_T]:
    """Turn an annotated class into a schema.

    Every annotated attribute of ``cls`` becomes a :class:`Field`.
    Fields collected on base classes (via their ``_FIELDS`` mapping)
    are inherited, and ``__init__``, ``__eq__`` and ``__repr__`` are
    generated unless the class already defines them itself.

    Examples:

      >>> @schema
      ... class Account:
      ...   username: str
      ...   password: str = Field(request_only=True)
      ...   is_admin: bool = Field(response_only=True, default=False)

      >>> load_schema(Account, {})
      Traceback (most recent call last):
        ...
      ValidationError: {'username': 'this field is required', 'password': 'this field is required'}

      >>> load_schema(Account, {"username": "example", "password": "secret"})
      Account(username='example', password='secret', is_admin=False)

      >>> dump_schema(load_schema(Account, {"username": "example", "password": "secret"}))
      {'username': 'example', 'is_admin': False}

    Raises:
      RuntimeError: When the class has no fields, or when a field
        without a default follows a field that has one.

    Parameters:
      cls: The class to convert.  It is modified in place and returned.
    """
    # Start from the fields of every base class, walking from the most
    # generic base towards ``cls`` (which is itself excluded) so that
    # more derived definitions win.
    fields = {}
    for base in reversed(cls.__mro__[1:]):
        fields.update(getattr(base, "_FIELDS", {}))

    seen_default = False
    for name, annotation in get_type_hints(cls).items():
        # Prefer whatever is assigned on the class; fall back to an
        # inherited field of the same name.
        value = getattr(cls, name, Missing)
        if value is Missing:
            value = fields.get(name, value)

        if not isinstance(value, Field):
            # A plain value (or nothing at all) is treated as the default.
            field = Field(name=name, annotation=annotation, default=value)
        else:
            field = value
            field.name = name
            field.annotation = annotation
            field.request_name = field.request_name or name
            field.response_name = field.response_name or name

        fields[name] = field

        # The annotation is guaranteed to be set by now, so a validator
        # can be chosen.
        field.select_validator()

        # Required fields may not come after optional ones.
        if field.has_default:
            seen_default = True
        elif seen_default:
            raise RuntimeError("attributes without a default cannot follow ones with a default")

        # Drop the class-level attribute.  The value may live on a base
        # class rather than on ``cls``, in which case there is nothing
        # to delete here.
        if value is not Missing:
            try:
                delattr(cls, name)
            except AttributeError:
                pass

    if not fields:
        raise RuntimeError(f"schema {cls.__name__} doesn't have any fields")

    # NOTE(review): assigning ``__slots__`` after the class object has
    # been created only records the names; it does not reserve slots.
    for attribute, payload in (
        ("__slots__", list(fields)),
        ("_SCHEMA", True),
        ("_FIELDS", fields),
    ):
        setattr(cls, attribute, payload)

    _add_init(cls, fields)
    _add_fn(cls, "__eq__", ["self", "other"], _EQ_FN_BODY)
    _add_fn(cls, "__repr__", ["self"], _REPR_FN_BODY)
    return cls
@no_type_check
def load_schema(schema: Type[_T], data: Dict[str, Any]) -> _T:
    """Validate a data dictionary against a schema and build an
    instance of that schema from the validated values.

    Raises:
      TypeError: When ``schema`` is not a schema class.
      ValidationError: When one or more fields fail validation.  The
        error maps each offending field's request name to its reason.

    Parameters:
      schema: The schema class to validate the data against.
      data: Data to validate against and populate the schema with.
    """
    if not is_schema(schema):
        raise TypeError(f"{schema} is not a schema")

    kwargs = {}
    problems = {}
    for field in schema._FIELDS.values():
        if not field.response_only:
            try:
                raw = data.get(field.request_name, Missing)
                kwargs[field.name] = field.validate(raw)
            except FieldValidationError as exc:
                problems[field.request_name] = str(exc)
            except ValidationError as exc:
                problems[field.request_name] = exc.reasons
        elif not field.has_default:
            # Response-only fields are never read from the input.  One
            # without an explicit default still needs some value, so it
            # is set to None.
            kwargs[field.name] = None

    if problems:
        raise ValidationError(problems)

    return schema(**kwargs)
@no_type_check
def dump_schema(ob: Any, *, sparse: bool = False) -> Dict[str, Any]:
    """Serialize a schema instance to a dictionary keyed by each
    field's response name.

    Request-only fields are left out.  A value that is itself a schema
    instance is dumped recursively, as are schema instances found
    directly inside a list or as the values of a dict.

    Raises:
      TypeError: If ob is not a schema instance.

    Parameters:
      ob: An instance of a schema.
      sparse: If true, fields whose values are None are going to be
        dropped from the output.
    """
    if not is_schema(type(ob)):
        raise TypeError(f"{ob} is not a schema")

    def dump_item(item):
        # Container members are dumped only when they are schema
        # instances; anything else passes through untouched.
        return dump_schema(item, sparse=sparse) if is_schema(type(item)) else item

    result = {}
    for field in ob._FIELDS.values():
        if field.request_only:
            continue

        value = getattr(ob, field.name)
        if is_schema(type(value)):
            value = dump_schema(value, sparse=sparse)
        elif isinstance(value, list):
            value = [dump_item(item) for item in value]
        elif isinstance(value, dict):
            value = {key: dump_item(item) for key, item in value.items()}

        if not (sparse and value is None):
            result[field.response_name] = value

    return result
def _add_fn(
    cls: Type[Any],
    name: str,
    params: List[str],
    body: List[str],
    fn_globals: Optional[Dict[str, Any]] = None,
    fn_locals: Optional[Dict[str, Any]] = None,
) -> None:
    """Construct a function from source lines and add it to a class.

    Nothing happens when ``cls`` already defines ``name`` in its own
    ``__dict__``, so hand-written methods are never overwritten.

    Parameters:
      cls: The class that receives the function.
      name: The name of the function (and of the class attribute).
      params: Parameter declarations, joined with ``", "``.
      body: Source lines of the function body, written relative to the
        body's own left margin (nested blocks carry their own indent).
      fn_globals: Extra globals for the generated function.  ``Missing``
        is always available.
      fn_locals: Namespace the definition is executed in.
    """
    if name in cls.__dict__:
        return

    fn_globals = {"Missing": Missing, **(fn_globals or {})}
    fn_locals = fn_locals or {}

    # Every body line has to sit one level below the ``def`` header or
    # the generated source fails with an IndentationError.  An empty
    # body still needs a statement to be valid.
    body_lines = body or ["pass"]
    definition = _FN_TEMPLATE.format(
        name=name,
        params=", ".join(params),
        body="\n".join(f"    {line}" for line in body_lines),
    )

    exec(definition, fn_globals, fn_locals)
    setattr(cls, name, fn_locals[name])
def _add_init(cls: Type[Any], fields: Dict[str, Field[_T]]) -> None:
    """Generate an ``__init__`` for a schema from its fields.

    Fields with a default (or a default factory) become parameters
    defaulting to ``Missing``; the real default is looked up from the
    generated function's globals when the argument is omitted.  All
    other fields become plain required parameters.

    Parameters:
      cls: The schema class that receives the ``__init__``.
      fields: The schema's fields, in declaration order.
    """
    init_globals: Dict[str, Any] = {}
    signature = ["self"]
    statements = []
    for field in fields.values():
        attr = field.name
        if field.default is not Missing:
            fallback = f"_{attr}_default"
            init_globals[fallback] = field.default
        elif field.default_factory:
            factory = f"_{attr}_default_factory"
            init_globals[factory] = field.default_factory
            # The factory is called each time the argument is omitted.
            fallback = f"{factory}()"
        else:
            signature.append(f"{attr}")
            statements.append(f"self.{attr} = {attr}")
            continue

        signature.append(f"{attr}=Missing")
        statements.append(f"self.{attr} = {attr} if {attr} is not Missing else {fallback}")

    _add_fn(cls, "__init__", signature, statements, init_globals)
_FN_TEMPLATE = """\
def {name}({params}):
{body}
""".rstrip()
_EQ_FN_BODY = """\
try:
return all(getattr(self, name) == getattr(other, name) for name in self._FIELDS)
except AttributeError:
return False
""".rstrip().split("\n")
_REPR_FN_BODY = """\
params = ', '.join(f'{name}={repr(getattr(self, name))}' for name in self._FIELDS)
return f'{type(self).__name__}({params})'
""".rstrip().split("\n")