package com.privalia.qa.specs;

import cucumber.api.DataTable;
import cucumber.api.java.en.And;
import cucumber.api.java.en.Given;
import cucumber.api.java.en.Then;
import cucumber.api.java.en.When;
import gherkin.formatter.model.DataTableRow;
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import okhttp3.Response;
import org.apache.avro.generic.GenericRecord;
import org.apache.zookeeper.KeeperException;
import org.assertj.core.api.Assertions;

/* loaded from: input_file:com/privalia/qa/specs/KafkaGSpec.class */
public class KafkaGSpec extends BaseGSpec {
    /**
     * Assertion-status flag for this class. The static initializer assigns it the negation of
     * {@code KafkaGSpec.class.desiredAssertionStatus()}, so it is {@code true} when assertions
     * are disabled for this class.
     */
    static final /* synthetic */ boolean $assertionsDisabled;

    /**
     * Builds the Kafka step definitions around a shared helper object.
     *
     * @param commonG helper stored in {@code commonspec}; the step methods of this class reach
     *                the Kafka utilities and the logger through it
     */
    public KafkaGSpec(CommonG commonG) {
        commonspec = commonG;
    }

    /**
     * Configures the ZooKeeper host used by the Kafka utilities and opens the connection.
     *
     * @param str  address in {@code host:port} form; the first two colon-separated tokens are used
     * @param str2 full text of the optional {@code " using path '...'"} group (not used directly)
     * @param str3 path captured inside the optional group, {@code null} when the step omits it
     * @throws UnknownHostException     if raised by the underlying Kafka utility calls
     * @throws IllegalArgumentException if {@code str} does not contain both a host and a port
     */
    @Given("^I connect to kafka at '(.+?)'( using path '(.+?)')?$")
    public void connectKafka(String str, String str2, String str3) throws UnknownHostException {
        // Split once and validate, instead of failing with a bare ArrayIndexOutOfBoundsException.
        String[] hostAndPort = str.split(":");
        if (hostAndPort.length < 2) {
            throw new IllegalArgumentException(
                    "Kafka address must have the form 'host:port' but was '" + str + "'");
        }
        this.commonspec.getKafkaUtils().setZkHost(hostAndPort[0], hostAndPort[1], str3);
        this.commonspec.getKafkaUtils().connect();
    }

    /**
     * Creates a Kafka topic, optionally skipping creation when the topic is already listed.
     *
     * @param str  name of the topic to create
     * @param str2 text of the optional "if it doesn't exists" group; {@code null} when absent,
     *             in which case the topic is created without checking first
     * @throws Exception if raised by the underlying Kafka utility calls
     */
    @When("^I create a Kafka topic named '(.+?)'( if it doesn't exists)?")
    public void createKafkaTopic(String str, String str2) throws Exception {
        final boolean onlyIfMissing = str2 != null;
        if (onlyIfMissing) {
            this.commonspec.getLogger().debug("Checking if topic " + str + " exists before creation");
            final boolean alreadyListed = this.commonspec.getKafkaUtils().listTopics().contains(str);
            if (alreadyListed) {
                // Nothing to do: the topic is already there.
                return;
            }
        }

        this.commonspec.getLogger().debug("Creating topic " + str);
        this.commonspec.getKafkaUtils().createTopic(str);
    }

    /**
     * Deletes the Kafka topic with the given name.
     *
     * @param str name of the topic to delete
     * @throws Exception if raised by the underlying Kafka utility call
     */
    @When("^I delete a Kafka topic named '(.+?)'")
    public void deleteKafkaTopic(String str) throws Exception {
        final String topicName = str;
        commonspec.getKafkaUtils().deleteTopic(topicName);
    }

    /**
     * Raises the partition count of a topic by a given amount and verifies the new count.
     *
     * @param i   number of partitions to add to the current count
     * @param str name of the topic to modify
     * @throws Exception if raised by the underlying Kafka utility calls
     */
    @When("^I increase '(.+?)' partitions in a Kafka topic named '(.+?)'")
    public void modifyPartitions(int i, String str) throws Exception {
        final int currentCount = this.commonspec.getKafkaUtils().getPartitions(str);
        final int expectedCount = currentCount + i;

        this.commonspec.getKafkaUtils().modifyTopicPartitioning(str, expectedCount);

        // Read the count back to confirm the change was applied.
        final int actualCount = this.commonspec.getKafkaUtils().getPartitions(str);
        Assertions.assertThat(actualCount)
                .as("Number of partitions is not the expected after operation")
                .isEqualTo(expectedCount);
    }

    /**
     * Sends a message to a topic. When the "if not exists" suffix is present, the send is skipped
     * if the topic already holds an entry for the key whose string form matches the message.
     *
     * @param str  message to send; also used as the regular expression for the existence check
     * @param str2 name of the destination topic
     * @param str3 full text of the optional {@code " with key '...'"} group (not used directly)
     * @param str4 key captured inside the optional group, {@code null} when absent
     * @param str5 text of the optional " if not exists" group, {@code null} when absent
     * @throws Exception if raised by the underlying Kafka utility calls
     */
    @When("^I send a message '(.+?)' to the kafka topic named '(.+?)'( with key '(.+?)')?( if not exists)?$")
    public void sendAMessage(String str, String str2, String str3, String str4, String str5) throws Exception {
        if (str5 != null) {
            Map<Object, Object> existing = this.commonspec.getKafkaUtils().readTopicFromBeginning(str2);
            boolean alreadyPresent = existing.containsKey(str4)
                    && existing.get(str4).toString().matches(str);
            if (alreadyPresent) {
                return;
            }
        }
        this.commonspec.getKafkaUtils().sendAndConfirmMessage(str, str4, str2, 1L);
    }

    /**
     * Applies producer properties from a data table and then sends a message to a topic.
     *
     * @param str       message to send
     * @param str2      name of the destination topic
     * @param str3      full text of the optional {@code " with key '...'"} group (not used directly)
     * @param str4      key captured inside the optional group, {@code null} when absent
     * @param dataTable rows whose first cell is a producer property name and second cell its value
     * @throws InterruptedException if raised while sending the message
     * @throws ExecutionException   if raised while sending the message
     * @throws TimeoutException     if raised while sending the message
     */
    @Given("I send a message '(.+?)' to the kafka topic named '(.+?)'( with key '(.+?)')? with:$")
    public void sendAMessageWithDatatable(String str, String str2, String str3, String str4, DataTable dataTable) throws InterruptedException, ExecutionException, TimeoutException {
        for (DataTableRow property : dataTable.getGherkinRows()) {
            String propertyName = property.getCells().get(0);
            String propertyValue = property.getCells().get(1);
            this.commonspec.getKafkaUtils().modifyProducerProperties(propertyName, propertyValue);
        }

        this.commonspec.getKafkaUtils().sendAndConfirmMessage(str, str4, str2, 1L);
    }

    /**
     * Verifies that no ZooKeeper path exists for the given topic name.
     *
     * <p>The check is performed unconditionally with AssertJ, so it no longer depends on the JVM
     * being started with assertions enabled.
     *
     * @param str topic name; the path checked is {@code "/" + str}
     * @throws KeeperException      if raised by the ZooKeeper lookup
     * @throws InterruptedException if raised by the ZooKeeper lookup
     */
    @Then("^A kafka topic named '(.+?)' does not exist")
    public void kafkaTopicNotExist(String str) throws KeeperException, InterruptedException {
        boolean pathExists = this.commonspec.getKafkaUtils().getZkUtils().pathExists("/" + str);
        Assertions.assertThat(pathExists).as("There is a topic with that name").isFalse();
    }

    /**
     * Verifies that a topic has exactly the expected number of partitions.
     *
     * @param str name of the topic to inspect
     * @param i   expected partition count
     * @throws Exception if raised by the underlying Kafka utility call
     */
    @Then("^The number of partitions in topic '(.+?)' should be '(.+?)''?$")
    public void checkNumberOfPartitions(String str, int i) throws Exception {
        final int actualPartitions = this.commonspec.getKafkaUtils().getPartitions(str);
        Assertions.assertThat(actualPartitions).isEqualTo(i);
    }

    /**
     * Verifies that a topic, read from the beginning, contains the given text as a key or value.
     *
     * @param str  name of the topic to read
     * @param str2 text to look for
     * @param str3 text of the optional " as key" group; when non-null the keys are searched,
     *             otherwise the values are searched
     * @throws InterruptedException if raised while reading the topic
     */
    @Then("^The kafka topic '(.*?)' has a message containing '(.*?)'( as key)?$")
    public void checkMessages(String str, String str2, String str3) throws InterruptedException {
        Map<Object, Object> records = this.commonspec.getKafkaUtils().readTopicFromBeginning(str);

        boolean found = (str3 != null) ? records.containsKey(str2) : records.containsValue(str2);

        Assertions.assertThat(found)
                .as("Topic does not exist or the content does not match")
                .isTrue();
    }

    /**
     * Verifies that the given topic name appears in the list of topics.
     *
     * @param str name of the topic expected to exist
     * @throws KeeperException      if raised while listing the topics
     * @throws InterruptedException if raised while listing the topics
     */
    @Then("^A kafka topic named '(.+?)' exists")
    public void kafkaTopicExist(String str) throws KeeperException, InterruptedException {
        final boolean topicListed = this.commonspec.getKafkaUtils().listTopics().contains(str);
        Assertions.assertThat(topicListed).as("There is no topic with that name").isTrue();
    }

    /**
     * Records the schema registry location, both on the Kafka utilities and as the
     * {@code schema.registry.url} producer property.
     *
     * @param str registry address without scheme; {@code "http://"} is prepended
     * @throws Throwable if raised by the underlying Kafka utility calls
     */
    @Given("^My schema registry is running at '(.+)'$")
    public void mySchemaRegistryIsRunningAtLocalhost(String str) throws Throwable {
        final String registryUrl = "http://" + str;
        this.commonspec.getKafkaUtils().setSchemaRegistryUrl(registryUrl);
        this.commonspec.getKafkaUtils().modifyProducerProperties("schema.registry.url", registryUrl);
    }

    /**
     * Registers a schema under a subject and verifies that the registry answers with HTTP 200.
     *
     * @param str  subject under which the schema is registered
     * @param str2 reference handed to {@code retrieveData(..., "json")} to obtain the schema text
     * @throws Throwable if raised while loading the schema or calling the registry
     */
    @Then("^I register a new version of a schema under the subject '(.+)' with '(.+)'$")
    public void iRegisterANewVersionOfASchemaUnderTheSubject(String str, String str2) throws Throwable {
        Response response = this.commonspec.getKafkaUtils()
                .registerNewSchema(str, this.commonspec.retrieveData(str2, "json"));

        int status = response.code();
        // The response body is always read so it can be shown if the status check fails.
        String failureDescription =
                "Schema registry returned " + response.code() + " response, body: " + response.body().string();

        Assertions.assertThat(status).as(failureDescription).isEqualTo(200);
    }

    /**
     * Applies consumer properties from a data table, reads the topic from the beginning and
     * verifies that it contains the given text as a key or as a value. The text is first converted
     * according to the configured key or value deserializer.
     *
     * @param str       name of the topic to read
     * @param str2      text to look for
     * @param str3      text of the optional " as key" group; when non-null the keys are searched,
     *                  otherwise the values are searched
     * @param dataTable rows whose first cell is a consumer property name and second cell its value
     * @throws Throwable if raised by the underlying Kafka utility calls
     */
    @Then("^The kafka topic '(.+?)' has a message containing '(.+?)'( as key)? with:$")
    public void theKafkaTopicStringTopicHasAMessageContainingHelloWith(String str, String str2, String str3, DataTable dataTable) throws Throwable {
        for (DataTableRow property : dataTable.getGherkinRows()) {
            String propertyName = property.getCells().get(0);
            String propertyValue = property.getCells().get(1);
            this.commonspec.getKafkaUtils().modifyConsumerProperties(propertyName, propertyValue);
        }

        Map<Object, Object> records = this.commonspec.getKafkaUtils().readTopicFromBeginning(str);

        boolean found;
        if (str3 == null) {
            found = records.containsValue(getFinalMessage("value.deserializer", str2));
        } else {
            found = records.containsKey(getFinalMessage("key.deserializer", str2));
        }

        Assertions.assertThat(found)
                .as("Topic does not exist or the content does not match")
                .isTrue();
    }

    /**
     * Converts the text of a step argument into the object type produced by the configured
     * consumer deserializer, so it can be compared with the keys or values read from a topic.
     *
     * @param str  name of the consumer property holding the deserializer class name
     *             (callers pass {@code "key.deserializer"} or {@code "value.deserializer"})
     * @param str2 text to convert
     * @return a {@link Long} when the property names Kafka's {@code LongDeserializer}; otherwise
     *         {@code str2} unchanged, including when the property is not set
     * @throws NumberFormatException if a {@code Long} is required and {@code str2} is not a valid long
     */
    private Object getFinalMessage(String str, String str2) {
        String deserializer = this.commonspec.getKafkaUtils().getPropsConsumer().getProperty(str);

        // Constant-first equals keeps this null-safe when the property is not configured.
        if ("org.apache.kafka.common.serialization.LongDeserializer".equals(deserializer)) {
            return Long.valueOf(str2);
        }

        // StringDeserializer and any other (or missing) deserializer: compare as plain text.
        return str2;
    }

    /**
     * Creates a named Avro record using a schema obtained through {@code retrieveData}.
     *
     * @param str       name under which the record is created
     * @param str2      reference handed to {@code retrieveData(..., "json")} to obtain the schema
     * @param str3      full text of the optional {@code " based on '...'"} group (not used directly)
     * @param str4      seed reference captured inside the optional group, {@code null} when absent
     * @param dataTable rows describing the record content or the modifications to the seed
     * @throws Throwable if raised while loading the schema or building the record
     */
    @Then("^I create the avro record '(.+?)' from the schema in '(.+?)'( based on '(.+?)')? with:$")
    public void iCreateTheAvroRecordRecord(String str, String str2, String str3, String str4, DataTable dataTable) throws Throwable {
        final String schema = this.commonspec.retrieveData(str2, "json");
        createRecord(str, schema, str4, dataTable);
    }

    /**
     * Creates a named Avro record using a schema fetched from the schema registry.
     *
     * @param str       name under which the record is created
     * @param str2      schema version to fetch
     * @param str3      registry subject to fetch the schema from
     * @param str4      full text of the optional {@code " based on '...'"} group (not used directly)
     * @param str5      seed reference captured inside the optional group, {@code null} when absent
     * @param dataTable rows describing the record content or the modifications to the seed
     * @throws Throwable if raised while fetching the schema or building the record
     */
    @Then("^I create the avro record '(.+?)' using version '(.+?)' of subject '(.+?)' from registry( based on '(.+?)')? with:$")
    public void iCreateTheAvroRecordRecordUsingVersionOfSubjectRecordFromRegistryWith(String str, String str2, String str3, String str4, String str5, DataTable dataTable) throws Throwable {
        final String schema = this.commonspec.getKafkaUtils().getSchemaFromRegistry(str3, str2);
        createRecord(str, schema, str5, dataTable);
    }

    /**
     * Builds a generic Avro record, either from a seed document modified by the data table or
     * directly from the data table.
     *
     * @param str       name under which the record is created
     * @param str2      schema text handed to {@code createGenericRecord}
     * @param str3      seed reference; when non-null the seed is loaded as JSON (ISO-8859-1) and
     *                  modified with the data table, otherwise the table itself supplies the content
     * @param dataTable modifications for the seed, or rows of two cells put into a map (first cell
     *                  as key, second as value) when no seed is given
     * @throws Exception if raised while loading the seed or building the record
     */
    private void createRecord(String str, String str2, String str3, DataTable dataTable) throws Exception {
        if (str3 == null) {
            this.commonspec.getLogger().debug("Building Avro record from datatable");
            // Raw type kept: the parameter type expected by createGenericRecord is not visible here.
            HashMap fields = new HashMap();
            for (DataTableRow field : dataTable.getGherkinRows()) {
                fields.put(field.getCells().get(0), field.getCells().get(1));
            }
            this.commonspec.getKafkaUtils().createGenericRecord(str, fields, str2);
        } else {
            this.commonspec.getLogger().debug("Building Avro record from seed file");
            String seed = this.commonspec.retrieveData(str3, "json", "ISO-8859-1");
            this.commonspec.getLogger().debug("Modifying data {} as {}", seed, "json");
            String modifiedSeed = this.commonspec.modifyData(seed, "json", dataTable).toString();
            this.commonspec.getKafkaUtils().createGenericRecord(str, modifiedSeed, str2);
        }
    }

    /**
     * Applies producer properties from a data table, forces the Avro value serializer, checks that
     * a record with the given name has been created and sends it to a topic.
     *
     * @param str       name of the previously created Avro record
     * @param str2      name of the destination topic
     * @param str3      full text of the optional {@code " with key '...'"} group (not used directly)
     * @param str4      key captured inside the optional group, {@code null} when absent
     * @param dataTable rows whose first cell is a producer property name and second cell its value
     * @throws Throwable if raised by the underlying Kafka utility calls
     */
    @When("^I send the avro record '(.+?)' to the kafka topic '(.+?)'( with key '(.+?)')? with:$")
    public void iSendTheAvroRecordRecordToTheKafkaTopic(String str, String str2, String str3, String str4, DataTable dataTable) throws Throwable {
        for (DataTableRow property : dataTable.getGherkinRows()) {
            String propertyName = property.getCells().get(0);
            String propertyValue = property.getCells().get(1);
            this.commonspec.getKafkaUtils().modifyProducerProperties(propertyName, propertyValue);
        }
        // Set after the table so the Avro serializer always wins over a table-provided value.
        this.commonspec.getKafkaUtils().modifyProducerProperties(
                "value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");

        Assertions.assertThat(this.commonspec.getKafkaUtils().getAvroRecords().get(str))
                .as("No generic record found with name " + str)
                .isNotNull();

        this.commonspec.getKafkaUtils().sendAndConfirmMessage(str, str4, str2, 1L);
    }

    /**
     * Applies consumer properties from a data table, configures the Avro value deserializer and
     * the schema registry, and verifies that the topic contains a value equal to a previously
     * created Avro record.
     *
     * @param str       name of the topic to read; it must exist
     * @param str2      name of the previously created Avro record to look for
     * @param dataTable rows whose first cell is a consumer property name and second cell its value
     * @throws Throwable if raised by the underlying Kafka utility calls
     */
    @Then("^The kafka topic '(.+?)' has an avro message '(.+?)' with:$")
    public void theKafkaTopicAvroTopicHasAnAvroMessageRecordWith(String str, String str2, DataTable dataTable) throws Throwable {
        for (DataTableRow dataTableRow : dataTable.getGherkinRows()) {
            this.commonspec.getKafkaUtils().modifyConsumerProperties(dataTableRow.getCells().get(0), dataTableRow.getCells().get(1));
        }
        // Set after the table so the Avro deserializer always wins over a table-provided value.
        this.commonspec.getKafkaUtils().modifyConsumerProperties("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        Assertions.assertThat(getCommonSpec().getKafkaUtils().getSchemaRegistryUrl()).as("Could not build avro consumer since no schema registry was defined").isNotNull();
        this.commonspec.getKafkaUtils().modifyConsumerProperties("schema.registry.url", getCommonSpec().getKafkaUtils().getSchemaRegistryUrl());
        kafkaTopicExist(str);
        Map<Object, Object> readTopicFromBeginning = this.commonspec.getKafkaUtils().readTopicFromBeginning(str);
        // Parameterized message; the previous concatenation printed "Found 3records" (missing space).
        this.commonspec.getLogger().debug("Found {} records in topic {}", readTopicFromBeginning.size(), str);
        Assertions.assertThat(readTopicFromBeginning.containsValue(this.commonspec.getKafkaUtils().getAvroRecords().get(str2))).as("Topic does not contain message that matches the specified record").isTrue();
    }

    /**
     * Sets Kafka consumer properties from a data table, logging each one at debug level.
     *
     * @param dataTable rows whose first cell is a consumer property name and second cell its value
     */
    @Then("^I configure the kafka consumers with:$")
    public void iConfigureConsumerProperties(DataTable dataTable) {
        for (DataTableRow property : dataTable.getGherkinRows()) {
            final String propertyName = property.getCells().get(0);
            final String propertyValue = property.getCells().get(1);

            getCommonSpec().getLogger()
                    .debug("Setting kafka consumer property: " + propertyName + " -> " + propertyValue);
            this.commonspec.getKafkaUtils().modifyConsumerProperties(propertyName, propertyValue);
        }
    }

    /**
     * Sets Kafka producer properties from a data table, logging each one at debug level.
     *
     * @param dataTable rows whose first cell is a producer property name and second cell its value
     */
    @Then("^I configure the kafka producer with:$")
    public void iConfigureProducerProperties(DataTable dataTable) {
        for (DataTableRow property : dataTable.getGherkinRows()) {
            final String propertyName = property.getCells().get(0);
            final String propertyValue = property.getCells().get(1);

            getCommonSpec().getLogger()
                    .debug("Setting kafka producer property: " + propertyName + " -> " + propertyValue);
            this.commonspec.getKafkaUtils().modifyProducerProperties(propertyName, propertyValue);
        }
    }

    /**
     * Closes the ZooKeeper utilities held by the Kafka utilities, if there are any.
     *
     * @throws Throwable if raised while closing the connection
     */
    @Then("^I close the connection to kafka$")
    public void iCloseTheConnectionToKafka() throws Throwable {
        getCommonSpec().getLogger().debug("Closing connection to kafka..");

        if (getCommonSpec().getKafkaUtils().getZkUtils() == null) {
            // Nothing was opened, so there is nothing to close.
            return;
        }
        getCommonSpec().getKafkaUtils().getZkUtils().close();
    }

    /**
     * Reads a topic with the Avro deserializer and verifies how many Avro records satisfy every
     * condition of the data table.
     *
     * <p>A record counts as a match only when it is a {@link GenericRecord} and none of the
     * conditions raises an {@link AssertionError}. Each record is counted at most once, so a
     * record failing several conditions no longer lowers the count several times.
     *
     * @param str       name of the topic to read; it must exist
     * @param str2      text of the optional " at least" group; when non-null the number of matches
     *                  must be greater than or equal to {@code i}, otherwise exactly {@code i}
     * @param i         expected number of matching records
     * @param dataTable rows of three cells: the first is the expression handed to
     *                  {@code getJSONPathString}, the second and third are handed to
     *                  {@code evaluateJSONElementOperation} (presumably the comparison and the
     *                  expected value; confirm against CommonG)
     * @throws Throwable if raised by the underlying Kafka utility or JSON evaluation calls
     */
    @And("^The kafka topic '(.+?)' has( at least)? '(.+?)' an avro message with:$")
    public void theKafkaTopicHasAnAvroMessageWith(String str, String str2, int i, DataTable dataTable) throws Throwable {
        this.commonspec.getKafkaUtils().modifyConsumerProperties("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        Assertions.assertThat(getCommonSpec().getKafkaUtils().getSchemaRegistryUrl()).as("Could not build avro consumer since no schema registry was defined").isNotNull();
        this.commonspec.getKafkaUtils().modifyConsumerProperties("schema.registry.url", getCommonSpec().getKafkaUtils().getSchemaRegistryUrl());
        kafkaTopicExist(str);

        Map<Object, Object> records = this.commonspec.getKafkaUtils().readTopicFromBeginning(str);
        int matches = 0;
        for (Object value : records.values()) {
            if (!(value instanceof GenericRecord)) {
                // Not an Avro record, so it cannot satisfy Avro conditions.
                continue;
            }
            String json = ((GenericRecord) value).toString();
            boolean allConditionsHold = true;
            for (DataTableRow condition : dataTable.getGherkinRows()) {
                String expression = condition.getCells().get(0);
                try {
                    this.commonspec.evaluateJSONElementOperation(this.commonspec.getJSONPathString(json, expression, null), condition.getCells().get(1), condition.getCells().get(2));
                } catch (AssertionError mismatch) {
                    // A failed condition means this record does not match; stop checking it.
                    allConditionsHold = false;
                    break;
                }
            }
            if (allConditionsHold) {
                matches++;
            }
        }

        getCommonSpec().getLogger().debug("Found {} records in topic {} that match the specified conditions", matches, str);
        if (str2 != null) {
            Assertions.assertThat(matches).as("No matches found").isGreaterThanOrEqualTo(i);
        } else {
            Assertions.assertThat(matches).as("No matches found").isEqualTo(i);
        }
    }

    // Initializes the assertion-status flag: true when assertions are disabled for this class.
    static {
        $assertionsDisabled = !KafkaGSpec.class.desiredAssertionStatus();
    }
}
