Source code for molten.contrib.dramatiq

# This file is a part of molten.
#
# Copyright (C) 2018 CLEARTYPE SRL <[email protected]>
#
# molten is free software; you can redistribute it and/or modify it
# under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or (at
# your option) any later version.
#
# molten is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
# License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

import functools
import inspect
from inspect import Parameter
from typing import Any, Callable, Dict, Optional, Sequence, no_type_check

from molten import BaseApp

try:
    import dramatiq
except ImportError:  # pragma: no cover
    raise ImportError("'dramatiq' package missing. Run 'pip install dramatiq'.")


#: The global dependency injector instace.  Call setup_dramatiq to set
#: this up.
_INJECTOR = None


[docs]def setup_dramatiq(app: BaseApp) -> None: """Sets up the global state required to be able to inject components into Dramatiq actors. Examples: >>> from molten.contrib.dramatiq import setup_dramatiq >>> # All components that were registered with your app will be >>> # available to your actors once you call this function. >>> setup_dramatiq(app) """ global _INJECTOR _INJECTOR = app.injector
[docs]@no_type_check def actor(fn=None, **kwargs): """Use this in place of dramatiq.actor in order to create actors that can request components via dependency injection. This is just a wrapper around dramatiq.actor and it takes the same set of parameters. Examples: >>> from molten.contrib.dramatiq import actor >>> @actor(queue_name="example") ... def add(x, y, database: Database) -> None: ... database.put(x + y) ... >>> add.send(1, 2) """ def decorator(fn): return dramatiq.actor(_inject(fn), **kwargs) if fn is None: return decorator return decorator(fn)
@no_type_check def _inject(fn: Optional[Callable[..., Any]] = None) -> Callable[..., Any]: def decorator(fn): parameters = {name: i for i, name in enumerate(inspect.signature(fn).parameters)} @functools.wraps(fn) def wrapper(*args, **kwargs): try: resolver = _INJECTOR.get_resolver() resolver.add_component(_ArgumentResolver(parameters, args, kwargs)) except AttributeError: # pragma: no cover raise RuntimeError( "Dramatiq support is not set up correctly. " "Don't forget to call setup_dramatiq()." ) resolved_fn = resolver.resolve(fn) return resolved_fn() return wrapper if fn is None: # pragma: no cover return decorator return decorator(fn) class _ArgumentResolver: is_cacheable = False is_singleton = False def __init__(self, parameters: Dict[str, int], args: Sequence[Any], kwargs: Dict[str, Any]) -> None: self.state = state = kwargs for name, idx in parameters.items(): if name not in state: try: state[name] = args[idx] except IndexError: continue def can_handle_parameter(self, parameter: Parameter) -> bool: return parameter.name in self.state or \ parameter.default is not Parameter.empty def resolve(self, parameter: Parameter) -> Any: try: return self.state[parameter.name] except KeyError: return parameter.default