package io.debezium.connector.oracle;

import ch.qos.logback.classic.Level;
import io.debezium.DebeziumException;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.connector.SnapshotType;
import io.debezium.connector.oracle.OracleConnectorConfig;
import io.debezium.connector.oracle.junit.SkipOnDatabaseOption;
import io.debezium.connector.oracle.junit.SkipTestDependingOnAdapterNameRule;
import io.debezium.connector.oracle.junit.SkipTestDependingOnDatabaseOptionRule;
import io.debezium.connector.oracle.junit.SkipTestDependingOnStrategyRule;
import io.debezium.connector.oracle.junit.SkipWhenAdapterNameIs;
import io.debezium.connector.oracle.junit.SkipWhenAdapterNameIsNot;
import io.debezium.connector.oracle.junit.SkipWhenLogMiningStrategyIs;
import io.debezium.connector.oracle.logminer.LogMinerAdapter;
import io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSource;
import io.debezium.connector.oracle.logminer.processor.AbstractLogMinerEventProcessor;
import io.debezium.connector.oracle.logminer.processor.memory.MemoryLogMinerEventProcessor;
import io.debezium.connector.oracle.util.TestHelper;
import io.debezium.converters.CloudEventsConverterTest;
import io.debezium.data.Envelope;
import io.debezium.data.SchemaAndValueField;
import io.debezium.data.VariableScaleDecimal;
import io.debezium.data.VerifyRecord;
import io.debezium.doc.FixFor;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.embedded.EmbeddedEngineConfig;
import io.debezium.embedded.async.AbstractAsyncEngineConnectorTest;
import io.debezium.heartbeat.DatabaseHeartbeatImpl;
import io.debezium.heartbeat.Heartbeat;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.junit.logging.LogInterceptor;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.relational.RelationalSnapshotChangeEventSource;
import io.debezium.relational.history.MemorySchemaHistory;
import io.debezium.storage.file.history.FileSchemaHistory;
import io.debezium.util.Testing;
import java.lang.management.ManagementFactory;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.file.Path;
import java.sql.SQLException;
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import javax.management.JMException;
import junit.framework.Assert;
import junit.framework.TestCase;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.storage.FileOffsetBackingStore;
import org.apache.kafka.connect.storage.MemoryOffsetBackingStore;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.awaitility.Durations;
import org.awaitility.core.ConditionTimeoutException;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestRule;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/debezium/connector/oracle/OracleConnectorIT.class */
public class OracleConnectorIT extends AbstractAsyncEngineConnectorTest {
    /** Logger bound to this integration-test class. */
    private static final Logger LOGGER = LoggerFactory.getLogger(OracleConnectorIT.class);
    /** Number of microseconds in one second, as computed by {@link TimeUnit#toMicros(long)}. */
    private static final long MICROS_PER_SECOND = TimeUnit.SECONDS.toMicros(1);
    /** Key looked up in a record's source offset by the snapshot assertions in this class. */
    private static final String SNAPSHOT_COMPLETED_KEY = "snapshot_completed";
    // NOTE(review): the two message fragments below look like log/error text matched by tests
    // further down the file -- confirm against their usages.
    private static final String ERROR_PROCESSING_FAIL_MESSAGE = "Oracle LogMiner is unable to re-construct the SQL for '";
    private static final String ERROR_PROCESSING_WARN_MESSAGE = "cannot be parsed. This event will be ignored and skipped.";

    // JUnit rule; presumably evaluates the adapter-name skip annotations on each test -- confirm.
    @Rule
    public final TestRule skipAdapterRule = new SkipTestDependingOnAdapterNameRule();

    // JUnit rule; presumably evaluates the database-option skip annotations on each test -- confirm.
    @Rule
    public final TestRule skipOptionRule = new SkipTestDependingOnDatabaseOptionRule();

    // JUnit rule; presumably evaluates the log-mining-strategy skip annotations on each test -- confirm.
    @Rule
    public final TestRule skipStrategyRule = new SkipTestDependingOnStrategyRule();
    /** Connection shared by all tests; assigned in {@code beforeClass()} and closed in {@code closeConnection()}. */
    private static OracleConnection connection;

    /*
     * Compiler-generated helper surfaced by the decompiler: a lookup table indexed by
     * ConnectorAdapter ordinal, of the kind javac emits for a switch over an enum.
     * Only the XSTREAM constant is mapped (to case index 1); all other entries stay 0.
     */
    /* renamed from: io.debezium.connector.oracle.OracleConnectorIT$1, reason: invalid class name */
    /* loaded from: input_file:io/debezium/connector/oracle/OracleConnectorIT$1.class */
    static /* synthetic */ class AnonymousClass1 {
        // One slot per ConnectorAdapter constant present at class-initialisation time.
        static final /* synthetic */ int[] $SwitchMap$io$debezium$connector$oracle$OracleConnectorConfig$ConnectorAdapter = new int[OracleConnectorConfig.ConnectorAdapter.values().length];

        static {
            try {
                $SwitchMap$io$debezium$connector$oracle$OracleConnectorConfig$ConnectorAdapter[OracleConnectorConfig.ConnectorAdapter.XSTREAM.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
                // Deliberately ignored: if the enum constant is missing at runtime its slot is simply left unmapped.
            }
        }
    }

    /**
     * One-time fixture setup: opens the shared test connection, drops any leftover tables and
     * (re)creates the four tables used throughout this class. After each table is created the
     * connector user is granted SELECT on it and supplemental logging of all columns is enabled.
     *
     * @throws SQLException if any of the DDL statements fails
     */
    @BeforeClass
    public static void beforeClass() throws SQLException {
        connection = TestHelper.testConnection();
        TestHelper.dropAllTables();

        final String[] tableNames = {
                "customer",
                "masked_hashed_column_table",
                "truncated_column_table",
                "dt_table"
        };
        // Entries are index-aligned with tableNames.
        // NOTE(review): dt_table is created without a schema qualifier; presumably it resolves to
        // the DEBEZIUM schema of the test connection -- confirm.
        final String[] createStatements = {
                "create table debezium.customer (  id numeric(9,0) not null,   name varchar2(1000),   score decimal(6, 2),   registered timestamp,   primary key (id))",
                "create table debezium.masked_hashed_column_table (  id numeric(9,0) not null,   name varchar2(255),   name2 varchar2(255),   name3 varchar2(20),  primary key (id))",
                "create table debezium.truncated_column_table (  id numeric(9,0) not null,   name varchar2(20),   primary key (id))",
                "create table dt_table (  id numeric(9,0) not null,   c1 int,   c2 int,   c3a numeric(5,2),   c3b varchar(128),   f1 float(10),   f2 decimal(8,4),   primary key (id))"
        };

        for (String tableName : tableNames) {
            TestHelper.dropTable(connection, "debezium." + tableName);
        }

        for (int idx = 0; idx < tableNames.length; idx++) {
            final String qualifiedName = "debezium." + tableNames[idx];
            connection.execute(createStatements[idx]);
            connection.execute("GRANT SELECT ON " + qualifiedName + " to  " + TestHelper.getConnectorUserName());
            connection.execute("ALTER TABLE " + qualifiedName + " ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS");
        }
    }

    /**
     * One-time teardown: drops the tables owned by this class and closes the shared connection.
     * Does nothing when the connection was never opened.
     *
     * @throws SQLException if dropping a table or closing the connection fails
     */
    @AfterClass
    public static void closeConnection() throws SQLException {
        if (connection == null) {
            return;
        }
        final String[] tablesToDrop = {
                "debezium.customer2",
                "customer",
                "masked_hashed_column_table",
                "truncated_column_table",
                "dt_table"
        };
        for (String table : tablesToDrop) {
            TestHelper.dropTable(connection, table);
        }
        connection.close();
    }

    /**
     * Per-test reset: drops the DBZ-800 scratch tables, empties the shared tables, configures the
     * consume timeout, re-initialises the connector test framework and removes the schema
     * history file.
     *
     * @throws SQLException if any of the cleanup statements fails
     */
    @Before
    public void before() throws SQLException {
        for (String scratchTable : new String[]{ "debezium.dbz800a", "debezium.dbz800b" }) {
            TestHelper.dropTable(connection, scratchTable);
        }

        final String[] sharedTables = {
                "customer",
                "masked_hashed_column_table",
                "truncated_column_table",
                "dt_table"
        };
        for (String sharedTable : sharedTables) {
            connection.execute("delete from debezium." + sharedTable);
        }

        setConsumeTimeout(TestHelper.defaultMessageConsumerPollTimeout(), TimeUnit.SECONDS);
        initializeConnectorTestFramework();
        Testing.Files.delete(TestHelper.SCHEMA_HISTORY_PATH);
    }

    /**
     * Verifies that a table whose quoted name contains a hyphen is captured during both the
     * snapshot and the streaming phase.
     *
     * <p>One row is inserted before the connector starts and one after streaming is running.
     * Both records are expected on topic {@code server1.DEBEZIUM.my-table}: the first as a
     * snapshot read that is also marked as completing the snapshot, the second as an insert.
     * The table is dropped in a {@code finally} block so it is removed whether or not the
     * assertions pass.
     *
     * @throws Exception if database access or record consumption fails
     */
    @Test
    @FixFor({"DBZ-2452"})
    public void shouldSnapshotAndStreamWithHyphenedTableName() throws Exception {
        final String hyphenedTable = "debezium.\"my-table\"";
        TestHelper.dropTable(connection, hyphenedTable);
        try {
            connection.execute("create table \"my-table\" ( id numeric(9,0) not null,  c1 int,  c2 varchar(128),  primary key (id))");
            connection.execute("GRANT SELECT ON debezium.\"my-table\" to " + TestHelper.getConnectorUserName());
            connection.execute("ALTER TABLE debezium.\"my-table\" ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS");
            connection.execute("INSERT INTO debezium.\"my-table\" VALUES (1, 25, 'Test')");
            connection.execute("COMMIT");

            final Configuration config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.MY-TABLE")
                    .with(OracleConnectorConfig.SCHEMA_NAME_ADJUSTMENT_MODE, "avro")
                    .build();
            start(OracleConnector.class, config);
            assertConnectorIsRunning();
            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);

            connection.execute("INSERT INTO debezium.\"my-table\" VALUES (2, 50, 'Test2')");
            connection.execute("COMMIT");

            final List<SourceRecord> records = consumeRecordsByTopic(2).recordsForTopic("server1.DEBEZIUM.my-table");
            Assertions.assertThat(records).hasSize(2);

            // Row inserted before start-up: emitted by the snapshot, which it also completes.
            final SourceRecord snapshotRecord = records.get(0);
            VerifyRecord.isValidRead(snapshotRecord, "ID", 1);
            final Struct snapshotAfter = (Struct) ((Struct) snapshotRecord.value()).get("after");
            Assertions.assertThat(snapshotAfter.get("ID")).isEqualTo(1);
            Assertions.assertThat(snapshotAfter.get("C1")).isEqualTo(BigDecimal.valueOf(25L));
            Assertions.assertThat(snapshotAfter.get("C2")).isEqualTo("Test");
            Assertions.assertThat(snapshotRecord.sourceOffset().get("snapshot")).isEqualTo(SnapshotType.INITIAL.toString());
            Assertions.assertThat(snapshotRecord.sourceOffset().get(SNAPSHOT_COMPLETED_KEY)).isEqualTo(true);

            // Row inserted while streaming: emitted as a regular insert.
            final SourceRecord streamedRecord = records.get(1);
            VerifyRecord.isValidInsert(streamedRecord, "ID", 2);
            final Struct streamedAfter = (Struct) ((Struct) streamedRecord.value()).get("after");
            Assertions.assertThat(streamedAfter.get("ID")).isEqualTo(2);
            Assertions.assertThat(streamedAfter.get("C1")).isEqualTo(BigDecimal.valueOf(50L));
            Assertions.assertThat(streamedAfter.get("C2")).isEqualTo("Test2");
        }
        finally {
            // Single cleanup path for both success and failure.
            TestHelper.dropTable(connection, hyphenedTable);
        }
    }

    /**
     * Verifies that two rows committed before the connector starts are emitted as snapshot reads
     * on the CUSTOMER topic, flagged {@code first} and {@code last} in the source block, with
     * only the second record marking the snapshot as completed in its offset.
     *
     * @throws Exception if database access or record consumption fails
     */
    @Test
    public void shouldTakeSnapshot() throws Exception {
        final Configuration config = TestHelper.defaultConfig()
                .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.CUSTOMER")
                .build();

        connection.execute("INSERT INTO debezium.customer VALUES (1, 'Billie-Bob', 1234.56, TO_DATE('2018-02-22', 'yyyy-mm-dd'))");
        connection.execute("INSERT INTO debezium.customer VALUES (2, 'Bruce', 2345.67, null)");
        connection.execute("COMMIT");
        final int expectedRecordCount = 2;

        start(OracleConnector.class, config);
        assertConnectorIsRunning();

        final List<SourceRecord> records = consumeRecordsByTopic(expectedRecordCount).recordsForTopic("server1.DEBEZIUM.CUSTOMER");
        Assertions.assertThat(records).hasSize(expectedRecordCount);

        // First snapshot record.
        final SourceRecord first = records.get(0);
        VerifyRecord.isValidRead(first, "ID", 1);
        final Struct firstAfter = (Struct) ((Struct) first.value()).get("after");
        Assertions.assertThat(firstAfter.get("ID")).isEqualTo(1);
        Assertions.assertThat(firstAfter.get("NAME")).isEqualTo("Billie-Bob");
        Assertions.assertThat(firstAfter.get("SCORE")).isEqualTo(BigDecimal.valueOf(1234.56d));
        final long expectedRegistered = toMicroSecondsSinceEpoch(LocalDateTime.of(2018, 2, 22, 0, 0, 0));
        Assertions.assertThat(firstAfter.get("REGISTERED")).isEqualTo(expectedRegistered);
        Assertions.assertThat(first.sourceOffset().get("snapshot")).isEqualTo(SnapshotType.INITIAL.toString());
        Assertions.assertThat(first.sourceOffset().get(SNAPSHOT_COMPLETED_KEY)).isEqualTo(false);
        final Struct firstSource = (Struct) ((Struct) first.value()).get("source");
        Assertions.assertThat(firstSource.get("snapshot")).isEqualTo("first");

        // Second (and final) snapshot record.
        final SourceRecord second = records.get(1);
        VerifyRecord.isValidRead(second, "ID", 2);
        final Struct secondAfter = (Struct) ((Struct) second.value()).get("after");
        Assertions.assertThat(secondAfter.get("ID")).isEqualTo(2);
        Assertions.assertThat(secondAfter.get("NAME")).isEqualTo("Bruce");
        Assertions.assertThat(secondAfter.get("SCORE")).isEqualTo(BigDecimal.valueOf(2345.67d));
        Assertions.assertThat(secondAfter.get("REGISTERED")).isNull();
        Assertions.assertThat(second.sourceOffset().get("snapshot")).isEqualTo(SnapshotType.INITIAL.toString());
        Assertions.assertThat(second.sourceOffset().get(SNAPSHOT_COMPLETED_KEY)).isEqualTo(true);
        final Struct secondSource = (Struct) ((Struct) second.value()).get("source");
        Assertions.assertThat(secondSource.get("snapshot")).isEqualTo("last");
    }

    /**
     * Runs an {@code INITIAL_ONLY} snapshot and expects the connector task to log a warning that
     * the archive log check failed but was tolerated. Currently ignored; see the
     * {@code @Ignore} reason.
     *
     * @throws Exception if the connector cannot be started or stopped
     */
    @Test
    @FixFor({"DBZ-6276"})
    @Ignore("Requires database to be configured without ARCHIVELOG_MODE enabled; which conflicts with dbz-oracle images")
    public void shouldSkipCheckingArchiveLogIfNoCdc() throws Exception {
        final Configuration config = TestHelper.defaultConfig()
                .with(OracleConnectorConfig.SNAPSHOT_MODE, OracleConnectorConfig.SnapshotMode.INITIAL_ONLY)
                .with(OracleConnectorConfig.LOG_MINING_TRANSACTION_SNAPSHOT_BOUNDARY_MODE, OracleConnectorConfig.TransactionSnapshotBoundaryMode.SKIP)
                .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.CUSTOMER")
                .build();

        // The interceptor is created before the connector starts, as in the original ordering.
        final LogInterceptor taskLogs = new LogInterceptor(OracleConnectorTask.class);

        start(OracleConnector.class, config);
        assertConnectorIsRunning();
        waitForSnapshotToBeCompleted(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
        stopConnector();

        final boolean warningLogged = taskLogs.containsWarnMessage("Failed the archive log check but continuing as redo log isn't strictly required");
        Assertions.assertThat(warningLogged).isTrue();
    }

    /**
     * Runs the shared snapshot-then-stream scenario against the CUSTOMER table with the default
     * test configuration.
     *
     * @throws Exception if the delegated scenario fails
     */
    @Test
    public void shouldContinueWithStreamingAfterSnapshot() throws Exception {
        final Configuration config = TestHelper.defaultConfig()
                .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.CUSTOMER")
                .build();
        continueStreamingAfterSnapshot(config);
    }

    /**
     * Shared scenario: inserts two rows, starts the connector with the given configuration and
     * checks the two snapshot reads (source block and offset flags), then inserts a third row
     * and checks that it arrives as a streamed insert without snapshot markers in its offset.
     *
     * @param configuration connector configuration to start with
     * @throws Exception if database access or record consumption fails
     */
    private void continueStreamingAfterSnapshot(Configuration configuration) throws Exception {
        connection.execute("INSERT INTO debezium.customer VALUES (1, 'Billie-Bob', 1234.56, TO_DATE('2018-02-22', 'yyyy-mm-dd'))");
        connection.execute("INSERT INTO debezium.customer VALUES (2, 'Bruce', 2345.67, null)");
        connection.execute("COMMIT");
        final int snapshotRecordCount = 2;

        start(OracleConnector.class, configuration);
        assertConnectorIsRunning();

        final List<SourceRecord> snapshotRecords = consumeRecordsByTopic(snapshotRecordCount).recordsForTopic("server1.DEBEZIUM.CUSTOMER");
        Assertions.assertThat(snapshotRecords).hasSize(snapshotRecordCount);

        // First snapshot read.
        final SourceRecord first = snapshotRecords.get(0);
        VerifyRecord.isValidRead(first, "ID", 1);
        final Struct firstAfter = (Struct) ((Struct) first.value()).get("after");
        Assertions.assertThat(firstAfter.get("ID")).isEqualTo(1);

        final Struct firstSource = (Struct) ((Struct) first.value()).get("source");
        Assertions.assertThat(firstSource.get("snapshot")).isEqualTo("first");
        Assertions.assertThat(firstSource.get("scn")).isNotNull();
        Assertions.assertThat(firstSource.get("name")).isEqualTo(TestHelper.SERVER_NAME);
        Assertions.assertThat(firstSource.get("version")).isNotNull();
        Assertions.assertThat(firstSource.get("txId")).isNull();
        Assertions.assertThat(firstSource.get("ts_ms")).isNotNull();
        Assertions.assertThat(first.sourceOffset().get("snapshot")).isEqualTo(SnapshotType.INITIAL.toString());
        Assertions.assertThat(first.sourceOffset().get(SNAPSHOT_COMPLETED_KEY)).isEqualTo(false);

        // Second snapshot read, which closes the snapshot.
        final SourceRecord second = snapshotRecords.get(1);
        VerifyRecord.isValidRead(second, "ID", 2);
        final Struct secondAfter = (Struct) ((Struct) second.value()).get("after");
        Assertions.assertThat(secondAfter.get("ID")).isEqualTo(2);
        Assertions.assertThat(second.sourceOffset().get("snapshot")).isEqualTo(SnapshotType.INITIAL.toString());
        Assertions.assertThat(second.sourceOffset().get(SNAPSHOT_COMPLETED_KEY)).isEqualTo(true);
        final Struct secondSource = (Struct) ((Struct) second.value()).get("source");
        Assertions.assertThat(secondSource.get("snapshot")).isEqualTo("last");

        // Row committed after the snapshot: must arrive through streaming.
        connection.execute("INSERT INTO debezium.customer VALUES (3, 'Brian', 2345.67, null)");
        connection.execute("COMMIT");
        final int streamedRecordCount = 1;

        final List<SourceRecord> streamedRecords = consumeRecordsByTopic(streamedRecordCount).recordsForTopic("server1.DEBEZIUM.CUSTOMER");
        Assertions.assertThat(streamedRecords).hasSize(streamedRecordCount);

        final SourceRecord streamed = streamedRecords.get(0);
        VerifyRecord.isValidInsert(streamed, "ID", 3);
        final Struct streamedAfter = (Struct) ((Struct) streamed.value()).get("after");
        Assertions.assertThat(streamedAfter.get("ID")).isEqualTo(3);
        Assertions.assertThat(streamed.sourceOffset().containsKey("snapshot")).isFalse();
        Assertions.assertThat(streamed.sourceOffset().containsKey(SNAPSHOT_COMPLETED_KEY)).isFalse();

        final Struct streamedSource = (Struct) ((Struct) streamed.value()).get("source");
        Assertions.assertThat(streamedSource.get("snapshot")).isEqualTo("false");
        Assertions.assertThat(streamedSource.get("scn")).isNotNull();
        Assertions.assertThat(streamedSource.get("name")).isEqualTo(TestHelper.SERVER_NAME);
        Assertions.assertThat(streamedSource.get("version")).isNotNull();
        Assertions.assertThat(streamedSource.get("txId")).isNotNull();
        Assertions.assertThat(streamedSource.get("ts_ms")).isNotNull();
    }

    /**
     * Snapshots two pre-existing rows, verifies their source blocks and offsets, and then sends
     * two multi-row transactions (ids from 100 and from 200, 30 rows each) that must each be
     * streamed in full.
     *
     * @throws Exception if database access or record consumption fails
     */
    @Test
    @FixFor({"DBZ-1223"})
    public void shouldStreamTransaction() throws Exception {
        final Configuration config = TestHelper.defaultConfig()
                .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.CUSTOMER")
                .build();

        connection.execute("INSERT INTO debezium.customer VALUES (1, 'Billie-Bob', 1234.56, TO_DATE('2018-02-22', 'yyyy-mm-dd'))");
        connection.execute("INSERT INTO debezium.customer VALUES (2, 'Bruce', 2345.67, null)");
        connection.execute("COMMIT");
        final int snapshotRecordCount = 2;

        start(OracleConnector.class, config);
        assertConnectorIsRunning();

        final List<SourceRecord> snapshotRecords = consumeRecordsByTopic(snapshotRecordCount).recordsForTopic("server1.DEBEZIUM.CUSTOMER");
        Assertions.assertThat(snapshotRecords).hasSize(snapshotRecordCount);

        // First snapshot read.
        final SourceRecord first = snapshotRecords.get(0);
        VerifyRecord.isValidRead(first, "ID", 1);
        final Struct firstAfter = (Struct) ((Struct) first.value()).get("after");
        Assertions.assertThat(firstAfter.get("ID")).isEqualTo(1);

        final Struct firstSource = (Struct) ((Struct) first.value()).get("source");
        Assertions.assertThat(firstSource.get("snapshot")).isEqualTo("first");
        Assertions.assertThat(firstSource.get("scn")).isNotNull();
        Assertions.assertThat(firstSource.get("name")).isEqualTo(TestHelper.SERVER_NAME);
        Assertions.assertThat(firstSource.get("version")).isNotNull();
        Assertions.assertThat(firstSource.get("txId")).isNull();
        Assertions.assertThat(firstSource.get("ts_ms")).isNotNull();
        Assertions.assertThat(first.sourceOffset().get("snapshot")).isEqualTo(SnapshotType.INITIAL.toString());
        Assertions.assertThat(first.sourceOffset().get(SNAPSHOT_COMPLETED_KEY)).isEqualTo(false);

        // Second snapshot read, which closes the snapshot.
        final SourceRecord second = snapshotRecords.get(1);
        VerifyRecord.isValidRead(second, "ID", 2);
        final Struct secondAfter = (Struct) ((Struct) second.value()).get("after");
        Assertions.assertThat(secondAfter.get("ID")).isEqualTo(2);
        Assertions.assertThat(second.sourceOffset().get("snapshot")).isEqualTo(SnapshotType.INITIAL.toString());
        Assertions.assertThat(second.sourceOffset().get(SNAPSHOT_COMPLETED_KEY)).isEqualTo(true);

        // Each batch is written as one explicit transaction.
        connection.setAutoCommit(false);
        final int batchSize = 30;
        sendTxBatch(config, batchSize, 100);
        sendTxBatch(config, batchSize, 200);
    }

    /**
     * Inserts {@code i} customer rows with consecutive ids starting at {@code i2} in a single
     * transaction, commits it, and then asserts that the batch was streamed.
     *
     * <p>If the underlying JDBC connection is in auto-commit mode it is switched off for the
     * duration of the batch and switched back on after the commit.
     *
     * @param configuration connector configuration, passed through to {@code assertTxBatch}
     * @param i number of rows to insert
     * @param i2 id of the first inserted row
     * @throws SQLException if an insert, the commit or an auto-commit toggle fails
     * @throws InterruptedException if interrupted while consuming the streamed records
     */
    private void sendTxBatch(Configuration configuration, int i, int i2) throws SQLException, InterruptedException {
        final int batchSize = i;
        final int firstId = i2;

        final boolean autoCommitWasEnabled = connection.connection().getAutoCommit();
        if (autoCommitWasEnabled) {
            connection.connection().setAutoCommit(false);
        }

        final int endIdExclusive = batchSize + firstId;
        for (int id = firstId; id < endIdExclusive; id++) {
            final String insert = String.format("INSERT INTO debezium.customer VALUES (%s, 'Brian%s', 2345.67, null)", id, id);
            connection.executeWithoutCommitting(insert);
        }
        connection.connection().commit();

        if (autoCommitWasEnabled) {
            connection.connection().setAutoCommit(true);
        }

        assertTxBatch(configuration, batchSize, firstId);
    }

    /**
     * Consumes {@code i} records from the CUSTOMER topic and asserts that they are streamed
     * inserts with consecutive ids starting at {@code i2}, carry no snapshot markers in their
     * offsets, and have a populated source block. When the configured adapter is XStream, the
     * offset must additionally hold {@code lcr_position} and no {@code scn}, and the source
     * block must contain a non-null {@code lcr_position}.
     *
     * @param configuration connector configuration used to resolve the adapter
     * @param i number of records expected
     * @param i2 id expected in the first record
     * @throws InterruptedException if interrupted while consuming records
     */
    private void assertTxBatch(Configuration configuration, int i, int i2) throws InterruptedException {
        final List<SourceRecord> records = consumeRecordsByTopic(i).recordsForTopic("server1.DEBEZIUM.CUSTOMER");
        Assertions.assertThat(records).hasSize(i);

        final boolean xstream = TestHelper.getAdapter(configuration) == OracleConnectorConfig.ConnectorAdapter.XSTREAM;

        // The size assertion above guarantees exactly i records, so iterating the list visits indexes 0..i-1.
        int expectedId = i2;
        for (SourceRecord record : records) {
            VerifyRecord.isValidInsert(record, "ID", expectedId);
            final Struct after = (Struct) ((Struct) record.value()).get("after");
            Assertions.assertThat(after.get("ID")).isEqualTo(expectedId);

            Assertions.assertThat(record.sourceOffset().containsKey("snapshot")).isFalse();
            Assertions.assertThat(record.sourceOffset().containsKey(SNAPSHOT_COMPLETED_KEY)).isFalse();
            if (xstream) {
                Assertions.assertThat(record.sourceOffset().containsKey("lcr_position")).isTrue();
                Assertions.assertThat(record.sourceOffset().containsKey("scn")).isFalse();
            }

            final Struct source = (Struct) ((Struct) record.value()).get("source");
            Assertions.assertThat(source.get("snapshot")).isEqualTo("false");
            Assertions.assertThat(source.get("scn")).isNotNull();
            if (xstream) {
                Assertions.assertThat(source.get("lcr_position")).isNotNull();
            }
            Assertions.assertThat(source.get("name")).isEqualTo(TestHelper.SERVER_NAME);
            Assertions.assertThat(source.get("version")).isNotNull();
            Assertions.assertThat(source.get("txId")).isNotNull();
            Assertions.assertThat(source.get("ts_ms")).isNotNull();

            expectedId++;
        }
    }

    /**
     * Verifies that streaming resumes after a restart: after the snapshot and two streamed
     * batches the connector is stopped, a third batch (ids from 300) is committed while it is
     * down, and after the restart that batch plus two further batches must be streamed.
     *
     * @throws Exception if database access or record consumption fails
     */
    @Test
    public void shouldStreamAfterRestart() throws Exception {
        final Configuration config = TestHelper.defaultConfig()
                .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.CUSTOMER")
                .build();

        connection.execute("INSERT INTO debezium.customer VALUES (1, 'Billie-Bob', 1234.56, TO_DATE('2018-02-22', 'yyyy-mm-dd'))");
        connection.execute("INSERT INTO debezium.customer VALUES (2, 'Bruce', 2345.67, null)");
        connection.execute("COMMIT");
        final int snapshotRecordCount = 2;

        start(OracleConnector.class, config);
        assertConnectorIsRunning();
        waitForSnapshotToBeCompleted(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);

        final List<SourceRecord> snapshotRecords = consumeRecordsByTopic(snapshotRecordCount).recordsForTopic("server1.DEBEZIUM.CUSTOMER");
        Assertions.assertThat(snapshotRecords).hasSize(snapshotRecordCount);

        final int batchSize = 30;
        connection.setAutoCommit(false);
        sendTxBatch(config, batchSize, 100);
        sendTxBatch(config, batchSize, 200);

        stopConnector();

        // Committed while the connector is down; must be picked up after the restart.
        final int offlineFirstId = 300;
        for (int id = offlineFirstId; id < batchSize + offlineFirstId; id++) {
            final String insert = String.format("INSERT INTO debezium.customer VALUES (%s, 'Brian%s', 2345.67, null)", id, id);
            connection.executeWithoutCommitting(insert);
        }
        connection.connection().commit();

        start(OracleConnector.class, config);
        assertConnectorIsRunning();
        waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);

        assertTxBatch(config, batchSize, offlineFirstId);
        sendTxBatch(config, batchSize, 400);
        sendTxBatch(config, batchSize, 500);
    }

    /**
     * Verifies that a connector stopped right after its snapshot streams changes committed
     * while it was down: two rows (ids 100 and 101) are committed during the downtime and must
     * be streamed after the restart, followed by a further two-row batch starting at id 200.
     *
     * <p>Auto-commit is enabled around the second connector run and switched off again in a
     * {@code finally} block, so the shared connection ends up in the same mode whether or not
     * the assertions pass.
     *
     * @throws Exception if database access or record consumption fails
     */
    @Test
    public void shouldStreamAfterRestartAfterSnapshot() throws Exception {
        final Configuration config = TestHelper.defaultConfig()
                .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.CUSTOMER")
                .build();

        connection.execute("INSERT INTO debezium.customer VALUES (1, 'Billie-Bob', 1234.56, TO_DATE('2018-02-22', 'yyyy-mm-dd'))");
        connection.execute("INSERT INTO debezium.customer VALUES (2, 'Bruce', 2345.67, null)");
        connection.execute("COMMIT");
        final int expectedRecordCount = 2;

        start(OracleConnector.class, config);
        assertConnectorIsRunning();

        final List<SourceRecord> snapshotRecords = consumeRecordsByTopic(expectedRecordCount).recordsForTopic("server1.DEBEZIUM.CUSTOMER");
        Assertions.assertThat(snapshotRecords).hasSize(expectedRecordCount);

        stopConnector();

        // Committed while the connector is down; must be picked up after the restart.
        final int offlineFirstId = 100;
        connection.setAutoCommit(false);
        for (int id = offlineFirstId; id < expectedRecordCount + offlineFirstId; id++) {
            final String insert = String.format("INSERT INTO debezium.customer VALUES (%s, 'Brian%s', 2345.67, null)", id, id);
            connection.executeWithoutCommitting(insert);
        }
        connection.connection().commit();

        try {
            connection.setAutoCommit(true);

            Testing.print("=== Starting connector second time ===");
            start(OracleConnector.class, config);
            assertConnectorIsRunning();

            assertTxBatch(config, expectedRecordCount, offlineFirstId);
            sendTxBatch(config, expectedRecordCount, 200);
        }
        finally {
            // Single restore path for both success and failure.
            connection.setAutoCommit(false);
        }
    }

    /**
     * Streams a sequence of changes against the existing CUSTOMER table with snapshot mode
     * {@code NO_DATA}: an insert, an update, a primary-key update and a delete.
     *
     * <p>Seven records are expected, in order: insert, update, delete + tombstone + insert
     * (the primary-key change from 1 to 2), and delete + tombstone (the final delete).
     *
     * @throws Exception if database access or record consumption fails
     */
    @Test
    public void shouldReadChangeStreamForExistingTable() throws Exception {
        final Configuration config = TestHelper.defaultConfig()
                .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.CUSTOMER")
                .with(OracleConnectorConfig.SNAPSHOT_MODE, OracleConnectorConfig.SnapshotMode.NO_DATA)
                .build();
        start(OracleConnector.class, config);
        assertConnectorIsRunning();
        waitForSnapshotToBeCompleted(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);

        connection.execute("INSERT INTO debezium.customer VALUES (1, 'Billie-Bob', 1234.56, TO_DATE('2018-02-22', 'yyyy-mm-dd'))");
        connection.execute("COMMIT");
        connection.execute("UPDATE debezium.customer SET name = 'Bruce', score = 2345.67, registered = TO_DATE('2018-03-23', 'yyyy-mm-dd') WHERE id = 1");
        connection.execute("COMMIT");
        connection.execute("UPDATE debezium.customer SET id = 2 WHERE id = 1");
        connection.execute("COMMIT");
        connection.execute("DELETE debezium.customer WHERE id = 2");
        connection.execute("COMMIT");

        // insert (1) + update (1) + primary-key update (3) + delete (2)
        final int expectedRecordCount = 7;

        final List<SourceRecord> records = consumeRecordsByTopic(expectedRecordCount).recordsForTopic("server1.DEBEZIUM.CUSTOMER");
        Assertions.assertThat(records).hasSize(expectedRecordCount);

        // Expected column values before and after the UPDATE statement.
        final BigDecimal initialScore = BigDecimal.valueOf(1234.56d);
        final BigDecimal updatedScore = BigDecimal.valueOf(2345.67d);
        final long initialRegistered = toMicroSecondsSinceEpoch(LocalDateTime.of(2018, 2, 22, 0, 0, 0));
        final long updatedRegistered = toMicroSecondsSinceEpoch(LocalDateTime.of(2018, 3, 23, 0, 0, 0));

        // [0] insert of id 1
        final SourceRecord insertRecord = records.get(0);
        VerifyRecord.isValidInsert(insertRecord, "ID", 1);
        final Struct inserted = (Struct) ((Struct) insertRecord.value()).get("after");
        Assertions.assertThat(inserted.get("ID")).isEqualTo(1);
        Assertions.assertThat(inserted.get("NAME")).isEqualTo("Billie-Bob");
        Assertions.assertThat(inserted.get("SCORE")).isEqualTo(initialScore);
        Assertions.assertThat(inserted.get("REGISTERED")).isEqualTo(initialRegistered);
        final Map<String, ?> insertOffset = insertRecord.sourceOffset();
        Assertions.assertThat(insertOffset.get("snapshot")).isNull();
        Assertions.assertThat(insertOffset.get(SNAPSHOT_COMPLETED_KEY)).isNull();

        // [1] update of id 1
        final SourceRecord updateRecord = records.get(1);
        VerifyRecord.isValidUpdate(updateRecord, "ID", 1);
        final Struct updateBefore = (Struct) ((Struct) updateRecord.value()).get("before");
        Assertions.assertThat(updateBefore.get("ID")).isEqualTo(1);
        Assertions.assertThat(updateBefore.get("NAME")).isEqualTo("Billie-Bob");
        Assertions.assertThat(updateBefore.get("SCORE")).isEqualTo(initialScore);
        Assertions.assertThat(updateBefore.get("REGISTERED")).isEqualTo(initialRegistered);
        final Struct updateAfter = (Struct) ((Struct) updateRecord.value()).get("after");
        Assertions.assertThat(updateAfter.get("ID")).isEqualTo(1);
        Assertions.assertThat(updateAfter.get("NAME")).isEqualTo("Bruce");
        Assertions.assertThat(updateAfter.get("SCORE")).isEqualTo(updatedScore);
        Assertions.assertThat(updateAfter.get("REGISTERED")).isEqualTo(updatedRegistered);

        // [2..4] primary-key change 1 -> 2: delete of the old key, tombstone, insert of the new key
        final SourceRecord keyChangeDelete = records.get(2);
        VerifyRecord.isValidDelete(keyChangeDelete, "ID", 1);
        final Struct oldKeyRow = (Struct) ((Struct) keyChangeDelete.value()).get("before");
        Assertions.assertThat(oldKeyRow.get("ID")).isEqualTo(1);
        Assertions.assertThat(oldKeyRow.get("NAME")).isEqualTo("Bruce");
        Assertions.assertThat(oldKeyRow.get("SCORE")).isEqualTo(updatedScore);
        Assertions.assertThat(oldKeyRow.get("REGISTERED")).isEqualTo(updatedRegistered);

        VerifyRecord.isValidTombstone(records.get(3));

        final SourceRecord keyChangeInsert = records.get(4);
        VerifyRecord.isValidInsert(keyChangeInsert, "ID", 2);
        final Struct newKeyRow = (Struct) ((Struct) keyChangeInsert.value()).get("after");
        Assertions.assertThat(newKeyRow.get("ID")).isEqualTo(2);
        Assertions.assertThat(newKeyRow.get("NAME")).isEqualTo("Bruce");
        Assertions.assertThat(newKeyRow.get("SCORE")).isEqualTo(updatedScore);
        Assertions.assertThat(newKeyRow.get("REGISTERED")).isEqualTo(updatedRegistered);

        // [5..6] delete of id 2 followed by its tombstone
        final SourceRecord deleteRecord = records.get(5);
        VerifyRecord.isValidDelete(deleteRecord, "ID", 2);
        final Struct deleted = (Struct) ((Struct) deleteRecord.value()).get("before");
        Assertions.assertThat(deleted.get("ID")).isEqualTo(2);
        Assertions.assertThat(deleted.get("NAME")).isEqualTo("Bruce");
        Assertions.assertThat(deleted.get("SCORE")).isEqualTo(updatedScore);
        Assertions.assertThat(deleted.get("REGISTERED")).isEqualTo(updatedRegistered);

        VerifyRecord.isValidTombstone(records.get(6));
    }

    /**
     * Verifies that deleting a row yields a delete event that is not followed by a
     * tombstone record when {@code TOMBSTONES_ON_DELETE} is set to {@code false}.
     */
    @Test
    @FixFor("DBZ-835")
    public void deleteWithoutTombstone() throws Exception {
        start(OracleConnector.class, TestHelper.defaultConfig()
                .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.CUSTOMER")
                .with(OracleConnectorConfig.SNAPSHOT_MODE, OracleConnectorConfig.SnapshotMode.NO_DATA)
                .with(OracleConnectorConfig.TOMBSTONES_ON_DELETE, false)
                .build());
        assertConnectorIsRunning();
        waitForSnapshotToBeCompleted(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);

        // insert + delete + insert: exactly one event per statement, i.e. no tombstone after the delete.
        final int expectedRecordCount = 3;

        connection.execute("INSERT INTO debezium.customer VALUES (1, 'Billie-Bob', 1234.56, TO_DATE('2018-02-22', 'yyyy-mm-dd'))");
        connection.execute("COMMIT");
        connection.execute("DELETE debezium.customer WHERE id = 1");
        connection.execute("COMMIT");
        connection.execute("INSERT INTO debezium.customer VALUES (2, 'Billie-Bob', 1234.56, TO_DATE('2018-02-22', 'yyyy-mm-dd'))");
        connection.execute("COMMIT");

        List<SourceRecord> customerRecords = consumeRecordsByTopic(expectedRecordCount).recordsForTopic("server1.DEBEZIUM.CUSTOMER");
        Assertions.assertThat(customerRecords).hasSize(expectedRecordCount);

        // The delete is the second event and carries the full row image in "before".
        SourceRecord deleteRecord = customerRecords.get(1);
        VerifyRecord.isValidDelete(deleteRecord, "ID", 1);
        Struct before = ((Struct) deleteRecord.value()).getStruct("before");
        Assertions.assertThat(before.get("ID")).isEqualTo(1);
        Assertions.assertThat(before.get("NAME")).isEqualTo("Billie-Bob");
        Assertions.assertThat(before.get("SCORE")).isEqualTo(BigDecimal.valueOf(1234.56d));
        Assertions.assertThat(before.get("REGISTERED")).isEqualTo(Long.valueOf(toMicroSecondsSinceEpoch(LocalDateTime.of(2018, 2, 22, 0, 0, 0))));

        // The very next event is the second insert rather than a tombstone.
        VerifyRecord.isValidInsert(customerRecords.get(2), "ID", 2);
    }

    /**
     * Verifies that a table matching the include list, but created only after the connector
     * has started and completed its snapshot, has its inserts captured.
     *
     * <p>The table is dropped in a {@code finally} block so that cleanup happens exactly once
     * whether the test passes or fails.
     */
    @Test
    public void shouldReadChangeStreamForTableCreatedWhileStreaming() throws Exception {
        // Remove any leftover table from a previous run before the connector starts.
        TestHelper.dropTable(connection, "debezium.customer2");
        try {
            start(OracleConnector.class, TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.CUSTOMER2")
                    .build());
            assertConnectorIsRunning();
            waitForSnapshotToBeCompleted(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);

            // The table comes into existence only now, after the snapshot has finished.
            connection.execute("create table debezium.customer2 (  id numeric(9,0) not null,   name varchar2(1000),   score decimal(6, 2),   registered timestamp,   primary key (id))");
            TestHelper.streamTable(connection, "debezium.customer2");
            connection.execute("INSERT INTO debezium.customer2 VALUES (2, 'Billie-Bob', 1234.56, TO_DATE('2018-02-22', 'yyyy-mm-dd'))");
            connection.execute("COMMIT");

            List<SourceRecord> customerRecords = consumeRecordsByTopic(1).recordsForTopic("server1.DEBEZIUM.CUSTOMER2");
            Assertions.assertThat(customerRecords).hasSize(1);

            SourceRecord insertRecord = customerRecords.get(0);
            VerifyRecord.isValidInsert(insertRecord, "ID", 2);
            Struct after = (Struct) ((Struct) insertRecord.value()).get("after");
            Assertions.assertThat(after.get("ID")).isEqualTo(2);
            Assertions.assertThat(after.get("NAME")).isEqualTo("Billie-Bob");
            Assertions.assertThat(after.get("SCORE")).isEqualTo(BigDecimal.valueOf(1234.56d));
            Assertions.assertThat(after.get("REGISTERED")).isEqualTo(Long.valueOf(toMicroSecondsSinceEpoch(LocalDateTime.of(2018, 2, 22, 0, 0, 0))));
        } finally {
            TestHelper.dropTable(connection, "debezium.customer2");
        }
    }

    /**
     * Verifies that heartbeat records are emitted while only one of two changed tables is
     * part of the include list: {@code DBZ800B} is captured, {@code DBZ800A} is not.
     */
    @Test
    @FixFor("DBZ-800")
    public void shouldReceiveHeartbeatAlsoWhenChangingTableIncludeListTables() throws Exception {
        TestHelper.dropTable(connection, "debezium.dbz800a");
        TestHelper.dropTable(connection, "debezium.dbz800b");

        start(OracleConnector.class, TestHelper.defaultConfig()
                .with(Heartbeat.HEARTBEAT_INTERVAL, "1")
                .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ800B")
                .build());
        assertConnectorIsRunning();
        waitForSnapshotToBeCompleted(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);

        connection.execute("CREATE TABLE debezium.dbz800a (id NUMBER(9) NOT NULL, aaa VARCHAR2(100), PRIMARY KEY (id) )");
        connection.execute("CREATE TABLE debezium.dbz800b (id NUMBER(9) NOT NULL, bbb VARCHAR2(100), PRIMARY KEY (id) )");
        connection.execute("INSERT INTO debezium.dbz800a VALUES (1, 'AAA')");
        connection.execute("INSERT INTO debezium.dbz800b VALUES (2, 'BBB')");
        connection.execute("COMMIT");

        // Keep consuming one record at a time, accumulating everything seen, until the data
        // event for DBZ800B shows up (or the 60 second limit is hit).
        final AtomicReference<AbstractConnectorTest.SourceRecords> collected = new AtomicReference<>();
        Awaitility.await().atMost(Duration.ofSeconds(60L)).until(() -> {
            AbstractConnectorTest.SourceRecords batch = consumeRecordsByTopic(1);
            if (collected.get() == null) {
                collected.set(batch);
            } else {
                batch.allRecordsInOrder().forEach(collected.get()::add);
            }
            return collected.get().recordsForTopic("server1.DEBEZIUM.DBZ800B") != null;
        });

        AbstractConnectorTest.SourceRecords allRecords = collected.get();
        List<SourceRecord> heartbeatRecords = allRecords.recordsForTopic("__debezium-heartbeat.server1");
        List<SourceRecord> excludedTableRecords = allRecords.recordsForTopic("server1.DEBEZIUM.DBZ800A");
        List<SourceRecord> includedTableRecords = allRecords.recordsForTopic("server1.DEBEZIUM.DBZ800B");

        Assertions.assertThat(heartbeatRecords).isNotEmpty();
        // Nothing may have been emitted for the table outside the include list.
        Assertions.assertThat(excludedTableRecords).isNull();
        Assertions.assertThat(includedTableRecords).hasSize(1);
        VerifyRecord.isValidInsert(includedTableRecords.get(0), "ID", 2);
    }

    /**
     * Runs the masked/truncated column scenario with the column selectors prefixed by the
     * database name.
     */
    @Test
    @FixFor("DBZ-775")
    public void shouldConsumeEventsWithMaskedAndTruncatedColumnsWithDatabaseName() throws Exception {
        final boolean qualifyWithDatabaseName = true;
        shouldConsumeEventsWithMaskedAndTruncatedColumns(qualifyWithDatabaseName);
    }

    /**
     * Runs the masked/truncated column scenario with column selectors that start at the
     * schema name, i.e. without a database name prefix.
     */
    @Test
    @FixFor("DBZ-775")
    public void shouldConsumeEventsWithMaskedAndTruncatedColumnsWithoutDatabaseName() throws Exception {
        final boolean qualifyWithDatabaseName = false;
        shouldConsumeEventsWithMaskedAndTruncatedColumns(qualifyWithDatabaseName);
    }

    /**
     * Shared scenario for the masked, hashed and truncated column tests.
     *
     * <p>Inserts one row into each of {@code masked_hashed_column_table} and
     * {@code truncated_column_table} and checks the emitted values against the configured
     * {@code column.mask.*} and {@code column.truncate.*} options.
     *
     * @param z when {@code true}, every column selector is prefixed with
     *          {@code TestHelper.getDatabaseName()} and a dot; when {@code false}, selectors
     *          start at the schema name
     */
    public void shouldConsumeEventsWithMaskedAndTruncatedColumns(boolean z) throws Exception {
        // Both variants use the same selectors; they differ only in the optional "<database>." prefix.
        final String prefix = z ? TestHelper.getDatabaseName() + "." : "";
        final Configuration config = TestHelper.defaultConfig()
                .with(OracleConnectorConfig.SNAPSHOT_MODE, OracleConnectorConfig.SnapshotMode.NO_DATA)
                .with("column.mask.with.12.chars", prefix + "DEBEZIUM.MASKED_HASHED_COLUMN_TABLE.NAME")
                .with("column.mask.hash.SHA-256.with.salt.CzQMA0cB5K",
                        prefix + "DEBEZIUM.MASKED_HASHED_COLUMN_TABLE.NAME2," + prefix + "DEBEZIUM.MASKED_HASHED_COLUMN_TABLE.NAME3")
                .with("column.truncate.to.4.chars", prefix + "DEBEZIUM.TRUNCATED_COLUMN_TABLE.NAME")
                .build();

        start(OracleConnector.class, config);
        assertConnectorIsRunning();
        waitForSnapshotToBeCompleted(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);

        connection.execute("INSERT INTO debezium.masked_hashed_column_table (id, name, name2, name3) VALUES (10, 'some_name', 'test', 'test')");
        connection.execute("INSERT INTO debezium.truncated_column_table VALUES(11, 'some_name')");
        connection.execute("COMMIT");

        AbstractConnectorTest.SourceRecords records = consumeRecordsByTopic(2);
        List<SourceRecord> maskedRecords = records.recordsForTopic("server1.DEBEZIUM.MASKED_HASHED_COLUMN_TABLE");
        List<SourceRecord> truncatedRecords = records.recordsForTopic("server1.DEBEZIUM.TRUNCATED_COLUMN_TABLE");

        Assertions.assertThat(maskedRecords).hasSize(1);
        SourceRecord maskedRecord = maskedRecords.get(0);
        VerifyRecord.isValidInsert(maskedRecord, "ID", 10);
        Struct maskedAfter = ((Struct) maskedRecord.value()).getStruct("after");
        if (maskedAfter != null) {
            // NAME is replaced by 12 asterisks; NAME2 and NAME3 are hashed.
            Assertions.assertThat(maskedAfter.getString("NAME")).isEqualTo("************");
            Assertions.assertThat(maskedAfter.getString("NAME2")).isEqualTo("8e68c68edbbac316dfe2f6ada6b0d2d3e2002b487a985d4b7c7c82dd83b0f4d7");
            // NOTE(review): this is the first 20 characters of the NAME2 hash, presumably because
            // NAME3 is a shorter column -- confirm against the table definition.
            Assertions.assertThat(maskedAfter.getString("NAME3")).isEqualTo("8e68c68edbbac316dfe2");
        }

        Assertions.assertThat(truncatedRecords).hasSize(1);
        SourceRecord truncatedRecord = truncatedRecords.get(0);
        VerifyRecord.isValidInsert(truncatedRecord, "ID", 11);
        Struct truncatedAfter = ((Struct) truncatedRecord.value()).getStruct("after");
        if (truncatedAfter != null) {
            // 'some_name' cut down to its first four characters.
            Assertions.assertThat(truncatedAfter.getString("NAME")).isEqualTo("some");
        }

        stopConnector();
    }

    /**
     * Runs the message key rewrite scenario with a key column mapping whose table selector
     * starts with a {@code (.*).} segment in front of the schema name.
     */
    @Test
    @FixFor("DBZ-775")
    public void shouldRewriteIdentityKeyWithDatabaseName() throws Exception {
        final boolean withDatabaseSegment = true;
        shouldRewriteIdentityKey(withDatabaseSegment);
    }

    /**
     * Runs the message key rewrite scenario with a key column mapping whose table selector
     * starts directly at the schema name.
     */
    @Test
    @FixFor("DBZ-775")
    public void shouldRewriteIdentityKeyWithoutDatabaseName() throws Exception {
        final boolean withDatabaseSegment = false;
        shouldRewriteIdentityKey(withDatabaseSegment);
    }

    /**
     * Shared scenario for the message key rewrite tests: configures {@code MSG_KEY_COLUMNS}
     * so that the {@code customer} key is built from {@code id} and {@code name}, inserts a
     * row and checks that both fields are present in the record key.
     *
     * @param z when {@code true}, the table selector in the mapping is
     *          {@code (.*).debezium.customer}; otherwise it is {@code debezium.customer}
     */
    private void shouldRewriteIdentityKey(boolean z) throws Exception {
        // Only the table selector of the mapping differs between the two variants.
        final String keyColumnMapping = z ? "(.*).debezium.customer:id,name" : "debezium.customer:id,name";
        start(OracleConnector.class, TestHelper.defaultConfig()
                .with(OracleConnectorConfig.SNAPSHOT_MODE, OracleConnectorConfig.SnapshotMode.NO_DATA)
                .with(OracleConnectorConfig.MSG_KEY_COLUMNS, keyColumnMapping)
                .build());
        assertConnectorIsRunning();
        waitForSnapshotToBeCompleted(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);

        connection.execute("INSERT INTO debezium.customer VALUES (3, 'Nest', 1234.56, TO_DATE('2018-02-22', 'yyyy-mm-dd'))");
        connection.execute("COMMIT");

        List<SourceRecord> customerRecords = consumeRecordsByTopic(1).recordsForTopic("server1.DEBEZIUM.CUSTOMER");
        SourceRecord insertRecord = customerRecords.get(0);
        Assertions.assertThat(insertRecord.key()).isNotNull();

        Struct key = (Struct) insertRecord.key();
        Assertions.assertThat(key.get("ID")).isNotNull();
        Assertions.assertThat(key.get("NAME")).isNotNull();

        stopConnector();
    }

    /**
     * Verifies that {@code datatype.propagate.source.type} adds the source type name, length
     * and (where applicable) scale as schema parameters for NUMBER, VARCHAR2 and FLOAT columns.
     */
    @Test
    @FixFor({"DBZ-1916", "DBZ-1830"})
    public void shouldPropagateSourceTypeByDatatype() throws Exception {
        start(OracleConnector.class, TestHelper.defaultConfig()
                .with(OracleConnectorConfig.SNAPSHOT_MODE, OracleConnectorConfig.SnapshotMode.NO_DATA)
                .with("datatype.propagate.source.type", ".+\\.NUMBER,.+\\.VARCHAR2,.+\\.FLOAT")
                .build());
        assertConnectorIsRunning();
        waitForSnapshotToBeCompleted(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);

        connection.execute("INSERT INTO debezium.dt_table (id,c1,c2,c3a,c3b,f1,f2) values (1,123,456,789.01,'test',1.228,234.56)");
        connection.execute("COMMIT");

        List<SourceRecord> dtTableRecords = consumeRecordsByTopic(1).recordsForTopic("server1.DEBEZIUM.DT_TABLE");
        Assertions.assertThat(dtTableRecords).hasSize(1);

        // The propagated parameters are inspected on the column schemas nested under "before".
        Field before = dtTableRecords.get(0).valueSchema().field("before");

        Assertions.assertThat(before.schema().field("ID").schema().parameters()).contains(
                Assertions.entry(TestHelper.TYPE_NAME_PARAMETER_KEY, "NUMBER"),
                Assertions.entry(TestHelper.TYPE_LENGTH_PARAMETER_KEY, "9"),
                Assertions.entry(TestHelper.TYPE_SCALE_PARAMETER_KEY, "0"));
        Assertions.assertThat(before.schema().field("C1").schema().parameters()).contains(
                Assertions.entry(TestHelper.TYPE_NAME_PARAMETER_KEY, "NUMBER"),
                Assertions.entry(TestHelper.TYPE_LENGTH_PARAMETER_KEY, "38"),
                Assertions.entry(TestHelper.TYPE_SCALE_PARAMETER_KEY, "0"));
        Assertions.assertThat(before.schema().field("C2").schema().parameters()).contains(
                Assertions.entry(TestHelper.TYPE_NAME_PARAMETER_KEY, "NUMBER"),
                Assertions.entry(TestHelper.TYPE_LENGTH_PARAMETER_KEY, "38"),
                Assertions.entry(TestHelper.TYPE_SCALE_PARAMETER_KEY, "0"));
        Assertions.assertThat(before.schema().field("C3A").schema().parameters()).contains(
                Assertions.entry(TestHelper.TYPE_NAME_PARAMETER_KEY, "NUMBER"),
                Assertions.entry(TestHelper.TYPE_LENGTH_PARAMETER_KEY, "5"),
                Assertions.entry(TestHelper.TYPE_SCALE_PARAMETER_KEY, "2"));
        // VARCHAR2 carries a length but no scale.
        Assertions.assertThat(before.schema().field("C3B").schema().parameters()).contains(
                Assertions.entry(TestHelper.TYPE_NAME_PARAMETER_KEY, "VARCHAR2"),
                Assertions.entry(TestHelper.TYPE_LENGTH_PARAMETER_KEY, "128"));
        Assertions.assertThat(before.schema().field("F2").schema().parameters()).contains(
                Assertions.entry(TestHelper.TYPE_NAME_PARAMETER_KEY, "NUMBER"),
                Assertions.entry(TestHelper.TYPE_LENGTH_PARAMETER_KEY, "8"),
                Assertions.entry(TestHelper.TYPE_SCALE_PARAMETER_KEY, "4"));
        // FLOAT carries a length but no scale.
        Assertions.assertThat(before.schema().field("F1").schema().parameters()).contains(
                Assertions.entry(TestHelper.TYPE_NAME_PARAMETER_KEY, "FLOAT"),
                Assertions.entry(TestHelper.TYPE_LENGTH_PARAMETER_KEY, "10"));
    }

    /**
     * Verifies that a {@code TRUNCATE TABLE} statement is emitted as an event with
     * {@code op = "t"} when {@code SKIPPED_OPERATIONS} is set to {@code "none"}, and that
     * a subsequent insert is still streamed as a create event.
     *
     * <p>The table is dropped in a {@code finally} block so that cleanup happens exactly once
     * whether the test passes or fails.
     */
    @Test
    @FixFor("DBZ-4385")
    public void shouldTruncate() throws Exception {
        // Remove any leftover table from a previous run.
        TestHelper.dropTable(connection, "debezium.truncate_ddl");
        try {
            connection.execute("create table debezium.truncate_ddl (id NUMERIC(6), name VARCHAR(100), primary key(id))");
            TestHelper.streamTable(connection, "debezium.truncate_ddl");
            connection.execute("INSERT INTO debezium.truncate_ddl (id, name) values (1, 'Acme')");
            connection.commit();

            start(OracleConnector.class, TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM.TRUNCATE_DDL")
                    .with(OracleConnectorConfig.SKIPPED_OPERATIONS, "none")
                    .build());
            assertConnectorIsRunning();
            waitForSnapshotToBeCompleted(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);

            // The row inserted before start-up arrives through the snapshot.
            Assertions.assertThat(consumeRecordsByTopic(1).recordsForTopic("server1.DEBEZIUM.TRUNCATE_DDL")).hasSize(1);
            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);

            connection.execute("TRUNCATE TABLE debezium.truncate_ddl");
            List<SourceRecord> truncateRecords = consumeRecordsByTopic(1).recordsForTopic("server1.DEBEZIUM.TRUNCATE_DDL");
            Assertions.assertThat(truncateRecords).hasSize(1);
            Assertions.assertThat(((Struct) truncateRecords.get(0).value()).getString("op")).isEqualTo("t");

            connection.execute("INSERT INTO debezium.truncate_ddl (id, name) values (2, 'Roadrunner')");
            connection.commit();
            List<SourceRecord> insertRecords = consumeRecordsByTopic(1).recordsForTopic("server1.DEBEZIUM.TRUNCATE_DDL");
            Assertions.assertThat(insertRecords).hasSize(1);
            Assertions.assertThat(((Struct) insertRecords.get(0).value()).getString("op")).isEqualTo("c");
        } finally {
            TestHelper.dropTable(connection, "debezium.truncate_ddl");
        }
    }

    /**
     * Verifies that, when {@code SKIPPED_OPERATIONS} is not overridden, a
     * {@code TRUNCATE TABLE} statement produces no event: the next record consumed after
     * the truncate is the create event of the following insert.
     *
     * <p>The table is dropped in a {@code finally} block so that cleanup happens exactly once
     * whether the test passes or fails.
     */
    @Test
    @FixFor("DBZ-4385")
    public void shouldNotTruncateWhenSkipped() throws Exception {
        // Remove any leftover table from a previous run.
        TestHelper.dropTable(connection, "debezium.truncate_ddl");
        try {
            connection.execute("create table debezium.truncate_ddl (id NUMERIC(6), name VARCHAR(100), primary key(id))");
            TestHelper.streamTable(connection, "debezium.truncate_ddl");
            connection.execute("INSERT INTO debezium.truncate_ddl (id, name) values (1, 'Acme')");
            connection.commit();

            start(OracleConnector.class, TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM.TRUNCATE_DDL")
                    .build());
            assertConnectorIsRunning();
            waitForSnapshotToBeCompleted(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);

            // The row inserted before start-up arrives through the snapshot.
            Assertions.assertThat(consumeRecordsByTopic(1).recordsForTopic("server1.DEBEZIUM.TRUNCATE_DDL")).hasSize(1);
            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);

            connection.execute("TRUNCATE TABLE debezium.truncate_ddl");
            connection.execute("INSERT INTO debezium.truncate_ddl (id, name) values (2, 'Roadrunner')");
            connection.commit();

            // Only the insert is expected; the truncate must not have produced a record.
            List<SourceRecord> streamedRecords = consumeRecordsByTopic(1).recordsForTopic("server1.DEBEZIUM.TRUNCATE_DDL");
            Assertions.assertThat(streamedRecords).hasSize(1);
            Assertions.assertThat(((Struct) streamedRecords.get(0).value()).getString("op")).isEqualTo("c");
        } finally {
            TestHelper.dropTable(connection, "debezium.truncate_ddl");
        }
    }

    /**
     * Verifies that {@code INTERVAL YEAR TO MONTH} and {@code INTERVAL DAY TO SECOND} columns
     * are emitted as {@code int64} values when no interval handling mode is configured, both
     * for rows read by the snapshot and for rows streamed afterwards.
     *
     * <p>The table is dropped in a {@code finally} block so that cleanup happens exactly once
     * whether the test passes or fails.
     */
    @Test
    @FixFor("DBZ-1539")
    public void shouldHandleIntervalTypesAsInt64() throws Exception {
        // Remove any leftover table from a previous run.
        TestHelper.dropTable(connection, "debezium.interval");
        try {
            connection.execute("create table debezium.interval ( id numeric(6) constraint interval_id_nn not null,  intYM interval year to month, intYM2 interval year(9) to month, intDS interval day to second,  intDS2 interval day(9) to second(9),  constraint interval_pk primary key(id))");
            TestHelper.streamTable(connection, "debezium.interval");

            // Rows 1 and 2 exist before start-up and are therefore read by the snapshot.
            connection.execute("INSERT INTO debezium.interval (id, intYM, intYM2, intDS, intDS2) values (1, INTERVAL '2' YEAR, INTERVAL '555-4' YEAR(3) TO MONTH, INTERVAL '3' DAY, INTERVAL '111 10:09:08.555444333' DAY(3) TO SECOND(9))");
            connection.execute("INSERT INTO debezium.interval (id, intYM, intYM2, intDS, intDS2) values (2, INTERVAL '0' YEAR, INTERVAL '0' MONTH, INTERVAL '0' DAY, INTERVAL '0' SECOND)");
            connection.commit();

            start(OracleConnector.class, TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM.INTERVAL")
                    .with(OracleConnectorConfig.LOG_MINING_STRATEGY, "online_catalog")
                    .build());
            assertConnectorIsRunning();
            waitForSnapshotToBeCompleted(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);

            AbstractConnectorTest.SourceRecords snapshotRecords = consumeRecordsByTopic(2);
            Assertions.assertThat(snapshotRecords.allRecordsInOrder()).hasSize(2);
            Assertions.assertThat(snapshotRecords.topics()).contains("server1.DEBEZIUM.INTERVAL");
            List<SourceRecord> snapshotTableRecords = snapshotRecords.recordsForTopic("server1.DEBEZIUM.INTERVAL");
            Assertions.assertThat(snapshotTableRecords).hasSize(2);

            Struct after = ((Struct) snapshotTableRecords.get(0).value()).getStruct("after");
            Assertions.assertThat(after.get("ID")).isEqualTo(1);
            Assertions.assertThat(after.getInt64("INTYM")).isEqualTo(63115200000000L);
            Assertions.assertThat(after.getInt64("INTYM2")).isEqualTo(17524987200000000L);
            Assertions.assertThat(after.getInt64("INTDS")).isEqualTo(259200000000L);
            Assertions.assertThat(after.getInt64("INTDS2")).isEqualTo(9627503444333L);

            after = ((Struct) snapshotTableRecords.get(1).value()).getStruct("after");
            // Pin the row identity here as well, like every other row checked in this test.
            Assertions.assertThat(after.get("ID")).isEqualTo(2);
            Assertions.assertThat(after.getInt64("INTYM")).isEqualTo(0L);
            Assertions.assertThat(after.getInt64("INTYM2")).isEqualTo(0L);
            Assertions.assertThat(after.getInt64("INTDS")).isEqualTo(0L);
            Assertions.assertThat(after.getInt64("INTDS2")).isEqualTo(0L);

            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);

            // Rows 3 and 4 repeat the same values and must arrive through streaming.
            connection.execute("INSERT INTO debezium.interval (id, intYM, intYM2, intDS, intDS2) values (3, INTERVAL '2' YEAR, INTERVAL '555-4' YEAR(3) TO MONTH, INTERVAL '3' DAY, INTERVAL '111 10:09:08.555444333' DAY(3) TO SECOND(9))");
            connection.execute("INSERT INTO debezium.interval (id, intYM, intYM2, intDS, intDS2) values (4, INTERVAL '0' YEAR, INTERVAL '0' MONTH, INTERVAL '0' DAY, INTERVAL '0' SECOND)");
            connection.commit();

            AbstractConnectorTest.SourceRecords streamedRecords = consumeRecordsByTopic(2);
            Assertions.assertThat(streamedRecords.allRecordsInOrder()).hasSize(2);
            Assertions.assertThat(streamedRecords.topics()).contains("server1.DEBEZIUM.INTERVAL");
            List<SourceRecord> streamedTableRecords = streamedRecords.recordsForTopic("server1.DEBEZIUM.INTERVAL");
            Assertions.assertThat(streamedTableRecords).hasSize(2);

            after = ((Struct) streamedTableRecords.get(0).value()).getStruct("after");
            Assertions.assertThat(after.get("ID")).isEqualTo(3);
            Assertions.assertThat(after.getInt64("INTYM")).isEqualTo(63115200000000L);
            Assertions.assertThat(after.getInt64("INTYM2")).isEqualTo(17524987200000000L);
            Assertions.assertThat(after.getInt64("INTDS")).isEqualTo(259200000000L);
            Assertions.assertThat(after.getInt64("INTDS2")).isEqualTo(9627503444333L);

            after = ((Struct) streamedTableRecords.get(1).value()).getStruct("after");
            Assertions.assertThat(after.get("ID")).isEqualTo(4);
            Assertions.assertThat(after.getInt64("INTYM")).isEqualTo(0L);
            Assertions.assertThat(after.getInt64("INTYM2")).isEqualTo(0L);
            Assertions.assertThat(after.getInt64("INTDS")).isEqualTo(0L);
            Assertions.assertThat(after.getInt64("INTDS2")).isEqualTo(0L);

            assertNoRecordsToConsume();
        } finally {
            TestHelper.dropTable(connection, "debezium.interval");
        }
    }

    /**
     * Verifies that {@code INTERVAL YEAR TO MONTH} and {@code INTERVAL DAY TO SECOND} columns
     * are emitted as strings when {@code INTERVAL_HANDLING_MODE} is set to
     * {@code IntervalHandlingMode.STRING}, both for rows read by the snapshot and for rows
     * streamed afterwards. Positive, zero and negative intervals are covered.
     *
     * <p>The table is dropped in a {@code finally} block so that cleanup happens exactly once
     * whether the test passes or fails.
     */
    @Test
    @FixFor("DBZ-1539")
    public void shouldHandleIntervalTypesAsString() throws Exception {
        // Remove any leftover table from a previous run.
        TestHelper.dropTable(connection, "debezium.interval");
        try {
            connection.execute("create table debezium.interval ( id numeric(6) constraint interval_id_nn not null,  intYM interval year to month, intYM2 interval year(9) to month, intDS interval day to second,  intDS2 interval day(9) to second(9),  constraint interval_pk primary key(id))");
            TestHelper.streamTable(connection, "debezium.interval");

            // Rows 1-3 exist before start-up and are therefore read by the snapshot.
            connection.execute("INSERT INTO debezium.interval (id, intYM, intYM2, intDS, intDS2) values (1, INTERVAL '2' YEAR, INTERVAL '555-4' YEAR(3) TO MONTH, INTERVAL '3' DAY, INTERVAL '111 10:09:08.555444333' DAY(3) TO SECOND(9))");
            connection.execute("INSERT INTO debezium.interval (id, intYM, intYM2, intDS, intDS2) values (2, INTERVAL '0' YEAR, INTERVAL '0' MONTH, INTERVAL '0' DAY, INTERVAL '0' SECOND)");
            connection.execute("INSERT INTO debezium.interval (id, intYM, intYM2, intDS, intDS2) values (3, INTERVAL '-1' YEAR, INTERVAL '-1' MONTH, INTERVAL '-1' DAY, INTERVAL '-7 5:12:10.0123' DAY(1) TO SECOND)");
            connection.commit();

            start(OracleConnector.class, TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM.INTERVAL")
                    .with(OracleConnectorConfig.LOG_MINING_STRATEGY, "online_catalog")
                    .with(OracleConnectorConfig.INTERVAL_HANDLING_MODE, OracleConnectorConfig.IntervalHandlingMode.STRING.getValue())
                    .build());
            assertConnectorIsRunning();
            waitForSnapshotToBeCompleted(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);

            AbstractConnectorTest.SourceRecords snapshotRecords = consumeRecordsByTopic(3);
            Assertions.assertThat(snapshotRecords.allRecordsInOrder()).hasSize(3);
            Assertions.assertThat(snapshotRecords.topics()).contains("server1.DEBEZIUM.INTERVAL");
            List<SourceRecord> snapshotTableRecords = snapshotRecords.recordsForTopic("server1.DEBEZIUM.INTERVAL");
            Assertions.assertThat(snapshotTableRecords).hasSize(3);

            // Positive intervals.
            Struct after = ((Struct) snapshotTableRecords.get(0).value()).getStruct("after");
            Assertions.assertThat(after.get("ID")).isEqualTo(1);
            Assertions.assertThat(after.getString("INTYM")).isEqualTo("P2Y0M0DT0H0M0S");
            Assertions.assertThat(after.getString("INTYM2")).isEqualTo("P555Y4M0DT0H0M0S");
            Assertions.assertThat(after.getString("INTDS")).isEqualTo("P0Y0M3DT0H0M0S");
            Assertions.assertThat(after.getString("INTDS2")).isEqualTo("P0Y0M111DT10H9M563.444333S");

            // Zero intervals.
            after = ((Struct) snapshotTableRecords.get(1).value()).getStruct("after");
            Assertions.assertThat(after.get("ID")).isEqualTo(2);
            Assertions.assertThat(after.getString("INTYM")).isEqualTo("P0Y0M0DT0H0M0S");
            Assertions.assertThat(after.getString("INTYM2")).isEqualTo("P0Y0M0DT0H0M0S");
            Assertions.assertThat(after.getString("INTDS")).isEqualTo("P0Y0M0DT0H0M0S");
            Assertions.assertThat(after.getString("INTDS2")).isEqualTo("P0Y0M0DT0H0M0S");

            // Negative intervals.
            after = ((Struct) snapshotTableRecords.get(2).value()).getStruct("after");
            Assertions.assertThat(after.get("ID")).isEqualTo(3);
            Assertions.assertThat(after.getString("INTYM")).isEqualTo("P-1Y0M0DT0H0M0S");
            Assertions.assertThat(after.getString("INTYM2")).isEqualTo("P0Y-1M0DT0H0M0S");
            Assertions.assertThat(after.getString("INTDS")).isEqualTo("P0Y0M-1DT0H0M0S");
            Assertions.assertThat(after.getString("INTDS2")).isEqualTo("P0Y0M-7DT-5H-12M-10.0123S");

            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);

            // Rows 4-6 repeat the same values and must arrive through streaming.
            connection.execute("INSERT INTO debezium.interval (id, intYM, intYM2, intDS, intDS2) values (4, INTERVAL '2' YEAR, INTERVAL '555-4' YEAR(3) TO MONTH, INTERVAL '3' DAY, INTERVAL '111 10:09:08.555444333' DAY(3) TO SECOND(9))");
            connection.execute("INSERT INTO debezium.interval (id, intYM, intYM2, intDS, intDS2) values (5, INTERVAL '0' YEAR, INTERVAL '0' MONTH, INTERVAL '0' DAY, INTERVAL '0' SECOND)");
            connection.execute("INSERT INTO debezium.interval (id, intYM, intYM2, intDS, intDS2) values (6, INTERVAL '-1' YEAR, INTERVAL '-1' MONTH, INTERVAL '-1' DAY, INTERVAL '-7 5:12:10.0123' DAY(1) TO SECOND)");
            connection.commit();

            AbstractConnectorTest.SourceRecords streamedRecords = consumeRecordsByTopic(3);
            Assertions.assertThat(streamedRecords.allRecordsInOrder()).hasSize(3);
            Assertions.assertThat(streamedRecords.topics()).contains("server1.DEBEZIUM.INTERVAL");
            List<SourceRecord> streamedTableRecords = streamedRecords.recordsForTopic("server1.DEBEZIUM.INTERVAL");
            Assertions.assertThat(streamedTableRecords).hasSize(3);

            // Positive intervals.
            after = ((Struct) streamedTableRecords.get(0).value()).getStruct("after");
            Assertions.assertThat(after.get("ID")).isEqualTo(4);
            Assertions.assertThat(after.getString("INTYM")).isEqualTo("P2Y0M0DT0H0M0S");
            Assertions.assertThat(after.getString("INTYM2")).isEqualTo("P555Y4M0DT0H0M0S");
            Assertions.assertThat(after.getString("INTDS")).isEqualTo("P0Y0M3DT0H0M0S");
            Assertions.assertThat(after.getString("INTDS2")).isEqualTo("P0Y0M111DT10H9M563.444333S");

            // Zero intervals.
            after = ((Struct) streamedTableRecords.get(1).value()).getStruct("after");
            Assertions.assertThat(after.get("ID")).isEqualTo(5);
            Assertions.assertThat(after.getString("INTYM")).isEqualTo("P0Y0M0DT0H0M0S");
            Assertions.assertThat(after.getString("INTYM2")).isEqualTo("P0Y0M0DT0H0M0S");
            Assertions.assertThat(after.getString("INTDS")).isEqualTo("P0Y0M0DT0H0M0S");
            Assertions.assertThat(after.getString("INTDS2")).isEqualTo("P0Y0M0DT0H0M0S");

            // Negative intervals.
            after = ((Struct) streamedTableRecords.get(2).value()).getStruct("after");
            Assertions.assertThat(after.get("ID")).isEqualTo(6);
            Assertions.assertThat(after.getString("INTYM")).isEqualTo("P-1Y0M0DT0H0M0S");
            Assertions.assertThat(after.getString("INTYM2")).isEqualTo("P0Y-1M0DT0H0M0S");
            Assertions.assertThat(after.getString("INTDS")).isEqualTo("P0Y0M-1DT0H0M0S");
            Assertions.assertThat(after.getString("INTDS2")).isEqualTo("P0Y0M-7DT-5H-12M-10.0123S");

            assertNoRecordsToConsume();
        } finally {
            TestHelper.dropTable(connection, "debezium.interval");
        }
    }

    /**
     * Verifies that a table with a column declared as {@code numeric default 1} can be both
     * snapshotted and streamed: one record is expected from the snapshot and one from a
     * subsequent insert.
     *
     * <p>The table is dropped in a {@code finally} block so that cleanup happens exactly once
     * whether the test passes or fails.
     */
    @Test
    @FixFor("DBZ-2624")
    public void shouldSnapshotAndStreamChangesFromTableWithNumericDefaultValues() throws Exception {
        if (VerifyRecord.isApucurioAvailable()) {
            skipAvroValidation();
        }

        // Remove any leftover table from a previous run.
        TestHelper.dropTable(connection, "debezium.complex_ddl");
        try {
            connection.execute("create table debezium.complex_ddl ( id numeric(6) constraint customers_id_nn not null,  name varchar2(100), value numeric default 1,  constraint customers_pk primary key(id))");
            connection.execute("GRANT SELECT ON debezium.complex_ddl to " + TestHelper.getConnectorUserName());
            connection.execute("ALTER TABLE debezium.complex_ddl ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS");

            // "value" is deliberately omitted so the column default applies.
            connection.execute("INSERT INTO debezium.complex_ddl (id, name) values (1, 'Acme')");
            connection.commit();

            start(OracleConnector.class, TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM.COMPLEX_DDL")
                    .with(OracleConnectorConfig.LOG_MINING_STRATEGY, "online_catalog")
                    .build());
            assertConnectorIsRunning();
            waitForSnapshotToBeCompleted(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);

            // Snapshot phase: the pre-existing row.
            Assertions.assertThat(consumeRecordsByTopic(1).recordsForTopic("server1.DEBEZIUM.COMPLEX_DDL")).hasSize(1);
            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);

            // Streaming phase: a second row, again relying on the column default.
            connection.execute("INSERT INTO debezium.complex_ddl (id, name)values (2, 'Acme2')");
            connection.commit();
            Assertions.assertThat(consumeRecordsByTopic(1).recordsForTopic("server1.DEBEZIUM.COMPLEX_DDL")).hasSize(1);
        } finally {
            TestHelper.dropTable(connection, "debezium.complex_ddl");
        }
    }

    /**
     * Verifies that rows of a range-partitioned table are captured during snapshot and streaming.
     * One row exists before the connector starts; two more, with birth dates in 2019 and 2020
     * matching the two declared partitions, are inserted after streaming is running.
     *
     * @throws Exception if any database or connector operation fails
     */
    @Test
    @SkipOnDatabaseOption(value = "Partitioning", enabled = false)
    @FixFor("DBZ-2683")
    public void shouldSnapshotAndStreamChangesFromPartitionedTable() throws Exception {
        final String table = "players";
        final String topic = "server1.DEBEZIUM.PLAYERS";

        TestHelper.dropTable(connection, table);
        try {
            connection.execute("CREATE TABLE players (id NUMERIC(6), name VARCHAR(100), birth_date DATE,primary key(id)) PARTITION BY RANGE (birth_date) (PARTITION p2019 VALUES LESS THAN (TO_DATE('2020-01-01', 'yyyy-mm-dd')), PARTITION p2020 VALUES LESS THAN (TO_DATE('2021-01-01', 'yyyy-mm-dd')))");
            connection.execute("GRANT SELECT ON debezium.players to " + TestHelper.getConnectorUserName());
            connection.execute("ALTER TABLE debezium.players ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS");
            connection.execute("INSERT INTO debezium.players (id, name, birth_date) VALUES (1, 'Roger Rabbit', TO_DATE('2019-05-01', 'yyyy-mm-dd'))");
            connection.commit();

            final Configuration config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM.PLAYERS")
                    .build();
            start(OracleConnector.class, config);
            assertConnectorIsRunning();

            // Snapshot phase.
            waitForSnapshotToBeCompleted(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
            Assertions.assertThat(consumeRecordsByTopic(1).recordsForTopic(topic).size()).isEqualTo(1);

            // Streaming phase: one row per partition, committed together.
            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
            connection.execute("INSERT INTO debezium.players (id, name, birth_date) VALUES (2, 'Bugs Bunny', TO_DATE('2019-06-26', 'yyyy-mm-dd'))");
            connection.execute("INSERT INTO debezium.players (id, name, birth_date) VALUES (3, 'Elmer Fud', TO_DATE('2020-11-01', 'yyyy-mm-dd'))");
            connection.commit();
            Assertions.assertThat(consumeRecordsByTopic(2).recordsForTopic(topic).size()).isEqualTo(2);
        } finally {
            // Single cleanup path for both success and failure.
            TestHelper.dropTable(connection, table);
        }
    }

    /**
     * Verifies that a column named {@code amount$} is emitted under the adjusted field name
     * {@code AMOUNT_} when the Avro field-name adjustment mode is configured, for both a
     * snapshot (read) record and a streamed insert record.
     *
     * @throws Exception if any database or connector operation fails
     */
    @Test
    @FixFor("DBZ-2849")
    public void shouldAvroSerializeColumnsWithSpecialCharacters() throws Exception {
        final String table = "columns_test";
        final String topic = "server1.DEBEZIUM.COLUMNS_TEST";

        TestHelper.dropTable(connection, table);
        try {
            connection.execute("CREATE TABLE columns_test (id NUMERIC(6), amount$ number not null, primary key(id))");
            connection.execute("GRANT SELECT ON debezium.columns_test to " + TestHelper.getConnectorUserName());
            connection.execute("ALTER TABLE debezium.columns_test ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS");
            connection.execute("INSERT INTO debezium.columns_test (id, amount$) values (1, 12345.67)");
            connection.commit();

            final Configuration config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM.COLUMNS_TEST")
                    .with(OracleConnectorConfig.FIELD_NAME_ADJUSTMENT_MODE, CommonConnectorConfig.SchemaNameAdjustmentMode.AVRO)
                    .build();
            start(OracleConnector.class, config);
            assertConnectorIsRunning();

            // Snapshot phase.
            waitForSnapshotToBeCompleted(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
            final List<SourceRecord> snapshotRecords = consumeRecordsByTopic(1).recordsForTopic(topic);
            Assertions.assertThat(snapshotRecords.size()).isEqualTo(1);
            final SourceRecord snapshotRecord = snapshotRecords.get(0);
            VerifyRecord.isValidRead(snapshotRecord, "ID", 1);
            final Struct snapshotAfter = ((Struct) snapshotRecord.value()).getStruct("after");
            Assertions.assertThat(snapshotAfter.getInt32("ID")).isEqualTo(1);
            Assertions.assertThat(snapshotAfter.get("AMOUNT_"))
                    .isEqualTo(VariableScaleDecimal.fromLogical(snapshotAfter.schema().field("AMOUNT_").schema(), BigDecimal.valueOf(12345.67d)));

            // Streaming phase.
            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
            connection.execute("INSERT INTO debezium.columns_test (id, amount$) values (2, 23456.78)");
            connection.commit();
            final List<SourceRecord> streamedRecords = consumeRecordsByTopic(1).recordsForTopic(topic);
            Assertions.assertThat(streamedRecords.size()).isEqualTo(1);
            final SourceRecord streamedRecord = streamedRecords.get(0);
            VerifyRecord.isValidInsert(streamedRecord, "ID", 2);
            final Struct streamedAfter = ((Struct) streamedRecord.value()).getStruct("after");
            Assertions.assertThat(streamedAfter.getInt32("ID")).isEqualTo(2);
            Assertions.assertThat(streamedAfter.get("AMOUNT_"))
                    .isEqualTo(VariableScaleDecimal.fromLogical(streamedAfter.schema().field("AMOUNT_").schema(), BigDecimal.valueOf(23456.78d)));
        } finally {
            // Single cleanup path for both success and failure.
            TestHelper.dropTable(connection, table);
        }
    }

    /**
     * Verifies that changes are still captured after the connector is stopped,
     * {@link TestHelper#forceFlushOfRedoLogsToArchiveLogs()} is invoked, and the connector is
     * restarted with the same configuration. One record is expected before the restart and one
     * after it.
     *
     * @throws Exception if any database or connector operation fails
     */
    @Test
    @FixFor("DBZ-2825")
    @SkipWhenAdapterNameIsNot(value = SkipWhenAdapterNameIsNot.AdapterName.LOGMINER, reason = "Tests archive log support for LogMiner only")
    public void testArchiveLogScnBoundariesAreIncluded() throws Exception {
        final String table = "alog_test";
        final String topic = "server1.DEBEZIUM.ALOG_TEST";

        TestHelper.dropTable(connection, table);
        try {
            connection.execute("CREATE TABLE alog_test (id numeric, name varchar2(50), primary key(id))");
            connection.execute("GRANT SELECT ON debezium.alog_test TO " + TestHelper.getConnectorUserName());
            connection.execute("ALTER TABLE debezium.alog_test ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS");
            connection.commit();
            connection.execute("INSERT INTO debezium.alog_test (id, name) VALUES (1, 'Test')");
            connection.commit();

            final Configuration config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM.ALOG_TEST")
                    .build();

            // First run: consume the row that existed before start-up.
            start(OracleConnector.class, config);
            assertConnectorIsRunning();
            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
            final List<SourceRecord> firstRun = consumeRecordsByTopic(1).recordsForTopic(topic);
            Assertions.assertThat(firstRun.size()).isEqualTo(1);
            final Struct firstAfter = (Struct) ((Struct) firstRun.get(0).value()).get("after");
            Assertions.assertThat(firstAfter.get("ID")).isEqualTo(BigDecimal.valueOf(1L));
            Assertions.assertThat(firstAfter.get("NAME")).isEqualTo("Test");

            // Restart around a forced log flush, then stream one more row.
            stopConnector();
            TestHelper.forceFlushOfRedoLogsToArchiveLogs();
            start(OracleConnector.class, config);
            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
            connection.execute("INSERT INTO debezium.alog_test (id, name) values (2, 'Home')");
            connection.execute("COMMIT");
            final List<SourceRecord> secondRun = consumeRecordsByTopic(1).recordsForTopic(topic);
            Assertions.assertThat(secondRun.size()).isEqualTo(1);
            final Struct secondAfter = (Struct) ((Struct) secondRun.get(0).value()).get("after");
            Assertions.assertThat(secondAfter.get("ID")).isEqualTo(BigDecimal.valueOf(2L));
            Assertions.assertThat(secondAfter.get("NAME")).isEqualTo("Home");
        } finally {
            // Single cleanup path for both success and failure.
            TestHelper.dropTable(connection, table);
        }
    }

    /**
     * Verifies that a DATE value is emitted as the same epoch-millisecond value
     * ({@code 1519257600000L}) whether it was inserted as the string literal
     * {@code '22-FEB-2018'} (captured by the snapshot) or through {@code TO_DATE}
     * (captured while streaming).
     *
     * @throws Exception if any database or connector operation fails
     */
    @Test
    @FixFor("DBZ-2784")
    public void shouldConvertDatesSpecifiedAsStringInSQL() throws Exception {
        final String table = "orders";
        final String topic = "server1.DEBEZIUM.ORDERS";
        final long expectedOrderDate = 1519257600000L;

        try {
            TestHelper.dropTable(connection, table);
            connection.execute("CREATE TABLE orders (id NUMERIC(6), order_date date not null,primary key(id))");
            connection.execute("GRANT SELECT ON debezium.orders TO " + TestHelper.getConnectorUserName());
            connection.execute("ALTER TABLE debezium.orders ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS");
            connection.execute("INSERT INTO debezium.orders VALUES (9, '22-FEB-2018')");
            connection.execute("COMMIT");

            final Configuration config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.SNAPSHOT_MODE, OracleConnectorConfig.SnapshotMode.INITIAL)
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "debezium.orders")
                    .build();
            start(OracleConnector.class, config);
            assertNoRecordsToConsume();

            // Snapshot phase: date supplied as a string literal.
            waitForSnapshotToBeCompleted(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
            final List<SourceRecord> snapshotRecords = consumeRecordsByTopic(1).recordsForTopic(topic);
            Assertions.assertThat(snapshotRecords.size()).isEqualTo(1);
            final Struct snapshotAfter = ((Struct) snapshotRecords.get(0).value()).getStruct("after");
            Assertions.assertThat(snapshotAfter.get("ID")).isEqualTo(9);
            Assertions.assertThat(snapshotAfter.get("ORDER_DATE")).isEqualTo(expectedOrderDate);

            // Streaming phase: date supplied through TO_DATE.
            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
            connection.execute("INSERT INTO debezium.orders VALUES (10, TO_DATE('2018-02-22', 'yyyy-mm-dd'))");
            connection.execute("COMMIT");
            final List<SourceRecord> streamedRecords = consumeRecordsByTopic(1).recordsForTopic(topic);
            Assertions.assertThat(streamedRecords).hasSize(1);
            final Struct streamedAfter = ((Struct) streamedRecords.get(0).value()).getStruct("after");
            Assertions.assertThat(streamedAfter.get("ID")).isEqualTo(10);
            Assertions.assertThat(streamedAfter.get("ORDER_DATE")).isEqualTo(expectedOrderDate);
        } finally {
            // Single cleanup path for both success and failure.
            TestHelper.dropTable(connection, table);
        }
    }

    /**
     * Verifies that with {@code decimal.handling.mode} set to {@code "string"} the NUMBER
     * columns {@code ID} and {@code AGE} are emitted with string schemas and string values,
     * for both a snapshot record and a streamed record.
     *
     * @throws Exception if any database or connector operation fails
     */
    @Test
    @FixFor("DBZ-2733")
    public void shouldConvertNumericAsStringDecimalHandlingMode() throws Exception {
        final String table = "table_number_pk";
        final String topic = "server1.DEBEZIUM.TABLE_NUMBER_PK";

        TestHelper.dropTable(connection, table);
        try {
            connection.execute("CREATE TABLE table_number_pk (id NUMBER, name varchar2(255), age number, primary key (id))");
            connection.execute("GRANT SELECT ON debezium.table_number_pk TO " + TestHelper.getConnectorUserName());
            connection.execute("ALTER TABLE debezium.table_number_pk ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS");
            connection.execute("INSERT INTO debezium.table_number_pk (id, name, age) values (1, 'Bob', 25)");
            connection.execute("COMMIT");

            final Configuration config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.SNAPSHOT_MODE, OracleConnectorConfig.SnapshotMode.INITIAL)
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "debezium.table_number_pk")
                    .with(OracleConnectorConfig.DECIMAL_HANDLING_MODE, "string")
                    .build();
            start(OracleConnector.class, config);
            assertNoRecordsToConsume();

            // Snapshot phase.
            waitForSnapshotToBeCompleted(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
            final List<SourceRecord> snapshotRecords = consumeRecordsByTopic(1).recordsForTopic(topic);
            Assertions.assertThat(snapshotRecords).hasSize(1);
            final List<SchemaAndValueField> expectedSnapshot = Arrays.asList(
                    new SchemaAndValueField("ID", Schema.STRING_SCHEMA, "1"),
                    new SchemaAndValueField("NAME", Schema.OPTIONAL_STRING_SCHEMA, "Bob"),
                    new SchemaAndValueField("AGE", Schema.OPTIONAL_STRING_SCHEMA, "25"));
            assertRecordSchemaAndValues(expectedSnapshot, snapshotRecords.get(0), "after");

            // Streaming phase.
            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
            connection.execute("INSERT INTO debezium.table_number_pk (id, name, age) values (2, 'Sue', 30)");
            connection.execute("COMMIT");
            final List<SourceRecord> streamedRecords = consumeRecordsByTopic(1).recordsForTopic(topic);
            Assertions.assertThat(streamedRecords).hasSize(1);
            final List<SchemaAndValueField> expectedStreamed = Arrays.asList(
                    new SchemaAndValueField("ID", Schema.STRING_SCHEMA, "2"),
                    new SchemaAndValueField("NAME", Schema.OPTIONAL_STRING_SCHEMA, "Sue"),
                    new SchemaAndValueField("AGE", Schema.OPTIONAL_STRING_SCHEMA, "30"));
            assertRecordSchemaAndValues(expectedStreamed, streamedRecords.get(0), "after");
        } finally {
            // Single cleanup path for both success and failure.
            TestHelper.dropTable(connection, table);
        }
    }

    /**
     * Asserts the contents of one envelope field of a record's value.
     *
     * @param list the expected fields; when {@code null}, the envelope field itself must be {@code null}
     * @param sourceRecord the record whose value struct is inspected
     * @param str the name of the envelope field to read from the record's value
     */
    protected void assertRecordSchemaAndValues(List<SchemaAndValueField> list, SourceRecord sourceRecord, String str) {
        final Struct content = ((Struct) sourceRecord.value()).getStruct(str);

        // No expectations supplied: the envelope field must be absent.
        if (list == null) {
            Assertions.assertThat(content).isNull();
            return;
        }

        Assertions.assertThat(content).as("expected there to be content in Envelope under " + str).isNotNull();
        for (SchemaAndValueField expected : list) {
            expected.assertFor(content);
        }
    }

    /**
     * Inserts rows whose two {@code varchar2(4000)} columns are each filled with a 4000-character
     * generated string, so every INSERT statement is longer than 4000 characters, and checks that
     * the values come back unchanged from the snapshot (1 row) and from streaming (10 rows).
     * The table is dropped in the callback passed to {@code stopConnector}.
     *
     * @throws Exception if any database or connector operation fails
     */
    @Test
    @FixFor("DBZ-2920")
    public void shouldStreamDdlThatExceeds4000() throws Exception {
        final String topic = "server1.DEBEZIUM.LARGE_DML";
        final int streamedRows = 10;

        TestHelper.dropTable(connection, "large_dml");
        connection.execute("CREATE TABLE large_dml (id NUMERIC(6), value varchar2(4000), value2 varchar2(4000), primary key(id))");
        connection.execute("GRANT SELECT ON debezium.large_dml TO " + TestHelper.getConnectorUserName());
        connection.execute("ALTER TABLE debezium.large_dml ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS");

        // Row captured by the snapshot.
        final String snapshotValue = generateAlphaNumericStringColumn(4000);
        final String snapshotValue2 = generateAlphaNumericStringColumn(4000);
        connection.execute("INSERT INTO large_dml (id, value, value2) values (1, '" + snapshotValue + "', '" + snapshotValue2 + "')");
        connection.commit();

        final Configuration config = TestHelper.defaultConfig()
                .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "debezium.large_dml")
                .with(OracleConnectorConfig.SNAPSHOT_MODE, OracleConnectorConfig.SnapshotMode.INITIAL)
                .build();
        start(OracleConnector.class, config);
        assertNoRecordsToConsume();

        waitForSnapshotToBeCompleted(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
        final AbstractConnectorTest.SourceRecords snapshot = consumeRecordsByTopic(1);
        Assertions.assertThat(snapshot.topics()).hasSize(1);
        final List<SourceRecord> snapshotRecords = snapshot.recordsForTopic(topic);
        Assertions.assertThat(snapshotRecords).hasSize(1);
        final Struct snapshotAfter = ((Struct) snapshotRecords.get(0).value()).getStruct("after");
        Assertions.assertThat(snapshotAfter.get("ID")).isEqualTo(1);
        Assertions.assertThat(snapshotAfter.get("VALUE")).isEqualTo(snapshotValue);
        Assertions.assertThat(snapshotAfter.get("VALUE2")).isEqualTo(snapshotValue2);

        // Rows captured while streaming; ids start at 2.
        waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
        final List<String> values = new ArrayList<>();
        final List<String> values2 = new ArrayList<>();
        for (int row = 0; row < streamedRows; row++) {
            final String value = generateAlphaNumericStringColumn(4000);
            final String value2 = generateAlphaNumericStringColumn(4000);
            values.add(value);
            values2.add(value2);
            connection.execute("INSERT INTO large_dml (id, value, value2) values (" + (2 + row) + ", '" + value + "', '" + value2 + "')");
        }
        connection.commit();

        final AbstractConnectorTest.SourceRecords streamed = consumeRecordsByTopic(streamedRows);
        Assertions.assertThat(streamed.topics()).hasSize(1);
        Assertions.assertThat(streamed.recordsForTopic(topic)).hasSize(streamedRows);
        final List<SourceRecord> streamedRecords = streamed.recordsForTopic(topic);
        for (int row = 0; row < streamedRows; row++) {
            final Struct after = ((Struct) streamedRecords.get(row).value()).getStruct("after");
            Assertions.assertThat(after.get("ID")).isEqualTo(Integer.valueOf(2 + row));
            Assertions.assertThat(after.get("VALUE")).isEqualTo(values.get(row));
            Assertions.assertThat(after.get("VALUE2")).isEqualTo(values2.get(row));
        }

        stopConnector(running -> TestHelper.dropTable(connection, "large_dml"));
    }

    /**
     * Runs the connector with a queue size of 2 and a batch size of 1, commits ten single-row
     * inserts, and expects all ten records to be consumed. The poll timeout is raised to
     * 20 seconds for the duration of the test and restored afterwards.
     *
     * @throws Exception if any database or connector operation fails
     */
    @Test
    @FixFor("DBZ-2891")
    @SkipWhenAdapterNameIsNot(value = SkipWhenAdapterNameIsNot.AdapterName.XSTREAM, reason = "Only applies to Xstreams")
    public void shouldNotObserveDeadlockWhileStreamingWithXstream() throws Exception {
        final String table = "deadlock_test";
        final long originalPollTimeoutInMs = this.pollTimeoutInMs;

        TestHelper.dropTable(connection, table);
        try {
            connection.execute("CREATE TABLE deadlock_test (id numeric(9), name varchar2(50), primary key(id))");
            connection.execute("GRANT SELECT ON debezium.deadlock_test TO " + TestHelper.getConnectorUserName());
            connection.execute("ALTER TABLE debezium.deadlock_test ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS");

            this.pollTimeoutInMs = TimeUnit.SECONDS.toMillis(20L);

            final Configuration config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "debezium.deadlock_test")
                    .with(OracleConnectorConfig.SNAPSHOT_MODE, OracleConnectorConfig.SnapshotMode.INITIAL)
                    .with(OracleConnectorConfig.MAX_QUEUE_SIZE, 2)
                    .with(RelationalDatabaseConnectorConfig.MAX_BATCH_SIZE, 1)
                    .build();
            start(OracleConnector.class, config);
            assertNoRecordsToConsume();

            waitForSnapshotToBeCompleted(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);

            // Each insert is committed on its own.
            for (int id = 0; id < 10; id++) {
                connection.execute("INSERT INTO deadlock_test (id, name) values (" + id + ", 'Test " + id + "')");
                connection.execute("COMMIT");
            }

            final AbstractConnectorTest.SourceRecords records = consumeRecordsByTopic(10, 24);
            Assertions.assertThat(records.topics()).hasSize(1);
            Assertions.assertThat(records.recordsForTopic("server1.DEBEZIUM.DEADLOCK_TEST")).hasSize(10);
        } finally {
            // Restore shared test state before dropping the table, on success and failure alike.
            this.pollTimeoutInMs = originalPollTimeoutInMs;
            TestHelper.dropTable(connection, table);
        }
    }

    /**
     * Verifies that the connector starts and reaches the streaming phase when the captured
     * table is a global temporary table whose name contains {@code #} and {@code :} and must
     * therefore be quoted.
     *
     * @throws Exception if any database or connector operation fails
     */
    @Test
    @FixFor("DBZ-3057")
    public void shouldReadTableUniqueIndicesWithCharactersThatRequireExplicitQuotes() throws Exception {
        final String table = "debezium.\"#T70_Sid:582003931_1_ConnConne\"";

        try {
            TestHelper.dropTable(connection, table);
            connection.execute("CREATE GLOBAL TEMPORARY TABLE " + table + " (id number, name varchar2(50))");
            connection.execute("GRANT SELECT ON " + table + " TO " + TestHelper.getConnectorUserName());
            connection.execute("ALTER TABLE " + table + " ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS");

            final Configuration config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.SNAPSHOT_MODE, OracleConnectorConfig.SnapshotMode.INITIAL)
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.\\#T70_Sid\\:582003931_1_ConnConne")
                    .build();
            start(OracleConnector.class, config);
            assertConnectorIsRunning();

            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
        } finally {
            // Single cleanup path for both success and failure.
            TestHelper.dropTable(connection, table);
        }
    }

    /**
     * Verifies that the connector reaches the streaming phase when the include list targets a
     * table created with {@code CREATE TABLE ... of XMLTYPE}, which declares no primary key.
     *
     * @throws Exception if any database or connector operation fails
     */
    @Test
    @FixFor("DBZ-3151")
    public void testSnapshotCompletesWithSystemGeneratedUniqueIndexOnKeylessTable() throws Exception {
        final String table = "XML_TABLE";

        TestHelper.dropTable(connection, table);
        try {
            connection.execute("CREATE TABLE XML_TABLE of XMLTYPE");
            connection.execute("GRANT SELECT ON DEBEZIUM.XML_TABLE TO " + TestHelper.getConnectorUserName());
            connection.execute("ALTER TABLE DEBEZIUM.XML_TABLE ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS");
            connection.execute("INSERT INTO DEBEZIUM.XML_TABLE values (xmltype('<?xml version=\"1.0\"?><tab><name>Hi</name></tab>'))");
            connection.execute("COMMIT");

            final Configuration config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.SNAPSHOT_MODE, OracleConnectorConfig.SnapshotMode.INITIAL)
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.XML_TABLE")
                    .build();
            start(OracleConnector.class, config);
            assertNoRecordsToConsume();

            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
        } finally {
            // Single cleanup path for both success and failure.
            TestHelper.dropTable(connection, table);
        }
    }

    /**
     * Verifies that the connection reports a non-null database version whose major component
     * is greater than zero.
     *
     * @throws Exception if the version cannot be obtained
     */
    @Test
    @FixFor("DBZ-3001")
    public void shouldGetOracleDatabaseVersion() throws Exception {
        final OracleDatabaseVersion databaseVersion = connection.getOracleVersion();

        Assertions.assertThat(databaseVersion).isNotNull();

        final int majorVersion = databaseVersion.getMajor();
        Assertions.assertThat(majorVersion).isGreaterThan(0);
    }

    /**
     * Verifies that a streamed insert is captured for a table that has both an
     * {@code (ALL) COLUMNS} and a {@code (PRIMARY KEY) COLUMNS} supplemental log group.
     *
     * @throws Exception if any database or connector operation fails
     */
    @Test
    @FixFor("DBZ-3109")
    public void shouldStreamChangesForTableWithMultipleLogGroupTypes() throws Exception {
        final String table = "log_group_test";

        try {
            TestHelper.dropTable(connection, table);
            connection.execute("CREATE TABLE log_group_test (id numeric(9,0) primary key, name varchar2(50))");
            connection.execute("GRANT SELECT ON debezium.log_group_test TO " + TestHelper.getConnectorUserName());
            // Two different supplemental log group types on the same table.
            connection.execute("ALTER TABLE debezium.log_group_test ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS");
            connection.execute("ALTER TABLE debezium.log_group_test ADD SUPPLEMENTAL LOG DATA (PRIMARY KEY) COLUMNS");

            final Configuration config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.SNAPSHOT_MODE, OracleConnectorConfig.SnapshotMode.INITIAL)
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.LOG_GROUP_TEST")
                    .build();
            start(OracleConnector.class, config);
            assertConnectorIsRunning();

            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
            connection.execute("INSERT INTO debezium.log_group_test (id, name) values (1,'Test')");
            connection.execute("COMMIT");

            Assertions.assertThat(consumeRecordsByTopic(1).recordsForTopic("server1.DEBEZIUM.LOG_GROUP_TEST")).hasSize(1);
        } finally {
            // Single cleanup path for both success and failure.
            TestHelper.dropTable(connection, table);
        }
    }

    /**
     * Verifies that after a stop and restart with the same configuration the connector emits
     * only the row inserted after the restart: the first run must yield the row with id 1,
     * the second run must yield nothing until the row with id 2 is inserted.
     *
     * @throws Exception if any database or connector operation fails
     */
    @Test
    @FixFor("DBZ-2875")
    public void shouldResumeStreamingAtCorrectScnOffset() throws Exception {
        final String table = "offset_test";
        final String topic = "server1.DEBEZIUM.OFFSET_TEST";

        TestHelper.dropTable(connection, table);
        try {
            // NOTE(review): debug output is enabled here and is not disabled within this method.
            Testing.Debug.enable();

            connection.execute("CREATE TABLE offset_test (id numeric(9,0) primary key, name varchar2(50))");
            connection.execute("GRANT SELECT ON debezium.offset_test TO " + TestHelper.getConnectorUserName());
            connection.execute("ALTER TABLE debezium.offset_test ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS");

            final Configuration config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.SNAPSHOT_MODE, OracleConnectorConfig.SnapshotMode.NO_DATA)
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.OFFSET_TEST")
                    .build();

            // First run.
            start(OracleConnector.class, config);
            assertNoRecordsToConsume();
            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
            connection.execute("INSERT INTO debezium.offset_test (id, name) values (1, 'Bob')");
            final AbstractConnectorTest.SourceRecords firstRun = consumeRecordsByTopic(1);
            Assertions.assertThat(firstRun.recordsForTopic(topic)).hasSize(1);
            final List<SourceRecord> firstRunOrdered = firstRun.allRecordsInOrder();
            final Struct firstAfter = (Struct) ((Struct) firstRunOrdered.get(0).value()).get("after");
            Testing.print(firstAfter);
            Assertions.assertThat(firstAfter.get("ID")).isEqualTo(1);
            Assertions.assertThat(firstAfter.get("NAME")).isEqualTo("Bob");

            // Second run: nothing may be re-emitted before the new insert.
            stopConnector();
            start(OracleConnector.class, config);
            assertNoRecordsToConsume();
            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
            connection.execute("INSERT INTO debezium.offset_test (id, name) values (2, 'Bill')");
            final AbstractConnectorTest.SourceRecords secondRun = consumeRecordsByTopic(1);
            Assertions.assertThat(secondRun.recordsForTopic(topic)).hasSize(1);
            final List<SourceRecord> secondRunOrdered = secondRun.allRecordsInOrder();
            final Struct secondAfter = (Struct) ((Struct) secondRunOrdered.get(0).value()).get("after");
            Testing.print(secondAfter);
            Assertions.assertThat(secondAfter.get("ID")).isEqualTo(2);
            Assertions.assertThat(secondAfter.get("NAME")).isEqualTo("Bill");
        } finally {
            // Single cleanup path for both success and failure.
            TestHelper.dropTable(connection, table);
        }
    }

    /**
     * Verifies snapshot and streaming capture for an index-organized table declared with an
     * {@code OVERFLOW} segment, using the regex include pattern {@code (.)*IOT(.)*}. Exactly
     * one record is expected on the {@code TEST_IOT} topic in each phase.
     *
     * @throws Exception if any database or connector operation fails
     */
    @Test
    @FixFor("DBZ-3036")
    @SkipWhenAdapterNameIs(value = SkipWhenAdapterNameIs.AdapterName.OLR, reason = "IOT tables are skipped")
    public void shouldHandleParentChildIndexOrganizedTables() throws Exception {
        final String table = "test_iot";
        final String topic = "server1.DEBEZIUM.TEST_IOT";

        TestHelper.dropTable(connection, table);
        try {
            connection.execute("CREATE TABLE test_iot (id numeric(9,0), description varchar2(50) not null, primary key(id)) ORGANIZATION INDEX INCLUDING description OVERFLOW");
            TestHelper.streamTable(connection, "debezium.test_iot");
            connection.executeWithoutCommitting("INSERT INTO debezium.test_iot VALUES ('1', 'Hello World')");
            connection.execute("COMMIT");

            final Configuration config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.SCHEMA_INCLUDE_LIST, "DEBEZIUM")
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "(.)*IOT(.)*")
                    .with(OracleConnectorConfig.LOG_MINING_QUERY_FILTER_MODE, "regex")
                    .build();
            start(OracleConnector.class, config);
            assertNoRecordsToConsume();

            // Snapshot phase.
            waitForSnapshotToBeCompleted(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
            final List<SourceRecord> snapshotRecords = consumeRecordsByTopic(1).recordsForTopic(topic);
            Assertions.assertThat(snapshotRecords).hasSize(1);
            final SourceRecord snapshotRecord = snapshotRecords.get(0);
            final Struct snapshotAfter = (Struct) ((Struct) snapshotRecord.value()).get("after");
            VerifyRecord.isValidRead(snapshotRecord, "ID", 1);
            Assertions.assertThat(snapshotAfter.get("DESCRIPTION")).isEqualTo("Hello World");

            // Streaming phase.
            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
            connection.executeWithoutCommitting("INSERT INTO debezium.test_iot VALUES ('2', 'Goodbye')");
            connection.execute("COMMIT");
            final List<SourceRecord> streamedRecords = consumeRecordsByTopic(1).recordsForTopic(topic);
            Assertions.assertThat(streamedRecords).hasSize(1);
            final SourceRecord streamedRecord = streamedRecords.get(0);
            final Struct streamedAfter = (Struct) ((Struct) streamedRecord.value()).get("after");
            VerifyRecord.isValidInsert(streamedRecord, "ID", 2);
            Assertions.assertThat(streamedAfter.get("DESCRIPTION")).isEqualTo("Goodbye");
        } finally {
            // Single cleanup path: drop the table, then purge the recycle bin.
            TestHelper.dropTable(connection, table);
            TestHelper.purgeRecycleBin(connection);
        }
    }

    /**
     * Verifies that CLOB and NCLOB column values are captured with {@code lob.enabled} set to
     * {@code true}: the inserted values must appear in the snapshot record and the updated
     * values in the streamed update record.
     *
     * @throws Exception if any database or connector operation fails
     */
    @Test
    @SkipWhenLogMiningStrategyIs(value = SkipWhenLogMiningStrategyIs.Strategy.HYBRID, reason = "Cannot use lob.enabled with Hybrid")
    @FixFor("DBZ-3257")
    public void shouldSnapshotAndStreamClobDataTypes() throws Exception {
        final String table = "clob_test";
        final String topic = "server1.DEBEZIUM.CLOB_TEST";

        TestHelper.dropTable(connection, table);
        try {
            connection.execute("CREATE TABLE clob_test(id numeric(9,0) primary key, val_clob clob, val_nclob nclob)");
            TestHelper.streamTable(connection, table);
            connection.execute("INSERT INTO clob_test values (1, 'TestClob', 'TestNClob')");

            final Configuration config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.CLOB_TEST")
                    .with(OracleConnectorConfig.LOB_ENABLED, true)
                    .build();
            start(OracleConnector.class, config);
            assertConnectorIsRunning();

            // Snapshot phase.
            waitForSnapshotToBeCompleted(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
            final AbstractConnectorTest.SourceRecords snapshot = consumeRecordsByTopic(1);
            Assertions.assertThat(snapshot.recordsForTopic(topic)).hasSize(1);
            final List<SourceRecord> snapshotRecords = snapshot.recordsForTopic(topic);
            VerifyRecord.isValidRead(snapshotRecords.get(0), "ID", 1);
            final Struct snapshotAfter = (Struct) ((Struct) snapshotRecords.get(0).value()).get("after");
            Assertions.assertThat(snapshotAfter.get("VAL_CLOB")).isEqualTo("TestClob");
            Assertions.assertThat(snapshotAfter.get("VAL_NCLOB")).isEqualTo("TestNClob");

            // Streaming phase: update both LOB columns of the existing row.
            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
            connection.execute("UPDATE clob_test SET val_clob = 'TestClob2', val_nclob = 'TestNClob2' WHERE ID = 1");
            final AbstractConnectorTest.SourceRecords streamed = consumeRecordsByTopic(1);
            Assertions.assertThat(streamed.recordsForTopic(topic)).hasSize(1);
            final List<SourceRecord> streamedRecords = streamed.recordsForTopic(topic);
            VerifyRecord.isValidUpdate(streamedRecords.get(0), "ID", 1);
            final Struct streamedAfter = (Struct) ((Struct) streamedRecords.get(0).value()).get("after");
            Assertions.assertThat(streamedAfter.get("VAL_CLOB")).isEqualTo("TestClob2");
            Assertions.assertThat(streamedAfter.get("VAL_NCLOB")).isEqualTo("TestNClob2");
        } finally {
            // Single cleanup path for both success and failure.
            TestHelper.dropTable(connection, table);
        }
    }

    /**
     * Verifies that the first record published on the topic named after the server carries a
     * source partition equal to a single-entry map from {@code "server"} to the server name.
     * The connector runs with {@code INCLUDE_SCHEMA_CHANGES} enabled and the
     * {@code online_catalog} mining strategy.
     *
     * @throws Exception if database setup, connector start-up or record consumption fails
     */
    @Test
    @FixFor({"DBZ-3347"})
    public void shouldContainPartitionInSchemaChangeEvent() throws Exception {
        TestHelper.dropTable(connection, "dbz3347");
        try {
            connection.execute("create table dbz3347 (id number primary key, data varchar2(50))");
            TestHelper.streamTable(connection, "dbz3347");

            start(OracleConnector.class, TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ3347")
                    .with(OracleConnectorConfig.LOG_MINING_STRATEGY, "online_catalog")
                    .with(OracleConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)
                    .build());
            assertConnectorIsRunning();
            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);

            SourceRecord serverTopicRecord = consumeRecordsByTopic(1).recordsForTopic(TestHelper.SERVER_NAME).get(0);
            Assertions.assertThat(serverTopicRecord.sourcePartition())
                    .isEqualTo(Collections.singletonMap("server", TestHelper.SERVER_NAME));
        } finally {
            // Single cleanup path for both success and failure.
            TestHelper.dropTable(connection, "dbz3347");
        }
    }

    /**
     * Verifies that a table declared without a primary key is captured in both the snapshot and
     * streaming phases, and that the emitted records have a {@code null} key.
     *
     * @throws Exception if database setup, connector start-up or record consumption fails
     */
    @Test
    @FixFor({"DBZ-832"})
    public void shouldSnapshotAndStreamTablesWithNoPrimaryKey() throws Exception {
        final String topic = "server1.DEBEZIUM.DBZ832";
        TestHelper.dropTable(connection, "dbz832");
        try {
            connection.execute("create table dbz832 (id numeric(9,0), data varchar2(50))");
            TestHelper.streamTable(connection, "dbz832");
            // Inserted before the connector starts, so the row is expected from the snapshot.
            connection.execute("INSERT INTO dbz832 values (1, 'Test')");

            start(OracleConnector.class, TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ832")
                    .build());
            assertConnectorIsRunning();
            waitForSnapshotToBeCompleted(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);

            // Snapshot phase.
            AbstractConnectorTest.SourceRecords snapshotBatch = consumeRecordsByTopic(1);
            Assertions.assertThat(snapshotBatch.recordsForTopic(topic)).hasSize(1);
            SourceRecord snapshotRecord = snapshotBatch.recordsForTopic(topic).get(0);
            Assertions.assertThat(snapshotRecord.key()).isNull();
            Struct snapshotAfter = ((Struct) snapshotRecord.value()).getStruct("after");
            Assertions.assertThat(snapshotAfter.get("ID")).isEqualTo(1);
            Assertions.assertThat(snapshotAfter.get("DATA")).isEqualTo("Test");

            // Streaming phase.
            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
            connection.execute("INSERT INTO dbz832 values (2, 'Test2')");

            AbstractConnectorTest.SourceRecords streamedBatch = consumeRecordsByTopic(1);
            Assertions.assertThat(streamedBatch.recordsForTopic(topic)).hasSize(1);
            SourceRecord streamedRecord = streamedBatch.recordsForTopic(topic).get(0);
            Assertions.assertThat(streamedRecord.key()).isNull();
            Struct streamedAfter = ((Struct) streamedRecord.value()).getStruct("after");
            Assertions.assertThat(streamedAfter.get("ID")).isEqualTo(2);
            Assertions.assertThat(streamedAfter.get("DATA")).isEqualTo("Test2");
        } finally {
            // Single cleanup path for both success and failure.
            TestHelper.dropTable(connection, "dbz832");
        }
    }

    /**
     * Verifies record keys for two related tables in both the snapshot and streaming phases.
     * <ul>
     * <li>{@code dbz1211} has a primary key on {@code id} plus a unique constraint on
     * {@code (id, data)}; its record key must contain {@code ID} only.</li>
     * <li>{@code dbz1211_child} has no primary key, only a unique constraint on
     * {@code (id, data)}; its record key must contain both {@code ID} and {@code DATA}.</li>
     * </ul>
     *
     * @throws Exception if database setup, connector start-up or record consumption fails
     */
    @Test
    @FixFor({"DBZ-1211"})
    public void shouldSnapshotAndStreamTablesWithUniqueIndexPrimaryKey() throws Exception {
        final String parentTopic = "server1.DEBEZIUM.DBZ1211";
        final String childTopic = "server1.DEBEZIUM.DBZ1211_CHILD";

        TestHelper.dropTables(connection, "dbz1211_child", "dbz1211");
        try {
            connection.execute("create table dbz1211 (id numeric(9,0), data varchar2(50), constraint pkdbz1211 primary key (id) using index)");
            connection.execute("alter table dbz1211 add constraint xdbz1211 unique (id,data) using index");
            connection.execute("create table dbz1211_child (id numeric(9,0), data varchar2(50), constraint fk1211 foreign key (id) references dbz1211 on delete cascade)");
            connection.execute("alter table dbz1211_child add constraint ydbz1211 unique (id,data) using index");
            TestHelper.streamTable(connection, "dbz1211");
            TestHelper.streamTable(connection, "dbz1211_child");

            // Rows inserted before the connector starts are expected from the snapshot.
            connection.executeWithoutCommitting("INSERT INTO dbz1211 values (1, 'Test')");
            connection.executeWithoutCommitting("INSERT INTO dbz1211_child values (1, 'Child')");
            connection.commit();

            start(OracleConnector.class, TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ1211,DEBEZIUM\\.DBZ1211\\_CHILD")
                    .build());
            assertConnectorIsRunning();
            waitForSnapshotToBeCompleted(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);

            // ---- Snapshot phase: one record per table ----
            AbstractConnectorTest.SourceRecords snapshotBatch = consumeRecordsByTopic(2);
            Assertions.assertThat(snapshotBatch.recordsForTopic(parentTopic)).hasSize(1);
            Assertions.assertThat(snapshotBatch.recordsForTopic(childTopic)).hasSize(1);

            SourceRecord snapshotParent = snapshotBatch.recordsForTopic(parentTopic).get(0);
            Struct snapshotParentKey = (Struct) snapshotParent.key();
            Assertions.assertThat(snapshotParentKey).isNotNull();
            Assertions.assertThat(snapshotParentKey.get("ID")).isEqualTo(1);
            Assertions.assertThat(snapshotParentKey.schema().field("DATA")).isNull();
            Struct snapshotParentAfter = ((Struct) snapshotParent.value()).getStruct("after");
            Assertions.assertThat(snapshotParentAfter.get("ID")).isEqualTo(1);
            Assertions.assertThat(snapshotParentAfter.get("DATA")).isEqualTo("Test");

            SourceRecord snapshotChild = snapshotBatch.recordsForTopic(childTopic).get(0);
            Struct snapshotChildKey = (Struct) snapshotChild.key();
            Assertions.assertThat(snapshotChildKey).isNotNull();
            Assertions.assertThat(snapshotChildKey.get("ID")).isEqualTo(1);
            Assertions.assertThat(snapshotChildKey.get("DATA")).isEqualTo("Child");
            Struct snapshotChildAfter = ((Struct) snapshotChild.value()).getStruct("after");
            Assertions.assertThat(snapshotChildAfter.get("ID")).isEqualTo(1);
            Assertions.assertThat(snapshotChildAfter.get("DATA")).isEqualTo("Child");

            // ---- Streaming phase: one parent row, two child rows ----
            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
            connection.execute("INSERT INTO dbz1211 values (2, 'Test2')");
            connection.executeWithoutCommitting("INSERT INTO dbz1211_child values (1, 'Child1-2')");
            connection.executeWithoutCommitting("INSERT INTO dbz1211_child values (2, 'Child2-1')");
            connection.commit();

            AbstractConnectorTest.SourceRecords streamedBatch = consumeRecordsByTopic(3);
            Assertions.assertThat(streamedBatch.recordsForTopic(parentTopic)).hasSize(1);
            Assertions.assertThat(streamedBatch.recordsForTopic(childTopic)).hasSize(2);

            SourceRecord streamedParent = streamedBatch.recordsForTopic(parentTopic).get(0);
            Struct streamedParentKey = (Struct) streamedParent.key();
            Assertions.assertThat(streamedParentKey).isNotNull();
            Assertions.assertThat(streamedParentKey.get("ID")).isEqualTo(2);
            Assertions.assertThat(streamedParentKey.schema().field("DATA")).isNull();
            Struct streamedParentAfter = ((Struct) streamedParent.value()).getStruct("after");
            Assertions.assertThat(streamedParentAfter.get("ID")).isEqualTo(2);
            Assertions.assertThat(streamedParentAfter.get("DATA")).isEqualTo("Test2");

            SourceRecord firstStreamedChild = streamedBatch.recordsForTopic(childTopic).get(0);
            Struct firstStreamedChildKey = (Struct) firstStreamedChild.key();
            Assertions.assertThat(firstStreamedChildKey).isNotNull();
            Assertions.assertThat(firstStreamedChildKey.get("ID")).isEqualTo(1);
            Assertions.assertThat(firstStreamedChildKey.get("DATA")).isEqualTo("Child1-2");
            Struct firstStreamedChildAfter = ((Struct) firstStreamedChild.value()).getStruct("after");
            Assertions.assertThat(firstStreamedChildAfter.get("ID")).isEqualTo(1);
            Assertions.assertThat(firstStreamedChildAfter.get("DATA")).isEqualTo("Child1-2");

            SourceRecord secondStreamedChild = streamedBatch.recordsForTopic(childTopic).get(1);
            Struct secondStreamedChildKey = (Struct) secondStreamedChild.key();
            Assertions.assertThat(secondStreamedChildKey).isNotNull();
            Assertions.assertThat(secondStreamedChildKey.get("ID")).isEqualTo(2);
            Assertions.assertThat(secondStreamedChildKey.get("DATA")).isEqualTo("Child2-1");
            Struct secondStreamedChildAfter = ((Struct) secondStreamedChild.value()).getStruct("after");
            Assertions.assertThat(secondStreamedChildAfter.get("ID")).isEqualTo(2);
            Assertions.assertThat(secondStreamedChildAfter.get("DATA")).isEqualTo("Child2-1");
        } finally {
            // Child first: it holds the foreign key referencing dbz1211.
            TestHelper.dropTables(connection, "dbz1211_child", "dbz1211");
        }
    }

    /**
     * Verifies that an insert rejected by a unique index produces no change event: two inserts
     * with the same {@code id} are issued in one transaction, and only the first one
     * ({@code 'Test1'}) may be emitted after the commit.
     *
     * @throws Exception if database setup, connector start-up or record consumption fails, or
     *         if the second insert fails with anything other than {@code ORA-00001}
     */
    @Test
    @FixFor({"DBZ-3322"})
    public void shouldNotEmitEventsOnConstraintViolations() throws Exception {
        TestHelper.dropTable(connection, "dbz3322");
        try {
            connection.execute("CREATE TABLE dbz3322 (id number(9,0), data varchar2(50))");
            connection.execute("CREATE UNIQUE INDEX uk_dbz3322 ON dbz3322 (id)");
            TestHelper.streamTable(connection, "dbz3322");

            start(OracleConnector.class, TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ3322")
                    .build());
            assertConnectorIsRunning();
            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);

            try {
                connection.executeWithoutCommitting("INSERT INTO dbz3322 (id,data) values (1, 'Test1')");
                // Same id again: the unique index is expected to reject this statement.
                connection.executeWithoutCommitting("INSERT INTO dbz3322 (id,data) values (1, 'Test2')");
            } catch (SQLException e) {
                // Only the unique-constraint violation is tolerated; anything else fails the test.
                final String message = e.getMessage();
                if (message == null || !message.startsWith("ORA-00001")) {
                    throw e;
                }
            } finally {
                // Commit exactly once, whether or not the second insert was rejected.
                connection.executeWithoutCommitting("COMMIT");
            }

            AbstractConnectorTest.SourceRecords batch = consumeRecordsByTopic(1);
            Assertions.assertThat(batch.recordsForTopic("server1.DEBEZIUM.DBZ3322")).hasSize(1);
            SourceRecord emitted = batch.recordsForTopic("server1.DEBEZIUM.DBZ3322").get(0);
            Struct after = ((Struct) emitted.value()).getStruct("after");
            Assertions.assertThat(after.get("ID")).isEqualTo(1);
            Assertions.assertThat(after.get("DATA")).isEqualTo("Test1");
            assertNoRecordsToConsume();
        } finally {
            // Single cleanup path for both success and failure.
            TestHelper.dropTable(connection, "dbz3322");
        }
    }

    /**
     * Verifies that inserts rejected by a unique index are not emitted when the conflicting
     * rows come from different database sessions.
     * <p>
     * The main connection inserts {@code id=1} without committing. Two further sessions each
     * insert a distinct row ({@code id=2}, {@code id=3}) and then try to insert {@code id=1}
     * as well. After the main connection commits, both worker tasks must report failure
     * ({@code false}), and exactly three records are expected, the first being {@code id=1}
     * with {@code 'Test1'}.
     *
     * @throws Exception if database setup, connector start-up, a worker task or record
     *         consumption fails
     */
    @Test
    @FixFor({"DBZ-5090"})
    public void shouldNotEmitEventsOnConstraintViolationsAcrossSessions() throws Exception {
        TestHelper.dropTable(connection, "dbz5090");
        try {
            connection.execute("CREATE TABLE dbz5090 (id number(9,0), data varchar2(50))");
            connection.execute("CREATE UNIQUE INDEX uk_dbz5090 ON dbz5090 (id)");
            TestHelper.streamTable(connection, "dbz5090");

            start(OracleConnector.class, TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ5090")
                    .build());
            assertConnectorIsRunning();
            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);

            // One worker thread per competing session so neither blocks the test thread.
            final ExecutorService executor = Executors.newFixedThreadPool(2);
            try (OracleConnection secondSession = TestHelper.testConnection();
                    OracleConnection thirdSession = TestHelper.testConnection()) {

                // Left uncommitted on purpose until both workers have done their first insert.
                connection.executeWithoutCommitting("INSERT INTO dbz5090 (id,data) values (1,'Test1')");

                final CountDownLatch firstInsertsDone = new CountDownLatch(2);
                final CountDownLatch mainCommitted = new CountDownLatch(1);
                final List<Future<Boolean>> outcomes = new ArrayList<>();

                outcomes.add(executor.submit(() -> {
                    try {
                        secondSession.executeWithoutCommitting("INSERT INTO dbz5090 (id,data) values (2,'Test2')");
                        firstInsertsDone.countDown();
                        try {
                            secondSession.executeWithoutCommitting("INSERT INTO dbz5090 (id,data) values (1,'Test2')");
                        } catch (SQLException firstAttempt) {
                            // Retry once the main transaction is known to be committed.
                            mainCommitted.await();
                            secondSession.executeWithoutCommitting("INSERT INTO dbz5090 (id,data) values (1,'Test2')");
                        }
                        return true;
                    } catch (SQLException rejected) {
                        return false;
                    }
                }));
                outcomes.add(executor.submit(() -> {
                    try {
                        thirdSession.executeWithoutCommitting("INSERT INTO dbz5090 (id,data) values (3,'Test3')");
                        firstInsertsDone.countDown();
                        try {
                            thirdSession.executeWithoutCommitting("INSERT INTO dbz5090 (id,data) values (1,'Test3')");
                        } catch (SQLException firstAttempt) {
                            // Retry once the main transaction is known to be committed.
                            mainCommitted.await();
                            thirdSession.executeWithoutCommitting("INSERT INTO dbz5090 (id,data) values (1,'Test3b')");
                        }
                        return true;
                    } catch (SQLException rejected) {
                        return false;
                    }
                }));

                firstInsertsDone.await();
                // NOTE(review): presumably gives both sessions time to reach the conflicting
                // id=1 insert before the main transaction commits - confirm.
                Thread.sleep(5000L);
                connection.commit();
                mainCommitted.countDown();

                // Both workers must have ended in the SQLException branch.
                Assertions.assertThat(outcomes.get(0).get()).isFalse();
                Assertions.assertThat(outcomes.get(1).get()).isFalse();

                secondSession.commit();
                thirdSession.commit();
            } finally {
                // Release the worker threads on success and on failure alike.
                executor.shutdownNow();
            }

            List<SourceRecord> records = consumeRecordsByTopic(3).recordsForTopic("server1.DEBEZIUM.DBZ5090");
            Assertions.assertThat(records).hasSize(3);
            VerifyRecord.isValidInsert(records.get(0), "ID", 1);
            Struct firstAfter = ((Struct) records.get(0).value()).getStruct("after");
            Assertions.assertThat(firstAfter.get("ID")).isEqualTo(1);
            Assertions.assertThat(firstAfter.get("DATA")).isEqualTo("Test1");
            assertNoRecordsToConsume();
        } finally {
            // Single cleanup path for both success and failure.
            TestHelper.dropTable(connection, "dbz5090");
        }
    }

    /**
     * Verifies that rows inserted in a rolled-back transaction are not emitted: two inserts are
     * rolled back, a third is committed, and only the committed row ({@code id=3}) may appear.
     *
     * @throws Exception if database setup, connector start-up or record consumption fails
     */
    @Test
    @FixFor({"DBZ-3322"})
    public void shouldNotEmitEventsInRollbackTransaction() throws Exception {
        TestHelper.dropTable(connection, "dbz3322");
        try {
            connection.execute("CREATE TABLE dbz3322 (id number(9,0), data varchar2(50))");
            connection.execute("CREATE UNIQUE INDEX uk_dbz3322 ON dbz3322 (id)");
            TestHelper.streamTable(connection, "dbz3322");

            start(OracleConnector.class, TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ3322")
                    .build());
            assertConnectorIsRunning();
            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);

            // First transaction: discarded.
            connection.executeWithoutCommitting("INSERT INTO dbz3322 (id,data) values (1, 'Test')");
            connection.executeWithoutCommitting("INSERT INTO dbz3322 (id,data) values (2, 'Test')");
            connection.executeWithoutCommitting("ROLLBACK");
            // Second transaction: committed.
            connection.executeWithoutCommitting("INSERT INTO dbz3322 (id,data) values (3, 'Test')");
            connection.executeWithoutCommitting("COMMIT");

            AbstractConnectorTest.SourceRecords batch = consumeRecordsByTopic(1);
            Assertions.assertThat(batch.recordsForTopic("server1.DEBEZIUM.DBZ3322")).hasSize(1);
            SourceRecord committed = batch.recordsForTopic("server1.DEBEZIUM.DBZ3322").get(0);
            Struct after = ((Struct) committed.value()).getStruct("after");
            Assertions.assertThat(after.get("ID")).isEqualTo(3);
            assertNoRecordsToConsume();
        } finally {
            // Single cleanup path for both success and failure.
            TestHelper.dropTable(connection, "dbz3322");
        }
    }

    /**
     * Verifies that {@code SNAPSHOT_MODE_TABLES} restricts which included tables are
     * snapshotted. Both {@code dbz3062a} and {@code dbz3062b} match the include list, but only
     * {@code dbz3062a} matches the snapshot table filter; the snapshot must therefore emit a
     * row for table A only, while streaming must emit changes for both tables.
     *
     * @throws Exception if database setup, connector start-up or record consumption fails
     */
    @Test
    @FixFor({"DBZ-3062"})
    public void shouldSelectivelySnapshotTables() throws Exception {
        final String topicA = "server1.DEBEZIUM.DBZ3062A";
        final String topicB = "server1.DEBEZIUM.DBZ3062B";

        TestHelper.dropTables(connection, "dbz3062a", "dbz3062b");
        try {
            connection.execute("CREATE TABLE dbz3062a (id number(9,0), data varchar2(50))");
            connection.execute("CREATE TABLE dbz3062b (id number(9,0), data varchar2(50))");
            TestHelper.streamTable(connection, "dbz3062a");
            TestHelper.streamTable(connection, "dbz3062b");
            connection.execute("INSERT INTO dbz3062a VALUES (1, 'Test1')");
            connection.execute("INSERT INTO dbz3062b VALUES (2, 'Test2')");

            start(OracleConnector.class, TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.SNAPSHOT_MODE, OracleConnectorConfig.SnapshotMode.INITIAL)
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ3062.*")
                    .with(OracleConnectorConfig.SNAPSHOT_MODE_TABLES, "[A-z].*DEBEZIUM\\.DBZ3062A")
                    .with(OracleConnectorConfig.LOG_MINING_QUERY_FILTER_MODE, "regex")
                    .build());
            assertConnectorIsRunning();
            waitForSnapshotToBeCompleted(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);

            // Snapshot phase: only table A was snapshotted; table B has no topic entry at all.
            AbstractConnectorTest.SourceRecords snapshotBatch = consumeRecordsByTopic(1);
            List<SourceRecord> snapshotA = snapshotBatch.recordsForTopic(topicA);
            List<SourceRecord> snapshotB = snapshotBatch.recordsForTopic(topicB);
            Assertions.assertThat(snapshotA).hasSize(1);
            Assertions.assertThat(snapshotB).isNull();
            Struct snapshotAfter = ((Struct) snapshotA.get(0).value()).getStruct("after");
            Assertions.assertThat(snapshotAfter.get("ID")).isEqualTo(1);
            Assertions.assertThat(snapshotAfter.get("DATA")).isEqualTo("Test1");
            assertNoRecordsToConsume();

            // Streaming phase: changes to both tables are captured.
            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
            connection.executeWithoutCommitting("INSERT INTO dbz3062a VALUES (3, 'Test3')");
            connection.executeWithoutCommitting("INSERT INTO dbz3062b VALUES (4, 'Test4')");
            connection.commit();

            AbstractConnectorTest.SourceRecords streamedBatch = consumeRecordsByTopic(2);
            List<SourceRecord> streamedA = streamedBatch.recordsForTopic(topicA);
            List<SourceRecord> streamedB = streamedBatch.recordsForTopic(topicB);
            Assertions.assertThat(streamedA).hasSize(1);
            Assertions.assertThat(streamedB).hasSize(1);
            Struct streamedAfterA = ((Struct) streamedA.get(0).value()).getStruct("after");
            Assertions.assertThat(streamedAfterA.get("ID")).isEqualTo(3);
            Assertions.assertThat(streamedAfterA.get("DATA")).isEqualTo("Test3");
            Struct streamedAfterB = ((Struct) streamedB.get(0).value()).getStruct("after");
            Assertions.assertThat(streamedAfterB.get("ID")).isEqualTo(4);
            Assertions.assertThat(streamedAfterB.get("DATA")).isEqualTo("Test4");
        } finally {
            // Single cleanup path for both success and failure.
            TestHelper.dropTables(connection, "dbz3062a", "dbz3062b");
        }
    }

    /**
     * Streams two overlapping transactions from two sessions and verifies the emission order.
     * The second session inserts {@code id=1} first but commits last, after a wait of at least
     * one minute; the main connection inserts {@code id=2} and commits immediately. The records
     * must arrive as {@code id=2} followed by {@code id=1}.
     * <p>
     * NOTE(review): despite the method name, no log output is asserted here.
     *
     * @throws Exception if database setup, connector start-up or record consumption fails
     */
    @Test
    @FixFor({"DBZ-3616"})
    @SkipWhenAdapterNameIsNot(value = SkipWhenAdapterNameIsNot.AdapterName.LOGMINER, reason = "Applies only to LogMiner")
    public void shouldNotLogWarningsAboutCommittedTransactionsWhileStreamingNormally() throws Exception {
        TestHelper.dropTable(connection, "dbz3616");
        try {
            connection.execute("CREATE TABLE dbz3616 (id number(9,0), data varchar2(50))");
            TestHelper.streamTable(connection, "dbz3616");
            connection.commit();

            start(OracleConnector.class, TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.SNAPSHOT_MODE, OracleConnectorConfig.SnapshotMode.INITIAL)
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ3616.*")
                    .with(OracleConnectorConfig.LOG_MINING_QUERY_FILTER_MODE, "regex")
                    .with(OracleConnectorConfig.LOG_MINING_STRATEGY, "online_catalog")
                    .build());
            assertConnectorIsRunning();
            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);

            // The second session is closed when the block exits, also when the test fails.
            try (OracleConnection otherSession = TestHelper.testConnection()) {
                // Open transaction on the second session; committed only after the wait below.
                otherSession.executeWithoutCommitting("INSERT INTO dbz3616 (id,data) values (1,'Conn2')");

                connection.executeWithoutCommitting("INSERT INTO dbz3616 (id,data) values (2,'Conn1')");
                connection.commit();

                // Pure delay: the condition is always true, so this returns after the
                // one-minute poll delay has elapsed.
                Awaitility.await().pollDelay(Durations.ONE_MINUTE).timeout(Durations.TWO_MINUTES).until(() -> true);

                otherSession.commit();
            }

            List<SourceRecord> records = consumeRecordsByTopic(2).recordsForTopic("server1.DEBEZIUM.DBZ3616");
            Assertions.assertThat(records).hasSize(2);
            // Commit order, not insert order, determines emission order.
            Assertions.assertThat(((Struct) records.get(0).value()).getStruct("after").get("ID")).isEqualTo(2);
            Assertions.assertThat(((Struct) records.get(1).value()).getStruct("after").get("ID")).isEqualTo(1);
        } finally {
            // Single cleanup path for both success and failure.
            TestHelper.dropTable(connection, "dbz3616");
        }
    }

    /**
     * Feeds records captured from the {@code CUSTOMER} table through the CloudEvents converter
     * checks (JSON, JSON with Avro data, and Avro). One row is inserted before the connector
     * starts and one afterwards; for the second record the CloudEvents {@code id} attribute
     * must additionally contain {@code "scn:"}.
     *
     * @throws Exception if the inserts, connector start-up or any converter check fails
     */
    @Test
    @FixFor({"DBZ-3668"})
    public void shouldOutputRecordsInCloudEventsFormat() throws Exception {
        final Configuration config = TestHelper.defaultConfig()
                .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.CUSTOMER")
                .build();

        // First row exists before the connector is started.
        connection.execute("INSERT INTO customer (id,name,score) values (1001, 'DBZ3668', 100)");

        start(OracleConnector.class, config);
        assertConnectorIsRunning();
        waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);

        final List<SourceRecord> firstBatch = consumeRecordsByTopic(1).recordsForTopic("server1.DEBEZIUM.CUSTOMER");
        Assertions.assertThat(firstBatch).hasSize(1);
        for (SourceRecord record : firstBatch) {
            CloudEventsConverterTest.shouldConvertToCloudEventsInJson(record, false);
            CloudEventsConverterTest.shouldConvertToCloudEventsInJsonWithDataAsAvro(record, false);
            CloudEventsConverterTest.shouldConvertToCloudEventsInAvro(record, TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME, false);
        }

        // Second row is inserted while the connector is streaming.
        connection.execute("INSERT INTO customer (id,name,score) values (1002, 'DBZ3668', 95)");

        final List<SourceRecord> secondBatch = consumeRecordsByTopic(1).recordsForTopic("server1.DEBEZIUM.CUSTOMER");
        Assertions.assertThat(secondBatch).hasSize(1);
        for (SourceRecord record : secondBatch) {
            CloudEventsConverterTest.shouldConvertToCloudEventsInJson(record, false, cloudEvent -> {
                Assertions.assertThat(cloudEvent.get("id").asText()).contains("scn:");
            });
            CloudEventsConverterTest.shouldConvertToCloudEventsInJsonWithDataAsAvro(record, false);
            CloudEventsConverterTest.shouldConvertToCloudEventsInAvro(record, TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME, false);
        }
    }

    /**
     * Verifies that a table created while the connector is already streaming, whose primary
     * key is added by separate statements (table, unique index, then a primary key constraint
     * using that index), yields records keyed by both primary key columns {@code ID} and
     * {@code NAME}.
     *
     * @throws Exception if database setup, connector start-up or record consumption fails
     */
    @Test
    @FixFor({"DBZ-3896"})
    public void shouldCaptureTableMetadataWithMultipleStatements() throws Exception {
        // Remove leftovers from an earlier aborted run so the CREATE TABLE below cannot fail.
        TestHelper.dropTable(connection, "dbz3896");
        try {
            start(OracleConnector.class, TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ3896")
                    .build());
            assertConnectorIsRunning();
            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);

            // The table and its key are defined only after streaming has begun.
            connection.execute(
                    "CREATE TABLE dbz3896 (id number(9,0), name varchar2(50), data varchar2(50))",
                    "CREATE UNIQUE INDEX dbz3896_pk ON dbz3896 (\"ID\", \"NAME\")",
                    "ALTER TABLE dbz3896 ADD CONSTRAINT idx_dbz3896 PRIMARY KEY (\"ID\", \"NAME\") USING INDEX \"DBZ3896_PK\"");
            TestHelper.streamTable(connection, "dbz3896");
            connection.execute("INSERT INTO dbz3896 (id,name,data) values (1,'First','Test')");

            AbstractConnectorTest.SourceRecords batch = consumeRecordsByTopic(1);
            Assertions.assertThat(batch.recordsForTopic("server1.DEBEZIUM.DBZ3896")).hasSize(1);
            SourceRecord inserted = batch.recordsForTopic("server1.DEBEZIUM.DBZ3896").get(0);

            Assertions.assertThat(inserted.key()).isNotNull();
            Assertions.assertThat(inserted.keySchema().field("ID")).isNotNull();
            Assertions.assertThat(inserted.keySchema().field("NAME")).isNotNull();
            Struct key = (Struct) inserted.key();
            Assertions.assertThat(key.get("ID")).isEqualTo(1);
            Assertions.assertThat(key.get("NAME")).isEqualTo("First");
        } finally {
            // Single cleanup path for both success and failure.
            TestHelper.dropTable(connection, "dbz3896");
        }
    }

    /**
     * Starts the connector with the {@code CATALOG_IN_REDO} mining strategy and no table
     * include list, inserts one row into {@code dbz3898}, and verifies that exactly one record
     * for that table is emitted. No further records may be available once the streaming
     * {@code CurrentScn} metric has advanced past the SCN read right after the insert.
     * <p>
     * NOTE(review): presumably the "no further records" check is what demonstrates that tables
     * in excluded schemas are ignored - confirm against the connector's default exclusions.
     *
     * @throws Exception if database setup, connector start-up or record consumption fails
     */
    @Test
    @SkipWhenAdapterNameIsNot(value = SkipWhenAdapterNameIsNot.AdapterName.LOGMINER, reason = "Tests specific LogMiner features")
    @SkipOnDatabaseOption(value = "Real Application Clusters", enabled = true, reason = "Performance w/CATALOG_IN_REDO on Oracle RAC")
    @FixFor({"DBZ-3898"})
    public void shouldIgnoreAllTablesInExcludedSchemas() throws Exception {
        TestHelper.dropTable(connection, "dbz3898");
        try {
            connection.execute("CREATE TABLE dbz3898 (id number(9,0), data varchar2(50))");
            TestHelper.streamTable(connection, "dbz3898");

            start(OracleConnector.class, TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.LOG_MINING_STRATEGY, OracleConnectorConfig.LogMiningStrategy.CATALOG_IN_REDO)
                    .build());
            assertConnectorIsRunning();
            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);

            connection.execute("INSERT INTO dbz3898 (id,data) values (1,'Test')");
            // Reference point: the connector must mine beyond this SCN before the final check.
            final Scn scnAfterInsert = TestHelper.getCurrentScn();

            Assertions.assertThat(consumeRecordsByTopic(1).recordsForTopic("server1.DEBEZIUM.DBZ3898")).hasSize(1);

            Awaitility.await()
                    .atMost(Duration.ofMinutes(3L))
                    .until(() -> new Scn((BigInteger) getStreamingMetric("CurrentScn")).compareTo(scnAfterInsert) > 0);

            assertNoRecordsToConsume();
        } finally {
            // Single cleanup path for both success and failure.
            TestHelper.dropTable(connection, "dbz3898");
        }
    }

    /**
     * Verifies that a connector started in archive-log-only mode emits nothing at first, and
     * emits each inserted row once {@code waitForLogSwitchOrForceOneAfterTimeout()} has
     * returned after the insert.
     *
     * @throws Exception if database setup, connector start-up or record consumption fails
     */
    @Test
    @FixFor({"DBZ-3712", "DBZ-4879"})
    @SkipWhenAdapterNameIsNot(value = SkipWhenAdapterNameIsNot.AdapterName.LOGMINER, reason = "Tests archive log support for LogMiner only")
    public void shouldStartWithArchiveLogOnlyModeAndStreamWhenRecordsBecomeAvailable() throws Exception {
        TestHelper.dropTable(connection, "dbz3712");
        try {
            connection.execute("CREATE TABLE dbz3712 (id number(9,0), data varchar2(50))");
            TestHelper.streamTable(connection, "dbz3712");

            start(OracleConnector.class, TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.LOG_MINING_ARCHIVE_LOG_ONLY_MODE, true)
                    .with(OracleConnectorConfig.LOG_MINING_ARCHIVE_LOG_ONLY_SCN_POLL_INTERVAL_MS, 2000)
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ3712")
                    .build());
            assertConnectorIsRunning();
            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);

            // Nothing has been written yet, so nothing may be emitted.
            waitForAvailableRecords(waitTimeForRecords(), TimeUnit.SECONDS);
            assertNoRecordsToConsume();

            // First insert, consumed after the log-switch helper returns.
            connection.execute("INSERT INTO dbz3712 (id,data) values (1, 'Test')");
            waitForLogSwitchOrForceOneAfterTimeout();
            Assertions.assertThat(consumeRecordsByTopic(1).recordsForTopic("server1.DEBEZIUM.DBZ3712")).hasSize(1);

            // Second insert, same procedure.
            connection.execute("INSERT INTO dbz3712 (id,data) values (2, 'Test2')");
            waitForLogSwitchOrForceOneAfterTimeout();
            Assertions.assertThat(consumeRecordsByTopic(1).recordsForTopic("server1.DEBEZIUM.DBZ3712")).hasSize(1);
        } finally {
            // Single cleanup path for both success and failure.
            TestHelper.dropTable(connection, "dbz3712");
        }
    }

    /**
     * Runs the connector once without archive-log-only mode set, stops it, and restarts it with
     * archive-log-only mode enabled. After the restart nothing may be pending, and each later insert
     * must be consumed after a redo log switch has been observed or forced.
     *
     * @throws Exception if database setup, the connector, or an assertion fails
     */
    @Test
    @FixFor("DBZ-3712")
    @SkipWhenAdapterNameIsNot(value = SkipWhenAdapterNameIsNot.AdapterName.LOGMINER, reason = "Tests archive log support for LogMiner only")
    public void shouldPermitChangingToArchiveLogOnlyModeOnExistingConnector() throws Exception {
        TestHelper.dropTable(connection, "dbz3712");
        try {
            connection.execute("CREATE TABLE dbz3712 (id number(9,0), data varchar2(50))");
            TestHelper.streamTable(connection, "dbz3712");

            // First run: archive-log-only mode is not set.
            Configuration initialConfig = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.LOG_MINING_ARCHIVE_LOG_ONLY_SCN_POLL_INTERVAL_MS, 2000)
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ3712")
                    .build();
            start(OracleConnector.class, initialConfig);
            assertConnectorIsRunning();
            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);

            connection.execute("INSERT INTO dbz3712 (id,data) values (1, 'Test1')");
            Assertions.assertThat(consumeRecordsByTopic(1).recordsForTopic("server1.DEBEZIUM.DBZ3712")).hasSize(1);
            stopConnector();

            // Second run: same table, archive-log-only mode switched on.
            Configuration archiveOnlyConfig = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.LOG_MINING_ARCHIVE_LOG_ONLY_MODE, true)
                    .with(OracleConnectorConfig.LOG_MINING_ARCHIVE_LOG_ONLY_SCN_POLL_INTERVAL_MS, 2000)
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ3712")
                    .build();
            start(OracleConnector.class, archiveOnlyConfig);
            assertConnectorIsRunning();
            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);

            waitForAvailableRecords(waitTimeForRecords(), TimeUnit.SECONDS);
            assertNoRecordsToConsume();

            connection.execute("INSERT INTO dbz3712 (id,data) values (2, 'Test2')");
            waitForLogSwitchOrForceOneAfterTimeout();
            Assertions.assertThat(consumeRecordsByTopic(1).recordsForTopic("server1.DEBEZIUM.DBZ3712")).hasSize(1);

            connection.execute("INSERT INTO dbz3712 (id,data) values (3, 'Test2')");
            waitForLogSwitchOrForceOneAfterTimeout();
            Assertions.assertThat(consumeRecordsByTopic(1).recordsForTopic("server1.DEBEZIUM.DBZ3712")).hasSize(1);
        } finally {
            // Single cleanup point for both the success and the failure path.
            TestHelper.dropTable(connection, "dbz3712");
        }
    }

    /**
     * Waits until the current redo log sequences differ from the ones captured on entry, polling
     * every 5 seconds for at most 20 seconds. Each poll that sees no change also asserts that no
     * records are pending. If the wait times out, a log file switch is forced instead.
     *
     * @throws SQLException if a database call made directly by this helper fails
     */
    private void waitForLogSwitchOrForceOneAfterTimeout() throws SQLException {
        final List<BigInteger> sequencesOnEntry = TestHelper.getCurrentRedoLogSequences();
        try {
            Awaitility.await()
                    .pollInterval(Duration.ofSeconds(5L))
                    .atMost(Duration.ofSeconds(20L))
                    .until(() -> {
                        final boolean switched = !TestHelper.getCurrentRedoLogSequences().equals(sequencesOnEntry);
                        if (!switched) {
                            // Until the log switches, nothing is expected to be emitted.
                            assertNoRecordsToConsume();
                        }
                        return switched;
                    });
        } catch (ConditionTimeoutException timedOut) {
            // No switch was observed within the window, so trigger one explicitly.
            TestHelper.forceLogfileSwitch();
        }
    }

    /**
     * Configures the username exclude list with {@code DEBEZIUM} and query filter mode {@code none},
     * commits two inserts, and verifies that no records become available within 10 seconds while
     * the {@code TotalCapturedDmlCount} streaming metric reports at least two DML operations.
     *
     * @throws Exception if database setup, the connector, or an assertion fails
     */
    @Test
    @FixFor("DBZ-3978")
    @SkipWhenAdapterNameIsNot(value = SkipWhenAdapterNameIsNot.AdapterName.LOGMINER, reason = "Specific to only LogMiner")
    public void shouldFilterUser() throws Exception {
        try {
            TestHelper.dropTable(connection, "dbz3978");
            connection.execute("CREATE TABLE dbz3978 (id number(9,0), data varchar2(50), primary key (id))");
            TestHelper.streamTable(connection, "dbz3978");

            Configuration config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ3978")
                    .with(OracleConnectorConfig.SNAPSHOT_MODE, OracleConnectorConfig.SnapshotMode.NO_DATA)
                    .with(OracleConnectorConfig.LOG_MINING_USERNAME_EXCLUDE_LIST, "DEBEZIUM")
                    .with(OracleConnectorConfig.LOG_MINING_QUERY_FILTER_MODE, "none")
                    .build();
            start(OracleConnector.class, config);
            assertConnectorIsRunning();
            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);

            // Both inserts belong to one transaction that is committed explicitly.
            connection.executeWithoutCommitting("INSERT INTO debezium.dbz3978 VALUES (1, 'Test1')");
            connection.executeWithoutCommitting("INSERT INTO debezium.dbz3978 VALUES (2, 'Test2')");
            connection.execute("COMMIT");

            Assertions.assertThat(waitForAvailableRecords(10L, TimeUnit.SECONDS)).isFalse();

            Long capturedDmlCount = getStreamingMetric("TotalCapturedDmlCount");
            Assertions.assertThat(capturedDmlCount).isGreaterThanOrEqualTo(2L);
        } finally {
            // Single cleanup point for both the success and the failure path.
            TestHelper.dropTable(connection, "dbz3978");
        }
    }

    /**
     * Creates table {@code CMP3$12345}, which matches the configured include list, inserts one row
     * before the connector starts and one while it is streaming, and verifies for 10 seconds that no
     * records are available to consume.
     *
     * @throws Exception if database setup, the connector, or an assertion fails
     */
    @Test
    @FixFor("DBZ-5756")
    public void testShouldIgnoreCompressionAdvisorTablesDuringSnapshotAndStreaming() throws Exception {
        TestHelper.dropTable(connection, "CMP3$12345");
        try {
            connection.execute("CREATE TABLE CMP3$12345 (id numeric(9,0), id2 numeric(9,0), data varchar2(50), primary key(id, id2))");
            TestHelper.streamTable(connection, "CMP3$12345");
            connection.execute("INSERT INTO CMP3$12345 (id,id2,data) values (1, 1, 'data')");

            Configuration config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.CMP.*")
                    .build();
            start(OracleConnector.class, config);
            assertConnectorIsRunning();
            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);

            connection.execute("INSERT INTO CMP3$12345 (id,id2,data) values (2, 2, 'data')");

            try {
                // The condition never becomes true; it only re-asserts on every poll that nothing
                // is pending until the 10 second window has elapsed.
                Awaitility.await().atMost(Duration.ofSeconds(10L)).until(() -> {
                    assertNoRecordsToConsume();
                    return false;
                });
            } catch (ConditionTimeoutException ignored) {
                // Expected: timing out means no record showed up during the whole window.
            }
        } finally {
            // Single cleanup point for both the success and the failure path.
            TestHelper.dropTable(connection, "CMP3$12345");
        }
    }

    /**
     * Starts the connector first, then creates table {@code CMP3$12345} (which matches the
     * configured include list) and inserts a row, and verifies for 10 seconds that no records are
     * available to consume.
     *
     * @throws Exception if database setup, the connector, or an assertion fails
     */
    @Test
    @FixFor("DBZ-5756")
    public void testShouldIgnoreCompressionAdvisorTablesDuringStreaming() throws Exception {
        TestHelper.dropTable(connection, "CMP3$12345");
        try {
            Configuration config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.CMP.*")
                    .build();
            start(OracleConnector.class, config);
            assertConnectorIsRunning();
            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);

            // The table only comes into existence once streaming is already running.
            connection.execute("CREATE TABLE CMP3$12345 (id numeric(9,0), id2 numeric(9,0), data varchar2(50), primary key(id, id2))");
            TestHelper.streamTable(connection, "CMP3$12345");
            connection.execute("INSERT INTO CMP3$12345 (id,id2,data) values (1, 1, 'data')");

            try {
                // The condition never becomes true; it only re-asserts on every poll that nothing
                // is pending until the 10 second window has elapsed.
                Awaitility.await().atMost(Duration.ofSeconds(10L)).until(() -> {
                    assertNoRecordsToConsume();
                    return false;
                });
            } catch (ConditionTimeoutException ignored) {
                // Expected: timing out means no record showed up during the whole window.
            }
        } finally {
            // Single cleanup point for both the success and the failure path.
            TestHelper.dropTable(connection, "CMP3$12345");
        }
    }

    /**
     * Reads an attribute from the platform MBean server, using the object name returned by
     * {@code getStreamingMetricsObjectName} for the test connector and server.
     *
     * @param str the name of the attribute to read
     * @param <T> the type the caller expects; the value is cast without a runtime check
     * @return the attribute value cast to {@code T}
     * @throws JMException if the MBean server cannot provide the attribute
     */
    private <T> T getStreamingMetric(String str) throws JMException {
        final Object attributeValue = ManagementFactory.getPlatformMBeanServer()
                .getAttribute(getStreamingMetricsObjectName(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME), str);
        @SuppressWarnings("unchecked") // the caller chooses T; a mismatch surfaces at the call site
        final T typedValue = (T) attributeValue;
        return typedValue;
    }

    /**
     * Builds a string of the requested length whose characters are picked with
     * {@link Math#random()} from the upper-case letters, digits and lower-case letters.
     *
     * @param i the number of characters to generate
     * @return the generated string; empty when {@code i} is zero
     */
    private String generateAlphaNumericStringColumn(int i) {
        final String alphabet = "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789abcdefghijklmnopqrstuvwxyz";
        final StringBuilder generated = new StringBuilder(i);
        for (int remaining = i; remaining > 0; remaining--) {
            final int position = (int) (alphabet.length() * Math.random());
            generated.append(alphabet.charAt(position));
        }
        return generated.toString();
    }

    /**
     * Asserts that the record was published to the heartbeat topic
     * {@code __debezium-heartbeat.server1} and that the {@code serverName} field of its key equals
     * {@link TestHelper#SERVER_NAME}.
     *
     * @param sourceRecord the record to check; its key must be a {@link Struct}
     */
    private void verifyHeartbeatRecord(SourceRecord sourceRecord) {
        final String topic = sourceRecord.topic();
        TestCase.assertEquals("__debezium-heartbeat.server1", topic);

        final Struct key = (Struct) sourceRecord.key();
        Assertions.assertThat(key.get("serverName")).isEqualTo(TestHelper.SERVER_NAME);
    }

    /**
     * Converts a date-time, interpreted at UTC, into its whole epoch seconds multiplied by
     * {@code MICROS_PER_SECOND}. Any fraction of a second in the input does not contribute to the
     * result.
     *
     * @param localDateTime the date-time to convert
     * @return the epoch seconds at UTC scaled by {@code MICROS_PER_SECOND}
     */
    private long toMicroSecondsSinceEpoch(LocalDateTime localDateTime) {
        final long epochSeconds = localDateTime.toEpochSecond(ZoneOffset.UTC);
        return epochSeconds * MICROS_PER_SECOND;
    }

    /**
     * Starts the connector in {@code RECOVERY} snapshot mode with a file-based schema history
     * pointing at {@code missing-history.txt}, then rethrows the {@link Throwable} handed to the
     * callback passed to {@code start}. The test passes when that is a {@link DebeziumException}.
     */
    @Test(expected = DebeziumException.class)
    @FixFor("DBZ-3986")
    public void shouldCreateSnapshotSchemaOnlyRecoveryExceptionWithoutOffset() {
        final Path historyFile = Testing.Files.createTestingPath("missing-history.txt").toAbsolutePath();
        final Configuration config = TestHelper.defaultConfig()
                .with(OracleConnectorConfig.SNAPSHOT_MODE, OracleConnectorConfig.SnapshotMode.RECOVERY)
                .with(FileSchemaHistory.FILE_PATH, historyFile)
                .build();

        // Holds whatever error the callback receives so it can be rethrown on the test thread.
        final AtomicReference<Throwable> failure = new AtomicReference<>();
        start(OracleConnector.class, config, (success, message, error) -> failure.set(error));

        Testing.Files.delete(historyFile);
        throw (RuntimeException) failure.get();
    }

    /**
     * Runs the connector once through {@code consumeRecords} with {@code NO_DATA} snapshot mode,
     * in-memory schema history and file-based offset storage, inserts a third row while it is
     * stopped, and restarts it in {@code RECOVERY} snapshot mode. Exactly one record, for the row
     * {@code (3, 'asuka')}, must then be consumed.
     *
     * @throws Exception if database setup, the connector, or an assertion fails
     */
    @Test
    @FixFor("DBZ-3986")
    public void shouldCreateSnapshotSchemaOnlyRecovery() throws Exception {
        try {
            final Configuration.Builder builder = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.SNAPSHOT_MODE, OracleConnectorConfig.SnapshotMode.NO_DATA)
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ3986")
                    .with(OracleConnectorConfig.SCHEMA_HISTORY, MemorySchemaHistory.class.getName())
                    .with(EmbeddedEngineConfig.OFFSET_STORAGE, FileOffsetBackingStore.class.getName());
            consumeRecords(builder.build());

            // Inserted while the connector is stopped.
            connection.execute("INSERT INTO DBZ3986 (ID, DATA) values (3, 'asuka')");

            // Same builder, with only the snapshot mode switched for the restart.
            builder.with(OracleConnectorConfig.SNAPSHOT_MODE, OracleConnectorConfig.SnapshotMode.RECOVERY);
            start(OracleConnector.class, builder.build());

            final AbstractConnectorTest.SourceRecords records = consumeRecordsByTopic(1);
            Assertions.assertThat(records.allRecordsInOrder()).hasSize(1);

            final Struct value = (Struct) ((SourceRecord) records.allRecordsInOrder().get(0)).value();
            final Struct after = (Struct) value.get("after");
            TestCase.assertEquals(3, after.get("ID"));
            TestCase.assertEquals("asuka", after.get("DATA"));
        } finally {
            // Single cleanup point for both the success and the failure path.
            TestHelper.dropTable(connection, "DBZ3986");
        }
    }

    /**
     * Runs the connector once through {@code consumeRecords} with in-memory schema history and
     * file-based offset storage, starts it again with the same configuration, and rethrows the
     * {@link Throwable} handed to the callback passed to {@code start}. The test passes when that
     * is a {@link DebeziumException}.
     *
     * @throws Exception if database setup or the first connector run fails
     */
    @Test(expected = DebeziumException.class)
    @FixFor("DBZ-3986")
    public void shouldCreateSnapshotSchemaOnlyExceptionWithoutHistory() throws Exception {
        try {
            final Configuration config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.SNAPSHOT_MODE, OracleConnectorConfig.SnapshotMode.NO_DATA)
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ3986")
                    .with(OracleConnectorConfig.SCHEMA_HISTORY, MemorySchemaHistory.class.getName())
                    .with(EmbeddedEngineConfig.OFFSET_STORAGE, FileOffsetBackingStore.class.getName())
                    .build();
            consumeRecords(config);

            // Holds whatever error the callback receives so it can be rethrown on the test thread.
            final AtomicReference<Throwable> failure = new AtomicReference<>();
            start(OracleConnector.class, config, (success, message, error) -> failure.set(error));
            throw (RuntimeException) failure.get();
        } finally {
            // The try body always exits by throwing, so this runs on every path.
            TestHelper.dropTable(connection, "DBZ3986");
        }
    }

    /**
     * Runs the connector once through {@code consumeRecords} with {@code NO_DATA} snapshot mode and
     * in-memory schema history and offset storage, inserts row 3 while it is stopped, restarts it
     * with the same configuration, and inserts row 4 once streaming is running. Exactly one record,
     * for the row {@code (4, 'debezium')}, must then be consumed.
     *
     * @throws Exception if database setup, the connector, or an assertion fails
     */
    @Test
    @FixFor("DBZ-3986")
    public void shouldSkipDataOnSnapshotSchemaOnly() throws Exception {
        try {
            final Configuration config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.SNAPSHOT_MODE, OracleConnectorConfig.SnapshotMode.NO_DATA)
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ3986")
                    .with(OracleConnectorConfig.SCHEMA_HISTORY, MemorySchemaHistory.class.getName())
                    .with(EmbeddedEngineConfig.OFFSET_STORAGE, MemoryOffsetBackingStore.class.getName())
                    .build();
            consumeRecords(config);

            // Inserted while the connector is stopped.
            connection.execute("INSERT INTO DBZ3986 (ID, DATA) values (3, 'asuka')");

            start(OracleConnector.class, config);
            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);

            connection.execute("INSERT INTO DBZ3986 (ID, DATA) values (4, 'debezium')");

            final AbstractConnectorTest.SourceRecords records = consumeRecordsByTopic(1);
            Assertions.assertThat(records.allRecordsInOrder()).hasSize(1);

            final Struct value = (Struct) ((SourceRecord) records.allRecordsInOrder().get(0)).value();
            final Struct after = (Struct) value.get("after");
            TestCase.assertEquals(4, after.get("ID"));
            // Compare against the literal that was inserted above rather than an unrelated constant.
            TestCase.assertEquals("debezium", after.get("DATA"));
        } finally {
            // Single cleanup point for both the success and the failure path.
            TestHelper.dropTable(connection, "DBZ3986");
        }
    }

    /**
     * Captures a table whose name is longer than 30 characters. The pre-existing row must be
     * emitted by the snapshot, a row inserted while streaming must not be emitted, and a warning
     * that the table won't be captured by Oracle LogMiner must have been logged.
     *
     * @throws Exception if database setup, the connector, or an assertion fails
     */
    @Test
    @FixFor("DBZ-4161")
    @SkipWhenAdapterNameIsNot(value = SkipWhenAdapterNameIsNot.AdapterName.LOGMINER, reason = "Applies to LogMiner only")
    public void shouldWarnAboutTableNameLengthExceeded() throws Exception {
        final String table = "dbz4161_with_a_name_that_is_greater_than_30";
        final String topic = "server1.DEBEZIUM.DBZ4161_WITH_A_NAME_THAT_IS_GREATER_THAN_30";
        try {
            TestHelper.dropTable(connection, table);
            connection.execute("CREATE TABLE " + table + " (id numeric(9,0), data varchar2(30))");
            TestHelper.streamTable(connection, table);
            connection.execute("INSERT INTO " + table + " values (1, 'snapshot')");

            final Configuration config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ4161_WITH_A_NAME_THAT_IS_GREATER_THAN_30")
                    .build();

            // Attach the interceptor before starting so the warning cannot be missed.
            final LogInterceptor logInterceptor = new LogInterceptor(LogMinerStreamingChangeEventSource.class);
            start(OracleConnector.class, config);
            assertConnectorIsRunning();
            waitForSnapshotToBeCompleted(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);

            final AbstractConnectorTest.SourceRecords snapshotRecords = consumeRecordsByTopic(1);
            Assertions.assertThat(snapshotRecords.recordsForTopic(topic)).hasSize(1);
            final Struct after = ((Struct) ((SourceRecord) snapshotRecords.recordsForTopic(topic).get(0)).value()).getStruct("after");
            Assertions.assertThat(after.get("ID")).isEqualTo(1);
            Assertions.assertThat(after.get("DATA")).isEqualTo("snapshot");

            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
            connection.execute("INSERT INTO " + table + " values (2, 'streaming')");
            waitForCurrentScnToHaveBeenSeenByConnector();

            // The streamed insert must not surface as a record; only the warning is expected.
            assertNoRecordsToConsume();
            Assertions.assertThat(logInterceptor.containsWarnMessage("Table 'DBZ4161_WITH_A_NAME_THAT_IS_GREATER_THAN_30' won't be captured by Oracle LogMiner")).isTrue();
        } finally {
            // Single cleanup point for both the success and the failure path.
            TestHelper.dropTable(connection, table);
        }
    }

    /**
     * Captures a table that has a column whose name is longer than 30 characters. The pre-existing
     * row must be emitted by the snapshot, a row inserted while streaming must not be emitted, and
     * a warning that the table won't be captured by Oracle LogMiner must have been logged.
     *
     * @throws Exception if database setup, the connector, or an assertion fails
     */
    @Test
    @FixFor("DBZ-4161")
    @SkipWhenAdapterNameIsNot(value = SkipWhenAdapterNameIsNot.AdapterName.LOGMINER, reason = "Applies to LogMiner only")
    public void shouldWarnAboutColumnNameLengthExceeded() throws Exception {
        final String topic = "server1.DEBEZIUM.DBZ4161";
        try {
            TestHelper.dropTable(connection, "dbz4161");
            connection.execute("CREATE TABLE dbz4161 (id numeric(9,0), a_very_long_column_name_that_is_greater_than_30 varchar2(30))");
            TestHelper.streamTable(connection, "dbz4161");
            connection.execute("INSERT INTO dbz4161 values (1, 'snapshot')");

            final Configuration config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ4161")
                    .build();

            // Attach the interceptor before starting so the warning cannot be missed.
            final LogInterceptor logInterceptor = new LogInterceptor(LogMinerStreamingChangeEventSource.class);
            start(OracleConnector.class, config);
            assertConnectorIsRunning();
            waitForSnapshotToBeCompleted(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);

            final AbstractConnectorTest.SourceRecords snapshotRecords = consumeRecordsByTopic(1);
            Assertions.assertThat(snapshotRecords.recordsForTopic(topic)).hasSize(1);
            final Struct after = ((Struct) ((SourceRecord) snapshotRecords.recordsForTopic(topic).get(0)).value()).getStruct("after");
            Assertions.assertThat(after.get("ID")).isEqualTo(1);
            Assertions.assertThat(after.get("A_VERY_LONG_COLUMN_NAME_THAT_IS_GREATER_THAN_30")).isEqualTo("snapshot");

            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
            connection.execute("INSERT INTO dbz4161 values (2, 'streaming')");
            waitForCurrentScnToHaveBeenSeenByConnector();

            // The streamed insert must not surface as a record; only the warning is expected.
            assertNoRecordsToConsume();
            Assertions.assertThat(logInterceptor.containsWarnMessage("Table 'DBZ4161' won't be captured by Oracle LogMiner")).isTrue();
        } finally {
            // Single cleanup point for both the success and the failure path.
            TestHelper.dropTable(connection, "dbz4161");
        }
    }

    /**
     * Configures the database include list with the name returned by
     * {@code TestHelper.getDatabaseName()} and verifies that one row is emitted by the snapshot and
     * one more by streaming, with nothing left to consume afterwards.
     *
     * @throws Exception if database setup, the connector, or an assertion fails
     */
    @Test
    @FixFor("DBZ-3611")
    public void shouldSafelySnapshotAndStreamWithDatabaseIncludeList() throws Exception {
        final String topic = "server1.DEBEZIUM.DBZ3611";
        try {
            TestHelper.dropTable(connection, "dbz3611");
            connection.execute("CREATE TABLE dbz3611 (id numeric(9,0), data varchar2(30))");
            TestHelper.streamTable(connection, "dbz3611");
            connection.execute("INSERT INTO dbz3611 values (1, 'snapshot')");

            final Configuration config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.DATABASE_INCLUDE_LIST, TestHelper.getDatabaseName())
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.dbz3611")
                    .build();
            start(OracleConnector.class, config);
            assertConnectorIsRunning();
            waitForSnapshotToBeCompleted(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);

            final AbstractConnectorTest.SourceRecords snapshotRecords = consumeRecordsByTopic(1);
            Assertions.assertThat(snapshotRecords.recordsForTopic(topic)).hasSize(1);
            final Struct after = ((Struct) ((SourceRecord) snapshotRecords.recordsForTopic(topic).get(0)).value()).getStruct("after");
            Assertions.assertThat(after.get("ID")).isEqualTo(1);
            Assertions.assertThat(after.get("DATA")).isEqualTo("snapshot");

            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
            connection.execute("INSERT INTO dbz3611 values (2, 'streaming')");
            Assertions.assertThat(consumeRecordsByTopic(1).recordsForTopic(topic)).hasSize(1);
            assertNoRecordsToConsume();
        } finally {
            // Single cleanup point for both the success and the failure path.
            TestHelper.dropTable(connection, "dbz3611");
        }
    }

    /**
     * Configures the database exclude list with the name returned by
     * {@code TestHelper.getDatabaseName()} suffixed with {@code "2"} and verifies that one row is
     * emitted by the snapshot and one more by streaming, with nothing left to consume afterwards.
     *
     * @throws Exception if database setup, the connector, or an assertion fails
     */
    @Test
    @FixFor("DBZ-3611")
    public void shouldSafelySnapshotAndStreamWithDatabaseExcludeList() throws Exception {
        final String topic = "server1.DEBEZIUM.DBZ3611";
        try {
            TestHelper.dropTable(connection, "dbz3611");
            connection.execute("CREATE TABLE dbz3611 (id numeric(9,0), data varchar2(30))");
            TestHelper.streamTable(connection, "dbz3611");
            connection.execute("INSERT INTO dbz3611 values (1, 'snapshot')");

            final Configuration config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.DATABASE_EXCLUDE_LIST, TestHelper.getDatabaseName() + "2")
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.dbz3611")
                    .build();
            start(OracleConnector.class, config);
            assertConnectorIsRunning();
            waitForSnapshotToBeCompleted(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);

            final AbstractConnectorTest.SourceRecords snapshotRecords = consumeRecordsByTopic(1);
            Assertions.assertThat(snapshotRecords.recordsForTopic(topic)).hasSize(1);
            final Struct after = ((Struct) ((SourceRecord) snapshotRecords.recordsForTopic(topic).get(0)).value()).getStruct("after");
            Assertions.assertThat(after.get("ID")).isEqualTo(1);
            Assertions.assertThat(after.get("DATA")).isEqualTo("snapshot");

            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
            connection.execute("INSERT INTO dbz3611 values (2, 'streaming')");
            Assertions.assertThat(consumeRecordsByTopic(1).recordsForTopic(topic)).hasSize(1);
            assertNoRecordsToConsume();
        } finally {
            // Single cleanup point for both the success and the failure path.
            TestHelper.dropTable(connection, "dbz3611");
        }
    }

    /**
     * Starts the connector with the lower-cased database name, set as the PDB name when
     * {@code TestHelper.isUsingPdb()} is true and as the database name otherwise, and verifies that
     * the first record for the {@code CUSTOMER} table carries {@code ID} 1.
     *
     * @throws Exception if database setup, the connector, or an assertion fails
     */
    @Test
    @FixFor("DBZ-4376")
    public void shouldNotRaiseNullPointerExceptionWithNonUppercaseDatabaseName() throws Exception {
        final Configuration config;
        if (TestHelper.isUsingPdb()) {
            config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.PDB_NAME, TestHelper.getDatabaseName().toLowerCase())
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.CUSTOMER")
                    .build();
        } else {
            config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.DATABASE_NAME, TestHelper.getDatabaseName().toLowerCase())
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.CUSTOMER")
                    .build();
        }

        connection.execute("INSERT INTO debezium.customer (id,name) values (1, 'Bugs Bunny')");
        start(OracleConnector.class, config);
        waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);

        final SourceRecord firstRecord = (SourceRecord) consumeRecordsByTopic(1).recordsForTopic("server1.DEBEZIUM.CUSTOMER").get(0);
        final Struct after = ((Struct) firstRecord.value()).getStruct("after");
        Assertions.assertThat(after.get("ID")).isEqualTo(1);
    }

    /**
     * Recreates table {@code DBZ3986}, starts the connector with the given configuration, inserts
     * two rows once streaming is running, asserts that two records are consumed, and stops the
     * connector again.
     *
     * @param configuration the connector configuration to start with
     * @throws SQLException if a database statement fails
     * @throws InterruptedException if interrupted while waiting on the connector
     */
    @FixFor("DBZ-3986")
    private void consumeRecords(Configuration configuration) throws SQLException, InterruptedException {
        final String table = "DBZ3986";
        TestHelper.dropTable(connection, table);
        connection.execute("CREATE TABLE DBZ3986 (ID number(9,0), DATA varchar2(50))");
        TestHelper.streamTable(connection, table);

        start(OracleConnector.class, configuration);
        waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);

        // Each insert is committed on its own.
        connection.execute("INSERT INTO DBZ3986 (ID, DATA) values (1, 'Test')");
        connection.execute("INSERT INTO DBZ3986 (ID, DATA) values (2, 'ashlin')");

        final int expectedRecords = 2;
        Assertions.assertThat(consumeRecordsByTopic(expectedRecords).allRecordsInOrder()).hasSize(expectedRecords);
        stopConnector();
    }

    /**
     * Leaves a transaction open across connector start-up with the snapshot boundary mode set to
     * {@code TRANSACTION_VIEW_ONLY}. Row 1 is committed before start, row 2 is inserted uncommitted
     * before start, row 3 is added to the same transaction after streaming begins and committed,
     * and row 4 is committed afterwards. The four consumed records must carry IDs 1 to 4 exactly
     * once each.
     *
     * @throws Exception if database setup, the connector, or an assertion fails
     */
    @Test
    @FixFor("DBZ-4367")
    public void shouldCaptureChangesForTransactionsAcrossSnapshotBoundary() throws Exception {
        TestHelper.dropTable(connection, "DBZ4367");
        try {
            connection.execute("CREATE TABLE DBZ4367 (ID number(9, 0), DATA varchar2(50))");
            TestHelper.streamTable(connection, "DBZ4367");
            connection.execute("INSERT INTO DBZ4367 (ID, DATA) VALUES (1, 'pre-snapshot pre TX')");
            // Left uncommitted on purpose: this transaction spans the connector start.
            connection.executeWithoutCommitting("INSERT INTO DBZ4367 (ID, DATA) VALUES (2, 'pre-snapshot in TX')");

            final Configuration config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ4367")
                    .with(OracleConnectorConfig.LOG_MINING_TRANSACTION_SNAPSHOT_BOUNDARY_MODE, OracleConnectorConfig.TransactionSnapshotBoundaryMode.TRANSACTION_VIEW_ONLY)
                    .build();
            start(OracleConnector.class, config);
            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);

            connection.executeWithoutCommitting("INSERT INTO DBZ4367 (ID, DATA) VALUES (3, 'post-snapshot in TX')");
            connection.executeWithoutCommitting("COMMIT");
            connection.execute("INSERT INTO DBZ4367 (ID, DATA) VALUES (4, 'post snapshot post TX')");

            final List<Integer> ids = consumeRecordsByTopic(4).recordsForTopic("server1.DEBEZIUM.DBZ4367").stream()
                    .map(event -> getAfter(event).getInt32("ID"))
                    .collect(Collectors.toList());
            Assertions.assertThat(ids).containsOnly(1, 2, 3, 4);
            Assertions.assertThat(ids).doesNotHaveDuplicates();
            Assertions.assertThat(ids).hasSize(4);
        } finally {
            // Single cleanup point for both the success and the failure path.
            stopConnector();
            TestHelper.dropTable(connection, "DBZ4367");
        }
    }

    /**
     * Leaves a transaction open across connector start-up (boundary mode
     * {@code TRANSACTION_VIEW_ONLY}) while a second connection commits another row before start.
     * The first two consumed records must carry IDs 1 and 3. After the open transaction is extended
     * with row 4 and committed, and row 5 is committed separately, the next three records must
     * carry IDs 2, 4 and 5. Neither batch may contain duplicates.
     *
     * @throws Exception if database setup, the connector, or an assertion fails
     */
    @Test
    @FixFor("DBZ-4367")
    public void shouldCaptureChangesForTransactionsAcrossSnapshotBoundaryWithoutDuplicatingSnapshottedChanges() throws Exception {
        final OracleConnection secondConnection = TestHelper.testConnection();
        try {
            // Inside the try so the second connection is still closed if this drop fails.
            TestHelper.dropTable(connection, "DBZ4367");
            connection.execute("CREATE TABLE DBZ4367 (ID number(9, 0), DATA varchar2(50))");
            TestHelper.streamTable(connection, "DBZ4367");
            connection.execute("INSERT INTO DBZ4367 (ID, DATA) VALUES (1, 'pre-snapshot pre TX')");
            // Left uncommitted on purpose: this transaction spans the connector start.
            connection.executeWithoutCommitting("INSERT INTO DBZ4367 (ID, DATA) VALUES (2, 'pre-snapshot in TX')");
            secondConnection.execute("INSERT INTO DBZ4367 (ID, DATA) VALUES (3, 'pre-snapshot in another TX')");

            final Configuration config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ4367")
                    .with(OracleConnectorConfig.LOG_MINING_TRANSACTION_SNAPSHOT_BOUNDARY_MODE, OracleConnectorConfig.TransactionSnapshotBoundaryMode.TRANSACTION_VIEW_ONLY)
                    .build();
            start(OracleConnector.class, config);
            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);

            final List<Integer> firstBatchIds = consumeRecordsByTopic(2).recordsForTopic("server1.DEBEZIUM.DBZ4367").stream()
                    .map(event -> getAfter(event).getInt32("ID"))
                    .collect(Collectors.toList());
            Assertions.assertThat(firstBatchIds).containsOnly(1, 3);
            Assertions.assertThat(firstBatchIds).doesNotHaveDuplicates();
            Assertions.assertThat(firstBatchIds).hasSize(2);

            connection.executeWithoutCommitting("INSERT INTO DBZ4367 (ID, DATA) VALUES (4, 'post-snapshot in TX')");
            connection.executeWithoutCommitting("COMMIT");
            connection.execute("INSERT INTO DBZ4367 (ID, DATA) VALUES (5, 'post snapshot post TX')");

            final List<Integer> secondBatchIds = consumeRecordsByTopic(3).recordsForTopic("server1.DEBEZIUM.DBZ4367").stream()
                    .map(event -> getAfter(event).getInt32("ID"))
                    .collect(Collectors.toList());
            Assertions.assertThat(secondBatchIds).containsOnly(2, 4, 5);
            Assertions.assertThat(secondBatchIds).doesNotHaveDuplicates();
            Assertions.assertThat(secondBatchIds).hasSize(3);
        } finally {
            try {
                stopConnector();
                TestHelper.dropTable(connection, "DBZ4367");
            } finally {
                // Close the second connection even if stopping or dropping fails.
                secondConnection.close();
            }
        }
    }

    @Test
    @FixFor({"DBZ-4367"})
    @SkipWhenAdapterNameIsNot(value = SkipWhenAdapterNameIsNot.AdapterName.LOGMINER, reason = "User-defined types not supported")
    public void shouldCaptureChangesForTransactionsAcrossSnapshotBoundaryWithoutReemittingDDLChanges() throws Exception {
        OracleConnection testConnection = TestHelper.testConnection();
        TestHelper.dropTable(connection, "DBZ4367");
        TestHelper.dropTable(connection, "DBZ4367_EXTRA");
        try {
            connection.execute(new String[]{"CREATE TABLE DBZ4367 (ID number(9, 0), DATA varchar2(50))"});
            TestHelper.streamTable(connection, "DBZ4367");
            connection.execute(new String[]{"CREATE TABLE DBZ4367_EXTRA (ID number(9, 0), DATA varchar2(50))"});
            TestHelper.streamTable(connection, "DBZ4367_EXTRA");
            connection.execute(new String[]{"INSERT INTO DBZ4367 (ID, DATA) VALUES (1, 'pre-snapshot pre TX')"});
            connection.execute(new String[]{"INSERT INTO DBZ4367_EXTRA (ID, DATA) VALUES (100, 'second table, pre-snapshot pre TX')"});
            connection.executeWithoutCommitting(new String[]{"INSERT INTO DBZ4367 (ID, DATA) VALUES (2, 'pre-snapshot in TX')"});
            testConnection.execute(new String[]{"ALTER TABLE DBZ4367_EXTRA ADD DATA2 VARCHAR2(50) DEFAULT 'default2'"});
            testConnection.execute(new String[]{"ALTER TABLE DBZ4367_EXTRA ADD DATA3 VARCHAR2(50) DEFAULT 'default3'"});
            testConnection.execute(new String[]{"INSERT INTO DBZ4367_EXTRA (ID, DATA, DATA2, DATA3) VALUES (150, 'second table, with outdated schema', 'something', 'something')"});
            testConnection.execute(new String[]{"ALTER TABLE DBZ4367_EXTRA DROP COLUMN DATA3"});
            connection.executeWithoutCommitting(new String[]{"INSERT INTO DBZ4367_EXTRA (ID, DATA, DATA2) VALUES (200, 'second table, pre-snapshot in TX', 'something')"});
            start(OracleConnector.class, TestHelper.defaultConfig().with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ4367,DEBEZIUM\\.DBZ4367_EXTRA").with(OracleConnectorConfig.INCLUDE_SCHEMA_CHANGES, true).with(OracleConnectorConfig.LOG_MINING_TRANSACTION_SNAPSHOT_BOUNDARY_MODE, OracleConnectorConfig.TransactionSnapshotBoundaryMode.TRANSACTION_VIEW_ONLY).build());
            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
            AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(9);
            List ddlRecordsForDatabase = consumeRecordsByTopic.ddlRecordsForDatabase(TestHelper.getDatabaseName());
            ddlRecordsForDatabase.forEach(sourceRecord -> {
                Assertions.assertThat(((Struct) sourceRecord.value()).getString("ddl")).contains(new CharSequence[]{"CREATE TABLE"});
            });
            Assertions.assertThat(ddlRecordsForDatabase).hasSize(6);
            Assertions.assertThat((List) consumeRecordsByTopic.recordsForTopic("server1.DEBEZIUM.DBZ4367").stream().map(sourceRecord2 -> {
                return getAfter(sourceRecord2).getInt32("ID");
            }).collect(Collectors.toList())).containsExactly(new Integer[]{1});
            List list = (List) consumeRecordsByTopic.recordsForTopic("server1.DEBEZIUM.DBZ4367_EXTRA").stream().map(sourceRecord3 -> {
                return getAfter(sourceRecord3).getInt32("ID");
            }).collect(Collectors.toList());
            Assertions.assertThat(list).containsOnly(new Integer[]{100, 150});
            Assertions.assertThat(list).doesNotHaveDuplicates();
            Assertions.assertThat(list).hasSize(2);
            connection.executeWithoutCommitting(new String[]{"INSERT INTO DBZ4367 (ID, DATA) VALUES (3, 'post-snapshot in TX')"});
            connection.executeWithoutCommitting(new String[]{"INSERT INTO DBZ4367_EXTRA (ID, DATA, DATA2) VALUES (300, 'second table, post-snapshot in TX', 'something')"});
            connection.executeWithoutCommitting(new String[]{"COMMIT"});
            connection.execute(new String[]{"INSERT INTO DBZ4367 (ID, DATA) VALUES (4, 'post snapshot post TX')"});
            connection.execute(new String[]{"INSERT INTO DBZ4367_EXTRA (ID, DATA, DATA2) VALUES (400, 'second table, post-snapshot post TX', 'something')"});
            AbstractConnectorTest.SourceRecords consumeRecordsByTopic2 = consumeRecordsByTopic(6);
            List ddlRecordsForDatabase2 = consumeRecordsByTopic2.ddlRecordsForDatabase(TestHelper.getDatabaseName());
            if (ddlRecordsForDatabase2 != null) {
                Assertions.assertThat(ddlRecordsForDatabase2).isEmpty();
            }
            List list2 = (List) consumeRecordsByTopic2.recordsForTopic("server1.DEBEZIUM.DBZ4367").stream().map(sourceRecord4 -> {
                return getAfter(sourceRecord4).getInt32("ID");
            }).collect(Collectors.toList());
            Assertions.assertThat(list2).containsOnly(new Integer[]{2, 3, 4});
            Assertions.assertThat(list2).doesNotHaveDuplicates();
            Assertions.assertThat(list2).hasSize(3);
            List list3 = (List) consumeRecordsByTopic2.recordsForTopic("server1.DEBEZIUM.DBZ4367_EXTRA").stream().map(sourceRecord5 -> {
                return getAfter(sourceRecord5).getInt32("ID");
            }).collect(Collectors.toList());
            Assertions.assertThat(list3).containsOnly(new Integer[]{200, 300, 400});
            Assertions.assertThat(list3).doesNotHaveDuplicates();
            Assertions.assertThat(list3).hasSize(3);
            stopConnector();
            TestHelper.dropTable(connection, "DBZ4367");
            TestHelper.dropTable(connection, "DBZ4367_EXTRA");
            testConnection.close();
        } catch (Throwable th) {
            stopConnector();
            TestHelper.dropTable(connection, "DBZ4367");
            TestHelper.dropTable(connection, "DBZ4367_EXTRA");
            testConnection.close();
            throw th;
        }
    }

    /**
     * Snapshot/streaming boundary test where every second insert goes through the
     * committing {@code execute} call, i.e. many small transactions.
     */
    @Test
    @FixFor({"DBZ-5085"})
    @SkipWhenAdapterNameIsNot(value = SkipWhenAdapterNameIsNot.AdapterName.LOGMINER, reason = "Only applies to LogMiner")
    public void shouldSnapshotAndStreamAllRecordsThatSpanAcrossSnapshotStreamingBoundarySmallTrxs() throws Exception {
        assertAllRecordsCapturedAcrossSnapshotStreamingBoundary(2);
    }

    /**
     * Snapshot/streaming boundary test where only every tenth insert goes through the
     * committing {@code execute} call, i.e. fewer but larger transactions.
     */
    @Test
    @FixFor({"DBZ-5085"})
    @SkipWhenAdapterNameIsNot(value = SkipWhenAdapterNameIsNot.AdapterName.LOGMINER, reason = "Only applies to LogMiner")
    public void shouldSnapshotAndStreamAllRecordsThatSpanAcrossSnapshotStreamingBoundaryLargeTrxs() throws Exception {
        assertAllRecordsCapturedAcrossSnapshotStreamingBoundary(10);
    }

    /**
     * Shared body of the two DBZ-5085 boundary tests.
     *
     * <p>Inserts 50 rows into {@code dbz5085}, starts the connector with the transaction
     * snapshot boundary mode set to {@code ALL}, waits until a "Pending Transaction" message
     * is logged, commits the outstanding work and then checks that exactly 50 records arrive
     * in id order, with at least one snapshot read and at least one streamed insert.
     *
     * @param commitInterval rows whose id is a multiple of this value are inserted with
     *                       {@code execute}; all other rows use {@code executeWithoutCommitting}
     * @throws Exception if any database or connector interaction fails
     */
    private void assertAllRecordsCapturedAcrossSnapshotStreamingBoundary(int commitInterval) throws Exception {
        final int expectedRecordCount = 50;
        TestHelper.dropTable(connection, "dbz5085");
        try {
            LogInterceptor logInterceptor = new LogInterceptor(LogMinerAdapter.class);
            setConsumeTimeout(10L, TimeUnit.SECONDS);
            connection.execute("CREATE TABLE dbz5085 (id numeric(9,0) primary key, data varchar2(50))");
            TestHelper.streamTable(connection, "dbz5085");
            Configuration config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ5085")
                    .with(OracleConnectorConfig.LOG_MINING_TRANSACTION_SNAPSHOT_BOUNDARY_MODE, OracleConnectorConfig.TransactionSnapshotBoundaryMode.ALL)
                    .build();

            LOGGER.info("Inserting {} records", expectedRecordCount);
            for (int id = 0; id < expectedRecordCount; id++) {
                String insert = "INSERT INTO dbz5085 (id,data) values (" + id + ", 'Test-" + id + "')";
                if (id % commitInterval == 0) {
                    connection.execute(insert);
                } else {
                    // Presumably left uncommitted until the next execute() or the explicit commit below.
                    connection.executeWithoutCommitting(insert);
                }
                Thread.sleep(100L);
            }

            start(OracleConnector.class, config);
            assertConnectorIsRunning();

            // Only commit once the connector has reported the still-open transaction.
            Awaitility.await().atMost(Duration.ofMinutes(3L)).until(() -> logInterceptor.containsMessage("Pending Transaction '"));
            connection.commit();
            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);

            List<SourceRecord> tableRecords = consumeRecordsByTopic(expectedRecordCount).recordsForTopic("server1.DEBEZIUM.DBZ5085");
            Assertions.assertThat(tableRecords).hasSize(expectedRecordCount);

            boolean sawSnapshotRead = false;
            boolean sawStreamedInsert = false;
            for (int id = 0; id < expectedRecordCount; id++) {
                SourceRecord record = tableRecords.get(id);
                if (((Struct) record.value()).getString("op").equals(Envelope.Operation.READ.code())) {
                    sawSnapshotRead = true;
                    VerifyRecord.isValidRead(record, "ID", id);
                } else {
                    sawStreamedInsert = true;
                    VerifyRecord.isValidInsert(record, "ID", id);
                }
            }
            // The data set must really have been split between the snapshot and streaming phases.
            Assertions.assertThat(sawSnapshotRead).isTrue();
            Assertions.assertThat(sawStreamedInsert).isTrue();
        } finally {
            TestHelper.dropTable(connection, "dbz5085");
        }
    }

    /**
     * Streams one insert, stops the connector, inserts another row and drops the captured
     * table while the connector is down, then restarts and expects streaming to resume
     * without emitting any record.
     */
    @SkipWhenLogMiningStrategyIs(value = SkipWhenLogMiningStrategyIs.Strategy.HYBRID, reason = "Hybrid strategy now detects and handles this use case")
    @SkipWhenAdapterNameIsNot(value = SkipWhenAdapterNameIsNot.AdapterName.LOGMINER, reason = "Only applies to LogMiner")
    @Test
    @FixFor({"DBZ-4842"})
    public void shouldRestartAfterCapturedTableIsDroppedWhileConnectorDown() throws Exception {
        TestHelper.dropTable(connection, "dbz4842");
        try {
            connection.execute("CREATE TABLE dbz4842 (id numeric(9,0) primary key, name varchar2(50))");
            TestHelper.streamTable(connection, "dbz4842");
            Configuration config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ4842")
                    .build();

            start(OracleConnector.class, config);
            assertConnectorIsRunning();
            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);

            connection.execute("INSERT INTO dbz4842 (id,name) values (1,'Test')");
            Assertions.assertThat(consumeRecordsByTopic(1).recordsForTopic("server1.DEBEZIUM.DBZ4842")).hasSize(1);

            // The stop callback must receive false; presumably this is the engine's running state.
            stopConnector(status -> {
                Assertions.assertThat(status).isFalse();
            });

            // Change and then drop the table while nothing is capturing.
            connection.execute("INSERT INTO dbz4842 (id,name) values (2,'Test')");
            TestHelper.dropTable(connection, "dbz4842");

            start(OracleConnector.class, config);
            assertConnectorIsRunning();
            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);

            // Wait 10 seconds before checking so a late record would have had time to arrive.
            Awaitility.await().pollDelay(10L, TimeUnit.SECONDS).timeout(11L, TimeUnit.SECONDS).until(() -> {
                assertNoRecordsToConsume();
                return true;
            });
        } finally {
            TestHelper.dropTable(connection, "dbz4842");
        }
    }

    /**
     * Verifies that insert, update and delete events are emitted for a table containing a
     * {@code BFILE} column and that the {@code FILENAME} field is absent from the row schemas.
     */
    @Test
    @FixFor({"DBZ-4852"})
    @SkipWhenAdapterNameIsNot(value = SkipWhenAdapterNameIsNot.AdapterName.LOGMINER, reason = "User-defined types not supported")
    public void shouldCaptureChangeForTableWithUnsupportedColumnType() throws Exception {
        TestHelper.dropTable(connection, "dbz4852");
        try {
            // Directory object referenced by the bfilename(...) calls below; the admin
            // connection is closed even if creating the directory fails.
            try (OracleConnection admin = TestHelper.adminConnection(false)) {
                admin.execute("CREATE OR REPLACE DIRECTORY DIR_DBZ4852 AS '/home/oracle'");
            }

            start(OracleConnector.class, TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ4852")
                    .build());
            assertConnectorIsRunning();
            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);

            // The table is created only after streaming has started.
            connection.execute("CREATE TABLE dbz4852 (id numeric(9,0) primary key, filename bfile)");
            TestHelper.streamTable(connection, "dbz4852");
            connection.execute("INSERT INTO dbz4852 (id,filename) values (1,bfilename('DIR_DBZ4852','test.txt'))");
            connection.execute("UPDATE dbz4852 set filename = bfilename('DIR_DBZ4852','test2.txt') WHERE id = 1");
            connection.execute("DELETE FROM dbz4852 where id = 1");

            AbstractConnectorTest.SourceRecords records = consumeRecordsByTopic(3);
            List<SourceRecord> tableRecords = records.recordsForTopic("server1.DEBEZIUM.DBZ4852");
            Assertions.assertThat(tableRecords).hasSize(3);

            SourceRecord insert = tableRecords.get(0);
            VerifyRecord.isValidInsert(insert, "ID", 1);
            Assertions.assertThat(((Struct) insert.value()).getStruct("after").schema().field("FILENAME")).isNull();

            SourceRecord update = tableRecords.get(1);
            VerifyRecord.isValidUpdate(update, "ID", 1);
            Assertions.assertThat(((Struct) update.value()).getStruct("before").schema().field("FILENAME")).isNull();
            Assertions.assertThat(((Struct) update.value()).getStruct("after").schema().field("FILENAME")).isNull();

            SourceRecord delete = tableRecords.get(2);
            VerifyRecord.isValidDelete(delete, "ID", 1);
            Assertions.assertThat(((Struct) delete.value()).getStruct("before").schema().field("FILENAME")).isNull();
        } finally {
            TestHelper.dropTable(connection, "dbz4852");
        }
    }

    /**
     * Verifies that insert, update and delete events are emitted for a table containing a
     * {@code LONG} column and that the {@code LONG_VAL} field is absent from the row schemas.
     */
    @Test
    @FixFor({"DBZ-4853"})
    public void shouldCaptureChangeForTableWithUnsupportedColumnTypeLong() throws Exception {
        TestHelper.dropTable(connection, "dbz4853");
        try {
            start(OracleConnector.class, TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ4853")
                    .build());
            assertConnectorIsRunning();
            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);

            // The table is created only after streaming has started.
            connection.execute("CREATE TABLE dbz4853 (id numeric(9,0) primary key, long_val long)");
            TestHelper.streamTable(connection, "dbz4853");
            connection.execute("INSERT INTO dbz4853 (id,long_val) values (1,'test.txt')");
            connection.execute("UPDATE dbz4853 set long_val = 'test2.txt' WHERE id = 1");
            connection.execute("DELETE FROM dbz4853 where id = 1");

            AbstractConnectorTest.SourceRecords records = consumeRecordsByTopic(3);
            List<SourceRecord> tableRecords = records.recordsForTopic("server1.DEBEZIUM.DBZ4853");
            Assertions.assertThat(tableRecords).hasSize(3);

            SourceRecord insert = tableRecords.get(0);
            VerifyRecord.isValidInsert(insert, "ID", 1);
            Assertions.assertThat(((Struct) insert.value()).getStruct("after").schema().field("LONG_VAL")).isNull();

            SourceRecord update = tableRecords.get(1);
            VerifyRecord.isValidUpdate(update, "ID", 1);
            Assertions.assertThat(((Struct) update.value()).getStruct("before").schema().field("LONG_VAL")).isNull();
            Assertions.assertThat(((Struct) update.value()).getStruct("after").schema().field("LONG_VAL")).isNull();

            SourceRecord delete = tableRecords.get(2);
            VerifyRecord.isValidDelete(delete, "ID", 1);
            Assertions.assertThat(((Struct) delete.value()).getStruct("before").schema().field("LONG_VAL")).isNull();
        } finally {
            TestHelper.dropTable(connection, "dbz4853");
        }
    }

    /**
     * Verifies that the {@code OffsetScn} and {@code CommittedScn} streaming metrics both
     * move away from their initial values within 60 seconds although the captured table is
     * not modified after the snapshot.
     */
    @Test
    @FixFor({"DBZ-4907"})
    @SkipWhenAdapterNameIsNot(value = SkipWhenAdapterNameIsNot.AdapterName.LOGMINER, reason = "Only LogMiner performs flushes")
    public void shouldContinueToUpdateOffsetsEvenWhenTableIsNotChanged() throws Exception {
        TestHelper.dropTable(connection, "dbz4907");
        try {
            connection.execute("CREATE TABLE dbz4907 (id numeric(9,0) primary key, state varchar2(50))");
            connection.execute("INSERT INTO dbz4907 (id,state) values (1, 'snapshot')");
            TestHelper.streamTable(connection, "dbz4907");

            start(OracleConnector.class, TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ4907")
                    .build());
            assertConnectorIsRunning();
            waitForSnapshotToBeCompleted(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
            Assertions.assertThat(consumeRecordsByTopic(1).recordsForTopic("server1.DEBEZIUM.DBZ4907")).hasSize(1);
            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);

            // Baseline values taken right after streaming starts.
            BigInteger initialOffsetScn = (BigInteger) getStreamingMetric("OffsetScn");
            BigInteger initialCommittedScn = (BigInteger) getStreamingMetric("CommittedScn");

            // Both metrics must be non-null and different from their baseline.
            Awaitility.await().atMost(60L, TimeUnit.SECONDS).until(() -> {
                BigInteger offsetScn = (BigInteger) getStreamingMetric("OffsetScn");
                BigInteger committedScn = (BigInteger) getStreamingMetric("CommittedScn");
                return offsetScn != null
                        && committedScn != null
                        && !offsetScn.equals(initialOffsetScn)
                        && !committedScn.equals(initialCommittedScn);
            });
        } finally {
            TestHelper.dropTable(connection, "dbz4907");
        }
    }

    /**
     * Keeps one transaction open on a second connection while two committed changes are
     * streamed, restarts the connector, commits the open transaction and expects only that
     * transaction's insert to be emitted after the restart.
     */
    @Test
    @FixFor({"DBZ-4936"})
    public void shouldNotEmitLastCommittedTransactionEventsUponRestart() throws Exception {
        TestHelper.dropTable(connection, "dbz4936");
        try {
            connection.execute("CREATE TABLE dbz4936 (id numeric(9,0) primary key, name varchar2(100))");
            TestHelper.streamTable(connection, "dbz4936");
            Configuration config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ4936")
                    .with(OracleConnectorConfig.LOG_MINING_BUFFER_TYPE, "memory")
                    .build();
            start(OracleConnector.class, config);
            assertConnectorIsRunning();

            // try-with-resources closes the second connection (and its open transaction)
            // before the table is dropped, including when an assertion fails.
            try (OracleConnection otherConnection = TestHelper.testConnection()) {
                waitForSnapshotToBeCompleted(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);

                // Left uncommitted until after the restart.
                otherConnection.setAutoCommit(false);
                otherConnection.executeWithoutCommitting("INSERT INTO dbz4936 (id,name) values (1,'In-Progress')");
                waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);

                connection.execute("INSERT INTO dbz4936 (id,name) values (2, 'committed')");
                List<SourceRecord> inserted = consumeRecordsByTopic(1).recordsForTopic("server1.DEBEZIUM.DBZ4936");
                Assertions.assertThat(inserted).hasSize(1);
                VerifyRecord.isValidInsert(inserted.get(0), "ID", 2);

                connection.execute("UPDATE dbz4936 set name = 'updated' WHERE id = 2");
                List<SourceRecord> updated = consumeRecordsByTopic(1).recordsForTopic("server1.DEBEZIUM.DBZ4936");
                Assertions.assertThat(updated).hasSize(1);
                VerifyRecord.isValidUpdate(updated.get(0), "ID", 2);

                stopConnector();
                start(OracleConnector.class, config);
                assertConnectorIsRunning();
                waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);

                // Only the previously in-progress insert may show up; the update must not be re-emitted.
                otherConnection.commit();
                List<SourceRecord> afterRestart = consumeRecordsByTopic(1).recordsForTopic("server1.DEBEZIUM.DBZ4936");
                Assertions.assertThat(afterRestart).hasSize(1);
                VerifyRecord.isValidInsert(afterRestart.get(0), "ID", 1);
                assertNoRecordsToConsume();
            }
        } finally {
            TestHelper.dropTable(connection, "dbz4936");
        }
    }

    /**
     * Starts the connector from a configuration in which the hostname entry is removed and
     * the URL entry is set instead, then checks that the snapshot emits the inserted customer
     * row and that streaming starts.
     */
    // NOTE(review): the issue key below is spelled "DbZ" and the row data mentions DBZ3668;
    // confirm the intended issue number before normalising it.
    @Test
    @FixFor({"DbZ-3318"})
    @SkipWhenAdapterNameIsNot(value = SkipWhenAdapterNameIsNot.AdapterName.LOGMINER, reason = "Applies only to LogMiner")
    public void shouldSuccessfullyConnectAndStreamWithDatabaseUrl() throws Exception {
        connection.execute("INSERT INTO customer (id,name,score) values (1001, 'DBZ3668', 100)");

        // Rebuild the default configuration with the URL in place of the hostname.
        Map<String, String> properties = TestHelper.defaultConfig().build().asMap();
        properties.remove(OracleConnectorConfig.HOSTNAME.name());
        properties.put(OracleConnectorConfig.URL.name(), TestHelper.getOracleConnectionUrlDescriptor());
        Configuration.Builder builder = Configuration.create();
        for (Map.Entry<String, String> property : properties.entrySet()) {
            builder.with(property.getKey(), property.getValue());
        }

        start(OracleConnector.class, builder.build());
        assertConnectorIsRunning();
        waitForSnapshotToBeCompleted(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);

        List<SourceRecord> customerRecords = consumeRecordsByTopic(1).recordsForTopic("server1.DEBEZIUM.CUSTOMER");
        Assertions.assertThat(customerRecords).hasSize(1);
        VerifyRecord.isValidRead(customerRecords.get(0), "ID", 1001);
        waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
    }

    /**
     * Verifies that a {@code TRUNCATE TABLE} produces exactly one truncate event when LOB
     * support is enabled and no operations are skipped.
     */
    @Test
    @SkipWhenLogMiningStrategyIs(value = SkipWhenLogMiningStrategyIs.Strategy.HYBRID, reason = "Cannot use lob.enabled with Hybrid")
    @FixFor({"DBZ-4953"})
    public void shouldStreamTruncateEventWhenLobIsEnabled() throws Exception {
        TestHelper.dropTable(connection, "dbz4953");
        try {
            connection.execute("CREATE TABLE dbz4953 (id numeric(9,0) primary key, col2 varchar2(100))");
            TestHelper.streamTable(connection, "dbz4953");
            connection.execute("INSERT INTO dbz4953 (id,col2) values (1, 'Daffy Duck')");

            start(OracleConnector.class, TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ4953")
                    .with(OracleConnectorConfig.LOB_ENABLED, true)
                    .with(OracleConnectorConfig.SKIPPED_OPERATIONS, "none")
                    .build());
            assertConnectorIsRunning();
            waitForSnapshotToBeCompleted(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);

            // Snapshot phase: the pre-existing row is read.
            List<SourceRecord> snapshotRecords = consumeRecordsByTopic(1).recordsForTopic("server1.DEBEZIUM.DBZ4953");
            Assertions.assertThat(snapshotRecords).hasSize(1);
            VerifyRecord.isValidRead(snapshotRecords.get(0), "ID", 1);

            // Streaming phase: the truncate yields a single event and nothing else.
            connection.execute("TRUNCATE TABLE dbz4953");
            List<SourceRecord> truncateRecords = consumeRecordsByTopic(1).recordsForTopic("server1.DEBEZIUM.DBZ4953");
            Assertions.assertThat(truncateRecords).hasSize(1);
            VerifyRecord.isValidTruncate(truncateRecords.get(0));
            assertNoRecordsToConsume();
        } finally {
            TestHelper.dropTable(connection, "dbz4953");
        }
    }

    /**
     * Configures a maximum LogMiner session time of 10000 ms and waits up to 60 seconds for
     * the streaming source to log that the session exceeded it.
     */
    @Test
    @FixFor({"DBZ-4963"})
    @SkipWhenAdapterNameIsNot(value = SkipWhenAdapterNameIsNot.AdapterName.LOGMINER, reason = "Applies only to LogMiner")
    public void shouldRestartLogMiningSessionAfterMaxSessionElapses() throws Exception {
        TestHelper.dropTable(connection, "dbz4963");
        try {
            connection.execute("CREATE TABLE dbz4963 (id numeric(9,0) primary key, data varchar2(50))");
            TestHelper.streamTable(connection, "dbz4963");
            Configuration config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ4963")
                    .with(OracleConnectorConfig.LOG_MINING_SESSION_MAX_MS, 10000L)
                    .build();

            // The interceptor is installed before the connector starts.
            LogInterceptor logInterceptor = new LogInterceptor(LogMinerStreamingChangeEventSource.class);
            start(OracleConnector.class, config);
            assertConnectorIsRunning();
            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);

            Awaitility.await().atMost(60L, TimeUnit.SECONDS)
                    .until(() -> logInterceptor.containsMessage("LogMiner session has exceeded maximum session time"));
            stopConnector();
        } finally {
            TestHelper.dropTable(connection, "dbz4963");
        }
    }

    /**
     * Runs the connector without setting a maximum LogMiner session time and expects that
     * the "exceeded maximum session time" message is not logged within 60 seconds, i.e. that
     * the wait ends in a {@link ConditionTimeoutException}.
     */
    @Test
    @FixFor({"DBZ-4963"})
    @Ignore("Waits 60 seconds by default, so disabled by default")
    public void shouldNotRestartLogMiningSessionWithMaxSessionZero() throws Exception {
        TestHelper.dropTable(connection, "dbz4963");
        try {
            connection.execute("CREATE TABLE dbz4963 (id numeric(9,0) primary key, data varchar2(50))");
            TestHelper.streamTable(connection, "dbz4963");
            // NOTE(review): the session maximum is not set here; the test presumably relies
            // on the option's default being zero. Confirm against OracleConnectorConfig.
            Configuration config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ4963")
                    .build();

            LogInterceptor logInterceptor = new LogInterceptor(LogMinerStreamingChangeEventSource.class);
            start(OracleConnector.class, config);
            assertConnectorIsRunning();
            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);

            try {
                Awaitility.await().atMost(60L, TimeUnit.SECONDS)
                        .until(() -> logInterceptor.containsMessage("LogMiner session has exceeded maximum session time"));
                Assert.fail("Expected a ConditionTimeoutException, LogMiner session max session message should not have been written.");
            } catch (ConditionTimeoutException expected) {
                // Timing out is the success path: the message never appeared.
                stopConnector();
            }
        } finally {
            TestHelper.dropTable(connection, "dbz4963");
        }
    }

    /**
     * Runs {@code testTableWithForwardSlashes} for table names with a slash at the start,
     * in the middle, at the end, in several places and doubled. Each name is paired with the
     * same text with every slash replaced by an underscore.
     */
    @Test
    @FixFor({"DBZ-5006"})
    public void shouldSupportTablesWithForwardSlashes() throws Exception {
        // { first argument, second argument }, executed in this order.
        final String[][] namePairs = {
                { "/dbz5006", "_dbz5006" },
                { "dbz/5006", "dbz_5006" },
                { "dbz5006/", "dbz5006_" },
                { "db/z50/06", "db_z50_06" },
                { "dbz//5006", "dbz__5006" },
        };
        for (String[] namePair : namePairs) {
            testTableWithForwardSlashes(namePair[0], namePair[1]);
        }
    }

    /**
     * Configures a heartbeat action query that updates the {@code heartbeat} table every
     * 1000 ms and waits up to 60 seconds for a change event on the heartbeat table's topic.
     */
    @Test
    @FixFor({"DBZ-5119"})
    public void shouldExecuteHeartbeatActionQuery() throws Exception {
        TestHelper.dropTable(connection, "dbz5119");
        TestHelper.dropTable(connection, "heartbeat");
        try {
            connection.execute("CREATE TABLE heartbeat (data timestamp)");
            connection.execute("INSERT INTO heartbeat values (sysdate)");
            // The connector user runs the action query, so it needs write access to the table.
            TestHelper.grantRole("INSERT,UPDATE", "debezium.heartbeat", TestHelper.getConnectorUserName());
            connection.execute("CREATE TABLE dbz5119 (id numeric(9,0) primary key, data varchar2(50))");
            TestHelper.streamTable(connection, "dbz5119");
            TestHelper.streamTable(connection, "heartbeat");

            start(OracleConnector.class, TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.SNAPSHOT_MODE, "schema_only")
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ5119,DEBEZIUM\\.HEARTBEAT")
                    .with(DatabaseHeartbeatImpl.HEARTBEAT_ACTION_QUERY, "UPDATE debezium.heartbeat set data = sysdate WHERE ROWNUM = 1")
                    .with(DatabaseHeartbeatImpl.HEARTBEAT_INTERVAL, 1000)
                    .build());
            assertConnectorIsRunning();
            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);

            // Keep consuming one record at a time until one lands on the heartbeat table topic.
            Awaitility.await().atMost(60L, TimeUnit.SECONDS).until(() -> {
                List<?> heartbeatRecords = consumeRecordsByTopic(1).recordsForTopic("server1.DEBEZIUM.HEARTBEAT");
                return heartbeatRecords != null && !heartbeatRecords.isEmpty();
            });

            // Discard whatever is still queued when the connector stops.
            stopConnector(ignored -> {
                consumeAvailableRecords(discarded -> {
                });
            });
        } finally {
            TestHelper.dropTable(connection, "heartbeat");
            TestHelper.dropTable(connection, "dbz5119");
        }
    }

    /**
     * With the default event-processing failure handling mode, adds and drops a column
     * around an insert while the connector is stopped, restarts it, and expects
     * {@code ERROR_PROCESSING_FAIL_MESSAGE} to be logged as an error with no records emitted.
     */
    @SkipWhenLogMiningStrategyIs(value = SkipWhenLogMiningStrategyIs.Strategy.HYBRID, reason = "Test overrides strategy as requires online_catalog")
    @SkipWhenAdapterNameIsNot(value = SkipWhenAdapterNameIsNot.AdapterName.LOGMINER, reason = "Only applies to Oracle LogMiner implementation")
    @Test
    @FixFor({"DBZ-5147"})
    public void shouldStopWhenErrorProcessingFailureHandlingModeIsDefault() throws Exception {
        TestHelper.dropTable(connection, "dbz5147");
        try {
            connection.execute("CREATE TABLE dbz5147 (id numeric(9,0) primary key, data varchar2(50))");
            connection.execute("INSERT INTO dbz5147 VALUES (1, 'test1')");
            TestHelper.streamTable(connection, "dbz5147");
            Configuration config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ5147")
                    .with(OracleConnectorConfig.LOG_MINING_STRATEGY, "online_catalog")
                    .build();

            start(OracleConnector.class, config);
            assertConnectorIsRunning();
            waitForSnapshotToBeCompleted(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
            List<SourceRecord> snapshotRecords = consumeRecordsByTopic(1).recordsForTopic("server1.DEBEZIUM.DBZ5147");
            Assertions.assertThat(snapshotRecords).hasSize(1);
            VerifyRecord.isValidRead(snapshotRecords.get(0), "ID", 1);
            stopConnector();

            // While the connector is down the table briefly has a third column; the row with
            // id 2 is written against that shape, the row with id 3 against the original one.
            connection.execute("ALTER TABLE dbz5147 add data2 varchar2(50)");
            connection.execute("INSERT INTO dbz5147 values (2, 'test2a', 'test2b')");
            connection.execute("ALTER TABLE dbz5147 drop column data2");
            connection.execute("INSERT INTO dbz5147 values (3, 'test3')");

            LogInterceptor logInterceptor = new LogInterceptor(AbstractLogMinerEventProcessor.class);
            start(OracleConnector.class, config);
            assertConnectorIsRunning();

            Awaitility.await().atMost(60L, TimeUnit.SECONDS)
                    .until(() -> logInterceptor.containsErrorMessage(ERROR_PROCESSING_FAIL_MESSAGE));
            assertNoRecordsToConsume();
        } finally {
            TestHelper.dropTable(connection, "dbz5147");
        }
    }

    /**
     * With the event-processing failure handling mode set to {@code warn}, adds and drops a
     * column around an insert while the connector is stopped, restarts it, and expects
     * {@code ERROR_PROCESSING_WARN_MESSAGE} to be logged as a warning and only the insert
     * with id 3 to be emitted.
     */
    @SkipWhenLogMiningStrategyIs(value = SkipWhenLogMiningStrategyIs.Strategy.HYBRID, reason = "Test overrides strategy as requires online_catalog")
    @SkipWhenAdapterNameIsNot(value = SkipWhenAdapterNameIsNot.AdapterName.LOGMINER, reason = "Only applies to Oracle LogMiner implementation")
    @Test
    @FixFor({"DBZ-5147"})
    public void shouldLogWarningAndSkipWhenErrorProcessingFailureHandlingModeIsWarn() throws Exception {
        TestHelper.dropTable(connection, "dbz5147");
        try {
            connection.execute("CREATE TABLE dbz5147 (id numeric(9,0) primary key, data varchar2(50))");
            connection.execute("INSERT INTO dbz5147 VALUES (1, 'test1')");
            TestHelper.streamTable(connection, "dbz5147");
            Configuration config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ5147")
                    .with(OracleConnectorConfig.EVENT_PROCESSING_FAILURE_HANDLING_MODE, "warn")
                    .with(OracleConnectorConfig.LOG_MINING_STRATEGY, "online_catalog")
                    .build();

            start(OracleConnector.class, config);
            assertConnectorIsRunning();
            waitForSnapshotToBeCompleted(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
            List<SourceRecord> snapshotRecords = consumeRecordsByTopic(1).recordsForTopic("server1.DEBEZIUM.DBZ5147");
            Assertions.assertThat(snapshotRecords).hasSize(1);
            VerifyRecord.isValidRead(snapshotRecords.get(0), "ID", 1);
            stopConnector();

            // While the connector is down the table briefly has a third column; the row with
            // id 2 is written against that shape, the row with id 3 against the original one.
            connection.execute("ALTER TABLE dbz5147 add data2 varchar2(50)");
            connection.execute("INSERT INTO dbz5147 values (2, 'test2a', 'test2b')");
            connection.execute("ALTER TABLE dbz5147 drop column data2");
            connection.execute("INSERT INTO dbz5147 values (3, 'test3')");

            LogInterceptor logInterceptor = new LogInterceptor(AbstractLogMinerEventProcessor.class);
            start(OracleConnector.class, config);
            assertConnectorIsRunning();
            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);

            Awaitility.await().atMost(60L, TimeUnit.SECONDS)
                    .until(() -> logInterceptor.containsWarnMessage(ERROR_PROCESSING_WARN_MESSAGE));

            // Exactly one record is expected, and it must be the insert with id 3.
            List<SourceRecord> streamedRecords = consumeRecordsByTopic(1).recordsForTopic("server1.DEBEZIUM.DBZ5147");
            Assertions.assertThat(streamedRecords).hasSize(1);
            VerifyRecord.isValidInsert(streamedRecords.get(0), "ID", 3);
        } finally {
            TestHelper.dropTable(connection, "dbz5147");
        }
    }

    /**
     * Exercises the {@code skip} event-processing failure handling mode (DBZ-5147).
     *
     * <p>Runs the same offline add-column / insert / drop-column / insert sequence as the
     * {@code warn} variant. After restart it expects the next streamed record to be the insert
     * for ID 3 and asserts that neither {@code ERROR_PROCESSING_FAIL_MESSAGE} nor
     * {@code ERROR_PROCESSING_WARN_MESSAGE} was logged by {@link AbstractLogMinerEventProcessor}.
     *
     * @throws Exception if any database or connector operation fails
     */
    @SkipWhenLogMiningStrategyIs(value = SkipWhenLogMiningStrategyIs.Strategy.HYBRID, reason = "Test overrides strategy as requires online_catalog")
    @SkipWhenAdapterNameIsNot(value = SkipWhenAdapterNameIsNot.AdapterName.LOGMINER, reason = "Only applies to Oracle LogMiner implementation")
    @Test
    @FixFor({"DBZ-5147"})
    public void shouldSilentlySkipWhenErrorProcessingFailureHandlingModeIsSkip() throws Exception {
        TestHelper.dropTable(connection, "dbz5147");
        try {
            connection.execute("CREATE TABLE dbz5147 (id numeric(9,0) primary key, data varchar2(50))");
            connection.execute("INSERT INTO dbz5147 VALUES (1, 'test1')");
            TestHelper.streamTable(connection, "dbz5147");

            Configuration config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ5147")
                    .with(OracleConnectorConfig.EVENT_PROCESSING_FAILURE_HANDLING_MODE, "skip")
                    .with(OracleConnectorConfig.LOG_MINING_STRATEGY, "online_catalog")
                    .build();

            // First run: only the snapshot row is expected.
            start(OracleConnector.class, config);
            assertConnectorIsRunning();
            waitForSnapshotToBeCompleted(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);

            List<SourceRecord> snapshotRecords = consumeRecordsByTopic(1).recordsForTopic("server1.DEBEZIUM.DBZ5147");
            Assertions.assertThat(snapshotRecords).hasSize(1);
            VerifyRecord.isValidRead(snapshotRecords.get(0), "ID", 1);
            stopConnector();

            // Change the table shape while the connector is offline.
            connection.execute("ALTER TABLE dbz5147 add data2 varchar2(50)");
            connection.execute("INSERT INTO dbz5147 values (2, 'test2a', 'test2b')");
            connection.execute("ALTER TABLE dbz5147 drop column data2");
            connection.execute("INSERT INTO dbz5147 values (3, 'test3')");

            // Registered before the restart so any message logged during streaming is seen.
            LogInterceptor processorLog = new LogInterceptor(AbstractLogMinerEventProcessor.class);
            start(OracleConnector.class, config);
            assertConnectorIsRunning();
            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);

            List<SourceRecord> streamedRecords = consumeRecordsByTopic(1).recordsForTopic("server1.DEBEZIUM.DBZ5147");
            Assertions.assertThat(streamedRecords).hasSize(1);
            VerifyRecord.isValidInsert(streamedRecords.get(0), "ID", 3);

            // "skip" must be silent: no error and no warning about the failed event.
            Assertions.assertThat(processorLog.containsErrorMessage(ERROR_PROCESSING_FAIL_MESSAGE)).isFalse();
            Assertions.assertThat(processorLog.containsWarnMessage(ERROR_PROCESSING_WARN_MESSAGE)).isFalse();
        } finally {
            // Single cleanup point for both the success and the failure path.
            TestHelper.dropTable(connection, "dbz5147");
        }
    }

    /**
     * Verifies that a transaction with more events than
     * {@code LOG_MINING_BUFFER_TRANSACTION_EVENTS_THRESHOLD} is not emitted (DBZ-5139).
     *
     * <p>The threshold is set to 100. A first transaction inserts 101 rows (IDs 0-100) and a
     * second inserts 25 rows (IDs 200-224); only the 25 rows of the second transaction are
     * expected on the topic.
     *
     * @throws Exception if any database or connector operation fails
     */
    @Test
    @FixFor({"DBZ-5139"})
    @SkipWhenAdapterNameIsNot(value = SkipWhenAdapterNameIsNot.AdapterName.LOGMINER, reason = "Applies only to LogMiner")
    public void shouldDiscardTransactionThatExceedsEventThreshold() throws Exception {
        TestHelper.dropTable(connection, "dbz5139");
        try {
            connection.execute("CREATE TABLE dbz5139 (id numeric(9,0) primary key, data varchar2(50))");
            TestHelper.streamTable(connection, "dbz5139");

            Configuration config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ5139")
                    .with(OracleConnectorConfig.LOG_MINING_BUFFER_TRANSACTION_EVENTS_THRESHOLD, 100)
                    .build();
            start(OracleConnector.class, config);
            assertConnectorIsRunning();
            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
            assertNoRecordsToConsume();

            // Transaction 1: 101 events, one more than the configured threshold.
            for (int id = 0; id < 101; id++) {
                connection.executeWithoutCommitting("INSERT INTO dbz5139 (id,data) values (" + id + ", 'Test" + id + "')");
            }
            connection.commit();

            // Transaction 2: 25 events, below the threshold.
            for (int id = 200; id < 225; id++) {
                connection.executeWithoutCommitting("INSERT INTO dbz5139 (id,data) values (" + id + ", 'Test" + id + "')");
            }
            connection.commit();

            List<SourceRecord> records = consumeRecordsByTopic(25).recordsForTopic("server1.DEBEZIUM.DBZ5139");
            Assertions.assertThat(records).hasSize(25);
            for (int offset = 0; offset < 25; offset++) {
                VerifyRecord.isValidInsert(records.get(offset), "ID", 200 + offset);
            }
            assertNoRecordsToConsume();
        } finally {
            // Single cleanup point for both the success and the failure path.
            TestHelper.dropTable(connection, "dbz5139");
        }
    }

    /**
     * Counterpart to the threshold test (DBZ-5139): with no transaction event threshold
     * configured, both transactions are emitted in full.
     *
     * <p>A first transaction inserts 101 rows (IDs 0-100) and a second inserts 25 rows
     * (IDs 200-224); all 126 inserts are expected on the topic, in that order.
     *
     * @throws Exception if any database or connector operation fails
     */
    @Test
    @FixFor({"DBZ-5139"})
    @SkipWhenAdapterNameIsNot(value = SkipWhenAdapterNameIsNot.AdapterName.LOGMINER, reason = "Applies only to LogMiner")
    public void shouldNotDiscardTransactionWhenNoEventThresholdSet() throws Exception {
        TestHelper.dropTable(connection, "dbz5139");
        try {
            connection.execute("CREATE TABLE dbz5139 (id numeric(9,0) primary key, data varchar2(50))");
            TestHelper.streamTable(connection, "dbz5139");

            Configuration config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ5139")
                    .build();
            start(OracleConnector.class, config);
            assertConnectorIsRunning();
            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
            assertNoRecordsToConsume();

            // Transaction 1: 101 events.
            for (int id = 0; id < 101; id++) {
                connection.executeWithoutCommitting("INSERT INTO dbz5139 (id,data) values (" + id + ", 'Test" + id + "')");
            }
            connection.commit();

            // Transaction 2: 25 events.
            for (int id = 200; id < 225; id++) {
                connection.executeWithoutCommitting("INSERT INTO dbz5139 (id,data) values (" + id + ", 'Test" + id + "')");
            }
            connection.commit();

            List<SourceRecord> records = consumeRecordsByTopic(126).recordsForTopic("server1.DEBEZIUM.DBZ5139");
            Assertions.assertThat(records).hasSize(126);
            // Positions 0-100 hold the first transaction, 101-125 the second.
            for (int index = 0; index < 101; index++) {
                VerifyRecord.isValidInsert(records.get(index), "ID", index);
            }
            for (int offset = 0; offset < 25; offset++) {
                VerifyRecord.isValidInsert(records.get(101 + offset), "ID", 200 + offset);
            }
            assertNoRecordsToConsume();
        } finally {
            // Single cleanup point for both the success and the failure path.
            TestHelper.dropTable(connection, "dbz5139");
        }
    }

    /**
     * Starts the connector against a table without a primary key whose unique index contains a
     * function-based expression ({@code upper(data)}) and verifies the connector reaches the
     * streaming phase (DBZ-5356).
     *
     * @throws Exception if any database or connector operation fails
     */
    @Test
    @FixFor({"DBZ-5356"})
    public void shouldUniqueIndexWhenAtLeastOneColumnIsExcluded() throws Exception {
        TestHelper.dropTable(connection, "dbz5356");
        try {
            connection.execute("CREATE TABLE dbz5356 (id numeric(9,0), data varchar2(50))");
            connection.execute("CREATE UNIQUE INDEX dbz5356_idx ON dbz5356 (upper(data), id)");
            TestHelper.streamTable(connection, "dbz5356");

            Configuration config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ5356")
                    .build();
            start(OracleConnector.class, config);
            assertConnectorIsRunning();

            // Reaching streaming is the only expectation; no records are consumed.
            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
            stopConnector();
        } finally {
            // Single cleanup point for both the success and the failure path.
            TestHelper.dropTable(connection, "dbz5356");
        }
    }

    /**
     * Shared scenario for tables whose names need quoting: snapshot one row, then stream an
     * insert, an update and a delete (with tombstone), verifying each on the expected topic.
     *
     * <p>The offset store and schema history files are deleted first so each invocation starts
     * from a clean state. The connector is always stopped and the table dropped afterwards.
     *
     * @param str the unquoted table name; it is wrapped in double quotes for SQL and appended to
     *            {@code DEBEZIUM\.} for the table include list
     * @param str2 the table-name portion of the topic, i.e. the topic is
     *            {@code server1.DEBEZIUM.<str2>}
     * @throws RuntimeException wrapping any checked or unchecked {@link Exception} raised by the
     *             scenario itself, with the table name in the message
     * @throws Exception if cleanup fails
     */
    private void testTableWithForwardSlashes(String str, String str2) throws Exception {
        String quotedTableName = "\"" + str + "\"";
        String topicName = "server1.DEBEZIUM." + str2;
        TestHelper.dropTable(connection, quotedTableName);
        try {
            Testing.Files.delete(OFFSET_STORE_PATH);
            Testing.Files.delete(TestHelper.SCHEMA_HISTORY_PATH);

            connection.execute("CREATE TABLE " + quotedTableName + " (id numeric(9,0) primary key, data varchar2(50))");
            connection.execute("INSERT INTO " + quotedTableName + " (id,data) values (1, 'Record1')");
            TestHelper.streamTable(connection, quotedTableName);

            Configuration config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\." + str)
                    .build();
            start(OracleConnector.class, config);
            assertConnectorIsRunning();
            waitForSnapshotToBeCompleted(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);

            // Snapshot phase.
            List<SourceRecord> snapshotRecords = consumeRecordsByTopic(1).recordsForTopic(topicName);
            Assertions.assertThat(snapshotRecords).hasSize(1);
            VerifyRecord.isValidRead(snapshotRecords.get(0), "ID", 1);

            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
            assertNoRecordsToConsume();

            // Streaming insert.
            connection.execute("INSERT INTO " + quotedTableName + " (id,data) values (2,'Record2')");
            List<SourceRecord> insertRecords = consumeRecordsByTopic(1).recordsForTopic(topicName);
            Assertions.assertThat(insertRecords).hasSize(1);
            VerifyRecord.isValidInsert(insertRecords.get(0), "ID", 2);

            // Streaming update.
            connection.execute("UPDATE " + quotedTableName + " SET data = 'Record2u' WHERE id = 2");
            List<SourceRecord> updateRecords = consumeRecordsByTopic(1).recordsForTopic(topicName);
            Assertions.assertThat(updateRecords).hasSize(1);
            VerifyRecord.isValidUpdate(updateRecords.get(0), "ID", 2);

            // Streaming delete: a delete event followed by its tombstone.
            connection.execute("DELETE " + quotedTableName + " WHERE id = 1");
            List<SourceRecord> deleteRecords = consumeRecordsByTopic(2).recordsForTopic(topicName);
            Assertions.assertThat(deleteRecords).hasSize(2);
            VerifyRecord.isValidDelete(deleteRecords.get(0), "ID", 1);
            VerifyRecord.isValidTombstone(deleteRecords.get(1));
            assertNoRecordsToConsume();
        } catch (Exception e) {
            throw new RuntimeException("Forward-slash test failed for table: " + str, e);
        } finally {
            // Cleanup lives outside the wrapped region so a cleanup failure is not reported as a
            // scenario failure and the cleanup is executed exactly once.
            stopConnector();
            TestHelper.dropTable(connection, quotedTableName);
        }
    }

    /**
     * Verifies that an object table ({@code CREATE TABLE ... OF <type>}) is skipped both during
     * snapshot and during streaming without producing records (DBZ-5441).
     *
     * <p>The snapshot is expected to log {@code "Locking captured tables []"}, and an insert into
     * the object table during streaming is expected to log
     * {@code "is not a relational table and will be skipped"}.
     *
     * @throws Exception if any database or connector operation fails
     */
    @Test
    @FixFor({"DBZ-5441"})
    @SkipWhenAdapterNameIs(value = SkipWhenAdapterNameIs.AdapterName.OLR, reason = "binary objects are skipped")
    public void shouldGracefullySkipObjectBasedTables() throws Exception {
        TestHelper.dropTable(connection, "dbz5441");
        try {
            TestHelper.grantRole("CREATE ANY TYPE");
            LogInterceptor snapshotLog = new LogInterceptor(RelationalSnapshotChangeEventSource.class);

            connection.execute("CREATE TYPE DEBEZIUM.DBZ5441_TYPE AS OBJECT (id number, lvl number)");
            connection.execute("CREATE TABLE DEBEZIUM.DBZ5441 of DEBEZIUM.DBZ5441_TYPE (primary key(id))");
            TestHelper.streamTable(connection, "DBZ5441");

            Configuration config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ5441")
                    .build();

            // The class emitting the "skipped" message depends on the adapter in use; the XStream
            // branch additionally doubles the wait time.
            int waitSeconds = TestHelper.defaultMessageConsumerPollTimeout() * 2;
            LogInterceptor streamingLog;
            if (TestHelper.getAdapter(config) == OracleConnectorConfig.ConnectorAdapter.XSTREAM) {
                streamingLog = new LogInterceptor("io.debezium.connector.oracle.xstream.LcrEventHandler");
                waitSeconds *= 2;
            } else {
                streamingLog = new LogInterceptor(AbstractLogMinerEventProcessor.class);
            }

            start(OracleConnector.class, config);
            assertConnectorIsRunning();
            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
            assertNoRecordsToConsume();

            // Snapshot captured nothing.
            Assertions.assertThat(snapshotLog.containsMessage("Locking captured tables []")).isTrue();

            connection.execute("INSERT INTO DEBEZIUM.DBZ5441 (id,lvl) values (1,1)");
            Awaitility.await()
                    .atMost(waitSeconds, TimeUnit.SECONDS)
                    .until(() -> streamingLog.containsMessage("is not a relational table and will be skipped"));

            assertNoRecordsToConsume();
            stopConnector();
        } finally {
            // Single cleanup point for both the success and the failure path.
            TestHelper.dropTable(connection, "dbz5441");
            connection.execute("DROP TYPE DEBEZIUM.DBZ5441_TYPE");
            TestHelper.revokeRole("CREATE ANY TYPE");
        }
    }

    /**
     * Verifies that, with LOB support enabled, rows of a table keyed only by a unique constraint
     * are captured correctly when one of the constrained columns is {@code NULL} (DBZ-5682).
     *
     * <p>One row is inserted before the connector starts and one after; both are expected with
     * {@code COL_VARCHAR} null and the other columns intact.
     *
     * @throws Exception if any database or connector operation fails
     */
    @Test
    @SkipWhenLogMiningStrategyIs(value = SkipWhenLogMiningStrategyIs.Strategy.HYBRID, reason = "Cannot use lob.enabled with Hybrid")
    @FixFor({"DBZ-5682"})
    public void shouldCaptureChangesForTableUniqueIndexWithNullColumnValuesWhenLobEnabled() throws Exception {
        TestHelper.dropTable(connection, "dbz5682");
        try {
            connection.execute("CREATE TABLE dbz5682 (col_bpchar varchar2(30), col_varchar varchar2(30), col_int4 number(5), constraint uk_dbz5862 unique (col_bpchar, col_varchar))");
            TestHelper.streamTable(connection, "dbz5682");
            connection.execute("INSERT INTO dbz5682 values ('1', null, 1)");

            Configuration config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ5682")
                    .with(OracleConnectorConfig.LOB_ENABLED, true)
                    .build();
            start(OracleConnector.class, config);
            assertConnectorIsRunning();
            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);

            // Row that existed before the connector was started.
            List<SourceRecord> firstBatch = consumeRecordsByTopic(1).recordsForTopic("server1.DEBEZIUM.DBZ5682");
            Assertions.assertThat(firstBatch).hasSize(1);
            Struct firstAfter = ((Struct) firstBatch.get(0).value()).getStruct("after");
            Assertions.assertThat(firstAfter.get("COL_BPCHAR")).isEqualTo("1");
            Assertions.assertThat(firstAfter.get("COL_VARCHAR")).isNull();
            Assertions.assertThat(firstAfter.get("COL_INT4")).isEqualTo(1);

            // Row inserted while streaming.
            connection.execute("INSERT INTO dbz5682 values ('2', null, 2)");
            List<SourceRecord> secondBatch = consumeRecordsByTopic(1).recordsForTopic("server1.DEBEZIUM.DBZ5682");
            Assertions.assertThat(secondBatch).hasSize(1);
            Struct secondAfter = ((Struct) secondBatch.get(0).value()).getStruct("after");
            Assertions.assertThat(secondAfter.get("COL_BPCHAR")).isEqualTo("2");
            Assertions.assertThat(secondAfter.get("COL_VARCHAR")).isNull();
            Assertions.assertThat(secondAfter.get("COL_INT4")).isEqualTo(2);
        } finally {
            // Single cleanup point for both the success and the failure path.
            TestHelper.dropTable(connection, "dbz5682");
        }
    }

    /**
     * Verifies that with snapshot mode {@code ALWAYS} a restarted connector emits the table's
     * current contents again (DBZ-5626).
     *
     * <p>After the first run (rows 1 and 2) the connector is stopped, row 1 is deleted and row 3
     * is inserted. The second run is expected to yield two records, the first being row 2.
     *
     * @throws Exception if any database or connector operation fails
     */
    @Test
    @FixFor({"DBZ-5626"})
    public void shouldNotUseOffsetScnWhenSnapshotIsAlways() throws Exception {
        try {
            Configuration config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.SNAPSHOT_MODE, OracleConnectorConfig.SnapshotMode.ALWAYS)
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ5626")
                    .build();

            TestHelper.dropTable(connection, "DBZ5626");
            connection.execute("CREATE TABLE DBZ5626 (ID number(9,0), DATA varchar2(50))");
            TestHelper.streamTable(connection, "DBZ5626");
            connection.execute(
                    "INSERT INTO DBZ5626 (ID, DATA) values (1, 'Test1')",
                    "INSERT INTO DBZ5626 (ID, DATA) values (2, 'Test2')");

            // First run: rows 1 and 2.
            start(OracleConnector.class, config);
            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);

            AbstractConnectorTest.SourceRecords firstRun = consumeRecordsByTopic(2);
            Assertions.assertThat(firstRun.allRecordsInOrder()).hasSize(2);
            Struct firstAfter = (Struct) ((Struct) firstRun.allRecordsInOrder().get(0).value()).get("after");
            TestCase.assertEquals(1, firstAfter.get("ID"));
            TestCase.assertEquals("Test1", firstAfter.get("DATA"));
            stopConnector();

            // Modify the table while the connector is offline.
            connection.execute("DELETE FROM DBZ5626 WHERE ID=1");
            connection.execute("INSERT INTO DBZ5626 (ID, DATA) values (3, 'Test3')");

            // Second run: two records again, starting with row 2.
            start(OracleConnector.class, config);
            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);

            AbstractConnectorTest.SourceRecords secondRun = consumeRecordsByTopic(2);
            Assertions.assertThat(secondRun.allRecordsInOrder()).hasSize(2);
            Struct secondAfter = (Struct) ((Struct) secondRun.allRecordsInOrder().get(0).value()).get("after");
            TestCase.assertEquals(2, secondAfter.get("ID"));
            TestCase.assertEquals("Test2", secondAfter.get("DATA"));
        } finally {
            // Single cleanup point for both the success and the failure path.
            TestHelper.dropTable(connection, "DBZ5626");
        }
    }

    /**
     * Verifies that a table with a nested-table column is left out of the snapshot (DBZ-5738).
     *
     * <p>Two rows are inserted before the connector starts; no records are expected and the
     * snapshot is expected to log {@code "Locking captured tables []"}.
     *
     * @throws Exception if any database or connector operation fails
     */
    @Test
    @FixFor({"DBZ-5738"})
    public void shouldSkipSnapshotOfNestedTable() throws Exception {
        LogInterceptor snapshotLog = new LogInterceptor(RelationalSnapshotChangeEventSource.class);
        TestHelper.dropTable(connection, "DBZ5738");
        TestHelper.grantRole("CREATE ANY TYPE");
        try {
            connection.execute("CREATE OR REPLACE TYPE my_tab_t AS TABLE OF VARCHAR2(128);");
            connection.execute("create table DBZ5738 ( id numeric(9,0) not null,  c1 int,  c2 my_tab_t,  primary key (id))  nested table c2 store as nested_table");
            TestHelper.streamTable(connection, "DBZ5738");
            connection.execute("INSERT INTO DBZ5738 VALUES (1, 25, my_tab_t('test1'))");
            connection.execute("INSERT INTO DBZ5738 VALUES (2, 50, my_tab_t('test2'))");

            Configuration config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ5738")
                    .build();
            start(OracleConnector.class, config);
            assertConnectorIsRunning();
            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);

            assertNoRecordsToConsume();
            Assertions.assertThat(snapshotLog.containsMessage("Locking captured tables []")).isTrue();
            stopConnector();
        } finally {
            TestHelper.dropTable(connection, "DBZ5738");
            // Give back the privilege granted above so it does not leak into other tests.
            TestHelper.revokeRole("CREATE ANY TYPE");
        }
    }

    /**
     * Verifies that rolling back to a savepoint discards only the changes made after that
     * savepoint (DBZ-5907).
     *
     * <p>Within one transaction the test updates row 1, inserts row 2, sets savepoint {@code a},
     * updates row 1 again and rolls back to {@code a} before committing. Only the first update
     * and the insert are expected.
     *
     * @throws Exception if any database or connector operation fails
     */
    @Test
    @FixFor({"DBZ-5907"})
    @SkipWhenAdapterNameIsNot(value = SkipWhenAdapterNameIsNot.AdapterName.LOGMINER, reason = "LogMiner only")
    public void shouldUndoOnlyLastEventWithSavepoint() throws Exception {
        TestHelper.dropTable(connection, "dbz5907");
        try {
            connection.execute("CREATE TABLE dbz5907 (id numeric(9,0) primary key, data varchar2(50))");
            TestHelper.streamTable(connection, "dbz5907");

            Configuration config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ5907")
                    .build();
            start(OracleConnector.class, config);
            assertConnectorIsRunning();
            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);

            // Seed row 1.
            connection.execute("INSERT INTO dbz5907 (id,data) values (1, 'insert')");
            List<SourceRecord> seedRecords = consumeRecordsByTopic(1).recordsForTopic("server1.DEBEZIUM.DBZ5907");
            Assertions.assertThat(seedRecords).hasSize(1);
            SourceRecord seedInsert = seedRecords.get(0);
            VerifyRecord.isValidInsert(seedInsert, "ID", 1);
            Assertions.assertThat(getAfter(seedInsert).get("DATA")).isEqualTo("insert");

            // Transaction with a partial rollback to savepoint "a".
            connection.execute("BEGIN UPDATE dbz5907 SET data = 'update' WHERE id = 1;INSERT INTO dbz5907 (id,data) values (2, 'insert');SAVEPOINT a;UPDATE dbz5907 SET data = 'updateb' WHERE id = 1;ROLLBACK TO SAVEPOINT a;COMMIT;END;");
            List<SourceRecord> txRecords = consumeRecordsByTopic(2).recordsForTopic("server1.DEBEZIUM.DBZ5907");
            Assertions.assertThat(txRecords).hasSize(2);

            SourceRecord update = txRecords.get(0);
            VerifyRecord.isValidUpdate(update, "ID", 1);
            Assertions.assertThat(getAfter(update).get("DATA")).isEqualTo("update");

            SourceRecord insert = txRecords.get(1);
            VerifyRecord.isValidInsert(insert, "ID", 2);
            Assertions.assertThat(getAfter(insert).get("DATA")).isEqualTo("insert");

            assertNoRecordsToConsume();
            stopConnector();
        } finally {
            // Single cleanup point for both the success and the failure path.
            TestHelper.dropTable(connection, "dbz5907");
        }
    }

    /**
     * Verifies that nested savepoint rollbacks are undone correctly (DBZ-5907).
     *
     * <p>Within one transaction everything after savepoint {@code a} is rolled back (including a
     * nested rollback to savepoint {@code b}), then row 1 is updated to {@code 'updatea'} and
     * committed. Only that final update is expected, and the processor must not have logged
     * {@code "Cannot undo change on table"}.
     *
     * @throws Exception if any database or connector operation fails
     */
    @Test
    @FixFor({"DBZ-5907"})
    @SkipWhenAdapterNameIsNot(value = SkipWhenAdapterNameIsNot.AdapterName.LOGMINER, reason = "LogMiner only")
    public void shouldCorrectlyUndoWithMultipleSavepoints() throws Exception {
        TestHelper.dropTable(connection, "dbz5907");
        try {
            connection.execute("CREATE TABLE dbz5907 (id numeric(9,0) primary key, data varchar2(50))");
            TestHelper.streamTable(connection, "dbz5907");

            Configuration config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ5907")
                    .build();

            // Intercept the processor class matching the configured buffer type. The constant is
            // on the left so a missing buffer-type value cannot raise a NullPointerException.
            String interceptedLogger = "memory".equals(config.getString(OracleConnectorConfig.LOG_MINING_BUFFER_TYPE))
                    ? MemoryLogMinerEventProcessor.class.getName()
                    : AbstractLogMinerEventProcessor.class.getName();
            LogInterceptor processorLog = new LogInterceptor(interceptedLogger);

            start(OracleConnector.class, config);
            assertConnectorIsRunning();
            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);

            // Seed row 1.
            connection.execute("INSERT INTO dbz5907 (id,data) values (1, 'insert')");
            List<SourceRecord> seedRecords = consumeRecordsByTopic(1).recordsForTopic("server1.DEBEZIUM.DBZ5907");
            Assertions.assertThat(seedRecords).hasSize(1);
            SourceRecord seedInsert = seedRecords.get(0);
            VerifyRecord.isValidInsert(seedInsert, "ID", 1);
            Assertions.assertThat(getAfter(seedInsert).get("DATA")).isEqualTo("insert");

            // Transaction with rollbacks to savepoint "b" and then "a".
            connection.execute("BEGIN SAVEPOINT a;UPDATE dbz5907 SET data = 'update' WHERE id = 1;INSERT INTO dbz5907 (id,data) values (2, 'insert');SAVEPOINT b;UPDATE dbz5907 SET data = 'updateb' WHERE id = 1;ROLLBACK TO SAVEPOINT b;ROLLBACK TO SAVEPOINT a;UPDATE dbz5907 SET data = 'updatea' WHERE id = 1;COMMIT;END;");
            List<SourceRecord> txRecords = consumeRecordsByTopic(1).recordsForTopic("server1.DEBEZIUM.DBZ5907");
            Assertions.assertThat(txRecords).hasSize(1);
            SourceRecord finalUpdate = txRecords.get(0);
            VerifyRecord.isValidUpdate(finalUpdate, "ID", 1);
            Assertions.assertThat(getAfter(finalUpdate).get("DATA")).isEqualTo("updatea");

            assertNoRecordsToConsume();
            stopConnector();

            Assertions.assertThat(processorLog.containsMessage("Cannot undo change on table"))
                    .as("Unable to correctly undo operation within transaction")
                    .isFalse();
        } finally {
            // Single cleanup point for both the success and the failure path.
            TestHelper.dropTable(connection, "dbz5907");
        }
    }

    /**
     * Verifies that, with LOB support enabled, a single UPDATE statement touching two different
     * rows of a table without LOB columns yields two separate update events (DBZ-6107).
     *
     * <p>Ten rows are inserted, then rows with {@code a=1} and {@code a=2} are updated to
     * {@code a=12} in one statement. Twelve records are expected: ten inserts followed by two
     * updates, each carrying its own row's before/after values.
     *
     * @throws Exception if any database or connector operation fails
     */
    @Test
    @SkipWhenLogMiningStrategyIs(value = SkipWhenLogMiningStrategyIs.Strategy.HYBRID, reason = "Cannot use lob.enabled with Hybrid")
    @FixFor({"DBZ-6107"})
    public void shouldNotConsolidateBulkUpdateWhenLobEnabledIfUpdatesAreDifferentLogicalRowsWithoutLobColumns() throws Exception {
        TestHelper.dropTable(connection, "dbz6107");
        try {
            connection.execute("CREATE TABLE dbz6107 (a numeric(9,0), b varchar2(25))");
            TestHelper.streamTable(connection, "dbz6107");

            Configuration config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ6107")
                    .with(OracleConnectorConfig.LOB_ENABLED, "true")
                    .build();
            start(OracleConnector.class, config);
            assertConnectorIsRunning();
            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);

            for (int row = 1; row <= 10; row++) {
                connection.execute("INSERT INTO dbz6107 (a,b) values (" + row + ",'t" + row + "')");
            }
            // One statement, two affected rows.
            connection.execute("UPDATE dbz6107 SET a=12 WHERE a=1 OR a=2");

            List<SourceRecord> records = consumeRecordsByTopic(12).recordsForTopic("server1.DEBEZIUM.DBZ6107");
            Assertions.assertThat(records).hasSize(12);

            // Records 0-9: the ten inserts, in insertion order.
            for (int row = 1; row <= 10; row++) {
                SourceRecord insert = records.get(row - 1);
                VerifyRecord.isValidInsert(insert);
                Struct after = ((Struct) insert.value()).getStruct("after");
                Assertions.assertThat(after.get("A")).isEqualTo(row);
                Assertions.assertThat(after.get("B")).isEqualTo("t" + row);
            }

            // Records 10-11: one update per affected row (original a=1 and a=2).
            for (int row = 1; row <= 2; row++) {
                SourceRecord update = records.get(9 + row);
                VerifyRecord.isValidUpdate(update);
                Struct before = ((Struct) update.value()).getStruct("before");
                Assertions.assertThat(before.get("A")).isEqualTo(row);
                Assertions.assertThat(before.get("B")).isEqualTo("t" + row);
                Struct after = ((Struct) update.value()).getStruct("after");
                Assertions.assertThat(after.get("A")).isEqualTo(12);
                Assertions.assertThat(after.get("B")).isEqualTo("t" + row);
            }

            assertNoRecordsToConsume();
        } finally {
            // Single cleanup point for both the success and the failure path.
            stopConnector();
            TestHelper.dropTable(connection, "dbz6107");
        }
    }

    @Test
    @SkipWhenLogMiningStrategyIs(value = SkipWhenLogMiningStrategyIs.Strategy.HYBRID, reason = "Cannot use lob.enabled with Hybrid")
    @FixFor({"DBZ-6107"})
    public void shouldNotConsolidateBulkUpdateWhenLobEnabledIfUpdatesAreDifferentLogicalRowsWithLobColumns() throws Exception {
        TestHelper.dropTable(connection, "dbz6107");
        try {
            connection.execute(new String[]{"CREATE TABLE dbz6107 (a numeric(9,0), b varchar2(25), d clob, c clob)"});
            TestHelper.streamTable(connection, "dbz6107");
            start(OracleConnector.class, TestHelper.defaultConfig().with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ6107").with(OracleConnectorConfig.LOB_ENABLED, "true").build());
            assertConnectorIsRunning();
            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
            for (int i = 1; i <= 10; i++) {
                connection.execute(new String[]{"INSERT INTO dbz6107 (a,b,c,d) values (" + i + ",'t" + i + "', 'data" + i + "','x')"});
            }
            connection.execute(new String[]{"UPDATE dbz6107 SET a=12 WHERE a=1 OR a=2"});
            connection.execute(new String[]{"UPDATE dbz6107 SET a=13, c = 'Updated' WHERE a=3 OR a=4"});
            connection.execute(new String[]{"UPDATE dbz6107 SET a=14, c = NULL WHERE a=5 OR a=6"});
            List recordsForTopic = consumeRecordsByTopic(16).recordsForTopic("server1.DEBEZIUM.DBZ6107");
            Assertions.assertThat(recordsForTopic).hasSize(16);
            for (int i2 = 1; i2 <= 10; i2++) {
                SourceRecord sourceRecord = (SourceRecord) recordsForTopic.get(i2 - 1);
                VerifyRecord.isValidInsert(sourceRecord);
                Struct struct = ((Struct) sourceRecord.value()).getStruct("after");
                Assertions.assertThat(struct.get("A")).isEqualTo(Integer.valueOf(i2));
                Assertions.assertThat(struct.get("B")).isEqualTo("t" + i2);
                Assertions.assertThat(struct.get("C")).isEqualTo("data" + i2);
            }
            for (int i3 = 11; i3 <= 12; i3++) {
                SourceRecord sourceRecord2 = (SourceRecord) recordsForTopic.get(i3 - 1);
                VerifyRecord.isValidUpdate(sourceRecord2);
                Struct struct2 = ((Struct) sourceRecord2.value()).getStruct("before");
                Assertions.assertThat(struct2.get("A")).isEqualTo(Integer.valueOf(i3 - 10));
                Assertions.assertThat(struct2.get("B")).isEqualTo("t" + (i3 - 10));
                Assertions.assertThat(struct2.get("C")).isEqualTo("__debezium_unavailable_value");
                Struct struct3 = ((Struct) sourceRecord2.value()).getStruct("after");
                Assertions.assertThat(struct3.get("A")).isEqualTo(12);
                Assertions.assertThat(struct3.get("B")).isEqualTo("t" + (i3 - 10));
                Assertions.assertThat(struct3.get("C")).isEqualTo("__debezium_unavailable_value");
            }
            for (int i4 = 13; i4 <= 14; i4++) {
                SourceRecord sourceRecord3 = (SourceRecord) recordsForTopic.get(i4 - 1);
                VerifyRecord.isValidUpdate(sourceRecord3);
                Struct struct4 = ((Struct) sourceRecord3.value()).getStruct("before");
                Assertions.assertThat(struct4.get("A")).isEqualTo(Integer.valueOf(i4 - 10));
                Assertions.assertThat(struct4.get("B")).isEqualTo("t" + (i4 - 10));
                Assertions.assertThat(struct4.get("C")).isEqualTo("__debezium_unavailable_value");
                Struct struct5 = ((Struct) sourceRecord3.value()).getStruct("after");
                Assertions.assertThat(struct5.get("A")).isEqualTo(13);
                Assertions.assertThat(struct5.get("B")).isEqualTo("t" + (i4 - 10));
                Assertions.assertThat(struct5.get("C")).isEqualTo("Updated");
            }
            for (int i5 = 15; i5 <= 16; i5++) {
                SourceRecord sourceRecord4 = (SourceRecord) recordsForTopic.get(i5 - 1);
                VerifyRecord.isValidUpdate(sourceRecord4);
                Struct struct6 = ((Struct) sourceRecord4.value()).getStruct("before");
                Assertions.assertThat(struct6.get("A")).isEqualTo(Integer.valueOf(i5 - 10));
                Assertions.assertThat(struct6.get("B")).isEqualTo("t" + (i5 - 10));
                Assertions.assertThat(struct6.get("C")).isEqualTo("__debezium_unavailable_value");
                Struct struct7 = ((Struct) sourceRecord4.value()).getStruct("after");
                Assertions.assertThat(struct7.get("A")).isEqualTo(14);
                Assertions.assertThat(struct7.get("B")).isEqualTo("t" + (i5 - 10));
                Assertions.assertThat(struct7.get("C")).isNull();
            }
            assertNoRecordsToConsume();
            stopConnector();
            TestHelper.dropTable(connection, "dbz6107");
        } catch (Throwable th) {
            stopConnector();
            TestHelper.dropTable(connection, "dbz6107");
            throw th;
        }
    }

    /**
     * Verifies that changes are captured for a table whose quoted name contains a space and
     * non-ASCII characters.
     *
     * <p>One row is inserted before the connector is started and one after streaming is running;
     * each is expected to arrive as a single record on topic {@code server1.DEBEZIUM.Q1___}.
     *
     * @throws Exception if a database operation or the connector fails
     */
    @Test
    @FixFor({"DBZ-6120"})
    public void testCapturingChangesForTableWithSapcesInName() throws Exception {
        final String quotedTableName = "\"Q1! 表\"";
        final String topicName = "server1.DEBEZIUM.Q1___";

        TestHelper.dropTable(connection, quotedTableName);
        try {
            connection.execute("CREATE TABLE \"Q1! 表\" (a int)");
            connection.execute("INSERT INTO \"Q1! 表\" (a) values (1)");
            TestHelper.streamTable(connection, quotedTableName);

            start(OracleConnector.class, TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.Q1! 表")
                    .build());
            assertNoRecordsToConsume();
            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);

            // Row that existed before the connector was started.
            Assertions.assertThat(consumeRecordsByTopic(1).recordsForTopic(topicName)).hasSize(1);

            // Row inserted while the connector is streaming.
            connection.execute("INSERT INTO \"Q1! 表\" (a) values (2)");
            Assertions.assertThat(consumeRecordsByTopic(1).recordsForTopic(topicName)).hasSize(1);
        } finally {
            // Single cleanup point for both the success and the failure path.
            TestHelper.dropTable(connection, quotedTableName);
        }
    }

    /**
     * Verifies that changes are captured for a table whose quoted name contains special and
     * non-ASCII characters (no whitespace).
     *
     * <p>One row is inserted before the connector is started and one after streaming is running;
     * each is expected to arrive as a single record on topic {@code server1.DEBEZIUM.Q1__}.
     *
     * @throws Exception if a database operation or the connector fails
     */
    @Test
    @FixFor({"DBZ-6120"})
    public void testCapturingChangesForTableWithSpecialCharactersInName() throws Exception {
        final String quotedTableName = "\"Q1!表\"";
        final String topicName = "server1.DEBEZIUM.Q1__";

        TestHelper.dropTable(connection, quotedTableName);
        try {
            connection.execute("CREATE TABLE \"Q1!表\" (a int)");
            connection.execute("INSERT INTO \"Q1!表\" (a) values (1)");
            TestHelper.streamTable(connection, quotedTableName);

            start(OracleConnector.class, TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.Q1!表")
                    .build());
            assertNoRecordsToConsume();
            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);

            // Row that existed before the connector was started.
            Assertions.assertThat(consumeRecordsByTopic(1).recordsForTopic(topicName)).hasSize(1);

            // Row inserted while the connector is streaming.
            connection.execute("INSERT INTO \"Q1!表\" (a) values (2)");
            Assertions.assertThat(consumeRecordsByTopic(1).recordsForTopic(topicName)).hasSize(1);
        } finally {
            // Single cleanup point for both the success and the failure path.
            TestHelper.dropTable(connection, quotedTableName);
        }
    }

    /**
     * Verifies that a {@code TIMESTAMP WITH TIME ZONE} value is emitted in the same textual form
     * for a row inserted before the connector starts and for a row inserted while streaming, when
     * the driver property {@code oracle.jdbc.timestampTzInGmt} is passed through as {@code true}.
     *
     * @throws Exception if a database operation or the connector fails
     */
    @Test
    @FixFor({"DBZ-6143"})
    public void testTimestampWithTimeZoneFormatConsistentUsingDriverEnabledTimestampTzInGmt() throws Exception {
        final String insertSql = "INSERT INTO tz_test values (to_timestamp_tz('2010-12-01 23:12:56.788 -12:44', 'YYYY-MM-DD HH24:MI:SS.FF TZH:TZM'))";
        final String expectedValue = "2010-12-01T23:12:56.788000-12:44";
        final String topicName = "server1.DEBEZIUM.TZ_TEST";

        TestHelper.dropTable(connection, "tz_test");
        try {
            connection.execute("CREATE TABLE tz_test (a timestamp with time zone)");
            connection.execute(insertSql);
            TestHelper.streamTable(connection, "tz_test");

            start(OracleConnector.class, TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM.TZ_TEST")
                    .with("driver.oracle.jdbc.timestampTzInGmt", "true")
                    .build());
            assertConnectorIsRunning();
            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);

            // Row that existed before the connector was started.
            List<SourceRecord> initialRecords = consumeRecordsByTopic(1).recordsForTopic(topicName);
            Assertions.assertThat(initialRecords).hasSize(1);
            Assertions.assertThat(getAfter(initialRecords.get(0)).get("A")).isEqualTo(expectedValue);

            // Same value inserted while streaming must be rendered identically.
            connection.execute(insertSql);
            List<SourceRecord> streamedRecords = consumeRecordsByTopic(1).recordsForTopic(topicName);
            Assertions.assertThat(streamedRecords).hasSize(1);
            Assertions.assertThat(getAfter(streamedRecords.get(0)).get("A")).isEqualTo(expectedValue);
        } finally {
            // Single cleanup point for both the success and the failure path.
            TestHelper.dropTable(connection, "tz_test");
        }
    }

    /**
     * Verifies that a {@code TIMESTAMP WITH TIME ZONE} value is emitted in the same textual form
     * for a row inserted before the connector starts and for a row inserted while streaming, when
     * the driver property {@code oracle.jdbc.timestampTzInGmt} is passed through as {@code false}.
     *
     * @throws Exception if a database operation or the connector fails
     */
    @Test
    @FixFor({"DBZ-6143"})
    public void testTimestampWithTimeZoneFormatConsistentUsingDriverDisabledTimestampTzInGmt() throws Exception {
        final String insertSql = "INSERT INTO tz_test values (to_timestamp_tz('2010-12-01 23:12:56.788 -12:44', 'YYYY-MM-DD HH24:MI:SS.FF TZH:TZM'))";
        final String expectedValue = "2010-12-01T23:12:56.788000-12:44";
        final String topicName = "server1.DEBEZIUM.TZ_TEST";

        TestHelper.dropTable(connection, "tz_test");
        try {
            connection.execute("CREATE TABLE tz_test (a timestamp with time zone)");
            connection.execute(insertSql);
            TestHelper.streamTable(connection, "tz_test");

            start(OracleConnector.class, TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM.TZ_TEST")
                    .with("driver.oracle.jdbc.timestampTzInGmt", "false")
                    .build());
            assertConnectorIsRunning();
            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);

            // Row that existed before the connector was started.
            List<SourceRecord> initialRecords = consumeRecordsByTopic(1).recordsForTopic(topicName);
            Assertions.assertThat(initialRecords).hasSize(1);
            Assertions.assertThat(getAfter(initialRecords.get(0)).get("A")).isEqualTo(expectedValue);

            // Same value inserted while streaming must be rendered identically.
            connection.execute(insertSql);
            List<SourceRecord> streamedRecords = consumeRecordsByTopic(1).recordsForTopic(topicName);
            Assertions.assertThat(streamedRecords).hasSize(1);
            Assertions.assertThat(getAfter(streamedRecords.get(0)).get("A")).isEqualTo(expectedValue);
        } finally {
            // Single cleanup point for both the success and the failure path.
            TestHelper.dropTable(connection, "tz_test");
        }
    }

    /**
     * Verifies that {@code character}, {@code character(5)} and {@code character varying(5)}
     * columns are emitted with a STRING schema when the table is created after the connector has
     * already entered the streaming phase.
     *
     * <p>The {@code character(5)} value is expected to be space-padded to five characters.
     *
     * @throws Exception if a database operation or the connector fails
     */
    @Test
    @FixFor({"DBZ-6221"})
    public void testShouldProperlyMapCharacterColumnTypesAsCharWhenTableCreatedDuringStreamingPhase() throws Exception {
        TestHelper.dropTable(connection, "dbz6221");
        try {
            start(OracleConnector.class, TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM.DBZ6221")
                    .build());
            assertConnectorIsRunning();
            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);

            // The table is deliberately created only after streaming has started.
            connection.execute("CREATE TABLE dbz6221 (data0 character, data1 character(5), data2 character varying(5))");
            TestHelper.streamTable(connection, "dbz6221");
            connection.execute("INSERT INTO dbz6221 values ('a', 'abc', 'abc')");

            List<SourceRecord> records = consumeRecordsByTopic(1).recordsForTopic("server1.DEBEZIUM.DBZ6221");
            Assertions.assertThat(records).hasSize(1);

            Struct after = ((Struct) records.get(0).value()).getStruct("after");
            Assertions.assertThat(after.schema().fields()).hasSize(3);
            Assertions.assertThat(after.schema().field("DATA0").schema().type()).isEqualTo(Schema.Type.STRING);
            Assertions.assertThat(after.schema().field("DATA1").schema().type()).isEqualTo(Schema.Type.STRING);
            Assertions.assertThat(after.schema().field("DATA2").schema().type()).isEqualTo(Schema.Type.STRING);
            Assertions.assertThat(after.get("DATA0")).isEqualTo("a");
            Assertions.assertThat(after.get("DATA1")).isEqualTo("abc  ");
            Assertions.assertThat(after.get("DATA2")).isEqualTo("abc");
        } finally {
            // Single cleanup point for both the success and the failure path.
            TestHelper.dropTable(connection, "dbz6221");
        }
    }

    /**
     * Verifies that the {@code OffsetScn} streaming metric keeps advancing when LOB support is
     * enabled and no changes are made to the captured table.
     *
     * <p>The test first waits (up to five minutes) for the metric to be published, remembers that
     * value, and then waits (up to five minutes, polling every two seconds) until the metric
     * reports a strictly greater SCN.
     *
     * @throws Exception if a database operation or the connector fails
     */
    @SkipWhenLogMiningStrategyIs(value = SkipWhenLogMiningStrategyIs.Strategy.HYBRID, reason = "Cannot use lob.enabled with Hybrid")
    @SkipWhenAdapterNameIsNot(value = SkipWhenAdapterNameIsNot.AdapterName.LOGMINER, reason = "Applies to LogMiner only")
    @Test
    @FixFor({"DBZ-5395"})
    public void testShouldAdvanceStartScnWhenNoActiveTransactionsBetweenIterationsWhenLobEnabled() throws Exception {
        TestHelper.dropTable(connection, "dbz5395");
        try {
            Configuration config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM.DBZ5395")
                    .with(OracleConnectorConfig.LOB_ENABLED, "true")
                    .build();

            connection.execute("CREATE TABLE dbz5395 (data0 character, data1 character(5), data2 character varying(5))");
            TestHelper.streamTable(connection, "dbz5395");

            start(OracleConnector.class, config);
            assertConnectorIsRunning();
            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);

            // Capture the first offset SCN that the streaming metrics expose.
            final AtomicReference<Scn> initialOffsetScn = new AtomicReference<>(Scn.NULL);
            Awaitility.await().atMost(Duration.ofMinutes(5L)).until(() -> {
                final BigInteger offsetScn = (BigInteger) getStreamingMetric("OffsetScn");
                if (offsetScn == null) {
                    return false;
                }
                initialOffsetScn.set(new Scn(offsetScn));
                return true;
            });

            // The offset must move past the captured value even though no table changes occur.
            Awaitility.await()
                    .atMost(Duration.ofMinutes(5L))
                    .pollInterval(Duration.ofSeconds(2L))
                    .until(() -> new Scn((BigInteger) getStreamingMetric("OffsetScn")).compareTo(initialOffsetScn.get()) > 0);
        } finally {
            // Single cleanup point for both the success and the failure path.
            TestHelper.dropTable(connection, "dbz5395");
        }
    }

    /**
     * Exercises transaction retention across a connector restart.
     *
     * <p>Flow:
     * <ol>
     * <li>Row 1 is inserted before the connector starts and is expected as the first record.</li>
     * <li>The connector is stopped; a second connection inserts row 2 without committing and the
     * test sleeps 70 seconds, which is longer than the configured 60 second transaction
     * retention.</li>
     * <li>The connector is restarted, row 3 is inserted and committed on the main connection, and
     * the test waits until the {@code FetchingQueryCount} metric has grown by at least five.</li>
     * <li>The second connection commits; exactly one further record (row 3) is expected and
     * nothing else.</li>
     * </ol>
     *
     * @throws Exception if a database operation or the connector fails
     */
    @Test
    @FixFor({"DBZ-6355"})
    @SkipWhenAdapterNameIsNot(value = SkipWhenAdapterNameIsNot.AdapterName.LOGMINER, reason = "User-defined types not supported")
    public void testBacklogTransactionShouldNotBeAbandon() throws Exception {
        final String topicName = "server1.DEBEZIUM.DBZ6355";
        final long agingWaitMs = 70000L;

        TestHelper.dropTable(connection, "dbz6355");
        try {
            connection.execute("CREATE TABLE dbz6355 (id numeric(9,0) primary key, name varchar2(50))");
            TestHelper.streamTable(connection, "dbz6355");

            Configuration config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.LOG_MINING_TRANSACTION_RETENTION_MS, 60000L)
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ6355")
                    .build();

            connection.execute("INSERT INTO dbz6355 (id,name) values (1, 'Gerald Jinx Mouse')");

            start(OracleConnector.class, config);
            assertConnectorIsRunning();
            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);

            AbstractConnectorTest.SourceRecords firstBatch = consumeRecordsByTopic(1);
            Assertions.assertThat(firstBatch.allRecordsInOrder()).hasSize(1);
            List<SourceRecord> firstRecords = firstBatch.recordsForTopic(topicName);
            Assertions.assertThat(firstRecords).hasSize(1);
            Struct firstAfter = ((Struct) firstRecords.get(0).value()).getStruct("after");
            Assertions.assertThat(firstAfter.get("ID")).isEqualTo(1);
            Assertions.assertThat(firstAfter.get("NAME")).isEqualTo("Gerald Jinx Mouse");

            stopConnector();

            // try-with-resources guarantees the second connection is closed even when a step
            // below fails; previously it was only closed on the success path.
            try (OracleConnection otherConnection = TestHelper.testConnection()) {
                otherConnection.executeWithoutCommitting("INSERT INTO dbz6355 (id,name) values (2, 'Minnie Mouse')");

                LOGGER.info("Waiting {}ms for second change to age; should not be captured.", agingWaitMs);
                Thread.sleep(agingWaitMs);

                start(OracleConnector.class, config);
                assertConnectorIsRunning();
                waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);

                final Long initialFetchCount = (Long) getStreamingMetric("FetchingQueryCount");
                connection.execute("INSERT INTO dbz6355 (id,name) VALUES (3, 'Donald Duck')");

                // Let several fetch iterations pass before the aged transaction is committed.
                Awaitility.waitAtMost(Duration.ofSeconds(60L)).until(() -> {
                    final long currentFetchCount = ((Long) getStreamingMetric("FetchingQueryCount")).longValue();
                    return initialFetchCount.longValue() + 5 <= currentFetchCount;
                });

                otherConnection.commit();
            }

            AbstractConnectorTest.SourceRecords secondBatch = consumeRecordsByTopic(1);
            Assertions.assertThat(secondBatch.allRecordsInOrder()).hasSize(1);
            List<SourceRecord> secondRecords = secondBatch.recordsForTopic(topicName);
            Assertions.assertThat(secondRecords).hasSize(1);
            Struct secondAfter = ((Struct) secondRecords.get(0).value()).getStruct("after");
            Assertions.assertThat(secondAfter.get("ID")).isEqualTo(3);
            Assertions.assertThat(secondAfter.get("NAME")).isEqualTo("Donald Duck");

            assertNoRecordsToConsume();
        } finally {
            // Single cleanup point for both the success and the failure path.
            TestHelper.dropTable(connection, "dbz6355");
        }
    }

    /**
     * Runs an {@code INITIAL_ONLY} snapshot restricted to {@code DEBEZIUM.CUSTOMER} with
     * "store only captured tables DDL" enabled, and checks that {@link JdbcConnection} logged
     * that exactly one table will be scanned.
     *
     * @throws Exception if the connector fails to start or complete the snapshot
     */
    @Test
    @FixFor({"DBZ-6439"})
    public void shouldGetTableMetadataOnlyForCapturedTables() throws Exception {
        final Configuration config = TestHelper.defaultConfig()
                .with(OracleConnectorConfig.SNAPSHOT_MODE, OracleConnectorConfig.SnapshotMode.INITIAL_ONLY)
                .with(OracleConnectorConfig.STORE_ONLY_CAPTURED_TABLES_DDL, true)
                .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.CUSTOMER")
                .build();

        // The message of interest is looked up in JdbcConnection's DEBUG output.
        final LogInterceptor jdbcLogs = new LogInterceptor(JdbcConnection.class);
        jdbcLogs.setLoggerLevel(JdbcConnection.class, Level.DEBUG);

        start(OracleConnector.class, config);
        assertConnectorIsRunning();
        waitForSnapshotToBeCompleted(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
        stopConnector();

        final boolean scannedSingleTable = jdbcLogs.containsMessage("1 table(s) will be scanned");
        Assertions.assertThat(scannedSingleTable).isTrue();
    }

    /**
     * Starts the connector with a 30 second maximum mining session and connection restart enabled,
     * then waits (up to two minutes) for {@link LogMinerStreamingChangeEventSource} to log that it
     * is restarting the Oracle JDBC connection.
     *
     * @throws Exception if the connector fails or the log message does not appear in time
     */
    @Test
    @FixFor({"DBZ-6499"})
    @SkipWhenAdapterNameIsNot(value = SkipWhenAdapterNameIsNot.AdapterName.LOGMINER, reason = "Applies only to LogMiner")
    public void shouldRestartOracleJdbcConnectionAtMaxSessionThreshold() throws Exception {
        TestHelper.forceLogfileSwitch();

        final Configuration config = TestHelper.defaultConfig()
                .with(OracleConnectorConfig.SNAPSHOT_MODE, OracleConnectorConfig.SnapshotMode.INITIAL)
                .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.CUSTOMER")
                .with(OracleConnectorConfig.LOG_MINING_SESSION_MAX_MS, "30000")
                .with(OracleConnectorConfig.LOG_MINING_RESTART_CONNECTION, "true")
                .build();

        final LogInterceptor streamingLogs = new LogInterceptor(LogMinerStreamingChangeEventSource.class);
        streamingLogs.setLoggerLevel(LogMinerStreamingChangeEventSource.class, Level.DEBUG);

        start(OracleConnector.class, config);
        assertConnectorIsRunning();
        waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);

        Awaitility.await()
                .atMost(Duration.ofMinutes(2L))
                .until(() -> streamingLogs.containsMessage("restarting Oracle JDBC connection"));

        // Keep the connector running for five more seconds after the restart was logged.
        Thread.sleep(5000L);
        stopConnector();
    }

    /**
     * Starts the connector with connection restart enabled, waits for the first
     * "Oracle Session UGA" log entry, forces a single log file switch, and then waits for
     * {@link LogMinerStreamingChangeEventSource} to log that it is restarting the Oracle JDBC
     * connection. The whole sequence must finish within two minutes.
     *
     * @throws Exception if the connector fails or the expected log messages do not appear in time
     */
    @Test
    @FixFor({"DBZ-6499"})
    @SkipWhenAdapterNameIsNot(value = SkipWhenAdapterNameIsNot.AdapterName.LOGMINER, reason = "Applies only to LogMiner")
    public void shouldRestartOracleJdbcConnectionUponLogSwitch() throws Exception {
        final Configuration config = TestHelper.defaultConfig()
                .with(OracleConnectorConfig.SNAPSHOT_MODE, OracleConnectorConfig.SnapshotMode.INITIAL)
                .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.CUSTOMER")
                .with(OracleConnectorConfig.LOG_MINING_RESTART_CONNECTION, "true")
                .build();

        final LogInterceptor streamingLogs = new LogInterceptor(LogMinerStreamingChangeEventSource.class);
        streamingLogs.setLoggerLevel(LogMinerStreamingChangeEventSource.class, Level.DEBUG);

        start(OracleConnector.class, config);
        assertConnectorIsRunning();
        waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);

        final AtomicBoolean sessionStatsSeen = new AtomicBoolean();
        final AtomicBoolean logSwitchForced = new AtomicBoolean();
        Awaitility.await().atMost(Duration.ofMinutes(2L)).until(() -> {
            // Phase 1: wait until the session statistics have been logged at least once.
            if (!sessionStatsSeen.get() && !streamingLogs.containsMessage("Oracle Session UGA")) {
                return false;
            }
            sessionStatsSeen.set(true);

            // Phase 2: force the log switch exactly once.
            if (logSwitchForced.compareAndSet(false, true)) {
                TestHelper.forceLogfileSwitch();
                return false;
            }

            // Phase 3: wait for the restart to be reported.
            return streamingLogs.containsMessage("restarting Oracle JDBC connection");
        });

        // Keep the connector running for five more seconds after the restart was logged.
        Thread.sleep(5000L);
        stopConnector();
    }

    /**
     * Verifies that the connector starts and streams changes when the configured signal data
     * collection ({@code C##DBZUSER.SIGNALS}) is not part of the table include list.
     *
     * <p>The signal table is created through an admin connection (switched to the PDB when one is
     * in use), the connector is started with only {@code DEBEZIUM.DBZ6528} included, and one
     * inserted row is expected on that table's topic. Both tables are dropped afterwards regardless
     * of the outcome.
     *
     * @throws Exception if a database operation or the connector fails
     */
    @Test
    @FixFor({"DBZ-6528"})
    public void shouldNotFailToStartWhenSignalDataCollectionNotDefinedWithinTableIncludeList() throws Exception {
        try {
            TestHelper.dropTable(connection, "dbz6528");
            TestHelper.dropTable(connection, "dbz6495");

            // Create the signal table; the admin connection is closed as soon as setup is done.
            try (OracleConnection admin = TestHelper.adminConnection()) {
                if (TestHelper.isUsingPdb()) {
                    admin.setSessionToPdb(TestHelper.getDatabaseName());
                }
                TestHelper.dropTable(admin, "c##dbzuser.signals");
                admin.execute("CREATE TABLE c##dbzuser.signals (id varchar2(64), type varchar2(32), data varchar2(2048))");
                TestHelper.streamTable(admin, "c##dbzuser.signals");
            }

            connection.execute("CREATE TABLE dbz6528 (id numeric(9,0), data varchar2(50))");
            TestHelper.streamTable(connection, "dbz6528");

            start(OracleConnector.class, TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ6528")
                    .with(OracleConnectorConfig.SIGNAL_DATA_COLLECTION, TestHelper.getDatabaseName() + ".C##DBZUSER.SIGNALS")
                    .with(OracleConnectorConfig.STORE_ONLY_CAPTURED_TABLES_DDL, "true")
                    .with(OracleConnectorConfig.SNAPSHOT_MODE, OracleConnectorConfig.SnapshotMode.NO_DATA.getValue())
                    .build());
            assertConnectorIsRunning();
            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);

            connection.execute("INSERT INTO dbz6528 (id,data) values (1, 'data')");
            Assertions.assertThat(consumeRecordsByTopic(1).recordsForTopic("server1.DEBEZIUM.DBZ6528")).hasSize(1);

            stopConnector();
        } finally {
            // Single cleanup point for both the success and the failure path.
            TestHelper.dropTable(connection, "dbz6528");
            try (OracleConnection admin = TestHelper.adminConnection()) {
                if (TestHelper.isUsingPdb()) {
                    admin.setSessionToPdb(TestHelper.getDatabaseName());
                }
                TestHelper.dropTable(admin, "c##dbzuser.signals");
            }
        }
    }

    /**
     * Verifies that schema change events are dispatched correctly when a second table is added to
     * the include list after a restart.
     *
     * <p>First run: only {@code DBZ6650_SNAPSHOT} is included; one read record and one schema
     * change record (on the server topic) are expected. Second run: {@code DBZ6650_STREAM} is added
     * to the include list; after a {@code TRUNCATE} and an {@code INSERT} on that table, a truncate
     * record, an insert record and one schema change record are expected.
     *
     * @throws Exception if a database operation or the connector fails
     */
    @Test
    @FixFor({"DBZ-6650"})
    public void shouldNotThrowConcurrentModificationExceptionWhenDispatchingSchemaChangeEvent() throws Exception {
        TestHelper.dropTable(connection, "dbz6650_snapshot");
        TestHelper.dropTable(connection, "dbz6650_stream");
        try {
            connection.execute("CREATE TABLE dbz6650_snapshot (id numeric(9,0), data varchar2(50), primary key(id))");
            connection.execute("INSERT INTO dbz6650_snapshot values (1, 'data')");
            TestHelper.streamTable(connection, "dbz6650_snapshot");

            connection.execute("CREATE TABLE dbz6650_stream (id numeric(9,0), data varchar2(50), primary key(id))");
            connection.execute("INSERT INTO dbz6650_stream values (1, 'data')");
            TestHelper.streamTable(connection, "dbz6650_stream");

            Configuration config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ6650_SNAPSHOT")
                    .with(OracleConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)
                    .with(OracleConnectorConfig.SKIPPED_OPERATIONS, "none")
                    .with(OracleConnectorConfig.STORE_ONLY_CAPTURED_TABLES_DDL, "true")
                    .build();

            // First run: only the snapshot table is captured.
            start(OracleConnector.class, config);
            assertConnectorIsRunning();
            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);

            AbstractConnectorTest.SourceRecords snapshotBatch = consumeRecordsByTopic(2);
            List<SourceRecord> snapshotRecords = snapshotBatch.recordsForTopic("server1.DEBEZIUM.DBZ6650_SNAPSHOT");
            Assertions.assertThat(snapshotRecords).hasSize(1);
            VerifyRecord.isValidRead(snapshotRecords.get(0), "ID", 1);
            Assertions.assertThat(snapshotBatch.recordsForTopic(TestHelper.SERVER_NAME)).hasSize(1);

            stopConnector();

            // Second run: the stream table is added to the include list.
            start(OracleConnector.class, config.edit()
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ6650_SNAPSHOT,DEBEZIUM\\.DBZ6650_STREAM")
                    .build());
            assertConnectorIsRunning();
            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
            assertNoRecordsToConsume();

            connection.execute("TRUNCATE TABLE dbz6650_stream");
            connection.execute("INSERT INTO dbz6650_stream (id,data) values (2,'data')");

            AbstractConnectorTest.SourceRecords streamBatch = consumeRecordsByTopic(3);
            List<SourceRecord> streamRecords = streamBatch.recordsForTopic("server1.DEBEZIUM.DBZ6650_STREAM");
            Assertions.assertThat(streamRecords).hasSize(2);
            VerifyRecord.isValidTruncate(streamRecords.get(0));
            VerifyRecord.isValidInsert(streamRecords.get(1), "ID", 2);
            Assertions.assertThat(streamBatch.recordsForTopic(TestHelper.SERVER_NAME)).hasSize(1);
        } finally {
            // Single cleanup point for both the success and the failure path.
            TestHelper.dropTable(connection, "dbz6650_snapshot");
            TestHelper.dropTable(connection, "dbz6650_stream");
        }
    }

    /**
     * Starts the connector with a 10 second maximum SCN deviation and fixed batch sizes of 100,
     * waits for the streaming source to log "outside of mining range, recalculating.", and then
     * inserts one row per second for 60 seconds while inspecting the processor's
     * "Processed in " log entries for a reported lag. The connector must still be running at
     * the end.
     *
     * @throws Exception if a database operation or the connector fails
     */
    @Test
    @FixFor({"DBZ-6660"})
    @SkipWhenAdapterNameIsNot(SkipWhenAdapterNameIsNot.AdapterName.LOGMINER)
    public void shouldPauseAndWaitForDeviationCalculationIfBeforeMiningRange() throws Exception {
        try {
            TestHelper.dropTable(connection, "dbz6660");
            connection.execute("CREATE TABLE dbz6660 (id number(9,0), data varchar2(50), primary key(id))");
            TestHelper.streamTable(connection, "dbz6660");

            final Long deviationMs = 10000L;
            Configuration config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ6660")
                    .with(OracleConnectorConfig.LOG_MINING_MAX_SCN_DEVIATION_MS, deviationMs.toString())
                    .with(OracleConnectorConfig.LOG_MINING_BATCH_SIZE_MAX, "100")
                    .with(OracleConnectorConfig.LOG_MINING_BATCH_SIZE_DEFAULT, "100")
                    .with(OracleConnectorConfig.LOG_MINING_BATCH_SIZE_MIN, "100")
                    .build();

            final LogInterceptor streamingLogs = new LogInterceptor(LogMinerStreamingChangeEventSource.class);
            streamingLogs.setLoggerLevel(LogMinerStreamingChangeEventSource.class, Level.DEBUG);
            final LogInterceptor processorLogs = new LogInterceptor(AbstractLogMinerEventProcessor.class);
            processorLogs.setLoggerLevel(AbstractLogMinerEventProcessor.class, Level.DEBUG);

            start(OracleConnector.class, config);
            assertConnectorIsRunning();
            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);

            Awaitility.await()
                    .atMost(Duration.ofSeconds(60L))
                    .until(() -> streamingLogs.containsMessage("outside of mining range, recalculating."));

            try {
                final Pattern lagPattern = Pattern.compile("Lag: ([0-9]+)");
                final AtomicInteger nextId = new AtomicInteger(1);
                Awaitility.await().pollInterval(Duration.ofSeconds(1L)).atMost(Duration.ofSeconds(60L)).until(() -> {
                    connection.execute("INSERT INTO dbz6660 values (" + nextId.getAndIncrement() + ", 'data')");
                    for (String entry : processorLogs.getLogEntriesThatContainsMessage("Processed in ")) {
                        final Matcher matcher = lagPattern.matcher(entry);
                        // NOTE(review): matches() requires the whole entry to match the pattern; if the
                        // entries carry any other text this assertion is never evaluated. Confirm
                        // whether find() was intended.
                        if (matcher.matches()) {
                            Assertions.assertThat(Long.valueOf(matcher.group(1))).isGreaterThan(deviationMs);
                        }
                    }
                    // Never report success so that polling continues for the full 60 seconds.
                    return false;
                });
            } catch (ConditionTimeoutException ignored) {
                // Expected: the condition above never returns true, so the await always ends by
                // timing out. Only that timeout is ignored here.
            }

            assertConnectorIsRunning();
            stopConnector();
        } finally {
            // Single cleanup point for both the success and the failure path.
            TestHelper.dropTable(connection, "dbz6660");
        }
    }

    /**
     * Starts the connector with a very large maximum SCN deviation (10,000,000,000 ms) and fixed
     * batch sizes of 100, then waits up to 60 seconds for the streaming source to log
     * "outside undo space, using upperbounds".
     *
     * <p>Currently ignored because it can be flaky on a brand new docker instance.
     *
     * @throws Exception if a database operation or the connector fails
     */
    @Test
    @FixFor({"DBZ-6660"})
    @Ignore("Test can be flaky when using a brand new docker instance")
    public void shouldUseEndScnIfDeviationProducesScnOutsideOfUndoRetention() throws Exception {
        try {
            TestHelper.dropTable(connection, "dbz6660");
            connection.execute("CREATE TABLE dbz6660 (id number(9,0), data varchar2(50), primary key(id))");
            TestHelper.streamTable(connection, "dbz6660");

            final Long deviationMs = 10000000000L;
            Configuration config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ6660")
                    .with(OracleConnectorConfig.LOG_MINING_MAX_SCN_DEVIATION_MS, deviationMs.toString())
                    .with(OracleConnectorConfig.LOG_MINING_BATCH_SIZE_MAX, "100")
                    .with(OracleConnectorConfig.LOG_MINING_BATCH_SIZE_DEFAULT, "100")
                    .with(OracleConnectorConfig.LOG_MINING_BATCH_SIZE_MIN, "100")
                    .build();

            final LogInterceptor streamingLogs = new LogInterceptor(LogMinerStreamingChangeEventSource.class);
            streamingLogs.setLoggerLevel(LogMinerStreamingChangeEventSource.class, Level.DEBUG);

            start(OracleConnector.class, config);
            assertConnectorIsRunning();
            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);

            Awaitility.await()
                    .atMost(Duration.ofSeconds(60L))
                    .until(() -> streamingLogs.containsMessage("outside undo space, using upperbounds"));

            stopConnector();
        } finally {
            // Single cleanup point for both the success and the failure path.
            TestHelper.dropTable(connection, "dbz6660");
        }
    }

    /**
     * Verifies that a column's value is still present in emitted change events after the column
     * has been made invisible, and again after it has been made visible.
     *
     * <p>The test consumes three records from the table's topic, one per phase:
     * <ol>
     *   <li>a read event for the row inserted before the connector is started;</li>
     *   <li>an update event emitted after {@code DATA} is altered to be invisible, which must
     *       still carry the unchanged {@code DATA} value;</li>
     *   <li>an insert event emitted after {@code DATA} is altered back to visible.</li>
     * </ol>
     *
     * @throws Exception if a database statement or connector interaction fails
     */
    @Test
    @FixFor({"DBZ-6677"})
    public void shouldCaptureInvisibleColumn() throws Exception {
        TestHelper.dropTable(connection, "dbz6677");
        try {
            connection.execute(new String[]{"CREATE TABLE dbz6677 (id number(9,0) primary key, data varchar2(50), data2 varchar2(50))"});
            connection.execute(new String[]{"INSERT INTO dbz6677 values (1, 'Daffy', 'Daffy')"});
            TestHelper.streamTable(connection, "dbz6677");

            start(OracleConnector.class, TestHelper.defaultConfig().with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ6677").build());
            assertConnectorIsRunning();
            waitForSnapshotToBeCompleted(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);

            // Phase 1: the pre-existing row is emitted as a read event with both columns.
            List<SourceRecord> readRecords = consumeRecordsByTopic(1).recordsForTopic("server1.DEBEZIUM.DBZ6677");
            Assertions.assertThat(readRecords).hasSize(1);
            SourceRecord readRecord = readRecords.get(0);
            VerifyRecord.isValidRead(readRecord, "ID", 1);
            Struct afterRead = ((Struct) readRecord.value()).getStruct("after");
            Assertions.assertThat(afterRead.get("DATA")).isEqualTo("Daffy");
            Assertions.assertThat(afterRead.get("DATA2")).isEqualTo("Daffy");

            // Phase 2: hide DATA, change only DATA2; DATA must still be reported.
            connection.execute(new String[]{"ALTER TABLE dbz6677 modify data invisible"});
            connection.execute(new String[]{"UPDATE dbz6677 set DATA2 = 'Donald' WHERE id = 1"});
            List<SourceRecord> updateRecords = consumeRecordsByTopic(1).recordsForTopic("server1.DEBEZIUM.DBZ6677");
            Assertions.assertThat(updateRecords).hasSize(1);
            SourceRecord updateRecord = updateRecords.get(0);
            VerifyRecord.isValidUpdate(updateRecord, "ID", 1);
            Struct afterUpdate = ((Struct) updateRecord.value()).getStruct("after");
            Assertions.assertThat(afterUpdate.get("DATA")).isEqualTo("Daffy");
            Assertions.assertThat(afterUpdate.get("DATA2")).isEqualTo("Donald");

            // Phase 3: make DATA visible again and insert a new row.
            connection.execute(new String[]{"ALTER TABLE dbz6677 modify data visible"});
            connection.execute(new String[]{"INSERT INTO dbz6677 values (3, 'Hewy', 'Hewy')"});
            List<SourceRecord> insertRecords = consumeRecordsByTopic(1).recordsForTopic("server1.DEBEZIUM.DBZ6677");
            Assertions.assertThat(insertRecords).hasSize(1);
            SourceRecord insertRecord = insertRecords.get(0);
            VerifyRecord.isValidInsert(insertRecord, "ID", 3);
            Struct afterInsert = ((Struct) insertRecord.value()).getStruct("after");
            Assertions.assertThat(afterInsert.get("DATA")).isEqualTo("Hewy");
            Assertions.assertThat(afterInsert.get("DATA2")).isEqualTo("Hewy");
        } finally {
            // Drop the table exactly once, whether the test passed or failed.
            TestHelper.dropTable(connection, "dbz6677");
        }
    }

    /**
     * Verifies that character values containing an escaped single quote (and, for the insert, a
     * backslash) together with a non-ASCII character are reported correctly for
     * {@code varchar2}, {@code nvarchar2}, {@code char} and {@code nchar} columns.
     *
     * <p>One insert, one update and one delete are executed while streaming; the three resulting
     * records are checked against the expected decoded values. The expected {@code char} and
     * {@code nchar} values include trailing blanks.
     *
     * @throws Exception if a database statement or connector interaction fails
     */
    @Test
    @FixFor({"DBZ-6975"})
    @SkipWhenAdapterNameIsNot(value = SkipWhenAdapterNameIsNot.AdapterName.LOGMINER, reason = "LogMiner performs DML parsing")
    public void shouldHandleEscapedSingleQuotesInCharacterFields() throws Exception {
        // SQL expressions used for every column; each contains a doubled (escaped) single quote.
        final String insertedValueSql = "UNISTR('\\0412') || 'a''b\\c'";
        final String updatedValueSql = "UNISTR('\\041D') || 'bc''d'";

        TestHelper.dropTable(connection, "dbz6975");
        try {
            connection.execute(new String[]{"CREATE TABLE dbz6975 (c0 varchar2(50), c1 nvarchar2(50), c2 char(10), c3 nchar(10))"});
            TestHelper.streamTable(connection, "dbz6975");

            start(OracleConnector.class, TestHelper.defaultConfig().with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ6975").build());
            assertConnectorIsRunning();
            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);

            connection.execute(new String[]{"INSERT INTO dbz6975 values ("
                    + String.join(",", insertedValueSql, insertedValueSql, insertedValueSql, insertedValueSql)
                    + ")"});
            connection.execute(new String[]{"UPDATE dbz6975 set c0=" + updatedValueSql
                    + ", c1=" + updatedValueSql
                    + ",c2=" + updatedValueSql
                    + ",c3=" + updatedValueSql});
            connection.execute(new String[]{"DELETE FROM dbz6975"});

            List<SourceRecord> records = consumeRecordsByTopic(3).recordsForTopic("server1.DEBEZIUM.DBZ6975");
            Assertions.assertThat(records).hasSize(3);

            // Insert: after-image carries the inserted value.
            SourceRecord insertRecord = records.get(0);
            VerifyRecord.isValidInsert(insertRecord);
            Struct insertAfter = getAfter(insertRecord);
            Assertions.assertThat(insertAfter.get("C0")).isEqualTo("Вa'b\\c");
            Assertions.assertThat(insertAfter.get("C1")).isEqualTo("Вa'b\\c");
            Assertions.assertThat(insertAfter.get("C2")).isEqualTo("Вa'b\\c   ");
            Assertions.assertThat(insertAfter.get("C3")).isEqualTo("Вa'b\\c    ");

            // Update: after-image carries the updated value.
            SourceRecord updateRecord = records.get(1);
            VerifyRecord.isValidUpdate(updateRecord);
            Struct updateAfter = getAfter(updateRecord);
            Assertions.assertThat(updateAfter.get("C0")).isEqualTo("Нbc'd");
            Assertions.assertThat(updateAfter.get("C1")).isEqualTo("Нbc'd");
            Assertions.assertThat(updateAfter.get("C2")).isEqualTo("Нbc'd    ");
            Assertions.assertThat(updateAfter.get("C3")).isEqualTo("Нbc'd     ");

            // Delete: before-image carries the last (updated) value.
            SourceRecord deleteRecord = records.get(2);
            VerifyRecord.isValidDelete(deleteRecord);
            Struct deleteBefore = getBefore(deleteRecord);
            Assertions.assertThat(deleteBefore.get("C0")).isEqualTo("Нbc'd");
            Assertions.assertThat(deleteBefore.get("C1")).isEqualTo("Нbc'd");
            Assertions.assertThat(deleteBefore.get("C2")).isEqualTo("Нbc'd    ");
            Assertions.assertThat(deleteBefore.get("C3")).isEqualTo("Нbc'd     ");

            stopConnector();
        } finally {
            // Drop the table exactly once, whether the test passed or failed.
            TestHelper.dropTable(connection, "dbz6975");
        }
    }

    /**
     * Verifies which change events carry a {@code row_id} in their {@code source} block.
     *
     * <p>One row is inserted before the connector starts; after streaming is running a second row
     * is inserted, both rows are updated and both rows are deleted. With tombstones disabled this
     * yields six records. The first record is expected to have a {@code null} {@code row_id};
     * every later record is expected to have a non-null one.
     *
     * @throws Exception if a database statement or connector interaction fails
     */
    @Test
    @FixFor({"DBZ-4332", "DBZ-7823"})
    public void shouldCaptureRowIdForDataChanges() throws Exception {
        TestHelper.dropTable(connection, "dbz4332");
        try {
            connection.execute(new String[]{"CREATE TABLE dbz4332 (id number(9,0), data varchar2(50), primary key(id))"});
            TestHelper.streamTable(connection, "dbz4332");
            connection.execute(new String[]{"INSERT INTO dbz4332 VALUES (1, 'snapshot')"});

            start(OracleConnector.class, TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ4332")
                    .with(OracleConnectorConfig.TOMBSTONES_ON_DELETE, "false")
                    .build());
            assertConnectorIsRunning();
            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);

            connection.execute(new String[]{"INSERT INTO dbz4332 VALUES (2, 'streaming')"});
            connection.execute(new String[]{"UPDATE dbz4332 set data = 'update'"});
            connection.execute(new String[]{"DELETE FROM dbz4332"});

            List<SourceRecord> records = consumeRecordsByTopic(6).recordsForTopic("server1.DEBEZIUM.DBZ4332");
            Assertions.assertThat(records).hasSize(6);

            for (int index = 0; index < records.size(); index++) {
                Struct source = ((Struct) records.get(index).value()).getStruct("source");
                Object rowId = source.get("row_id");
                if (index == 0) {
                    // First record: the row that existed before the connector started.
                    Assertions.assertThat(rowId).isNull();
                } else {
                    Assertions.assertThat(rowId).isNotNull();
                }
            }

            stopConnector();
        } finally {
            // Drop the table exactly once, whether the test passed or failed.
            TestHelper.dropTable(connection, "dbz4332");
        }
    }

    /**
     * Verifies that changes are captured for a table whose quoted name contains a single quote.
     *
     * <p>One row is inserted before the connector starts and one after streaming is running; both
     * records are expected on the topic {@code server1.DEBEZIUM.debezium_test_}.
     *
     * @throws Exception if a database statement or connector interaction fails
     */
    @Test
    @FixFor({"DBZ-7831"})
    public void shouldStreamChangesForTableWithSingleQuote() throws Exception {
        // Quoted identifier, including the embedded single quote, as used in every SQL statement.
        final String quotedTableName = "\"debezium_test'\"";

        TestHelper.dropTable(connection, quotedTableName);
        try {
            connection.execute(new String[]{"CREATE TABLE " + quotedTableName + "\n"
                    + "(\n"
                    + "    id NUMBER(10),\n"
                    + "    first_name VARCHAR2(50),\n"
                    + "    last_name VARCHAR2(50),\n"
                    + "    PRIMARY KEY(ID)\n"
                    + ")"});
            connection.execute(new String[]{"INSERT INTO " + quotedTableName + " (id,first_name,last_name) values (1,'Andy','Griffith')"});
            TestHelper.streamTable(connection, quotedTableName);

            start(OracleConnector.class, TestHelper.defaultConfig().with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.debezium_test'").build());
            assertConnectorIsRunning();
            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);

            connection.execute(new String[]{"INSERT INTO " + quotedTableName + " (id,first_name,last_name) values (2,'Elmer','Fudd')"});

            // NOTE(review): the topic name ends in '_' where the table name has a single quote;
            // presumably the quote is sanitized for the topic name - confirm against the naming strategy.
            Assertions.assertThat(consumeRecordsByTopic(2).recordsForTopic("server1.DEBEZIUM.debezium_test_")).hasSize(2);
        } finally {
            // Drop the table exactly once, whether the test passed or failed.
            TestHelper.dropTable(connection, quotedTableName);
        }
    }

    /**
     * Blocks until the connector's {@code CurrentScn} streaming metric is strictly greater than
     * the database's current SCN as read at the time of the call.
     *
     * <p>The SCN is read through a separate admin connection, which is closed before returning.
     * Waiting is bounded by {@link TestHelper#defaultMessageConsumerPollTimeout()} seconds; while
     * the metric is {@code null} the condition is treated as not yet satisfied.
     *
     * @throws SQLException if reading the current SCN or closing the admin connection fails
     */
    private void waitForCurrentScnToHaveBeenSeenByConnector() throws SQLException {
        try (OracleConnection adminConnection = TestHelper.adminConnection(true)) {
            final Scn currentScn = adminConnection.getCurrentScn();
            Awaitility.await()
                    .atMost(TestHelper.defaultMessageConsumerPollTimeout(), TimeUnit.SECONDS)
                    .until(() -> {
                        final BigInteger streamingScn = (BigInteger) getStreamingMetric("CurrentScn");
                        // A null metric means no value is available yet; keep polling.
                        return streamingScn != null && new Scn(streamingScn).compareTo(currentScn) > 0;
                    });
        }
    }

    /**
     * Returns the {@code after} struct from the given record's value.
     *
     * @param sourceRecord the record whose value is a {@link Struct} containing an {@code after} field
     * @return the struct stored under {@code after}
     */
    private Struct getAfter(SourceRecord sourceRecord) {
        final Struct envelope = (Struct) sourceRecord.value();
        return envelope.getStruct("after");
    }

    /**
     * Returns the {@code before} struct from the given record's value.
     *
     * @param sourceRecord the record whose value is a {@link Struct} containing a {@code before} field
     * @return the struct stored under {@code before}
     */
    private Struct getBefore(SourceRecord sourceRecord) {
        final Struct envelope = (Struct) sourceRecord.value();
        return envelope.getStruct("before");
    }
}
