beam_nuggets.io.relational_db module
PTransforms for reading from and writing to relational databases. SQLAlchemy is used to interface with the databases.
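SQLAlchemy identifies a database by a URL of the form drivername://username:password@host:port/database. The following is a minimal sketch of how the connection fields used in the examples on this page could map onto such a URL. The helper build_db_url is hypothetical and is not part of beam_nuggets; how SourceConfiguration assembles its URL internally is not documented here.

```python
from urllib.parse import quote_plus


def build_db_url(drivername, username, password, host, port, database):
    """Hypothetical helper (not a beam_nuggets API): build an SQLAlchemy-style URL."""
    # Percent-encode the password so characters such as '@' or '/' stay unambiguous.
    return "{}://{}:{}@{}:{}/{}".format(
        drivername, username, quote_plus(password), host, port, database
    )


# Same connection values as the examples on this page.
url = build_db_url(
    drivername="postgresql",
    username="postgres",
    password="password",
    host="localhost",
    port=5432,
    database="calendar",
)
print(url)
```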
class beam_nuggets.io.relational_db.ReadFromDB(source_config, table_name, query='', *args, **kwargs)

Bases: apache_beam.transforms.ptransform.PTransform

A PTransform for reading tables on relational databases.

It outputs a PCollection of dicts, each corresponding to a row in the target database table.

Parameters:
- source_config (SourceConfiguration) – specifies the target database.
- table_name (str) – the name of the table to be read.
- query (str) – the SQL query to run against the table.

Examples

Reading from a table on a postgres database.

    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions
    from beam_nuggets.io import relational_db

    source_config = relational_db.SourceConfiguration(
        drivername='postgresql',
        host='localhost',
        port=5432,
        username='postgres',
        password='password',
        database='calendar',
    )
    table_name = 'months'

    with beam.Pipeline(options=PipelineOptions()) as p:
        records = p | "Reading records from db" >> relational_db.ReadFromDB(
            source_config=source_config,
            table_name=table_name,
            query='select name, num from months'  # optional. When omitted, all table records are returned.
        )
        records | 'Writing to stdout' >> beam.Map(print)

The output will be something like

    {'name': 'Jan', 'num': 1}
    {'name': 'Feb', 'num': 2}

Where "name" and "num" are the column names.
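To make the output shape concrete, here is a minimal sketch of turning query results into one dict per row, keyed by column name. It uses the standard-library sqlite3 module with an in-memory database as a stand-in for the postgres database above; it illustrates the row-to-dict mapping only and is not how ReadFromDB is implemented (the module uses SQLAlchemy).

```python
import sqlite3

# In-memory SQLite database standing in for the 'calendar' database (assumption).
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE months (name TEXT, num INTEGER PRIMARY KEY)")
conn.executemany(
    "INSERT INTO months (name, num) VALUES (?, ?)",
    [("Jan", 1), ("Feb", 2)],
)

# Run the same query as the example and pair each value with its column name.
cursor = conn.execute("SELECT name, num FROM months ORDER BY num")
columns = [description[0] for description in cursor.description]
rows = [dict(zip(columns, row)) for row in cursor]
conn.close()

for row in rows:
    print(row)
```

Each printed dict has the same shape as the elements of the PCollection described above.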
class beam_nuggets.io.relational_db.Write(source_config, table_config, *args, **kwargs)

Bases: apache_beam.transforms.ptransform.PTransform

A PTransform for writing to tables on relational databases.

Parameters:
- source_config (SourceConfiguration) – specifies the target database. If source_config.create_if_missing is set to True, the target database will be created if it is missing.
- table_config (TableConfiguration) – specifies the target table, as well as other optional parameters:
  - option for specifying custom insert statements.
  - options for creating the target table if it is missing.
  See TableConfiguration for details.

Examples

Writing to a postgres database table.

    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions
    from beam_nuggets.io import relational_db

    records = [
        {'name': 'Jan', 'num': 1},
        {'name': 'Feb', 'num': 2},
    ]

    source_config = relational_db.SourceConfiguration(
        drivername='postgresql',
        host='localhost',
        port=5432,
        username='postgres',
        password='password',
        database='calendar',
    )

    table_config = relational_db.TableConfiguration(
        name='months',
        create_if_missing=True,
        primary_key_columns=['num']
    )

    with beam.Pipeline(options=PipelineOptions()) as p:
        months = p | "Reading month records" >> beam.Create(records)
        months | 'Writing to DB table' >> relational_db.Write(
            source_config=source_config,
            table_config=table_config
        )