# Source code for beam_nuggets.transforms.nested_dict

from __future__ import division, print_function

import apache_beam as beam


class SelectFromNestedDict(beam.DoFn):
    """A :class:`~apache_beam.DoFn` for selecting subsets of records formed of
    nested dictionaries.

    Args:
        keys (list): list of dictionary keys to be selected. Each key is a
            string formed of dot-separated words, each of which is used for
            selecting from one level of the nested dicts. The order of the
            words in the ``"dot.separated.string"`` corresponds to the dict
            level to select from. For instance, if the input record is
            ``{'a': {'b': 3, 'c': {'d': 2}}}`` and keys is
            ``['a.b', 'a.c.d']``, the transform output will be
            ``{'a_b': 3, 'a_c_d': 2}`` or ``{'b': 3, 'd': 2}`` depending on
            ``deepest_key_as_name`` below.
        deepest_key_as_name (bool): if set to True, the deepest key of each
            selected path is used as the output dict key (``'a.b'`` ->
            ``'b'``); otherwise the path's words are joined with ``'_'``
            (``'a.b'`` -> ``'a_b'``).

    Raises:
        ValueError: if two different entries of ``keys`` would be written to
            the same output key (e.g. ``['a.x', 'b.x']`` with
            ``deepest_key_as_name=True``, or ``['a.b', 'a_b']`` without it),
            because one selected value would otherwise silently overwrite
            the other.

    Examples:
        Select from records formed of nested dicts. ::

            import apache_beam as beam
            from apache_beam.options.pipeline_options import PipelineOptions
            from beam_nuggets.transforms import SelectFromNestedDict

            inputs = [
                {
                    'name': {'first': 'Star', 'second': 'Light'},
                    'address': {'st': 'Jupiter', 'flat': 3},
                    'email': 's@l.no'
                },
                {
                    'name': {'first': 'Mark', 'second': 'Sight'},
                    'address': {'st': 'Loon', 'flat': 5},
                    'email': 'm@s.no'
                }
            ]
            with beam.Pipeline(options=PipelineOptions()) as p:
                nested = p | "Reading nested dicts" >> beam.Create(inputs)
                transformed = nested | "filtering" >> beam.ParDo(SelectFromNestedDict(
                    keys=['name.first', 'address.st', 'email'],
                    # deepest_key_as_name=True,
                ))
                transformed | 'Writing to stdout' >> beam.Map(print)

        The output will be something like: ::

            {'address_st': 'Jupiter', 'name_first': 'Star', 'email': 's@l.no'}
            {'address_st': 'Loon', 'name_first': 'Mark', 'email': 'm@s.no'}
    """

    def __init__(self, keys, deepest_key_as_name=False, *args, **kwargs):
        super(SelectFromNestedDict, self).__init__(*args, **kwargs)
        # Parse the dotted keys once here rather than on every element.
        self._compiled_keys = self._compile_keys(keys, deepest_key_as_name)

    def process(self, element):
        """Select the configured (possibly nested) fields from ``element``.

        Args:
            element (dict): input record, possibly containing nested dicts.

        Yields:
            dict: a flat dict mapping each output key to the value found at
            the corresponding nested path in ``element``.

        Raises:
            KeyError: if a key along a selected path is missing from the
                (nested) dict being indexed.
        """
        yield {
            out_key: self._retrieve(nested_keys, element)
            for nested_keys, out_key in self._compiled_keys
        }

    @staticmethod
    def _retrieve(nested_keys, element):
        """Walk ``element`` one level per entry of ``nested_keys`` and return
        the value reached at the end of the path."""
        for key in nested_keys:
            element = element[key]
        return element

    @staticmethod
    def _compile_keys(keys, deepest_key_as_name):
        """Turn dotted key strings into ``(nested_keys, out_key)`` pairs.

        Args:
            keys (list): dotted key strings, e.g. ``['a.b', 'c']``.
            deepest_key_as_name (bool): see the class docstring.

        Returns:
            list: ``[(['a', 'b'], 'a_b' or 'b'), ...]`` in the order of
            ``keys``.

        Raises:
            ValueError: if two distinct entries of ``keys`` produce the same
                output key.
        """
        compiled = []
        # Maps each output key to the dotted source key that produced it, so
        # collisions between *different* source keys can be detected.
        # Repeating the exact same key is harmless (same value) and allowed.
        source_of = {}
        for key in keys:
            nested_keys = key.split('.')  # 'a.b' -> ['a', 'b']
            if deepest_key_as_name:
                out_key = nested_keys[-1]  # 'b'
            else:
                out_key = '_'.join(nested_keys)  # 'a_b'
            previous = source_of.setdefault(out_key, key)
            if previous != key:
                raise ValueError(
                    'Keys {!r} and {!r} both map to output key {!r}; one '
                    'value would be silently overwritten.'.format(
                        previous, key, out_key))
            compiled.append((nested_keys, out_key))
        return compiled