beam_nuggets.io.kafkaio module

class beam_nuggets.io.kafkaio.KafkaConsume(consumer_config, value_decoder=None, *args, **kwargs)[source]

Bases: apache_beam.transforms.ptransform.PTransform

A PTransform for reading from an Apache Kafka topic. This is a streaming transform that runs indefinitely and never returns. The transform uses KafkaConsumer from the kafka-python library.

It outputs a PCollection of key-value pairs, where each element is a Kafka message in the form (msg-key, msg).

Parameters:
  • consumer_config (dict) – the kafka consumer configuration. The topic to be subscribed to should be specified with a key called ‘topic’. The remaining configurations are those of KafkaConsumer from the kafka python library.
  • value_decoder (function) – optional function to decode the consumed message value. If not specified, “bytes.decode”, which assumes “utf-8” encoding, is used by default.

Examples

Consuming from a Kafka Topic notifications

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from beam_nuggets.io import kafkaio

consumer_config = {"topic": "notifications",
                   "bootstrap_servers": "localhost:9092",
                   "group_id": "notification_consumer_group"}

with beam.Pipeline(options=PipelineOptions()) as p:
    notifications = p | "Reading messages from Kafka" >> kafkaio.KafkaConsume(
        consumer_config=consumer_config,
        value_decoder=bytes.decode,  # optional
    )
    notifications | 'Writing to stdout' >> beam.Map(print)

The output will be something like

("device 1", {"status": "healthy"})
("job #2647", {"status": "failed"})

Where the first element of the tuple is the Kafka message key and the second element is the Kafka message being passed through the topic
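If the message values carry JSON payloads, a custom value_decoder can parse them into Python objects as they are consumed. A minimal sketch; the helper name json_value_decoder is illustrative, not part of the library:

```python
import json


def json_value_decoder(value_bytes):
    """Decode raw Kafka message bytes (assumed UTF-8 JSON) into a dict."""
    return json.loads(value_bytes.decode("utf-8"))


# The decoder can be exercised without a broker:
decoded = json_value_decoder(b'{"status": "healthy"}')
# decoded == {"status": "healthy"}
```

It would then be passed to the transform as kafkaio.KafkaConsume(consumer_config=consumer_config, value_decoder=json_value_decoder), so downstream steps receive (msg-key, dict) pairs instead of raw strings.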

expand(pcoll)[source]
class beam_nuggets.io.kafkaio.KafkaProduce(topic=None, servers='127.0.0.1:9092')[source]

Bases: apache_beam.transforms.ptransform.PTransform

A PTransform for pushing messages into an Apache Kafka topic. This class expects a tuple with the first element being the message key and the second element being the message. The transform uses KafkaProducer from the kafka python library.

Parameters:
  • topic – Kafka topic to publish to
  • servers – list of Kafka bootstrap servers to connect to

Examples

Pushing messages to a Kafka Topic notifications

from __future__ import print_function
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from beam_nuggets.io import kafkaio

with beam.Pipeline(options=PipelineOptions()) as p:
    notifications = (p
                     | "Creating data" >> beam.Create([
                         ('dev_1', '{"device": "0001", "status": "healthy"}')])
                     | "Pushing messages to Kafka" >> kafkaio.KafkaProduce(
                         topic='notifications',
                         servers="localhost:9092")
                     )
    notifications | 'Writing to stdout' >> beam.Map(print)

The output will be something like

("dev_1", '{"device": "0001", status": "healthy"}')

Where the first element of the tuple is the Kafka message key and the second element is the message that was published to the topic
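Since KafkaProduce expects tuples of (key, message) with string messages, structured payloads are typically serialized with json.dumps before being produced. A hedged sketch; the helper name to_kafka_record is ours, not part of the library:

```python
import json


def to_kafka_record(device_id, payload):
    """Build a (key, message) tuple for KafkaProduce, JSON-encoding the payload."""
    return (device_id, json.dumps(payload))


record = to_kafka_record("dev_1", {"device": "0001", "status": "healthy"})
# record == ('dev_1', '{"device": "0001", "status": "healthy"}')
```

In a pipeline this would sit in a beam.Map step just before the KafkaProduce transform, e.g. beam.Map(lambda kv: to_kafka_record(*kv)).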

expand(pcoll)[source]