/*
 * Decompiled with CFR 0.152.
 */
package io.kareldb.utils;

import java.util.List;
import java.util.Properties;
import java.util.Vector;
import kafka.cluster.EndPoint;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import kafka.utils.CoreUtils;
import kafka.utils.TestUtils;
import kafka.zk.EmbeddedZookeeper;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.junit.After;
import org.junit.Before;
import scala.Option;
import scala.Option$;
import scala.collection.Seq;

public abstract class ClusterTestHarness {
    protected static final int DEFAULT_NUM_BROKERS = 1;
    protected static final Option<Properties> EMPTY_SASL_PROPERTIES = Option$.MODULE$.empty();
    private final int numBrokers;
    protected EmbeddedZookeeper zookeeper;
    protected String zkConnect;
    protected List<KafkaConfig> configs = null;
    protected List<KafkaServer> servers = null;
    protected String bootstrapServers = null;

    public ClusterTestHarness() {
        this(1);
    }

    public ClusterTestHarness(int numBrokers) {
        this.numBrokers = numBrokers;
    }

    @Before
    public void setUp() throws Exception {
        this.zookeeper = new EmbeddedZookeeper();
        this.zkConnect = String.format("localhost:%d", this.zookeeper.port());
        this.configs = new Vector<KafkaConfig>();
        this.servers = new Vector<KafkaServer>();
        for (int i = 0; i < this.numBrokers; ++i) {
            KafkaConfig config = this.getKafkaConfig(i);
            this.configs.add(config);
            KafkaServer server = TestUtils.createServer((KafkaConfig)config, (Time)Time.SYSTEM);
            this.servers.add(server);
        }
        Object[] serverUrls = new String[this.servers.size()];
        ListenerName listenerType = ListenerName.forSecurityProtocol((SecurityProtocol)this.getSecurityProtocol());
        for (int i = 0; i < this.servers.size(); ++i) {
            serverUrls[i] = Utils.formatAddress((String)((EndPoint)this.servers.get(i).config().advertisedListeners().head()).host(), (Integer)this.servers.get(i).boundPort(listenerType));
        }
        this.bootstrapServers = Utils.join((Object[])serverUrls, (String)",");
    }

    protected void injectProperties(Properties props) {
        props.setProperty("auto.create.topics.enable", "true");
        props.setProperty("num.partitions", "1");
    }

    protected KafkaConfig getKafkaConfig(int brokerId) {
        Option noFile = Option.apply(null);
        Option noInterBrokerSecurityProtocol = Option.apply(null);
        Properties props = TestUtils.createBrokerConfig((int)brokerId, (String)this.zkConnect, (boolean)false, (boolean)false, (int)TestUtils.RandomPort(), (Option)noInterBrokerSecurityProtocol, (Option)noFile, EMPTY_SASL_PROPERTIES, (boolean)true, (boolean)false, (int)TestUtils.RandomPort(), (boolean)false, (int)TestUtils.RandomPort(), (boolean)false, (int)TestUtils.RandomPort(), (Option)Option.empty(), (int)1, (boolean)false, (int)1, (short)1);
        this.injectProperties(props);
        return KafkaConfig.fromProps((Properties)props);
    }

    protected SecurityProtocol getSecurityProtocol() {
        return SecurityProtocol.PLAINTEXT;
    }

    @After
    public void tearDown() throws Exception {
        if (this.servers != null) {
            for (KafkaServer server : this.servers) {
                server.shutdown();
            }
            for (KafkaServer server : this.servers) {
                CoreUtils.delete((Seq)server.config().logDirs());
            }
        }
        if (this.zookeeper != null) {
            this.zookeeper.shutdown();
        }
    }
}

