package nstream.adapter.confluent;

import java.util.Properties;
import nstream.adapter.common.ext.KafkaIngressSettings;
import nstream.adapter.common.provision.ProvisionLoader;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import swim.structure.Value;

/* loaded from: input_file:nstream/adapter/confluent/ConfluentAdapterUtils.class */
/**
 * Static helpers that turn a provisioned {@link Properties} object into the
 * configuration needed to talk to a Confluent Cloud Kafka cluster, and that
 * build a {@link Consumer} from {@link KafkaIngressSettings}.
 *
 * <p>This class is not instantiable.
 */
public final class ConfluentAdapterUtils {

    /** Prefix of adapter-only keys that are consumed here and never forwarded to Kafka. */
    private static final String CCLOUD_PREFIX = "ccloud.";

    /** Optional prefix that is stripped from a key before it is forwarded to Kafka. */
    private static final String KAFKA_PREFIX = "kafka.";

    /**
     * Keys that must be present in a producer provision.
     *
     * <p>NOTE(review): this list names {@code key.deserializer}/{@code value.deserializer},
     * whereas a Kafka producer is normally configured with serializers. It is left untouched
     * because changing it would alter which existing provisions are accepted - confirm intent.
     */
    private static final String[] PRODUCER_PROVISION_REQUIRED_PROPS = {"bootstrap.servers", "key.deserializer", "value.deserializer", "ccloud.api.key", "ccloud.api.secret"};

    /** Keys that must be present in a consumer provision. */
    private static final String[] CONSUMER_PROVISION_REQUIRED_PROPS = {"bootstrap.servers", "group.id", "key.deserializer", "value.deserializer", "ccloud.api.key", "ccloud.api.secret"};

    private ConfluentAdapterUtils() {
    }

    /**
     * Creates a consumer from a structured settings value.
     *
     * @param value the structure to cast into {@link KafkaIngressSettings}
     * @param <K> consumer key type
     * @param <V> consumer value type
     * @return a newly constructed consumer
     * @throws IllegalArgumentException if {@code value} is {@code null} or not distinct, or if
     *     the resulting settings fail validation
     */
    public static <K, V> Consumer<K, V> createConsumer(Value value) {
        return createConsumer(parseConsumerCreatingSettings(value));
    }

    /**
     * Casts {@code value} into {@link KafkaIngressSettings} using the settings' own form.
     *
     * @throws IllegalArgumentException if {@code value} is {@code null} or not distinct
     */
    private static KafkaIngressSettings parseConsumerCreatingSettings(Value value) {
        if (value == null || !value.isDistinct()) {
            throw new IllegalArgumentException("prop " + value + " does not yield KafkaIngressSettings");
        }
        return (KafkaIngressSettings) KafkaIngressSettings.form().cast(value);
    }

    /**
     * Creates a consumer configured from the provision named by the given settings.
     *
     * @param kafkaIngressSettings settings naming at least one topic and a consumer properties
     *     provision
     * @param <K> consumer key type
     * @param <V> consumer value type
     * @return a newly constructed consumer
     * @throws IllegalArgumentException if the settings are {@code null}, name no topics, name no
     *     provision, or the provision lacks a required property
     */
    public static <K, V> Consumer<K, V> createConsumer(KafkaIngressSettings kafkaIngressSettings) {
        final String provisionName = sanitizeConsumerProvisionBasedSettings(kafkaIngressSettings);
        // Diamond instead of the raw type so K and V are inferred from the return type.
        return new KafkaConsumer<>(sanitizeConsumerPropertiesProvision(provisionName));
    }

    /**
     * Validates the settings and returns the name of the consumer properties provision.
     *
     * @throws IllegalArgumentException if the settings are {@code null}, have no topics, or have
     *     no provision name
     */
    private static String sanitizeConsumerProvisionBasedSettings(KafkaIngressSettings kafkaIngressSettings) {
        if (kafkaIngressSettings == null) {
            // Fail with the same exception type as the other validation errors rather than an
            // anonymous NullPointerException on the first accessor call.
            throw new IllegalArgumentException("KafkaIngressSettings must not be null");
        }
        if (kafkaIngressSettings.topics() == null || kafkaIngressSettings.topics().isEmpty()) {
            throw new IllegalArgumentException("Must subscribe to at least one topic");
        }
        if (kafkaIngressSettings.consumerPropertiesProvisionName() == null) {
            throw new IllegalArgumentException("Config must include consumerPropertiesProvisionName");
        }
        return kafkaIngressSettings.consumerPropertiesProvisionName();
    }

    /**
     * Loads the named provision and converts it into consumer properties.
     *
     * @param str name of the provision to load; its value is cast to {@link Properties}
     * @return the coalesced consumer properties
     * @throws IllegalArgumentException if a required consumer property is missing
     */
    static Properties sanitizeConsumerPropertiesProvision(String str) {
        // Must use the consumer variant: it enforces group.id and applies the consumer session
        // timeout instead of the producer-only "acks" setting.
        return coalesceConsumerProperties((Properties) ProvisionLoader.getProvision(str).value());
    }

    /**
     * Validates and coalesces consumer properties, then forces
     * {@code session.timeout.ms} to {@code 45000}.
     *
     * @param properties the raw provisioned properties
     * @return a new {@link Properties} instance; the argument is not modified
     * @throws IllegalArgumentException if a required consumer property is missing
     */
    static Properties coalesceConsumerProperties(Properties properties) {
        final Properties result = coalesceProperties(properties, CONSUMER_PROVISION_REQUIRED_PROPS);
        result.setProperty("session.timeout.ms", "45000");
        return result;
    }

    /**
     * Validates and coalesces producer properties, then forces {@code acks} to {@code all}.
     *
     * @param properties the raw provisioned properties
     * @return a new {@link Properties} instance; the argument is not modified
     * @throws IllegalArgumentException if a required producer property is missing
     */
    public static Properties coalesceProducerProperties(Properties properties) {
        final Properties result = coalesceProperties(properties, PRODUCER_PROVISION_REQUIRED_PROPS);
        result.setProperty("acks", "all");
        return result;
    }

    /**
     * Builds the client properties shared by consumers and producers.
     *
     * <p>The result starts with SASL_SSL/PLAIN defaults derived from {@code ccloud.api.key} and
     * {@code ccloud.api.secret}, optionally adds schema registry basic-auth settings, and then
     * copies every string property of the input except those starting with {@code ccloud.}.
     * A leading {@code kafka.} is stripped from copied keys. Because the copy happens last,
     * input properties override the defaults set here.
     *
     * @param properties the raw provisioned properties
     * @param strArr keys that must be present in {@code properties}
     * @return a new {@link Properties} instance
     * @throws IllegalArgumentException if any key in {@code strArr} is absent
     */
    private static Properties coalesceProperties(Properties properties, String[] strArr) {
        for (String required : strArr) {
            if (!properties.containsKey(required)) {
                throw new IllegalArgumentException("Properties must contain " + required);
            }
        }

        final Properties result = new Properties();
        // NOTE(review): credentials are interpolated verbatim between single quotes; confirm
        // they can never contain a quote character.
        result.setProperty("sasl.jaas.config", String.format("org.apache.kafka.common.security.plain.PlainLoginModule required username='%s' password='%s';", properties.getProperty("ccloud.api.key"), properties.getProperty("ccloud.api.secret")));
        result.setProperty("security.protocol", "SASL_SSL");
        result.setProperty("sasl.mechanism", "PLAIN");
        result.setProperty("client.dns.lookup", "use_all_dns_ips");

        // Schema registry credentials are only added when a registry URL and both halves of the
        // key/secret pair are configured.
        final String registryUrl = properties.getProperty("schema.registry.url");
        if (registryUrl != null && !registryUrl.isEmpty()) {
            final String registryKey = properties.getProperty("ccloud.schema.registry.key");
            final String registrySecret = properties.getProperty("ccloud.schema.registry.secret");
            if (registryKey != null && !registryKey.isEmpty() && registrySecret != null && !registrySecret.isEmpty()) {
                result.setProperty("basic.auth.credentials.source", "USER_INFO");
                result.setProperty("basic.auth.user.info", String.format("%s:%s", registryKey, registrySecret));
            }
        }

        for (String name : properties.stringPropertyNames()) {
            if (name.startsWith(CCLOUD_PREFIX)) {
                continue;
            }
            final String target = name.startsWith(KAFKA_PREFIX) ? name.substring(KAFKA_PREFIX.length()) : name;
            result.setProperty(target, properties.getProperty(name));
        }
        return result;
    }
}
