Working with SQLAlchemy

Note

This extension is currently BETA.

Things will change. Although we use it in some real-world software, it may or may not satisfy your needs.

Read the introduction: https://www.bonobo-project.org/with/sqlalchemy

Installation

To install the extension, use the sqlalchemy extra:

$ pip install bonobo[sqlalchemy]

Note

You can install more than one extra at a time by separating the names with commas.

Overview and examples

First, you’ll need a database connection (a sqlalchemy.engine.Engine instance), which must be provided as a service.

import sqlalchemy

def get_services():
    return {
        'sqlalchemy.engine': sqlalchemy.create_engine(...)
    }

The sqlalchemy.engine name is the default service name used by the provided transformations. You can override it (for example, if you need more than one connection) by passing the service name as engine='myengine' when building your transformations.

Let’s create some tables and add some data. (You may need to adapt the SQL if your database server uses a different SQL dialect.)

CREATE TABLE test_in (
  id INTEGER PRIMARY KEY NOT NULL,
  text TEXT
);

CREATE TABLE test_out (
  id INTEGER PRIMARY KEY NOT NULL,
  text TEXT
);

INSERT INTO test_in (id, text) VALUES (1, 'Cat');
INSERT INTO test_in (id, text) VALUES (2, 'Dog');
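If you want to follow along without a database server, the statements above also run unchanged on SQLite. A minimal sketch using Python's built-in sqlite3 module (the helper name create_tutorial_tables is hypothetical):

```python
import sqlite3

# The tutorial schema and rows, copied from the SQL above.
TUTORIAL_SQL = """
CREATE TABLE test_in (
  id INTEGER PRIMARY KEY NOT NULL,
  text TEXT
);

CREATE TABLE test_out (
  id INTEGER PRIMARY KEY NOT NULL,
  text TEXT
);

INSERT INTO test_in (id, text) VALUES (1, 'Cat');
INSERT INTO test_in (id, text) VALUES (2, 'Dog');
"""


def create_tutorial_tables(connection):
    """Create test_in/test_out and load the sample rows."""
    # executescript() runs several ';'-separated statements in one call.
    connection.executescript(TUTORIAL_SQL)
    connection.commit()


# ':memory:' is a throwaway database; use a file name such as 'tutorial.db'
# instead if the bonobo examples should read the same data.
conn = sqlite3.connect(':memory:')
create_tutorial_tables(conn)
print(conn.execute('SELECT id, text FROM test_in ORDER BY id').fetchall())
# [(1, 'Cat'), (2, 'Dog')]
```

If you use a file instead, the matching SQLAlchemy URL to pass to create_engine() in get_services() would be sqlite:///tutorial.db.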

This extension provides two transformation classes: one reader (Select) and one writer (InsertOrUpdate).

Let’s select some data:

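import bonobo
import bonobo_sqlalchemy


def get_graph():
    graph = bonobo.Graph()
    graph.add_chain(
        bonobo_sqlalchemy.Select('SELECT * FROM test_in', limit=100),
        bonobo.PrettyPrinter(),
    )
    return graph


if __name__ == '__main__':
    # get_services() is the function defined above, in the same file.
    bonobo.run(get_graph(), services=get_services())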

You should see:

$ python tutorial.py

│ id[0] = 1
│ text[1] = 'Cat'


│ id[0] = 2
│ text[1] = 'Dog'

 - Select in=1 out=2 [done]
 - PrettyPrinter in=2 out=2 [done]

Now let’s insert some data:

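import bonobo
import bonobo_sqlalchemy


def get_graph(**options):
    graph = bonobo.Graph()
    graph.add_chain(
        bonobo_sqlalchemy.Select('SELECT * FROM test_in', limit=100),
        bonobo_sqlalchemy.InsertOrUpdate('test_out'),
    )

    return graph


if __name__ == '__main__':
    # get_services() is the function defined above, in the same file.
    bonobo.run(get_graph(), services=get_services())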

If you check the test_out table, it should now contain the same rows as test_in.

Reference

bonobo_sqlalchemy

class Select(*args, **kwargs)

Bases: bonobo.config.configurables.Configurable

Reads data from a database using an SQL query and limit-offset pagination.

Example:

Select('SELECT * from foo;')
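To illustrate the limit-offset strategy together with the pack_size and limit options documented below, here is a minimal sketch using Python's sqlite3 module. It wraps the user query in a subquery and reads it one page at a time. It is an approximation for explanation, not the actual bonobo-sqlalchemy code, and the function name paginate is hypothetical.

```python
import sqlite3


def paginate(connection, query, pack_size=1000, limit=None):
    """Yield the rows of `query` using limit-offset pagination (hypothetical helper).

    Each round trip fetches at most `pack_size` rows; `limit`, if given,
    caps the total number of rows, like Select's limit option.
    """
    query = query.strip().rstrip(';')  # a trailing ';' would break the subquery
    offset = 0
    while limit is None or offset < limit:
        size = pack_size if limit is None else min(pack_size, limit - offset)
        page = connection.execute(
            'SELECT * FROM ({}) AS q LIMIT ? OFFSET ?'.format(query),
            (size, offset),
        ).fetchall()
        yield from page
        if len(page) < size:
            break  # a short page means the result set is exhausted
        offset += size


conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE foo (id INTEGER PRIMARY KEY)')
conn.executemany('INSERT INTO foo (id) VALUES (?)', [(i,) for i in range(1, 26)])

# 25 rows in the table, limit=22, pack_size=10: three queries (10 + 10 + 2 rows).
rows = list(paginate(conn, 'SELECT id FROM foo ORDER BY id;', pack_size=10, limit=22))
print(len(rows))  # 22
```

A deterministic ORDER BY in the query matters here: without it, the database is free to return rows in a different order for each page.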

Caveats:

This transformation uses “limit-offset” pagination, which can return inconsistent results if the table changes while it is being read.

Suppose a user moves from page n to page n+1 while a new element is simultaneously inserted into page n. This causes both a duplication (the previously final element of page n is pushed into page n+1) and an omission (the new element). Alternatively, consider an element removed from page n just as the user moves to page n+1. The previously initial element of page n+1 is shifted to page n and is omitted.
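The duplication case is easy to reproduce. The sketch below uses an in-memory SQLite table with made-up ids: it reads a first page of two rows, inserts a row that sorts into that page (standing in for a concurrent writer), then reads the second page.

```python
import sqlite3

conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE foo (id INTEGER PRIMARY KEY)')
conn.executemany('INSERT INTO foo (id) VALUES (?)', [(10,), (20,), (30,), (40,)])

PAGE = 'SELECT id FROM foo ORDER BY id LIMIT 2 OFFSET ?'

page_1 = [r[0] for r in conn.execute(PAGE, (0,))]  # [10, 20]
# A "concurrent" insert of a row that belongs to page 1...
conn.execute('INSERT INTO foo (id) VALUES (5)')
page_2 = [r[0] for r in conn.execute(PAGE, (2,))]  # [20, 30]

# ...so 20 is read twice and the new row 5 is never read.
print(page_1 + page_2)  # [10, 20, 20, 30]
```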

A better implementation could use database-side cursors, so that the database itself keeps track of the last row extracted and “stabilizes” pagination. Here is an example of how this can be done in PostgreSQL syntax (it is not implemented in bonobo-sqlalchemy for now).

-- We must be in a transaction
BEGIN;
-- Open a cursor for a query
DECLARE select_cursor CURSOR FOR SELECT * FROM foo;
-- Retrieve ten rows
FETCH 10 FROM select_cursor;
-- ...
-- Retrieve ten more from where we left off
FETCH 10 FROM select_cursor;
-- All done
COMMIT;
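On the client side, the closest DB-API equivalent is to execute the query once and pull rows in fixed-size batches with cursor.fetchmany(), rather than re-running it with a growing OFFSET. The sketch below uses sqlite3 and a hypothetical helper, iter_batches. Whether rows are really streamed from the server depends on the driver: with PostgreSQL and psycopg2, for instance, it is a named cursor (connection.cursor(name=...)) that makes it a server-side cursor.

```python
import sqlite3


def iter_batches(connection, query, pack_size=10):
    """Run `query` once and yield its rows in lists of at most `pack_size`."""
    cursor = connection.cursor()
    try:
        cursor.execute(query)
        while True:
            batch = cursor.fetchmany(pack_size)  # roughly FETCH <n> FROM cursor
            if not batch:
                break
            yield batch
    finally:
        cursor.close()


conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE foo (id INTEGER PRIMARY KEY)')
conn.executemany('INSERT INTO foo (id) VALUES (?)', [(i,) for i in range(25)])
print([len(batch) for batch in iter_batches(conn, 'SELECT id FROM foo')])  # [10, 10, 5]
```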

Parameters
  • query (str) –

    The actual SQL query to run.

    Default: ‘SELECT 1’

  • pack_size (int) –

    How many rows to retrieve at once.

    Default: 1000

  • limit (int) – Maximum number of rows to retrieve, in total.

  • engine (str) –

    Name of the service providing the database connection (an sqlalchemy.engine.Engine instance).

    Default: ‘sqlalchemy.engine’

Custom instance builder. If not all options are provided, returns a PartiallyConfigured instance, which is just a functools.partial object that behaves like a Configurable instance.

The special _final argument can be used to force the final instance to be created, raising an error if options are missing.

Parameters
  • args

  • _final – bool

  • kwargs

Returns

Configurable or PartiallyConfigured
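The idea behind PartiallyConfigured can be sketched with the standard library alone: when a required option is missing, return a functools.partial that remembers the options given so far, and build the real object once the rest are supplied. The Reader class and build() helper below are hypothetical stand-ins, not bonobo's Configurable machinery.

```python
import functools


class Reader:
    """Hypothetical stand-in for a Configurable with one required option."""

    required = ('query',)

    def __init__(self, query, pack_size=1000):
        self.query = query
        self.pack_size = pack_size


def build(cls, *, _final=False, **options):
    """Return an instance of cls, or a functools.partial if options are missing."""
    missing = [name for name in cls.required if name not in options]
    if not missing:
        return cls(**options)
    if _final:
        # Like the documented _final flag: raise instead of deferring.
        raise TypeError('missing required options: ' + ', '.join(missing))
    return functools.partial(cls, **options)


partial_reader = build(Reader, pack_size=50)  # no query yet: a partial
reader = partial_reader(query='SELECT 1')     # remaining option supplied
print(type(partial_reader).__name__, reader.query, reader.pack_size)
# partial SELECT 1 50
```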

formatter(context, index, row)

Formats a result row into whatever you need to send on this transformation’s output stream.

Parameters
  • context

  • index

  • row

Returns

mixed
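For instance, a subclass could override formatter to emit each row as a dict keyed by column name. The sketch below is a plain function with the documented signature, tried on a sqlite3 row; the dict output is an assumption about what downstream nodes want, not a description of the default implementation.

```python
import sqlite3


def formatter(context, index, row):
    """Hypothetical formatter: turn one result row into a {column: value} dict.

    `context` and `index` are part of the documented signature but are not
    needed for this conversion.
    """
    return dict(zip(row.keys(), row))


conn = sqlite3.connect(':memory:')
conn.row_factory = sqlite3.Row  # rows expose their column names via .keys()
row = conn.execute("SELECT 1 AS id, 'Cat' AS text").fetchone()
print(formatter(None, 0, row))  # {'id': 1, 'text': 'Cat'}
```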

engine

Name of the service providing the database connection (an sqlalchemy.engine.Engine instance).

Default: ‘sqlalchemy.engine’

limit

Maximum number of rows to retrieve, in total.

pack_size

How many rows to retrieve at once.

Default: 1000

property parameters

Provides parameters for the input query.

See https://www.python.org/dev/peps/pep-0249/#paramstyle

Returns

dict
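A query with placeholders takes its values from the dict returned by parameters, and the placeholder syntax follows the driver’s paramstyle. The sketch below shows the “named” style with Python’s sqlite3 module (whose declared paramstyle is qmark, but which also accepts named placeholders with a dict). The filter and values are made up, and how Select hands the dict to the driver is not shown.

```python
import sqlite3

conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE test_in (id INTEGER PRIMARY KEY NOT NULL, text TEXT)')
conn.executemany('INSERT INTO test_in (id, text) VALUES (?, ?)', [(1, 'Cat'), (2, 'Dog')])

# "named" paramstyle: :name placeholders, with the values supplied as a dict,
# the same shape as what the parameters property returns.
query = 'SELECT id, text FROM test_in WHERE text = :text'
parameters = {'text': 'Dog'}
print(conn.execute(query, parameters).fetchall())  # [(2, 'Dog')]
```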

query

The actual SQL query to run.

Default: ‘SELECT 1’

class InsertOrUpdate(*args, **kwargs)

Bases: bonobo.config.configurables.Configurable

TODO: choose between “fields” and “columns” for naming.

The obvious choice may be to keep “field” for row fields, as bonobo already uses that name, and to call the database columns “columns”.

Parameters
  • table_name (str) –

  • fetch_columns (tuple) –

  • insert_only_fields (tuple) –

  • discriminant (tuple) –

  • created_at_field (str) –

  • updated_at_field (str) –

  • allowed_operations (tuple) –

  • buffer_size (int) –

  • engine (str) –

Custom instance builder. If not all options are provided, returns a PartiallyConfigured instance, which is just a functools.partial object that behaves like a Configurable instance.

The special _final argument can be used to force the final instance to be created, raising an error if options are missing.

Parameters
  • args

  • _final – bool

  • kwargs

Returns

Configurable or PartiallyConfigured

add_fetch_columns(*columns, **aliased_columns)
commit(table, connection, buffer, force=False)
find(connection, table, row)
get_columns_for(column_names, row, dbrow=None)

Retrieves the list of table column names for which the given row has a value.

insert_or_update(table, connection, row)

The actual database load logic, without the buffering and transaction handling.
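The general insert-or-update technique (often called an upsert) can be sketched with sqlite3: look the row up by its discriminant columns, update it if found, insert it otherwise, and only write columns that both exist in the table and are present in the row, much as get_columns_for() is described above. The function below is a hypothetical, unbuffered illustration with its own signature, not bonobo-sqlalchemy’s implementation; it assumes every row contains the discriminant columns.

```python
import sqlite3


def insert_or_update(connection, table_name, row, discriminant=('id',)):
    """Hypothetical sketch: update the row matching `discriminant`, else insert it.

    Returns 'insert' or 'update'. Table and column names are interpolated
    into the SQL, so they must come from trusted code, never from data.
    """
    # Column names of the table (PRAGMA table_info is SQLite-specific).
    table_columns = [c[1] for c in connection.execute(
        'PRAGMA table_info({})'.format(table_name))]
    # Keep only table columns for which the row has a value.
    columns = [c for c in table_columns if c in row]

    where = ' AND '.join('{} = ?'.format(c) for c in discriminant)
    key = tuple(row[c] for c in discriminant)
    found = connection.execute(
        'SELECT 1 FROM {} WHERE {}'.format(table_name, where), key).fetchone()

    values = tuple(row[c] for c in columns)
    if found:
        assignments = ', '.join('{} = ?'.format(c) for c in columns)
        connection.execute(
            'UPDATE {} SET {} WHERE {}'.format(table_name, assignments, where),
            values + key)
        return 'update'
    connection.execute(
        'INSERT INTO {} ({}) VALUES ({})'.format(
            table_name, ', '.join(columns), ', '.join('?' * len(columns))),
        values)
    return 'insert'


conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE test_out (id INTEGER PRIMARY KEY NOT NULL, text TEXT)')
print(insert_or_update(conn, 'test_out', {'id': 1, 'text': 'Cat'}))               # insert
print(insert_or_update(conn, 'test_out', {'id': 1, 'text': 'Lion', 'extra': 0}))  # update
print(conn.execute('SELECT id, text FROM test_out').fetchall())  # [(1, 'Lion')]
```

The 'extra' key is silently ignored because test_out has no such column.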

allowed_operations
buffer_size
create_buffer
create_connection
create_table
created_at_field
discriminant
engine
fetch_columns
insert_only_fields
table_name
updated_at_field
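The buffer_size option and the commit(..., force=False) method suggest that writes are grouped into transactions. Assuming that, the pattern looks roughly like the sketch below: rows are written inside an open transaction, which is committed every buffer_size rows and once more, forced, at the end of the stream. BufferedWriter is a hypothetical class, not part of bonobo-sqlalchemy.

```python
import sqlite3


class BufferedWriter:
    """Hypothetical sketch: commit once every `buffer_size` written rows."""

    def __init__(self, connection, buffer_size=1000):
        self.connection = connection
        self.buffer_size = buffer_size
        self.pending = 0  # rows written since the last commit
        self.commits = 0  # number of commits issued, for illustration

    def write(self, sql, params):
        self.connection.execute(sql, params)
        self.pending += 1
        self.commit()

    def commit(self, force=False):
        # Commit when the buffer is full, or when forced (e.g. at the end
        # of the stream), as long as something is pending.
        if self.pending and (force or self.pending >= self.buffer_size):
            self.connection.commit()
            self.pending = 0
            self.commits += 1


conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE test_out (id INTEGER PRIMARY KEY NOT NULL, text TEXT)')
writer = BufferedWriter(conn, buffer_size=2)
for i, text in enumerate(['Cat', 'Dog', 'Fox'], start=1):
    writer.write('INSERT INTO test_out (id, text) VALUES (?, ?)', (i, text))
writer.commit(force=True)  # flush the last, partial buffer
print(writer.commits)  # 2
```

A larger buffer means fewer commits, but more rows to redo if a transaction fails.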