beam_nuggets.io.kafkaio module
class beam_nuggets.io.kafkaio.KafkaConsume(consumer_config, value_decoder=None, *args, **kwargs)

    Bases: apache_beam.transforms.ptransform.PTransform

    A PTransform for reading from an Apache Kafka topic. This is a streaming transform that never returns. The transform uses KafkaConsumer from the kafka-python library. It outputs a PCollection of key-value pairs, where each element is a Kafka message in the form (msg-key, msg).

    Parameters:
        - consumer_config (dict) – the Kafka consumer configuration. The topic to subscribe to should be specified under a key called 'topic'. The remaining entries are the configuration of KafkaConsumer from the kafka-python library.
        - value_decoder (function) – optional function to decode the consumed message value. If not specified, bytes.decode, which assumes UTF-8 encoding, is used by default.
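Conceptually, the transform separates the 'topic' entry from the rest of the configuration before handing the remainder to KafkaConsumer. A minimal sketch of that split, assuming plain dict semantics (the helper name is ours, not the library's actual code):

```python
def split_consumer_config(consumer_config):
    """Split the 'topic' entry from the remaining KafkaConsumer options.

    Hypothetical helper for illustration only; KafkaConsume handles
    this internally.
    """
    config = dict(consumer_config)  # copy so the caller's dict is untouched
    topic = config.pop("topic")
    return topic, config


# The remaining keys would be forwarded to kafka.KafkaConsumer(**config).
topic, config = split_consumer_config({
    "topic": "notifications",
    "bootstrap_servers": "localhost:9092",
    "group_id": "notification_consumer_group",
})
```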
Examples
Consuming from a Kafka topic named notifications:
    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions
    from beam_nuggets.io import kafkaio

    consumer_config = {
        "topic": "notifications",
        "bootstrap_servers": "localhost:9092",
        "group_id": "notification_consumer_group",
    }

    with beam.Pipeline(options=PipelineOptions()) as p:
        notifications = p | "Reading messages from Kafka" >> kafkaio.KafkaConsume(
            consumer_config=consumer_config,
            value_decoder=bytes.decode,  # optional
        )
        notifications | 'Writing to stdout' >> beam.Map(print)
The output will be something like
    ("device 1", {"status": "healthy"})
    ("job #2647", {"status": "failed"})
where the first element of the tuple is the Kafka message key and the second element is the Kafka message passed through the topic.
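Because value_decoder receives the raw message bytes, it can do more than plain text decoding. A hedged sketch of a JSON-parsing decoder that could be passed as value_decoder (the decode_json name is ours, not part of the library):

```python
import json


def decode_json(value):
    """Decode raw Kafka message bytes into a Python object.

    Would be passed as value_decoder=decode_json to kafkaio.KafkaConsume;
    the name and the JSON handling are illustrative, not part of beam-nuggets.
    """
    return json.loads(value.decode("utf-8"))


decoded = decode_json(b'{"status": "healthy"}')
```

With this decoder, each output tuple would carry a parsed dict instead of a raw string as its message value.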
class beam_nuggets.io.kafkaio.KafkaProduce(topic=None, servers='127.0.0.1:9092')

    Bases: apache_beam.transforms.ptransform.PTransform

    A PTransform for pushing messages into an Apache Kafka topic. This class expects a tuple whose first element is the message key and whose second element is the message. The transform uses KafkaProducer from the kafka-python library.

    Parameters:
        - topic – the Kafka topic to publish to
        - servers – list of Kafka bootstrap servers to connect to
Examples
Pushing messages to a Kafka topic named notifications:
    from __future__ import print_function
    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions
    from beam_nuggets.io import kafkaio

    with beam.Pipeline(options=PipelineOptions()) as p:
        notifications = (
            p
            | "Creating data" >> beam.Create([
                ('dev_1', '{"device": "0001", "status": "healthy"}')
            ])
            | "Pushing messages to Kafka" >> kafkaio.KafkaProduce(
                topic='notifications',
                servers="localhost:9092",
            )
        )
        notifications | 'Writing to stdout' >> beam.Map(print)
The output will be something like
    ("dev_1", '{"device": "0001", "status": "healthy"}')
where the first element of the tuple is the Kafka message key and the second element is the Kafka message produced to the topic.
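Since KafkaProduce expects (key, message) tuples, upstream records often need a small formatting step before the write. A sketch under that assumption, where to_kafka_record is a hypothetical helper that would be applied with beam.Map ahead of KafkaProduce:

```python
import json


def to_kafka_record(record):
    """Turn a (device_id, status_dict) pair into a (key, json-string) tuple
    suitable for kafkaio.KafkaProduce.

    Illustrative helper, not part of beam-nuggets.
    """
    device_id, status = record
    return (device_id, json.dumps(status))


# In a pipeline this would appear as:
#   ... | beam.Map(to_kafka_record) | kafkaio.KafkaProduce(topic=..., servers=...)
record = to_kafka_record(("dev_1", {"device": "0001", "status": "healthy"}))
```

Serializing the message value explicitly keeps the transform itself agnostic about the payload format.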