Transformations
Transformations are the smallest building blocks in Bonobo.
There is no special data structure used to represent transformations: a transformation is just a regular Python callable, or even an iterable object (if it requires no input data).
Once in a graph, transformations become nodes and the data-flow between them is described using edges.
Note
In this chapter, we’ll assume that whenever we need a “database”, we can get it from the global namespace. This practice is OK-ish for small jobs, but not at scale.
You’ll learn in Services and dependencies how to manage external dependencies the right way.
Transformation types
General case
The general case is a transformation that yields n outputs for each input.
You can implement it using a generator:
db = ...

def get_orders(user_id):
    for order in db.get_orders(user_id):
        yield user_id, order
Here, each input row (containing a user id) is transformed into a set of rows, each containing a user_id and an “order” object.
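To see this one-to-many shape without running a Bonobo graph, the sketch below replaces `db` with a small in-memory class (`FakeOrdersDB` and its data are hypothetical, for illustration only) and drives the generator by hand, calling it once per input row the way an executor would.

```python
class FakeOrdersDB:
    """Hypothetical in-memory stand-in for the `db` object used above."""

    def __init__(self, orders_by_user):
        self._orders_by_user = orders_by_user

    def get_orders(self, user_id):
        # Users without orders get an empty list.
        return self._orders_by_user.get(user_id, [])


db = FakeOrdersDB({
    1: [{"id": "a", "amount": 10}, {"id": "b", "amount": 5}],
    2: [{"id": "c", "amount": 7}],
})


def get_orders(user_id):
    # One input row (a user id) can produce zero, one or many output rows.
    for order in db.get_orders(user_id):
        yield user_id, order


# Simulate one call per input row and collect every yielded output row.
rows = [row for user_id in (1, 2, 3) for row in get_orders(user_id)]
```

User 3 has no orders, so its call yields nothing, while user 1 produces two rows: a generator lets a single transformation emit any number of rows per input.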
Extractor case
An extractor is a transformation that generates output without using any input. Usually, it does not generate this data out of nowhere, but instead connects to an external system (database, API, HTTP, files, …) to read the data from there.
It can be implemented in two different ways.
You can implement it using a generator, like in the general case:
db = ...

def extract_user_ids():
    yield from db.select_all_user_ids()
You can also use an iterator directly:
import bonobo

db = ...

def get_graph():
    graph = bonobo.Graph()
    graph.add_chain(
        db.select_all_user_ids(),
        ...
    )
    return graph
This is convenient in many cases, when your existing system already has an interface that returns iterators.
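Both extractor styles give Bonobo something it can iterate over. The plain-Python sketch below, with a hypothetical `FakeUsersDB` standing in for `db`, checks that the generator-function form and the direct-iterator form produce the same rows. Note the difference in how each is placed in a chain: the generator function is a callable, whereas the iterator is the result of calling `db.select_all_user_ids()`.

```python
class FakeUsersDB:
    """Hypothetical stand-in for a database client."""

    def __init__(self, user_ids):
        self._user_ids = list(user_ids)

    def select_all_user_ids(self):
        # Return an iterator, as many real database cursors do.
        return iter(self._user_ids)


db = FakeUsersDB([1, 2, 3])


def extract_user_ids():
    # Generator-based extractor: takes no input, yields rows.
    yield from db.select_all_user_ids()


from_generator = list(extract_user_ids())
from_iterator = list(db.select_all_user_ids())
```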
Note
It’s important to use a generative approach that yields data as it becomes available, rather than building everything at once before returning, so that Bonobo can pass the data to the next nodes as soon as it starts streaming.
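The difference is easy to observe in plain Python. In this sketch (no Bonobo involved; `fetch_page` is a hypothetical slow source that logs each fetch), the eager function must fetch every page before returning anything, while the generator hands over its first row after fetching only the first page.

```python
fetch_log = []


def fetch_page(n):
    # Hypothetical slow source: log each fetch so we can see when it happens.
    fetch_log.append(n)
    return [n * 10, n * 10 + 1]


def eager_extract():
    # Anti-pattern: builds the whole result before returning anything.
    rows = []
    for n in range(3):
        rows.extend(fetch_page(n))
    return rows


def lazy_extract():
    # Preferred: yields rows as soon as each page arrives.
    for n in range(3):
        yield from fetch_page(n)


eager_rows = eager_extract()
fetched_before_first_eager_row = len(fetch_log)  # all 3 pages were fetched

fetch_log.clear()
lazy = lazy_extract()
first_lazy_row = next(lazy)
fetched_before_first_lazy_row = len(fetch_log)  # only 1 page was fetched
```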
Loader case
A loader is a transformation that sends its input into an external system. For perfect symmetry with extractors, a loader would have no output at all. However, as a convenience, and because it has a negligible cost in Bonobo, the convention is that all loaders return bonobo.constants.NOT_MODIFIED, meaning that every row that streamed into this node’s input also streams, unmodified, into its outputs. This lets you keep chaining transformations after a loader, without resorting to shenanigans to achieve the same thing:
from bonobo.constants import NOT_MODIFIED

analytics_db = ...

def load_into_analytics_db(user_id, order):
    analytics_db.insert_or_update_order(user_id, order['id'], order['amount'])
    return NOT_MODIFIED
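The pass-through convention can be illustrated without Bonobo. In the sketch below, `NOT_MODIFIED` is a local sentinel standing in for `bonobo.constants.NOT_MODIFIED`, `FakeAnalyticsDB` is a hypothetical in-memory store, and `run_node` is a hypothetical, much simplified version of what an executor does with the returned value.

```python
# Hypothetical stand-in for bonobo.constants.NOT_MODIFIED, so this runs without Bonobo.
NOT_MODIFIED = object()


class FakeAnalyticsDB:
    """Hypothetical in-memory store keyed by (user_id, order_id)."""

    def __init__(self):
        self.rows = {}

    def insert_or_update_order(self, user_id, order_id, amount):
        self.rows[(user_id, order_id)] = amount


analytics_db = FakeAnalyticsDB()


def load_into_analytics_db(user_id, order):
    analytics_db.insert_or_update_order(user_id, order["id"], order["amount"])
    return NOT_MODIFIED


def run_node(transformation, row):
    # Simplified pass-through rule: NOT_MODIFIED means "forward the input row".
    result = transformation(*row)
    return row if result is NOT_MODIFIED else result


row = (1, {"id": "a", "amount": 10})
forwarded = run_node(load_into_analytics_db, row)
```

The loader wrote to the store and the same row came out the other side, so a downstream node would receive it exactly as the loader did.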
Execution Context
Since transformations are regular functions, some machinery is required to use them as nodes in a streaming flow.
When a bonobo.Graph is executed, each node is wrapped in a bonobo.execution.contexts.NodeExecutionContext, which is responsible for keeping the state of that node within a given execution.
Inputs and Outputs
When run in an execution context, transformations have inputs and outputs: Bonobo calls the transformation with each piece of data that arrives in its input queue, and pushes the returned or yielded values into its output queue.
For thread-based strategies, the input and output queues are implemented with the standard queue.Queue.
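As a rough mental model only (this is not Bonobo’s actual implementation), the sketch below runs a generator transformation in a thread that reads rows from one `queue.Queue`, makes one call per row, and pushes every yielded value into another. The `END` marker and the `run_in_thread` helper are hypothetical; the sketch handles only transformations that yield.

```python
import queue
import threading

END = object()  # hypothetical end-of-stream marker for this sketch


def run_in_thread(transformation, input_queue, output_queue):
    """Simplified sketch of a thread-based node; not Bonobo's actual code."""

    def worker():
        while True:
            row = input_queue.get()
            if row is END:
                output_queue.put(END)
                break
            # One call per input row; every yielded value goes to the output queue.
            for out in transformation(*row):
                output_queue.put(out)

    thread = threading.Thread(target=worker)
    thread.start()
    return thread


def double(x):
    yield x, x * 2


inq, outq = queue.Queue(), queue.Queue()
t = run_in_thread(double, inq, outq)
for value in (1, 2, 3):
    inq.put((value,))
inq.put(END)
t.join()

# Drain the output queue until the end marker.
results = []
while True:
    item = outq.get()
    if item is END:
        break
    results.append(item)
```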
Inputs
Todo
proofread, check consistency and correctness
All input is received through the call arguments: each input row results in one call to the provided callable. The arguments are, in order:
Injected dependencies (database, HTTP, filesystem, …)
Positional arguments
Keyword arguments
You’ll see below how to pass each of those.
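The ordering itself can be pictured with a hypothetical helper, `call_transformation`, that simply splats the three groups in sequence. This is an illustration of the argument order described above, not the mechanism Bonobo uses to inject dependencies; the `fs` parameter and its `"fake-fs"` value are placeholders.

```python
def call_transformation(transformation, dependencies, args, kwargs):
    # Hypothetical helper: dependencies first, then positional, then keyword arguments.
    return transformation(*dependencies, *args, **kwargs)


def describe(fs, user_id, order, *, currency="EUR"):
    # `fs` plays the role of an injected dependency (a placeholder here).
    return (fs, user_id, order, currency)


result = call_transformation(describe, ["fake-fs"], (1, "order-a"), {"currency": "USD"})
default_result = call_transformation(describe, ["fake-fs"], (2, "order-b"), {})
```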
Output
Todo
proofread, check consistency and correctness
Each callable can return or yield different things. All examples use yield, but if there is only one output per input row, you can also return your output row and expect exactly the same behaviour.
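To illustrate the equivalence, the sketch below uses a hypothetical helper, `outputs_of`, that collects the outputs of one call as a list, whether the transformation yields or returns. It is a simplification for illustration and ignores the flags and casting rules discussed next.

```python
import types


def outputs_of(transformation, *row):
    """Hypothetical helper: collect the outputs of one call as a list."""
    result = transformation(*row)
    if isinstance(result, types.GeneratorType):
        return list(result)
    return [result]


def upper_yield(s):
    yield s.upper()


def upper_return(s):
    return s.upper()
```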
Todo
add rules for output parsing
The logic is defined in the following piece of code; documentation will be added soon:
def _cast(self, _input, _output):
    """
    Transforms a pair of input/output into the real slim shoutput.

    :param _input: Bag
    :param _output: mixed
    :return: Bag
    """

    if isenvelope(_output):
        _output, _flags, _options = _output.unfold()
    else:
        _flags, _options = [], {}

    if len(_flags):
        # TODO: parse flags to check constraints are respected (like not modified alone, etc.)

        if F_NOT_MODIFIED in _flags:
            if self._output_type:
                return ensure_tuple(_input, cls=self._output_type)
            return _input

        if F_INHERIT in _flags:
            if self._output_type is None:
                self._output_type = concat_types(
                    self._input_type, self._input_length, self._output_type, len(_output)
                )
            _output = _input + ensure_tuple(_output)

    if not self._output_type:
        if issubclass(type(_output), tuple):
            self._output_type = type(_output)

    return ensure_tuple(_output, cls=self._output_type)
In short, after checking a few flags (NOT_MODIFIED, then INHERIT), it “casts” the data into the “output type”, which is either a plain tuple or a kind of namedtuple.
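The order of these checks can be reproduced in a standalone, simplified sketch. The flag values and the `ensure_tuple` and `cast` functions below are hypothetical stand-ins written for illustration; unlike the real method, this version has no envelope unwrapping, no `concat_types`, and no state kept between calls.

```python
from collections import namedtuple

# Hypothetical flag markers standing in for Bonobo's F_NOT_MODIFIED / F_INHERIT.
F_NOT_MODIFIED = "not_modified"
F_INHERIT = "inherit"


def ensure_tuple(value, cls=tuple):
    # Wrap scalars in a 1-tuple, then build an instance of `cls`.
    if not isinstance(value, tuple):
        value = (value,)
    if cls is tuple:
        return tuple(value)
    return cls(*value)


def cast(_input, _output, flags=(), output_type=None):
    """Simplified sketch of the casting order described above (not Bonobo's code)."""
    if F_NOT_MODIFIED in flags:
        # Forward the input row unchanged.
        return ensure_tuple(_input, cls=output_type or tuple)
    if F_INHERIT in flags:
        # Append the new values to the input row.
        _output = tuple(_input) + ensure_tuple(_output)
    if output_type is None and isinstance(_output, tuple):
        # Remember the tuple subclass (e.g. a namedtuple) produced by the callable.
        output_type = type(_output)
    return ensure_tuple(_output, cls=output_type or tuple)


Point = namedtuple("Point", ["x", "y"])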
Todo
document cast/input_type/output_type logic.
Class-based Transformations
For use cases that are less simple, or that require better reusability, you may want to define some of your transformations as classes.
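As a minimal sketch of why classes help reuse, the plain-Python callable class below (a hypothetical `Multiply`, written without Bonobo’s `Configurable` machinery) stores its configuration on the instance; each instance is then an ordinary callable that could be used as a node.

```python
class Multiply:
    """Hypothetical reusable transformation: configured once, called per row."""

    def __init__(self, factor):
        self.factor = factor

    def __call__(self, value):
        return value * self.factor


# Two differently configured nodes built from the same class.
double = Multiply(2)
triple = Multiply(3)
```

The class name is CamelCase because it must be instantiated, while the instances are lowercase because they can be used directly, which matches the naming conventions below.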
Todo
narrative doc
See:
Naming conventions
The naming convention is the following.
If you’re naming an actual transformation, one that can be used directly as a graph node, use lowercase names with underscores:
# instance of a class-based transformation
filter = Filter(...)

# function-based transformation
def uppercase(s: str) -> str:
    return s.upper()
If you’re naming something configurable, which must be instantiated or called to obtain something usable as a graph node, use CamelCase names:
import functools

from bonobo.config import Configurable, Option

# configurable
class ChangeCase(Configurable):
    modifier = Option(default='upper')

    def __call__(self, s: str) -> str:
        return getattr(s, self.modifier)()

# transformation factory
def Apply(method):
    @functools.wraps(method)
    def apply(s: str) -> str:
        return method(s)
    return apply

# result is a graph node candidate
upper = Apply(str.upper)
Testing
As Bonobo uses plain old Python objects as transformations, it’s very easy to unit test your transformations with your favourite testing framework. We use pytest internally for Bonobo, but you are free to use the one you prefer.
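For example, here is a sketch of plain pytest-style tests for two small transformations: `uppercase` repeats the naming example above, and `split_words` is a hypothetical generator transformation. pytest collects functions named `test_*`, and plain `assert` statements are enough; no Bonobo import is needed.

```python
def uppercase(s: str) -> str:
    return s.upper()


def split_words(line: str):
    # A generator transformation: one output row per word.
    for word in line.split():
        yield word


def test_uppercase():
    assert uppercase("bonobo") == "BONOBO"


def test_split_words():
    # Consume the generator to inspect every output row.
    assert list(split_words("a b  c")) == ["a", "b", "c"]
```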
If you want to test a transformation with its surrounding context provided (for example, with service instances injected and context processors applied), you can use bonobo.execution.NodeExecutionContext as a context manager and have Bonobo send the data to your transformation.
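from bonobo import JsonWriter
from bonobo.execution import NodeExecutionContext

# `filename` is the path of the file to write; `...` stands for your filesystem service.
with NodeExecutionContext(
    JsonWriter(filename), services={'fs': ...}
) as context:
    # Send a list of rows; write_sync wraps them in BEGIN/END control messages.
    context.write_sync(
        {'foo': 'bar'},
        {'foo': 'baz'},
    )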