from __future__ import division, print_function
import apache_beam as beam
class SelectFromNestedDict(beam.DoFn):
    """A :class:`~apache_beam.DoFn` for selecting subsets of records formed of
    nested dictionaries.

    Each input element is walked once per requested key and a single flat
    dict holding the selected values is emitted.

    Args:
        keys (list): list of dictionary keys to be selected. Each key is a
            string formed of dot separated words, each of which is used for
            selecting from one dict in the nested dicts. The position of a
            word in the "dot.separated.string" corresponds to the dict level
            to select from. For instance, if the input record is
            ``{'a': {'b': 3, 'c': {'d': 2}}}`` and keys is
            ``['a.b', 'a.c.d']``, the transform output will be
            ``{'a_b': 3, 'a_c_d': 2}`` or ``{'b': 3, 'd': 2}`` depending on
            ``deepest_key_as_name`` below.
        deepest_key_as_name (bool): if set to True, only the deepest (last)
            word of each key is used as the name of the output dict key.
            Otherwise the words are joined with ``'_'``.
        *args: forwarded unchanged to the parent ``__init__``.
        **kwargs: forwarded unchanged to the parent ``__init__``.

    Note:
        Output names are not checked for uniqueness. If two keys map to the
        same output name (e.g. ``'a.b'`` and ``'c.b'`` with
        ``deepest_key_as_name=True``), the value of the key appearing later
        in ``keys`` overwrites the earlier one.

    Examples:
        Select from records formed of nested dicts. ::

            import apache_beam as beam
            from apache_beam.options.pipeline_options import PipelineOptions
            from beam_nuggets.transforms import SelectFromNestedDict

            inputs = [
                {
                    'name': {'first': 'Star', 'second': 'Light'},
                    'address': {'st': 'Jupiter', 'flat': 3},
                    'email': 's@l.no'
                },
                {
                    'name': {'first': 'Mark', 'second': 'Sight'},
                    'address': {'st': 'Loon', 'flat': 5},
                    'email': 'm@s.no'
                }
            ]
            with beam.Pipeline(options=PipelineOptions()) as p:
                nested = p | "Reading nested dicts" >> beam.Create(inputs)
                transformed = nested | "filtering" >> beam.ParDo(SelectFromNestedDict(
                    keys=['name.first', 'address.st', 'email'],
                    # deepest_key_as_name=True,
                ))
                transformed | 'Writing to stdout' >> beam.Map(print)

        The output will be something like: ::

            {'address_st': 'Jupiter', 'name_first': 'Star', 'email': 's@l.no'}
            {'address_st': 'Loon', 'name_first': 'Mark', 'email': 'm@s.no'}
    """

    def __init__(self, keys, deepest_key_as_name=False, *args, **kwargs):
        super(SelectFromNestedDict, self).__init__(*args, **kwargs)
        # Split the dotted keys and derive the output names once, at
        # construction time, so process() only performs lookups per element.
        self._compiled_keys = self._compile_keys(keys, deepest_key_as_name)

    def process(self, element):
        """Yield one flat dict holding the selected values of ``element``.

        Args:
            element (dict): a record formed of nested dicts.

        Yields:
            dict: maps each output name to the value found by following the
            corresponding key path through ``element``.

        Raises:
            KeyError: if a word of a key path is missing at its level.
            TypeError: if a value reached before the end of a key path does
                not support lookup by that word (e.g. it is an int or a
                list).
        """
        yield {
            out_key: self._retrieve(nested_keys, element)
            for nested_keys, out_key in self._compiled_keys
        }

    @staticmethod
    def _retrieve(nested_keys, element):
        """Return the value reached by indexing ``element`` with each of
        ``nested_keys`` in turn, outermost level first."""
        value = element
        for key in nested_keys:
            value = value[key]
        return value

    @staticmethod
    def _compile_keys(keys, deepest_key_as_name):
        """Turn dotted keys into ``(nested_keys, out_key)`` pairs.

        ``nested_keys`` is the list of words used for retrieving the nested
        value (``'a.b'`` -> ``['a', 'b']``); ``out_key`` is the name used in
        the output dict (``'b'`` if ``deepest_key_as_name`` else ``'a_b'``).
        """
        compiled = []
        for key in keys:
            nested_keys = key.split('.')
            if deepest_key_as_name:
                out_key = nested_keys[-1]
            else:
                out_key = '_'.join(nested_keys)
            compiled.append((nested_keys, out_key))
        return compiled