package io.debezium.connector.oracle;

import ch.qos.logback.classic.Level;
import io.debezium.config.Configuration;
import io.debezium.connector.SnapshotType;
import io.debezium.connector.oracle.OracleConnectorConfig;
import io.debezium.connector.oracle.antlr.listener.AlterTableParserListener;
import io.debezium.connector.oracle.antlr.listener.CreateTableParserListener;
import io.debezium.connector.oracle.logminer.processor.AbstractLogMinerEventProcessor;
import io.debezium.connector.oracle.olr.OpenLogReplicatorStreamingChangeEventSource;
import io.debezium.connector.oracle.util.TestHelper;
import io.debezium.data.VerifyRecord;
import io.debezium.doc.FixFor;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.embedded.async.AbstractAsyncEngineConnectorTest;
import io.debezium.junit.logging.LogInterceptor;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.relational.TableId;
import io.debezium.relational.history.SchemaHistory;
import io.debezium.util.Testing;
import java.io.PrintStream;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;

/* loaded from: input_file:io/debezium/connector/oracle/OracleSchemaMigrationIT.class */
public class OracleSchemaMigrationIT extends AbstractAsyncEngineConnectorTest {
    private OracleConnection connection;

    /* renamed from: io.debezium.connector.oracle.OracleSchemaMigrationIT$1, reason: invalid class name */
    /* loaded from: input_file:io/debezium/connector/oracle/OracleSchemaMigrationIT$1.class */
    static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$io$debezium$connector$oracle$OracleConnectorConfig$ConnectorAdapter = new int[OracleConnectorConfig.ConnectorAdapter.values().length];

        static {
            try {
                $SwitchMap$io$debezium$connector$oracle$OracleConnectorConfig$ConnectorAdapter[OracleConnectorConfig.ConnectorAdapter.LOG_MINER.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$io$debezium$connector$oracle$OracleConnectorConfig$ConnectorAdapter[OracleConnectorConfig.ConnectorAdapter.XSTREAM.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$io$debezium$connector$oracle$OracleConnectorConfig$ConnectorAdapter[OracleConnectorConfig.ConnectorAdapter.OLR.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
        }
    }

    @Before
    public void beforeEach() throws Exception {
        this.connection = TestHelper.testConnection();
        setConsumeTimeout(TestHelper.defaultMessageConsumerPollTimeout(), TimeUnit.SECONDS);
        initializeConnectorTestFramework();
        Testing.Files.delete(TestHelper.SCHEMA_HISTORY_PATH);
        TestHelper.dropAllTables();
    }

    @After
    public void afterEach() throws Exception {
        if (this.connection != null) {
            TestHelper.dropAllTables();
            this.connection.close();
        }
    }

    @Test
    @FixFor({"DBZ-2916"})
    public void shouldStreamNewlyCreatedNotFilteredTable() throws Exception {
        createTable("debezium.tablea", "CREATE TABLE debezium.tablea (ID numeric(9,0) primary key, data varchar2(50))");
        start(OracleConnector.class, TestHelper.defaultConfig().with(OracleConnectorConfig.INCLUDE_SCHEMA_CHANGES, true).build());
        assertConnectorIsRunning();
        waitForSnapshotToBeCompleted(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(1);
        Assertions.assertThat(consumeRecordsByTopic.recordsForTopic(TestHelper.SERVER_NAME)).hasSize(1);
        SourceRecord sourceRecord = (SourceRecord) consumeRecordsByTopic.recordsForTopic(TestHelper.SERVER_NAME).get(0);
        assertSnapshotSchemaChange(sourceRecord);
        assertSourceTableInfo(sourceRecord, "DEBEZIUM", "TABLEA");
        List array = ((Struct) sourceRecord.value()).getArray("tableChanges");
        Assertions.assertThat(array).hasSize(1);
        assertTableChange((Struct) array.get(0), "CREATE", "DEBEZIUM", "TABLEA");
        waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
        createTable("debezium.tableb", "CREATE TABLE debezium.tableb (ID numeric(9,0) primary key, data varchar2(50))");
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic2 = consumeRecordsByTopic(2);
        Assertions.assertThat(consumeRecordsByTopic2.recordsForTopic(TestHelper.SERVER_NAME)).hasSize(2);
        SourceRecord sourceRecord2 = (SourceRecord) consumeRecordsByTopic2.recordsForTopic(TestHelper.SERVER_NAME).get(0);
        assertStreamingSchemaChange(sourceRecord2);
        assertSourceTableInfo(sourceRecord2, "DEBEZIUM", "TABLEB");
        List array2 = ((Struct) sourceRecord2.value()).getArray("tableChanges");
        Assertions.assertThat(array2).hasSize(1);
        assertTableChange((Struct) array2.get(0), "CREATE", "DEBEZIUM", "TABLEB");
        SourceRecord sourceRecord3 = (SourceRecord) consumeRecordsByTopic2.recordsForTopic(TestHelper.SERVER_NAME).get(1);
        assertStreamingSchemaChange(sourceRecord3);
        List array3 = ((Struct) sourceRecord3.value()).getArray("tableChanges");
        Assertions.assertThat(array3).hasSize(1);
        assertTableChange((Struct) array3.get(0), "ALTER", "DEBEZIUM", "TABLEB");
        this.connection.execute(new String[]{"INSERT INTO debezium.tableb (ID,DATA) values (1, 'Test')"});
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic3 = consumeRecordsByTopic(1);
        Assertions.assertThat(consumeRecordsByTopic3.recordsForTopic(topicName("DEBEZIUM", "TABLEB"))).hasSize(1);
        SourceRecord sourceRecord4 = (SourceRecord) consumeRecordsByTopic3.recordsForTopic(topicName("DEBEZIUM", "TABLEB")).get(0);
        VerifyRecord.isValidInsert(sourceRecord4, "ID", 1);
        assertSourceTableInfo(sourceRecord4, "DEBEZIUM", "TABLEB");
    }

    @Test
    @FixFor({"DBZ-2916"})
    public void shouldNotStreamNewlyCreatedTableDueToFilters() throws Exception {
        createTable("debezium.tablea", "CREATE TABLE debezium.tablea (ID numeric(9,0) primary key, data varchar2(50))");
        start(OracleConnector.class, TestHelper.defaultConfig().with(OracleConnectorConfig.INCLUDE_SCHEMA_CHANGES, true).with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.TABLEA").build());
        assertConnectorIsRunning();
        waitForSnapshotToBeCompleted(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(1);
        Assertions.assertThat(consumeRecordsByTopic.recordsForTopic(TestHelper.SERVER_NAME)).hasSize(1);
        SourceRecord sourceRecord = (SourceRecord) consumeRecordsByTopic.recordsForTopic(TestHelper.SERVER_NAME).get(0);
        assertSnapshotSchemaChange(sourceRecord);
        assertSourceTableInfo(sourceRecord, "DEBEZIUM", "TABLEA");
        List array = ((Struct) sourceRecord.value()).getArray("tableChanges");
        Assertions.assertThat(array).hasSize(1);
        assertTableChange((Struct) array.get(0), "CREATE", "DEBEZIUM", "TABLEA");
        waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
        createTable("debezium.tableb", "CREATE TABLE debezium.tableb (ID numeric(9,0) primary key, data varchar2(50))");
        assertNoRecordsToConsume();
        this.connection.execute(new String[]{"INSERT INTO debezium.tableb (ID,DATA) values (1, 'B')"});
        assertNoRecordsToConsume();
        this.connection.execute(new String[]{"INSERT INTO debezium.tablea (ID,DATA) values (1, 'A')"});
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic2 = consumeRecordsByTopic(1);
        Assertions.assertThat(consumeRecordsByTopic2.recordsForTopic(topicName("DEBEZIUM", "TABLEA"))).hasSize(1);
        SourceRecord sourceRecord2 = (SourceRecord) consumeRecordsByTopic2.recordsForTopic(topicName("DEBEZIUM", "TABLEA")).get(0);
        VerifyRecord.isValidInsert(sourceRecord2, "ID", 1);
        Assertions.assertThat(((Struct) sourceRecord2.value()).getStruct("after").get("DATA")).isEqualTo("A");
        assertSourceTableInfo(sourceRecord2, "DEBEZIUM", "TABLEA");
    }

    @Test
    @FixFor({"DBZ-2916"})
    public void shouldStreamAlterTableAddColumnSchemaChange() throws Exception {
        createTable("debezium.tablea", "CREATE TABLE debezium.tablea (ID numeric(9,0) primary key, data varchar2(50))");
        start(OracleConnector.class, TestHelper.defaultConfig().with(OracleConnectorConfig.INCLUDE_SCHEMA_CHANGES, true).build());
        assertConnectorIsRunning();
        waitForSnapshotToBeCompleted(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(1);
        Assertions.assertThat(consumeRecordsByTopic.recordsForTopic(TestHelper.SERVER_NAME)).hasSize(1);
        SourceRecord sourceRecord = (SourceRecord) consumeRecordsByTopic.recordsForTopic(TestHelper.SERVER_NAME).get(0);
        assertSnapshotSchemaChange(sourceRecord);
        assertSourceTableInfo(sourceRecord, "DEBEZIUM", "TABLEA");
        List array = ((Struct) sourceRecord.value()).getArray("tableChanges");
        Assertions.assertThat(array).hasSize(1);
        assertTableChange((Struct) array.get(0), "CREATE", "DEBEZIUM", "TABLEA");
        waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
        this.connection.execute(new String[]{"INSERT INTO debezium.tablea (ID,DATA) values (1, 'Test')"});
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic2 = consumeRecordsByTopic(1);
        Assertions.assertThat(consumeRecordsByTopic2.recordsForTopic(topicName("DEBEZIUM", "TABLEA"))).hasSize(1);
        SourceRecord sourceRecord2 = (SourceRecord) consumeRecordsByTopic2.recordsForTopic(topicName("DEBEZIUM", "TABLEA")).get(0);
        VerifyRecord.isValidInsert(sourceRecord2, "ID", 1);
        assertSourceTableInfo(sourceRecord2, "DEBEZIUM", "TABLEA");
        Struct struct = ((Struct) sourceRecord2.value()).getStruct("after");
        Assertions.assertThat(struct.schema().fields()).hasSize(2);
        Assertions.assertThat(struct.get("ID")).isEqualTo(1);
        Assertions.assertThat(struct.get("DATA")).isEqualTo("Test");
        this.connection.executeWithoutCommitting(new String[]{"ALTER TABLE debezium.tablea ADD data2 numeric"});
        this.connection.execute(new String[]{"INSERT INTO debezium.tablea (ID,DATA,DATA2) values (2, 'Test2', 100)"});
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic3 = consumeRecordsByTopic(2);
        Assertions.assertThat(consumeRecordsByTopic3.recordsForTopic(TestHelper.SERVER_NAME)).hasSize(1);
        Assertions.assertThat(consumeRecordsByTopic3.recordsForTopic(topicName("DEBEZIUM", "TABLEA"))).hasSize(1);
        SourceRecord sourceRecord3 = (SourceRecord) consumeRecordsByTopic3.recordsForTopic(TestHelper.SERVER_NAME).get(0);
        assertStreamingSchemaChange(sourceRecord3);
        assertSourceTableInfo(sourceRecord3, "DEBEZIUM", "TABLEA");
        List array2 = ((Struct) sourceRecord3.value()).getArray("tableChanges");
        Assertions.assertThat(array2).hasSize(1);
        assertTableChange((Struct) array2.get(0), "ALTER", "DEBEZIUM", "TABLEA");
        SourceRecord sourceRecord4 = (SourceRecord) consumeRecordsByTopic3.recordsForTopic(topicName("DEBEZIUM", "TABLEA")).get(0);
        VerifyRecord.isValidInsert(sourceRecord4, "ID", 2);
        assertSourceTableInfo(sourceRecord4, "DEBEZIUM", "TABLEA");
        Struct struct2 = ((Struct) sourceRecord4.value()).getStruct("after");
        Assertions.assertThat(struct2.schema().fields()).hasSize(3);
        Assertions.assertThat(struct2.get("ID")).isEqualTo(2);
        Assertions.assertThat(struct2.get("DATA")).isEqualTo("Test2");
        Assertions.assertThat(struct2.get("DATA2")).isEqualTo(BigDecimal.valueOf(100L));
    }

    @Test
    @FixFor({"DBZ-2916"})
    public void shouldStreamAlterTableAddMultipleColumnsSchemaChange() throws Exception {
        createTable("debezium.tablea", "CREATE TABLE debezium.tablea (ID numeric(9,0) primary key, data varchar2(50))");
        start(OracleConnector.class, TestHelper.defaultConfig().with(OracleConnectorConfig.INCLUDE_SCHEMA_CHANGES, true).build());
        assertConnectorIsRunning();
        waitForSnapshotToBeCompleted(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(1);
        Assertions.assertThat(consumeRecordsByTopic.recordsForTopic(TestHelper.SERVER_NAME)).hasSize(1);
        SourceRecord sourceRecord = (SourceRecord) consumeRecordsByTopic.recordsForTopic(TestHelper.SERVER_NAME).get(0);
        assertSnapshotSchemaChange(sourceRecord);
        assertSourceTableInfo(sourceRecord, "DEBEZIUM", "TABLEA");
        List array = ((Struct) sourceRecord.value()).getArray("tableChanges");
        Assertions.assertThat(array).hasSize(1);
        assertTableChange((Struct) array.get(0), "CREATE", "DEBEZIUM", "TABLEA");
        waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
        this.connection.execute(new String[]{"INSERT INTO debezium.tablea (ID,DATA) values (1, 'Test')"});
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic2 = consumeRecordsByTopic(1);
        Assertions.assertThat(consumeRecordsByTopic2.recordsForTopic(topicName("DEBEZIUM", "TABLEA"))).hasSize(1);
        SourceRecord sourceRecord2 = (SourceRecord) consumeRecordsByTopic2.recordsForTopic(topicName("DEBEZIUM", "TABLEA")).get(0);
        VerifyRecord.isValidInsert(sourceRecord2, "ID", 1);
        assertSourceTableInfo(sourceRecord2, "DEBEZIUM", "TABLEA");
        Struct struct = ((Struct) sourceRecord2.value()).getStruct("after");
        Assertions.assertThat(struct.schema().fields()).hasSize(2);
        Assertions.assertThat(struct.get("ID")).isEqualTo(1);
        Assertions.assertThat(struct.get("DATA")).isEqualTo("Test");
        this.connection.executeWithoutCommitting(new String[]{"ALTER TABLE debezium.tablea ADD (data2 numeric, data3 varchar2(25))"});
        this.connection.execute(new String[]{"INSERT INTO debezium.tablea (ID,DATA,DATA2,DATA3) values (2, 'Test2', 100, 'a')"});
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic3 = consumeRecordsByTopic(2);
        Assertions.assertThat(consumeRecordsByTopic3.recordsForTopic(TestHelper.SERVER_NAME)).hasSize(1);
        Assertions.assertThat(consumeRecordsByTopic3.recordsForTopic(topicName("DEBEZIUM", "TABLEA"))).hasSize(1);
        SourceRecord sourceRecord3 = (SourceRecord) consumeRecordsByTopic3.recordsForTopic(TestHelper.SERVER_NAME).get(0);
        assertStreamingSchemaChange(sourceRecord3);
        assertSourceTableInfo(sourceRecord3, "DEBEZIUM", "TABLEA");
        List array2 = ((Struct) sourceRecord3.value()).getArray("tableChanges");
        Assertions.assertThat(array2).hasSize(1);
        assertTableChange((Struct) array2.get(0), "ALTER", "DEBEZIUM", "TABLEA");
        SourceRecord sourceRecord4 = (SourceRecord) consumeRecordsByTopic3.recordsForTopic(topicName("DEBEZIUM", "TABLEA")).get(0);
        VerifyRecord.isValidInsert(sourceRecord4, "ID", 2);
        assertSourceTableInfo(sourceRecord4, "DEBEZIUM", "TABLEA");
        Struct struct2 = ((Struct) sourceRecord4.value()).getStruct("after");
        Assertions.assertThat(struct2.schema().fields()).hasSize(4);
        Assertions.assertThat(struct2.get("ID")).isEqualTo(2);
        Assertions.assertThat(struct2.get("DATA")).isEqualTo("Test2");
        Assertions.assertThat(struct2.get("DATA2")).isEqualTo(BigDecimal.valueOf(100L));
        Assertions.assertThat(struct2.get("DATA3")).isEqualTo("a");
    }

    @Test
    @FixFor({"DBZ-2916"})
    public void shouldStreamAlterTableRenameColumnSchemaChange() throws Exception {
        createTable("debezium.tablea", "CREATE TABLE debezium.tablea (ID numeric(9,0) primary key, data varchar2(50))");
        start(OracleConnector.class, TestHelper.defaultConfig().with(OracleConnectorConfig.INCLUDE_SCHEMA_CHANGES, true).build());
        assertConnectorIsRunning();
        waitForSnapshotToBeCompleted(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(1);
        Assertions.assertThat(consumeRecordsByTopic.recordsForTopic(TestHelper.SERVER_NAME)).hasSize(1);
        SourceRecord sourceRecord = (SourceRecord) consumeRecordsByTopic.recordsForTopic(TestHelper.SERVER_NAME).get(0);
        assertSnapshotSchemaChange(sourceRecord);
        assertSourceTableInfo(sourceRecord, "DEBEZIUM", "TABLEA");
        List array = ((Struct) sourceRecord.value()).getArray("tableChanges");
        Assertions.assertThat(array).hasSize(1);
        assertTableChange((Struct) array.get(0), "CREATE", "DEBEZIUM", "TABLEA");
        waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
        this.connection.execute(new String[]{"INSERT INTO debezium.tablea (ID,DATA) values (1, 'Test')"});
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic2 = consumeRecordsByTopic(1);
        Assertions.assertThat(consumeRecordsByTopic2.recordsForTopic(topicName("DEBEZIUM", "TABLEA"))).hasSize(1);
        SourceRecord sourceRecord2 = (SourceRecord) consumeRecordsByTopic2.recordsForTopic(topicName("DEBEZIUM", "TABLEA")).get(0);
        VerifyRecord.isValidInsert(sourceRecord2, "ID", 1);
        assertSourceTableInfo(sourceRecord2, "DEBEZIUM", "TABLEA");
        Struct struct = ((Struct) sourceRecord2.value()).getStruct("after");
        Assertions.assertThat(struct.schema().fields()).hasSize(2);
        Assertions.assertThat(struct.get("ID")).isEqualTo(1);
        Assertions.assertThat(struct.get("DATA")).isEqualTo("Test");
        this.connection.executeWithoutCommitting(new String[]{"ALTER TABLE debezium.tablea RENAME COLUMN data TO data1"});
        this.connection.execute(new String[]{"INSERT INTO debezium.tablea (ID,DATA1) values (2, 'Test2')"});
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic3 = consumeRecordsByTopic(2);
        Assertions.assertThat(consumeRecordsByTopic3.recordsForTopic(TestHelper.SERVER_NAME)).hasSize(1);
        Assertions.assertThat(consumeRecordsByTopic3.recordsForTopic(topicName("DEBEZIUM", "TABLEA"))).hasSize(1);
        SourceRecord sourceRecord3 = (SourceRecord) consumeRecordsByTopic3.recordsForTopic(TestHelper.SERVER_NAME).get(0);
        assertStreamingSchemaChange(sourceRecord3);
        assertSourceTableInfo(sourceRecord3, "DEBEZIUM", "TABLEA");
        List array2 = ((Struct) sourceRecord3.value()).getArray("tableChanges");
        Assertions.assertThat(array2).hasSize(1);
        assertTableChange((Struct) array2.get(0), "ALTER", "DEBEZIUM", "TABLEA");
        SourceRecord sourceRecord4 = (SourceRecord) consumeRecordsByTopic3.recordsForTopic(topicName("DEBEZIUM", "TABLEA")).get(0);
        VerifyRecord.isValidInsert(sourceRecord4, "ID", 2);
        assertSourceTableInfo(sourceRecord4, "DEBEZIUM", "TABLEA");
        Struct struct2 = ((Struct) sourceRecord4.value()).getStruct("after");
        Assertions.assertThat(struct2.schema().fields()).hasSize(2);
        Assertions.assertThat(struct2.schema().field("DATA")).isNull();
        Assertions.assertThat(struct2.get("ID")).isEqualTo(2);
        Assertions.assertThat(struct2.get("DATA1")).isEqualTo("Test2");
    }

    @Test
    @FixFor({"DBZ-2916"})
    public void shouldStreamAlterTableDropColumnSchemaChange() throws Exception {
        createTable("debezium.tablea", "CREATE TABLE debezium.tablea (ID numeric(9,0) primary key, data varchar2(50))");
        start(OracleConnector.class, TestHelper.defaultConfig().with(OracleConnectorConfig.INCLUDE_SCHEMA_CHANGES, true).build());
        assertConnectorIsRunning();
        waitForSnapshotToBeCompleted(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(1);
        Assertions.assertThat(consumeRecordsByTopic.recordsForTopic(TestHelper.SERVER_NAME)).hasSize(1);
        SourceRecord sourceRecord = (SourceRecord) consumeRecordsByTopic.recordsForTopic(TestHelper.SERVER_NAME).get(0);
        assertSnapshotSchemaChange(sourceRecord);
        assertSourceTableInfo(sourceRecord, "DEBEZIUM", "TABLEA");
        List array = ((Struct) sourceRecord.value()).getArray("tableChanges");
        Assertions.assertThat(array).hasSize(1);
        assertTableChange((Struct) array.get(0), "CREATE", "DEBEZIUM", "TABLEA");
        waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
        this.connection.execute(new String[]{"INSERT INTO debezium.tablea (ID,DATA) values (1, 'Test')"});
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic2 = consumeRecordsByTopic(1);
        Assertions.assertThat(consumeRecordsByTopic2.recordsForTopic(topicName("DEBEZIUM", "TABLEA"))).hasSize(1);
        SourceRecord sourceRecord2 = (SourceRecord) consumeRecordsByTopic2.recordsForTopic(topicName("DEBEZIUM", "TABLEA")).get(0);
        VerifyRecord.isValidInsert(sourceRecord2, "ID", 1);
        assertSourceTableInfo(sourceRecord2, "DEBEZIUM", "TABLEA");
        Struct struct = ((Struct) sourceRecord2.value()).getStruct("after");
        Assertions.assertThat(struct.schema().fields()).hasSize(2);
        Assertions.assertThat(struct.get("ID")).isEqualTo(1);
        Assertions.assertThat(struct.get("DATA")).isEqualTo("Test");
        this.connection.executeWithoutCommitting(new String[]{"ALTER TABLE debezium.tablea DROP COLUMN data"});
        this.connection.execute(new String[]{"INSERT INTO debezium.tablea (ID) values (2)"});
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic3 = consumeRecordsByTopic(2);
        Assertions.assertThat(consumeRecordsByTopic3.recordsForTopic(TestHelper.SERVER_NAME)).hasSize(1);
        Assertions.assertThat(consumeRecordsByTopic3.recordsForTopic(topicName("DEBEZIUM", "TABLEA"))).hasSize(1);
        SourceRecord sourceRecord3 = (SourceRecord) consumeRecordsByTopic3.recordsForTopic(TestHelper.SERVER_NAME).get(0);
        assertStreamingSchemaChange(sourceRecord3);
        assertSourceTableInfo(sourceRecord3, "DEBEZIUM", "TABLEA");
        List array2 = ((Struct) sourceRecord3.value()).getArray("tableChanges");
        Assertions.assertThat(array2).hasSize(1);
        assertTableChange((Struct) array2.get(0), "ALTER", "DEBEZIUM", "TABLEA");
        SourceRecord sourceRecord4 = (SourceRecord) consumeRecordsByTopic3.recordsForTopic(topicName("DEBEZIUM", "TABLEA")).get(0);
        VerifyRecord.isValidInsert(sourceRecord4, "ID", 2);
        assertSourceTableInfo(sourceRecord4, "DEBEZIUM", "TABLEA");
        Struct struct2 = ((Struct) sourceRecord4.value()).getStruct("after");
        Assertions.assertThat(struct2.schema().fields()).hasSize(1);
        Assertions.assertThat(struct2.schema().field("DATA")).isNull();
        Assertions.assertThat(struct2.get("ID")).isEqualTo(2);
    }

    @Test
    @FixFor({"DBZ-2916"})
    public void shouldStreamAlterTableDropMultipleColumnsSchemaChange() throws Exception {
        createTable("debezium.tablea", "CREATE TABLE debezium.tablea (ID numeric(9,0) primary key, data1 varchar2(50), data2 numeric)");
        start(OracleConnector.class, TestHelper.defaultConfig().with(OracleConnectorConfig.INCLUDE_SCHEMA_CHANGES, true).build());
        assertConnectorIsRunning();
        waitForSnapshotToBeCompleted(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(1);
        Assertions.assertThat(consumeRecordsByTopic.recordsForTopic(TestHelper.SERVER_NAME)).hasSize(1);
        SourceRecord sourceRecord = (SourceRecord) consumeRecordsByTopic.recordsForTopic(TestHelper.SERVER_NAME).get(0);
        assertSnapshotSchemaChange(sourceRecord);
        assertSourceTableInfo(sourceRecord, "DEBEZIUM", "TABLEA");
        List array = ((Struct) sourceRecord.value()).getArray("tableChanges");
        Assertions.assertThat(array).hasSize(1);
        assertTableChange((Struct) array.get(0), "CREATE", "DEBEZIUM", "TABLEA");
        waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
        this.connection.execute(new String[]{"INSERT INTO debezium.tablea (ID,DATA1,DATA2) values (1, 'Test', 100)"});
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic2 = consumeRecordsByTopic(1);
        Assertions.assertThat(consumeRecordsByTopic2.recordsForTopic(topicName("DEBEZIUM", "TABLEA"))).hasSize(1);
        SourceRecord sourceRecord2 = (SourceRecord) consumeRecordsByTopic2.recordsForTopic(topicName("DEBEZIUM", "TABLEA")).get(0);
        VerifyRecord.isValidInsert(sourceRecord2, "ID", 1);
        assertSourceTableInfo(sourceRecord2, "DEBEZIUM", "TABLEA");
        Struct struct = ((Struct) sourceRecord2.value()).getStruct("after");
        Assertions.assertThat(struct.schema().fields()).hasSize(3);
        Assertions.assertThat(struct.get("ID")).isEqualTo(1);
        Assertions.assertThat(struct.get("DATA1")).isEqualTo("Test");
        Assertions.assertThat(struct.get("DATA2")).isEqualTo(BigDecimal.valueOf(100L));
        this.connection.executeWithoutCommitting(new String[]{"ALTER TABLE debezium.tablea DROP (data1, data2)"});
        this.connection.execute(new String[]{"INSERT INTO debezium.tablea (ID) values (2)"});
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic3 = consumeRecordsByTopic(2);
        Assertions.assertThat(consumeRecordsByTopic3.recordsForTopic(TestHelper.SERVER_NAME)).hasSize(1);
        Assertions.assertThat(consumeRecordsByTopic3.recordsForTopic(topicName("DEBEZIUM", "TABLEA"))).hasSize(1);
        SourceRecord sourceRecord3 = (SourceRecord) consumeRecordsByTopic3.recordsForTopic(TestHelper.SERVER_NAME).get(0);
        assertStreamingSchemaChange(sourceRecord3);
        assertSourceTableInfo(sourceRecord3, "DEBEZIUM", "TABLEA");
        List array2 = ((Struct) sourceRecord3.value()).getArray("tableChanges");
        Assertions.assertThat(array2).hasSize(1);
        assertTableChange((Struct) array2.get(0), "ALTER", "DEBEZIUM", "TABLEA");
        SourceRecord sourceRecord4 = (SourceRecord) consumeRecordsByTopic3.recordsForTopic(topicName("DEBEZIUM", "TABLEA")).get(0);
        VerifyRecord.isValidInsert(sourceRecord4, "ID", 2);
        assertSourceTableInfo(sourceRecord4, "DEBEZIUM", "TABLEA");
        Struct struct2 = ((Struct) sourceRecord4.value()).getStruct("after");
        Assertions.assertThat(struct2.schema().fields()).hasSize(1);
        Assertions.assertThat(struct2.schema().field("DATA1")).isNull();
        Assertions.assertThat(struct2.schema().field("DATA2")).isNull();
        Assertions.assertThat(struct2.get("ID")).isEqualTo(2);
    }

    @Test
    @FixFor({"DBZ-2916"})
    public void shouldStreamAlterTableRenameTableSchemaChange() throws Exception {
        createTable("debezium.tablea", "CREATE TABLE debezium.tablea (ID numeric(9,0) primary key, data varchar2(50))");
        start(OracleConnector.class, TestHelper.defaultConfig().with(OracleConnectorConfig.INCLUDE_SCHEMA_CHANGES, true).build());
        assertConnectorIsRunning();
        waitForSnapshotToBeCompleted(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(1);
        Assertions.assertThat(consumeRecordsByTopic.recordsForTopic(TestHelper.SERVER_NAME)).hasSize(1);
        SourceRecord sourceRecord = (SourceRecord) consumeRecordsByTopic.recordsForTopic(TestHelper.SERVER_NAME).get(0);
        assertSnapshotSchemaChange(sourceRecord);
        assertSourceTableInfo(sourceRecord, "DEBEZIUM", "TABLEA");
        List array = ((Struct) sourceRecord.value()).getArray("tableChanges");
        Assertions.assertThat(array).hasSize(1);
        assertTableChange((Struct) array.get(0), "CREATE", "DEBEZIUM", "TABLEA");
        waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
        this.connection.execute(new String[]{"INSERT INTO debezium.tablea (ID,DATA) values (1, 'Test')"});
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic2 = consumeRecordsByTopic(1);
        Assertions.assertThat(consumeRecordsByTopic2.recordsForTopic(topicName("DEBEZIUM", "TABLEA"))).hasSize(1);
        SourceRecord sourceRecord2 = (SourceRecord) consumeRecordsByTopic2.recordsForTopic(topicName("DEBEZIUM", "TABLEA")).get(0);
        VerifyRecord.isValidInsert(sourceRecord2, "ID", 1);
        assertSourceTableInfo(sourceRecord2, "DEBEZIUM", "TABLEA");
        Struct struct = ((Struct) sourceRecord2.value()).getStruct("after");
        Assertions.assertThat(struct.schema().fields()).hasSize(2);
        Assertions.assertThat(struct.get("ID")).isEqualTo(1);
        Assertions.assertThat(struct.get("DATA")).isEqualTo("Test");
        this.connection.executeWithoutCommitting(new String[]{"ALTER TABLE debezium.tablea RENAME TO tableb"});
        this.connection.execute(new String[]{"INSERT INTO debezium.tableb (ID,DATA) values (2, 'Test2')"});
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic3 = consumeRecordsByTopic(2);
        Assertions.assertThat(consumeRecordsByTopic3.recordsForTopic(TestHelper.SERVER_NAME)).hasSize(1);
        Assertions.assertThat(consumeRecordsByTopic3.recordsForTopic(topicName("DEBEZIUM", "TABLEB"))).hasSize(1);
        SourceRecord sourceRecord3 = (SourceRecord) consumeRecordsByTopic3.recordsForTopic(TestHelper.SERVER_NAME).get(0);
        assertStreamingSchemaChange(sourceRecord3);
        assertSourceTableInfo(sourceRecord3, "DEBEZIUM", "TABLEA,TABLEB");
        List array2 = ((Struct) sourceRecord3.value()).getArray("tableChanges");
        Assertions.assertThat(array2).hasSize(1);
        assertTableChange((Struct) array2.get(0), "ALTER", "DEBEZIUM", "TABLEB");
        SourceRecord sourceRecord4 = (SourceRecord) consumeRecordsByTopic3.recordsForTopic(topicName("DEBEZIUM", "TABLEB")).get(0);
        VerifyRecord.isValidInsert(sourceRecord4, "ID", 2);
        assertSourceTableInfo(sourceRecord4, "DEBEZIUM", "TABLEB");
        Struct struct2 = ((Struct) sourceRecord4.value()).getStruct("after");
        Assertions.assertThat(struct2.schema().fields()).hasSize(2);
        Assertions.assertThat(struct2.get("ID")).isEqualTo(2);
        Assertions.assertThat(struct2.get("DATA")).isEqualTo("Test2");
    }

    @Test
    @FixFor({"DBZ-2916"})
    public void shouldNotStreamAfterTableRenameToExcludedName() throws Exception {
        createTable("debezium.tablea", "CREATE TABLE debezium.tablea (ID numeric(9,0) primary key, data varchar2(50))");
        start(OracleConnector.class, TestHelper.defaultConfig().with(OracleConnectorConfig.INCLUDE_SCHEMA_CHANGES, true).with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.TABLEA").build());
        assertConnectorIsRunning();
        waitForSnapshotToBeCompleted(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(1);
        Assertions.assertThat(consumeRecordsByTopic.recordsForTopic(TestHelper.SERVER_NAME)).hasSize(1);
        SourceRecord sourceRecord = (SourceRecord) consumeRecordsByTopic.recordsForTopic(TestHelper.SERVER_NAME).get(0);
        assertSnapshotSchemaChange(sourceRecord);
        assertSourceTableInfo(sourceRecord, "DEBEZIUM", "TABLEA");
        List array = ((Struct) sourceRecord.value()).getArray("tableChanges");
        Assertions.assertThat(array).hasSize(1);
        assertTableChange((Struct) array.get(0), "CREATE", "DEBEZIUM", "TABLEA");
        waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
        this.connection.execute(new String[]{"INSERT INTO debezium.tablea (ID,DATA) values (1, 'Test')"});
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic2 = consumeRecordsByTopic(1);
        Assertions.assertThat(consumeRecordsByTopic2.recordsForTopic(topicName("DEBEZIUM", "TABLEA"))).hasSize(1);
        SourceRecord sourceRecord2 = (SourceRecord) consumeRecordsByTopic2.recordsForTopic(topicName("DEBEZIUM", "TABLEA")).get(0);
        VerifyRecord.isValidInsert(sourceRecord2, "ID", 1);
        assertSourceTableInfo(sourceRecord2, "DEBEZIUM", "TABLEA");
        Struct struct = ((Struct) sourceRecord2.value()).getStruct("after");
        Assertions.assertThat(struct.schema().fields()).hasSize(2);
        Assertions.assertThat(struct.get("ID")).isEqualTo(1);
        Assertions.assertThat(struct.get("DATA")).isEqualTo("Test");
        this.connection.executeWithoutCommitting(new String[]{"ALTER TABLE debezium.tablea RENAME TO tableb"});
        this.connection.execute(new String[]{"INSERT INTO debezium.tableb (ID,DATA) values (2, 'Test2')"});
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic3 = consumeRecordsByTopic(1);
        Assertions.assertThat(consumeRecordsByTopic3.recordsForTopic(TestHelper.SERVER_NAME)).hasSize(1);
        SourceRecord sourceRecord3 = (SourceRecord) consumeRecordsByTopic3.recordsForTopic(TestHelper.SERVER_NAME).get(0);
        assertStreamingSchemaChange(sourceRecord3);
        assertSourceTableInfo(sourceRecord3, "DEBEZIUM", "TABLEA,TABLEB");
        Assertions.assertThat(((Struct) sourceRecord3.value()).getArray("tableChanges")).isEmpty();
        assertNoRecordsToConsume();
    }

    @Test
    @FixFor({"DBZ-2916"})
    public void shouldStreamAlterTableChangeColumnDataType() throws Exception {
        createTable("debezium.tablea", "CREATE TABLE debezium.tablea (ID numeric(9,0) primary key, data numeric)");
        start(OracleConnector.class, TestHelper.defaultConfig().with(OracleConnectorConfig.INCLUDE_SCHEMA_CHANGES, true).build());
        assertConnectorIsRunning();
        waitForSnapshotToBeCompleted(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(1);
        Assertions.assertThat(consumeRecordsByTopic.recordsForTopic(TestHelper.SERVER_NAME)).hasSize(1);
        SourceRecord sourceRecord = (SourceRecord) consumeRecordsByTopic.recordsForTopic(TestHelper.SERVER_NAME).get(0);
        assertSnapshotSchemaChange(sourceRecord);
        assertSourceTableInfo(sourceRecord, "DEBEZIUM", "TABLEA");
        List array = ((Struct) sourceRecord.value()).getArray("tableChanges");
        Assertions.assertThat(array).hasSize(1);
        assertTableChange((Struct) array.get(0), "CREATE", "DEBEZIUM", "TABLEA");
        waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
        this.connection.execute(new String[]{"ALTER TABLE debezium.tablea modify (data varchar2(50))"});
        this.connection.execute(new String[]{"INSERT INTO debezium.tablea (id, data) values (1, 'Test')"});
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic2 = consumeRecordsByTopic(2);
        Assertions.assertThat(consumeRecordsByTopic2.recordsForTopic(TestHelper.SERVER_NAME)).hasSize(1);
        Assertions.assertThat(consumeRecordsByTopic2.recordsForTopic(topicName("DEBEZIUM", "TABLEA"))).hasSize(1);
        SourceRecord sourceRecord2 = (SourceRecord) consumeRecordsByTopic2.recordsForTopic(TestHelper.SERVER_NAME).get(0);
        assertStreamingSchemaChange(sourceRecord2);
        assertSourceTableInfo(sourceRecord2, "DEBEZIUM", "TABLEA");
        List array2 = ((Struct) sourceRecord2.value()).getArray("tableChanges");
        Assertions.assertThat(array2).hasSize(1);
        assertTableChange((Struct) array2.get(0), "ALTER", "DEBEZIUM", "TABLEA");
        SourceRecord sourceRecord3 = (SourceRecord) consumeRecordsByTopic2.recordsForTopic(topicName("DEBEZIUM", "TABLEA")).get(0);
        VerifyRecord.isValidInsert(sourceRecord3, "ID", 1);
        assertSourceTableInfo(sourceRecord3, "DEBEZIUM", "TABLEA");
        Struct struct = ((Struct) sourceRecord3.value()).getStruct("after");
        Assertions.assertThat(struct.schema().fields()).hasSize(2);
        Assertions.assertThat(struct.get("ID")).isEqualTo(1);
        Assertions.assertThat(struct.get("DATA")).isEqualTo("Test");
    }

    @Test
    @FixFor({"DBZ-2916"})
    public void shouldStreamAlterTableChangeColumnNullability() throws Exception {
        createTable("debezium.tablea", "CREATE TABLE debezium.tablea (ID numeric(9,0) primary key, data varchar2(50) not null)");
        start(OracleConnector.class, TestHelper.defaultConfig().with(OracleConnectorConfig.INCLUDE_SCHEMA_CHANGES, true).build());
        assertConnectorIsRunning();
        waitForSnapshotToBeCompleted(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(1);
        Assertions.assertThat(consumeRecordsByTopic.recordsForTopic(TestHelper.SERVER_NAME)).hasSize(1);
        SourceRecord sourceRecord = (SourceRecord) consumeRecordsByTopic.recordsForTopic(TestHelper.SERVER_NAME).get(0);
        assertSnapshotSchemaChange(sourceRecord);
        assertSourceTableInfo(sourceRecord, "DEBEZIUM", "TABLEA");
        List array = ((Struct) sourceRecord.value()).getArray("tableChanges");
        Assertions.assertThat(array).hasSize(1);
        assertTableChange((Struct) array.get(0), "CREATE", "DEBEZIUM", "TABLEA");
        waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
        this.connection.execute(new String[]{"INSERT INTO debezium.tablea (ID,DATA) values (1, 'Test')"});
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic2 = consumeRecordsByTopic(1);
        Assertions.assertThat(consumeRecordsByTopic2.recordsForTopic(topicName("DEBEZIUM", "TABLEA"))).hasSize(1);
        SourceRecord sourceRecord2 = (SourceRecord) consumeRecordsByTopic2.recordsForTopic(topicName("DEBEZIUM", "TABLEA")).get(0);
        VerifyRecord.isValidInsert(sourceRecord2, "ID", 1);
        assertSourceTableInfo(sourceRecord2, "DEBEZIUM", "TABLEA");
        Struct struct = ((Struct) sourceRecord2.value()).getStruct("after");
        Assertions.assertThat(struct.schema().fields()).hasSize(2);
        Assertions.assertThat(struct.get("ID")).isEqualTo(1);
        Assertions.assertThat(struct.get("DATA")).isEqualTo("Test");
        Assertions.assertThat(struct.schema().field("DATA").schema().isOptional()).isFalse();
        this.connection.execute(new String[]{"ALTER TABLE debezium.tablea modify (data varchar2(50) null)"});
        this.connection.execute(new String[]{"INSERT INTO debezium.tablea (id) values (2)"});
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic3 = consumeRecordsByTopic(2);
        Assertions.assertThat(consumeRecordsByTopic3.recordsForTopic(TestHelper.SERVER_NAME)).hasSize(1);
        Assertions.assertThat(consumeRecordsByTopic3.recordsForTopic(topicName("DEBEZIUM", "TABLEA"))).hasSize(1);
        SourceRecord sourceRecord3 = (SourceRecord) consumeRecordsByTopic3.recordsForTopic(TestHelper.SERVER_NAME).get(0);
        assertStreamingSchemaChange(sourceRecord3);
        assertSourceTableInfo(sourceRecord3, "DEBEZIUM", "TABLEA");
        List array2 = ((Struct) sourceRecord3.value()).getArray("tableChanges");
        Assertions.assertThat(array2).hasSize(1);
        assertTableChange((Struct) array2.get(0), "ALTER", "DEBEZIUM", "TABLEA");
        SourceRecord sourceRecord4 = (SourceRecord) consumeRecordsByTopic3.recordsForTopic(topicName("DEBEZIUM", "TABLEA")).get(0);
        VerifyRecord.isValidInsert(sourceRecord4, "ID", 2);
        assertSourceTableInfo(sourceRecord4, "DEBEZIUM", "TABLEA");
        Struct struct2 = ((Struct) sourceRecord4.value()).getStruct("after");
        Assertions.assertThat(struct2.schema().fields()).hasSize(2);
        Assertions.assertThat(struct2.get("ID")).isEqualTo(2);
        Assertions.assertThat(struct2.get("DATA")).isNull();
        Assertions.assertThat(struct2.schema().field("DATA").schema().isOptional()).isTrue();
    }

    @Test
    @FixFor({"DBZ-2916"})
    public void shouldStreamAlterTableChangeColumnPrecisionAndScale() throws Exception {
        createTable("debezium.tablea", "CREATE TABLE debezium.tablea (ID numeric(9,0) primary key, data numeric(8,2) not null)");
        start(OracleConnector.class, TestHelper.defaultConfig().with(OracleConnectorConfig.INCLUDE_SCHEMA_CHANGES, true).build());
        assertConnectorIsRunning();
        waitForSnapshotToBeCompleted(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(1);
        Assertions.assertThat(consumeRecordsByTopic.recordsForTopic(TestHelper.SERVER_NAME)).hasSize(1);
        SourceRecord sourceRecord = (SourceRecord) consumeRecordsByTopic.recordsForTopic(TestHelper.SERVER_NAME).get(0);
        assertSnapshotSchemaChange(sourceRecord);
        assertSourceTableInfo(sourceRecord, "DEBEZIUM", "TABLEA");
        List array = ((Struct) sourceRecord.value()).getArray("tableChanges");
        Assertions.assertThat(array).hasSize(1);
        assertTableChange((Struct) array.get(0), "CREATE", "DEBEZIUM", "TABLEA");
        waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
        this.connection.execute(new String[]{"INSERT INTO debezium.tablea (ID, DATA) values (1, 12345.67)"});
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic2 = consumeRecordsByTopic(1);
        Assertions.assertThat(consumeRecordsByTopic2.recordsForTopic(topicName("DEBEZIUM", "TABLEA"))).hasSize(1);
        SourceRecord sourceRecord2 = (SourceRecord) consumeRecordsByTopic2.recordsForTopic(topicName("DEBEZIUM", "TABLEA")).get(0);
        VerifyRecord.isValidInsert(sourceRecord2, "ID", 1);
        assertSourceTableInfo(sourceRecord2, "DEBEZIUM", "TABLEA");
        Struct struct = ((Struct) sourceRecord2.value()).getStruct("after");
        Assertions.assertThat(struct.schema().fields()).hasSize(2);
        Assertions.assertThat(struct.get("ID")).isEqualTo(1);
        Assertions.assertThat(struct.get("DATA")).isEqualTo(BigDecimal.valueOf(12345.67d));
        Schema schema = struct.schema().field("DATA").schema();
        Assertions.assertThat((String) schema.parameters().get("scale")).isEqualTo("2");
        Assertions.assertThat((String) schema.parameters().get("connect.decimal.precision")).isEqualTo("8");
        this.connection.execute(new String[]{"ALTER TABLE debezium.tablea modify (data numeric(10,3))"});
        this.connection.execute(new String[]{"INSERT INTO debezium.tablea (id, data) values (2, 234567.891)"});
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic3 = consumeRecordsByTopic(2);
        Assertions.assertThat(consumeRecordsByTopic3.recordsForTopic(TestHelper.SERVER_NAME)).hasSize(1);
        Assertions.assertThat(consumeRecordsByTopic3.recordsForTopic(topicName("DEBEZIUM", "TABLEA"))).hasSize(1);
        SourceRecord sourceRecord3 = (SourceRecord) consumeRecordsByTopic3.recordsForTopic(TestHelper.SERVER_NAME).get(0);
        assertStreamingSchemaChange(sourceRecord3);
        assertSourceTableInfo(sourceRecord3, "DEBEZIUM", "TABLEA");
        List array2 = ((Struct) sourceRecord3.value()).getArray("tableChanges");
        Assertions.assertThat(array2).hasSize(1);
        assertTableChange((Struct) array2.get(0), "ALTER", "DEBEZIUM", "TABLEA");
        SourceRecord sourceRecord4 = (SourceRecord) consumeRecordsByTopic3.recordsForTopic(topicName("DEBEZIUM", "TABLEA")).get(0);
        VerifyRecord.isValidInsert(sourceRecord4, "ID", 2);
        assertSourceTableInfo(sourceRecord4, "DEBEZIUM", "TABLEA");
        Struct struct2 = ((Struct) sourceRecord4.value()).getStruct("after");
        Assertions.assertThat(struct2.schema().fields()).hasSize(2);
        Assertions.assertThat(struct2.get("ID")).isEqualTo(2);
        Assertions.assertThat(struct2.get("DATA")).isEqualTo(BigDecimal.valueOf(234567.891d));
        Schema schema2 = struct2.schema().field("DATA").schema();
        Assertions.assertThat((String) schema2.parameters().get("scale")).isEqualTo("3");
        Assertions.assertThat((String) schema2.parameters().get("connect.decimal.precision")).isEqualTo("10");
    }

    @Test
    @FixFor({"DBZ-2916"})
    public void shouldStreamDropTable() throws Exception {
        createTable("debezium.tablea", "CREATE TABLE debezium.tablea (ID numeric(9,0) primary key, data varchar2(50))");
        start(OracleConnector.class, TestHelper.defaultConfig().with(OracleConnectorConfig.INCLUDE_SCHEMA_CHANGES, true).build());
        assertConnectorIsRunning();
        waitForSnapshotToBeCompleted(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(1);
        Assertions.assertThat(consumeRecordsByTopic.recordsForTopic(TestHelper.SERVER_NAME)).hasSize(1);
        SourceRecord sourceRecord = (SourceRecord) consumeRecordsByTopic.recordsForTopic(TestHelper.SERVER_NAME).get(0);
        assertSnapshotSchemaChange(sourceRecord);
        assertSourceTableInfo(sourceRecord, "DEBEZIUM", "TABLEA");
        List array = ((Struct) sourceRecord.value()).getArray("tableChanges");
        Assertions.assertThat(array).hasSize(1);
        assertTableChange((Struct) array.get(0), "CREATE", "DEBEZIUM", "TABLEA");
        waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
        this.connection.execute(new String[]{"INSERT INTO debezium.tablea (id, data) values (1, 'Test')"});
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic2 = consumeRecordsByTopic(1);
        Assertions.assertThat(consumeRecordsByTopic2.recordsForTopic(topicName("DEBEZIUM", "TABLEA"))).hasSize(1);
        SourceRecord sourceRecord2 = (SourceRecord) consumeRecordsByTopic2.recordsForTopic(topicName("DEBEZIUM", "TABLEA")).get(0);
        VerifyRecord.isValidInsert(sourceRecord2, "ID", 1);
        assertSourceTableInfo(sourceRecord2, "DEBEZIUM", "TABLEA");
        Struct struct = ((Struct) sourceRecord2.value()).getStruct("after");
        Assertions.assertThat(struct.get("ID")).isEqualTo(1);
        Assertions.assertThat(struct.get("DATA")).isEqualTo("Test");
        this.connection.execute(new String[]{"DROP TABLE debezium.tablea"});
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic3 = consumeRecordsByTopic(1);
        Assertions.assertThat(consumeRecordsByTopic3.recordsForTopic(TestHelper.SERVER_NAME)).hasSize(1);
        SourceRecord sourceRecord3 = (SourceRecord) consumeRecordsByTopic3.recordsForTopic(TestHelper.SERVER_NAME).get(0);
        assertStreamingSchemaChange(sourceRecord3);
        assertSourceTableInfo(sourceRecord3, "DEBEZIUM", "TABLEA");
        List array2 = ((Struct) sourceRecord3.value()).getArray("tableChanges");
        Assertions.assertThat(array2).hasSize(1);
        assertTableChange((Struct) array2.get(0), "DROP", "DEBEZIUM", "TABLEA");
        assertNoRecordsToConsume();
    }

    @Test
    @FixFor({"DBZ-2916"})
    public void shouldSnapshotAndStreamSchemaChangesUsingExplicitCasedNames() throws Exception {
        createTable("debezium.\"tableC\"", "CREATE TABLE debezium.\"tableC\" (\"id\" numeric(9,0) primary key, \"data\" varchar2(50))");
        start(OracleConnector.class, TestHelper.defaultConfig().with(OracleConnectorConfig.INCLUDE_SCHEMA_CHANGES, true).build());
        assertConnectorIsRunning();
        waitForSnapshotToBeCompleted(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(1);
        Assertions.assertThat(consumeRecordsByTopic.recordsForTopic(TestHelper.SERVER_NAME)).hasSize(1);
        SourceRecord sourceRecord = (SourceRecord) consumeRecordsByTopic.recordsForTopic(TestHelper.SERVER_NAME).get(0);
        assertSnapshotSchemaChange(sourceRecord);
        assertSourceTableInfo(sourceRecord, "DEBEZIUM", "tableC");
        List array = ((Struct) sourceRecord.value()).getArray("tableChanges");
        Assertions.assertThat(array).hasSize(1);
        assertTableChange((Struct) array.get(0), "CREATE", "DEBEZIUM", "tableC");
        assertTableChangePrimaryKeyNames((Struct) array.get(0), "id");
        assertTableChangeColumn((Struct) array.get(0), 0, "id");
        assertTableChangeColumn((Struct) array.get(0), 1, "data");
        waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
        this.connection.execute(new String[]{"ALTER TABLE debezium.\"tableC\" add \"data2\" number(9,0)"});
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic2 = consumeRecordsByTopic(1);
        Assertions.assertThat(consumeRecordsByTopic2.recordsForTopic(TestHelper.SERVER_NAME)).hasSize(1);
        SourceRecord sourceRecord2 = (SourceRecord) consumeRecordsByTopic2.recordsForTopic(TestHelper.SERVER_NAME).get(0);
        assertStreamingSchemaChange(sourceRecord2);
        assertSourceTableInfo(sourceRecord2, "DEBEZIUM", "tableC");
        List array2 = ((Struct) sourceRecord2.value()).getArray("tableChanges");
        Assertions.assertThat(array2).hasSize(1);
        assertTableChange((Struct) array2.get(0), "ALTER", "DEBEZIUM", "tableC");
        assertTableChangePrimaryKeyNames((Struct) array2.get(0), "id");
        assertTableChangeColumn((Struct) array2.get(0), 0, "id");
        assertTableChangeColumn((Struct) array2.get(0), 1, "data");
        assertTableChangeColumn((Struct) array2.get(0), 2, "data2");
        this.connection.execute(new String[]{"ALTER TABLE debezium.\"tableC\" add (\"data3\" number(9,0), \"data4\" varchar2(25))"});
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic3 = consumeRecordsByTopic(1);
        Assertions.assertThat(consumeRecordsByTopic3.recordsForTopic(TestHelper.SERVER_NAME)).hasSize(1);
        SourceRecord sourceRecord3 = (SourceRecord) consumeRecordsByTopic3.recordsForTopic(TestHelper.SERVER_NAME).get(0);
        assertStreamingSchemaChange(sourceRecord3);
        assertSourceTableInfo(sourceRecord3, "DEBEZIUM", "tableC");
        List array3 = ((Struct) sourceRecord3.value()).getArray("tableChanges");
        Assertions.assertThat(array3).hasSize(1);
        assertTableChange((Struct) array3.get(0), "ALTER", "DEBEZIUM", "tableC");
        assertTableChangePrimaryKeyNames((Struct) array3.get(0), "id");
        assertTableChangeColumn((Struct) array3.get(0), 0, "id");
        assertTableChangeColumn((Struct) array3.get(0), 1, "data");
        assertTableChangeColumn((Struct) array3.get(0), 2, "data2");
        assertTableChangeColumn((Struct) array3.get(0), 3, "data3");
        assertTableChangeColumn((Struct) array3.get(0), 4, "data4");
        this.connection.execute(new String[]{"ALTER TABLE debezium.\"tableC\" drop column \"data3\""});
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic4 = consumeRecordsByTopic(1);
        Assertions.assertThat(consumeRecordsByTopic4.recordsForTopic(TestHelper.SERVER_NAME)).hasSize(1);
        SourceRecord sourceRecord4 = (SourceRecord) consumeRecordsByTopic4.recordsForTopic(TestHelper.SERVER_NAME).get(0);
        assertStreamingSchemaChange(sourceRecord4);
        assertSourceTableInfo(sourceRecord4, "DEBEZIUM", "tableC");
        List array4 = ((Struct) sourceRecord4.value()).getArray("tableChanges");
        Assertions.assertThat(array4).hasSize(1);
        assertTableChange((Struct) array4.get(0), "ALTER", "DEBEZIUM", "tableC");
        assertTableChangePrimaryKeyNames((Struct) array4.get(0), "id");
        assertTableChangeColumn((Struct) array4.get(0), 0, "id");
        assertTableChangeColumn((Struct) array4.get(0), 1, "data");
        assertTableChangeColumn((Struct) array4.get(0), 2, "data2");
        assertTableChangeColumn((Struct) array4.get(0), 3, "data4");
        this.connection.execute(new String[]{"ALTER TABLE debezium.\"tableC\" rename column \"data4\" to \"Data3\""});
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic5 = consumeRecordsByTopic(1);
        Assertions.assertThat(consumeRecordsByTopic5.recordsForTopic(TestHelper.SERVER_NAME)).hasSize(1);
        SourceRecord sourceRecord5 = (SourceRecord) consumeRecordsByTopic5.recordsForTopic(TestHelper.SERVER_NAME).get(0);
        assertStreamingSchemaChange(sourceRecord5);
        assertSourceTableInfo(sourceRecord5, "DEBEZIUM", "tableC");
        List array5 = ((Struct) sourceRecord5.value()).getArray("tableChanges");
        Assertions.assertThat(array5).hasSize(1);
        assertTableChange((Struct) array5.get(0), "ALTER", "DEBEZIUM", "tableC");
        assertTableChangePrimaryKeyNames((Struct) array5.get(0), "id");
        assertTableChangeColumn((Struct) array5.get(0), 0, "id");
        assertTableChangeColumn((Struct) array5.get(0), 1, "data");
        assertTableChangeColumn((Struct) array5.get(0), 2, "data2");
        assertTableChangeColumn((Struct) array5.get(0), 3, "Data3");
        this.connection.execute(new String[]{"DROP TABLE debezium.\"tableC\""});
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic6 = consumeRecordsByTopic(1);
        Assertions.assertThat(consumeRecordsByTopic6.recordsForTopic(TestHelper.SERVER_NAME)).hasSize(1);
        SourceRecord sourceRecord6 = (SourceRecord) consumeRecordsByTopic6.recordsForTopic(TestHelper.SERVER_NAME).get(0);
        assertStreamingSchemaChange(sourceRecord6);
        assertSourceTableInfo(sourceRecord6, "DEBEZIUM", "tableC");
        List array6 = ((Struct) sourceRecord6.value()).getArray("tableChanges");
        Assertions.assertThat(array6).hasSize(1);
        assertTableChange((Struct) array6.get(0), "DROP", "DEBEZIUM", "tableC");
        Assertions.assertThat(((Struct) array6.get(0)).getStruct("table")).isNull();
    }

    @Test
    @FixFor({"DBZ-2916"})
    @Ignore("Test can be flaky and cannot reproduce locally, ignoring to stablize test suite")
    public void shouldNotEmitDdlEventsForNonTableObjects() throws Exception {
        String str;
        LogInterceptor logInterceptor;
        try {
            LogInterceptor logInterceptor2 = new LogInterceptor(AbstractLogMinerEventProcessor.class);
            LogInterceptor logInterceptor3 = new LogInterceptor(ErrorHandler.class);
            LogInterceptor logInterceptor4 = new LogInterceptor("io.debezium.connector.oracle.xstream.LcrEventHandler");
            LogInterceptor logInterceptor5 = new LogInterceptor(OpenLogReplicatorStreamingChangeEventSource.class);
            logInterceptor5.setLoggerLevel(OpenLogReplicatorStreamingChangeEventSource.class, Level.TRACE);
            TestHelper.grantRole("CREATE PROCEDURE");
            TestHelper.grantRole("ALTER ANY PROCEDURE");
            start(OracleConnector.class, TestHelper.defaultConfig().with(OracleConnectorConfig.INCLUDE_SCHEMA_CHANGES, true).build());
            assertConnectorIsRunning();
            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
            this.connection.execute(new String[]{"CREATE OR REPLACE FUNCTION mytestf() return number is x number(11,2); begin return x; END;"});
            this.connection.execute(new String[]{"DROP FUNCTION mytestf"});
            this.connection.execute(new String[]{"CREATE OR REPLACE PROCEDURE mytest() BEGIN select * from dual; END;"});
            this.connection.execute(new String[]{"DROP PROCEDURE mytest"});
            this.connection.execute(new String[]{"CREATE OR REPLACE PACKAGE pkgtest as function hire return number; END;"});
            this.connection.execute(new String[]{"CREATE OR REPLACE PACKAGE BODY pkgtest as function hire return number; begin return 0; end;"});
            this.connection.execute(new String[]{"DROP PACKAGE pkgtest"});
            switch (AnonymousClass1.$SwitchMap$io$debezium$connector$oracle$OracleConnectorConfig$ConnectorAdapter[TestHelper.adapter().ordinal()]) {
                case 1:
                    str = "DDL: ";
                    logInterceptor = logInterceptor2;
                    break;
                case 2:
                    str = "Processing DDL event ";
                    logInterceptor = logInterceptor4;
                    break;
                case 3:
                    str = "Cannot process DDL";
                    logInterceptor = logInterceptor5;
                    break;
                default:
                    throw new IllegalStateException("Unexpected adapter: " + String.valueOf(TestHelper.adapter()));
            }
            LogInterceptor logInterceptor6 = logInterceptor;
            String str2 = str;
            Awaitility.await().atMost(TestHelper.defaultMessageConsumerPollTimeout(), TimeUnit.SECONDS).until(() -> {
                return Boolean.valueOf(logInterceptor6.countOccurrences(str2) == 7);
            });
            stopConnector();
            waitForConnectorShutdown(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
            Assertions.assertThat(logInterceptor3.containsMessage("Producer failure")).as("Connector failure", new Object[0]).isFalse();
            assertNoRecordsToConsume();
            TestHelper.revokeRole("ALTER ANY PROCEDURE");
            TestHelper.revokeRole("CREATE PROCEDURE");
        } catch (Throwable th) {
            TestHelper.revokeRole("ALTER ANY PROCEDURE");
            TestHelper.revokeRole("CREATE PROCEDURE");
            throw th;
        }
    }

    @Test
    @FixFor({"DBZ-4037"})
    public void shouldParseSchemaChangeWithoutErrorOnFilteredTableWithRawDataType() throws Exception {
        LogInterceptor logInterceptor = new LogInterceptor(CreateTableParserListener.class);
        LogInterceptor logInterceptor2 = new LogInterceptor(AlterTableParserListener.class);
        try {
            TestHelper.dropTable(this.connection, "dbz4037a");
            TestHelper.dropTable(this.connection, "dbz4037b");
            this.connection.execute(new String[]{"CREATE TABLE dbz4037a (id number(9,0), data varchar2(50), primary key(id))"});
            TestHelper.streamTable(this.connection, "dbz4037a");
            start(OracleConnector.class, TestHelper.defaultConfig().with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ4037A").build());
            assertConnectorIsRunning();
            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
            assertNoRecordsToConsume();
            String str = TestHelper.getDatabaseName() + ".DEBEZIUM.DBZ4037B";
            this.connection.execute(new String[]{"CREATE TABLE dbz4037b (id number(9,0), data raw(8), primary key(id))"});
            Awaitility.await().atMost(TestHelper.defaultMessageConsumerPollTimeout(), TimeUnit.SECONDS).until(() -> {
                return Boolean.valueOf(logInterceptor.containsMessage(getIgnoreCreateTable(str)));
            });
            this.connection.execute(new String[]{"ALTER TABLE dbz4037b ADD data2 raw(10)"});
            Awaitility.await().atMost(TestHelper.defaultMessageConsumerPollTimeout(), TimeUnit.SECONDS).until(() -> {
                return Boolean.valueOf(logInterceptor2.containsMessage(getIgnoreAlterTable(str)));
            });
            this.connection.execute(new String[]{"INSERT INTO dbz4037a (id,data) values (1, 'Test')"});
            AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(1);
            Assertions.assertThat(consumeRecordsByTopic.recordsForTopic(topicName("DEBEZIUM", "DBZ4037A"))).hasSize(1);
            Struct struct = ((Struct) ((SourceRecord) consumeRecordsByTopic.recordsForTopic(topicName("DEBEZIUM", "DBZ4037A")).get(0)).value()).getStruct("after");
            Assertions.assertThat(struct.get("ID")).isEqualTo(1);
            Assertions.assertThat(struct.get("DATA")).isEqualTo("Test");
            assertNoRecordsToConsume();
            TestHelper.dropTable(this.connection, "dbz4037b");
            TestHelper.dropTable(this.connection, "dbz4037a");
        } catch (Throwable th) {
            TestHelper.dropTable(this.connection, "dbz4037b");
            TestHelper.dropTable(this.connection, "dbz4037a");
            throw th;
        }
    }

    @Test
    @FixFor({"DBZ-4037"})
    public void shouldParseSchemaChangeOnTableWithRawDataType() throws Exception {
        try {
            TestHelper.dropTable(this.connection, "dbz4037");
            start(OracleConnector.class, TestHelper.defaultConfig().with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ4037").build());
            assertConnectorIsRunning();
            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
            assertNoRecordsToConsume();
            this.connection.execute(new String[]{"CREATE TABLE dbz4037 (id number(9,0), data raw(8), name varchar(50), primary key(id))"});
            TestHelper.streamTable(this.connection, "dbz4037");
            this.connection.execute(new String[]{"ALTER TABLE dbz4037 ADD data2 raw(10)"});
            this.connection.prepareUpdate("INSERT INTO dbz4037 (id,data,name,data2) values (1,?,'Acme 123',?)", preparedStatement -> {
                preparedStatement.setBytes(1, "Test".getBytes());
                preparedStatement.setBytes(2, "T".getBytes());
            });
            this.connection.commit();
            AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(1);
            Assertions.assertThat(consumeRecordsByTopic.recordsForTopic(topicName("DEBEZIUM", "DBZ4037"))).hasSize(1);
            Struct struct = ((Struct) ((SourceRecord) consumeRecordsByTopic.recordsForTopic(topicName("DEBEZIUM", "DBZ4037")).get(0)).value()).getStruct("after");
            Assertions.assertThat(struct.get("ID")).isEqualTo(1);
            Assertions.assertThat(struct.get("DATA")).isEqualTo(ByteBuffer.wrap("Test".getBytes()));
            Assertions.assertThat(struct.get("DATA2")).isEqualTo(ByteBuffer.wrap("T".getBytes()));
            Assertions.assertThat(struct.get("NAME")).isEqualTo("Acme 123");
            assertNoRecordsToConsume();
            TestHelper.dropTable(this.connection, "dbz4037");
        } catch (Throwable th) {
            TestHelper.dropTable(this.connection, "dbz4037");
            throw th;
        }
    }

    @Test
    @FixFor({"DBZ-4782"})
    public void shouldNotResendSchemaChangeIfLastEventReadBeforeRestart() throws Exception {
        TestHelper.dropTable(this.connection, "dbz4782");
        try {
            this.connection.execute(new String[]{"CREATE TABLE dbz4782 (id numeric(9,0) primary key, data varchar2(50))"});
            TestHelper.streamTable(this.connection, "dbz4782");
            Configuration build = TestHelper.defaultConfig().with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ4782").with(OracleConnectorConfig.INCLUDE_SCHEMA_CHANGES, true).build();
            start(OracleConnector.class, build);
            assertConnectorIsRunning();
            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
            this.connection.execute(new String[]{"ALTER TABLE dbz4782 add data2 varchar2(50)"});
            List recordsForTopic = consumeRecordsByTopic(2).recordsForTopic(TestHelper.SERVER_NAME);
            Assertions.assertThat(recordsForTopic).hasSize(2);
            assertSnapshotSchemaChange((SourceRecord) recordsForTopic.get(0));
            List array = ((Struct) ((SourceRecord) recordsForTopic.get(0)).value()).getArray("tableChanges");
            Assertions.assertThat(array).hasSize(1);
            assertTableChange((Struct) array.get(0), "CREATE", "DEBEZIUM", "DBZ4782");
            assertStreamingSchemaChange((SourceRecord) recordsForTopic.get(1));
            List array2 = ((Struct) ((SourceRecord) recordsForTopic.get(1)).value()).getArray("tableChanges");
            Assertions.assertThat(array2).hasSize(1);
            assertTableChange((Struct) array2.get(0), "ALTER", "DEBEZIUM", "DBZ4782");
            stopConnector();
            start(OracleConnector.class, build);
            assertConnectorIsRunning();
            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
            waitForAvailableRecords(20L, TimeUnit.SECONDS);
            assertNoRecordsToConsume();
            TestHelper.dropTable(this.connection, "dbz4782");
        } catch (Throwable th) {
            TestHelper.dropTable(this.connection, "dbz4782");
            throw th;
        }
    }

    @Test
    @FixFor({"DBZ-4782"})
    public void shouldNotResendSchemaChangeIfLastEventReadBeforeRestartWithFollowupDml() throws Exception {
        TestHelper.dropTable(this.connection, "dbz4782");
        try {
            createTable("dbz4782", "CREATE TABLE dbz4782 (id numeric(9,0) primary key, data varchar2(50))");
            Configuration build = TestHelper.defaultConfig().with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ4782").with(OracleConnectorConfig.INCLUDE_SCHEMA_CHANGES, true).build();
            start(OracleConnector.class, build);
            assertConnectorIsRunning();
            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
            this.connection.execute(new String[]{"ALTER TABLE dbz4782 add data2 varchar2(50)"});
            List recordsForTopic = consumeRecordsByTopic(2).recordsForTopic(TestHelper.SERVER_NAME);
            Assertions.assertThat(recordsForTopic).hasSize(2);
            assertSnapshotSchemaChange((SourceRecord) recordsForTopic.get(0));
            List array = ((Struct) ((SourceRecord) recordsForTopic.get(0)).value()).getArray("tableChanges");
            Assertions.assertThat(array).hasSize(1);
            assertTableChange((Struct) array.get(0), "CREATE", "DEBEZIUM", "DBZ4782");
            assertStreamingSchemaChange((SourceRecord) recordsForTopic.get(1));
            List array2 = ((Struct) ((SourceRecord) recordsForTopic.get(1)).value()).getArray("tableChanges");
            Assertions.assertThat(array2).hasSize(1);
            assertTableChange((Struct) array2.get(0), "ALTER", "DEBEZIUM", "DBZ4782");
            stopConnector();
            start(OracleConnector.class, build);
            assertConnectorIsRunning();
            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
            this.connection.execute(new String[]{"INSERT INTO dbz4782 values (1, 'data1', 'data2')"});
            List recordsForTopic2 = consumeRecordsByTopic(1).recordsForTopic(topicName("DEBEZIUM", "DBZ4782"));
            Assertions.assertThat(recordsForTopic2).hasSize(1);
            VerifyRecord.isValidInsert((SourceRecord) recordsForTopic2.get(0), "ID", 1);
            assertNoRecordsToConsume();
            TestHelper.dropTable(this.connection, "dbz4782");
        } catch (Throwable th) {
            TestHelper.dropTable(this.connection, "dbz4782");
            throw th;
        }
    }

    @Test
    @FixFor({"DBZ-4782"})
    public void shouldNotResendSchemaChangeWithInprogressTransactionOnSecondTable() throws Exception {
        TestHelper.dropTable(this.connection, "dbz4782a");
        TestHelper.dropTable(this.connection, "dbz4782b");
        try {
            createTable("dbz4782a", "CREATE TABLE dbz4782a (id numeric(9,0) primary key, data varchar2(50))");
            createTable("dbz4782b", "CREATE TABLE dbz4782b (id numeric(9,0) primary key, data varchar2(50))");
            Configuration build = TestHelper.defaultConfig().with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ4782[A|B]").with(OracleConnectorConfig.INCLUDE_SCHEMA_CHANGES, true).with(OracleConnectorConfig.LOG_MINING_QUERY_FILTER_MODE, "regex").build();
            start(OracleConnector.class, build);
            assertConnectorIsRunning();
            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
            OracleConnection testConnection = TestHelper.testConnection();
            try {
                testConnection.executeWithoutCommitting(new String[]{"INSERT INTO dbz4782b values (2, 'connection2')"});
                this.connection.execute(new String[]{"ALTER TABLE dbz4782a add data2 varchar2(50)"});
                List recordsForTopic = consumeRecordsByTopic(3).recordsForTopic(TestHelper.SERVER_NAME);
                Assertions.assertThat(recordsForTopic).hasSize(3);
                assertSnapshotSchemaChange((SourceRecord) recordsForTopic.get(0));
                List array = ((Struct) ((SourceRecord) recordsForTopic.get(0)).value()).getArray("tableChanges");
                Assertions.assertThat(array).hasSize(1);
                assertTableChange((Struct) array.get(0), "CREATE", "DEBEZIUM", "DBZ4782A");
                assertSnapshotSchemaChange((SourceRecord) recordsForTopic.get(1));
                List array2 = ((Struct) ((SourceRecord) recordsForTopic.get(1)).value()).getArray("tableChanges");
                Assertions.assertThat(array2).hasSize(1);
                assertTableChange((Struct) array2.get(0), "CREATE", "DEBEZIUM", "DBZ4782B");
                assertStreamingSchemaChange((SourceRecord) recordsForTopic.get(2));
                List array3 = ((Struct) ((SourceRecord) recordsForTopic.get(2)).value()).getArray("tableChanges");
                Assertions.assertThat(array3).hasSize(1);
                assertTableChange((Struct) array3.get(0), "ALTER", "DEBEZIUM", "DBZ4782A");
                stopConnector();
                testConnection.commit();
                start(OracleConnector.class, build);
                assertConnectorIsRunning();
                waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
                this.connection.execute(new String[]{"INSERT INTO dbz4782a values (1, 'data1', 'data2')"});
                AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(2);
                List allRecordsInOrder = consumeRecordsByTopic.allRecordsInOrder();
                PrintStream printStream = System.out;
                Objects.requireNonNull(printStream);
                allRecordsInOrder.forEach((v1) -> {
                    r1.println(v1);
                });
                List recordsForTopic2 = consumeRecordsByTopic.recordsForTopic(topicName("DEBEZIUM", "DBZ4782A"));
                Assertions.assertThat(recordsForTopic2).hasSize(1);
                VerifyRecord.isValidInsert((SourceRecord) recordsForTopic2.get(0), "ID", 1);
                List recordsForTopic3 = consumeRecordsByTopic.recordsForTopic(topicName("DEBEZIUM", "DBZ4782B"));
                Assertions.assertThat(recordsForTopic3).hasSize(1);
                VerifyRecord.isValidInsert((SourceRecord) recordsForTopic3.get(0), "ID", 2);
                if (testConnection != null) {
                    testConnection.close();
                }
                assertNoRecordsToConsume();
                TestHelper.dropTable(this.connection, "dbz4782a");
                TestHelper.dropTable(this.connection, "dbz4782b");
            } finally {
            }
        } catch (Throwable th) {
            TestHelper.dropTable(this.connection, "dbz4782a");
            TestHelper.dropTable(this.connection, "dbz4782b");
            throw th;
        }
    }

    @Test
    @FixFor({"DBZ-5285"})
    public void shouldOnlyCaptureSchemaChangesForIncludedTables() throws Exception {
        TestHelper.dropTable(this.connection, "dbz5285a");
        TestHelper.dropTable(this.connection, "dbz5285b");
        try {
            this.connection.execute(new String[]{"CREATE TABLE dbz5285a (id numeric(9,0) primary key, data varchar2(50))"});
            this.connection.execute(new String[]{"CREATE TABLE dbz5285b (id numeric(9,0) primary key, data varchar2(50))"});
            TestHelper.streamTable(this.connection, "dbz5285a");
            TestHelper.streamTable(this.connection, "dbz5285b");
            this.connection.execute(new String[]{"INSERT INTO dbz5285a (id,data) values (1, 'A')"});
            this.connection.execute(new String[]{"INSERT INTO dbz5285b (id,data) values (2, 'B')"});
            start(OracleConnector.class, TestHelper.defaultConfig().with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ5285A").with(SchemaHistory.STORE_ONLY_CAPTURED_TABLES_DDL, true).with(OracleConnectorConfig.INCLUDE_SCHEMA_CHANGES, true).build());
            assertConnectorIsRunning();
            waitForSnapshotToBeCompleted(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
            AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(2);
            List recordsForTopic = consumeRecordsByTopic.recordsForTopic(TestHelper.SERVER_NAME);
            Assertions.assertThat(recordsForTopic).hasSize(1);
            assertSnapshotSchemaChange((SourceRecord) recordsForTopic.get(0));
            List array = ((Struct) ((SourceRecord) recordsForTopic.get(0)).value()).getArray("tableChanges");
            Assertions.assertThat(array).hasSize(1);
            assertTableChange((Struct) array.get(0), "CREATE", "DEBEZIUM", "DBZ5285A");
            List recordsForTopic2 = consumeRecordsByTopic.recordsForTopic("server1.DEBEZIUM.DBZ5285A");
            Assertions.assertThat(recordsForTopic2).hasSize(1);
            VerifyRecord.isValidRead((SourceRecord) recordsForTopic2.get(0), "ID", 1);
            Struct struct = ((Struct) ((SourceRecord) recordsForTopic2.get(0)).value()).getStruct("after");
            Assertions.assertThat(struct.get("ID")).isEqualTo(1);
            Assertions.assertThat(struct.get("DATA")).isEqualTo("A");
            assertNoRecordsToConsume();
            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
            this.connection.execute(new String[]{"ALTER TABLE dbz5285a add data2 varchar2(50)"});
            this.connection.execute(new String[]{"ALTER TABLE dbz5285b add data2 varchar2(50)"});
            this.connection.execute(new String[]{"INSERT INTO dbz5285a (id,data,data2) values (3, 'A3', 'D1')"});
            this.connection.execute(new String[]{"INSERT INTO dbz5285b (id,data,data2) values (4, 'B4', 'D2')"});
            AbstractConnectorTest.SourceRecords consumeRecordsByTopic2 = consumeRecordsByTopic(2);
            List recordsForTopic3 = consumeRecordsByTopic2.recordsForTopic(TestHelper.SERVER_NAME);
            Assertions.assertThat(recordsForTopic3).hasSize(1);
            assertStreamingSchemaChange((SourceRecord) recordsForTopic3.get(0));
            List array2 = ((Struct) ((SourceRecord) recordsForTopic3.get(0)).value()).getArray("tableChanges");
            Assertions.assertThat(array2).hasSize(1);
            assertTableChange((Struct) array2.get(0), "ALTER", "DEBEZIUM", "DBZ5285A");
            List recordsForTopic4 = consumeRecordsByTopic2.recordsForTopic("server1.DEBEZIUM.DBZ5285A");
            Assertions.assertThat(recordsForTopic4).hasSize(1);
            VerifyRecord.isValidInsert((SourceRecord) recordsForTopic4.get(0), "ID", 3);
            Struct struct2 = ((Struct) ((SourceRecord) recordsForTopic4.get(0)).value()).getStruct("after");
            Assertions.assertThat(struct2.get("ID")).isEqualTo(3);
            Assertions.assertThat(struct2.get("DATA")).isEqualTo("A3");
            Assertions.assertThat(struct2.get("DATA2")).isEqualTo("D1");
            assertNoRecordsToConsume();
            TestHelper.dropTable(this.connection, "dbz5285b");
            TestHelper.dropTable(this.connection, "dbz5285a");
        } catch (Throwable th) {
            TestHelper.dropTable(this.connection, "dbz5285b");
            TestHelper.dropTable(this.connection, "dbz5285a");
            throw th;
        }
    }

    @Test
    @FixFor({"DBZ-5285"})
    public void shouldCaptureSchemaChangesForAllTablesRegardlessOfIncludeList() throws Exception {
        TestHelper.dropTable(this.connection, "dbz5285a");
        TestHelper.dropTable(this.connection, "dbz5285b");
        try {
            this.connection.execute(new String[]{"CREATE TABLE dbz5285a (id numeric(9,0) primary key, data varchar2(50))"});
            this.connection.execute(new String[]{"CREATE TABLE dbz5285b (id numeric(9,0) primary key, data varchar2(50))"});
            TestHelper.streamTable(this.connection, "dbz5285a");
            TestHelper.streamTable(this.connection, "dbz5285b");
            this.connection.execute(new String[]{"INSERT INTO dbz5285a (id,data) values (1, 'A')"});
            this.connection.execute(new String[]{"INSERT INTO dbz5285b (id,data) values (2, 'B')"});
            start(OracleConnector.class, TestHelper.defaultConfig().with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ5285A").with(SchemaHistory.STORE_ONLY_CAPTURED_TABLES_DDL, false).with(OracleConnectorConfig.INCLUDE_SCHEMA_CHANGES, true).build());
            assertConnectorIsRunning();
            waitForSnapshotToBeCompleted(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
            AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(3);
            List recordsForTopic = consumeRecordsByTopic.recordsForTopic(TestHelper.SERVER_NAME);
            Assertions.assertThat(recordsForTopic).hasSize(2);
            assertSnapshotSchemaChange((SourceRecord) recordsForTopic.get(0));
            List array = ((Struct) ((SourceRecord) recordsForTopic.get(0)).value()).getArray("tableChanges");
            Assertions.assertThat(array).hasSize(1);
            assertTableChange((Struct) array.get(0), "CREATE", "DEBEZIUM", "DBZ5285A");
            assertSnapshotSchemaChange((SourceRecord) recordsForTopic.get(1));
            List array2 = ((Struct) ((SourceRecord) recordsForTopic.get(1)).value()).getArray("tableChanges");
            Assertions.assertThat(array2).hasSize(1);
            assertTableChange((Struct) array2.get(0), "CREATE", "DEBEZIUM", "DBZ5285B");
            List recordsForTopic2 = consumeRecordsByTopic.recordsForTopic("server1.DEBEZIUM.DBZ5285A");
            Assertions.assertThat(recordsForTopic2).hasSize(1);
            VerifyRecord.isValidRead((SourceRecord) recordsForTopic2.get(0), "ID", 1);
            Struct struct = ((Struct) ((SourceRecord) recordsForTopic2.get(0)).value()).getStruct("after");
            Assertions.assertThat(struct.get("ID")).isEqualTo(1);
            Assertions.assertThat(struct.get("DATA")).isEqualTo("A");
            assertNoRecordsToConsume();
            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
            this.connection.execute(new String[]{"ALTER TABLE dbz5285a add data2 varchar2(50)"});
            this.connection.execute(new String[]{"ALTER TABLE dbz5285b add data2 varchar2(50)"});
            this.connection.execute(new String[]{"INSERT INTO dbz5285a (id,data,data2) values (3, 'A3', 'D1')"});
            this.connection.execute(new String[]{"INSERT INTO dbz5285b (id,data,data2) values (4, 'B4', 'D2')"});
            AbstractConnectorTest.SourceRecords consumeRecordsByTopic2 = consumeRecordsByTopic(3);
            List recordsForTopic3 = consumeRecordsByTopic2.recordsForTopic(TestHelper.SERVER_NAME);
            PrintStream printStream = System.out;
            Objects.requireNonNull(printStream);
            recordsForTopic3.forEach((v1) -> {
                r1.println(v1);
            });
            Assertions.assertThat(recordsForTopic3).hasSize(2);
            assertStreamingSchemaChange((SourceRecord) recordsForTopic3.get(0));
            List array3 = ((Struct) ((SourceRecord) recordsForTopic3.get(0)).value()).getArray("tableChanges");
            Assertions.assertThat(array3).hasSize(1);
            assertTableChange((Struct) array3.get(0), "ALTER", "DEBEZIUM", "DBZ5285A");
            assertStreamingSchemaChange((SourceRecord) recordsForTopic3.get(1));
            Assertions.assertThat(((Struct) ((SourceRecord) recordsForTopic3.get(1)).value()).getArray("tableChanges")).isEmpty();
            assertSourceTableInfo((SourceRecord) recordsForTopic3.get(1), "DEBEZIUM", "DBZ5285B");
            List recordsForTopic4 = consumeRecordsByTopic2.recordsForTopic("server1.DEBEZIUM.DBZ5285A");
            Assertions.assertThat(recordsForTopic4).hasSize(1);
            VerifyRecord.isValidInsert((SourceRecord) recordsForTopic4.get(0), "ID", 3);
            Struct struct2 = ((Struct) ((SourceRecord) recordsForTopic4.get(0)).value()).getStruct("after");
            Assertions.assertThat(struct2.get("ID")).isEqualTo(3);
            Assertions.assertThat(struct2.get("DATA")).isEqualTo("A3");
            Assertions.assertThat(struct2.get("DATA2")).isEqualTo("D1");
            assertNoRecordsToConsume();
            TestHelper.dropTable(this.connection, "dbz5285b");
            TestHelper.dropTable(this.connection, "dbz5285a");
        } catch (Throwable th) {
            TestHelper.dropTable(this.connection, "dbz5285b");
            TestHelper.dropTable(this.connection, "dbz5285a");
            throw th;
        }
    }

    @Test
    @FixFor({"DBZ-8089"})
    public void shouldResetScaleWhenColumnDataTypeIsModified() throws Exception {
        TestHelper.dropTable(this.connection, "dbz8089");
        try {
            this.connection.execute(new String[]{"CREATE TABLE dbz8089 (id numeric(9,0) primary key, salary numeric(*,0))"});
            TestHelper.streamTable(this.connection, "dbz8089");
            start(OracleConnector.class, TestHelper.defaultConfig().with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ8089").with(OracleConnectorConfig.DECIMAL_HANDLING_MODE, RelationalDatabaseConnectorConfig.DecimalHandlingMode.DOUBLE).build());
            assertConnectorIsRunning();
            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
            this.connection.execute(new String[]{"INSERT INTO dbz8089 values (1, 12.36)"});
            AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(1);
            Assertions.assertThat(consumeRecordsByTopic.allRecordsInOrder()).hasSize(1);
            Struct struct = ((Struct) ((SourceRecord) consumeRecordsByTopic.allRecordsInOrder().get(0)).value()).getStruct("after");
            Assertions.assertThat(struct.get("ID")).isEqualTo(Double.valueOf(1.0d));
            Assertions.assertThat(struct.get("SALARY")).isEqualTo(Double.valueOf(12.0d));
            this.connection.execute(new String[]{"ALTER TABLE dbz8089 modify salary float"});
            this.connection.execute(new String[]{"INSERT INTO dbz8089 values (2, 12.36)"});
            AbstractConnectorTest.SourceRecords consumeRecordsByTopic2 = consumeRecordsByTopic(1);
            Assertions.assertThat(consumeRecordsByTopic2.allRecordsInOrder()).hasSize(1);
            Struct struct2 = ((Struct) ((SourceRecord) consumeRecordsByTopic2.allRecordsInOrder().get(0)).value()).getStruct("after");
            Assertions.assertThat(struct2.get("ID")).isEqualTo(Double.valueOf(2.0d));
            Assertions.assertThat(struct2.get("SALARY")).isEqualTo(Double.valueOf(12.36d));
            stopConnector();
            TestHelper.dropTable(this.connection, "dbz8089");
        } catch (Throwable th) {
            stopConnector();
            TestHelper.dropTable(this.connection, "dbz8089");
            throw th;
        }
    }

    private static String getTableIdString(String str, String str2) {
        return new TableId(TestHelper.getDatabaseName(), str, str2).toDoubleQuotedString();
    }

    private void createTable(String str, String str2) throws SQLException {
        this.connection.execute(new String[]{str2});
        TestHelper.streamTable(this.connection, str);
    }

    private static void assertSnapshotSchemaChange(SourceRecord sourceRecord) {
        Assertions.assertThat(sourceRecord.topic()).isEqualTo(TestHelper.SERVER_NAME);
        Assertions.assertThat(((Struct) sourceRecord.key()).getString("databaseName")).isEqualTo(TestHelper.getDatabaseName());
        Assertions.assertThat(sourceRecord.sourceOffset().get("snapshot")).isEqualTo(SnapshotType.INITIAL.toString());
        Assertions.assertThat(((Struct) sourceRecord.value()).getStruct("source").getString("snapshot")).isEqualTo("true");
    }

    private static void assertStreamingSchemaChange(SourceRecord sourceRecord) {
        Assertions.assertThat(sourceRecord.topic()).isEqualTo(TestHelper.SERVER_NAME);
        Assertions.assertThat(((Struct) sourceRecord.key()).getString("databaseName")).isEqualTo(TestHelper.getDatabaseName());
        Assertions.assertThat(sourceRecord.sourceOffset().get("snapshot")).isNull();
        Assertions.assertThat(((Struct) sourceRecord.value()).getStruct("source").getString("snapshot")).isNull();
    }

    private static void assertTableChange(Struct struct, String str, String str2, String str3) {
        Assertions.assertThat(struct.get("type")).isEqualTo(str);
        Assertions.assertThat(struct.get("id")).isEqualTo(getTableIdString(str2, str3));
    }

    private static void assertTableChangePrimaryKeyNames(Struct struct, String... strArr) {
        Assertions.assertThat(struct.getStruct("table").getArray("primaryKeyColumnNames")).isEqualTo(Arrays.asList(strArr));
    }

    private static void assertTableChangeColumn(Struct struct, int i, String str) {
        List array = struct.getStruct("table").getArray("columns");
        Assertions.assertThat(array.size()).isGreaterThan(i);
        Assertions.assertThat(((Struct) array.get(i)).get("name")).isEqualTo(str);
    }

    private static void assertSourceTableInfo(SourceRecord sourceRecord, String str, String str2) {
        Struct struct = ((Struct) sourceRecord.value()).getStruct("source");
        Assertions.assertThat(struct.get("db")).isEqualTo(TestHelper.getDatabaseName());
        Assertions.assertThat(struct.get("schema")).isEqualTo(str);
        Assertions.assertThat(struct.get("table")).isEqualTo(str2);
    }

    private static String topicName(String str, String str2) {
        return "server1." + str + "." + str2;
    }

    private static String getIgnoreCreateTable(String str) {
        return "Ignoring CREATE TABLE statement for non-captured table " + str;
    }

    private static String getIgnoreAlterTable(String str) {
        return "Ignoring ALTER TABLE statement for non-captured table " + str;
    }
}
