Source code for beam_nuggets.io.relational_db

""":class:`~apache_beam.transforms.ptransform.PTransform` s for reading from
and writing to relational databases. SQLAlchemy_ is used for interfacing the
databases.

.. _SQLAlchemy: https://www.sqlalchemy.org/
"""

from __future__ import division, print_function

from apache_beam import PTransform, Create, ParDo, DoFn

from beam_nuggets.io.relational_db_api import (
    SqlAlchemyDB,
    SourceConfiguration,
    TableConfiguration
)

# It is intended SourceConfiguration and TableConfiguration are imported
# from this module by the library users. Below is to make sure they've been
# imported to this module.
assert SourceConfiguration is not None
assert TableConfiguration is not None


[docs]class ReadFromDB(PTransform): """A :class:`~apache_beam.transforms.ptransform.PTransform` for reading tables on relational databases. It outputs a :class:`~apache_beam.pvalue.PCollection` of ``dict:s``, each corresponding to a row in the target database table. Args: source_config (SourceConfiguration): specifies the target database. table_name (str): the name of the table to be read. query (str): the SQL query to run against the table. Examples: Reading from a table on a postgres database. :: import apache_beam as beam from apache_beam.options.pipeline_options import PipelineOptions from beam_nuggets.io import relational_db source_config = relational_db.SourceConfiguration( drivername='postgresql', host='localhost', port=5432, username='postgres', password='password', database='calendar', ) table_name = 'months' with beam.Pipeline(options=PipelineOptions()) as p: records = p | "Reading records from db" >> relational_db.ReadFromDB( source_config=source_config, table_name=table_name, query='select name, num from months' # optional. When omitted, all table records are returned. ) records | 'Writing to stdout' >> beam.Map(print) The output will be something like :: {'name': 'Jan', 'num': 1} {'name': 'Feb', 'num': 2} Where "name" and "num" are the column names. """ def __init__(self, source_config, table_name, query='', *args, **kwargs): super(ReadFromDB, self).__init__(*args, **kwargs) self._read_args = dict( source_config=source_config, table_name=table_name, query=query )
[docs] def expand(self, pcoll): return ( pcoll | Create([self._read_args]) | ParDo(_ReadFromRelationalDBFn()) )
class _ReadFromRelationalDBFn(DoFn): def process(self, element): db_args = dict(element) table_name = db_args.pop('table_name') query = db_args.pop('query') db = SqlAlchemyDB(**db_args) db.start_session() try: if query: for record in db.query(table_name, query): yield record else: for record in db.read(table_name): yield record except: raise finally: db.close_session()
[docs]class Write(PTransform): """A :class:`~apache_beam.transforms.ptransform.PTransform` for writing to tables on relational databases. Args: source_config (SourceConfiguration): specifies the target database. If ``source_config.create_if_missing`` is set to True, the target database will be created if it was missing. table_config (TableConfiguration): specifies the target table, as well as other optional parameters: - option for specifying custom insert statements. - options for creating the target table if it was missing. See :class:`~beam_nuggets.io.relational_db_api.TableConfiguration` for details. Examples: Writing to postgres database table. :: import apache_beam as beam from apache_beam.options.pipeline_options import PipelineOptions from beam_nuggets.io import relational_db records = [ {'name': 'Jan', 'num': 1}, {'name': 'Feb', 'num': 2}, ] source_config = relational_db.SourceConfiguration( drivername='postgresql', host='localhost', port=5432, username='postgres', password='password', database='calendar', ) table_config = relational_db.TableConfiguration( name='months', create_if_missing=True, primary_key_columns=['num'] ) with beam.Pipeline(options=PipelineOptions()) as p: months = p | "Reading month records" >> beam.Create(records) months | 'Writing to DB table' >> relational_db.Write( source_config=source_config, table_config=table_config ) """ def __init__(self, source_config, table_config, *args, **kwargs): super(Write, self).__init__(*args, **kwargs) self.source_config = source_config self.table_config = table_config
[docs] def expand(self, pcoll): return pcoll | ParDo(_WriteToRelationalDBFn( source_config=self.source_config, table_config=self.table_config ))
class _WriteToRelationalDBFn(DoFn): def __init__(self, source_config, table_config, *args, **kwargs): super(_WriteToRelationalDBFn, self).__init__(*args, **kwargs) self.source_config = source_config self.table_config = table_config def start_bundle(self): self._db = SqlAlchemyDB(self.source_config) self._db.start_session() def process(self, element): assert isinstance(element, dict) self._db.write_record(self.table_config, element) def finish_bundle(self): self._db.close_session()