Source code for beam_nuggets.transforms.json_
from __future__ import division, print_function
import json
import apache_beam as beam
from beam_nuggets.compat import iteritems
class ParseJson(beam.DoFn):
    """DoFn that decodes JSON-encoded values inside dict elements.

    Each input element is a dict. The values stored under the selected keys
    are passed through ``json.loads``; all other values are forwarded
    unchanged.

    Args:
        only_keys: Container of keys whose values should be decoded. When
            ``None`` (the default) no restriction applies and every value
            in the element is decoded.
        *unused_args: Forwarded to ``beam.DoFn.__init__``.
        **unused_kwargs: Forwarded to ``beam.DoFn.__init__``.
    """

    def __init__(self, only_keys=None, *unused_args, **unused_kwargs):
        self.only_keys = only_keys
        super(ParseJson, self).__init__(*unused_args, **unused_kwargs)

    def process(self, element):
        """Yield a copy of ``element`` with the selected values JSON-decoded.

        Args:
            element(dict): mapping whose selected values are JSON documents.

        Yields:
            dict: a new dict with the same keys as ``element``; the input
            dict is not modified.

        Raises:
            ValueError: propagated from ``json.loads`` when a selected value
                is not valid JSON.
            TypeError: propagated from ``json.loads`` when a selected value
                is not a string-like object.
        """
        only_keys = self.only_keys
        # ``None`` means "no restriction". Without this guard the membership
        # test below would raise TypeError for a default-constructed ParseJson.
        yield {
            k: json.loads(v) if only_keys is None or k in only_keys else v
            for k, v in iteritems(element)
        }