Warning: This document is for the development version of molten.

Source code for molten.contrib.sessions

# This file is a part of molten.
# Copyright (C) 2018 CLEARTYPE SRL <bogdan@cleartype.io>
# molten is free software; you can redistribute it and/or modify it
# under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or (at
# your option) any later version.
# molten is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# License for more details.
# You should have received a copy of the GNU Lesser General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

import base64
import hmac
import json
from inspect import Parameter
from time import time
from typing import Any, Callable, Dict, Optional, Union, no_type_check
from uuid import uuid4

from typing_extensions import Protocol

from molten.dependency_injection import DependencyResolver
from molten.http import Cookie, Cookies

#: The name of the key the CookieStore inserts into session objects to
#: represent the expiration time of the session.

#: The default if the expiration key is not set on the incoming
#: session object.

[docs]class Session(Dict[str, Any]): """Session objects are ordinary dictionaries that are guaranteed to be constructed with an "id" key. """ def __init__(self, id: str, **data: Dict[str, Any]) -> None: super().__init__(id=id, **data) @classmethod def empty(cls) -> "Session": """Create an empty session with a random id. """ return cls(str(uuid4()))
[docs]class SessionStore(Protocol): """Protocol for session stores. """
[docs] @no_type_check def load(self) -> Session: """Load a session from the request. This method may request components via DI. """
[docs] @no_type_check def dump(self) -> Cookie: """Convert the session to a Cookie and possibly store it somewhere (eg. memcached, Redis). This method may request components via DI. """
[docs]class CookieStore: """A stateless session store based on cookies. Sessions are converted to JSON and then base64-encoded. The values are signed with a signing key and validated when the sessions are subsequently loaded. An expiration time is inserted into the session before it's dumped so as to provide minimal protection against session replay attacks. Warning: Don't store sensitive information in sessions using this store. They are tamper-proof, but users can decode them. """ __slots__ = [ "signing_key", "signing_method", "cookie_ttl", "cookie_name", "cookie_domain", "cookie_path", "cookie_secure", ] def __init__( self, signing_key: Union[bytes, str], *, signing_method: str = "sha256", cookie_ttl: int = 86400 * 7, cookie_name: str = "__sess__", cookie_domain: Optional[str] = None, cookie_path: Optional[str] = None, cookie_secure: bool = False, ) -> None: self.signing_key = signing_key if isinstance(signing_key, bytes) else signing_key.encode() self.signing_method = signing_method self.cookie_ttl = cookie_ttl self.cookie_name = cookie_name self.cookie_domain = cookie_domain self.cookie_path = cookie_path def load(self, cookies: Cookies) -> Session: cookie = cookies.get(self.cookie_name) if cookie is None: return Session.empty() data, _, signature = cookie.partition(",") if not hmac.compare_digest(signature, self.sign(data.encode())): return Session.empty() # Note: at this point, the data is guaranteed to be valid # given that the data is correctly signed. session_data = json.loads(base64.urlsafe_b64decode(data)) session = Session(**session_data) if session.get(COOKIE_EXPIRATION_KEY, DEFAULT_EXPIRATION_TIME) <= time(): return Session.empty() return session def dump(self, session: Session) -> Cookie: session[COOKIE_EXPIRATION_KEY] = expires = time() + self.cookie_ttl session_data = base64.urlsafe_b64encode(json.dumps(session).encode()) signature = self.sign(session_data) return Cookie( self.cookie_name, f"{session_data.decode()},{signature}", domain=self.cookie_domain, path=self.cookie_path, expires=expires, http_only=True, same_site="strict", ) def sign(self, value: bytes) -> str: return hmac.new(self.signing_key, value, self.signing_method).hexdigest()
[docs]class SessionComponent: """A component that loads Session objects from the request. Parameters: store: A session store. """ __slots__ = ["store"] is_cacheable = True is_singleton = False def __init__(self, store: SessionStore) -> None: self.store = store def can_handle_parameter(self, parameter: Parameter) -> bool: return parameter.annotation is Session def resolve(self, resolver: DependencyResolver) -> Session: return resolver.resolve(self.store.load)()
[docs]class SessionMiddleware: """A middleware that dumps Session data into the response. Parameters: store: A session store. """ __slots__ = ["store"] def __init__(self, store: SessionStore) -> None: self.store = store def __call__(self, handler: Callable[..., Any]) -> Callable[..., Any]: def middleware(resolver: DependencyResolver) -> Callable[..., Any]: response = handler() response.set_cookie(resolver.resolve(self.store.dump)()) return response return middleware