/*
 * Decompiled with CFR 0.152.
 */
package reactor.kafka.samples;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.kafka.sender.Sender;
import reactor.kafka.sender.SenderOptions;
import reactor.kafka.sender.SenderRecord;

public class SampleProducer {
    private static final Logger log = LoggerFactory.getLogger((String)SampleProducer.class.getName());
    private static final String BOOTSTRAP_SERVERS = "localhost:9092";
    private static final String TOPIC = "demo-topic";
    private final Sender<Integer, String> sender;
    private final SimpleDateFormat dateFormat;

    public SampleProducer(String bootstrapServers) {
        HashMap<String, Object> props = new HashMap<String, Object>();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("client.id", "sample-producer");
        props.put("acks", "all");
        props.put("key.serializer", IntegerSerializer.class);
        props.put("value.serializer", StringSerializer.class);
        SenderOptions senderOptions = SenderOptions.create(props);
        this.sender = Sender.create((SenderOptions)senderOptions);
        this.dateFormat = new SimpleDateFormat("HH:mm:ss:SSS z dd MMM yyyy");
    }

    public void sendMessages(String topic, int count, CountDownLatch latch) throws InterruptedException {
        this.sender.send((Publisher)Flux.range((int)1, (int)count).map(i -> SenderRecord.create((ProducerRecord)new ProducerRecord(topic, i, (Object)("Message_" + i)), (Object)i)), true).doOnError(e -> log.error("Send failed", e)).subscribe(r -> {
            RecordMetadata metadata = r.recordMetadata();
            System.out.printf("Message %d sent successfully, topic-partition=%s-%d offset=%d timestamp=%s\n", r.correlationMetadata(), metadata.topic(), metadata.partition(), metadata.offset(), this.dateFormat.format(new Date(metadata.timestamp())));
            latch.countDown();
        });
    }

    public void close() {
        this.sender.close();
    }

    public static void main(String[] args) throws Exception {
        int count = 20;
        CountDownLatch latch = new CountDownLatch(count);
        SampleProducer producer = new SampleProducer(BOOTSTRAP_SERVERS);
        producer.sendMessages(TOPIC, count, latch);
        latch.await(10L, TimeUnit.SECONDS);
        producer.close();
    }
}

