package io.debezium.connector.cassandra;

import io.debezium.config.Configuration;
import io.debezium.connector.cassandra.Record;
import io.debezium.connector.cassandra.exceptions.CassandraConnectorTaskException;
import io.debezium.time.Conversions;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.FileAttribute;
import java.util.Properties;
import org.apache.kafka.connect.data.Schema;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:io/debezium/connector/cassandra/FileOffsetWriterTest.class */
/**
 * Tests for {@link FileOffsetWriter}: marking offsets as processed, flushing them to the
 * snapshot / commit log property files, and rejecting a second writer on the same directory.
 *
 * <p>Every test runs against a fresh temporary offset directory created in {@link #setUp()}.
 */
public class FileOffsetWriterTest {
    /** Name of the properties file, inside the offset directory, that these tests read snapshot offsets from. */
    private static final String SNAPSHOT_OFFSET_FILE = "snapshot_offset.properties";
    /** Name of the properties file, inside the offset directory, that these tests read commit log offsets from. */
    private static final String COMMITLOG_OFFSET_FILE = "commitlog_offset.properties";

    private static final String TEST_TABLE = "test_table";
    private static final String TEST_ANOTHER_TABLE = "test_another_table";
    private static final String COMMIT_LOG_FILE = "CommitLog-6-12345.log";
    private static final String OLDER_COMMIT_LOG_FILE = "CommitLog-6-12344.log";

    private Path offsetDir;
    private OffsetWriter offsetWriter;
    private Properties snapshotProps;
    private Properties commitLogProps;
    private final CassandraSchemaFactory schemaFactory = CassandraSchemaFactory.get();

    /**
     * Creates a fresh temporary offset directory, a {@link FileOffsetWriter} bound to it, and
     * empty {@link Properties} holders used to read the offset files back.
     *
     * @throws IOException if the temporary directory cannot be created
     */
    @Before
    public void setUp() throws IOException {
        this.offsetDir = Files.createTempDirectory("offset");
        this.offsetWriter = new FileOffsetWriter(this.offsetDir.toAbsolutePath().toString());
        this.snapshotProps = new Properties();
        this.commitLogProps = new Properties();
    }

    /**
     * Verifies that {@code isOffsetProcessed} reflects offsets recorded through {@code markOffset},
     * separately for the snapshot record and the commit log records, and separately per table.
     */
    @Test
    public void testMarkOffset() {
        ChangeRecord snapshotRecord = generateRecord(true, true, new OffsetPosition("", -1), table(TEST_TABLE));
        ChangeRecord commitLogRecord = generateRecord(true, false, new OffsetPosition(COMMIT_LOG_FILE, 100), table(TEST_TABLE));
        ChangeRecord sameOffsetRecord = generateRecord(true, false, new OffsetPosition(COMMIT_LOG_FILE, 100), table(TEST_TABLE));
        ChangeRecord olderLogRecord = generateRecord(true, false, new OffsetPosition(OLDER_COMMIT_LOG_FILE, 101), table(TEST_TABLE));
        ChangeRecord otherTableRecord = generateRecord(true, false, new OffsetPosition(COMMIT_LOG_FILE, 100), table(TEST_ANOTHER_TABLE));

        // A snapshot record is unprocessed until it has been marked.
        Assert.assertFalse(isProcessed(snapshotRecord));
        process(snapshotRecord);
        Assert.assertTrue(isProcessed(snapshotRecord));

        // Marking the snapshot record does not make the commit log record for the same table processed.
        Assert.assertFalse(isProcessed(commitLogRecord));
        process(commitLogRecord);
        Assert.assertTrue(isProcessed(commitLogRecord));

        // A record at exactly the same table and position is already reported as processed.
        Assert.assertTrue(isProcessed(sameOffsetRecord));
        process(sameOffsetRecord);
        Assert.assertTrue(isProcessed(sameOffsetRecord));

        // A record from the 12344 log is reported as processed once the 12345 position is marked,
        // and marking it leaves the 12345 record reported as processed.
        Assert.assertTrue(isProcessed(olderLogRecord));
        process(olderLogRecord);
        Assert.assertTrue(isProcessed(olderLogRecord));
        Assert.assertTrue(isProcessed(commitLogRecord));

        // The same position on a different table is still unprocessed until marked.
        Assert.assertFalse(isProcessed(otherTableRecord));
        process(otherTableRecord);
        Assert.assertTrue(isProcessed(otherTableRecord));
    }

    /**
     * Verifies that {@code flush} writes the marked offsets to the snapshot and commit log
     * property files: both files are empty before anything is marked, and afterwards contain one
     * entry per marked table holding the serialized offset position.
     *
     * @throws IOException if flushing or reading back an offset file fails
     */
    @Test
    public void testFlush() throws IOException {
        // Flushing with nothing marked must yield two readable, empty property files.
        this.offsetWriter.flush();
        loadProperties(this.snapshotProps, SNAPSHOT_OFFSET_FILE);
        loadProperties(this.commitLogProps, COMMITLOG_OFFSET_FILE);
        Assert.assertEquals(0L, this.snapshotProps.size());
        Assert.assertEquals(0L, this.commitLogProps.size());

        ChangeRecord snapshotRecord = generateRecord(true, true, new OffsetPosition("", -1), table(TEST_TABLE));
        ChangeRecord commitLogRecord = generateRecord(true, false, new OffsetPosition(COMMIT_LOG_FILE, 100), table(TEST_TABLE));
        ChangeRecord otherTableRecord = generateRecord(true, false, new OffsetPosition(COMMIT_LOG_FILE, 100), table(TEST_ANOTHER_TABLE));
        process(snapshotRecord);
        process(commitLogRecord);
        process(otherTableRecord);

        this.offsetWriter.flush();
        loadProperties(this.snapshotProps, SNAPSHOT_OFFSET_FILE);
        loadProperties(this.commitLogProps, COMMITLOG_OFFSET_FILE);

        Assert.assertEquals(1L, this.snapshotProps.size());
        Assert.assertEquals(2L, this.commitLogProps.size());
        Assert.assertEquals(OffsetPosition.defaultOffsetPosition().serialize(), this.snapshotProps.getProperty(table(TEST_TABLE).name()));
        Assert.assertEquals(new OffsetPosition(COMMIT_LOG_FILE, 100).serialize(), this.commitLogProps.getProperty(table(TEST_TABLE).name()));
        Assert.assertEquals(new OffsetPosition(COMMIT_LOG_FILE, 100).serialize(), this.commitLogProps.getProperty(table(TEST_ANOTHER_TABLE).name()));
    }

    /**
     * Verifies that constructing a second {@link FileOffsetWriter} on the directory already used
     * by the writer from {@link #setUp()} fails with {@link CassandraConnectorTaskException}.
     *
     * @throws IOException declared to match the original test signature
     */
    @Test(expected = CassandraConnectorTaskException.class)
    public void testTwoFileWriterCannotCoexist() throws IOException {
        new FileOffsetWriter(this.offsetDir.toAbsolutePath().toString());
    }

    /**
     * Loads the named properties file from the offset directory into {@code target}.
     * Entries are added to whatever {@code target} already holds, and the stream is always
     * closed, including when loading fails.
     *
     * @param target   properties object to load into
     * @param fileName file name relative to the offset directory
     * @throws IOException if the file cannot be opened or read
     */
    private void loadProperties(Properties target, String fileName) throws IOException {
        try (FileInputStream in = new FileInputStream(this.offsetDir.toString() + "/" + fileName)) {
            target.load(in);
        }
    }

    /** Builds a {@link KeyspaceTable} for {@code tableName} in the shared test keyspace. */
    private static KeyspaceTable table(String tableName) {
        return new KeyspaceTable(TestUtils.TEST_KEYSPACE_NAME, tableName);
    }

    /**
     * Builds an INSERT {@link ChangeRecord} whose source info points at the given offset and table.
     *
     * @param markOffset     forwarded as the last {@code ChangeRecord} constructor argument
     *                       (presumably the "should mark offset" flag; confirm against ChangeRecord)
     * @param isSnapshot     forwarded to {@code SourceInfo}; the tests treat {@code true} as a
     *                       snapshot record and {@code false} as a commit log record
     * @param offsetPosition offset position stored in the record's source info
     * @param keyspaceTable  table stored in the record's source info
     * @return the new change record
     */
    private ChangeRecord generateRecord(boolean markOffset, boolean isSnapshot, OffsetPosition offsetPosition, KeyspaceTable keyspaceTable) {
        CassandraConnectorConfig config = new CassandraConnectorConfig(
                Configuration.empty().edit().with(CassandraConnectorConfig.TOPIC_PREFIX, "someconnector").build());
        SourceInfo sourceInfo = new SourceInfo(
                config,
                "test-cluster",
                offsetPosition,
                keyspaceTable,
                isSnapshot,
                // Current wall-clock time expressed in microseconds.
                Conversions.toInstantFromMicros(System.currentTimeMillis() * 1000));
        return new ChangeRecord(sourceInfo, this.schemaFactory.rowData(), Schema.INT32_SCHEMA, Schema.INT32_SCHEMA, Record.Operation.INSERT, markOffset);
    }

    /** Returns whether the offset writer reports the record's table/offset/snapshot triple as processed. */
    private boolean isProcessed(ChangeRecord changeRecord) {
        SourceInfo source = changeRecord.getSource();
        return this.offsetWriter.isOffsetProcessed(source.keyspaceTable.name(), source.offsetPosition.serialize(), source.snapshot);
    }

    /** Marks the record's table/offset/snapshot triple on the offset writer. */
    private void process(ChangeRecord changeRecord) {
        SourceInfo source = changeRecord.getSource();
        this.offsetWriter.markOffset(source.keyspaceTable.name(), source.offsetPosition.serialize(), source.snapshot);
    }
}
