Source code for beam_nuggets.io.csvio

from __future__ import division, print_function

import csv

import apache_beam as beam


class Read(beam.PTransform):
    """Read the rows of a csv file into a pipeline.

    This :class:`~apache_beam.transforms.ptransform.PTransform` produces a
    :class:`~apache_beam.pvalue.PCollection` containing one ``dict`` per
    csv row, as generated by :class:`csv.DictReader`.

    Args:
        csv_path (str): csv file path.
        *args: forwarded unchanged to the ``PTransform`` constructor.
        **kwargs: forwarded unchanged to the ``PTransform`` constructor.

    Examples:
        Reading content of a csv file. ::

            import apache_beam as beam
            from apache_beam.options.pipeline_options import PipelineOptions
            from beam_nuggets.io import csvio

            path_to_csv = '/path/to/students.csv'
            with beam.Pipeline(options=PipelineOptions()) as p:
                students = p | "Reading students records" >> csvio.Read(path_to_csv)
                students | 'Writing to stdout' >> beam.Map(print)

        The output will be something like ::

            {'lastName': 'Norvell', 'firstName': 'Andrel', 'level': '10'}
            {'lastName': 'Proudfoot', 'firstName': 'Dinorah', 'level': '8'}
            {'lastName': 'Plotkin', 'firstName': 'Trulal', 'level': '14'}
    """

    def __init__(self, csv_path, *args, **kwargs):
        super(Read, self).__init__(*args, **kwargs)
        # Remembered here; the source object is only built in expand().
        self._csv_path = csv_path

    def expand(self, pcoll):
        """Apply a ``beam.io.Read`` over a csv source for the stored path.

        Args:
            pcoll: the pipeline (or pvalue) this transform is applied to.

        Returns:
            The result of piping ``pcoll`` into ``beam.io.Read``.
        """
        csv_source = _CsvSource(self._csv_path)
        read_transform = beam.io.Read(csv_source)
        return pcoll | read_transform
class _CsvSource(beam.io.filebasedsource.FileBasedSource):
    """File based source that yields every csv row as a ``dict``.

    Rows are parsed with :class:`csv.DictReader`, so the first line of each
    file is taken as the header and supplies the keys of the yielded dicts.
    """

    def __init__(self, file_pattern, *args, **kwargs):
        """Create the source, always marking it as unsplittable.

        Args:
            file_pattern (str): path or pattern of the csv file(s) to read.
            *args: forwarded to ``FileBasedSource.__init__``.
            **kwargs: forwarded to ``FileBasedSource.__init__``; any
                ``splittable`` value supplied here is overridden.
        """
        # read_records() parses each file from its first byte and never
        # consults the range tracker. If the runner were allowed to split a
        # file into several offset ranges, every range would re-read the
        # whole file and emit duplicate rows, so splitting is disabled.
        kwargs['splittable'] = False
        super(_CsvSource, self).__init__(file_pattern, *args, **kwargs)

    def read_records(self, file_name, range_tracker):
        """Yield one ``dict`` per data row of ``file_name``.

        Args:
            file_name (str): the single file to read.
            range_tracker: unused; the file is always read in full (the
                source is created unsplittable, see ``__init__``).

        Yields:
            dict: a row as produced by :class:`csv.DictReader`.
        """
        # Keep the handle local: sources get serialised by Beam, and an open
        # file stored on ``self`` is neither picklable nor safe to share.
        handle = self.open_file(file_name)
        try:
            lines = handle
            if str is not bytes:
                # Python 3 only: the csv module requires text there, whereas
                # the opened stream is expected to yield bytes. Decoding per
                # line is safe for UTF-8 because a newline byte never occurs
                # inside a multi-byte sequence.
                # NOTE(review): UTF-8 is assumed; confirm for your inputs.
                lines = (
                    line.decode('utf-8') if isinstance(line, bytes) else line
                    for line in handle
                )
            for rec in csv.DictReader(lines):
                yield rec
        finally:
            # Runs on exhaustion, on error, and when the generator is closed
            # early, so the file handle is never leaked.
            handle.close()