package reactor.kafka.tools.perf;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import net.sourceforge.argparse4j.ArgumentParsers;
import net.sourceforge.argparse4j.impl.Arguments;
import net.sourceforge.argparse4j.inf.ArgumentParser;
import net.sourceforge.argparse4j.inf.ArgumentParserException;
import net.sourceforge.argparse4j.inf.Namespace;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverOptions;
import reactor.kafka.receiver.ReceiverRecord;
import reactor.kafka.sender.KafkaSender;
import reactor.kafka.sender.SenderOptions;

/* loaded from: input_file:reactor/kafka/tools/perf/EndToEndLatency.class */
public class EndToEndLatency {

    /* loaded from: input_file:reactor/kafka/tools/perf/EndToEndLatency$AbstractEndToEndLatency.class */
    static abstract class AbstractEndToEndLatency {
        final String topic;
        final Map<String, Object> consumerProps = new HashMap();
        final Map<String, Object> producerProps;

        AbstractEndToEndLatency(Map<String, Object> map, Map<String, Object> map2, String str, String str2) {
            this.topic = str2;
            this.consumerProps.put("bootstrap.servers", str);
            this.consumerProps.put("group.id", "test-group-" + System.currentTimeMillis());
            this.consumerProps.put("enable.auto.commit", "false");
            this.consumerProps.put("auto.offset.reset", "latest");
            this.consumerProps.put("key.deserializer", ByteArrayDeserializer.class.getName());
            this.consumerProps.put("value.deserializer", ByteArrayDeserializer.class.getName());
            this.consumerProps.put("fetch.max.wait.ms", "0");
            this.consumerProps.putAll(map);
            this.producerProps = new HashMap();
            this.producerProps.put("bootstrap.servers", str);
            this.producerProps.put("linger.ms", "0");
            this.producerProps.put("max.block.ms", String.valueOf(Long.MAX_VALUE));
            this.producerProps.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
            this.producerProps.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
            this.producerProps.putAll(map2);
        }

        public double[] runTest(int i, int i2, Long l) throws Exception {
            long j;
            double d = 0.0d;
            double[] dArr = new double[i];
            Random random = new Random(0L);
            initialize();
            for (int i3 = 0; i3 < i; i3++) {
                byte[] randomBytesOfLen = EndToEndLatency.randomBytesOfLen(random, i2);
                long nanoTime = System.nanoTime();
                Iterator<ConsumerRecord<byte[], byte[]>> sendAndReceive = sendAndReceive(this.topic, randomBytesOfLen, l.longValue());
                long nanoTime2 = System.nanoTime() - nanoTime;
                if (!sendAndReceive.hasNext()) {
                    close();
                    throw new RuntimeException("poll() timed out before finding a result : timeout=" + l);
                }
                String str = new String(randomBytesOfLen);
                String str2 = new String((byte[]) sendAndReceive.next().value());
                if (!str2.equals(str)) {
                    close();
                    throw new RuntimeException("The message read " + str2 + " did not match the message sent " + str);
                }
                long j2 = 0;
                while (true) {
                    j = j2;
                    if (!sendAndReceive.hasNext()) {
                        break;
                    }
                    sendAndReceive.next();
                    j2 = j + 1;
                }
                if (j > 0) {
                    throw new RuntimeException("Only one result was expected during this test. We found " + j);
                }
                if (i3 % 1000 == 0) {
                    System.out.println(i3 + "\t" + ((nanoTime2 / 1000.0d) / 1000.0d));
                }
                d += nanoTime2;
                dArr[i3] = (nanoTime2 / 1000.0d) / 1000.0d;
            }
            System.out.printf("Avg latency: %.4f ms\n\n", Double.valueOf(((d / i) / 1000.0d) / 1000.0d));
            Arrays.sort(dArr);
            System.out.printf("Percentiles: 50th = %.4f, 75th = %.4f, 90th = %.4f, 99th = %.4f, 99.9th = %.4f\n", Double.valueOf(dArr[(int) (dArr.length * 0.5d)]), Double.valueOf(dArr[(int) (dArr.length * 0.75d)]), Double.valueOf(dArr[(int) (dArr.length * 0.9d)]), Double.valueOf(dArr[(int) (dArr.length * 0.99d)]), Double.valueOf(dArr[(int) (dArr.length * 0.999d)]));
            close();
            return dArr;
        }

        abstract void initialize();

        abstract Iterator<ConsumerRecord<byte[], byte[]>> sendAndReceive(String str, byte[] bArr, long j) throws Exception;

        abstract void close();
    }

    /* loaded from: input_file:reactor/kafka/tools/perf/EndToEndLatency$NonReactiveEndToEndLatency.class */
    static class NonReactiveEndToEndLatency extends AbstractEndToEndLatency {
        private final KafkaConsumer<byte[], byte[]> consumer;
        private final KafkaProducer<byte[], byte[]> producer;
        private final AtomicBoolean isAssigned;

        NonReactiveEndToEndLatency(Map<String, Object> map, Map<String, Object> map2, String str, String str2) {
            super(map, map2, str, str2);
            this.isAssigned = new AtomicBoolean();
            this.consumer = new KafkaConsumer<>(this.consumerProps);
            this.consumer.subscribe(Collections.singletonList(str2), new ConsumerRebalanceListener() { // from class: reactor.kafka.tools.perf.EndToEndLatency.NonReactiveEndToEndLatency.1
                public void onPartitionsRevoked(Collection<TopicPartition> collection) {
                }

                public void onPartitionsAssigned(Collection<TopicPartition> collection) {
                    NonReactiveEndToEndLatency.this.isAssigned.set(true);
                }
            });
            this.producer = new KafkaProducer<>(this.producerProps);
        }

        @Override // reactor.kafka.tools.perf.EndToEndLatency.AbstractEndToEndLatency
        public void initialize() {
            long currentTimeMillis = System.currentTimeMillis() + 10000;
            while (!this.isAssigned.get() && System.currentTimeMillis() < currentTimeMillis) {
                this.consumer.poll(100L);
            }
            if (!this.isAssigned.get()) {
                throw new IllegalStateException("Timed out waiting for assignment");
            }
            this.consumer.seekToEnd(Collections.emptyList());
            this.consumer.poll(0L);
        }

        @Override // reactor.kafka.tools.perf.EndToEndLatency.AbstractEndToEndLatency
        public Iterator<ConsumerRecord<byte[], byte[]>> sendAndReceive(String str, byte[] bArr, long j) throws Exception {
            this.producer.send(new ProducerRecord(str, bArr)).get();
            return this.consumer.poll(j).iterator();
        }

        @Override // reactor.kafka.tools.perf.EndToEndLatency.AbstractEndToEndLatency
        public void close() {
            if (this.consumer != null) {
                this.consumer.commitSync();
                this.consumer.close();
            }
            if (this.producer != null) {
                this.producer.close();
            }
        }
    }

    /* loaded from: input_file:reactor/kafka/tools/perf/EndToEndLatency$ReactiveEndToEndLatency.class */
    static class ReactiveEndToEndLatency extends AbstractEndToEndLatency {
        final KafkaSender<byte[], byte[]> sender;
        final Flux<ReceiverRecord<byte[], byte[]>> flux;
        final LinkedBlockingQueue<ConsumerRecord<byte[], byte[]>> receiveQueue;
        final Semaphore sendSemaphore;
        final Semaphore assignSemaphore;
        Disposable subscribeDisposable;

        ReactiveEndToEndLatency(Map<String, Object> map, Map<String, Object> map2, String str, String str2) {
            super(map, map2, str, str2);
            this.sendSemaphore = new Semaphore(0);
            this.assignSemaphore = new Semaphore(0);
            this.sender = KafkaSender.create(SenderOptions.create(this.producerProps));
            this.flux = KafkaReceiver.create(ReceiverOptions.create(this.consumerProps).addAssignListener(collection -> {
                if (this.assignSemaphore.availablePermits() == 0) {
                    collection.forEach(receiverPartition -> {
                        receiverPartition.seekToEnd();
                    });
                    this.assignSemaphore.release();
                }
            }).subscription(Collections.singleton(str2))).receive();
            this.receiveQueue = new LinkedBlockingQueue<>();
            System.out.println("Running latency test using Reactive API, class=" + getClass().getName());
        }

        @Override // reactor.kafka.tools.perf.EndToEndLatency.AbstractEndToEndLatency
        public void initialize() {
            this.subscribeDisposable = this.flux.subscribe(receiverRecord -> {
                this.receiveQueue.offer(receiverRecord);
            });
            try {
                if (this.assignSemaphore.tryAcquire(10L, TimeUnit.SECONDS)) {
                } else {
                    throw new IllegalStateException("Timed out waiting for assignment");
                }
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
        }

        @Override // reactor.kafka.tools.perf.EndToEndLatency.AbstractEndToEndLatency
        public Iterator<ConsumerRecord<byte[], byte[]>> sendAndReceive(String str, byte[] bArr, long j) throws Exception {
            this.sender.createOutbound().send(Mono.just(new ProducerRecord(str, bArr))).then().doOnSuccess(r3 -> {
                this.sendSemaphore.release();
            }).subscribe();
            this.sendSemaphore.acquire();
            ConsumerRecord<byte[], byte[]> poll = this.receiveQueue.poll(j, TimeUnit.MILLISECONDS);
            ArrayList arrayList = new ArrayList();
            if (poll != null) {
                arrayList.add(poll);
            }
            this.receiveQueue.drainTo(arrayList);
            return arrayList.iterator();
        }

        @Override // reactor.kafka.tools.perf.EndToEndLatency.AbstractEndToEndLatency
        public void close() {
            if (this.sender != null) {
                this.sender.close();
            }
            if (this.subscribeDisposable != null) {
                this.subscribeDisposable.dispose();
            }
        }
    }

    public static void main(String[] strArr) throws Exception {
        ArgumentParser argParser = argParser();
        try {
            Namespace parseArgs = argParser.parseArgs(strArr);
            String string = parseArgs.getString("bootstrapServers");
            String string2 = parseArgs.getString("topic");
            int intValue = parseArgs.getInt("messages").intValue();
            int intValue2 = parseArgs.getInt("messageSize").intValue();
            boolean booleanValue = parseArgs.getBoolean("reactive").booleanValue();
            Map<String, Object> properties = getProperties(parseArgs.getList("consumerConfig"));
            Map<String, Object> properties2 = getProperties(parseArgs.getList("producerConfig"));
            (booleanValue ? new ReactiveEndToEndLatency(properties, properties2, string, string2) : new NonReactiveEndToEndLatency(properties, properties2, string, string2)).runTest(intValue, intValue2, 60000L);
            System.exit(0);
        } catch (ArgumentParserException e) {
            if (strArr.length == 0) {
                argParser.printHelp();
                System.exit(0);
            } else {
                argParser.handleError(e);
                System.exit(1);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static byte[] randomBytesOfLen(Random random, int i) {
        byte[] bArr = new byte[i];
        random.nextBytes(bArr);
        return bArr;
    }

    private static ArgumentParser argParser() {
        ArgumentParser description = ArgumentParsers.newArgumentParser("end-to-end-latency").defaultHelp(true).description("This tool is used to verify end to end latency.");
        description.addArgument(new String[]{"--bootstrap-servers"}).action(Arguments.store()).required(true).type(String.class).metavar(new String[]{"TOPIC"}).dest("bootstrapServers").help("kafka bootstrap servers");
        description.addArgument(new String[]{"--topic"}).action(Arguments.store()).required(true).type(String.class).metavar(new String[]{"TOPIC"}).help("produce messages to this topic");
        description.addArgument(new String[]{"--messages"}).action(Arguments.store()).required(true).type(Integer.class).metavar(new String[]{"MESSAGES"}).help("number of messages to produce");
        description.addArgument(new String[]{"--message-size"}).action(Arguments.store()).required(true).type(Integer.class).metavar(new String[]{"MESSAGE-SIZE"}).dest("messageSize").help("size of messages to produce");
        description.addArgument(new String[]{"--consumer-props"}).nargs("+").required(false).metavar(new String[]{"PROP-NAME=PROP-VALUE"}).type(String.class).dest("consumerConfig").help("kafka consumer related configuration properties like client.id etc..");
        description.addArgument(new String[]{"--producer-props"}).nargs("+").required(false).metavar(new String[]{"PROP-NAME=PROP-VALUE"}).type(String.class).dest("producerConfig").help("kafka producer related configuration properties like client.id etc..");
        description.addArgument(new String[]{"--reactive"}).action(Arguments.store()).type(Boolean.class).metavar(new String[]{"REACTIVE"}).setDefault(false).help("if true, use reactive API");
        return description;
    }

    private static Map<String, Object> getProperties(List<String> list) {
        HashMap hashMap = new HashMap();
        if (list != null) {
            for (String str : list) {
                String[] split = str.split("=");
                if (split.length != 2) {
                    throw new IllegalArgumentException("Invalid property: " + str);
                }
                hashMap.put(split[0], split[1]);
            }
        }
        return hashMap;
    }
}
