/*
 * Decompiled with CFR 0.152.
 */
package io.openmessaging.chaos.driver.rocketmq;

import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import com.google.common.io.BaseEncoding;
import io.openmessaging.chaos.common.Message;
import io.openmessaging.chaos.driver.mq.ConsumerCallback;
import io.openmessaging.chaos.driver.mq.MQChaosDriver;
import io.openmessaging.chaos.driver.mq.MQChaosNode;
import io.openmessaging.chaos.driver.mq.MQChaosProducer;
import io.openmessaging.chaos.driver.mq.MQChaosPullConsumer;
import io.openmessaging.chaos.driver.mq.MQChaosPushConsumer;
import io.openmessaging.chaos.driver.rocketmq.RocketMQChaosNode;
import io.openmessaging.chaos.driver.rocketmq.RocketMQChaosProducer;
import io.openmessaging.chaos.driver.rocketmq.RocketMQChaosPullConsumer;
import io.openmessaging.chaos.driver.rocketmq.RocketMQChaosPushConsumer;
import io.openmessaging.chaos.driver.rocketmq.config.RocketMQBrokerConfig;
import io.openmessaging.chaos.driver.rocketmq.config.RocketMQClientConfig;
import io.openmessaging.chaos.driver.rocketmq.config.RocketMQConfig;
import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.Random;
import java.util.Set;
import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.admin.MQAdminExt;
import org.apache.rocketmq.tools.command.CommandUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RocketMQChaosDriver
implements MQChaosDriver {
    private static final Random RANDOM = new Random();
    private static final ObjectMapper MAPPER = new ObjectMapper((JsonFactory)new YAMLFactory()).configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
    private static final Logger log = LoggerFactory.getLogger(RocketMQChaosDriver.class);
    private DefaultMQAdminExt rmqAdmin;
    private RocketMQClientConfig rmqClientConfig;
    private RocketMQBrokerConfig rmqBrokerConfig;
    private RocketMQConfig rmqConfig;
    private List<String> nodes;

    private static RocketMQClientConfig readConfigForClient(File configurationFile) throws IOException {
        return (RocketMQClientConfig)MAPPER.readValue(configurationFile, RocketMQClientConfig.class);
    }

    private static RocketMQBrokerConfig readConfigForBroker(File configurationFile) throws IOException {
        return (RocketMQBrokerConfig)MAPPER.readValue(configurationFile, RocketMQBrokerConfig.class);
    }

    private static RocketMQConfig readConfigForRMQ(File configurationFile) throws IOException {
        return (RocketMQConfig)MAPPER.readValue(configurationFile, RocketMQConfig.class);
    }

    private static String getRandomString() {
        byte[] buffer = new byte[5];
        RANDOM.nextBytes(buffer);
        return BaseEncoding.base64Url().omitPadding().encode(buffer);
    }

    public void initialize(File configurationFile, List<String> nodes) throws IOException {
        this.rmqClientConfig = RocketMQChaosDriver.readConfigForClient(configurationFile);
        this.rmqBrokerConfig = RocketMQChaosDriver.readConfigForBroker(configurationFile);
        this.rmqConfig = RocketMQChaosDriver.readConfigForRMQ(configurationFile);
        this.nodes = nodes;
    }

    public MQChaosNode createChaosNode(String node, List<String> nodes) {
        return new RocketMQChaosNode(node, nodes, this.rmqConfig, this.rmqBrokerConfig);
    }

    public MQChaosProducer createProducer(String topic) {
        DefaultMQProducer defaultMQProducer = new DefaultMQProducer("ProducerGroup_Chaos");
        defaultMQProducer.setNamesrvAddr(this.getNameserver());
        defaultMQProducer.setInstanceName("ProducerInstance" + RocketMQChaosDriver.getRandomString());
        return new RocketMQChaosProducer(defaultMQProducer, topic);
    }

    public MQChaosPushConsumer createPushConsumer(String topic, String subscriptionName, ConsumerCallback consumerCallback) {
        DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer(subscriptionName);
        defaultMQPushConsumer.setNamesrvAddr(this.getNameserver());
        defaultMQPushConsumer.setInstanceName("ConsumerInstance" + RocketMQChaosDriver.getRandomString());
        try {
            defaultMQPushConsumer.subscribe(topic, "*");
            defaultMQPushConsumer.registerMessageListener((msgs, context) -> {
                for (MessageExt message : msgs) {
                    consumerCallback.messageReceived(new Message(message.getKeys(), message.getBody(), this.buildExtraInfo(message, subscriptionName)));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            });
        }
        catch (MQClientException e) {
            log.error("Failed to create consumer instance.", (Throwable)e);
        }
        return new RocketMQChaosPushConsumer(defaultMQPushConsumer);
    }

    public MQChaosPullConsumer createPullConsumer(String topic, String subscriptionName) {
        DefaultLitePullConsumer defaultLitePullConsumer = new DefaultLitePullConsumer(subscriptionName);
        defaultLitePullConsumer.setNamesrvAddr(this.getNameserver());
        defaultLitePullConsumer.setInstanceName("ConsumerInstance" + RocketMQChaosDriver.getRandomString());
        defaultLitePullConsumer.setPollTimeoutMillis(100L);
        defaultLitePullConsumer.setPullBatchSize(5);
        try {
            defaultLitePullConsumer.subscribe(topic, "*");
        }
        catch (MQClientException e) {
            log.error("Failed to start the created lite pull consumer instance.", (Throwable)e);
        }
        return new RocketMQChaosPullConsumer(defaultLitePullConsumer);
    }

    public void createTopic(String topic, int partitions) {
        this.rmqAdmin = new DefaultMQAdminExt();
        this.rmqAdmin.setNamesrvAddr(this.getNameserver());
        this.rmqAdmin.setInstanceName("AdminInstance-" + RocketMQChaosDriver.getRandomString());
        try {
            this.rmqAdmin.start();
        }
        catch (MQClientException e) {
            log.error("Start the RocketMQ admin tool failed.");
        }
        TopicConfig topicConfig = new TopicConfig();
        topicConfig.setOrder(false);
        topicConfig.setPerm(6);
        topicConfig.setReadQueueNums(partitions);
        topicConfig.setWriteQueueNums(partitions);
        topicConfig.setTopicName(topic);
        try {
            Set brokerList = CommandUtil.fetchMasterAddrByClusterName((MQAdminExt)this.rmqAdmin, (String)this.rmqClientConfig.clusterName);
            topicConfig.setReadQueueNums(Math.max(1, partitions / brokerList.size()));
            topicConfig.setWriteQueueNums(Math.max(1, partitions / brokerList.size()));
            for (String brokerAddr : brokerList) {
                this.rmqAdmin.createAndUpdateTopicConfig(brokerAddr, topicConfig);
            }
        }
        catch (Exception e) {
            throw new RuntimeException(String.format("Failed to create topic [%s] to cluster [%s]", topic, this.rmqClientConfig.clusterName), e);
        }
    }

    public void shutdown() {
        this.rmqAdmin.shutdown();
    }

    private String getNameserver() {
        if (this.rmqClientConfig.namesrvAddr != null && !this.rmqClientConfig.namesrvAddr.isEmpty()) {
            return this.rmqClientConfig.namesrvAddr;
        }
        StringBuilder res = new StringBuilder();
        this.nodes.forEach(node -> res.append(node + ":9876;"));
        return res.toString();
    }

    private String buildExtraInfo(MessageExt message, String group) {
        return "receive status [ msgId = " + message.getMsgId() + ", topic = " + message.getTopic() + ", group = " + group + ", queueId = " + message.getQueueId() + ", queueOffset = " + message.getQueueOffset() + "]";
    }
}

