# This file is a part of molten.
#
# Copyright (C) 2018 CLEARTYPE SRL <[email protected]>
#
# molten is free software; you can redistribute it and/or modify it
# under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or (at
# your option) any later version.
#
# molten is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
# License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import json
import os
import re
from tempfile import SpooledTemporaryFile
from typing import Any, Dict, Iterator, List, Tuple, Union, no_type_check
from urllib.parse import parse_qsl
from typing_extensions import Protocol
from .common import MultiDict
from .errors import FieldTooLarge, FileTooLarge, ParseError, TooManyFields
from .http import Headers, UploadedFile
from .typing import Header, RequestBody, RequestInput
class RequestParser(Protocol):  # pragma: no cover
    """Protocol for request parsers.

    Implementations expose a ``mime_type``, report whether they can
    handle a given content type and convert request data into a Python
    value via ``parse``.
    """

    @property
    def mime_type(self) -> str:
        """Returns a string representing the mime type of the content
        this parser handles.  This is used to generate OpenAPI
        documents.
        """

    def can_parse_content(self, content_type: str) -> bool:
        """Returns True if this parser can parse the given content type.
        """

    # The concrete parsers in this module each declare different
    # parameters on ``parse``, so the signature is excluded from type
    # checking rather than pinned down here.
    @no_type_check
    def parse(self) -> Any:
        """Attempt to parse the input data.

        Raises:
          ParseError: if the data cannot be parsed.
        """
class JSONParser:
    """A JSON request parser.
    """

    mime_type = "application/json"

    def can_parse_content(self, content_type: str) -> bool:
        """Returns True if the content type starts with
        ``application/json``.
        """
        return content_type.startswith("application/json")

    def parse(self, data: RequestBody) -> Any:
        """Decode the request body as JSON.

        Parameters:
          data: The raw request body.

        Returns:
          Whatever ``json.loads`` produces for the body.

        Raises:
          ParseError: if the body is not valid JSON, including bodies
            whose bytes cannot be decoded as text.
        """
        try:
            return json.loads(data)
        except ValueError as e:
            # json.JSONDecodeError and the UnicodeDecodeError raised for
            # undecodable bytes are both ValueError subclasses; catching
            # only the former lets malformed bytes escape as an
            # unhandled error.
            raise ParseError("JSON input could not be parsed") from e
class URLEncodingParser:
    """A parser for urlencoded requests.
    """

    mime_type = "application/x-www-form-urlencoded"

    def can_parse_content(self, content_type: str) -> bool:
        """Returns True if the content type starts with
        ``application/x-www-form-urlencoded``.
        """
        return content_type.startswith("application/x-www-form-urlencoded")

    def parse(self, data: RequestBody) -> MultiDict[str, str]:
        """Decode the request body as UTF-8 and parse it as a query
        string.

        Parameters:
          data: The raw request body.

        Returns:
          A MultiDict built from the decoded name/value pairs.

        Raises:
          ParseError: if the body is not valid UTF-8 or strict query
            string parsing rejects it.
        """
        try:
            pairs = parse_qsl(data.decode("utf-8"), strict_parsing=True)
            return MultiDict(pairs)
        except ValueError as e:
            # UnicodeDecodeError is a ValueError, so undecodable bodies
            # are reported the same way as malformed pairs.
            raise ParseError("failed to parse urlencoded data") from e
class MultiPartParser:
    """A parser for multipart requests.  Returns a MultiDict mapping
    field names to lists of field string values or UploadedFiles.

    This is a reasonably simple streaming parser implementation for
    the multipart/form-data media type.  As such, it does not support
    deprecated parts of RFC7578 like multipart/mixed content and
    content-transfer-encoding headers.

    Parameters:
      bufsize: The max size of the streaming data buffer.  This should
        be a 32 bit integer that's a multiple of 4.  In some cases,
        the streaming data buffer may contain double this amount so
        take that into account when choosing a value.  Additionally,
        the value should be greater than the longest individual header
        value you want to accept.
      encoding: The codec to use when decoding form field values.
      encoding_errors: What to do when a decoding error is encountered.
      max_field_size: The max number of bytes a field can contain.
      max_file_size: The max number of bytes a file can contain.
      max_num_fields: The max number of fields accepted per request.
      max_spooled_size: The max number of bytes a file in the request
        can have before it's written to a temporary file on disk.
    """

    __slots__ = [
        "bufsize",
        "encoding",
        "encoding_errors",
        "max_field_size",
        "max_file_size",
        "max_num_fields",
        "max_spooled_size",
    ]

    # Captures the boundary parameter whether or not it is quoted and
    # stops at the closing quote or at the next ";"-separated parameter
    # so that trailing parameters don't end up inside the boundary.
    BOUNDARY_RE = re.compile(r'boundary="?([^";]+)"?', re.IGNORECASE)
    PARAMS_RE = re.compile('([A-Za-z]+)="([^"]+)"')

    mime_type = "multipart/form-data"

    def __init__(
            self, *,
            bufsize: int = 64 * 1024,
            encoding: str = "utf-8",
            encoding_errors: str = "replace",
            max_field_size: int = 500 * 1024,
            max_file_size: int = 10 * 1024 * 1024,
            max_num_fields: int = 100,
            max_spooled_size: int = 1024 * 1024,
    ) -> None:
        self.bufsize = bufsize
        self.encoding = encoding
        self.encoding_errors = encoding_errors
        self.max_field_size = max_field_size
        self.max_file_size = max_file_size
        self.max_num_fields = max_num_fields
        self.max_spooled_size = max_spooled_size

    def can_parse_content(self, content_type: str) -> bool:
        """Returns True if the content type starts with
        ``multipart/form-data``.
        """
        return content_type.startswith("multipart/form-data")

    def parse(self, content_type: Header, content_length: Header, body_file: RequestInput) -> MultiDict[str, Union[str, UploadedFile]]:  # noqa
        """Parse a multipart body into a MultiDict of fields and files.

        Parameters:
          content_type: The content-type header value; it must carry
            a boundary parameter.
          content_length: The content-length header value; at most
            this many bytes are read from body_file.
          body_file: The stream the request body is read from.

        Raises:
          ParseError: if the boundary is missing, the content length
            is not an integer or the body is malformed.
          TooManyFields: if the body has more than max_num_fields parts.
          FieldTooLarge: if a field reaches max_field_size.
          FileTooLarge: if a file reaches max_file_size.
        """
        matches = self.BOUNDARY_RE.search(content_type)
        boundary = matches.group(1).strip() if matches else ""
        if not boundary:
            raise ParseError("boundary missing from content-type header")

        try:
            limit = int(content_length)
        except (TypeError, ValueError) as e:
            raise ParseError("content-length header is not a valid integer") from e

        lines = self._iter_lines(body_file, boundary, limit)
        parts = self._iter_parts(lines, boundary)
        return MultiDict(parts)

    def _iter_lines(self, stream: RequestInput, boundary: str, limit: int) -> Iterator[bytes]:
        """Yield the body as CRLF-terminated lines, reading at most
        limit bytes from the stream.

        Runs of at least bufsize bytes without a CRLF are yielded as a
        single chunk so that large files don't have to be buffered
        whole.  The boundary parameter is unused and is only kept so
        the signature stays the same.
        """
        buff = b""
        remaining = limit
        while remaining > 0:
            # Never ask for more than the declared content length so
            # that nothing past the end of this body is consumed.
            data = stream.read(min(self.bufsize, remaining))
            if not data:
                # The stream ended before content-length bytes arrived;
                # stop and let the caller report the truncated input.
                return

            remaining -= len(data)
            buff += data
            if remaining > 0 and len(buff) < self.bufsize:
                continue

            # Walk the buffer by offset rather than re-slicing it after
            # every line, which would copy the tail once per line.
            start = 0
            while True:
                end = buff.find(b"\r\n", start)
                if end == -1:
                    break

                yield buff[start:end + 2]
                start = end + 2

            buff = buff[start:]

            # A trailing "\r" may be the first half of a CRLF whose
            # "\n" hasn't been read yet, so hold on to the buffer.
            if len(buff) >= self.bufsize and not buff.endswith(b"\r"):
                yield buff
                buff = b""

        # Whatever is left has no terminating CRLF.  Hand it over
        # anyway: the CRLF after the closing delimiter is optional.
        if buff:
            yield buff

    def _iter_parts(self, lines: Iterator[bytes], boundary: str) -> Iterator[Tuple[str, Union[str, UploadedFile]]]:
        """Group body lines into parts and yield one (name, value)
        pair per part.

        Parts whose content-disposition has a filename are spooled to
        a SpooledTemporaryFile and yielded as UploadedFiles; all other
        parts are decoded to strings.
        """
        next_part = f"--{boundary}\r\n".encode()
        last_part = f"--{boundary}--\r\n".encode()
        # The closing delimiter may be the very last bytes of the
        # body, in which case it arrives without its CRLF.
        last_part_bare = last_part[:-2]

        def prepare_current_part() -> Tuple[str, Union[str, UploadedFile]]:
            nonlocal total_field_count

            headers = Headers(current_part_headers)
            name = current_part_disposition["name"]
            value: Union[str, UploadedFile]
            if "filename" in current_part_disposition:
                # Strip CRLF from the end of the file and then rewind.
                current_part_container.seek(-2, os.SEEK_END)
                current_part_container.truncate()
                headers.add("content-length", str(current_part_container.tell()))
                current_part_container.seek(0)

                filename = current_part_disposition["filename"]
                value = UploadedFile(filename, headers, current_part_container)
            else:
                # Strip CRLF from the end of the buffer.
                data = current_part_container[:-2]
                value = data.decode(self.encoding, errors=self.encoding_errors)

            total_field_count += 1
            return name, value

        def append_bytes(data: bytes) -> None:
            nonlocal current_part_container
            current_part_container += data

        # Starts at 1 because it is compared against max_num_fields
        # while the next part is being read, i.e. it is the index of
        # the part currently being parsed.
        total_field_count = 1
        current_part_bytes: int = 0
        current_part_is_file: bool = False
        current_part_container: Any = None
        current_part_writer: Any = None
        current_part_headers: Dict[str, Union[str, List[str]]] = {}
        current_part_disposition: Dict[str, str] = {}
        current_part_past_headers: bool = False

        try:
            for line in lines:
                if total_field_count > self.max_num_fields:
                    raise TooManyFields("the input contains too many fields")

                if line == last_part or line == last_part_bare:
                    if current_part_container is not None:
                        yield prepare_current_part()

                    break

                elif line == next_part:
                    if current_part_container is not None:
                        yield prepare_current_part()

                    current_part_bytes = 0
                    current_part_is_file = False
                    current_part_container = None
                    current_part_writer = None
                    current_part_headers = {}
                    current_part_disposition = {}
                    current_part_past_headers = False

                elif not current_part_past_headers:
                    line = line.rstrip()
                    if not line:
                        # A blank line ends the headers of this part.
                        if current_part_container is None:
                            raise ParseError("content-disposition header is missing")

                        current_part_past_headers = True
                        continue

                    try:
                        header_line = line.decode()
                    except UnicodeDecodeError as e:
                        raise ParseError("part header could not be decoded") from e

                    # The whitespace after the colon is optional, so
                    # split on the colon alone and trim the value.
                    header_name, _, header_value = header_line.partition(":")
                    header_value = header_value.strip()
                    current_part_headers[header_name] = header_value
                    if header_name.lower() == "content-disposition":
                        current_part_disposition = dict(self.PARAMS_RE.findall(header_value))
                        if "name" not in current_part_disposition:
                            raise ParseError("content-disposition header without a name")

                        if "filename" in current_part_disposition:
                            current_part_is_file = True
                            current_part_container = SpooledTemporaryFile(mode="wb+", max_size=self.max_spooled_size)
                            current_part_writer = current_part_container.write
                        else:
                            current_part_is_file = False
                            current_part_container = b""
                            current_part_writer = append_bytes

                else:
                    current_part_bytes += len(line)
                    if current_part_is_file and current_part_bytes >= self.max_file_size:
                        message = f"file '{current_part_disposition['name']}' exceeds the file size limit"
                        raise FileTooLarge(message)

                    elif not current_part_is_file and current_part_bytes >= self.max_field_size:
                        message = f"field '{current_part_disposition['name']}' exceeds the field size limit"
                        raise FieldTooLarge(message)

                    current_part_writer(line)
            else:
                # The lines ran out before a closing delimiter showed up.
                raise ParseError("unexpected end of input")
        except Exception:
            # The part being read when parsing failed was never handed
            # to the caller, so release its temporary file here.  Parts
            # that were already yielded are left alone: the state is
            # reset right after each of them is yielded.
            if current_part_is_file and current_part_container is not None:
                current_part_container.close()

            raise