import logging
import sys
from collections import namedtuple
from queue import Empty
from time import sleep
from types import GeneratorType
from bonobo.config import create_container
from bonobo.config.processors import ContextCurrifier
from bonobo.constants import BEGIN, END, INHERIT, NOT_MODIFIED, TICK_PERIOD, Flag, Token
from bonobo.errors import InactiveReadableError, UnrecoverableError, UnrecoverableTypeError
from bonobo.execution.contexts.base import BaseContext
from bonobo.structs.inputs import Input
from bonobo.util import ensure_tuple, get_name, isconfigurabletype
from bonobo.util.bags import BagType
from bonobo.util.statistics import WithStatistics
# Module-level logger, named after this module per convention; used for node loop debug output.
logger = logging.getLogger(__name__)
# Lightweight (args, kwargs) pair describing call arguments not yet bound to a callable;
# produced by NodeExecutionContext._get_initial_context() and consumed by ContextCurrifier.
UnboundArguments = namedtuple('UnboundArguments', ['args', 'kwargs'])
class NodeExecutionContext(BaseContext, WithStatistics):
    """
    Holds the runtime state of a single transformation node while a graph executes: its input
    queue, output queues, resolved services, detected input/output types and the currified
    call stack ("in"/"out"/"err"/"warn" counters come from WithStatistics).
    """

    def __init__(self, wrapped, *, parent=None, services=None, _input=None, _outputs=None):
        """
        Node execution context has the responsibility of storing the state of a transformation during its execution.

        :param wrapped: wrapped transformation
        :param parent: parent context, most probably a graph context
        :param services: dict-like collection of services
        :param _input: input queue (optional)
        :param _outputs: output queues (optional)
        """
        BaseContext.__init__(self, wrapped, parent=parent)
        WithStatistics.__init__(self, 'in', 'out', 'err', 'warn')

        # Services: how we'll access external dependencies. They may be defined either on the
        # parent (graph) context or here, but not on both.
        if services:
            if self.parent:
                raise RuntimeError(
                    'Having services defined both in GraphExecutionContext and child NodeExecutionContext is not supported, for now.'
                )
            self.services = create_container(services)
        else:
            self.services = None

        # Input / Output: how the wrapped node will communicate.
        self.input = _input or Input()
        self.outputs = _outputs or []

        # Types: lazily detected from the first message seen on each side (see _get / _cast).
        self._input_type, self._input_length = None, None
        self._output_type = None

        # Stack: context decorators for the execution.
        self._stack = None

    def __str__(self):
        return self.__name__ + self.get_statistics_as_string(prefix=' ')

    def __repr__(self):
        name, type_name = get_name(self), get_name(type(self))
        return '<{}({}{}){}>'.format(type_name, self.status, name, self.get_statistics_as_string(prefix=' '))

    def start(self):
        """
        Starts this context, a.k.a the phase where you setup everything which will be necessary during the whole
        lifetime of a transformation.

        The "ContextCurrifier" is in charge of setting up a decorating stack, that includes both services and context
        processors, and will call the actual node callable with additional parameters.
        """
        super().start()

        try:
            initial = self._get_initial_context()
            self._stack = ContextCurrifier(self.wrapped, *initial.args, **initial.kwargs)

            if isconfigurabletype(self.wrapped):
                # Not normal to have a partially configured object here, so let's warn the user
                # instead of having him get into the hard trouble of understanding that by himself.
                raise TypeError(
                    'Configurables should be instanciated before execution starts.\nGot {!r}.\n'.format(self.wrapped)
                )
            self._stack.setup(self)
        except Exception:
            # Set the logging level to the lowest possible, to avoid double log.
            self.fatal(sys.exc_info(), level=0)
            # We raise again, so the error is not ignored out of execution loops.
            raise

    def loop(self):
        """
        The actual infinite loop for this transformation.
        """
        logger.debug('Node loop starts for {!r}.'.format(self))
        while self.should_loop:
            try:
                self.step()
            except InactiveReadableError:
                break  # input channel is dead: exit loop.
            except Empty:
                sleep(TICK_PERIOD)  # XXX: How do we determine this constant?
                continue
            except (NotImplementedError, UnrecoverableError):
                self.fatal(sys.exc_info())  # exit loop
            except Exception:  # pylint: disable=broad-except
                self.error(sys.exc_info())  # does not exit loop
            except BaseException:
                self.fatal(sys.exc_info())  # exit loop
        logger.debug('Node loop ends for {!r}.'.format(self))

    def step(self):
        """
        A single step in the loop.

        Basically gets an input bag, send it to the node, interpret the results.
        """
        # Pull and check data.
        input_bag = self._get()

        # Send it through the decorating stack.
        results = self._stack(input_bag)

        # Put data onto output channels.
        if isinstance(results, GeneratorType):
            while True:
                try:
                    # If kill flag was set, stop iterating.
                    if self._killed:
                        break
                    result = next(results)
                except StopIteration:
                    # That's not an error, we're just done.
                    break
                else:
                    # Push data (in case of an iterator).
                    self._send(self._cast(input_bag, result))
        elif results:
            # Push data (returned value).
            self._send(self._cast(input_bag, results))
        else:
            # No result, but an execution went through anyway; nothing to push.
            pass

    def stop(self):
        """
        Cleanup the context, after the loop ended.
        """
        if self._stack:
            try:
                self._stack.teardown()
            except Exception:
                self.fatal(sys.exc_info())
        super().stop()

    def send(self, *_output, _input=None):
        # Public variant of _send: cast the raw output against an (optional) input bag first.
        return self._send(self._cast(_input, _output))

    ### Input type and fields

    @property
    def input_type(self):
        return self._input_type

    def set_input_type(self, input_type):
        if self._input_type is not None:
            # BUGFIX: the value was previously passed as a second constructor argument, leaving
            # the %r placeholder unformatted; interpolate it explicitly.
            raise RuntimeError('Cannot override input type, already have %r.' % (self._input_type,))
        if not isinstance(input_type, type):
            raise UnrecoverableTypeError('Input types must be regular python types.')
        if not issubclass(input_type, tuple):
            raise UnrecoverableTypeError('Input types must be subclasses of tuple (and act as tuples).')
        self._input_type = input_type

    def get_input_fields(self):
        # Field names only exist for namedtuple-like input types.
        return self._input_type._fields if self._input_type and hasattr(self._input_type, '_fields') else None

    def set_input_fields(self, fields, typename='Bag'):
        self.set_input_type(BagType(typename, fields))

    ### Output type and fields

    @property
    def output_type(self):
        return self._output_type

    def set_output_type(self, output_type):
        if self._output_type is not None:
            # BUGFIX: same unformatted-%r defect as set_input_type; interpolate explicitly.
            raise RuntimeError('Cannot override output type, already have %r.' % (self._output_type,))
        # CONSISTENCY FIX: use isinstance(..., type) like set_input_type does; the previous
        # `type(output_type) is not type` check wrongly rejected classes built by a metaclass.
        if not isinstance(output_type, type):
            raise UnrecoverableTypeError('Output types must be regular python types.')
        if not issubclass(output_type, tuple):
            raise UnrecoverableTypeError('Output types must be subclasses of tuple (and act as tuples).')
        self._output_type = output_type

    def get_output_fields(self):
        # Field names only exist for namedtuple-like output types.
        return self._output_type._fields if self._output_type and hasattr(self._output_type, '_fields') else None

    def set_output_fields(self, fields, typename='Bag'):
        self.set_output_type(BagType(typename, fields))

    ### Attributes

    def setdefault(self, attr, value):
        # Set the attribute only if it is not already present (EAFP on attribute access).
        try:
            getattr(self, attr)
        except AttributeError:
            setattr(self, attr, value)

    def write(self, *messages):
        """
        Push a message list to this context's input queue.

        :param mixed value: message
        """
        for message in messages:
            if isinstance(message, Token):
                # Control tokens (BEGIN, END, ...) pass through untouched.
                self.input.put(message)
            elif self._input_type:
                self.input.put(ensure_tuple(message, cls=self._input_type))
            else:
                self.input.put(ensure_tuple(message))

    def write_sync(self, *messages):
        # Synchronous convenience: frame messages with BEGIN/END and run one step per message.
        self.write(BEGIN, *messages, END)
        for _ in messages:
            self.step()

    def error(self, exc_info, *, level=logging.ERROR):
        self.increment('err')
        super().error(exc_info, level=level)

    def fatal(self, exc_info, *, level=logging.CRITICAL):
        self.increment('err')
        super().fatal(exc_info, level=level)
        # A fatal error makes this node unreadable: shut the input queue down.
        self.input.shutdown()

    def get_service(self, name):
        if self.parent:
            return self.parent.services.get(name)
        return self.services.get(name)

    def _get(self):
        """
        Read from the input queue.

        If Queue raises (like Timeout or Empty), stat won't be changed.
        """
        input_bag = self.input.get()

        # Store or check input type.
        if self._input_type is None:
            self._input_type = type(input_bag)
        elif type(input_bag) is not self._input_type:
            raise UnrecoverableTypeError(
                'Input type changed between calls to {!r}.\nGot {!r} which is not of type {!r}.'.format(
                    self.wrapped, input_bag, self._input_type
                )
            )

        # Store or check input length, which is a soft fallback in case we're just using tuples.
        if self._input_length is None:
            self._input_length = len(input_bag)
        elif len(input_bag) != self._input_length:
            raise UnrecoverableTypeError(
                'Input length changed between calls to {!r}.\nExpected {} but got {}: {!r}.'.format(
                    self.wrapped, self._input_length, len(input_bag), input_bag
                )
            )

        self.increment('in')  # XXX should that go before type check ?
        return input_bag

    def _cast(self, _input, _output):
        """
        Transforms a pair of input/output into the real slim output.

        :param _input: Bag
        :param _output: mixed
        :return: Bag
        """
        tokens, _output = split_token(_output)

        if NOT_MODIFIED in tokens:
            # Forward the input unchanged, coerced to the output type if one is set.
            return ensure_tuple(_input, cls=(self.output_type or tuple))

        if INHERIT in tokens:
            # Prepend the input values; derive a concatenated output type on first use.
            if self._output_type is None:
                self._output_type = concat_types(self._input_type, self._input_length, self._output_type, len(_output))
            _output = _input + ensure_tuple(_output)

        return ensure_tuple(_output, cls=(self._output_type or tuple))

    def _send(self, value, _control=False):
        """
        Sends a message to all of this context's outputs.

        :param mixed value: message
        :param _control: if true, won't count in statistics.
        """
        if not _control:
            self.increment('out')
        for output in self.outputs:
            output.put(value)

    def _get_initial_context(self):
        # Resolve the service keyword arguments the wrapped callable will be currified with.
        if self.parent:
            return UnboundArguments((), self.parent.services.kwargs_for(self.wrapped))
        if self.services:
            return UnboundArguments((), self.services.kwargs_for(self.wrapped))
        return UnboundArguments((), {})
def isflag(param):
    """
    Tell whether the given value is a control flag.

    :param param: candidate value
    :return: bool
    """
    return isinstance(param, Flag)
def split_token(output):
    """
    Split an output into token tuple, real output tuple.

    Flags must form a contiguous prefix; ordering constraints (must_be_first /
    must_be_last), duplicates, and data-after-no-data flags all raise ValueError.

    :param output:
    :return: tuple, tuple
    """
    values = ensure_tuple(output)

    flags = set()
    data_allowed = True
    flag_count = 0
    previous = None

    for candidate in values:
        if not isflag(candidate):
            break
        if candidate.must_be_first and flag_count:
            raise ValueError('{} flag must be first.'.format(candidate))
        if flag_count and previous.must_be_last:
            raise ValueError('{} flag must be last.'.format(previous))
        if candidate in flags:
            raise ValueError('Duplicate flag {}.'.format(candidate))
        flags.add(candidate)
        data_allowed = data_allowed and candidate.allows_data
        previous = candidate
        flag_count += 1

    remainder = values[flag_count:]
    if not data_allowed and len(remainder):
        raise ValueError('Output data provided after a flag that does not allow data.')

    return flags, remainder
def concat_types(t1, l1, t2, l2):
    """
    Combine two bag types into the type of their concatenation.

    Two plain tuples stay a plain tuple; otherwise a new 'Inherited' BagType is
    built whose fields are both field lists joined (positional indexes stand in
    when a side has no named fields). None types default to tuple.
    """
    left = t1 if t1 else tuple
    right = t2 if t2 else tuple

    if left is tuple and right is tuple:
        return tuple

    def _fields_of(bag_type, length):
        # Named bag types carry their own field names; fall back to indexes.
        try:
            return bag_type._fields
        except AttributeError:
            return tuple(range(length))

    return BagType('Inherited', _fields_of(left, l1) + _fields_of(right, l2))