/*
 * Decompiled with CFR 0.152.
 */
package io.atleon.kafka.avro.embedded;

import io.atleon.kafka.embedded.EmbeddedKafka;
import io.confluent.kafka.schemaregistry.rest.SchemaRegistryConfig;
import io.confluent.kafka.schemaregistry.rest.SchemaRegistryRestApplication;
import java.net.URL;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public final class EmbeddedSchemaRegistry {
    public static final int DEFAULT_PORT = 8081;
    private static URL schemaRegistryConnect;

    private EmbeddedSchemaRegistry() {
    }

    public static URL startAndGetConnectUrl() {
        return EmbeddedSchemaRegistry.startAndGetConnectUrl(8081);
    }

    public static URL startAndGetConnectUrl(int port) {
        return EmbeddedSchemaRegistry.startAndGetConnectUrl(port, 9092);
    }

    public static URL startAndGetConnectUrl(int port, int kafkaPort) {
        return EmbeddedSchemaRegistry.startAndGetConnectUrl(port, kafkaPort, 2181);
    }

    public static synchronized URL startAndGetConnectUrl(int port, int kafkaPort, int zookeeperPort) {
        return schemaRegistryConnect == null ? (schemaRegistryConnect = EmbeddedSchemaRegistry.initializeSchemaRegistry(port, kafkaPort, zookeeperPort)) : schemaRegistryConnect;
    }

    private static URL initializeSchemaRegistry(int port, int kafkaPort, int zookeeperPort) {
        try {
            URL schemaConnect = EmbeddedSchemaRegistry.convertToConnectUrl("localhost:" + port);
            Map kafkaConfig = EmbeddedKafka.start((int)kafkaPort, (int)zookeeperPort);
            Properties registryProperties = new Properties();
            registryProperties.putAll(EmbeddedSchemaRegistry.createLocalSchemaRegistryConfig(schemaConnect, kafkaConfig));
            new SchemaRegistryRestApplication(new SchemaRegistryConfig(registryProperties)).start();
            return schemaConnect;
        }
        catch (Exception e) {
            throw new IllegalStateException("Could not start local Schema App", e);
        }
    }

    private static URL convertToConnectUrl(String connect) {
        try {
            return new URL("http://" + connect);
        }
        catch (Exception e) {
            throw new IllegalArgumentException("Could not create URL for Connect: " + connect, e);
        }
    }

    private static Map<String, Object> createLocalSchemaRegistryConfig(URL schemaConnect, Map<String, ?> kafkaConfig) {
        HashMap<String, Object> registryConfig = new HashMap<String, Object>();
        registryConfig.put("listeners", schemaConnect.toString());
        registryConfig.put("kafkastore.connection.url", EmbeddedKafka.extractZookeeperConnect(kafkaConfig));
        registryConfig.put("kafkastore.security.protocol", EmbeddedKafka.extractSecurityProtocol(kafkaConfig));
        registryConfig.put("kafkastore.bootstrap.servers", EmbeddedKafka.extractSecureConnect(kafkaConfig));
        registryConfig.put("kafkastore.topic", "_schemas");
        return registryConfig;
    }
}

