package reactor.kafka.samples;

import java.text.SimpleDateFormat;
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverOffset;
import reactor.kafka.receiver.ReceiverOptions;

/* loaded from: input_file:reactor/kafka/samples/SampleConsumer.class */
/**
 * Sample Reactor Kafka consumer that subscribes to a topic, prints every record it
 * receives to standard output, acknowledges the record's offset and counts down a latch.
 *
 * <p>The {@link #main(String[])} entry point consumes from {@value #TOPIC} on
 * {@value #BOOTSTRAP_SERVERS} until the expected number of records has arrived or the
 * wait times out.
 */
public class SampleConsumer {
    private static final Logger log = LoggerFactory.getLogger(SampleConsumer.class);
    private static final String BOOTSTRAP_SERVERS = "localhost:9092";
    private static final String TOPIC = "demo-topic";

    /** Number of records {@link #main(String[])} waits for before shutting down. */
    private static final int EXPECTED_MESSAGE_COUNT = 20;

    /** Maximum time, in seconds, {@link #main(String[])} waits for the expected records. */
    private static final long AWAIT_TIMEOUT_SECONDS = 10L;

    private final ReceiverOptions<Integer, String> receiverOptions;

    // DateTimeFormatter is immutable and thread-safe, unlike SimpleDateFormat, so it can be
    // shared by every subscription created from this instance.
    private final DateTimeFormatter dateFormat;

    /**
     * Creates a consumer configured with integer keys, string values, a fixed client id and
     * group id, and {@code auto.offset.reset=earliest}.
     *
     * @param str the Kafka bootstrap servers, e.g. {@code "localhost:9092"}
     */
    public SampleConsumer(String str) {
        Map<String, Object> consumerProps = new HashMap<>();
        consumerProps.put("bootstrap.servers", str);
        consumerProps.put("client.id", "sample-consumer");
        consumerProps.put("group.id", "sample-group");
        consumerProps.put("key.deserializer", IntegerDeserializer.class);
        consumerProps.put("value.deserializer", StringDeserializer.class);
        consumerProps.put("auto.offset.reset", "earliest");
        this.receiverOptions = ReceiverOptions.create(consumerProps);
        // Same pattern as before; the zone must be supplied explicitly because an Instant
        // carries no time zone of its own.
        this.dateFormat = DateTimeFormatter.ofPattern("HH:mm:ss:SSS z dd MMM yyyy")
                .withZone(ZoneId.systemDefault());
    }

    /**
     * Subscribes to the given topic and, for each record received, prints it, acknowledges
     * its offset and counts down the latch. Partition assignment and revocation are logged
     * at debug level; a failure of the receive pipeline is logged at error level.
     *
     * @param str the topic to subscribe to
     * @param countDownLatch latch decremented once per received record
     * @return a handle that cancels the subscription when disposed
     */
    public Disposable consumeMessages(String str, CountDownLatch countDownLatch) {
        ReceiverOptions<Integer, String> options = this.receiverOptions
                .subscription(Collections.singleton(str))
                .addAssignListener(partitions -> log.debug("onPartitionsAssigned {}", partitions))
                .addRevokeListener(partitions -> log.debug("onPartitionsRevoked {}", partitions));

        return KafkaReceiver.create(options)
                .receive()
                .subscribe(
                        receiverRecord -> {
                            ReceiverOffset receiverOffset = receiverRecord.receiverOffset();
                            System.out.printf(
                                    "Received message: topic-partition=%s offset=%d timestamp=%s key=%d value=%s\n",
                                    receiverOffset.topicPartition(),
                                    receiverOffset.offset(),
                                    this.dateFormat.format(Instant.ofEpochMilli(receiverRecord.timestamp())),
                                    receiverRecord.key(),
                                    receiverRecord.value());
                            receiverOffset.acknowledge();
                            countDownLatch.countDown();
                        },
                        // Without an error consumer a pipeline failure has no handler at all.
                        error -> log.error("Failed to consume messages from topic {}", str, error));
    }

    /**
     * Runs the sample: consumes from {@value #TOPIC} until the expected number of records
     * has been received or the timeout elapses, then cancels the subscription.
     *
     * @param strArr command-line arguments (unused)
     * @throws Exception if the wait for records is interrupted
     */
    public static void main(String[] strArr) throws Exception {
        CountDownLatch countDownLatch = new CountDownLatch(EXPECTED_MESSAGE_COUNT);
        Disposable subscription = new SampleConsumer(BOOTSTRAP_SERVERS).consumeMessages(TOPIC, countDownLatch);
        try {
            if (!countDownLatch.await(AWAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
                log.warn("Timed out after {}s with {} of {} messages still outstanding",
                        AWAIT_TIMEOUT_SECONDS, countDownLatch.getCount(), EXPECTED_MESSAGE_COUNT);
            }
        } finally {
            // Release the subscription even when the wait is interrupted.
            subscription.dispose();
        }
    }
}
