package reactor.kafka.tools.perf;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;
import net.sourceforge.argparse4j.ArgumentParsers;
import net.sourceforge.argparse4j.impl.Arguments;
import net.sourceforge.argparse4j.inf.ArgumentParser;
import net.sourceforge.argparse4j.inf.ArgumentParserException;
import net.sourceforge.argparse4j.inf.Namespace;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
import reactor.kafka.sender.KafkaSender;
import reactor.kafka.sender.SenderOptions;
import reactor.kafka.sender.SenderRecord;
import reactor.kafka.sender.SenderResult;

/* loaded from: input_file:reactor/kafka/tools/perf/ProducerPerformance.class */
public class ProducerPerformance {
    private static final long DEFAULT_PRODUCER_BUFFER_SIZE = 33554432;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:reactor/kafka/tools/perf/ProducerPerformance$AbstractProducerPerformance.class */
    public static abstract class AbstractProducerPerformance {
        final int numRecords;
        final int recordSize;
        final boolean transactionsEnabled;
        final long transactionDurationMs;
        final ProducerRecord<byte[], byte[]> record;
        final Map<String, Object> producerProps;
        final ThroughputThrottler throttler;
        final Stats stats;

        AbstractProducerPerformance(Map<String, Object> map, String str, int i, int i2, long j, String str2, long j2) {
            this.numRecords = i;
            this.recordSize = i2;
            byte[] bArr = new byte[i2];
            Random random = new Random(0L);
            for (int i3 = 0; i3 < bArr.length; i3++) {
                bArr[i3] = (byte) (random.nextInt(26) + 65);
            }
            this.record = new ProducerRecord<>(str, bArr);
            this.producerProps = new HashMap(map);
            this.producerProps.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
            this.producerProps.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
            this.transactionDurationMs = j2;
            this.transactionsEnabled = j2 > 0;
            if (this.transactionsEnabled) {
                this.producerProps.put("transactional.id", str2);
            }
            this.stats = new Stats(i, 5000);
            this.throttler = new ThroughputThrottler(j, System.currentTimeMillis());
        }

        public abstract Stats runTest() throws InterruptedException;
    }

    /* loaded from: input_file:reactor/kafka/tools/perf/ProducerPerformance$NonReactiveProducerPerformance.class */
    static class NonReactiveProducerPerformance extends AbstractProducerPerformance {
        NonReactiveProducerPerformance(Map<String, Object> map, String str, int i, int i2, long j, String str2, long j2) {
            super(map, str, i, i2, j, str2, j2);
        }

        @Override // reactor.kafka.tools.perf.ProducerPerformance.AbstractProducerPerformance
        public Stats runTest() {
            System.out.println("Running producer performance test using non-reactive API, class=" + getClass().getSimpleName() + " messageSize=" + this.recordSize);
            KafkaProducer kafkaProducer = new KafkaProducer(this.producerProps);
            if (this.transactionsEnabled) {
                kafkaProducer.initTransactions();
            }
            int i = 0;
            long j = 0;
            for (int i2 = 0; i2 < this.numRecords; i2++) {
                long currentTimeMillis = System.currentTimeMillis();
                if (this.transactionsEnabled && i == 0) {
                    kafkaProducer.beginTransaction();
                    j = currentTimeMillis;
                }
                kafkaProducer.send(this.record, this.stats.nextCompletion(currentTimeMillis, this.recordSize, this.stats));
                i++;
                if (this.transactionsEnabled && this.transactionDurationMs <= currentTimeMillis - j) {
                    kafkaProducer.commitTransaction();
                    i = 0;
                }
                if (this.throttler.shouldThrottle(i2, currentTimeMillis)) {
                    this.throttler.throttle();
                }
            }
            if (this.transactionsEnabled && i != 0) {
                kafkaProducer.commitTransaction();
            }
            this.stats.complete();
            kafkaProducer.close();
            return this.stats;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:reactor/kafka/tools/perf/ProducerPerformance$PerfCallback.class */
    public static final class PerfCallback implements Callback {
        private final long start;
        private final int iteration;
        private final int bytes;
        private final Stats stats;

        public PerfCallback(int i, long j, int i2, Stats stats) {
            this.start = j;
            this.stats = stats;
            this.iteration = i;
            this.bytes = i2;
        }

        public void onCompletion(RecordMetadata recordMetadata, Exception exc) {
            long currentTimeMillis = System.currentTimeMillis();
            this.stats.record(this.iteration, (int) (currentTimeMillis - this.start), this.bytes, currentTimeMillis);
            if (exc != null) {
                exc.printStackTrace();
            }
        }
    }

    /* loaded from: input_file:reactor/kafka/tools/perf/ProducerPerformance$ReactiveProducerPerformance.class */
    static class ReactiveProducerPerformance extends AbstractProducerPerformance {
        final KafkaSender<byte[], byte[]> sender;

        ReactiveProducerPerformance(Map<String, Object> map, String str, int i, int i2, long j, String str2, long j2) {
            super(map, str, i, i2, j, str2, j2);
            SenderOptions maxInFlight = SenderOptions.create(this.producerProps).maxInFlight(maxInflight());
            this.sender = KafkaSender.create(this.transactionsEnabled ? maxInFlight : maxInFlight.scheduler(Schedulers.newSingle("prod-perf", true)));
        }

        @Override // reactor.kafka.tools.perf.ProducerPerformance.AbstractProducerPerformance
        public Stats runTest() throws InterruptedException {
            System.out.println("Running producer performance test using reactive API, class=" + getClass().getSimpleName() + " messageSize=" + this.recordSize + ", maxInflight=" + maxInflight());
            CountDownLatch countDownLatch = new CountDownLatch(this.numRecords);
            Disposable subscribe = (this.transactionsEnabled ? transactionalFlux(countDownLatch) : senderFlux(countDownLatch)).subscribe();
            countDownLatch.await();
            this.stats.complete();
            subscribe.dispose();
            this.sender.close();
            return this.stats;
        }

        Flux<?> senderFlux(CountDownLatch countDownLatch) {
            return this.sender.send(sourceFlux()).map(senderResult -> {
                return processResult(countDownLatch, senderResult);
            });
        }

        Flux<?> transactionalFlux(CountDownLatch countDownLatch) {
            AtomicLong atomicLong = new AtomicLong(System.currentTimeMillis() + this.transactionDurationMs);
            return this.sender.sendTransactionally(sourceFlux().publishOn(this.sender.transactionManager().scheduler()).windowUntil(senderRecord -> {
                long currentTimeMillis = System.currentTimeMillis();
                if (currentTimeMillis < atomicLong.get()) {
                    return false;
                }
                atomicLong.set(currentTimeMillis + this.transactionDurationMs);
                return true;
            })).concatMap(flux -> {
                return flux;
            }).map(senderResult -> {
                return processResult(countDownLatch, senderResult);
            });
        }

        private Flux<SenderRecord<byte[], byte[], Callback>> sourceFlux() {
            return Flux.range(1, this.numRecords).map(num -> {
                long currentTimeMillis = System.currentTimeMillis();
                if (this.throttler.shouldThrottle(num.intValue(), currentTimeMillis)) {
                    this.throttler.throttle();
                }
                return SenderRecord.create(this.record, this.stats.nextCompletion(currentTimeMillis, this.recordSize, this.stats));
            });
        }

        private RecordMetadata processResult(CountDownLatch countDownLatch, SenderResult<Callback> senderResult) {
            RecordMetadata recordMetadata = senderResult.recordMetadata();
            ((Callback) senderResult.correlationMetadata()).onCompletion(recordMetadata, (Exception) null);
            countDownLatch.countDown();
            return recordMetadata;
        }

        int maxInflight() {
            String str = (String) this.producerProps.get("buffer.memory");
            int parseLong = (int) ((str != null ? Long.parseLong(str) : ProducerPerformance.DEFAULT_PRODUCER_BUFFER_SIZE) / (1 << (this.recordSize < 2 ? 0 : 31 - Integer.numberOfLeadingZeros(this.recordSize - 1))));
            if (parseLong > 65536) {
                parseLong = 65536;
            }
            return parseLong;
        }
    }

    /* loaded from: input_file:reactor/kafka/tools/perf/ProducerPerformance$Stats.class */
    static class Stats {
        private int[] latencies;
        private int sampling;
        private int index;
        private long count;
        private long bytes;
        private long totalLatency;
        private long reportingInterval;
        private long completionTime;
        private long start = System.currentTimeMillis();
        private long windowStart = System.currentTimeMillis();
        private int iteration = 0;
        private int maxLatency = 0;
        private long windowCount = 0;
        private int windowMaxLatency = 0;
        private long windowTotalLatency = 0;
        private long windowBytes = 0;

        public Stats(long j, int i) {
            this.index = 0;
            this.sampling = (int) (j / Math.min(j, 500000L));
            this.latencies = new int[((int) (j / this.sampling)) + 1];
            this.index = 0;
            this.totalLatency = 0L;
            this.totalLatency = 0L;
            this.reportingInterval = i;
        }

        public void record(int i, int i2, int i3, long j) {
            this.count++;
            this.bytes += i3;
            this.totalLatency += i2;
            this.maxLatency = Math.max(this.maxLatency, i2);
            this.windowCount++;
            this.windowBytes += i3;
            this.windowTotalLatency += i2;
            this.windowMaxLatency = Math.max(this.windowMaxLatency, i2);
            if (i % this.sampling == 0) {
                this.latencies[this.index] = i2;
                this.index++;
            }
            if (j - this.windowStart >= this.reportingInterval) {
                printWindow();
                newWindow();
            }
        }

        public Callback nextCompletion(long j, int i, Stats stats) {
            PerfCallback perfCallback = new PerfCallback(this.iteration, j, i, stats);
            this.iteration++;
            return perfCallback;
        }

        public void printWindow() {
            long currentTimeMillis = System.currentTimeMillis() - this.windowStart;
            System.out.printf("%d records sent, %.1f records/sec (%.2f MB/sec), %.1f ms avg latency, %.1f max latency.\n", Long.valueOf(this.windowCount), Double.valueOf((1000.0d * this.windowCount) / currentTimeMillis), Double.valueOf(((1000.0d * this.windowBytes) / currentTimeMillis) / 1048576.0d), Double.valueOf(this.windowTotalLatency / this.windowCount), Double.valueOf(this.windowMaxLatency));
        }

        public void newWindow() {
            this.windowStart = System.currentTimeMillis();
            this.windowCount = 0L;
            this.windowMaxLatency = 0;
            this.windowTotalLatency = 0L;
            this.windowBytes = 0L;
        }

        public void complete() {
            if (this.completionTime == 0) {
                this.completionTime = System.currentTimeMillis();
            }
        }

        public void printTotal() {
            complete();
            long j = this.completionTime - this.start;
            double d = (1000.0d * this.count) / j;
            double d2 = ((1000.0d * this.bytes) / j) / 1048576.0d;
            int[] percentiles = percentiles(0.5d, 0.75d, 0.95d, 0.99d, 0.999d);
            System.out.printf("%d records sent, %f records/sec (%.2f MB/sec), %.2f ms avg latency, %.2f ms max latency, %d ms 50th, %d ms 75th %d ms 95th, %d ms 99th, %d ms 99.9th.\n", Long.valueOf(this.count), Double.valueOf(d), Double.valueOf(d2), Double.valueOf(this.totalLatency / this.count), Double.valueOf(this.maxLatency), Integer.valueOf(percentiles[0]), Integer.valueOf(percentiles[1]), Integer.valueOf(percentiles[2]), Integer.valueOf(percentiles[3]), Integer.valueOf(percentiles[4]));
        }

        int[] percentiles(double... dArr) {
            int min = Math.min((int) this.count, this.latencies.length);
            Arrays.sort(this.latencies, 0, min);
            int[] iArr = new int[dArr.length];
            for (int i = 0; i < dArr.length; i++) {
                iArr[i] = this.latencies[(int) (dArr[i] * min)];
            }
            return iArr;
        }

        double recordsPerSec() {
            return (1000.0d * this.count) / (this.completionTime - this.start);
        }

        long count() {
            return this.count;
        }
    }

    public static void main(String[] strArr) throws Exception {
        ArgumentParser argParser = argParser();
        try {
            Namespace parseArgs = argParser.parseArgs(strArr);
            String string = parseArgs.getString("topic");
            int intValue = parseArgs.getInt("numRecords").intValue();
            int intValue2 = parseArgs.getInt("recordSize").intValue();
            int intValue3 = parseArgs.getInt("throughput").intValue();
            String string2 = parseArgs.getString("transactionalId");
            long longValue = parseArgs.getLong("transactionDurationMs").longValue();
            boolean booleanValue = parseArgs.getBoolean("reactive").booleanValue();
            Map<String, Object> properties = getProperties(parseArgs.getList("producerConfig"));
            (!booleanValue ? new NonReactiveProducerPerformance(properties, string, intValue, intValue2, intValue3, string2, longValue) : new ReactiveProducerPerformance(properties, string, intValue, intValue2, intValue3, string2, longValue)).runTest().printTotal();
        } catch (ArgumentParserException e) {
            if (strArr.length == 0) {
                argParser.printHelp();
                System.exit(0);
            } else {
                argParser.handleError(e);
                System.exit(1);
            }
        }
    }

    private static ArgumentParser argParser() {
        ArgumentParser description = ArgumentParsers.newArgumentParser("producer-performance").defaultHelp(true).description("This tool is used to verify the producer performance.");
        description.addArgument(new String[]{"--topic"}).action(Arguments.store()).required(true).type(String.class).metavar(new String[]{"TOPIC"}).help("produce messages to this topic");
        description.addArgument(new String[]{"--num-records"}).action(Arguments.store()).required(true).type(Integer.class).metavar(new String[]{"NUM-RECORDS"}).dest("numRecords").help("number of messages to produce");
        description.addArgument(new String[]{"--record-size"}).action(Arguments.store()).required(true).type(Integer.class).metavar(new String[]{"RECORD-SIZE"}).dest("recordSize").help("message size in bytes");
        description.addArgument(new String[]{"--throughput"}).action(Arguments.store()).required(true).type(Integer.class).metavar(new String[]{"THROUGHPUT"}).help("throttle maximum message throughput to *approximately* THROUGHPUT messages/sec");
        description.addArgument(new String[]{"--producer-props"}).nargs("+").required(true).metavar(new String[]{"PROP-NAME=PROP-VALUE"}).type(String.class).dest("producerConfig").help("kafka producer related configuration properties like bootstrap.servers,client.id etc..");
        description.addArgument(new String[]{"--reactive"}).action(Arguments.store()).required(false).type(Boolean.class).metavar(new String[]{"REACTIVE"}).setDefault(true).help("if true, use reactive API");
        description.addArgument(new String[]{"--transactional-id"}).action(Arguments.store()).required(false).type(String.class).metavar(new String[]{"TRANSACTIONAL-ID"}).dest("transactionalId").setDefault("performance-producer-default-transactional-id").help("The transactionalId to use if transaction-duration-ms is > 0. Useful when testing the performance of concurrent transactions.");
        description.addArgument(new String[]{"--transaction-duration-ms"}).action(Arguments.store()).required(false).type(Long.class).metavar(new String[]{"TRANSACTION-DURATION"}).dest("transactionDurationMs").setDefault(0L).help("The max age of each transaction. The commitTransaction will be called after this this time has elapsed. Transactions are only enabled if this value is positive.");
        return description;
    }

    private static Map<String, Object> getProperties(List<String> list) {
        HashMap hashMap = new HashMap();
        if (list != null) {
            for (String str : list) {
                String[] split = str.split("=");
                if (split.length != 2) {
                    throw new IllegalArgumentException("Invalid property: " + str);
                }
                hashMap.put(split[0], split[1]);
            }
        }
        return hashMap;
    }
}
