package io.debezium.connector.cassandra;

import java.io.File;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/* loaded from: input_file:io/debezium/connector/cassandra/CommitLogPostProcessor.class */
/**
 * Processor that periodically scans the commit log relocation directory and hands every
 * commit log found there to the configured {@link CommitLogTransfer}.
 *
 * <p>Files under the {@link QueueProcessor#ARCHIVE_FOLDER} sub-directory are passed to
 * {@link CommitLogTransfer#onSuccessTransfer(File)}; files under the
 * {@link QueueProcessor#ERROR_FOLDER} sub-directory are passed to
 * {@link CommitLogTransfer#onErrorTransfer(File)}. Each hand-off runs as a task on a
 * fixed-size thread pool owned by this processor.
 */
public class CommitLogPostProcessor extends AbstractProcessor {
    private static final String NAME = "Commit Log Post-Processor";
    /** Number of worker threads used to run transfer callbacks. */
    private static final int THREAD_POOL_SIZE = 10;
    /** How long {@link #shutDown(boolean)} waits for queued tasks before forcing termination. */
    private static final int TERMINATION_WAIT_TIME_SECONDS = 10;
    private final ExecutorService executor;
    private final String commitLogRelocationDir;
    private final CommitLogTransfer commitLogTransfer;

    /**
     * Creates the processor from the connector configuration.
     *
     * @param cassandraConnectorConfig supplies the poll interval, the relocation directory
     *                                 and the {@link CommitLogTransfer} implementation
     */
    public CommitLogPostProcessor(CassandraConnectorConfig cassandraConnectorConfig) {
        super(NAME, cassandraConnectorConfig.commitLogRelocationDirPollInterval());
        this.commitLogRelocationDir = cassandraConnectorConfig.commitLogRelocationDir();
        this.executor = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
        this.commitLogTransfer = cassandraConnectorConfig.getCommitLogTransfer();
    }

    /**
     * Performs one scan: submits a transfer task for every commit log in the archive folder,
     * then for every commit log in the error folder. Within each folder the files are
     * submitted in the order defined by {@link CommitLogUtil#compareCommitLogs}. Files are
     * only submitted while {@link #isRunning()} reports {@code true}.
     */
    @Override // io.debezium.connector.cassandra.AbstractProcessor
    public void process() {
        File[] archivedCommitLogs = CommitLogUtil.getCommitLogs(
                Paths.get(this.commitLogRelocationDir, QueueProcessor.ARCHIVE_FOLDER).toFile());
        Arrays.sort(archivedCommitLogs, CommitLogUtil::compareCommitLogs);
        for (File archivedCommitLog : archivedCommitLogs) {
            if (isRunning()) {
                this.executor.submit(() -> this.commitLogTransfer.onSuccessTransfer(archivedCommitLog));
            }
        }

        File[] erroneousCommitLogs = CommitLogUtil.getCommitLogs(
                Paths.get(this.commitLogRelocationDir, QueueProcessor.ERROR_FOLDER).toFile());
        Arrays.sort(erroneousCommitLogs, CommitLogUtil::compareCommitLogs);
        for (File erroneousCommitLog : erroneousCommitLogs) {
            if (isRunning()) {
                this.executor.submit(() -> this.commitLogTransfer.onErrorTransfer(erroneousCommitLog));
            }
        }
    }

    /**
     * Shuts the thread pool down (waiting for in-flight tasks) and then destroys the
     * {@link CommitLogTransfer}.
     *
     * @throws Exception if destroying the transfer fails
     */
    @Override // io.debezium.connector.cassandra.AbstractProcessor
    public void destroy() throws Exception {
        try {
            shutDown(true);
        } finally {
            // Release the transfer even if stopping the executor failed unexpectedly.
            this.commitLogTransfer.destroy();
        }
    }

    /**
     * Stops the thread pool. Does nothing if the pool has already been shut down.
     *
     * @param await when {@code true}, block for up to {@value #TERMINATION_WAIT_TIME_SECONDS}
     *              seconds for submitted tasks to finish and force termination if they do
     *              not; when {@code false}, only initiate an orderly shutdown and return
     */
    void shutDown(boolean await) {
        try {
            if (!this.executor.isShutdown()) {
                this.executor.shutdown();
                if (await && !this.executor.awaitTermination(TERMINATION_WAIT_TIME_SECONDS, TimeUnit.SECONDS)) {
                    this.executor.shutdownNow();
                }
            }
        } catch (InterruptedException e) {
            if (!this.executor.isTerminated()) {
                this.executor.shutdownNow();
            }
            // Restore the interrupt flag so callers further up the stack can observe it.
            Thread.currentThread().interrupt();
        }
    }
}
