package io.debezium.connector.oracle.logminer;

import io.debezium.connector.oracle.OracleConnection;
import io.debezium.connector.oracle.OracleConnector;
import io.debezium.connector.oracle.OracleConnectorConfig;
import io.debezium.connector.oracle.junit.SkipTestDependingOnAdapterNameRule;
import io.debezium.connector.oracle.junit.SkipWhenAdapterNameIsNot;
import io.debezium.connector.oracle.util.TestHelper;
import io.debezium.data.VerifyRecord;
import io.debezium.doc.FixFor;
import io.debezium.embedded.async.AbstractAsyncEngineConnectorTest;
import io.debezium.util.Testing;
import java.sql.SQLException;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.assertj.core.api.Assertions;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestRule;

/**
 * Integration tests verifying that change events emitted by the Oracle connector carry the
 * redo SQL in the {@code source} block's {@code redo_sql} field only when
 * {@link OracleConnectorConfig#LOG_MINING_INCLUDE_REDO_SQL} is enabled.
 *
 * <p>The tests are skipped unless the LogMiner adapter is in use.
 */
@SkipWhenAdapterNameIsNot(value = SkipWhenAdapterNameIsNot.AdapterName.LOGMINER, reason = "Requires LogMiner")
public class LogMinerSourceInfoRedoSqlIT extends AbstractAsyncEngineConnectorTest {

    @Rule
    public final TestRule skipAdapterRule = new SkipTestDependingOnAdapterNameRule();

    /** Connection shared by all tests in this class; opened once and closed in {@link #closeConnection()}. */
    private static OracleConnection connection;

    /**
     * Opens the shared test connection before any test runs.
     *
     * @throws SQLException if the connection cannot be established
     */
    @BeforeClass
    public static void beforeClass() throws SQLException {
        connection = TestHelper.testConnection();
    }

    /**
     * Closes the shared test connection, if one was opened.
     *
     * @throws SQLException if closing the connection fails
     */
    @AfterClass
    public static void closeConnection() throws SQLException {
        if (connection != null) {
            connection.close();
        }
    }

    /**
     * Prepares the connector test framework and removes any schema history file
     * left behind by a previous test.
     *
     * @throws SQLException declared for compatibility with the original signature
     */
    @Before
    public void before() throws SQLException {
        setConsumeTimeout(TestHelper.defaultMessageConsumerPollTimeout(), TimeUnit.SECONDS);
        initializeConnectorTestFramework();
        Testing.Files.delete(TestHelper.SCHEMA_HISTORY_PATH);
    }

    @Test
    @FixFor("DBZ-6960")
    public void shouldStreamRedoSqlInSourceInfoBlockWhenEnabled() throws Exception {
        testRedoSqlInSourceInfoBlock(true);
    }

    @Test
    @FixFor("DBZ-6960")
    public void shouldNotIncludeRedoSqlInSourceInfoBlockWhenNotEnabled() throws Exception {
        testRedoSqlInSourceInfoBlock(false);
    }

    /**
     * Streams a single insert into a freshly created table and checks the {@code redo_sql}
     * field of the resulting event's {@code source} block.
     *
     * <p>The field must always be present in the source schema; its value is the redo
     * statement when {@code includeRedoSql} is {@code true} and {@code null} otherwise.
     *
     * @param includeRedoSql value applied to {@link OracleConnectorConfig#LOG_MINING_INCLUDE_REDO_SQL}
     * @throws Exception if database access, connector start-up or record consumption fails
     */
    private void testRedoSqlInSourceInfoBlock(boolean includeRedoSql) throws Exception {
        // Start from a clean slate in case an earlier run left the table behind.
        TestHelper.dropTable(connection, "dbz6960");
        try {
            connection.execute("CREATE TABLE dbz6960 (id number(9,0) primary key, data varchar2(50))");
            TestHelper.streamTable(connection, "dbz6960");

            start(OracleConnector.class, TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.LOG_MINING_INCLUDE_REDO_SQL, Boolean.toString(includeRedoSql))
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ6960")
                    .build());
            assertConnectorIsRunning();
            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);

            connection.execute("INSERT INTO dbz6960 (id,data) values (1, 'test')");

            List<SourceRecord> records = consumeRecordsByTopic(1).recordsForTopic("server1.DEBEZIUM.DBZ6960");
            Assertions.assertThat(records).hasSize(1);

            SourceRecord record = records.get(0);
            VerifyRecord.isValidInsert(record, "ID", 1);

            Struct source = ((Struct) record.value()).getStruct("source");
            // The field is part of the schema regardless of the setting; only its value differs.
            Assertions.assertThat(source.schema().field("redo_sql")).isNotNull();

            String redoSql = source.getString("redo_sql");
            if (includeRedoSql) {
                Assertions.assertThat(redoSql)
                        .isEqualTo("insert into \"DEBEZIUM\".\"DBZ6960\"(\"ID\",\"DATA\") values ('1','test');");
            }
            else {
                Assertions.assertThat(redoSql).isNull();
            }
        }
        finally {
            // Always drop the table, whether the assertions passed or failed.
            TestHelper.dropTable(connection, "dbz6960");
        }
    }
}
