Source code for

import re
import threading
import types
from contextlib import ContextDecorator

from bonobo.config.options import Option
from bonobo.errors import MissingServiceImplementationError

_service_name_re = re.compile(r"^[^\d\W]\w*(:?\.[^\d\W]\w*)*$", re.UNICODE)

def validate_service_name(name):
    if not _service_name_re.match(name):
        raise ValueError('Invalid service name {!r}.'.format(name))
    return name

[docs]class Service(Option): """ A Service is a special kind of option defining a dependency to something that will be resolved at runtime, using an identifier. For example, you can create a Configurable that has a "database" Service in its attribute, meaning that you'll define which database to use, by name, when creating the instance of this class, then provide an implementation when running the graph using a strategy. Example:: import bonobo class QueryExtractor(bonobo.Configurable): database = bonobo.Service(default='sqlalchemy.engine.default') graph = bonobo.Graph( QueryExtractor(database='sqlalchemy.engine.secondary'), *more_transformations, ) if __name__ == '__main__': engine = create_engine('... dsn ...'), services={ 'sqlalchemy.engine.secondary': engine }) The main goal is not to tie transformations to actual dependencies, so the same can be run in different contexts (stages like preprod, prod, or tenants like client1, client2, or anything you want). .. attribute:: name Service name will be used to retrieve the implementation at runtime. """ def __init__(self, name, __doc__=None): super().__init__(str, required=False, default=name, __doc__=__doc__) def __set__(self, inst, value): inst._options_values[] = validate_service_name(value)
[docs] def resolve(self, inst, services): try: name = getattr(inst, except AttributeError: name = return services.get(name)
[docs]class Container(dict): def __new__(cls, *args, **kwargs): if len(args) == 1: if len(kwargs): raise ValueError( 'You can either use {} with one positional argument or with keyword arguments, not both.'.format( cls.__name__ ) ) if not args[0]: return super().__new__(cls) if isinstance(args[0], cls): return cls return super().__new__(cls, *args, **kwargs)
[docs] def kwargs_for(self, mixed): try: options = dict(mixed.__options__) except AttributeError: options = {} return {name: option.resolve(mixed, self) for name, option in options.items() if isinstance(option, Service)}
[docs] def get(self, name, default=None): if not name in self: if default: return default raise MissingServiceImplementationError( 'Cannot resolve service {!r} using provided service collection.'.format(name) ) value = super().get(name) # XXX this is not documented and can lead to errors. if isinstance(value, types.LambdaType): value = value(self) return value
[docs]def create_container(services=None, factory=Container): """ Create a container with reasonable default service implementations for commonly use, standard-named, services. Services: - `fs` defaults to a fs2 instance based on current working directory - `http`defaults to requests :param services: :return: """ container = factory(services) if services else factory() if not 'fs' in container: import bonobo container.setdefault('fs', bonobo.open_fs()) if not 'http' in container: import requests container.setdefault('http', requests) return container
[docs]class Exclusive(ContextDecorator): """ Decorator and context manager used to require exclusive usage of an object, most probably a service. It's usefull for example if call order matters on a service implementation (think of an http api that requires a nonce or version parameter ...). Usage: >>> def handler(some_service): ... with Exclusive(some_service): ... some_service.call_1() ... some_service.call_2() ... some_service.call_3() This will ensure that nobody else is using the same service while in the "with" block, using a lock primitive to ensure that. """ _locks = {} _locks_creation_lock = threading.Lock() def __init__(self, wrapped): self._wrapped = wrapped
[docs] def get_lock(self): _id = id(self._wrapped) with Exclusive._locks_creation_lock: if not _id in Exclusive._locks: Exclusive._locks[_id] = threading.RLock() return Exclusive._locks[_id]
def __enter__(self): self.get_lock().acquire() return self._wrapped def __exit__(self, *exc): self.get_lock().release()
[docs]def use(*service_names): def decorate(mixed): try: options = mixed.__options__ except AttributeError: mixed.__options__ = options = {} for service_name in service_names: service = Service(service_name) = service_name options[service_name] = service return mixed return decorate