Source code for beam_nuggets.io.kafkaio

from __future__ import division, print_function

from apache_beam import PTransform, ParDo, DoFn, Create
from kafka import KafkaConsumer, KafkaProducer


class KafkaConsume(PTransform):
    """A :class:`~apache_beam.transforms.ptransform.PTransform` for reading
    from an Apache Kafka topic. This is a streaming Transform that never
    returns.

    The transform uses `KafkaConsumer` from the `kafka` python library.

    It outputs a :class:`~apache_beam.pvalue.PCollection` of key-value pairs,
    where each element is a Kafka message in the form ``(msg-key, msg)``.

    Args:
        consumer_config (dict): the kafka consumer configuration. The topic
            to be subscribed to should be specified with a key called
            'topic'. The remaining configurations are those of
            `KafkaConsumer` from the `kafka` python library.
        value_decoder (function): Optional function to decode the consumed
            message value. If not specified, ``bytes.decode``, which assumes
            UTF-8 encoding, is used by default.

    Examples:
        Consuming from a Kafka Topic `notifications` ::

            import apache_beam as beam
            from apache_beam.options.pipeline_options import PipelineOptions
            from beam_nuggets.io import kafkaio

            consumer_config = {"topic": "notifications",
                               "bootstrap_servers": "localhost:9092",
                               "group_id": "notification_consumer_group"}

            with beam.Pipeline(options=PipelineOptions()) as p:
                notifications = p | "Reading messages from Kafka" >> kafkaio.KafkaConsume(
                    consumer_config=consumer_config,
                    value_decoder=bytes.decode,  # optional
                )
                notifications | 'Writing to stdout' >> beam.Map(print)

        The output will be something like ::

            ("device 1", '{"status": "healthy"}')
            ("job #2647", '{"status": "failed"}')

        Where the first element of the tuple is the Kafka message key and the
        second element is the Kafka message being passed through the topic.
    """

    def __init__(self, consumer_config, value_decoder=None, *args, **kwargs):
        """Initializes ``KafkaConsume``."""
        super(KafkaConsume, self).__init__()
        self._consumer_args = dict(
            consumer_config=consumer_config,
            value_decoder=value_decoder,
        )
    def expand(self, pcoll):
        return (
            pcoll
            | Create([self._consumer_args])
            | ParDo(_ConsumeKafkaTopic())
        )
class _ConsumeKafkaTopic(DoFn):
    """Internal ``DoFn`` to read from a Kafka topic and return messages."""

    def process(self, consumer_args):
        consumer_config = consumer_args.pop('consumer_config')
        topic = consumer_config.pop('topic')
        value_decoder = consumer_args.pop('value_decoder') or bytes.decode

        # Iterating a KafkaConsumer blocks waiting for new records, so this
        # loop (and the enclosing transform) never terminates.
        consumer = KafkaConsumer(topic, **consumer_config)
        for msg in consumer:
            try:
                yield msg.key, value_decoder(msg.value)
            except Exception as e:
                # Log the decoding error and skip the offending message.
                print(e)
                continue
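# --- Usage sketch (not part of the library source) ---
# The class docstring above shows consumption with the default
# ``bytes.decode`` decoder. The hypothetical helper below sketches how a
# custom ``value_decoder`` can turn raw message bytes into Python dicts when
# the topic carries JSON payloads; the topic name, broker address and the
# ``decode_json`` function are illustrative assumptions only.
def _example_consume_json_notifications():
    """Hypothetical sketch: KafkaConsume with a JSON value decoder."""
    import json

    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions
    from beam_nuggets.io import kafkaio

    def decode_json(raw_value):
        # The consumed value arrives as bytes; decode to str, then parse JSON.
        return json.loads(raw_value.decode('utf-8'))

    consumer_config = {
        'topic': 'notifications',               # assumed topic name
        'bootstrap_servers': 'localhost:9092',  # assumed broker address
        'group_id': 'notification_consumer_group',
        # Any further keys are passed straight through to kafka.KafkaConsumer,
        # e.g. 'auto_offset_reset': 'earliest'.
    }

    with beam.Pipeline(options=PipelineOptions()) as p:
        (p
         | 'Read from Kafka' >> kafkaio.KafkaConsume(
             consumer_config=consumer_config,
             value_decoder=decode_json,
         )
         | 'Print' >> beam.Map(print))  # elements are (key, dict) tuples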
class KafkaProduce(PTransform):
    """A :class:`~apache_beam.transforms.ptransform.PTransform` for pushing
    messages into an Apache Kafka topic. This class expects a tuple with the
    first element being the message key and the second element being the
    message.

    The transform uses `KafkaProducer` from the `kafka` python library.

    Args:
        topic (str): Kafka topic to publish to.
        servers (str): Kafka bootstrap server(s) to connect to.

    Examples:
        Pushing messages to a Kafka Topic `notifications` ::

            from __future__ import print_function
            import apache_beam as beam
            from apache_beam.options.pipeline_options import PipelineOptions
            from beam_nuggets.io import kafkaio

            with beam.Pipeline(options=PipelineOptions()) as p:
                notifications = (p
                                 | "Creating data" >> beam.Create(
                                     [('dev_1', '{"device": "0001", "status": "healthy"}')])
                                 | "Pushing messages to Kafka" >> kafkaio.KafkaProduce(
                                     topic='notifications',
                                     servers="localhost:9092",
                                 ))
                notifications | 'Writing to stdout' >> beam.Map(print)

        The output will be something like ::

            ("dev_1", '{"device": "0001", "status": "healthy"}')

        Where the first element of the tuple is the Kafka message key and the
        second element is the Kafka message that was published.
    """

    def __init__(self, topic=None, servers='127.0.0.1:9092'):
        """Initializes ``KafkaProduce``."""
        super(KafkaProduce, self).__init__()
        self._attributes = dict(
            topic=topic,
            servers=servers,
        )
    def expand(self, pcoll):
        return (
            pcoll
            | ParDo(_ProduceKafkaMessage(self._attributes))
        )
class _ProduceKafkaMessage(DoFn):
    """Internal ``DoFn`` to publish messages to a Kafka topic."""

    def __init__(self, attributes, *args, **kwargs):
        super(_ProduceKafkaMessage, self).__init__(*args, **kwargs)
        self.attributes = attributes

    def start_bundle(self):
        # One producer per bundle; it is closed again in finish_bundle.
        self._producer = KafkaProducer(
            bootstrap_servers=self.attributes['servers'])

    def finish_bundle(self):
        self._producer.close()

    def process(self, element):
        try:
            # element is a (key, value) tuple of strings; both are encoded
            # to bytes before being sent to the configured topic.
            self._producer.send(
                self.attributes['topic'],
                element[1].encode(),
                key=element[0].encode())
            yield element
        except Exception:
            raise
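# --- Usage sketch (not part of the library source) ---
# The hypothetical helper below sketches publishing keyed JSON messages with
# KafkaProduce. KafkaProduce expects (key, value) tuples of strings, so the
# dicts are serialised first; the topic name and broker address are
# illustrative assumptions only.
def _example_produce_device_status():
    """Hypothetical sketch: publishing keyed JSON messages."""
    import json

    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions
    from beam_nuggets.io import kafkaio

    records = [
        ('dev_1', {'device': '0001', 'status': 'healthy'}),
        ('dev_2', {'device': '0002', 'status': 'failed'}),
    ]

    with beam.Pipeline(options=PipelineOptions()) as p:
        (p
         | 'Create records' >> beam.Create(records)
         | 'Serialize values' >> beam.Map(
             lambda kv: (kv[0], json.dumps(kv[1])))
         | 'Push to Kafka' >> kafkaio.KafkaProduce(
             topic='notifications',       # assumed topic name
             servers='localhost:9092',    # assumed broker address
         )
         | 'Print published' >> beam.Map(print))  # KafkaProduce re-emits elements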