from __future__ import division, print_function
from apache_beam import PTransform, ParDo, DoFn, Create
from kafka import KafkaConsumer, KafkaProducer
class KafkaConsume(PTransform):
    """A :class:`~apache_beam.transforms.ptransform.PTransform` for reading from
    an Apache Kafka topic. This is a streaming transform that never returns. The
    transform uses ``KafkaConsumer`` from the `kafka` python library.

    It outputs a :class:`~apache_beam.pvalue.PCollection` of key-value pairs,
    where each element is a Kafka message in the form ``(msg-key, msg)``.

    Args:
        consumer_config (dict): the Kafka consumer configuration. The topic to
            subscribe to should be specified with a key called ``'topic'``. The
            remaining entries are passed as-is to ``KafkaConsumer`` from the
            `kafka` python library.
        value_decoder (function): optional function used to decode the consumed
            message value. If not specified, ``bytes.decode`` is used, which
            assumes ``utf-8`` encoding.

    Examples:
        Consuming from a Kafka topic `notifications` ::

            import apache_beam as beam
            from apache_beam.options.pipeline_options import PipelineOptions
            from beam_nuggets.io import kafkaio

            consumer_config = {"topic": "notifications",
                               "bootstrap_servers": "localhost:9092",
                               "group_id": "notification_consumer_group"}

            with beam.Pipeline(options=PipelineOptions()) as p:
                notifications = p | "Reading messages from Kafka" >> kafkaio.KafkaConsume(
                    consumer_config=consumer_config,
                    value_decoder=bytes.decode,  # optional
                )
                notifications | 'Writing to stdout' >> beam.Map(print)

        The output will be something like ::

            ("device 1", {"status": "healthy"})
            ("job #2647", {"status": "failed"})

        Where the first element of each tuple is the Kafka message key and the
        second element is the Kafka message passed through the topic.
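
        The default decoder returns the value as a ``utf-8`` string. If your
        messages carry, say, JSON payloads, a custom ``value_decoder`` can parse
        them for you (a sketch, assuming ``utf-8`` encoded JSON values) ::

            import json

            def decode_json(raw_value):
                # The raw Kafka message value arrives as bytes.
                return json.loads(raw_value.decode("utf-8"))

            notifications = p | "Reading messages from Kafka" >> kafkaio.KafkaConsume(
                consumer_config=consumer_config,
                value_decoder=decode_json,
            )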
    """
    def __init__(self, consumer_config, value_decoder=None, *args, **kwargs):
        """Initializes ``KafkaConsume``
        """
        super(KafkaConsume, self).__init__()
        self._consumer_args = dict(
            consumer_config=consumer_config,
            value_decoder=value_decoder,
        )
    def expand(self, pcoll):
        return (
            pcoll
            | Create([self._consumer_args])
            | ParDo(_ConsumeKafkaTopic())
        )
class _ConsumeKafkaTopic(DoFn):
    """Internal ``DoFn`` to read from a Kafka topic and yield its messages"""

    def process(self, consumer_args):
        # Copy before popping so the input element is not mutated
        # (Beam DoFns must not modify their input elements).
        consumer_args = dict(consumer_args)
        consumer_config = dict(consumer_args.pop('consumer_config'))
        topic = consumer_config.pop('topic')
        value_decoder = consumer_args.pop('value_decoder') or bytes.decode

        consumer = KafkaConsumer(topic, **consumer_config)
        for msg in consumer:
            try:
                yield msg.key, value_decoder(msg.value)
            except Exception as e:
                # Skip messages whose value cannot be decoded.
                print(e)
                continue
class KafkaProduce(PTransform):
    """A :class:`~apache_beam.transforms.ptransform.PTransform` for pushing
    messages into an Apache Kafka topic. Each input element is expected to be a
    tuple whose first element is the message key and whose second element is the
    message. The transform uses ``KafkaProducer`` from the `kafka` python
    library.

    Args:
        topic (str): the Kafka topic to publish to.
        servers: the Kafka bootstrap server(s) to connect to, in any form
            accepted by ``KafkaProducer``'s ``bootstrap_servers`` argument
            (a ``'host[:port]'`` string or a list of such strings).

    Examples:
        Pushing messages to a Kafka topic `notifications` ::

            from __future__ import print_function
            import apache_beam as beam
            from apache_beam.options.pipeline_options import PipelineOptions
            from beam_nuggets.io import kafkaio

            with beam.Pipeline(options=PipelineOptions()) as p:
                notifications = (p
                                 | "Creating data" >> beam.Create([('dev_1', '{"device": "0001", "status": "healthy"}')])
                                 | "Pushing messages to Kafka" >> kafkaio.KafkaProduce(
                                       topic='notifications',
                                       servers="localhost:9092",
                                   ))
                notifications | 'Writing to stdout' >> beam.Map(print)

        The output will be something like ::

            ("dev_1", '{"device": "0001", "status": "healthy"}')

        Where the first element is the Kafka message key and the second element
        is the message that was published to the topic.
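
        ``KafkaProduce`` calls ``.encode()`` on both the key and the value, so
        each element should be a pair of strings. If your records are, for
        example, dicts, they can be serialized upstream (a sketch, assuming
        JSON payloads) ::

            import json

            records = [('dev_1', {"device": "0001", "status": "healthy"})]

            (p
             | "Creating data" >> beam.Create(records)
             | "Serializing values" >> beam.Map(lambda kv: (kv[0], json.dumps(kv[1])))
             | "Pushing messages to Kafka" >> kafkaio.KafkaProduce(
                   topic='notifications',
                   servers="localhost:9092",
               ))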
    """
    def __init__(self, topic=None, servers='127.0.0.1:9092'):
        """Initializes ``KafkaProduce``
        """
        super(KafkaProduce, self).__init__()
        self._attributes = dict(
            topic=topic, 
            servers=servers)
    def expand(self, pcoll):
        return (
            pcoll
            | ParDo(_ProduceKafkaMessage(self._attributes))
        )
class _ProduceKafkaMessage(DoFn):
    """Internal ``DoFn`` to publish messages to a Kafka topic"""

    def __init__(self, attributes, *args, **kwargs):
        super(_ProduceKafkaMessage, self).__init__(*args, **kwargs)
        self.attributes = attributes

    def start_bundle(self):
        # Create one producer per bundle and reuse it for every element.
        self._producer = KafkaProducer(bootstrap_servers=self.attributes["servers"])

    def finish_bundle(self):
        self._producer.close()

    def process(self, element):
        # element is a (key, message) pair of strings.
        self._producer.send(self.attributes['topic'],
                            element[1].encode(),
                            key=element[0].encode())
        yield element
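

# A minimal, guarded smoke test (a sketch): it assumes a Kafka broker is
# reachable at localhost:9092 and that the 'notifications' topic exists or can
# be auto-created. Running the module directly pushes one message through
# ``KafkaProduce`` and prints the element that was published.
if __name__ == '__main__':
    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions

    with beam.Pipeline(options=PipelineOptions()) as demo_pipeline:
        (demo_pipeline
         | 'Creating data' >> Create([('dev_1', '{"device": "0001", "status": "healthy"}')])
         | 'Pushing messages to Kafka' >> KafkaProduce(topic='notifications',
                                                       servers='localhost:9092')
         | 'Writing to stdout' >> beam.Map(print))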