package io.debezium.connector.cassandra;

import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.connector.cassandra.CassandraOffsetContext;
import io.debezium.connector.cassandra.CassandraPartition;
import io.debezium.connector.cassandra.exceptions.CassandraConnectorConfigException;
import io.debezium.connector.cassandra.exceptions.CassandraConnectorTaskException;
import io.debezium.connector.common.BaseSourceTask;
import io.debezium.pipeline.ChangeEventSourceCoordinator;
import io.debezium.pipeline.DataChangeEvent;
import io.debezium.pipeline.spi.Offsets;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.kafka.connect.source.SourceRecord;

/* loaded from: input_file:io/debezium/connector/cassandra/AbstractConnectorTask.class */
public abstract class AbstractConnectorTask extends BaseSourceTask<CassandraPartition, CassandraOffsetContext> {
    private volatile ChangeEventQueue<DataChangeEvent> queue;
    private CassandraConnectorTaskTemplate template;

    public String version() {
        return Module.version();
    }

    protected ChangeEventSourceCoordinator<CassandraPartition, CassandraOffsetContext> start(Configuration configuration) {
        CassandraConnectorConfig cassandraConnectorConfig = new CassandraConnectorConfig(configuration);
        if (cassandraConnectorConfig.numOfChangeEventQueues() != 1) {
            throw new CassandraConnectorConfigException(String.format("configuration property %s must be equal to 1", CassandraConnectorConfig.NUM_OF_CHANGE_EVENT_QUEUES.name()));
        }
        cassandraConnectorConfig.setValidationFieldList(Arrays.asList(CassandraConnectorConfig.CASSANDRA_NODE_ID, CassandraConnectorConfig.COMMIT_LOG_RELOCATION_DIR, CassandraConnectorConfig.SCHEMA_POLL_INTERVAL_MS, CassandraConnectorConfig.SNAPSHOT_POLL_INTERVAL_MS));
        this.queue = new ChangeEventQueue.Builder().pollInterval(cassandraConnectorConfig.getPollInterval()).maxBatchSize(cassandraConnectorConfig.getMaxBatchSize()).maxQueueSize(cassandraConnectorConfig.getMaxQueueSize()).maxQueueSizeInBytes(cassandraConnectorConfig.getMaxQueueSizeInBytes()).loggingContextSupplier(() -> {
            return this.template.getTaskContext().configureLoggingContext(cassandraConnectorConfig.getContextName());
        }).build();
        CassandraOffsetContext.Loader loader = new CassandraOffsetContext.Loader();
        Offsets previousOffsets = getPreviousOffsets(new CassandraPartition.Provider(cassandraConnectorConfig), loader);
        CassandraOffsetContext cassandraOffsetContext = (CassandraOffsetContext) previousOffsets.getTheOnlyOffset();
        if (cassandraOffsetContext == null) {
            cassandraOffsetContext = loader.load((Map<String, ?>) new HashMap());
        }
        this.template = init(cassandraConnectorConfig, new ComponentFactoryDebezium(this.queue, (CassandraPartition) previousOffsets.getTheOnlyPartition(), cassandraOffsetContext));
        try {
            this.template.start();
            return null;
        } catch (Exception e) {
            throw new CassandraConnectorTaskException(e);
        }
    }

    protected abstract CassandraConnectorTaskTemplate init(CassandraConnectorConfig cassandraConnectorConfig, ComponentFactory componentFactory);

    protected List<SourceRecord> doPoll() throws InterruptedException {
        return (List) this.queue.poll().stream().map((v0) -> {
            return v0.getRecord();
        }).collect(Collectors.toList());
    }

    protected void doStop() {
        try {
            this.template.stopAll();
        } catch (Exception e) {
            throw new CassandraConnectorTaskException(e);
        }
    }

    protected Iterable<Field> getAllConfigurationFields() {
        return Collections.emptyList();
    }
}
