Source code for bonobo.config.processors

from collections import Iterable
from contextlib import contextmanager
from functools import partial
from inspect import signature

from bonobo.config import Option
from bonobo.errors import UnrecoverableTypeError
from bonobo.util import deprecated_alias, ensure_tuple

_raw = object()
_args = object()
_none = object()

INPUT_FORMATS = {_raw, _args, _none}


[docs]class ContextProcessor(Option): """ A ContextProcessor is a kind of transformation decorator that can setup and teardown a transformation and runtime related dependencies, at the execution level. It works like a yielding context manager, and is the recommended way to setup and teardown objects you'll need in the context of one execution. It's the way to overcome the stateless nature of transformations. The yielded values will be passed as positional arguments to the next context processors (order does matter), and finally to the __call__ method of the transformation. Warning: this may change for a similar but simpler implementation, don't rely too much on it (yet). Example: >>> from bonobo.config import Configurable >>> from bonobo.util import ValueHolder >>> class Counter(Configurable): ... @ContextProcessor ... def counter(self, context): ... yield ValueHolder(0) ... ... def __call__(self, counter, *args, **kwargs): ... counter += 1 ... yield counter.get() """ @property def __name__(self): return self.func.__name__ def __init__(self, func): self.func = func super(ContextProcessor, self).__init__(required=False, default=self.__name__) self.name = self.__name__ def __repr__(self): return repr(self.func).replace('<function', '<{}'.format(type(self).__name__)) def __call__(self, *args, **kwargs): return self.func(*args, **kwargs)
class bound(partial): @property def kwargs(self): return self.keywords class ContextCurrifier: """ This is a helper to resolve processors. """ def __init__(self, wrapped, *args, **kwargs): self.wrapped = wrapped self.args = args self.kwargs = kwargs self.format = getattr(wrapped, '__input_format__', _args) self._stack, self._stack_values = None, None def __iter__(self): yield from self.wrapped def _bind(self, _input): try: bind = signature(self.wrapped).bind except ValueError: bind = partial(bound, self.wrapped) if self.format is _args: return bind(*self.args, *_input, **self.kwargs) if self.format is _raw: return bind(*self.args, _input, **self.kwargs) if self.format is _none: return bind(*self.args, **self.kwargs) raise NotImplementedError('Invalid format {!r}.'.format(self.format)) def __call__(self, _input): if not callable(self.wrapped): if isinstance(self.wrapped, Iterable): return self.__iter__() raise UnrecoverableTypeError('Uncallable node {}'.format(self.wrapped)) try: bound = self._bind(_input) except TypeError as exc: raise UnrecoverableTypeError( ( 'Input of {wrapped!r} does not bind to the node signature.\n' 'Args: {args}\n' 'Input: {input}\n' 'Kwargs: {kwargs}\n' 'Signature: {sig}' ).format( wrapped=self.wrapped, args=self.args, input=_input, kwargs=self.kwargs, sig=signature(self.wrapped) ) ) from exc return self.wrapped(*bound.args, **bound.kwargs) def setup(self, *context): if self._stack is not None: raise RuntimeError('Cannot setup context currification twice.') self._stack, self._stack_values = list(), list() for processor in resolve_processors(self.wrapped): _processed = processor(self.wrapped, *context, *self.args, **self.kwargs) _append_to_context = next(_processed) self._stack_values.append(_append_to_context) if _append_to_context is not None: self.args += ensure_tuple(_append_to_context) self._stack.append(_processed) def teardown(self): while self._stack: processor = self._stack.pop() try: # todo yield from ? how to ? processor.send(self._stack_values.pop()) except StopIteration: # This is normal, and wanted. pass else: # No error ? We should have had StopIteration ... raise RuntimeError('Context processors should not yield more than once.') self._stack, self._stack_values = None, None @contextmanager def as_contextmanager(self, *context): """ Convenience method to use it as a contextmanager, mostly for test purposes. Example: >>> with ContextCurrifier(node).as_contextmanager(context) as stack: ... stack() :param context: :return: """ self.setup(*context) yield self self.teardown() def resolve_processors(mixed): try: yield from mixed.__processors__ except AttributeError: yield from () get_context_processors = deprecated_alias('get_context_processors', resolve_processors)
[docs]def use_context(f): def context(self, context, *args, **kwargs): yield context return use_context_processor(context)(f)
[docs]def use_context_processor(context_processor): def using_context_processor(cls_or_func): nonlocal context_processor try: cls_or_func.__processors__ except AttributeError: cls_or_func.__processors__ = [] cls_or_func.__processors__.append(ContextProcessor(context_processor)) return cls_or_func return using_context_processor
def _use_input_format(input_format): if input_format not in INPUT_FORMATS: raise ValueError( 'Invalid input format {!r}. Choices: {}'.format(input_format, ', '.join(sorted(INPUT_FORMATS))) ) def _set_input_format(f): setattr(f, '__input_format__', input_format) return f return _set_input_format use_no_input = _use_input_format(_none) use_raw_input = _use_input_format(_raw)