Source code for molten.validation.field

# This file is a part of molten.
#
# Copyright (C) 2018 CLEARTYPE SRL <[email protected]>
#
# molten is free software; you can redistribute it and/or modify it
# under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or (at
# your option) any later version.
#
# molten is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
# License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

import re
from typing import (
    Any, Callable, Dict, Generic, List, Optional, Sequence, Type, TypeVar, Union, no_type_check
)

from typing_extensions import Protocol
from typing_inspect import get_origin, is_generic_type, is_typevar, is_union_type

from ..errors import FieldValidationError, ValidationError
from ..typing import extract_optional_annotation, get_args
from .common import Missing, _Missing, is_schema
from .forward import is_forward_ref

_T = TypeVar("_T")


[docs]@no_type_check def field(*args, **kwargs) -> Any: """An alias for :class:`.Field` that tricks the type system into submission. """ return Field(*args, **kwargs)
[docs]class Validator(Protocol[_T]): # pragma: no cover """Validators ensure that values conform to arbitrary specifications. """ def can_validate_field(self, field: "Field[_T]") -> bool: """This should return True if this validator can validate the given Field. """ ... @no_type_check def validate(self, field: "Field[_T]", value: Any, **options: Any) -> _T: """Validate and possibly transform the given value. Raises: FieldValidationError: If the value is not valid. """ ...
[docs]class Field(Generic[_T]): """An individual field on a schema. The @schema decorator automatically turns annotated attributes into fields, but the field class can also be used to enrich annotated attributes with metadata. Examples: >>> @schema ... class Application: ... name: str ... rating: int = Field(minimum=1, maximum=5) Parameters: name: The name of the field. Automatically populated by the schema decorator. annotation: The field's annotation. Like name, this is automatically populated by @schema. description: An optional description for the field. default: An optional default value for the field. default_factory: An optional default function for the field. request_name: The field's name within a request. This is the same as the field's name by default. response_name: The field's name within a response. This is the same as the field's name by default. request_only: Whether or not to exclude this field from responses. Defaults to False. response_only: Whether or not to ignore this field when loading requests. Defaults to False. allow_coerce: Whether or not values passed to this field may be coerced to the correct type. Defaults to False. validator: The validator to use when loading data. The schema decorator will automatically pick a validator for builtin types. **validator_options: Arbitrary options passed to the field's validator. """ __slots__ = [ "name", "annotation", "description", "default", "default_factory", "request_name", "response_name", "request_only", "response_only", "allow_coerce", "validator", "validator_options", ] def __init__( self, name: Optional[str] = None, annotation: Optional[Type[_T]] = None, description: Optional[str] = None, default: Union[_T, _Missing] = Missing, default_factory: Optional[Callable[[], _T]] = None, request_name: Optional[str] = None, response_name: Optional[str] = None, request_only: bool = False, response_only: bool = False, allow_coerce: bool = False, validator: Optional[Validator[_T]] = None, **validator_options: Any, ) -> None: self.name = name self.annotation = annotation self.description = description self.default = default self.default_factory = default_factory self.request_name = request_name or name self.response_name = response_name or name self.request_only = request_only self.response_only = response_only self.allow_coerce = allow_coerce self.validator = validator self.validator_options = validator_options
[docs] def select_validator(self) -> None: """Find a suitable Validator for this field. """ if self.validator is None: self.validator = _select_validator(self) if self.validator_options and not self.validator: raise RuntimeError(f"no validator could be selected for field {self}")
@property def has_default(self) -> bool: """Returns True if the field has either a default value or a default factory. """ return self.default is not Missing or self.default_factory is not None
[docs] @no_type_check def validate(self, value: Optional[Any]) -> _T: """Validate and possibly transform the given value. Raises: FieldValidationError: When the value is not valid. """ is_optional, annotation = extract_optional_annotation(self.annotation) # Distinguishing between missing values and null values is # important. Optional types can have None as a value whereas # types with a default cannot. Additionally, it's possible to # have an optional type without a default value. if value is Missing: if self.default is not Missing: return self.default elif self.default_factory: return self.default_factory() elif is_optional: return None raise FieldValidationError("this field is required") if value is None: if not is_optional: raise FieldValidationError("this field cannot be null") return value if annotation not in (Any,) and \ not is_forward_ref(annotation) and \ not is_generic_type(annotation) and \ not is_union_type(annotation) and \ not is_typevar(annotation) and \ not is_schema(annotation) and \ not isinstance(value, annotation): if not self.allow_coerce: raise FieldValidationError(f"unexpected type {type(value).__name__}") try: value = annotation(value) except Exception: raise FieldValidationError(f"value could not be coerced to {annotation.__name__}") if self.validator: return self.validator.validate(self, value, **self.validator_options) return value
def __repr__(self) -> str: params = ", ".join(f"{name}={repr(getattr(self, name))}" for name in self.__slots__) return f"{type(self).__name__}({params})"
class ForwardRefValidator: """Validates forward references. """ def can_validate_field(self, field: Field[_T]) -> bool: return is_forward_ref(field.annotation) or \ is_forward_ref(extract_optional_annotation(field.annotation)[1]) @no_type_check def validate(self, field: Field[_T], value: Any) -> Any: forward_ref = field.annotation if not is_forward_ref(field.annotation): _, forward_ref = extract_optional_annotation(field.annotation) field = Field(annotation=forward_ref.lookup()) field.select_validator() return field.validate(value)
[docs]class NumberValidator: """Validates numbers. """ def can_validate_field(self, field: Field[_T]) -> bool: _, annotation = extract_optional_annotation(field.annotation) return annotation is int or annotation is float def validate( self, field: Field[_T], value: Union[int, float], minimum: Optional[Union[int, float]] = None, maximum: Optional[Union[int, float]] = None, multiple_of: Optional[Union[int, float]] = None, ) -> Union[int, float]: if minimum is not None and value < minimum: raise FieldValidationError(f"value must be >= {minimum}") if maximum is not None and value > maximum: raise FieldValidationError(f"value must be <= {maximum}") if multiple_of is not None and value % multiple_of != 0: raise FieldValidationError(f"value must be a multiple of {multiple_of}") return value
[docs]class StringValidator: """Validates strings. """ def can_validate_field(self, field: Field[_T]) -> bool: _, annotation = extract_optional_annotation(field.annotation) return annotation is str def validate( self, field: Field[_T], value: str, choices: Optional[Sequence[str]] = None, pattern: Optional[str] = None, min_length: Optional[int] = None, max_length: Optional[int] = None, strip_spaces: bool = False, ) -> str: if choices is not None and value not in choices: raise FieldValidationError(f"must be one of: {', '.join(repr(choice) for choice in choices)}") if pattern is not None and not re.match(pattern, value): raise FieldValidationError(f"must match pattern {pattern!r}") if min_length is not None and len(value) < min_length: raise FieldValidationError(f"length must be >= {min_length}") if max_length is not None and len(value) > max_length: raise FieldValidationError(f"length must be <= {max_length}") if strip_spaces: return value.strip() return value
[docs]class ListValidator: """Validates lists. When a generic parameter is provided, then the values will be validated against that annotation:: >>> @schema ... class Setting: ... name: str ... value: str >>> @schema ... class Account: ... settings: List[Setting] >>> load_schema(Account, {"settings": [{"name": "a", "value": "b"}]}) Account(settings=[Setting(name="a", value="b")]) >>> load_schema(Account, {"settings": [{"name": "a"}]}) Traceback (most recent call last): ... ValidationError: {"settings": {0: {"value": "this field is required"}}} When a generic parameter isn't provided, then any list is accepted. """ def can_validate_field(self, field: Field[_T]) -> bool: _, annotation = extract_optional_annotation(field.annotation) return get_origin(annotation) in LIST_TYPES @no_type_check def validate( self, field: Field[_T], value: List[Any], min_items: Optional[int] = None, max_items: Optional[int] = None, item_validator_options: Optional[Dict[str, Any]] = None, ) -> List[Any]: if not isinstance(value, list): raise FieldValidationError("value must be a list") if min_items is not None and len(value) < min_items: raise FieldValidationError(f"length must be >= {min_items}") if max_items is not None and len(value) > max_items: raise FieldValidationError(f"length must be <= {max_items}") _, annotation = extract_optional_annotation(field.annotation) # If the argument is Any, then the list can contain anything, # otherwise each item needs to be validated. annotation_args = getattr(annotation, "__args__", []) if annotation_args != (Any,): # This is a little piggy but it works well enough in practice. item_validator_options = item_validator_options or {} sub_field = Field(annotation=annotation_args[0], **item_validator_options) sub_field.select_validator() items = [] for i, item in enumerate(value): try: items.append(sub_field.validate(item)) except FieldValidationError as e: raise ValidationError({i: str(e)}) except ValidationError as e: raise ValidationError({i: e.reasons}) return items return value
[docs]class DictValidator: """Validates dictionaries. When the ``fields`` option is provided, only the declared fields are going to be extracted from the input and will be validated. >>> @schema ... class Account: ... settings: Dict[str, str] = Field(fields={ ... "a": Field(annotation=str), ... }) >>> load_schema(Account, {"settings": {}}) Account(settings={}) >>> load_schema(Account, {"settings": {"a": "b", "c": "d"}}) Account(settings={"settings" {"a": "b"}}) >>> load_schema(Account, {"settings": {"a": 42}}) Traceback (most recent call last): ... ValidationError: {"settings": {"a": "unexpected type int"}} When the ``fields`` option is not provided and the annotation has generic parameters, then the items from the input will be validated against the generic parameter annotations:: >>> @schema ... class Account: ... settings: Dict[str, str] >>> load_schema(Account, {"settings": {}}) Account(settings={}) >>> load_schema(Account, {"settings": {"a": "b"}}) Account(settings={"a": "b"}) >>> load_schema(Account, {"settings": {"a": 42}) # invalid Traceback (most recent call last): ... ValidationError: {"settings": {"a": "unexpected type int"}} When neither ``fields`` or generic parameters are provided, then any dictionary will be accepted. """ def can_validate_field(self, field: Field[_T]) -> bool: _, annotation = extract_optional_annotation(field.annotation) return get_origin(annotation) in DICT_TYPES @no_type_check def validate( self, field: Field[_T], value: Dict[Any, Any], fields: Optional[Dict[str, Field[Any]]] = None, key_validator_options: Optional[Dict[str, Any]] = None, value_validator_options: Optional[Dict[str, Any]] = None, ) -> Dict[Any, Any]: if not isinstance(value, dict): raise FieldValidationError("value must be a dict") # If a field dictionary was provided then we select specific # items from the input, otherwise we just validate the input. if fields is not None: items = {} for item_name, item_field in fields.items(): try: item_field.select_validator() item_value = value.get(item_name, Missing) items[item_name] = item_field.validate(item_value) except FieldValidationError as e: raise ValidationError({item_name: str(e)}) except ValidationError as e: raise ValidationError({item_name: e.reasons}) return items _, annotation = extract_optional_annotation(field.annotation) # If the args are [Any, Any], then the dict can contain # anything, otherwise each item needs to be validated. annotation_args = getattr(annotation, "__args__", []) if annotation_args and annotation_args != (Any, Any): key_validator_options = key_validator_options or {} key_field = Field(annotation=annotation_args[0], **key_validator_options) key_field.select_validator() value_validator_options = value_validator_options or {} value_field = Field(annotation=annotation_args[1], **value_validator_options) value_field.select_validator() items = {} for item_name, item_value in value.items(): try: item_name = key_field.validate(item_name) item_value = value_field.validate(item_value) items[item_name] = item_value except FieldValidationError as e: raise ValidationError({item_name: str(e)}) except ValidationError as e: raise ValidationError({item_name: e.reasons}) return items return value
class UnionValidator: """Validates union types. """ def can_validate_field(self, field: Field[_T]) -> bool: _, annotation = extract_optional_annotation(field.annotation) return is_union_type(annotation) def validate(self, field: Field[_T], value: Any) -> Any: _, annotation = extract_optional_annotation(field.annotation) annotations = get_args(annotation) error_groups = [] for annotation in annotations: error_group_name = getattr(annotation, "__name__", None) or str(annotation) error_groups.append(error_group_name) try: value_field: Field[Any] = Field(annotation=annotation) value_field.select_validator() return value_field.validate(value) except (FieldValidationError, ValidationError): continue else: # TODO: Figure out a better way to represent these errors. raise FieldValidationError(f"expected a valid {' or '.join(repr(group) for group in error_groups)} value")
[docs]class SchemaValidator: """Validates dictionaries against schema classes. """ def can_validate_field(self, field: Field[_T]) -> bool: _, annotation = extract_optional_annotation(field.annotation) return is_schema(annotation) def validate(self, field: Field[_T], value: Dict[str, Any]) -> Any: from .schema import load_schema _, annotation = extract_optional_annotation(field.annotation) return load_schema(annotation, value)
DICT_TYPES = {dict, Dict} LIST_TYPES = {list, List} #: The set of built-in validators. Fields will attempt to use one of #: these unless otherwise specified. VALIDATORS: List[Validator[Any]] = [ ForwardRefValidator(), NumberValidator(), StringValidator(), ListValidator(), DictValidator(), UnionValidator(), SchemaValidator(), ] def _select_validator(field: Field[_T]) -> Optional[Validator[_T]]: """Find a suitable validator for the given Field. """ for validator in VALIDATORS: if validator.can_validate_field(field): return validator return None