package io.debezium.connector.cassandra;

import io.debezium.connector.base.ChangeEventQueue;
import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/debezium/connector/cassandra/QueueProcessor.class */
/**
 * Processor that drains a single {@link ChangeEventQueue} and dispatches each
 * polled {@link Event} by type: change and tombstone records are handed to the
 * {@link Emitter}, while an EOF event causes the finished commit log file to be
 * moved into either the {@value #ARCHIVE_FOLDER} or the {@value #ERROR_FOLDER}
 * sub-folder of the configured commit log relocation directory.
 */
public class QueueProcessor extends AbstractProcessor {
    private static final Logger LOGGER = LoggerFactory.getLogger(QueueProcessor.class);
    private static final String NAME_PREFIX = "Queue Processor ";

    /** Sub-folder of the relocation directory used for commit logs not flagged as erroneous. */
    public static final String ARCHIVE_FOLDER = "archive";
    /** Sub-folder of the relocation directory used for commit logs flagged as erroneous. */
    public static final String ERROR_FOLDER = "error";

    private final ChangeEventQueue<Event> queue;
    private final Emitter recordEmitter;
    private final String commitLogRelocationDir;
    private final Set<String> erroneousCommitLogs;

    /**
     * Creates a processor bound to the {@code i}-th queue of the given context.
     *
     * @param cassandraConnectorContext source of the queue list, the set of erroneous
     *                                  commit log names and the connector configuration
     * @param i                         index of the queue (within {@code getQueues()}) to drain;
     *                                  also used in the processor name
     * @param emitter                   sink for change and tombstone records; closed in {@link #destroy()}
     */
    public QueueProcessor(CassandraConnectorContext cassandraConnectorContext, int i, Emitter emitter) {
        // NOTE(review): Duration.ZERO is presumably the delay between process() runs - confirm in AbstractProcessor.
        super(NAME_PREFIX + "[" + i + "]", Duration.ZERO);
        this.queue = cassandraConnectorContext.getQueues().get(i);
        this.erroneousCommitLogs = cassandraConnectorContext.getErroneousCommitLogs();
        this.commitLogRelocationDir = cassandraConnectorContext.getCassandraConnectorConfig().commitLogRelocationDir();
        this.recordEmitter = emitter;
    }

    /**
     * Polls the queue once and processes every returned event. A failure while
     * handling one event is logged and does not prevent the remaining events of
     * the same batch from being processed.
     *
     * @throws InterruptedException if interrupted while polling the queue
     */
    @Override // io.debezium.connector.cassandra.AbstractProcessor
    public void process() throws InterruptedException {
        for (Event event : this.queue.poll()) {
            try {
                processEvent(event);
            } catch (Exception e) {
                // The exception is passed as the trailing argument without a matching
                // placeholder so that SLF4J logs it as a Throwable (with stack trace).
                LOGGER.error("Processing of event {} was errorneous", event, e);
            }
        }
    }

    /**
     * Makes sure the relocation directory and its {@value #ARCHIVE_FOLDER} and
     * {@value #ERROR_FOLDER} sub-folders exist. Only the last path segment of the
     * relocation directory is created; its parent must already exist.
     *
     * @throws IOException if any of the three directories is missing and cannot be created
     */
    @Override // io.debezium.connector.cassandra.AbstractProcessor
    public void initialize() throws Exception {
        File relocationDir = new File(this.commitLogRelocationDir);
        ensureDirectoryExists(relocationDir);
        ensureDirectoryExists(new File(relocationDir, ARCHIVE_FOLDER));
        ensureDirectoryExists(new File(relocationDir, ERROR_FOLDER));
    }

    /**
     * Closes the record emitter.
     *
     * @throws Exception if closing the emitter fails
     */
    @Override // io.debezium.connector.cassandra.AbstractProcessor
    public void destroy() throws Exception {
        this.recordEmitter.close();
    }

    /** Creates {@code dir} (single level) when it does not exist yet, failing only if it is still absent afterwards. */
    private static void ensureDirectoryExists(File dir) throws IOException {
        if (dir.exists() || dir.mkdir()) {
            return;
        }
        // mkdir() also returns false when the directory appeared between the exists()
        // check and the mkdir() call (e.g. created by another processor sharing the
        // same relocation directory); that is not an error.
        if (!dir.isDirectory()) {
            throw new IOException("Failed to create " + dir);
        }
    }

    /** Dispatches a single event by its type; {@code null} events are ignored. */
    private void processEvent(Event event) {
        if (event == null) {
            return;
        }
        switch (event.getEventType()) {
            case CHANGE_EVENT:
                this.recordEmitter.emit((ChangeRecord) event);
                break;
            case TOMBSTONE_EVENT:
                this.recordEmitter.emit((TombstoneRecord) event);
                break;
            case EOF_EVENT:
                relocateCommitLog((EOFEvent) event);
                break;
            default:
                LOGGER.warn("Encountered unexpected record with type: {}", event.getEventType());
                break;
        }
    }

    /** Moves the commit log referenced by an EOF event into the error or archive folder, keyed by its file name. */
    private void relocateCommitLog(EOFEvent eofEvent) {
        Path commitLog = Paths.get(eofEvent.file.getAbsolutePath());
        String commitLogName = commitLog.getFileName().toString();
        LOGGER.info("Encountered EOF event for {} ...", commitLogName);
        String targetFolder = this.erroneousCommitLogs.contains(commitLogName) ? ERROR_FOLDER : ARCHIVE_FOLDER;
        CommitLogUtil.moveCommitLog(commitLog, Paths.get(this.commitLogRelocationDir, targetFolder));
    }
}
