package io.delta.kernel.internal.snapshot;

import io.delta.kernel.Snapshot;
import io.delta.kernel.engine.Engine;
import io.delta.kernel.exceptions.CheckpointAlreadyExistsException;
import io.delta.kernel.exceptions.TableNotFoundException;
import io.delta.kernel.internal.DeltaErrors;
import io.delta.kernel.internal.DeltaHistoryManager;
import io.delta.kernel.internal.SnapshotImpl;
import io.delta.kernel.internal.TableFeatures;
import io.delta.kernel.internal.checkpoints.CheckpointInstance;
import io.delta.kernel.internal.checkpoints.CheckpointMetaData;
import io.delta.kernel.internal.checkpoints.Checkpointer;
import io.delta.kernel.internal.fs.Path;
import io.delta.kernel.internal.lang.ListUtils;
import io.delta.kernel.internal.replay.CreateCheckpointIterator;
import io.delta.kernel.internal.replay.LogReplay;
import io.delta.kernel.internal.replay.LogReplayUtils;
import io.delta.kernel.internal.util.FileNames;
import io.delta.kernel.internal.util.Preconditions;
import io.delta.kernel.internal.util.Tuple2;
import io.delta.kernel.utils.CloseableIterator;
import io.delta.kernel.utils.FileStatus;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.FileAlreadyExistsException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/delta/kernel/internal/snapshot/SnapshotManager.class */
public class SnapshotManager {
    private static final Logger logger = LoggerFactory.getLogger(SnapshotManager.class);

    /**
     * Most recent (highest-version) hint recorded by {@code registerHint}; the reference starts
     * out holding {@code null}. The reference itself is never replaced, only its contents, so the
     * field is final.
     */
    private final AtomicReference<SnapshotHint> latestSnapshotHint = new AtomicReference<>();

    /** Location of the delta log directory that is listed and read. */
    private final Path logPath;

    /** Location of the table; used in log and error messages and handed to snapshots. */
    private final Path tablePath;

    /**
     * Creates a manager bound to one table.
     *
     * @param path  path stored as the log path (the directory that gets listed for log files)
     * @param path2 path stored as the table path (used in messages and snapshot construction)
     */
    public SnapshotManager(Path path, Path path2) {
        this.tablePath = path2;
        this.logPath = path;
    }

    /**
     * Verifies that a list of delta versions is a gap-free ascending run and, optionally, that it
     * starts and ends at the expected versions.
     *
     * <p>Continuity is checked with a size comparison followed by a single pass over the list, so
     * no intermediate range is materialised (a sparse input such as {@code [0, 1_000_000_000]} is
     * rejected immediately instead of first allocating the whole range).
     *
     * @param list      delta versions in the order they will be replayed; may be empty
     * @param optional  if present, the version the list must start with
     * @param optional2 if present, the version the list must end with
     * @throws IllegalStateException if the versions are not continuous
     */
    public static void verifyDeltaVersions(List<Long> list, Optional<Long> optional, Optional<Long> optional2) {
        if (!list.isEmpty()) {
            // Unboxing here keeps the original behaviour of failing fast on a null first/last entry.
            final long first = list.get(0);
            final long last = list.get(list.size() - 1);

            // A continuous run of n versions spans exactly n - 1 steps from first to last.
            boolean continuous = last >= first && last - first == list.size() - 1L;
            if (continuous) {
                // Iterate rather than index: callers may pass a LinkedList.
                long expected = first;
                for (Long version : list) {
                    if (version == null || version != expected) {
                        continuous = false;
                        break;
                    }
                    expected++;
                }
            }
            if (!continuous) {
                throw new IllegalStateException(String.format("Versions (%s) are not continuous", list));
            }
        }
        optional.ifPresent(expectedFirst -> Preconditions.checkArgument(
                !list.isEmpty() && Objects.equals(list.get(0), expectedFirst),
                String.format("Did not get the first delta file version %s to compute Snapshot", expectedFirst)));
        optional2.ifPresent(expectedLast -> Preconditions.checkArgument(
                !list.isEmpty() && Objects.equals(list.get(list.size() - 1), expectedLast),
                String.format("Did not get the last delta file version %s to compute Snapshot", expectedLast)));
    }

    /**
     * Builds the snapshot for the newest state of the table by delegating to
     * {@code getSnapshotAtInit}.
     *
     * @param engine engine used for all file system and log access
     * @return the snapshot produced by {@code getSnapshotAtInit}
     * @throws TableNotFoundException if no log segment could be resolved for the table
     */
    public Snapshot buildLatestSnapshot(Engine engine) throws TableNotFoundException {
        final SnapshotImpl latest = getSnapshotAtInit(engine);
        return latest;
    }

    /**
     * Builds the snapshot of the table at an exact version.
     *
     * @param engine engine used for all file system and log access
     * @param j      version to load
     * @return snapshot built from the log segment resolved for that version
     * @throws TableNotFoundException if no log segment could be resolved
     */
    public Snapshot getSnapshotAt(Engine engine, long j) throws TableNotFoundException {
        final Optional<Long> requestedVersion = Optional.of(j);
        final Optional<LogSegment> segment =
                getLogSegmentForVersion(engine, Optional.empty(), requestedVersion);
        if (!segment.isPresent()) {
            throw new TableNotFoundException(this.tablePath.toString());
        }
        return createSnapshot(segment.get(), engine);
    }

    /**
     * Builds the snapshot for the commit that {@code DeltaHistoryManager} reports as active at the
     * given timestamp, logging how long the version lookup took.
     *
     * @param engine engine used for all file system and log access
     * @param j      timestamp passed through to {@code getActiveCommitAtTimestamp}
     * @return snapshot at the resolved version
     * @throws TableNotFoundException if no log segment could be resolved for that version
     */
    public Snapshot getSnapshotForTimestamp(Engine engine, long j) throws TableNotFoundException {
        final long lookupStartedAt = System.currentTimeMillis();
        final long resolvedVersion =
                DeltaHistoryManager.getActiveCommitAtTimestamp(engine, this.logPath, j);
        final long lookupMillis = System.currentTimeMillis() - lookupStartedAt;
        logger.info("{}: Took {}ms to fetch version at timestamp {}", this.tablePath, lookupMillis, j);
        return getSnapshotAt(engine, resolvedVersion);
    }

    /**
     * Writes a single-file checkpoint for the given version and then updates the last-checkpoint
     * metadata file.
     *
     * <p>The checkpoint data iterator is managed with try-with-resources so it is closed whether
     * or not the Parquet write succeeds; previously it was closed only on the success path.
     *
     * @param engine engine providing the Parquet handler and file system access
     * @param j      table version to checkpoint
     * @throws TableNotFoundException           if the snapshot at that version cannot be resolved
     * @throws CheckpointAlreadyExistsException if a {@link FileAlreadyExistsException} is raised
     *                                          while writing
     * @throws IOException                      on other I/O failures while writing or closing
     */
    public void checkpoint(Engine engine, long j) throws TableNotFoundException, IOException {
        logger.info("{}: Starting checkpoint for version: {}", this.tablePath, Long.valueOf(j));
        SnapshotImpl snapshot = (SnapshotImpl) getSnapshotAt(engine, j);
        // Refuse to write to tables whose protocol/metadata/schema is not write-supported.
        TableFeatures.validateWriteSupportedTable(snapshot.getProtocol(), snapshot.getMetadata(), snapshot.getSchema(engine), this.tablePath.toString());
        Path checkpointPath = FileNames.checkpointFileSingular(this.logPath, j);
        try {
            final long numberOfAddActions;
            try (CreateCheckpointIterator checkpointData = snapshot.getCreateCheckpointIterator(engine)) {
                engine.getParquetHandler().writeParquetFileAtomically(checkpointPath.toString(), checkpointData);
                logger.info("{}: Checkpoint file is written for version: {}", this.tablePath, Long.valueOf(j));
                // Read the count only after the write has consumed the iterator.
                numberOfAddActions = checkpointData.getNumberOfAddActions();
            }
            new Checkpointer(this.logPath).writeLastCheckpointFile(engine, new CheckpointMetaData(j, numberOfAddActions, Optional.empty()));
            logger.info("{}: Last checkpoint metadata file is written for version: {}", this.tablePath, Long.valueOf(j));
            logger.info("{}: Finished checkpoint for version: {}", this.tablePath, Long.valueOf(j));
        } catch (FileAlreadyExistsException e) {
            throw new CheckpointAlreadyExistsException(j);
        }
    }

    /**
     * Records {@code snapshotHint} as the latest hint unless a hint with an equal or higher
     * version is already stored.
     *
     * @param snapshotHint candidate hint
     */
    private void registerHint(SnapshotHint snapshotHint) {
        this.latestSnapshotHint.updateAndGet(current ->
                (current == null || snapshotHint.getVersion() > current.getVersion())
                        ? snapshotHint
                        : current);
    }

    /**
     * Lists files through the engine's file system client, starting at the listing prefix that
     * {@code FileNames.listingPrefix} derives from the log path and {@code j}.
     *
     * @param engine engine whose file system client performs the listing
     * @param j      version the listing prefix is built for
     * @return iterator over the listed files; the caller is responsible for closing it
     * @throws IOException if the listing fails
     */
    private CloseableIterator<FileStatus> listFrom(Engine engine, long j) throws IOException {
        logger.debug("{}: startVersion: {}", this.tablePath, j);
        return engine
                .getFileSystemClient()
                .listFrom(FileNames.listingPrefix(this.logPath, j));
    }

    /**
     * Tells whether a file name is recognised by {@code FileNames} as a checkpoint file or as a
     * commit file.
     *
     * @param str file name to test
     * @return {@code true} if either predicate accepts the name
     */
    private boolean isDeltaCommitOrCheckpointFile(String str) {
        if (FileNames.isCheckpointFile(str)) {
            return true;
        }
        return FileNames.isCommitFile(str);
    }

    /**
     * Lists log files starting at {@code j}, returning empty when the listing has no entries or
     * the location does not exist.
     *
     * <p>Ownership of a returned iterator passes to the caller, who must close it. An iterator
     * that is not handed out (because it is empty, or because probing it failed) is closed here
     * so the underlying listing is not leaked.
     *
     * @param engine engine whose file system client performs the listing
     * @param j      version to start listing from
     * @return a non-empty iterator, or {@code Optional.empty()}
     * @throws UncheckedIOException if listing (or closing an unused listing) fails with an
     *                              {@link IOException} other than {@link FileNotFoundException}
     */
    private Optional<CloseableIterator<FileStatus>> listFromOrNone(Engine engine, long j) {
        try {
            CloseableIterator<FileStatus> files = listFrom(engine, j);
            boolean handedToCaller = false;
            try {
                if (files.hasNext()) {
                    handedToCaller = true;
                    return Optional.of(files);
                }
                return Optional.empty();
            } finally {
                if (!handedToCaller) {
                    files.close();
                }
            }
        } catch (FileNotFoundException e) {
            return Optional.empty();
        } catch (IOException e2) {
            throw new UncheckedIOException("Failed to list the files in delta log", e2);
        }
    }

    /**
     * Collects the commit and checkpoint files found when listing the log from version {@code j},
     * optionally bounded above by a version to load.
     *
     * <p>Files that are neither commit nor checkpoint files are skipped, as are checkpoint files
     * of size zero. The listing iterator is always closed before returning.
     *
     * @param engine   engine used to list the log
     * @param j        version to start listing from
     * @param optional if present, the highest file version to include; must be {@code >= j}
     * @return the selected files, or {@code Optional.empty()} when the listing produced nothing
     * @throws UncheckedIOException if closing the listing fails
     */
    protected final Optional<List<FileStatus>> listDeltaAndCheckpointFiles(Engine engine, long j, Optional<Long> optional) {
        optional.ifPresent(versionToLoad -> Preconditions.checkArgument(
                versionToLoad.longValue() >= j,
                String.format("versionToLoad=%s provided is less than startVersion=%s", versionToLoad, Long.valueOf(j))));
        logger.debug("startVersion: {}, versionToLoad: {}", Long.valueOf(j), optional);
        return listFromOrNone(engine, j).map(listing -> {
            final List<FileStatus> selected = new ArrayList<>();
            try (CloseableIterator<FileStatus> files = listing) {
                while (files.hasNext()) {
                    final FileStatus fileStatus = files.next();
                    final String fileName = Path.getName(fileStatus.getPath());
                    if (!isDeltaCommitOrCheckpointFile(fileName)) {
                        continue;
                    }
                    // Zero-length checkpoint files are never picked up.
                    if (FileNames.isCheckpointFile(fileName) && fileStatus.getSize() == 0) {
                        continue;
                    }
                    final boolean withinRange = !optional.isPresent()
                            || FileNames.getFileVersion(new Path(fileStatus.getPath())) <= optional.get().longValue();
                    if (!withinRange) {
                        // The very first relevant file is already newer than the requested
                        // version, so that version cannot be reconstructed from this log.
                        if (selected.isEmpty()) {
                            throw DeltaErrors.versionBeforeFirstAvailableCommit(this.tablePath.toString(), optional.get().longValue(), DeltaHistoryManager.getEarliestRecreatableCommit(engine, this.logPath));
                        }
                        // Stop scanning: like the check above, this relies on the listing being
                        // in ascending version order, so nothing further can be in range.
                        break;
                    }
                    selected.add(fileStatus);
                }
            } catch (IOException e) {
                throw new UncheckedIOException("Failed to close the delta log listing", e);
            }
            return selected;
        });
    }

    /**
     * Builds the snapshot for the latest table state, using the last-checkpoint file (when it can
     * be read) as the starting point for resolving the log segment.
     *
     * @param engine engine used for all file system and log access
     * @return snapshot built from the resolved log segment
     * @throws TableNotFoundException if no log segment could be resolved
     */
    private SnapshotImpl getSnapshotAtInit(Engine engine) throws TableNotFoundException {
        final Checkpointer checkpointer = new Checkpointer(this.logPath);
        final Optional<CheckpointMetaData> lastCheckpoint = checkpointer.readLastCheckpointFile(engine);
        if (!lastCheckpoint.isPresent()) {
            logger.warn("{}: Last checkpoint file is missing or corrupted. Will search for the checkpoint files directly.", this.tablePath);
        }
        final Optional<LogSegment> segment = getLogSegmentFrom(engine, lastCheckpoint);
        if (!segment.isPresent()) {
            throw new TableNotFoundException(this.tablePath.toString());
        }
        return createSnapshot(segment.get(), engine);
    }

    /**
     * Constructs a snapshot from a resolved log segment and registers a hint for it.
     *
     * <p>The log replay is seeded with the currently stored snapshot hint (if any). Before the
     * snapshot is built, the segment's log files are asserted to belong to this table's log path.
     *
     * @param logSegment segment describing the files that make up the version
     * @param engine     engine used for log replay
     * @return the constructed snapshot
     */
    private SnapshotImpl createSnapshot(LogSegment logSegment, Engine engine) {
        final String checkpointNote = logSegment.checkpointVersionOpt
                .map(cpVersion -> String.format("starting from checkpoint version %s.", cpVersion))
                .orElse(Path.CUR_DIR);
        logger.info("{}: Loading version {} {}", this.tablePath, logSegment.version, checkpointNote);

        final Optional<SnapshotHint> hint = Optional.ofNullable(this.latestSnapshotHint.get());
        final LogReplay replay =
                new LogReplay(this.logPath, this.tablePath, logSegment.version, engine, logSegment, hint);

        final long startedAt = System.currentTimeMillis();
        LogReplayUtils.assertLogFilesBelongToTable(this.logPath, logSegment.allLogFilesUnsorted());
        final SnapshotImpl snapshot =
                new SnapshotImpl(this.tablePath, logSegment, replay, replay.getProtocol(), replay.getMetadata());
        final long elapsedMillis = System.currentTimeMillis() - startedAt;
        logger.info("{}: Took {}ms to construct the snapshot (loading protocol and metadata) for {} {}", this.tablePath, elapsedMillis, logSegment.version, checkpointNote);

        registerHint(new SnapshotHint(snapshot.getVersion(engine), snapshot.getProtocol(), snapshot.getMetadata()));
        return snapshot;
    }

    /**
     * Resolves the log segment for the latest version, starting from the version recorded in the
     * given checkpoint metadata when present.
     *
     * @param engine   engine used to list the log
     * @param optional last-checkpoint metadata, if it could be read
     * @return the resolved log segment, if any
     */
    private Optional<LogSegment> getLogSegmentFrom(Engine engine, Optional<CheckpointMetaData> optional) {
        final Optional<Long> startingCheckpointVersion = optional.map(meta -> meta.version);
        return getLogSegmentForVersion(engine, startingCheckpointVersion, Optional.empty());
    }

    /**
     * Resolves the log segment for a version (or for the latest version when {@code optional2} is
     * empty), timing and logging each phase.
     *
     * <p>Steps: discard a starting checkpoint that lies beyond the version to load; if none is
     * left and a version was requested, search for the last complete checkpoint before
     * {@code version + 1}; list the log from the starting checkpoint version (or from 0); then
     * delegate segment construction to the four-argument overload.
     *
     * @param engine    engine used to list and read the log
     * @param optional  starting checkpoint version hint, if known
     * @param optional2 version to load; empty means latest
     * @return the resolved log segment, if any
     */
    public Optional<LogSegment> getLogSegmentForVersion(Engine engine, Optional<Long> optional, Optional<Long> optional2) {
        // A checkpoint newer than the requested version is of no use as a starting point.
        Optional<Long> startCheckpointVersion =
                optional.filter(cp -> !optional2.isPresent() || cp <= optional2.get());

        if (!startCheckpointVersion.isPresent() && optional2.isPresent()) {
            final long exclusiveUpperBound = optional2.get() + 1;
            final long searchStartedAt = System.currentTimeMillis();
            startCheckpointVersion = Checkpointer
                    .findLastCompleteCheckpointBefore(engine, this.logPath, exclusiveUpperBound)
                    .map(found -> found.version);
            logger.info("{}: Took {}ms to load last checkpoint before version {}", this.tablePath, System.currentTimeMillis() - searchStartedAt, exclusiveUpperBound);
        }

        final long listFromVersion;
        if (startCheckpointVersion.isPresent()) {
            listFromVersion = startCheckpointVersion.get();
        } else {
            logger.warn("{}: Starting checkpoint is missing. Listing from version as 0", this.tablePath);
            listFromVersion = 0L;
        }

        final long listingStartedAt = System.currentTimeMillis();
        final Optional<List<FileStatus>> listedFiles =
                listDeltaAndCheckpointFiles(engine, listFromVersion, optional2);
        logger.info("{}: Took {}ms to list the files after starting checkpoint", this.tablePath, System.currentTimeMillis() - listingStartedAt);

        final long buildStartedAt = System.currentTimeMillis();
        try {
            return getLogSegmentForVersion(engine, startCheckpointVersion, optional2, listedFiles);
        } finally {
            // Logged on both the success and the failure path.
            logger.info("{}: Took {}ms to construct a log segment", this.tablePath, System.currentTimeMillis() - buildStartedAt);
        }
    }

    /**
     * Builds a {@link LogSegment} from an already-obtained listing of commit and checkpoint files.
     *
     * @param engine    engine, used only if the method has to retry through the three-argument
     *                  overload
     * @param optional  starting checkpoint version the listing was made from, if any
     * @param optional2 version to load; empty means the newest version present in the listing
     * @param optional3 listed commit/checkpoint files; empty when the listing produced nothing
     * @return the log segment, or {@code Optional.empty()} when there is neither a listing nor a
     *         starting checkpoint
     * @throws RuntimeException      if the listing is present but empty and no starting checkpoint
     *                               was given, if a starting checkpoint was given but no complete
     *                               checkpoint is found in the listing, or if the first delta after
     *                               the checkpoint is not {@code checkpointVersion + 1}
     * @throws IllegalStateException if there are no delta files at all, or if the files of the
     *                               selected checkpoint cannot all be found in the listing
     */
    protected Optional<LogSegment> getLogSegmentForVersion(Engine engine, Optional<Long> optional, Optional<Long> optional2, Optional<List<FileStatus>> optional3) {
        List<FileStatus> emptyList;
        if (optional3.isPresent()) {
            emptyList = optional3.get();
        } else {
            // No listing at all: without a starting checkpoint there is nothing to build from.
            if (!optional.isPresent()) {
                return Optional.empty();
            }
            emptyList = Collections.emptyList();
        }
        List<FileStatus> list = emptyList;
        logDebug(() -> {
            return String.format("newFiles: %s", Arrays.toString(list.stream().map(fileStatus -> {
                return new Path(fileStatus.getPath()).getName();
            }).toArray()));
        });
        if (emptyList.isEmpty() && !optional.isPresent()) {
            throw new RuntimeException(String.format("No delta files found in the directory: %s", this.logPath));
        }
        if (emptyList.isEmpty()) {
            // Nothing was found from the starting checkpoint onwards: retry through the
            // three-argument overload without a starting checkpoint hint.
            return getLogSegmentForVersion(engine, Optional.empty(), optional2);
        }
        // Split the listing by the checkpoint-file predicate; per the debug message below,
        // _1 holds the checkpoint files and _2 the delta (commit) files.
        Tuple2 partition = ListUtils.partition(emptyList, fileStatus -> {
            return FileNames.isCheckpointFile(new Path(fileStatus.getPath()).getName());
        });
        List list2 = (List) partition._1;
        List list3 = (List) partition._2;
        logDebug(() -> {
            return String.format("\ncheckpoints: %s\ndeltas: %s", Arrays.toString(list2.stream().map(fileStatus2 -> {
                return new Path(fileStatus2.getPath()).getName();
            }).toArray()), Arrays.toString(list3.stream().map(fileStatus3 -> {
                return new Path(fileStatus3.getPath()).getName();
            }).toArray()));
        });
        // Bound passed to the checkpoint search: the version to load, or MAX_VALUE when loading
        // the latest version.
        CheckpointInstance checkpointInstance = (CheckpointInstance) optional2.map((v1) -> {
            return new CheckpointInstance(v1);
        }).orElse(CheckpointInstance.MAX_VALUE);
        logger.debug("lastCheckpoint: {}", checkpointInstance);
        List list4 = (List) list2.stream().map(fileStatus2 -> {
            return new CheckpointInstance(fileStatus2.getPath());
        }).collect(Collectors.toList());
        logDebug(() -> {
            return String.format("checkpointFiles: %s", Arrays.toString(list4.toArray()));
        });
        Optional<CheckpointInstance> latestCompleteCheckpointFromList = Checkpointer.getLatestCompleteCheckpointFromList(list4, checkpointInstance);
        logger.debug("newCheckpointOpt: {}", latestCompleteCheckpointFromList);
        // Version of the checkpoint to start from, or -1 when no complete checkpoint was found.
        // In the fallback lambda the result of optional.map(...) is discarded; it only has an
        // effect when a starting checkpoint was given, in which case the orElseThrow fires
        // because getLogSegmentWithMaxExclusiveCheckpointVersion in this class always returns
        // an empty Optional.
        long longValue = ((Long) latestCompleteCheckpointFromList.map(checkpointInstance2 -> {
            return Long.valueOf(checkpointInstance2.version);
        }).orElseGet(() -> {
            optional.map(l -> {
                return getLogSegmentWithMaxExclusiveCheckpointVersion(((Long) optional2.orElseGet(() -> {
                    return Long.valueOf(FileNames.deltaVersion(new Path(((FileStatus) list3.get(list3.size() - 1)).getPath())));
                })).longValue(), l.longValue()).orElseThrow(() -> {
                    return new RuntimeException(String.format("Checkpoint file to load version: %s is missing.", l));
                });
            });
            return -1L;
        })).longValue();
        logger.debug("newCheckpointVersion: {}", Long.valueOf(longValue));
        // Keep only the deltas strictly newer than the chosen checkpoint.
        List list5 = (List) list3.stream().filter(fileStatus3 -> {
            return FileNames.deltaVersion(new Path(fileStatus3.getPath())) > longValue;
        }).collect(Collectors.toList());
        logDebug(() -> {
            return String.format("deltasAfterCheckpoint: %s", Arrays.toString(list5.stream().map(fileStatus4 -> {
                return new Path(fileStatus4.getPath()).getName();
            }).toArray()));
        });
        LinkedList linkedList = (LinkedList) list5.stream().map(fileStatus4 -> {
            return Long.valueOf(FileNames.deltaVersion(new Path(fileStatus4.getPath())));
        }).collect(Collectors.toCollection(LinkedList::new));
        logDebug(() -> {
            return String.format("deltaVersions: %s", Arrays.toString(linkedList.toArray()));
        });
        // Resulting version: the last delta after the checkpoint, or the checkpoint version itself
        // when no newer deltas exist.
        // NOTE(review): when there are no deltas after the checkpoint AND no complete checkpoint
        // was found, the .get() below throws NoSuchElementException before the list3.isEmpty()
        // check that follows can report the problem — confirm whether that ordering is intended.
        long longValue2 = linkedList.isEmpty() ? latestCompleteCheckpointFromList.get().version : ((Long) linkedList.getLast()).longValue();
        if (list3.isEmpty()) {
            throw new IllegalStateException(String.format("Could not find any delta files for version %s", Long.valueOf(longValue2)));
        }
        // A requested version that differs from the version actually reachable is an error.
        optional2.filter(l -> {
            return l.longValue() != longValue2;
        }).ifPresent(l2 -> {
            throw DeltaErrors.versionAfterLatestCommit(this.tablePath.toString(), l2.longValue(), longValue2);
        });
        if (!linkedList.isEmpty()) {
            // The deltas must begin immediately after the checkpoint (version 0 when longValue
            // is -1) and be contiguous up to the requested version.
            if (((Long) linkedList.getFirst()).longValue() != longValue + 1) {
                throw new RuntimeException(String.format("Log file not found.\nExpected: %s\nFound: %s", FileNames.deltaFile(this.logPath, longValue + 1), FileNames.deltaFile(this.logPath, ((Long) linkedList.get(0)).longValue())));
            }
            verifyDeltaVersions(linkedList, Optional.of(Long.valueOf(longValue + 1)), optional2);
        }
        // Assemble the segment: deltas after the checkpoint, the listed files that correspond to
        // the chosen checkpoint (every expected file must be present in the listing), the
        // checkpoint version, and the modification time of the last delta file in the listing
        // (taken from list3, i.e. all deltas, not only those after the checkpoint).
        return Optional.of(new LogSegment(this.logPath, longValue2, list5, (List) latestCompleteCheckpointFromList.map(checkpointInstance3 -> {
            HashSet hashSet = new HashSet(checkpointInstance3.getCorrespondingFiles(this.logPath));
            List list6 = (List) list2.stream().filter(fileStatus5 -> {
                return hashSet.contains(new Path(fileStatus5.getPath()));
            }).collect(Collectors.toList());
            if (list6.size() != hashSet.size()) {
                throw new IllegalStateException(String.format("Seems like the checkpoint is corrupted. Failed in getting the file information for:\n%s\namong\n%s", hashSet.stream().map((v0) -> {
                    return v0.toString();
                }).collect(Collectors.toList()), list2.stream().map((v0) -> {
                    return v0.getPath();
                }).collect(Collectors.joining("\n - "))));
            }
            return list6;
        }).orElse(Collections.emptyList()), latestCompleteCheckpointFromList.map(checkpointInstance4 -> {
            return Long.valueOf(checkpointInstance4.version);
        }), ((FileStatus) list3.get(list3.size() - 1)).getModificationTime()));
    }

    /**
     * Placeholder lookup that currently never produces a segment: both arguments are ignored and
     * an empty {@code Optional} is always returned.
     *
     * @param j  unused
     * @param j2 unused
     * @return always {@code Optional.empty()}
     */
    private Optional<LogSegment> getLogSegmentWithMaxExclusiveCheckpointVersion(long j, long j2) {
        final Optional<LogSegment> noSegment = Optional.empty();
        return noSegment;
    }

    /**
     * Emits a debug message whose text is only computed when debug logging is enabled.
     *
     * @param supplier produces the message; not invoked when debug logging is disabled
     */
    private void logDebug(Supplier<String> supplier) {
        if (!logger.isDebugEnabled()) {
            return;
        }
        final String message = supplier.get();
        logger.debug(message);
    }
}
