Source code for beam_nuggets.transforms.json_

from __future__ import division, print_function

import json

import apache_beam as beam
from beam_nuggets.compat import iteritems


[docs]class ParseJson(beam.DoFn): def __init__(self, only_keys=None, *unused_args, **unused_kwargs): self.only_keys = only_keys super(ParseJson, self).__init__(*unused_args, **unused_kwargs)
[docs] def process(self, element): """ Args: element(dict): """ yield { k: json.loads(v) if k in self.only_keys else v for k, v in iteritems(element) }