beam_nuggets.io.relational_db_api module

Wrapper around SqlAlchemy code used for reading from and writing to databases.

class beam_nuggets.io.relational_db_api.SourceConfiguration(drivername, host=None, port=None, database=None, username=None, password=None, create_if_missing=False)

Bases: object

Holds parameters for accessing a database.

Used to pass database access parameters to SqlAlchemyDB.

SourceConfiguration.url provides the database URL that SqlAlchemy uses to connect to the database.

Parameters:
  • drivername (str) –

    name of the database backend. It specifies the target database type and the driver (DBAPI) that SqlAlchemy uses to communicate with the database. The following drivernames are supported and tested by beam-nuggets:

    • postgresql+pg8000: for PostgreSQL (pg8000 is the DB driver).
    • mysql+pymysql: for MySQL.
    • sqlite: for SQLite.

    Additional drivers can be used after installing their corresponding Python libraries. Refer to the SqlAlchemy dialects documentation for more information on the supported databases and their drivers.

  • host (str) – database host name or IP.
  • port (int) – database port number.
  • database (str) – database name.
  • username (str) – database username.
  • password (str) – database password.
  • create_if_missing (bool) – if set to True, a missing database is created before it is written to.
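
A drivername follows SqlAlchemy's backend+driver form: the part before "+" selects the database type (the dialect) and the optional part after it selects the DBAPI driver. When the driver part is omitted, as in 'sqlite', SqlAlchemy falls back to the dialect's default driver. The sketch below separates the two parts; split_drivername is a hypothetical helper written for illustration and is not part of beam-nuggets.

```python
def split_drivername(drivername):
    """Split an SqlAlchemy drivername into (backend, driver).

    Hypothetical helper for illustration; not part of beam-nuggets.
    driver is None when only the backend is named, in which case
    SqlAlchemy uses the dialect's default DBAPI driver.
    """
    backend, _, driver = drivername.partition('+')
    return backend, (driver or None)


for name in ('postgresql+pg8000', 'mysql+pymysql', 'sqlite'):
    print(name, '->', split_drivername(name))
```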

Examples

MySQL database

from beam_nuggets.io import relational_db
src_cnf = relational_db.SourceConfiguration(
    drivername='mysql+pymysql',
    host='localhost',
    port=37311,
    username='root',
    database='test',
    create_if_missing=True,
)
print(src_cnf.url)
# mysql+pymysql://root@localhost:37311/test

PostgreSQL database

from beam_nuggets.io import relational_db
src_cnf = relational_db.SourceConfiguration(
    drivername='postgresql',
    host='localhost',
    port=42139,
    username='postgres',
    password='pass',
    database='test'
)
print(src_cnf.url)
# postgresql://postgres:pass@localhost:42139/test

SQLite database

from beam_nuggets.io import relational_db
src_cnf = relational_db.SourceConfiguration(
    drivername='sqlite',
    database='/tmp/test_db.sqlite'
)
print(src_cnf.url)
# sqlite:////tmp/test_db.sqlite
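
The URLs printed in these examples follow SqlAlchemy's drivername://username:password@host:port/database pattern, with absent parts left out. The sketch below assembles such a string from the same parameters to make that pattern explicit. build_url is a hypothetical helper, not how SourceConfiguration builds its url (that is done by SqlAlchemy, whose escaping rules may differ from the percent-encoding assumed here).

```python
from urllib.parse import quote


def build_url(drivername, host=None, port=None, database=None,
              username=None, password=None):
    """Assemble drivername://username:password@host:port/database.

    Hypothetical helper for illustration only; percent-encoding of the
    credentials is an assumption, SqlAlchemy's own rules may differ.
    """
    netloc = ''
    if username:
        netloc += quote(username, safe='')
        if password:
            netloc += ':' + quote(password, safe='')
        netloc += '@'
    if host:
        netloc += host
    if port is not None:
        netloc += ':' + str(port)
    url = drivername + '://' + netloc
    if database:
        # For SQLite the database is a file path, so an absolute path
        # produces four slashes in total: sqlite:////tmp/test_db.sqlite
        url += '/' + database
    return url


print(build_url('mysql+pymysql', host='localhost', port=37311,
                username='root', database='test'))
```
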

class beam_nuggets.io.relational_db_api.SqlAlchemyDB(source_config)

Bases: object

Provides functionality for reading from and writing to relational databases, using SqlAlchemy.

Parameters: source_config (SourceConfiguration) – information for accessing the target database.
close_session()
query(table_name, query)
read(table_name)
start_session()
write_record(table_config, record_dict)

Writes a single record to the specified table.

Parameters:
  • table_config (TableConfiguration) – specifies the target table, how data should be inserted, and how to create the table if it is missing. See the TableConfiguration notes on table creation.
  • record_dict (dict) – the record to be written.

exception beam_nuggets.io.relational_db_api.SqlAlchemyDbException

Bases: Exception

class beam_nuggets.io.relational_db_api.TableConfiguration(name, define_table_f=None, create_if_missing=False, primary_key_columns=None, create_insert_f=None)

Bases: object

Holds parameters for a database table. Used to pass table parameters to SqlAlchemyDB.

Parameters:
  • name (str) – the table name.
  • create_insert_f (function) –

    a function that takes as input an SqlAlchemy table and a row record, and returns a statement for inserting the record into the table. The function doesn’t execute the insert statement. It has the following signature: (sqlalchemy.sql.schema.Table, dict) -> sqlalchemy.sql.dml.Insert. If not specified, default implementations are used.

    As a mechanism to recover from failures, Beam runners may apply a transform multiple times to the same data. Because of that, idempotent writes (e.g. create_upsert_mysql() and create_upsert_postgres()) are recommended, to avoid data inconsistencies arising from this behavior.

  • create_if_missing (bool) – if set to True and the table is missing, SqlAlchemyDB will create the table. See the notes below on new table creation.
  • primary_key_columns (list) – list of column names to be used as the primary key when creating the table (if multiple columns are specified, a composite key is created). See the notes below on new table creation.
  • define_table_f (function) – a function for defining an SqlAlchemy table (the function doesn’t create the table); the definition is used when creating the table. The function has the following signature: (sqlalchemy.MetaData) -> sqlalchemy.Table. See the notes below on how it is used when creating missing tables, and the SqlAlchemy tutorial on defining tables for how to implement the function.

Notes

When attempting to write to a missing database table, SqlAlchemyDB handles the situation based on the values of create_if_missing, primary_key_columns and define_table_f in the passed TableConfiguration, as follows:

  • If the table is missing and create_if_missing is False (the default), SqlAlchemyDB raises an exception.
  • Table creation is attempted only when the target table is missing and create_if_missing is True. All of the following cases assume this state.
  • If define_table_f is specified, a new table is created using the table definition returned by define_table_f, irrespective of primary_key_columns.
  • If primary_key_columns is specified and define_table_f is None, a new table is created using the columns listed in primary_key_columns as the primary key. The full column list and the column types are inferred automatically from the first record to be written. See infer_db_type() for how database column types are inferred from Python types.
  • If both primary_key_columns and define_table_f are None, an auto-increment Integer column is created and used as the primary key, because some databases require a primary key when creating a table.
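
The rules above form a fixed decision sequence. The sketch below encodes that sequence so the precedence is easy to see (define_table_f wins over primary_key_columns, and the auto-increment key is the last resort). plan_table_creation and MissingTableError are hypothetical names used for illustration; MissingTableError stands in for whatever exception SqlAlchemyDB actually raises, and the library's real code may be organized differently.

```python
class MissingTableError(Exception):
    """Hypothetical stand-in for the exception SqlAlchemyDB raises."""


def plan_table_creation(table_exists, create_if_missing=False,
                        define_table_f=None, primary_key_columns=None):
    """Describe how a write to a possibly missing table is handled.

    Hypothetical helper mirroring the documented rules; it does not
    touch a database.
    """
    if table_exists:
        return 'write to the existing table'
    if not create_if_missing:
        raise MissingTableError(
            'table is missing and create_if_missing is False')
    if define_table_f is not None:
        # The explicit definition wins; primary_key_columns is ignored.
        return 'create the table from define_table_f'
    if primary_key_columns:
        # Column names and types are inferred from the first record.
        return ('infer columns from the first record, primary key: '
                + ', '.join(primary_key_columns))
    # Some databases require a primary key, so one is added.
    return ('infer columns from the first record, '
            'add an auto-increment Integer primary key')


print(plan_table_creation(False, create_if_missing=True,
                          primary_key_columns=['id']))
```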

Examples

A configuration for creating the table if missing using the specified columns to create the primary key.

from beam_nuggets.io import relational_db

table_config = relational_db.TableConfiguration(
    name='students',
    create_if_missing=True,
    primary_key_columns=['id']
)

A configuration for creating the table if missing using the specified definition.

from sqlalchemy import Table, Integer, String, Column
from beam_nuggets.io import relational_db

table_name = 'students'

def define_students_table(metadata):
    # Only defines the table; SqlAlchemyDB creates it when it is missing.
    return Table(
        table_name, metadata,
        Column('id', Integer, primary_key=True),
        Column('name', String(100)),
        Column('age', Integer)
    )

table_config = relational_db.TableConfiguration(
    name=table_name,
    create_if_missing=True,
    define_table_f=define_students_table
)

beam_nuggets.io.relational_db_api.create_insert(table, record)

Creates a statement for inserting the passed record into the passed table. The created statement is not executed by this function.

For information on creating insert and update statements, refer to the SqlAlchemy documentation and tutorial on the subject.

Parameters:
  • table (sqlalchemy.sql.schema.Table) – database table metadata.
  • record (dict) – a data record, corresponding to one row, to be inserted.
Returns:

a statement for inserting the passed record into the specified table.

Return type:

sqlalchemy.sql.dml.Insert
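
Because create_insert() only builds the statement, executing it is left to the caller. The sketch below shows the same build-then-execute split using Python's built-in sqlite3 module in place of SqlAlchemy. build_insert_sql is a hypothetical helper that returns raw SQL text plus bound parameters, not the Insert object that create_insert() returns.

```python
import sqlite3


def build_insert_sql(table_name, record):
    """Build a parameterized INSERT for one record (a dict).

    Hypothetical stand-in for create_insert(): it returns SQL text and
    parameters instead of an SqlAlchemy Insert, and does not execute
    anything. Table and column names are not escaped, so they must come
    from trusted code, never from user input.
    """
    columns = list(record)
    placeholders = ', '.join('?' for _ in columns)  # sqlite3 qmark style
    sql = 'INSERT INTO {} ({}) VALUES ({})'.format(
        table_name, ', '.join(columns), placeholders)
    return sql, [record[c] for c in columns]


conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE students (id INTEGER PRIMARY KEY, name TEXT)')
sql, params = build_insert_sql('students', {'id': 1, 'name': 'Alice'})
conn.execute(sql, params)  # the caller, not the builder, executes it
print(conn.execute('SELECT id, name FROM students').fetchall())
# [(1, 'Alice')]
```

Running the same plain insert a second time would fail with sqlite3.IntegrityError, since id 1 already exists; this is the retry problem that the upsert variants are meant to avoid.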

beam_nuggets.io.relational_db_api.create_table(session, name, table_config, record)
beam_nuggets.io.relational_db_api.create_table_class(sqlalchemy_table)
beam_nuggets.io.relational_db_api.create_upsert_mysql(table, record)

Creates a statement for inserting the passed record into the passed table; if a row with the same primary or unique key already exists, that row is updated instead. This uses SqlAlchemy's MySQL on_duplicate_key_update (hence "upsert"), which is why the returned statement is valid only for MySQL tables. Refer to the SqlAlchemy MySQL dialect documentation for more information.

The created statement is not executed by this function.

Parameters:
  • table (sqlalchemy.sql.schema.Table) – database table metadata.
  • record (dict) – a data record, corresponding to one row, to be inserted.
Returns:

a statement for inserting the passed record into the specified table, or updating the existing row if the key already exists.

Return type:

sqlalchemy.sql.dml.Insert
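
For orientation, the sketch below hand-writes the kind of MySQL statement an upsert of this type amounts to: an INSERT followed by ON DUPLICATE KEY UPDATE that overwrites each column with the record's value. build_mysql_upsert_sql is a hypothetical helper; the %s placeholder style is the one used by drivers such as pymysql, and the exact SQL that SqlAlchemy renders for create_upsert_mysql() may differ.

```python
def build_mysql_upsert_sql(table_name, record):
    """Build MySQL INSERT ... ON DUPLICATE KEY UPDATE text for one record.

    Hypothetical illustration only; the statement is not executed here
    and SqlAlchemy's rendered SQL may differ in detail.
    """
    columns = list(record)
    values = [record[c] for c in columns]
    sql = 'INSERT INTO {} ({}) VALUES ({}) ON DUPLICATE KEY UPDATE {}'.format(
        table_name,
        ', '.join(columns),
        ', '.join('%s' for _ in columns),
        # On a duplicate key, overwrite every column with the record's value.
        ', '.join('{} = %s'.format(c) for c in columns),
    )
    # Each value is bound twice: once for VALUES, once for UPDATE.
    return sql, values + values


sql, params = build_mysql_upsert_sql('students', {'id': 1, 'name': 'Alice'})
print(sql)
print(params)
```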

beam_nuggets.io.relational_db_api.create_upsert_postgres(table, record)

Creates a statement for inserting the passed record into the passed table; if a conflicting record already exists, it is updated instead. This uses SqlAlchemy's PostgreSQL on_conflict_do_update (hence "upsert"), which is why the returned statement is valid only for PostgreSQL tables. Refer to the SqlAlchemy PostgreSQL dialect documentation for more information.

The created statement is not executed by this function.

Parameters:
  • table (sqlalchemy.sql.schema.Table) – database table metadata.
  • record (dict) – a data record, corresponding to one row, to be inserted.
Returns:

a statement for inserting the passed record into the specified table, or updating the existing record on conflict.

Return type:

sqlalchemy.sql.dml.Insert
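
The value of an upsert for Beam is that replaying the same write leaves the table unchanged. The sketch below demonstrates this with the INSERT ... ON CONFLICT (...) DO UPDATE form that PostgreSQL uses; it runs against SQLite through Python's sqlite3 module, because SQLite 3.24 and later accept the same clause, so no PostgreSQL server is needed. build_upsert_sql and its key_columns parameter are hypothetical, and the SQL that SqlAlchemy renders for create_upsert_postgres() may differ in detail.

```python
import sqlite3


def build_upsert_sql(table_name, record, key_columns):
    """Build INSERT ... ON CONFLICT (...) DO UPDATE text for one record.

    Hypothetical illustration; key_columns must match a primary key or
    unique constraint of the table.
    """
    columns = list(record)
    non_key = [c for c in columns if c not in key_columns]
    sql = 'INSERT INTO {} ({}) VALUES ({}) ON CONFLICT ({}) '.format(
        table_name, ', '.join(columns),
        ', '.join('?' for _ in columns), ', '.join(key_columns))
    if non_key:
        # "excluded" refers to the row that the INSERT attempted to write.
        sql += 'DO UPDATE SET ' + ', '.join(
            '{0} = excluded.{0}'.format(c) for c in non_key)
    else:
        sql += 'DO NOTHING'
    return sql, [record[c] for c in columns]


conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE students (id INTEGER PRIMARY KEY, name TEXT)')
sql, params = build_upsert_sql('students', {'id': 1, 'name': 'Alice'}, ['id'])
# Simulate a runner retry: applying the same write twice is harmless.
conn.execute(sql, params)
conn.execute(sql, params)
print(conn.execute('SELECT id, name FROM students').fetchall())
# [(1, 'Alice')]
```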

beam_nuggets.io.relational_db_api.get_column_names_from_table(table_class)
beam_nuggets.io.relational_db_api.infer_db_type(val, drivername)

Infer a database column type for storing values of the same type as the passed variable val in a database identified by drivername.

Column types are inferred based on the following mapping:

Python type          Column type
-----------------    -----------
bool                 Boolean
<number>             Float (all Python numbers are mapped to Float columns)
datetime.datetime    DateTime
datetime.date        Date
<default>            String for PostgreSQL and SQLite; String(VARCHAR_DEFAULT_COL_SIZE) for other databases

Parameters:
  • val (object) – value used to infer the database column type.
  • drivername – specifies the database type and driver used for connecting to the database (the driver information isn’t used to infer the column type).
Returns:

one of the SqlAlchemy column types.

Return type:

object
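
The order of the checks matters when implementing a mapping like the one in the table: bool is a subclass of int, and datetime.datetime is a subclass of datetime.date, so the more specific type has to be tested first. The sketch below returns type names as strings to show this; infer_type_name is a hypothetical helper (infer_db_type() returns SqlAlchemy type objects), and VARCHAR_DEFAULT_COL_SIZE is left symbolic because its value is not given here.

```python
import datetime
import numbers


def infer_type_name(val, drivername):
    """Return the column type name the documented mapping assigns to val.

    Hypothetical sketch; returns strings, not SqlAlchemy type objects.
    """
    backend = drivername.partition('+')[0]  # the driver part is ignored
    # bool must be tested before numbers: bool is a subclass of int.
    if isinstance(val, bool):
        return 'Boolean'
    if isinstance(val, numbers.Number):
        return 'Float'
    # datetime.datetime must be tested before datetime.date,
    # because datetime.datetime is a subclass of datetime.date.
    if isinstance(val, datetime.datetime):
        return 'DateTime'
    if isinstance(val, datetime.date):
        return 'Date'
    if backend in ('postgresql', 'sqlite'):
        return 'String'
    return 'String(VARCHAR_DEFAULT_COL_SIZE)'


print(infer_type_name(True, 'sqlite'))
print(infer_type_name('Alice', 'mysql+pymysql'))
```

The explicit length for other databases is presumably needed because databases such as MySQL require a length for VARCHAR columns.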

beam_nuggets.io.relational_db_api.load_table(session, name)