/*
 * Decompiled with CFR 0.152.
 */
package reactor.kafka.samples;

import java.text.SimpleDateFormat;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Cancellation;
import reactor.core.publisher.Flux;
import reactor.kafka.receiver.Receiver;
import reactor.kafka.receiver.ReceiverOffset;
import reactor.kafka.receiver.ReceiverOptions;

/**
 * Sample consumer that subscribes to a Kafka topic through the reactive
 * {@link Receiver} API and prints every record it receives to standard output.
 *
 * <p>Records are read with {@code Integer} keys and {@code String} values, using the
 * consumer group {@code sample-group} and starting from the earliest offset when no
 * committed offset exists (see the properties set in the constructor).
 */
public class SampleConsumer {
    private static final Logger log = LoggerFactory.getLogger(SampleConsumer.class.getName());
    private static final String BOOTSTRAP_SERVERS = "localhost:9092";
    private static final String TOPIC = "demo-topic";

    /** Base receiver configuration; the topic subscription is added per call in {@link #consumeMessages}. */
    private final ReceiverOptions<Integer, String> receiverOptions;

    /*
     * SimpleDateFormat is not synchronized (see its Javadoc). This instance is only used
     * from the subscriber callback in consumeMessages.
     * NOTE(review): assumes that callback is never run concurrently for one SampleConsumer
     * instance -- confirm before calling consumeMessages more than once per instance.
     */
    private final SimpleDateFormat dateFormat;

    /**
     * Builds the consumer configuration for the given brokers.
     *
     * @param bootstrapServers value for the Kafka {@code bootstrap.servers} property
     */
    public SampleConsumer(String bootstrapServers) {
        HashMap<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("client.id", "sample-consumer");
        props.put("group.id", "sample-group");
        props.put("key.deserializer", IntegerDeserializer.class);
        props.put("value.deserializer", StringDeserializer.class);
        props.put("auto.offset.reset", "earliest");
        this.receiverOptions = ReceiverOptions.create(props);
        this.dateFormat = new SimpleDateFormat("HH:mm:ss:SSS z dd MMM yyyy");
    }

    /**
     * Subscribes to {@code topic} and prints each received record (topic-partition, offset,
     * formatted timestamp, key and value) to standard output.
     *
     * @param topic the topic to subscribe to
     * @param latch counted down once for every record received
     * @return the subscription handle; call {@code dispose()} on it to stop consuming
     */
    public Cancellation consumeMessages(String topic, CountDownLatch latch) {
        ReceiverOptions<Integer, String> options = this.receiverOptions
                .subscription(Collections.singleton(topic))
                .addAssignListener(partitions -> log.debug("onPartitionsAssigned {}", partitions))
                .addRevokeListener(partitions -> log.debug("onPartitionsRevoked {}", partitions));

        // Chaining receive() straight into subscribe() lets the compiler infer the element
        // type, so the lambda parameter is fully typed without any raw Flux in between.
        return Receiver.create(options)
                .receive()
                .subscribe(message -> {
                    ReceiverOffset offset = message.offset();
                    ConsumerRecord<Integer, String> record = message.record();
                    System.out.printf("Received message: topic-partition=%s offset=%d timestamp=%s key=%d value=%s\n",
                            offset.topicPartition(),
                            offset.offset(),
                            this.dateFormat.format(new Date(record.timestamp())),
                            record.key(),
                            record.value());
                    latch.countDown();
                });
    }

    /**
     * Consumes from {@code demo-topic} on {@code localhost:9092} until 20 records have been
     * received or 10 seconds have elapsed, then cancels the subscription.
     *
     * @param args unused
     * @throws Exception if the wait is interrupted or the consumer fails to start
     */
    public static void main(String[] args) throws Exception {
        int count = 20;
        CountDownLatch latch = new CountDownLatch(count);
        SampleConsumer consumer = new SampleConsumer(BOOTSTRAP_SERVERS);
        Cancellation cancellation = consumer.consumeMessages(TOPIC, latch);
        try {
            boolean completed = latch.await(10L, TimeUnit.SECONDS);
            if (!completed) {
                log.warn("Timed out with {} of {} expected messages still outstanding", latch.getCount(), count);
            }
        } finally {
            // Always release the subscription, even if the wait is interrupted.
            cancellation.dispose();
        }
    }
}

