beam_nuggets.io.relational_db_api module
Wrapper around SqlAlchemy code used for reading from and writing to databases.
class beam_nuggets.io.relational_db_api.SourceConfiguration(drivername, host=None, port=None, database=None, username=None, password=None, create_if_missing=False)
Bases: object
Holds parameters for accessing a database.
Used to pass database access parameters to SqlAlchemyDB. SourceConfiguration.url provides the database URL used by SqlAlchemy to connect to the database.
Parameters:
- drivername (str) – name of the database backend. It specifies the target database type and the driver (DBAPI) used by SqlAlchemy to communicate with the database. The following drivernames are supported by and tested on beam-nuggets:
- postgresql+pg8000: for PostgreSQL (pg8000 is the DB driver).
- mysql+pymysql: for MySQL.
- sqlite: for SQLite.
Additional drivers can be used after installing their corresponding Python libraries. Refer to the SqlAlchemy dialects documentation for more information on the supported databases and their corresponding drivers.
- host (str) – database host name or IP.
- port (int) – database port number.
- database (str) – database name.
- username (str) – database username.
- password (str) – database password.
- create_if_missing (bool) – if set to True, a missing database is created before writing to it.
Examples
MySQL database
```python
from beam_nuggets.io import relational_db

src_cnf = relational_db.SourceConfiguration(
    drivername='mysql+pymysql',
    host='localhost',
    port=37311,
    username='root',
    database='test',
    create_if_missing=True,
)
print(src_cnf.url)  # mysql+pymysql://root@localhost:37311/test
```
PostgreSQL database
```python
from beam_nuggets.io import relational_db

src_cnf = relational_db.SourceConfiguration(
    drivername='postgresql',
    host='localhost',
    port=42139,
    username='postgres',
    password='pass',
    database='test',
)
print(src_cnf.url)  # postgresql://postgres:pass@localhost:42139/test
```
SQLite database
```python
from beam_nuggets.io import relational_db

src_cnf = relational_db.SourceConfiguration(
    drivername='sqlite',
    database='/tmp/test_db.sqlite',
)
print(src_cnf.url)  # sqlite:////tmp/test_db.sqlite
```
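The URLs printed above follow SqlAlchemy's `drivername://username:password@host:port/database` layout. The sketch below is a hypothetical helper, `build_url`, that reproduces them from the same parameters; it is not the beam-nuggets implementation, and it assumes that credentials should be percent-encoded with `urllib.parse.quote_plus` (SqlAlchemy's own escaping may differ in edge cases).

```python
from typing import Optional
from urllib.parse import quote_plus


def build_url(drivername: str,
              host: Optional[str] = None,
              port: Optional[int] = None,
              database: Optional[str] = None,
              username: Optional[str] = None,
              password: Optional[str] = None) -> str:
    """Hypothetical helper: assemble a SqlAlchemy-style database URL."""
    url = drivername + "://"
    if username is not None:
        # Credentials are percent-encoded (an assumption) so that characters
        # such as '@' or ':' in a password cannot break the URL structure.
        url += quote_plus(username)
        if password is not None:
            url += ":" + quote_plus(password)
        url += "@"
    if host is not None:
        url += host
        if port is not None:
            url += ":" + str(port)
    if database is not None:
        # For SQLite the database is a file path, so an absolute path such
        # as '/tmp/test_db.sqlite' yields four slashes after 'sqlite:'.
        url += "/" + database
    return url


print(build_url("mysql+pymysql", host="localhost", port=37311,
                username="root", database="test"))
```

As the MySQL example shows, create_if_missing does not appear in the URL; it only controls whether the database is created before writing.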
class beam_nuggets.io.relational_db_api.SqlAlchemyDB(source_config)
Bases: object
Provides functionality for reading from and writing to relational databases, using SqlAlchemy.
Parameters: source_config (SourceConfiguration) – information for accessing the target database.
write_record(table_config, record_dict)
Writes a single record to the specified table.
Parameters:
- table_config (TableConfiguration) – specifies the target table, how data should be inserted, and how to create the table if it is missing. See the TableConfiguration notes on table creation.
- record_dict (dict) – the record to be written.
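Conceptually, writing one record means building an insert statement from the record's keys and executing it with the record's values. The sketch below shows that idea with Python's built-in `sqlite3` module rather than SqlAlchemy; `write_record_sqlite` is a hypothetical stand-in, not the SqlAlchemyDB method. It assumes the table already exists and that table and column names come from trusted code (they are interpolated into the SQL text, while values are passed as bound parameters).

```python
import sqlite3


def write_record_sqlite(conn: sqlite3.Connection, table: str, record: dict) -> None:
    """Hypothetical sketch: insert one dict record into an existing table."""
    columns = list(record)
    # Named placeholders (:col) keep the values out of the SQL text.
    placeholders = ", ".join(":" + c for c in columns)
    sql = f"INSERT INTO {table} ({', '.join(columns)}) VALUES ({placeholders})"
    with conn:  # commits on success, rolls back if execute() raises
        conn.execute(sql, record)


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE students (id INTEGER PRIMARY KEY, name TEXT, age INTEGER)")
write_record_sqlite(conn, "students", {"id": 1, "name": "Ada", "age": 21})
print(conn.execute("SELECT id, name, age FROM students").fetchall())  # [(1, 'Ada', 21)]
```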
class beam_nuggets.io.relational_db_api.TableConfiguration(name, define_table_f=None, create_if_missing=False, primary_key_columns=None, create_insert_f=None)
Bases: object
Holds parameters for a database table. Used to pass table parameters to SqlAlchemyDB.
Parameters:
- name (str) – the table name.
- create_insert_f (function) – a function that takes as input an SqlAlchemy table and a row record, and returns a statement for inserting the record into the table. The function doesn't execute the insert statement. If not specified, the following default implementations are used:
  - create_upsert_mysql() for MySQL tables.
  - create_upsert_postgres() for PostgreSQL tables.
  - create_insert() for other databases.
  As a mechanism to recover from failures, Beam runners may apply a transform multiple times to the same data; it is therefore recommended to implement idempotent writes (e.g. create_upsert_mysql() and create_upsert_postgres()) to avoid data inconsistencies arising from this behavior. The function has the following signature: (sqlalchemy.sql.schema.Table, dict) -> sqlalchemy.sql.dml.Insert.
- create_if_missing (bool) – if set to True and the table is missing, SqlAlchemyDB will create the table. See the notes below on new table creation.
- primary_key_columns (list) – list of column names to be used as the primary key when creating the table (if multiple columns are specified, a composite key is created). See the notes below on new table creation.
- define_table_f (function) – a function for defining an SqlAlchemy table (the function doesn't create the table); the definition is used when creating the table. The function has the following signature: (sqlalchemy.MetaData) -> sqlalchemy.Table. See the notes below on how this is used when creating missing tables, and the SqlAlchemy table definition tutorial for how to implement the function.
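The recommendation to use idempotent writes can be illustrated with a small experiment. The sketch below uses Python's built-in `sqlite3` module (not SqlAlchemy or Beam) and simulates a runner retrying the same bundle: with a plain `INSERT` into a table whose primary key is an auto-increment column, the replay duplicates the data, whereas an upsert keyed on a natural key leaves exactly one row. The table and column names are hypothetical, and the `ON CONFLICT` syntax requires SQLite 3.24 or newer.

```python
import sqlite3

record = {"student_id": 7, "name": "Ada"}

# Plain INSERT with a surrogate auto-increment key: not idempotent.
plain = sqlite3.connect(":memory:")
plain.execute(
    "CREATE TABLE students (row_id INTEGER PRIMARY KEY AUTOINCREMENT,"
    " student_id INTEGER, name TEXT)"
)
for _attempt in range(2):  # the second pass simulates a retried bundle
    plain.execute(
        "INSERT INTO students (student_id, name) VALUES (:student_id, :name)",
        record,
    )

# Upsert keyed on student_id: replaying it leaves a single row.
upsert = sqlite3.connect(":memory:")
upsert.execute("CREATE TABLE students (student_id INTEGER PRIMARY KEY, name TEXT)")
for _attempt in range(2):
    upsert.execute(
        "INSERT INTO students (student_id, name) VALUES (:student_id, :name) "
        "ON CONFLICT(student_id) DO UPDATE SET name = excluded.name",
        record,
    )

plain_rows = plain.execute("SELECT COUNT(*) FROM students").fetchone()[0]
upsert_rows = upsert.execute("SELECT COUNT(*) FROM students").fetchone()[0]
print(plain_rows, upsert_rows)  # 2 1
```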
Notes
When attempting to write to a missing database table, SqlAlchemyDB handles the situation based on the values of create_if_missing, primary_key_columns and define_table_f in the passed TableConfiguration, as follows:
- If the table is missing and create_if_missing is set to False (default), SqlAlchemyDB will raise an exception.
- Only when the target table is missing and create_if_missing is set to True is table creation attempted. This is the assumed state for all the following cases.
- If define_table_f is specified, a new table will be created using the table definition returned by define_table_f, irrespective of primary_key_columns.
- If primary_key_columns is specified and define_table_f is None, a new table will be created using the columns specified in primary_key_columns as the primary key. The full column list and the column types are inferred automatically from the first record to be written. See infer_db_type() for information on how the database column types are inferred from the Python types. If primary_key_columns is also None, an auto-increment Integer column will be created and used as the primary key; this is done because some databases require a primary key to be specified when creating tables.
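The notes above amount to a small decision procedure. The sketch below encodes it as a hypothetical pure function, `plan_missing_table`, that only reports which path would be taken; it does not touch a database. The exception type (`RuntimeError`) is an assumption, since the notes only say that an exception is raised.

```python
from typing import Callable, List, Optional


def plan_missing_table(create_if_missing: bool,
                       define_table_f: Optional[Callable] = None,
                       primary_key_columns: Optional[List[str]] = None) -> str:
    """Hypothetical sketch of the table-creation rules for a missing table.

    Returns a short description of the action; nothing is created.
    """
    if not create_if_missing:
        # The notes only say "an exception"; RuntimeError is an assumption.
        raise RuntimeError("table is missing and create_if_missing is False")
    if define_table_f is not None:
        # define_table_f wins, regardless of primary_key_columns.
        return "create from define_table_f"
    if primary_key_columns:  # an empty list is treated like None here (assumption)
        # The remaining columns and their types come from the first record.
        return "infer columns; primary key " + ", ".join(primary_key_columns)
    # Some databases require a primary key, so a surrogate one is added.
    return "infer columns; add auto-increment Integer primary key"


print(plan_missing_table(True, primary_key_columns=["id"]))
```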
Examples
A configuration for creating the table if missing, using the specified columns to create the primary key.
```python
from beam_nuggets.io import relational_db

table_config = relational_db.TableConfiguration(
    name='students',
    create_if_missing=True,
    primary_key_columns=['id'],
)
```
A configuration for creating the table if missing, using the specified definition.
```python
from sqlalchemy import Column, Integer, String, Table

from beam_nuggets.io import relational_db

table_name = 'students'


def define_students_table(metadata):
    # Define the table's columns; 'id' is the primary key.
    return Table(
        table_name, metadata,
        Column('id', Integer, primary_key=True),
        Column('name', String(100)),
        Column('age', Integer),
    )


table_config = relational_db.TableConfiguration(
    name=table_name,
    create_if_missing=True,
    define_table_f=define_students_table,
)
```
beam_nuggets.io.relational_db_api.create_insert(table, record)
Creates a statement for inserting the passed record into the passed table. The created statement is not executed by this function.
For information on creating insert and update statements, refer to the SqlAlchemy documentation and tutorial.
Parameters: - table (sqlalchemy.sql.schema.Table) – database table metadata.
- record (dict) – a data record, corresponding to one row, to be inserted.
Returns: a statement for inserting the passed record into the specified table.
Return type: sqlalchemy.sql.dml.Insert
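Like create_insert(), a statement builder only produces the statement and leaves execution to the caller. The sketch below illustrates that split with plain SQL text instead of an SqlAlchemy Insert object; `build_insert` is a hypothetical helper, and the `?` placeholders are the `qmark` style used by Python's `sqlite3`, chosen here as an assumption.

```python
def build_insert(table: str, record: dict):
    """Hypothetical sketch: build, but do not execute, an INSERT for one record."""
    columns = list(record)
    placeholders = ", ".join("?" for _ in columns)  # sqlite3 "qmark" style
    sql = f"INSERT INTO {table} ({', '.join(columns)}) VALUES ({placeholders})"
    # Values are returned separately so the caller can bind them safely.
    params = tuple(record[c] for c in columns)
    return sql, params


sql, params = build_insert("students", {"id": 1, "name": "Ada", "age": 21})
print(sql)     # INSERT INTO students (id, name, age) VALUES (?, ?, ?)
print(params)  # (1, 'Ada', 21)
```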
beam_nuggets.io.relational_db_api.create_upsert_mysql(table, record)
Creates a statement for inserting the passed record into the passed table; if the record already exists, the existing record is updated instead. This uses MySQL's on_duplicate_key_update (hence "upsert"), which is why the returned statement is valid only for MySQL tables. Refer to the SqlAlchemy MySQL dialect documentation for more information.
The created statement is not executed by this function.
Parameters: - table (sqlalchemy.sql.schema.Table) – database table metadata.
- record (dict) – a data record, corresponding to one row, to be inserted.
Returns: a statement for inserting the passed record into the specified table, or updating the existing record.
Return type: sqlalchemy.sql.dml.Insert
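For reference, the SQL that a MySQL upsert corresponds to looks roughly like the text built below. This is a hypothetical sketch producing raw SQL, not what SqlAlchemy emits verbatim; it assumes the `%s` placeholder style used by PyMySQL and updates every column from the proposed row via `VALUES(col)`, a form MySQL 8.0.20 and later deprecate in favor of a row alias.

```python
def mysql_upsert_sql(table: str, record: dict):
    """Hypothetical sketch: INSERT ... ON DUPLICATE KEY UPDATE for one record."""
    columns = list(record)
    col_list = ", ".join(columns)
    placeholders = ", ".join(["%s"] * len(columns))  # PyMySQL "format" style
    # On a duplicate key, every column takes the value from the proposed row.
    updates = ", ".join(f"{c} = VALUES({c})" for c in columns)
    sql = (f"INSERT INTO {table} ({col_list}) VALUES ({placeholders}) "
           f"ON DUPLICATE KEY UPDATE {updates}")
    params = tuple(record[c] for c in columns)
    return sql, params


sql, params = mysql_upsert_sql("students", {"id": 1, "name": "Ada"})
print(sql)
```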
beam_nuggets.io.relational_db_api.create_upsert_postgres(table, record)
Creates a statement for inserting the passed record into the passed table; if the record already exists, the existing record is updated instead. This uses PostgreSQL's on_conflict_do_update (hence "upsert"), which is why the returned statement is valid only for PostgreSQL tables. Refer to the SqlAlchemy PostgreSQL dialect documentation for more information.
The created statement is not executed by this function.
Parameters: - table (sqlalchemy.sql.schema.Table) – database table metadata.
- record (dict) – a data record, corresponding to one row, to be inserted.
Returns: a statement for inserting the passed record into the specified table, or updating the existing record.
Return type: sqlalchemy.sql.dml.Insert
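The update semantics of an `ON CONFLICT ... DO UPDATE` statement can be demonstrated without a PostgreSQL server, because SQLite (3.24+) accepts the same clause, including the `excluded` pseudo-row. The sketch below is hypothetical: `upsert_sql` takes the conflict columns explicitly (how create_upsert_postgres determines them is not stated here), and the `:name` placeholders are SQLite's named style rather than a PostgreSQL driver's.

```python
import sqlite3


def upsert_sql(table: str, record: dict, key_columns: list) -> str:
    """Hypothetical sketch: INSERT ... ON CONFLICT (...) DO UPDATE text."""
    columns = list(record)
    values = ", ".join(":" + c for c in columns)
    # Non-key columns take the values proposed by the conflicting insert,
    # which PostgreSQL and SQLite expose as the "excluded" row.
    updates = ", ".join(f"{c} = excluded.{c}" for c in columns if c not in key_columns)
    conflict = f"ON CONFLICT ({', '.join(key_columns)})"
    # With nothing left to update, DO NOTHING keeps the statement valid.
    action = f"DO UPDATE SET {updates}" if updates else "DO NOTHING"
    return f"INSERT INTO {table} ({', '.join(columns)}) VALUES ({values}) {conflict} {action}"


# SQLite stands in for PostgreSQL here because it shares the ON CONFLICT syntax.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE students (id INTEGER PRIMARY KEY, name TEXT)")
for rec in ({"id": 7, "name": "Ada"}, {"id": 7, "name": "Ada L."}):
    conn.execute(upsert_sql("students", rec, ["id"]), rec)
print(conn.execute("SELECT id, name FROM students").fetchall())  # [(7, 'Ada L.')]
```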
beam_nuggets.io.relational_db_api.infer_db_type(val, drivername)
Infer a database column type for storing values of the same type as the passed variable val in a database identified by drivername.
Column types are inferred based on the following mapping:

| Python type | Column type |
|---|---|
| bool | Boolean |
| <number> | Float (all Python numbers are mapped to Float columns) |
| datetime.datetime | DateTime |
| datetime.date | Date |
| <default> | String for PostgreSQL and SQLite; String(VARCHAR_DEFAULT_COL_SIZE) for other databases |

Parameters:
- val (object) – value used to infer the database column type.
- drivername – specifies the database type and driver used for connecting to the database (the driver information isn’t used to infer the column type).
Returns: one of sqlalchemy column types.
Return type: object
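The order of the checks in this mapping matters in Python: `bool` is a subclass of `int`, and `datetime.datetime` is a subclass of `datetime.date`, so the more specific type has to be tested first. The sketch below, `infer_column_type_name`, is a hypothetical illustration that returns type names as strings instead of SqlAlchemy type objects; since the value of VARCHAR_DEFAULT_COL_SIZE is not given here, it is returned by name.

```python
import datetime
import numbers


def infer_column_type_name(val: object, drivername: str) -> str:
    """Hypothetical sketch of the mapping table; returns type names as strings."""
    # bool is a subclass of int, so it must be checked before numbers.
    if isinstance(val, bool):
        return "Boolean"
    if isinstance(val, numbers.Number):
        return "Float"
    # datetime.datetime is a subclass of datetime.date; check it first.
    if isinstance(val, datetime.datetime):
        return "DateTime"
    if isinstance(val, datetime.date):
        return "Date"
    # Only the database type (before '+') matters, not the driver.
    backend = drivername.split("+", 1)[0]
    if backend in ("postgresql", "sqlite"):
        return "String"
    return "String(VARCHAR_DEFAULT_COL_SIZE)"


print(infer_column_type_name(True, "sqlite"))  # Boolean
```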