package reactor.kafka.tools.perf;

import java.text.SimpleDateFormat;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import net.sourceforge.argparse4j.ArgumentParsers;
import net.sourceforge.argparse4j.impl.Arguments;
import net.sourceforge.argparse4j.inf.ArgumentParser;
import net.sourceforge.argparse4j.inf.ArgumentParserException;
import net.sourceforge.argparse4j.inf.Namespace;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import reactor.core.Cancellation;
import reactor.kafka.receiver.Receiver;
import reactor.kafka.receiver.ReceiverOptions;
import reactor.kafka.receiver.ReceiverPartition;

/* loaded from: input_file:reactor/kafka/tools/perf/ConsumerPerformance.class */
public class ConsumerPerformance {

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:reactor/kafka/tools/perf/ConsumerPerformance$AbstractConsumerPerformance.class */
    public static abstract class AbstractConsumerPerformance {
        final String topic;
        final Map<String, Object> consumerProps;
        final ConsumerPerfConfig config;
        private long startMs;
        final AtomicLong totalMessagesRead = new AtomicLong();
        final AtomicLong totalBytesRead = new AtomicLong();
        private long endMs = 0;

        AbstractConsumerPerformance(Map<String, Object> map, String str, String str2, ConsumerPerfConfig consumerPerfConfig) {
            this.topic = str;
            this.config = consumerPerfConfig;
            this.consumerProps = consumerProps(str2, map);
        }

        public void runTest(int i) throws InterruptedException {
            this.startMs = System.currentTimeMillis();
            consumeMessages(i, this.config.showDetailedStats);
            this.endMs = System.currentTimeMillis();
            if (this.config.showDetailedStats) {
                return;
            }
            printFinalStats();
        }

        public abstract void consumeMessages(int i, boolean z) throws InterruptedException;

        public double recordsPerSec() {
            return (this.totalMessagesRead.get() * 1000.0d) / (this.endMs - this.startMs);
        }

        private void printFinalStats() {
            double d = (this.endMs - this.startMs) / 1000.0d;
            if (this.config.showDetailedStats) {
                return;
            }
            double d2 = (this.totalBytesRead.get() * 1.0d) / 1048576.0d;
            System.out.println("Start-time               End-time               Total-MB  MB/sec Total-messages Messages/sec");
            System.out.printf("%s, %s, %.4f, %.4f, %d, %.4f\n", this.config.dateFormat.format(Long.valueOf(this.startMs)), this.config.dateFormat.format(Long.valueOf(this.endMs)), Double.valueOf(d2), Double.valueOf(d2 / d), Long.valueOf(this.totalMessagesRead.get()), Double.valueOf(this.totalMessagesRead.get() / d));
        }

        void printProgressMessage(int i, long j, long j2, long j3, long j4, long j5, long j6, SimpleDateFormat simpleDateFormat) {
            double d = j6 - j5;
            System.out.printf("%s, %d, %.4f, %.4f, %d, %.4f\n", simpleDateFormat.format(Long.valueOf(j6)), Integer.valueOf(i), Double.valueOf((j * 1.0d) / 1048576.0d), Double.valueOf(1000.0d * ((((j - j2) * 1.0d) / 1048576.0d) / d)), Long.valueOf(j3), Double.valueOf(((j3 - j4) / d) * 1000.0d));
        }

        private Map<String, Object> consumerProps(String str, Map<String, Object> map) {
            HashMap hashMap = new HashMap();
            hashMap.put("group.id", str);
            hashMap.put("auto.offset.reset", "earliest");
            hashMap.put("key.deserializer", ByteArrayDeserializer.class.getName());
            hashMap.put("value.deserializer", ByteArrayDeserializer.class.getName());
            hashMap.put("check.crcs", "false");
            hashMap.putAll(map);
            return hashMap;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:reactor/kafka/tools/perf/ConsumerPerformance$ConsumerPerfConfig.class */
    public static class ConsumerPerfConfig {
        boolean showDetailedStats = false;
        long reportingInterval = 5000;
        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS");

        ConsumerPerfConfig() {
        }
    }

    /* loaded from: input_file:reactor/kafka/tools/perf/ConsumerPerformance$NonReactiveConsumerPerformance.class */
    static class NonReactiveConsumerPerformance extends AbstractConsumerPerformance {
        NonReactiveConsumerPerformance(Map<String, Object> map, String str, String str2, ConsumerPerfConfig consumerPerfConfig) {
            super(map, str, str2, consumerPerfConfig);
        }

        @Override // reactor.kafka.tools.perf.ConsumerPerformance.AbstractConsumerPerformance
        public void consumeMessages(int i, boolean z) throws InterruptedException {
            System.out.println("Running consumer performance test using non-reactive API, class=" + getClass().getSimpleName());
            KafkaConsumer<byte[], byte[]> kafkaConsumer = new KafkaConsumer<>(this.consumerProps);
            consume(kafkaConsumer, i, z);
            kafkaConsumer.close();
        }

        private void consume(KafkaConsumer<byte[], byte[]> kafkaConsumer, int i, boolean z) throws InterruptedException {
            long j = 0;
            long j2 = 0;
            long j3 = 0;
            long j4 = 0;
            final AtomicBoolean atomicBoolean = new AtomicBoolean(false);
            kafkaConsumer.subscribe(Collections.singleton(this.topic), new ConsumerRebalanceListener() { // from class: reactor.kafka.tools.perf.ConsumerPerformance.NonReactiveConsumerPerformance.1
                public void onPartitionsRevoked(Collection<TopicPartition> collection) {
                    atomicBoolean.set(false);
                }

                public void onPartitionsAssigned(Collection<TopicPartition> collection) {
                    atomicBoolean.set(true);
                }
            });
            long currentTimeMillis = System.currentTimeMillis();
            while (!atomicBoolean.get()) {
                if (System.currentTimeMillis() - currentTimeMillis >= 10000) {
                    throw new RuntimeException("Timed out waiting for initial group join.");
                }
                kafkaConsumer.poll(100L);
            }
            kafkaConsumer.seekToBeginning(Collections.emptyList());
            long currentTimeMillis2 = System.currentTimeMillis();
            long currentTimeMillis3 = System.currentTimeMillis();
            while (j2 < i && System.currentTimeMillis() - currentTimeMillis3 <= 1000) {
                ConsumerRecords poll = kafkaConsumer.poll(100L);
                if (poll.count() > 0) {
                    currentTimeMillis3 = System.currentTimeMillis();
                }
                Iterator it = poll.iterator();
                while (it.hasNext()) {
                    ConsumerRecord consumerRecord = (ConsumerRecord) it.next();
                    j2++;
                    if (consumerRecord.key() != null) {
                        j += ((byte[]) consumerRecord.key()).length;
                    }
                    if (consumerRecord.value() != null) {
                        j += ((byte[]) consumerRecord.value()).length;
                    }
                    if (j2 % this.config.reportingInterval == 0) {
                        if (z) {
                            printProgressMessage(0, j, j3, j2, j4, currentTimeMillis2, System.currentTimeMillis(), this.config.dateFormat);
                        }
                        currentTimeMillis2 = System.currentTimeMillis();
                        j4 = j2;
                        j3 = j;
                    }
                }
            }
            this.totalMessagesRead.set(j2);
            this.totalBytesRead.set(j);
        }
    }

    /* loaded from: input_file:reactor/kafka/tools/perf/ConsumerPerformance$ReactiveConsumerPerformance.class */
    static class ReactiveConsumerPerformance extends AbstractConsumerPerformance {
        ReactiveConsumerPerformance(Map<String, Object> map, String str, String str2, ConsumerPerfConfig consumerPerfConfig) {
            super(map, str, str2, consumerPerfConfig);
        }

        @Override // reactor.kafka.tools.perf.ConsumerPerformance.AbstractConsumerPerformance
        public void consumeMessages(int i, boolean z) throws InterruptedException {
            CountDownLatch countDownLatch = new CountDownLatch(i);
            AtomicLong atomicLong = new AtomicLong();
            AtomicLong atomicLong2 = new AtomicLong();
            AtomicLong atomicLong3 = new AtomicLong();
            AtomicLong atomicLong4 = new AtomicLong();
            System.out.println("Running consumer performance test using reactive API, class=" + getClass().getSimpleName());
            Cancellation subscribe = Receiver.create(ReceiverOptions.create(this.consumerProps).addAssignListener(collection -> {
                Iterator it = collection.iterator();
                while (it.hasNext()) {
                    ((ReceiverPartition) it.next()).seekToBeginning();
                }
            }).subscription(Collections.singletonList(this.topic))).receive().subscribe(receiverRecord -> {
                ConsumerRecord record = receiverRecord.record();
                atomicLong3.set(System.currentTimeMillis());
                this.totalMessagesRead.incrementAndGet();
                if (record.key() != null) {
                    this.totalBytesRead.addAndGet(((byte[]) record.key()).length);
                }
                if (record.value() != null) {
                    this.totalBytesRead.addAndGet(((byte[]) record.value()).length);
                }
                if (this.totalMessagesRead.get() % this.config.reportingInterval == 0) {
                    if (z) {
                        printProgressMessage(0, this.totalBytesRead.get(), atomicLong.get(), this.totalMessagesRead.get(), atomicLong2.get(), atomicLong4.get(), System.currentTimeMillis(), this.config.dateFormat);
                    }
                    atomicLong4.set(System.currentTimeMillis());
                    atomicLong2.set(this.totalMessagesRead.get());
                    atomicLong.set(this.totalBytesRead.get());
                }
                countDownLatch.countDown();
            }, i);
            countDownLatch.await();
            subscribe.dispose();
        }
    }

    public static void main(String[] strArr) throws Exception {
        ArgumentParser argParser = argParser();
        try {
            Namespace parseArgs = argParser.parseArgs(strArr);
            String string = parseArgs.getString("topic");
            String string2 = parseArgs.getString("group");
            int intValue = parseArgs.getInt("messages").intValue();
            ConsumerPerfConfig consumerPerfConfig = new ConsumerPerfConfig();
            boolean booleanValue = parseArgs.getBoolean("reactive").booleanValue();
            Map<String, Object> properties = getProperties(parseArgs.getList("consumerConfig"));
            (booleanValue ? new ReactiveConsumerPerformance(properties, string, string2, consumerPerfConfig) : new NonReactiveConsumerPerformance(properties, string, string2, consumerPerfConfig)).runTest(intValue);
            System.exit(0);
        } catch (ArgumentParserException e) {
            if (strArr.length == 0) {
                argParser.printHelp();
                System.exit(0);
            } else {
                argParser.handleError(e);
                System.exit(1);
            }
        }
    }

    private static ArgumentParser argParser() {
        ArgumentParser description = ArgumentParsers.newArgumentParser("consumer-performance").defaultHelp(true).description("This tool is used to verify the consumer performance.");
        description.addArgument(new String[]{"--topic"}).action(Arguments.store()).required(true).type(String.class).metavar(new String[]{"TOPIC"}).help("consume messages from this topic");
        description.addArgument(new String[]{"--group"}).action(Arguments.store()).required(true).type(String.class).metavar(new String[]{"GROUP"}).help("group id");
        description.addArgument(new String[]{"--messages"}).action(Arguments.store()).required(true).type(Integer.class).metavar(new String[]{"MESSAGES"}).help("number of messages to consume");
        description.addArgument(new String[]{"--consumer-props"}).nargs("+").required(false).metavar(new String[]{"PROP-NAME=PROP-VALUE"}).type(String.class).dest("consumerConfig").help("kafka consumer related configuration properties like bootstrap.servers,client.id etc..");
        description.addArgument(new String[]{"--reactive"}).action(Arguments.store()).type(Boolean.class).metavar(new String[]{"REACTIVE"}).setDefault(false).help("if true, use reactive API");
        return description;
    }

    private static Map<String, Object> getProperties(List<String> list) {
        HashMap hashMap = new HashMap();
        if (list != null) {
            for (String str : list) {
                String[] split = str.split("=");
                if (split.length != 2) {
                    throw new IllegalArgumentException("Invalid property: " + str);
                }
                hashMap.put(split[0], split[1]);
            }
        }
        return hashMap;
    }
}
