package io.debezium.connector.oracle.logminer;

import io.debezium.config.Configuration;
import io.debezium.connector.oracle.OracleConnection;
import io.debezium.connector.oracle.OracleConnector;
import io.debezium.connector.oracle.OracleConnectorConfig;
import io.debezium.connector.oracle.junit.SkipOnReadOnly;
import io.debezium.connector.oracle.junit.SkipTestDependingOnAdapterNameRule;
import io.debezium.connector.oracle.junit.SkipTestDependingOnDatabaseOptionRule;
import io.debezium.connector.oracle.junit.SkipTestDependingOnReadOnly;
import io.debezium.connector.oracle.junit.SkipWhenAdapterNameIsNot;
import io.debezium.connector.oracle.logminer.logwriter.CommitLogWriterFlushStrategy;
import io.debezium.connector.oracle.util.TestHelper;
import io.debezium.doc.FixFor;
import io.debezium.embedded.async.AbstractAsyncEngineConnectorTest;
import io.debezium.junit.logging.LogInterceptor;
import io.debezium.relational.TableId;
import io.debezium.util.Strings;
import io.debezium.util.Testing;
import java.sql.SQLException;
import java.util.concurrent.TimeUnit;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestRule;

@SkipOnReadOnly(reason = "Test expects flush table, not applicable during read only.")
@SkipWhenAdapterNameIsNot(value = SkipWhenAdapterNameIsNot.AdapterName.LOGMINER, reason = "Flush strategy only applies to LogMiner implementation")
/* loaded from: input_file:io/debezium/connector/oracle/logminer/FlushStrategyIT.class */
public class FlushStrategyIT extends AbstractAsyncEngineConnectorTest {

    @Rule
    public final TestRule skipAdapterRule = new SkipTestDependingOnAdapterNameRule();

    @Rule
    public final TestRule skipOptionRule = new SkipTestDependingOnDatabaseOptionRule();

    @Rule
    public final TestRule skipReadOnly = new SkipTestDependingOnReadOnly();
    private static OracleConnection connection;
    private static String flushTableName;

    @BeforeClass
    public static void beforeClass() throws SQLException {
        connection = TestHelper.testConnection();
        flushTableName = TestHelper.defaultConfig().build().getString(OracleConnectorConfig.LOG_MINING_FLUSH_TABLE_NAME);
    }

    @AfterClass
    public static void closeConnection() throws SQLException {
        if (connection != null) {
            connection.close();
        }
    }

    @Before
    public void before() throws SQLException {
        setConsumeTimeout(TestHelper.defaultMessageConsumerPollTimeout(), TimeUnit.SECONDS);
        initializeConnectorTestFramework();
        Testing.Files.delete(TestHelper.SCHEMA_HISTORY_PATH);
    }

    @Test
    @FixFor({"DBZ-4118"})
    public void shouldOnlyMaintainOneRowInFlushStrategyTable() throws Exception {
        try {
            TestHelper.dropTable(connection, "dbz4118");
            connection.execute(new String[]{"CREATE TABLE dbz4118 (id numeric(9,0), data varchar2(50), primary key(id))"});
            TestHelper.streamTable(connection, "dbz4118");
            Configuration build = TestHelper.defaultConfig().with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ4118").build();
            dropFlushTable(build);
            start(OracleConnector.class, build);
            assertConnectorIsRunning();
            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
            connection.execute(new String[]{"INSERT INTO dbz4118 (id,data) values (1,'Test')"});
            Assertions.assertThat(consumeRecordsByTopic(1).recordsForTopic("server1.DEBEZIUM.DBZ4118")).hasSize(1);
            assertFlushTableHasExactlyOneRow(build);
            stopConnector();
            insertFlushTable(build, "12345");
            LogInterceptor logInterceptor = new LogInterceptor(CommitLogWriterFlushStrategy.class);
            start(OracleConnector.class, build);
            assertConnectorIsRunning();
            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
            Awaitility.await().atMost(10L, TimeUnit.SECONDS).until(() -> {
                return Boolean.valueOf(logInterceptor.containsWarnMessage("DBZ-4118: The flush table, " + flushTableName + ", has multiple rows"));
            });
            TestHelper.sleep(5L, TimeUnit.SECONDS);
            assertFlushTableHasExactlyOneRow(build);
            connection.execute(new String[]{"INSERT INTO dbz4118 (id,data) values (2,'Test')"});
            Assertions.assertThat(consumeRecordsByTopic(1).recordsForTopic("server1.DEBEZIUM.DBZ4118")).hasSize(1);
            assertFlushTableHasExactlyOneRow(build);
            TestHelper.dropTable(connection, "dbz4118");
        } catch (Throwable th) {
            TestHelper.dropTable(connection, "dbz4118");
            throw th;
        }
    }

    private void assertFlushTableHasExactlyOneRow(Configuration configuration) throws SQLException {
        OracleConnection defaultConnection = TestHelper.defaultConnection(true);
        try {
            String string = configuration.getString(OracleConnectorConfig.PDB_NAME);
            if (!Strings.isNullOrEmpty(string)) {
                defaultConnection.setSessionToPdb(string);
            }
            Assertions.assertThat(defaultConnection.getRowCount(getFlushTableId())).isEqualTo(1L);
            if (defaultConnection != null) {
                defaultConnection.close();
            }
        } catch (Throwable th) {
            if (defaultConnection != null) {
                try {
                    defaultConnection.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    private void dropFlushTable(Configuration configuration) throws SQLException {
        OracleConnection adminConnection = TestHelper.adminConnection(true);
        try {
            String string = configuration.getString(OracleConnectorConfig.PDB_NAME);
            if (!Strings.isNullOrEmpty(string)) {
                adminConnection.setSessionToPdb(string);
            }
            TestHelper.dropTable(adminConnection, getFlushTableName());
            if (adminConnection != null) {
                adminConnection.close();
            }
        } catch (Throwable th) {
            if (adminConnection != null) {
                try {
                    adminConnection.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    private void insertFlushTable(Configuration configuration, String str) throws SQLException {
        OracleConnection defaultConnection = TestHelper.defaultConnection(true);
        try {
            String string = configuration.getString(OracleConnectorConfig.PDB_NAME);
            if (!Strings.isNullOrEmpty(string)) {
                defaultConnection.setSessionToPdb(string);
            }
            defaultConnection.execute(new String[]{"INSERT INTO " + getFlushTableName() + " values (" + str + ")"});
            if (defaultConnection != null) {
                defaultConnection.close();
            }
        } catch (Throwable th) {
            if (defaultConnection != null) {
                try {
                    defaultConnection.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    private static String getFlushTableName() {
        return TestHelper.getConnectorUserName().toUpperCase() + "." + flushTableName;
    }

    private static TableId getFlushTableId() {
        return TableId.parse(getFlushTableName());
    }
}
