/*
 * Decompiled with CFR 0.152.
 */
package reactor.kafka.samples;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Cancellation;
import reactor.core.publisher.BlockingSink;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.kafka.receiver.Receiver;
import reactor.kafka.receiver.ReceiverOffset;
import reactor.kafka.receiver.ReceiverOptions;
import reactor.kafka.receiver.ReceiverRecord;
import reactor.kafka.sender.Sender;
import reactor.kafka.sender.SenderOptions;
import reactor.kafka.sender.SenderRecord;

public class SampleScenarios {
    // Shared SLF4J logger, named after this class; the nested classes log through it.
    private static final Logger log = LoggerFactory.getLogger((String)SampleScenarios.class.getName());
    // Kafka bootstrap address that main() passes to every scenario it constructs.
    private static final String BOOTSTRAP_SERVERS = "localhost:9092";
    // Topic names main() hands to the scenarios: index 0 is always passed as the first
    // (source/only) topic, indices 1 and 2 as destination topics where a scenario needs them.
    private static final String[] TOPICS = new String[]{"sample-topic1", "sample-topic2", "sample-topic3"};

    /**
     * Command-line entry point. Expects exactly one argument naming a {@link Scenario}
     * constant; prints a usage line and exits with status 1 otherwise. The selected
     * scenario is built against {@code BOOTSTRAP_SERVERS} and {@code TOPICS} and then run.
     *
     * @param args a single element holding the scenario name
     * @throws Exception whatever the scenario run propagates
     * @throws IllegalArgumentException if the name matches no {@link Scenario} constant
     */
    public static void main(String[] args) throws Exception {
        if (args.length != 1) {
            System.out.println("Usage: " + SampleScenarios.class.getName() + " <scenario>");
            System.exit(1);
        }
        AbstractScenario selected = createScenario(Scenario.valueOf(args[0]));
        selected.runScenario();
    }

    /** Builds the scenario implementation that corresponds to the given enum constant. */
    private static AbstractScenario createScenario(Scenario scenario) {
        switch (scenario) {
            case KAFKA_SINK:
                return new KafkaSink(BOOTSTRAP_SERVERS, TOPICS[0]);
            case KAFKA_SOURCE:
                return new KafkaSource(BOOTSTRAP_SERVERS, TOPICS[0]);
            case KAFKA_TRANSFORM:
                return new KafkaTransform(BOOTSTRAP_SERVERS, TOPICS[0], TOPICS[1]);
            case ATMOST_ONCE:
                return new AtmostOnce(BOOTSTRAP_SERVERS, TOPICS[0], TOPICS[1]);
            case FAN_OUT:
                return new FanOut(BOOTSTRAP_SERVERS, TOPICS[0], TOPICS[1], TOPICS[2]);
            case PARTITION:
                return new PartitionProcessor(BOOTSTRAP_SERVERS, TOPICS[0]);
            default:
                throw new IllegalArgumentException("Unsupported scenario " + scenario);
        }
    }

    /**
     * Common base for the sample scenarios: holds the connection settings, builds the
     * default sender/receiver options and owns the resources released by {@link #close()}.
     *
     * <p>{@link #close()} only closes the sender stored in the {@code sender} field and
     * disposes the entries of {@code cancellations}; a subclass that wants its sender
     * released must assign it to that field.
     */
    static abstract class AbstractScenario {
        String bootstrapServers;
        String groupId = "sample-group";
        CommittableSource source;
        Sender<Integer, Person> sender;
        List<Cancellation> cancellations = new ArrayList<Cancellation>();

        AbstractScenario(String bootstrapServers) {
            this.bootstrapServers = bootstrapServers;
        }

        /** Returns the reactive pipeline that implements this scenario. */
        public abstract Flux<?> flux();

        /**
         * Subscribes to {@link #flux()}, blocks until it terminates and then releases the
         * scenario's resources.
         *
         * @throws InterruptedException declared for callers; not thrown by this body directly
         */
        public void runScenario() throws InterruptedException {
            try {
                flux().blockLast();
            } finally {
                // Release resources even when the pipeline terminates with an error.
                close();
            }
        }

        /** Closes the sender (if one was stored) and disposes every registered cancellation. */
        public void close() {
            try {
                if (sender != null) {
                    sender.close();
                }
            } finally {
                // Still dispose the subscriptions if closing the sender fails.
                for (Cancellation cancellation : cancellations) {
                    cancellation.dispose();
                }
            }
        }

        /**
         * Builds the default producer configuration: Integer keys, {@link PersonSerDes}
         * values, {@code acks=all} and client id {@code sample-producer}.
         */
        public SenderOptions<Integer, Person> senderOptions() {
            Map<String, Object> props = new HashMap<>();
            props.put("bootstrap.servers", bootstrapServers);
            props.put("client.id", "sample-producer");
            props.put("acks", "all");
            props.put("key.serializer", IntegerSerializer.class);
            props.put("value.serializer", PersonSerDes.class);
            return SenderOptions.create(props);
        }

        /**
         * Creates a sender from the given options. The sender is not stored; see the class
         * comment for how to have it closed by {@link #close()}.
         */
        public Sender<Integer, Person> sender(SenderOptions<Integer, Person> senderOptions) {
            return Sender.create(senderOptions);
        }

        /**
         * Builds the default consumer configuration: Integer keys, {@link PersonSerDes}
         * values, the scenario's group id and client id {@code sample-consumer}.
         */
        public ReceiverOptions<Integer, Person> receiverOptions() {
            Map<String, Object> props = new HashMap<>();
            props.put("bootstrap.servers", bootstrapServers);
            props.put("group.id", groupId);
            props.put("client.id", "sample-consumer");
            props.put("key.deserializer", IntegerDeserializer.class);
            props.put("value.deserializer", PersonSerDes.class);
            return ReceiverOptions.create(props);
        }

        /**
         * Returns the default receiver options subscribed to {@code topics}, with listeners
         * that log partition assignment and revocation.
         *
         * @param topics topics to subscribe to
         */
        public ReceiverOptions<Integer, Person> receiverOptions(Collection<String> topics) {
            return receiverOptions()
                    .addAssignListener(p -> log.info("Group {} partitions assigned {}", groupId, p))
                    // Previously logged "assigned" for revocations as well.
                    .addRevokeListener(p -> log.info("Group {} partitions revoked {}", groupId, p))
                    .subscription(topics);
        }

        /** Sets the source that scenarios reading from {@link #source()} will consume. */
        public void source(CommittableSource source) {
            this.source = source;
        }

        /** Returns the configured source, or {@code null} if none has been set. */
        public CommittableSource source() {
            return source;
        }
    }

    /**
     * In-memory list of {@link Person} records exposed as a {@link Flux}, with a
     * {@link #commit(int)} hook that only logs the committed id.
     */
    static class CommittableSource {
        private final List<Person> sourceList;

        /**
         * Creates a source holding two built-in sample persons.
         * NOTE(review): both samples carry id 1 — confirm whether the second should be distinct.
         */
        CommittableSource() {
            List<Person> defaults = new ArrayList<>();
            defaults.add(new Person(1, "John", "Doe"));
            defaults.add(new Person(1, "Ada", "Lovelace"));
            this.sourceList = defaults;
        }

        /**
         * Creates a source holding a copy of the given persons.
         *
         * @param list persons to emit, copied in iteration order
         */
        CommittableSource(List<Person> list) {
            this.sourceList = new ArrayList<>(list);
        }

        /** Returns a flux that emits the stored persons in list order. */
        Flux<Person> flux() {
            return Flux.fromIterable(sourceList);
        }

        /** Records at trace level that the person with the given id was committed. */
        void commit(int id) {
            log.trace("Committing {}", id);
        }
    }

    public static class PersonSerDes
    implements Serializer<Person>,
    Deserializer<Person> {
        public void configure(Map<String, ?> configs, boolean isKey) {
        }

        public byte[] serialize(String topic, Person person) {
            byte[] firstName = person.firstName().getBytes(StandardCharsets.UTF_8);
            byte[] lastName = person.lastName().getBytes(StandardCharsets.UTF_8);
            byte[] email = person.email().getBytes(StandardCharsets.UTF_8);
            ByteBuffer buffer = ByteBuffer.allocate(8 + firstName.length + 4 + lastName.length + 4 + email.length);
            buffer.putInt(person.id());
            buffer.putInt(firstName.length);
            buffer.put(firstName);
            buffer.putInt(lastName.length);
            buffer.put(lastName);
            buffer.putInt(email.length);
            buffer.put(email);
            return buffer.array();
        }

        public Person deserialize(String topic, byte[] data) {
            ByteBuffer buffer = ByteBuffer.wrap(data);
            int id = buffer.getInt();
            byte[] first = new byte[buffer.getInt()];
            buffer.get(first);
            String firstName = new String(first, StandardCharsets.UTF_8);
            byte[] last = new byte[buffer.getInt()];
            buffer.get(last);
            String lastName = new String(last, StandardCharsets.UTF_8);
            Person person = new Person(id, firstName, lastName);
            byte[] email = new byte[buffer.getInt()];
            if (email.length > 0) {
                buffer.get(email);
                person.email(new String(email, StandardCharsets.UTF_8));
            }
            return person;
        }

        public void close() {
        }
    }

    /**
     * Sample value type: an immutable id and name pair plus an optional, mutable email.
     *
     * <p>An unset email and an empty email are treated as the same value by both
     * {@link #email()} and {@link #equals(Object)}.
     */
    public static class Person {
        private final int id;
        private final String firstName;
        private final String lastName;
        private String email;

        public Person(int id, String firstName, String lastName) {
            this.id = id;
            this.firstName = firstName;
            this.lastName = lastName;
        }

        public int id() {
            return this.id;
        }

        public String firstName() {
            return this.firstName;
        }

        public String lastName() {
            return this.lastName;
        }

        /** Sets the email address; {@code null} clears it. */
        public void email(String email) {
            this.email = email;
        }

        /** Returns the email address, or the empty string when none is set. */
        public String email() {
            return this.email == null ? "" : this.email;
        }

        /**
         * Two persons are equal when id, first name, last name and email all match.
         * The email is compared through {@link #email()}, so an unset email equals an
         * empty one.
         */
        @Override
        public boolean equals(Object other) {
            if (this == other) {
                return true;
            }
            if (!(other instanceof Person)) {
                return false;
            }
            Person that = (Person) other;
            return this.id == that.id
                    && sameText(this.firstName, that.firstName)
                    && sameText(this.lastName, that.lastName)
                    && this.email().equals(that.email());
        }

        /**
         * Hash over id and both names. The email is deliberately left out, which keeps
         * the hash stable when it is changed and stays consistent with {@link #equals(Object)}.
         */
        @Override
        public int hashCode() {
            int result = Integer.hashCode(this.id);
            result = 31 * result + (this.firstName != null ? this.firstName.hashCode() : 0);
            result = 31 * result + (this.lastName != null ? this.lastName.hashCode() : 0);
            return result;
        }

        @Override
        public String toString() {
            return "Person{id='" + this.id + '\'' + ", firstName='" + this.firstName + '\'' + ", lastName='" + this.lastName + '\'' + '}';
        }

        /** Null-safe string equality. */
        private static boolean sameText(String left, String right) {
            return left == null ? right == null : left.equals(right);
        }
    }

    /**
     * Scenario that consumes one topic, groups the records by topic partition and
     * processes each group on the shared "sample" elastic scheduler. Every 5 seconds the
     * most recently processed offset of a partition is committed.
     */
    public static class PartitionProcessor extends AbstractScenario {
        private final String topic;

        public PartitionProcessor(String bootstrapServers, String topic) {
            super(bootstrapServers);
            this.topic = topic;
        }

        /**
         * Builds the per-partition pipeline. Periodic commits are disabled through a zero
         * commit interval; offsets are committed explicitly after sampling.
         */
        @Override
        public Flux<?> flux() {
            Scheduler scheduler = Schedulers.newElastic("sample", 60, true);
            ReceiverOptions<Integer, Person> options =
                    receiverOptions(Collections.singleton(topic)).commitInterval(Duration.ZERO);
            return Receiver.create(options)
                    .receive()
                    .groupBy(m -> m.offset().topicPartition())
                    .flatMap(partitionFlux -> partitionFlux
                            .publishOn(scheduler)
                            .map(r -> processRecord(partitionFlux.key(), r))
                            // Keep only the latest processed offset of each 5 s window.
                            .sample(Duration.ofMillis(5000L))
                            .concatMap(offset -> offset.commit()));
        }

        /**
         * Logs the record being processed and returns its offset so that it can be committed.
         *
         * @param topicPartition partition the record was read from
         * @param message        received record
         * @return the offset of {@code message}
         */
        public ReceiverOffset processRecord(TopicPartition topicPartition, ReceiverRecord<Integer, Person> message) {
            // The format string previously read "in thread{}", gluing the thread name to the word.
            log.info("Processing record {} from partition {} in thread {}",
                    message.record().value().id(), topicPartition, Thread.currentThread().getName());
            return message.offset();
        }
    }

    /**
     * Scenario that reads persons from one source topic and publishes a differently
     * enriched copy of each to two destination topics, using one single-threaded
     * scheduler per destination.
     */
    public static class FanOut extends AbstractScenario {
        private final String sourceTopic;
        private final String destTopic1;
        private final String destTopic2;

        public FanOut(String bootstrapServers, String sourceTopic, String destTopic1, String destTopic2) {
            super(bootstrapServers);
            this.sourceTopic = sourceTopic;
            this.destTopic1 = destTopic1;
            this.destTopic2 = destTopic2;
        }

        /**
         * Builds the fan-out pipeline. Received records are pushed into an
         * {@link EmitterProcessor}; two send pipelines read from that processor and are
         * merged into the returned flux. The Kafka receive flux is subscribed when the
         * merged flux is subscribed and disposed when the merged flux is cancelled.
         */
        @Override
        public Flux<?> flux() {
            Scheduler firstScheduler = Schedulers.newSingle("sample1", true);
            Scheduler secondScheduler = Schedulers.newSingle("sample2", true);
            sender = sender(senderOptions());

            EmitterProcessor<Person> hub = EmitterProcessor.create();
            BlockingSink<Person> hubSink = hub.connectSink();
            Flux<?> kafkaInput = Receiver.create(receiverOptions(Collections.singleton(sourceTopic)))
                    .receiveAutoAck()
                    .concatMap(batch -> batch)
                    .doOnNext(record -> {
                        BlockingSink.Emission outcome = hubSink.emit(record.value());
                        log.debug("Emit {} {}", record.value().id(), outcome);
                    });

            Flux<?> toTopic1 = sender.send(
                    hub.publishOn(firstScheduler).map(p -> SenderRecord.create(process1(p, true), p.id())),
                    false);
            Flux<?> toTopic2 = sender.send(
                    hub.publishOn(secondScheduler).map(p -> SenderRecord.create(process2(p, true), p.id())),
                    false);

            // Holds the subscription to the Kafka input so that it can be disposed on cancel.
            AtomicReference<Cancellation> inputSubscription = new AtomicReference<>();
            return Flux.merge(toTopic1, toTopic2)
                    .doOnSubscribe(s -> inputSubscription.set(kafkaInput.subscribe()))
                    .doOnCancel(() -> {
                        Cancellation active = inputSubscription.getAndSet(null);
                        if (active != null) {
                            active.dispose();
                        }
                    });
        }

        /**
         * Builds the record for the first destination topic: a copy of {@code p} whose
         * email is the lower-cased first name at {@code kafka.io}.
         *
         * @param p     person to transform
         * @param debug whether to log the processing thread at debug level
         */
        public ProducerRecord<Integer, Person> process1(Person p, boolean debug) {
            if (debug) {
                log.debug("Processing person {} on stream1 in thread {}", p.id(), Thread.currentThread().getName());
            }
            Person enriched = copyWithEmail(p, p.firstName().toLowerCase(Locale.ROOT) + "@kafka.io");
            return new ProducerRecord<>(destTopic1, p.id(), enriched);
        }

        /**
         * Builds the record for the second destination topic: a copy of {@code p} whose
         * email is the lower-cased last name at {@code reactor.io}.
         *
         * @param p     person to transform
         * @param debug whether to log the processing thread at debug level
         */
        public ProducerRecord<Integer, Person> process2(Person p, boolean debug) {
            if (debug) {
                log.debug("Processing person {} on stream2 in thread {}", p.id(), Thread.currentThread().getName());
            }
            Person enriched = copyWithEmail(p, p.lastName().toLowerCase(Locale.ROOT) + "@reactor.io");
            return new ProducerRecord<>(destTopic2, p.id(), enriched);
        }

        /** Returns a new person with the id and names of {@code p} and the given email. */
        private static Person copyWithEmail(Person p, String email) {
            Person copy = new Person(p.id(), p.firstName(), p.lastName());
            copy.email(email);
            return copy;
        }
    }

    /**
     * Scenario that copies persons from a source topic to a destination topic, reading
     * with {@code receiveAtmostOnce()} and sending with {@code acks=0} and
     * {@code retries=0}.
     */
    public static class AtmostOnce extends AbstractScenario {
        private final String sourceTopic;
        private final String destTopic;

        public AtmostOnce(String bootstrapServers, String sourceTopic, String destTopic) {
            super(bootstrapServers);
            this.sourceTopic = sourceTopic;
            this.destTopic = destTopic;
        }

        /**
         * Builds the receive-transform-send pipeline. Each outgoing record carries the
         * offset of the consumed record as its correlation metadata, and the send is
         * invoked with its boolean flag set to {@code true}.
         */
        @Override
        public Flux<?> flux() {
            SenderOptions<Integer, Person> senderOptions = senderOptions()
                    .producerProperty("acks", "0")
                    .producerProperty("retries", "0");
            // Store the sender in the inherited field so that close() releases it.
            this.sender = sender(senderOptions);
            return this.sender.send(
                    Receiver.create(receiverOptions(Collections.singleton(sourceTopic)))
                            .receiveAtmostOnce()
                            .map(cr -> SenderRecord.create(transform(cr.value()), cr.offset())),
                    true);
        }

        /**
         * Builds the outgoing record: a copy of {@code p} whose email is the lower-cased
         * first name at {@code kafka.io}, keyed by the person's id.
         *
         * @param p person to transform
         */
        public ProducerRecord<Integer, Person> transform(Person p) {
            Person transformed = new Person(p.id(), p.firstName(), p.lastName());
            transformed.email(p.firstName().toLowerCase(Locale.ROOT) + "@kafka.io");
            return new ProducerRecord<>(destTopic, p.id(), transformed);
        }
    }

    /**
     * Scenario that copies persons from a source topic to a destination topic and
     * acknowledges each consumed offset once the send result for its record arrives.
     */
    public static class KafkaTransform extends AbstractScenario {
        private final String sourceTopic;
        private final String destTopic;

        public KafkaTransform(String bootstrapServers, String sourceTopic, String destTopic) {
            super(bootstrapServers);
            this.sourceTopic = sourceTopic;
            this.destTopic = destTopic;
        }

        /**
         * Builds the receive-transform-send pipeline. Each outgoing record carries the
         * {@link ReceiverOffset} of the consumed record as correlation metadata, and that
         * offset is acknowledged for every send result emitted.
         */
        @Override
        public Flux<?> flux() {
            // Store the sender in the inherited field (a local used to shadow it) so that
            // close() releases it.
            this.sender = sender(senderOptions());
            return this.sender.send(
                    Receiver.create(receiverOptions(Collections.singleton(sourceTopic)))
                            .receive()
                            .map(m -> SenderRecord.create(transform(m.record().value()), m.offset())),
                    false)
                    .doOnNext(m -> m.correlationMetadata().acknowledge());
        }

        /**
         * Builds the outgoing record: a copy of {@code p} whose email is the lower-cased
         * first name at {@code kafka.io}, keyed by the person's id.
         *
         * @param p person to transform
         */
        public ProducerRecord<Integer, Person> transform(Person p) {
            Person transformed = new Person(p.id(), p.firstName(), p.lastName());
            transformed.email(p.firstName().toLowerCase(Locale.ROOT) + "@kafka.io");
            return new ProducerRecord<>(destTopic, p.id(), transformed);
        }
    }

    /**
     * Scenario that consumes persons from a topic, hands each one to
     * {@link #storeInDB(Person)} and commits the record's offset after the store succeeds.
     */
    public static class KafkaSource extends AbstractScenario {
        private final String topic;

        public KafkaSource(String bootstrapServers, String topic) {
            super(bootstrapServers);
            this.topic = topic;
        }

        /**
         * Builds the consume-store-commit pipeline. Records are processed one at a time
         * on a single "sample" thread; the commit is awaited with a blocking call, and the
         * whole pipeline is resubscribed on error.
         */
        @Override
        public Flux<?> flux() {
            ReceiverOptions<Integer, Person> options =
                    receiverOptions(Collections.singletonList(topic)).commitInterval(Duration.ZERO);
            Flux<ReceiverRecord<Integer, Person>> inbound = Receiver.create(options).receive();
            Scheduler worker = Schedulers.newSingle("sample", true);
            return inbound
                    .publishOn(worker)
                    .concatMap(m -> storeInDB(m.record().value())
                            .doOnSuccess(ignored -> {
                                m.offset().commit().block();
                            }))
                    .retry();
        }

        /**
         * Stand-in for a database write: logs the person's id and completes immediately.
         *
         * @param person person to store
         * @return an empty {@link Mono}
         */
        public Mono<Void> storeInDB(Person person) {
            log.info("Successfully processed person with id {} from Kafka", person.id());
            return Mono.empty();
        }
    }

    public static class KafkaSink
    extends AbstractScenario {
        private final String topic;

        public KafkaSink(String bootstrapServers, String topic) {
            super(bootstrapServers);
            this.topic = topic;
        }

        @Override
        public Flux<?> flux() {
            SenderOptions senderOptions = this.senderOptions().producerProperty("acks", (Object)"all").producerProperty("max.block.ms", (Object)Long.MAX_VALUE).producerProperty("retries", (Object)Integer.MAX_VALUE);
            Flux<Person> srcFlux = this.source().flux();
            return this.sender(senderOptions).send((Publisher)srcFlux.map(p -> SenderRecord.create((ProducerRecord)new ProducerRecord(this.topic, (Object)p.id(), p), (Object)p.id())), false).doOnError(e -> log.error("Send failed, terminating.", e)).doOnNext(r -> {
                int id = (Integer)r.correlationMetadata();
                log.trace("Successfully stored person with id {} in Kafka", (Object)id);
                this.source.commit(id);
            });
        }
    }

    /** Names of the runnable scenarios, as accepted on the command line by {@code main}. */
    enum Scenario {
        /** Selects {@code KafkaSink}. */
        KAFKA_SINK,
        /** Selects {@code KafkaSource}. */
        KAFKA_SOURCE,
        /** Selects {@code KafkaTransform}. */
        KAFKA_TRANSFORM,
        /** Selects {@code AtmostOnce}. */
        ATMOST_ONCE,
        /** Selects {@code FanOut}. */
        FAN_OUT,
        /** Selects {@code PartitionProcessor}. */
        PARTITION
    }
}

