Source code for bonobo.contrib.django.commands
from logging import getLogger
from types import GeneratorType
from colorama import Back, Fore, Style
from django.core.management import BaseCommand
from django.core.management.base import OutputWrapper
from mondrian import term
import bonobo
from bonobo.plugins.console import ConsoleOutputPlugin
from bonobo.util.term import CLEAR_EOL
from .utils import create_or_update
[docs]class ETLCommand(BaseCommand):
@property
def logger(self):
try:
return self._logger
except AttributeError:
self._logger = getLogger(type(self).__module__)
return self._logger
create_or_update = staticmethod(create_or_update)
[docs] def create_parser(self, prog_name, subcommand):
return bonobo.get_argument_parser(super().create_parser(prog_name, subcommand))
[docs] def add_arguments(self, parser):
"""
Entry point for subclassed commands to add custom arguments.
"""
pass
[docs] def get_graph(self, *args, **options):
def not_implemented():
raise NotImplementedError('You must implement {}.get_graph() method.'.format(self))
return bonobo.Graph(not_implemented)
[docs] def get_services(self):
return {}
[docs] def get_strategy(self):
return None
[docs] def info(self, *args, **kwargs):
self.logger.info(*args, **kwargs)
[docs] def run(self, *args, **options):
results = []
with bonobo.parse_args(options) as options:
services = self.get_services()
strategy = self.get_strategy()
graph_coll = self.get_graph(*args, **options)
if not isinstance(graph_coll, GeneratorType):
graph_coll = (graph_coll,)
for i, graph in enumerate(graph_coll):
if not isinstance(graph, bonobo.Graph):
raise ValueError('Expected a Graph instance, got {!r}.'.format(graph))
print(term.lightwhite('{}. {}'.format(i + 1, graph.name)))
result = bonobo.run(graph, services=services, strategy=strategy)
results.append(result)
print(term.lightblack(' ... return value: ' + str(result)))
print()
return results
[docs] def handle(self, *args, **options):
_stdout_backup, _stderr_backup = self.stdout, self.stderr
self.stdout = OutputWrapper(ConsoleOutputPlugin._stdout, ending=CLEAR_EOL + '\n')
self.stderr = OutputWrapper(ConsoleOutputPlugin._stderr, ending=CLEAR_EOL + '\n')
self.stderr.style_func = lambda x: Fore.LIGHTRED_EX + Back.RED + '!' + Style.RESET_ALL + ' ' + x
self.run(*args, **options)
self.stdout, self.stderr = _stdout_backup, _stderr_backup