beam_nuggets.transforms.nested_dict module

class beam_nuggets.transforms.nested_dict.SelectFromNestedDict(keys, deepest_key_as_name=False, *args, **kwargs)[source]

Bases: apache_beam.transforms.core.DoFn

A DoFn for selecting subsets of records formed of nested dictionaries.

Parameters:
  • keys (list) – list of dictionary keys to be selected. Each key is a string formed of dot separated words, each is used for selecting from a dict in the nested dicts. The order of the word in the “dot.separated.string” correspond to the dict level to select from. For instance, if the input record is {'a': {'b': 3, 'c': {'d': 2} }} and keys is ['a.b', 'a.c.d'], the transform output will {'a_b': 3, 'a_c_d': 2} or {'b': 3, 'd': 2} depending on ``deepest_key_as_name `` below.
  • deepest_key_as_name (bool) – if set to True, the deepest selected fields keys will be used as names for the output dict keys.

Examples

Select from records formed of nested dicts.

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from beam_nuggets.transforms import SelectFromNestedDict

inputs = [
    {
        'name': {'first': 'Star', 'second': 'Light'},
        'address': {'st': 'Jupiter', 'flat': 3},
        'email': 's@l.no'
    },
    {
        'name': {'first': 'Mark', 'second': 'Sight'},
        'address': {'st': 'Loon', 'flat': 5},
        'email': 'm@s.no'
    }
]
with beam.Pipeline(options=PipelineOptions()) as p:
    nested = p | "Reading nested dicts" >> beam.Create(inputs)
    transformed = nested | "filtering" >> beam.ParDo(SelectFromNestedDict(
        keys=['name.first', 'address.st', 'email'],
        # deepest_key_as_name=True,
    ))
    transformed | 'Writing to stdout' >> beam.Map(print)

The output will be something like:

{‘address_st’: ‘Jupiter’, ‘name_first’: ‘Star’, ‘email’: ‘s@l.no’} {‘address_st’: ‘Loon’, ‘name_first’: ‘Mark’, ‘email’: ‘m@s.no’}
process(element)[source]
Parameters:element (dict) –