Source code for bonobo.execution.contexts.graph
from functools import partial
from time import sleep
from whistle import EventDispatcher
from bonobo.config import create_container
from bonobo.constants import BEGIN, END
from bonobo.execution import events
from bonobo.execution.contexts.node import NodeExecutionContext
from bonobo.execution.contexts.plugin import PluginExecutionContext
[docs]class GraphExecutionContext:
NodeExecutionContextType = NodeExecutionContext
PluginExecutionContextType = PluginExecutionContext
TICK_PERIOD = 0.25
@property
def started(self):
return any(node.started for node in self.nodes)
@property
def stopped(self):
return all(node.started and node.stopped for node in self.nodes)
@property
def alive(self):
return any(node.alive for node in self.nodes)
def __init__(self, graph, plugins=None, services=None, dispatcher=None):
self.dispatcher = dispatcher or EventDispatcher()
self.graph = graph
self.nodes = [self.create_node_execution_context_for(node) for node in self.graph]
self.plugins = [self.create_plugin_execution_context_for(plugin) for plugin in plugins or ()]
self.services = create_container(services)
# Probably not a good idea to use it unless you really know what you're doing. But you can access the context.
self.services['__graph_context'] = self
for i, node_context in enumerate(self):
outputs = self.graph.outputs_of(i)
if len(outputs):
node_context.outputs = [self[j].input for j in outputs]
node_context.input.on_begin = partial(node_context._send, BEGIN, _control=True)
node_context.input.on_end = partial(node_context._send, END, _control=True)
node_context.input.on_finalize = partial(node_context.stop)
def __getitem__(self, item):
return self.nodes[item]
def __len__(self):
return len(self.nodes)
def __iter__(self):
yield from self.nodes
[docs] def create_node_execution_context_for(self, node):
return self.NodeExecutionContextType(node, parent=self)
[docs] def create_plugin_execution_context_for(self, plugin):
if isinstance(plugin, type):
plugin = plugin()
return self.PluginExecutionContextType(plugin, parent=self)
[docs] def write(self, *messages):
"""Push a list of messages in the inputs of this graph's inputs, matching the output of special node "BEGIN" in
our graph."""
for i in self.graph.outputs_of(BEGIN):
for message in messages:
self[i].write(message)
[docs] def dispatch(self, name):
self.dispatcher.dispatch(name, events.ExecutionEvent(self))
[docs] def start(self, starter=None):
self.register_plugins()
self.dispatch(events.START)
self.tick(pause=False)
for node in self.nodes:
if starter is None:
node.start()
else:
starter(node)
self.dispatch(events.STARTED)
[docs] def tick(self, pause=True):
self.dispatch(events.TICK)
if pause:
sleep(self.TICK_PERIOD)
[docs] def kill(self):
self.dispatch(events.KILL)
for node_context in self.nodes:
node_context.kill()
self.tick()
[docs] def stop(self, stopper=None):
self.dispatch(events.STOP)
for node_context in self.nodes:
if stopper is None:
node_context.stop()
else:
stopper(node_context)
self.tick(pause=False)
self.dispatch(events.STOPPED)
self.unregister_plugins()
[docs] def register_plugins(self):
for plugin_context in self.plugins:
plugin_context.register()
[docs] def unregister_plugins(self):
for plugin_context in self.plugins:
plugin_context.unregister()