beam_nuggets.io.relational_db module
PTransforms for reading from and writing to relational databases. SQLAlchemy is used to interface with the databases.
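SQLAlchemy addresses a database through a URL of the form dialect://username:password@host:port/database. As a rough illustration of how connection fields such as drivername, host, port, username, password and database correspond to the parts of such a URL, here is a minimal sketch. The helper build_db_url is hypothetical and is not part of beam_nuggets; how the library itself assembles its connection is not shown here. The field values mirror those used in the examples in this module.

```python
from urllib.parse import quote_plus


def build_db_url(drivername, username, password, host, port, database):
    """Assemble a SQLAlchemy-style database URL from individual fields.

    Hypothetical helper for illustration only; not a beam_nuggets function.
    """
    # Credentials are URL-escaped so characters such as '@' or '/' in a
    # password cannot be mistaken for URL delimiters.
    credentials = f"{quote_plus(username)}:{quote_plus(password)}"
    return f"{drivername}://{credentials}@{host}:{port}/{database}"


url = build_db_url(
    drivername='postgresql',
    username='postgres',
    password='password',
    host='localhost',
    port=5432,
    database='calendar',
)
print(url)
```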
class beam_nuggets.io.relational_db.ReadFromDB(source_config, table_name, query='', *args, **kwargs)
Bases: apache_beam.transforms.ptransform.PTransform
A PTransform for reading tables on relational databases. It outputs a PCollection of dicts, each corresponding to a row in the target database table.

Parameters:
- source_config (SourceConfiguration) – specifies the target database.
- table_name (str) – the name of the table to be read.
- query (str) – the SQL query to run against the table. Optional; when omitted, all table records are returned.
Examples
Reading from a table on a postgres database.
    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions
    from beam_nuggets.io import relational_db

    source_config = relational_db.SourceConfiguration(
        drivername='postgresql',
        host='localhost',
        port=5432,
        username='postgres',
        password='password',
        database='calendar',
    )
    table_name = 'months'

    with beam.Pipeline(options=PipelineOptions()) as p:
        records = p | "Reading records from db" >> relational_db.ReadFromDB(
            source_config=source_config,
            table_name=table_name,
            query='select name, num from months'  # optional. When omitted, all table records are returned.
        )
        records | 'Writing to stdout' >> beam.Map(print)
The output will be something like

    {'name': 'Jan', 'num': 1}
    {'name': 'Feb', 'num': 2}

where "name" and "num" are the column names.
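To make the record shape concrete without a running postgres instance or a Beam pipeline, the sketch below uses Python's standard sqlite3 module to show what "one dict per row, keyed by column name" looks like. This is only an illustration of the output format described above, not how ReadFromDB is implemented; the in-memory table and the helper read_rows_as_dicts are assumptions made for the example.

```python
import sqlite3

# Hypothetical in-memory stand-in for the 'months' table used above.
conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row  # rows become addressable by column name
conn.execute("CREATE TABLE months (name TEXT, num INTEGER)")
conn.executemany(
    "INSERT INTO months (name, num) VALUES (?, ?)",
    [("Jan", 1), ("Feb", 2)],
)


def read_rows_as_dicts(connection, query):
    """Run a query and return each result row as a dict keyed by column name."""
    return [dict(row) for row in connection.execute(query)]


rows = read_rows_as_dicts(conn, "select name, num from months order by num")
for row in rows:
    print(row)
```

Each printed dict has the selected column names as keys, which is the same shape as the elements of the PCollection described above.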
class beam_nuggets.io.relational_db.Write(source_config, table_config, *args, **kwargs)
Bases: apache_beam.transforms.ptransform.PTransform
A PTransform for writing to tables on relational databases.

Parameters:
- source_config (SourceConfiguration) – specifies the target database. If source_config.create_if_missing is set to True, the target database will be created if it is missing.
- table_config (TableConfiguration) – specifies the target table, as well as other optional parameters:
  - an option for specifying custom insert statements.
  - options for creating the target table if it is missing.
  See TableConfiguration for details.
Examples
Writing to a table on a postgres database.
    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions
    from beam_nuggets.io import relational_db

    records = [
        {'name': 'Jan', 'num': 1},
        {'name': 'Feb', 'num': 2},
    ]

    source_config = relational_db.SourceConfiguration(
        drivername='postgresql',
        host='localhost',
        port=5432,
        username='postgres',
        password='password',
        database='calendar',
    )
    table_config = relational_db.TableConfiguration(
        name='months',
        create_if_missing=True,
        primary_key_columns=['num']
    )

    with beam.Pipeline(options=PipelineOptions()) as p:
        months = p | "Reading month records" >> beam.Create(records)
        months | 'Writing to DB table' >> relational_db.Write(
            source_config=source_config,
            table_config=table_config
        )
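The general idea of "create the target table if it is missing, then insert one row per dict record" can be tried out locally with the standard sqlite3 module. The sketch below is an analogy under stated assumptions, not the implementation of Write: the helper write_records, the hard-coded column types, and the use of SQLite instead of postgres are all choices made for this illustration.

```python
import sqlite3

records = [
    {"name": "Jan", "num": 1},
    {"name": "Feb", "num": 2},
]


def write_records(connection, rows):
    """Create the 'months' table if it is missing, then insert dict records.

    Hypothetical helper for illustration only; not a beam_nuggets function.
    """
    # 'num' plays the role of the primary key column from the example above.
    connection.execute(
        "CREATE TABLE IF NOT EXISTS months (name TEXT, num INTEGER PRIMARY KEY)"
    )
    # Named placeholders are filled from the keys of each dict record.
    connection.executemany(
        "INSERT INTO months (name, num) VALUES (:name, :num)", rows
    )
    connection.commit()


conn = sqlite3.connect(":memory:")
write_records(conn, records)
stored = conn.execute("SELECT name, num FROM months ORDER BY num").fetchall()
print(stored)
```

Because the table is created with IF NOT EXISTS, calling the helper against a database that already has the table skips creation and goes straight to the inserts.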