package io.debezium.connector.cassandra;

import com.datastax.oss.driver.api.core.ConsistencyLevel;
import com.datastax.oss.driver.api.core.CqlIdentifier;
import com.datastax.oss.driver.api.core.DefaultConsistencyLevel;
import com.datastax.oss.driver.api.core.cql.ResultSet;
import com.datastax.oss.driver.api.core.cql.Row;
import com.datastax.oss.driver.api.core.cql.SimpleStatement;
import com.datastax.oss.driver.api.core.metadata.schema.ColumnMetadata;
import com.datastax.oss.driver.api.core.metadata.schema.TableMetadata;
import com.datastax.oss.driver.api.core.type.DataTypes;
import com.datastax.oss.driver.api.querybuilder.QueryBuilder;
import com.datastax.oss.driver.api.querybuilder.select.Select;
import com.datastax.oss.driver.api.querybuilder.select.SelectFrom;
import io.debezium.DebeziumException;
import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.connector.cassandra.CassandraConnectorConfig;
import io.debezium.connector.cassandra.CassandraSchemaFactory;
import io.debezium.connector.cassandra.transforms.CassandraTypeDeserializer;
import io.debezium.time.Conversions;
import io.debezium.util.Collect;
import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.kafka.connect.data.Schema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/debezium/connector/cassandra/SnapshotProcessor.class */
public class SnapshotProcessor extends AbstractProcessor {
    private static final Logger LOGGER;
    private static final String NAME = "Snapshot Processor";
    private static final String CASSANDRA_NOW_UNIXTIMESTAMP = "TOUNIXTIMESTAMP(NOW())";
    private static final String EXECUTION_TIME_ALIAS = "execution_time";
    private static final Set<Integer> collectionTypes;
    private static final CassandraSchemaFactory schemaFactory;
    private final CassandraClient cassandraClient;
    private final List<ChangeEventQueue<Event>> queues;
    private final OffsetWriter offsetWriter;
    private final SchemaHolder schemaHolder;
    private final RecordMaker recordMaker;
    private final CassandraConnectorConfig.SnapshotMode snapshotMode;
    private final ConsistencyLevel consistencyLevel;
    private final Set<String> startedTableNames;
    private final SnapshotProcessorMetrics metrics;
    private boolean initial;
    private final String clusterName;
    static final /* synthetic */ boolean $assertionsDisabled;

    public SnapshotProcessor(CassandraConnectorContext cassandraConnectorContext, String str) {
        super(NAME, cassandraConnectorContext.getCassandraConnectorConfig().snapshotPollInterval());
        this.startedTableNames = new HashSet();
        this.metrics = new SnapshotProcessorMetrics();
        this.initial = true;
        this.queues = cassandraConnectorContext.getQueues();
        this.cassandraClient = cassandraConnectorContext.getCassandraClient();
        this.offsetWriter = cassandraConnectorContext.getOffsetWriter();
        this.schemaHolder = cassandraConnectorContext.getSchemaHolder();
        this.recordMaker = new RecordMaker(cassandraConnectorContext.getCassandraConnectorConfig().tombstonesOnDelete(), new Filters(cassandraConnectorContext.getCassandraConnectorConfig().fieldExcludeList()), cassandraConnectorContext.getCassandraConnectorConfig());
        this.snapshotMode = cassandraConnectorContext.getCassandraConnectorConfig().snapshotMode();
        this.consistencyLevel = cassandraConnectorContext.getCassandraConnectorConfig().snapshotConsistencyLevel();
        this.clusterName = str;
    }

    @Override // io.debezium.connector.cassandra.AbstractProcessor
    public void initialize() {
        this.metrics.registerMetrics();
    }

    @Override // io.debezium.connector.cassandra.AbstractProcessor
    public void destroy() {
        this.metrics.unregisterMetrics();
    }

    @Override // io.debezium.connector.cassandra.AbstractProcessor
    public void process() throws IOException {
        if (this.snapshotMode == CassandraConnectorConfig.SnapshotMode.ALWAYS) {
            snapshot();
        } else if (this.snapshotMode != CassandraConnectorConfig.SnapshotMode.INITIAL || !this.initial) {
            LOGGER.debug("Skipping snapshot [mode: {}]", this.snapshotMode);
        } else {
            snapshot();
            this.initial = false;
        }
    }

    public synchronized void snapshot() throws IOException {
        Set<TableMetadata> tablesToSnapshot = getTablesToSnapshot();
        if (tablesToSnapshot.isEmpty()) {
            LOGGER.info("No table to snapshot");
            return;
        }
        String[] strArr = (String[]) tablesToSnapshot.stream().map(SnapshotProcessor::tableName).toArray(i -> {
            return new String[i];
        });
        LOGGER.debug("Found {} tables to snapshot: {}", Integer.valueOf(tablesToSnapshot.size()), strArr);
        long currentTimeMillis = System.currentTimeMillis();
        this.metrics.setTableCount(tablesToSnapshot.size());
        this.metrics.startSnapshot();
        for (TableMetadata tableMetadata : tablesToSnapshot) {
            if (isRunning()) {
                String tableName = tableName(tableMetadata);
                LOGGER.info("Snapshotting table {} ...", tableName);
                this.startedTableNames.add(tableName);
                takeTableSnapshot(tableMetadata);
                this.metrics.completeTable();
                LOGGER.info("Snapshot of table {} has been taken", tableName);
            }
        }
        this.metrics.stopSnapshot();
        LOGGER.debug("Snapshot completely queued in {} seconds for tables: {}", Long.valueOf(Duration.ofMillis(System.currentTimeMillis() - currentTimeMillis).getSeconds()), strArr);
    }

    private Set<TableMetadata> getTablesToSnapshot() {
        LOGGER.info("Present tables: {}", this.schemaHolder.getCdcEnabledTableMetadataSet().stream().map(tableMetadata -> {
            return tableMetadata.describe(true);
        }).collect(Collectors.toList()));
        return (Set) this.schemaHolder.getCdcEnabledTableMetadataSet().stream().filter(tableMetadata2 -> {
            return !this.offsetWriter.isOffsetProcessed(tableName(tableMetadata2), OffsetPosition.defaultOffsetPosition().serialize(), true);
        }).filter(tableMetadata3 -> {
            return !this.startedTableNames.contains(tableName(tableMetadata3));
        }).collect(Collectors.toSet());
    }

    private void takeTableSnapshot(TableMetadata tableMetadata) throws IOException {
        try {
            SimpleStatement consistencyLevel = generateSnapshotStatement(tableMetadata).setConsistencyLevel(DefaultConsistencyLevel.valueOf(this.consistencyLevel.name()));
            LOGGER.info("Executing snapshot query '{}' with consistency level {}", consistencyLevel.getQuery(), consistencyLevel.getConsistencyLevel());
            ResultSet execute = this.cassandraClient.execute(consistencyLevel);
            LOGGER.info("Executed snapshot query for table {}", tableName(tableMetadata));
            processResultSet(tableMetadata, execute);
        } catch (IOException e) {
            throw e;
        } catch (Exception e2) {
            throw new DebeziumException(String.format("Failed to snapshot table %s in keyspace %s", tableMetadata.getName(), tableMetadata.getKeyspace()), e2);
        }
    }

    private static SimpleStatement generateSnapshotStatement(TableMetadata tableMetadata) {
        List<String> list = (List) tableMetadata.getColumns().values().stream().map(columnMetadata -> {
            return columnMetadata.getName().asInternal();
        }).collect(Collectors.toList());
        Set set = (Set) tableMetadata.getPrimaryKey().stream().map(columnMetadata2 -> {
            return columnMetadata2.getName().asInternal();
        }).collect(Collectors.toSet());
        List list2 = (List) tableMetadata.getColumns().values().stream().filter(columnMetadata3 -> {
            return collectionTypes.contains(Integer.valueOf(columnMetadata3.getType().getProtocolCode()));
        }).map(columnMetadata4 -> {
            return columnMetadata4.getName().asInternal();
        }).collect(Collectors.toList());
        SelectFrom selectFrom = QueryBuilder.selectFrom(tableMetadata.getKeyspace(), tableMetadata.getName());
        if (!$assertionsDisabled && list.isEmpty()) {
            throw new AssertionError();
        }
        Select select = null;
        for (String str : list) {
            select = select == null ? selectFrom.column(str) : select.column(str);
            if (!set.contains(str) && !list2.contains(str)) {
                select = select.ttl(withQuotes(str)).as(ttlAlias(str));
            }
        }
        return select.raw(CASSANDRA_NOW_UNIXTIMESTAMP).as(EXECUTION_TIME_ALIAS).build();
    }

    private void processResultSet(TableMetadata tableMetadata, ResultSet resultSet) throws IOException {
        String tableName = tableName(tableMetadata);
        KeyspaceTable keyspaceTable = new KeyspaceTable(tableMetadata);
        KeyValueSchema keyValueSchema = this.schemaHolder.getKeyValueSchema(keyspaceTable);
        Schema keySchema = keyValueSchema.keySchema();
        Schema valueSchema = keyValueSchema.valueSchema();
        Set set = (Set) tableMetadata.getPartitionKey().stream().map(columnMetadata -> {
            return columnMetadata.getName().toString();
        }).collect(Collectors.toSet());
        Set set2 = (Set) tableMetadata.getClusteringColumns().keySet().stream().map(columnMetadata2 -> {
            return columnMetadata2.getName().toString();
        }).collect(Collectors.toSet());
        Iterator it = resultSet.iterator();
        long j = 0;
        if (!it.hasNext()) {
            try {
                this.offsetWriter.markOffset(tableName, OffsetPosition.defaultOffsetPosition().serialize(), true).get();
            } catch (InterruptedException | ExecutionException e) {
                LOGGER.error("error in marking snapshot offset of table {}", tableMetadata.getName(), e);
            }
            try {
                this.offsetWriter.flush().get();
            } catch (InterruptedException | ExecutionException e2) {
                LOGGER.error("error in flushing snapshot offsets to disk of table {}", tableMetadata.getName(), e2);
            }
        }
        while (it.hasNext()) {
            if (!isRunning()) {
                LOGGER.warn("Terminated snapshot processing while table {} is in progress", tableName);
                this.metrics.setRowsScanned(tableName, Long.valueOf(j));
                return;
            }
            Row row = (Row) it.next();
            Object readExecutionTime = readExecutionTime(row);
            CassandraSchemaFactory.RowData extractRowData = extractRowData(row, tableMetadata.getColumns().values(), set, set2, readExecutionTime);
            boolean z = !it.hasNext();
            RecordMaker recordMaker = this.recordMaker;
            String str = this.clusterName;
            OffsetPosition defaultOffsetPosition = OffsetPosition.defaultOffsetPosition();
            Instant instantFromMicros = Conversions.toInstantFromMicros(TimeUnit.MICROSECONDS.convert(((Long) readExecutionTime).longValue(), TimeUnit.MILLISECONDS));
            ChangeEventQueue<Event> changeEventQueue = this.queues.get(Math.abs(tableName.hashCode() % this.queues.size()));
            Objects.requireNonNull(changeEventQueue);
            recordMaker.insert(str, defaultOffsetPosition, keyspaceTable, true, instantFromMicros, extractRowData, keySchema, valueSchema, z, (v1) -> {
                r10.enqueue(v1);
            });
            j++;
            if (j % 10000 == 0) {
                LOGGER.debug("Queued {} snapshot records from table {}", Long.valueOf(j), tableName);
                this.metrics.setRowsScanned(tableName, Long.valueOf(j));
            }
        }
        this.metrics.setRowsScanned(tableName, Long.valueOf(j));
    }

    private static CassandraSchemaFactory.RowData extractRowData(Row row, Collection<ColumnMetadata> collection, Set<String> set, Set<String> set2, Object obj) {
        Object readColTtl;
        CassandraSchemaFactory.RowData rowData = schemaFactory.rowData();
        for (ColumnMetadata columnMetadata : collection) {
            String asInternal = columnMetadata.getName().asInternal();
            Object readCol = readCol(row, asInternal, columnMetadata);
            Long l = null;
            CassandraSchemaFactory.CellData.ColumnType type = getType(asInternal, set, set2);
            if (type == CassandraSchemaFactory.CellData.ColumnType.REGULAR && readCol != null && !collectionTypes.contains(Integer.valueOf(columnMetadata.getType().getProtocolCode())) && (readColTtl = readColTtl(row, asInternal)) != null && obj != null) {
                l = Long.valueOf(calculateDeletionTs(obj, readColTtl));
            }
            rowData.addCell(schemaFactory.cellData(asInternal, readCol, l, type));
        }
        return rowData;
    }

    private static CassandraSchemaFactory.CellData.ColumnType getType(String str, Set<String> set, Set<String> set2) {
        return set.contains(str) ? CassandraSchemaFactory.CellData.ColumnType.PARTITION : set2.contains(str) ? CassandraSchemaFactory.CellData.ColumnType.CLUSTERING : CassandraSchemaFactory.CellData.ColumnType.REGULAR;
    }

    private static Object readExecutionTime(Row row) {
        return CassandraTypeDeserializer.deserialize(DataTypes.BIGINT, row.getBytesUnsafe(EXECUTION_TIME_ALIAS));
    }

    private static Object readCol(Row row, String str, ColumnMetadata columnMetadata) {
        return CassandraTypeDeserializer.deserialize(columnMetadata.getType(), row.getBytesUnsafe(str));
    }

    private static Object readColTtl(Row row, String str) {
        if (row.getColumnDefinitions().contains(CqlIdentifier.fromInternal(ttlAlias(str)))) {
            return CassandraTypeDeserializer.deserialize(DataTypes.COUNTER, row.getBytesUnsafe(ttlAlias(str)));
        }
        return null;
    }

    private static long calculateDeletionTs(Object obj, Object obj2) {
        return TimeUnit.MICROSECONDS.convert(((Long) obj).longValue(), TimeUnit.MILLISECONDS) + TimeUnit.MICROSECONDS.convert(((Integer) obj2).intValue(), TimeUnit.SECONDS);
    }

    private static String ttlAlias(String str) {
        return str + "_ttl";
    }

    private static String withQuotes(String str) {
        return "\"" + str + "\"";
    }

    private static String tableName(TableMetadata tableMetadata) {
        return String.valueOf(tableMetadata.getKeyspace()) + "." + String.valueOf(tableMetadata.getName());
    }

    static {
        $assertionsDisabled = !SnapshotProcessor.class.desiredAssertionStatus();
        LOGGER = LoggerFactory.getLogger(SnapshotProcessor.class);
        collectionTypes = Collect.unmodifiableSet(new Integer[]{32, 34, 33});
        schemaFactory = CassandraSchemaFactory.get();
    }
}
