beam_nuggets.io.relational_db module

PTransforms for reading from and writing to relational databases. SQLAlchemy is used to interface with the databases.

class beam_nuggets.io.relational_db.ReadFromDB(source_config, table_name, query='', *args, **kwargs)

Bases: apache_beam.transforms.ptransform.PTransform

A PTransform for reading tables on relational databases.

It outputs a PCollection of dicts, each corresponding to a row in the target database table.

Parameters:
  • source_config (SourceConfiguration) – specifies the target database.
  • table_name (str) – the name of the table to be read.
  • query (str) – optional SQL query to run against the table. When omitted, all records in the table are returned.
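
Since SQLAlchemy identifies a database by a URL, it can help to see how connection fields such as those passed to SourceConfiguration correspond to the standard SQLAlchemy URL layout (drivername://username:password@host:port/database). The sketch below is only an illustration under that assumption: build_database_url is a hypothetical helper, not part of beam_nuggets, and it does not claim to reproduce how the library builds its connection internally.

```python
from urllib.parse import quote_plus


def build_database_url(drivername, username, password, host, port, database):
    """Hypothetical helper: compose a SQLAlchemy-style database URL string."""
    # quote_plus escapes characters such as '@' or '/' that would otherwise
    # break the URL when they appear in a password.
    return "{}://{}:{}@{}:{}/{}".format(
        drivername, username, quote_plus(password), host, port, database
    )


url = build_database_url(
    drivername="postgresql",
    username="postgres",
    password="password",
    host="localhost",
    port=5432,
    database="calendar",
)
print(url)  # postgresql://postgres:password@localhost:5432/calendar
```

Escaping the password matters in practice: a password like p@ss would otherwise be misread as part of the host portion of the URL.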

Examples

Reading from a table on a postgres database.

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from beam_nuggets.io import relational_db

source_config = relational_db.SourceConfiguration(
    drivername='postgresql',
    host='localhost',
    port=5432,
    username='postgres',
    password='password',
    database='calendar',
)
table_name = 'months'

with beam.Pipeline(options=PipelineOptions()) as p:
    records = p | "Reading records from db" >> relational_db.ReadFromDB(
        source_config=source_config,
        table_name=table_name,
        query='select name, num from months'  # optional. When omitted, all table records are returned.
    )
    records | 'Writing to stdout' >> beam.Map(print)

The output will look something like this:

{'name': 'Jan', 'num': 1}
{'name': 'Feb', 'num': 2}

Here, “name” and “num” are the column names.
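
Because each element is a plain dict keyed by column name, downstream steps can work with rows using ordinary Python functions. The following is a minimal sketch of such a step; format_month is a hypothetical function introduced here for illustration, and the sample rows simply mirror the output shown above so that the snippet runs without a database.

```python
def format_month(row):
    """Turn a row dict such as {'name': 'Jan', 'num': 1} into a display string."""
    # Columns are accessed by name, exactly as they appear in the query result.
    return "{:02d} {}".format(row["num"], row["name"])


# Sample rows standing in for the elements produced by ReadFromDB.
rows = [
    {'name': 'Jan', 'num': 1},
    {'name': 'Feb', 'num': 2},
]

labels = [format_month(row) for row in rows]
print(labels)  # ['01 Jan', '02 Feb']
```

Inside a pipeline, the same function could be applied to the PCollection with something like records | 'Format' >> beam.Map(format_month).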

expand(pcoll)

class beam_nuggets.io.relational_db.Write(source_config, table_config, *args, **kwargs)

Bases: apache_beam.transforms.ptransform.PTransform

A PTransform for writing to tables on relational databases.

Parameters:
  • source_config (SourceConfiguration) – specifies the target database. If source_config.create_if_missing is set to True, the target database is created if it does not exist.
  • table_config (TableConfiguration) – specifies the target table, as well as other optional parameters:

    • an option for specifying custom insert statements.
    • options for creating the target table if it does not exist.

    See TableConfiguration for details.

Examples

Writing to a table on a postgres database.

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from beam_nuggets.io import relational_db

records = [
    {'name': 'Jan', 'num': 1},
    {'name': 'Feb', 'num': 2},
]

source_config = relational_db.SourceConfiguration(
    drivername='postgresql',
    host='localhost',
    port=5432,
    username='postgres',
    password='password',
    database='calendar',
)

table_config = relational_db.TableConfiguration(
    name='months',
    create_if_missing=True,
    primary_key_columns=['num']
)

with beam.Pipeline(options=PipelineOptions()) as p:
    months = p | "Reading month records" >> beam.Create(records)
    months | 'Writing to DB table' >> relational_db.Write(
        source_config=source_config,
        table_config=table_config
    )
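
To make the table options above more concrete, here is a rough sketch of what creating a missing table with a primary key and inserting dict records amounts to at the SQL level. It deliberately uses Python's built-in sqlite3 module with an in-memory database instead of beam_nuggets or postgres, and the column types are assumptions chosen for the sample records. It shows plain database behaviour only; it is not a description of the statements the Write transform actually issues.

```python
import sqlite3

records = [
    {'name': 'Jan', 'num': 1},
    {'name': 'Feb', 'num': 2},
]

conn = sqlite3.connect(":memory:")

# Create the table only if it does not exist yet, with 'num' as primary key.
conn.execute(
    "CREATE TABLE IF NOT EXISTS months (name TEXT, num INTEGER PRIMARY KEY)"
)

# Named placeholders let each record dict be passed directly as parameters.
conn.executemany("INSERT INTO months (name, num) VALUES (:name, :num)", records)
conn.commit()

stored = conn.execute("SELECT name, num FROM months ORDER BY num").fetchall()
print(stored)  # [('Jan', 1), ('Feb', 2)]

# A plain INSERT that reuses an existing primary key is rejected.
try:
    conn.execute("INSERT INTO months (name, num) VALUES ('January', 1)")
    duplicate_rejected = False
except sqlite3.IntegrityError:
    duplicate_rejected = True

conn.close()
```

The rejected duplicate is one reason a custom insert statement can be useful: with a primary key in place, a plain insert fails on rows that already exist, so pipelines that may be re-run often need an insert that tolerates or updates existing rows.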

expand(pcoll)