/*
 * Decompiled with CFR 0.152.
 */
package io.datarouter.filesystem.snapshot.writer;

import io.datarouter.filesystem.snapshot.block.root.RootBlock;
import io.datarouter.filesystem.snapshot.block.root.RootBlockV1;
import io.datarouter.filesystem.snapshot.compress.CompressedBlock;
import io.datarouter.filesystem.snapshot.encode.BranchBlockEncoder;
import io.datarouter.filesystem.snapshot.encode.EncodedBlock;
import io.datarouter.filesystem.snapshot.encode.LeafBlockEncoder;
import io.datarouter.filesystem.snapshot.encode.RootBlockEncoder;
import io.datarouter.filesystem.snapshot.encode.RootBlockFields;
import io.datarouter.filesystem.snapshot.encode.ValueBlockEncoder;
import io.datarouter.filesystem.snapshot.key.SnapshotKey;
import io.datarouter.filesystem.snapshot.path.SnapshotPaths;
import io.datarouter.filesystem.snapshot.storage.block.CacheBlockKey;
import io.datarouter.filesystem.snapshot.storage.block.SnapshotBlockStorage;
import io.datarouter.filesystem.snapshot.storage.file.SnapshotFileStorage;
import io.datarouter.filesystem.snapshot.writer.BlockQueue;
import io.datarouter.filesystem.snapshot.writer.SnapshotFileWriter;
import io.datarouter.filesystem.snapshot.writer.SnapshotWriterConfig;
import io.datarouter.filesystem.snapshot.writer.SnapshotWriterTracker;
import io.datarouter.scanner.Scanner;
import io.datarouter.util.Count;
import io.datarouter.util.Require;
import io.datarouter.util.concurrent.FutureTool;
import io.datarouter.util.concurrent.ThreadTool;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SnapshotBlockWriter {
    private static final Logger logger = LoggerFactory.getLogger(SnapshotBlockWriter.class);
    private final SnapshotKey snapshotKey;
    private final SnapshotWriterTracker tracker;
    private final SnapshotBlockStorage blockStorage;
    private final SnapshotFileStorage fileStorage;
    private final SnapshotWriterConfig config;
    public final SnapshotFileWriter fileWriter;
    private final ExecutorService exec;
    private final SnapshotPaths paths;
    private final long maxTasks;
    private final int stallMs;
    private final Queue<LeafBlockEncoder> pendingLeafEncoders;
    private final Map<Integer, Queue<BranchBlockEncoder>> pendingBranchEncodersByLevel;
    private final Map<Integer, Map<Integer, Future<?>>> branchFutureByBlockIdByLevel;
    private final Map<Integer, Future<?>> leafFutureByBlockId;
    private final Map<Integer, Map<Integer, Future<?>>> valueFutureByBlockIdByColumn;

    public SnapshotBlockWriter(SnapshotKey snapshotKey, SnapshotWriterTracker tracker, SnapshotBlockStorage blockStorage, SnapshotFileStorage fileStorage, SnapshotWriterConfig config, ExecutorService exec) {
        this.snapshotKey = snapshotKey;
        this.tracker = tracker;
        this.blockStorage = blockStorage;
        this.fileStorage = fileStorage;
        this.config = config;
        this.fileWriter = new SnapshotFileWriter(tracker, fileStorage, config, this::onValueFileWrite, this::onLeafFileWrite, this::onBranchFileWrite);
        this.exec = exec;
        this.paths = config.pathsSupplier.get();
        this.maxTasks = config.numThreads * 100;
        this.stallMs = 1;
        this.pendingLeafEncoders = new LinkedBlockingQueue<LeafBlockEncoder>();
        this.pendingBranchEncodersByLevel = new ConcurrentHashMap<Integer, Queue<BranchBlockEncoder>>();
        this.branchFutureByBlockIdByLevel = new ConcurrentHashMap();
        this.leafFutureByBlockId = new ConcurrentHashMap();
        this.valueFutureByBlockIdByColumn = new ConcurrentHashMap();
    }

    public RootBlock flushRootBlock(long writeStartTimeMs, List<Integer> numBranchBlocksByLevel, List<Integer> numValueBlocksByColumn, int numBranchLevels, long numKeys, int numLeafBlocks) {
        int[] numBranchBlocksByLevelArray = numBranchBlocksByLevel.stream().mapToInt(Integer::intValue).toArray();
        int[] numValueBlocksByColumnArray = numValueBlocksByColumn.stream().mapToInt(Integer::intValue).toArray();
        long writeDurationMs = System.currentTimeMillis() - writeStartTimeMs;
        RootBlockEncoder encoder = this.config.rootBlockEncoderSupplier.get();
        RootBlockFields rootBlockFields = new RootBlockFields(this.config.sorted, this.config.pathsSupplier.get(), new RootBlockFields.RootBlockEncoderFormats(this.config.branchBlockEncoderFactory.apply(0).format(), this.config.leafBlockEncoderSupplier.get().format(), this.config.valueBlockEncoderSupplier.get().format()), new RootBlockFields.RootBlockEncoderCompressors(this.config.branchBlockCompressor.name(), this.config.leafBlockCompressor.name(), this.config.valueBlockCompressor.name()), new RootBlockFields.RootBlockEncoderBytesPerFile(this.config.branchBytesPerFile, this.config.leafBytesPerFile, this.config.valueBytesPerFile), new RootBlockFields.RootBlockEncoderBlocksPerFile(this.config.branchBlocksPerFile, this.config.leafBlocksPerFile, this.config.valueBlocksPerFile), numKeys, numBranchLevels, new RootBlockFields.RootBlockEncoderBlockCounts(numBranchBlocksByLevelArray, numLeafBlocks, numValueBlocksByColumnArray), new RootBlockFields.RootBlockEncoderByteCountsEncoded(this.tracker.branchBytesEncoded.value(), this.tracker.leafBytesEncoded.value(), this.tracker.valueBytesEncoded.value()), new RootBlockFields.RootBlockEncoderByteCountsCompressed(this.tracker.branchBytesCompressed.value(), this.tracker.leafBytesCompressed.value(), this.tracker.valueBytesCompressed.value()), new RootBlockFields.RootBlockEncoderBlockEndings(this.fileWriter.rootBranchEnding()), new RootBlockFields.RootBlockEncoderTimings(writeStartTimeMs, writeDurationMs));
        encoder.set(rootBlockFields);
        EncodedBlock encodedRootBlock = encoder.encode();
        if (this.config.persist) {
            this.fileStorage.addRootFile(encodedRootBlock);
        }
        return new RootBlockV1(encodedRootBlock.concat());
    }

    public void submitBranch(BranchBlockEncoder encoder) {
        this.pendingBranchEncodersByLevel.computeIfAbsent(encoder.level(), $ -> new LinkedBlockingQueue()).add(encoder);
    }

    private boolean isBranchFlushable(BranchBlockEncoder encoder) {
        int numBlockIds;
        boolean includePreviousChildBlockId = encoder.firstChildBlockId() > 0;
        int firstBlockId = includePreviousChildBlockId ? encoder.firstChildBlockId() - 1 : encoder.firstChildBlockId();
        int n = numBlockIds = includePreviousChildBlockId ? 1 + encoder.numRecords() : encoder.numRecords();
        if (encoder.level() == 0) {
            return this.fileWriter.leafFileInfoReady(firstBlockId, numBlockIds);
        }
        int childLevel = encoder.level() - 1;
        return this.fileWriter.branchFileInfoReady(childLevel, firstBlockId, numBlockIds);
    }

    private void flushBranch(BranchBlockEncoder encoder) {
        this.tracker.branchTasks.increment();
        Map futureByBlockId = this.branchFutureByBlockIdByLevel.computeIfAbsent(encoder.level(), ConcurrentHashMap::new);
        Future<?> future = this.exec.submit(() -> {
            BlockQueue.FileIdsAndEndings fileIdsAndEndings;
            int firstChildBlockId = encoder.firstChildBlockId();
            int numRecords = encoder.numRecords();
            int numEndings = numRecords + 1;
            if (encoder.level() == 0) {
                fileIdsAndEndings = this.fileWriter.leafFileInfo(firstChildBlockId - 1, numEndings);
            } else {
                int childLevel = encoder.level() - 1;
                fileIdsAndEndings = this.fileWriter.branchFileInfo(childLevel, firstChildBlockId - 1, numEndings);
            }
            EncodedBlock encodedPages = encoder.encode(fileIdsAndEndings);
            CompressedBlock compressedBlock = this.config.branchBlockCompressor.compress(encodedPages, this.config.compressorConcatChunks);
            this.tracker.branchBlock(encodedPages, compressedBlock);
            this.fileWriter.addBranchBlock(encoder.level(), encoder.blockId(), compressedBlock);
            if (this.blockStorage != null && this.config.updateCache) {
                CacheBlockKey cacheBlockKey = CacheBlockKey.branch(this.snapshotKey, encoder.level(), encoder.blockId());
                this.blockStorage.addBranchBlock(this.paths, cacheBlockKey, compressedBlock);
            }
            futureByBlockId.remove(encoder.blockId());
            this.tracker.branchTasks.decrement();
        });
        futureByBlockId.put(encoder.blockId(), future);
    }

    public void submitLeaf(LeafBlockEncoder encoder) {
        if (this.config.numColumns == 0) {
            this.flushLeaf(encoder);
        } else {
            this.pendingLeafEncoders.add(encoder);
        }
    }

    private boolean isLeafFlushable(LeafBlockEncoder encoder) {
        int column = 0;
        while (column < this.config.numColumns) {
            if (!this.fileWriter.valueFileInfoReady(column, encoder.firstValueBlockId(column), encoder.numValueBlocks(column))) {
                return false;
            }
            ++column;
        }
        return true;
    }

    private void flushLeaf(LeafBlockEncoder encoder) {
        this.leafBackpressure();
        this.tracker.leafTasks.increment();
        Future<?> future = this.exec.submit(() -> {
            if (this.config.sorted) {
                encoder.assertKeysSorted();
            }
            BlockQueue.FileIdsAndEndings[] fileIdsAndEndings = new BlockQueue.FileIdsAndEndings[this.config.numColumns];
            int column = 0;
            while (column < this.config.numColumns) {
                int firstValueBlockId = encoder.firstValueBlockId(column);
                int numValueBlocks = encoder.numValueBlocks(column);
                int numEndings = numValueBlocks + 1;
                fileIdsAndEndings[column] = this.fileWriter.valueFileInfo(column, firstValueBlockId - 1, numEndings);
                ++column;
            }
            EncodedBlock encodedPages = encoder.encode(fileIdsAndEndings);
            CompressedBlock compressedBytes = this.config.leafBlockCompressor.compress(encodedPages, this.config.compressorConcatChunks);
            this.tracker.leafBlock(encodedPages, compressedBytes);
            this.fileWriter.addLeafBlock(encoder.blockId(), compressedBytes);
            if (this.blockStorage != null && this.config.updateCache) {
                CacheBlockKey cacheBlockKey = CacheBlockKey.leaf(this.snapshotKey, encoder.blockId());
                this.blockStorage.addLeafBlock(this.paths, cacheBlockKey, compressedBytes);
            }
            this.leafFutureByBlockId.remove(encoder.blockId());
            this.tracker.leafTasks.decrement();
        });
        this.leafFutureByBlockId.put(encoder.blockId(), future);
    }

    private void leafBackpressure() {
        if (this.config.numColumns > 0) {
            return;
        }
        long beforeNs = System.nanoTime();
        while (this.tracker.leafTasks.value() >= this.maxTasks) {
            ThreadTool.trySleep((long)this.stallMs);
        }
        long ns = System.nanoTime() - beforeNs;
        this.tracker.leafStallNs.incrementBy(ns);
    }

    public void submitValueBlock(int column, int blockId, ValueBlockEncoder encoder) {
        this.flushValueBlock(column, blockId, encoder);
    }

    private void flushValueBlock(int column, int blockId, ValueBlockEncoder encoder) {
        this.valueBackpressure();
        this.tracker.valueTasks.increment();
        Map futureByBlockId = this.valueFutureByBlockIdByColumn.computeIfAbsent(column, ConcurrentHashMap::new);
        Future<?> future = this.exec.submit(() -> {
            EncodedBlock encodedPages = encoder.encode();
            CompressedBlock compressedBytes = this.config.valueBlockCompressor.compress(encodedPages, this.config.compressorConcatChunks);
            this.tracker.valueBlock(encodedPages, compressedBytes);
            this.fileWriter.addValueBlock(column, blockId, compressedBytes);
            if (this.blockStorage != null && this.config.updateCache) {
                CacheBlockKey cacheBlockKey = CacheBlockKey.value(this.snapshotKey, column, blockId);
                this.blockStorage.addValueBlock(this.paths, cacheBlockKey, compressedBytes);
            }
            futureByBlockId.remove(blockId);
            this.tracker.valueTasks.decrement();
        });
        futureByBlockId.put(blockId, future);
    }

    private void valueBackpressure() {
        long beforeNs = System.nanoTime();
        while (this.tracker.valueTasks.value() >= this.maxTasks) {
            ThreadTool.trySleep((long)this.stallMs);
        }
        long ns = System.nanoTime() - beforeNs;
        this.tracker.valueStallNs.incrementBy(ns);
    }

    public synchronized void onValueFileWrite(Void unused) {
        Scanner.of(this.pendingLeafEncoders).include(this::isLeafFlushable).each($ -> {
            LeafBlockEncoder leafBlockEncoder = this.pendingLeafEncoders.remove();
        }).forEach(this::flushLeaf);
    }

    public void onLeafFileWrite(Void unused) {
        this.tryFlushBranches(0);
    }

    public void onBranchFileWrite(int levelWritten) {
        this.tryFlushBranches(levelWritten + 1);
    }

    private synchronized void tryFlushBranches(int level) {
        Queue<BranchBlockEncoder> pendingBranchEncodersForLevel = this.pendingBranchEncodersByLevel.get(level);
        if (pendingBranchEncodersForLevel == null) {
            return;
        }
        Scanner.of(pendingBranchEncodersForLevel).include(this::isBranchFlushable).each(encoder -> {
            Object e = pendingBranchEncodersForLevel.remove();
        }).forEach(this::flushBranch);
    }

    public void complete() {
        Scanner.of(this.valueFutureByBlockIdByColumn.keySet()).sort().forEach(column -> this.drainFutures(this.valueFutureByBlockIdByColumn.get(column).values(), "value column " + column));
        this.fileWriter.completeValues();
        Scanner.of(this.pendingLeafEncoders).each(leafEncoder -> Require.isTrue((boolean)this.isLeafFlushable((LeafBlockEncoder)leafEncoder))).sort(LeafBlockEncoder.BLOCK_ID_COMPARATOR).each(this.pendingLeafEncoders::remove).forEach(this::flushLeaf);
        this.drainFutures(this.leafFutureByBlockId.values(), "leaf");
        this.fileWriter.completeLeaves();
        Scanner.of(this.pendingBranchEncodersByLevel.keySet()).sort().forEach(level -> {
            Queue<BranchBlockEncoder> encodersForLevel = this.pendingBranchEncodersByLevel.get(level);
            Scanner.of(encodersForLevel).exclude(BranchBlockEncoder::isEmpty).each(encoder -> Require.isTrue((boolean)this.isBranchFlushable((BranchBlockEncoder)encoder))).forEach(this::flushBranch);
            this.drainFutures(this.branchFutureByBlockIdByLevel.get(level).values(), "branch level " + level);
            this.fileWriter.completeBranches((int)level);
        });
        this.fileWriter.logQueueStats();
    }

    private void drainFutures(Iterable<Future<?>> futures, String stageName) {
        Count.Counts counts = new Count.Counts();
        Count pending = counts.add("pending");
        Count done = counts.add("done");
        Count canceled = counts.add("canceled");
        Count waited = counts.add("waited");
        Scanner.of(futures).each(future -> {
            pending.increment();
            if (future.isDone()) {
                done.increment();
            }
            if (future.isCancelled()) {
                canceled.increment();
            }
            if (!future.isDone() && !future.isCancelled()) {
                waited.increment();
            }
        }).each(FutureTool::get).count();
        logger.info("drained {} {}", (Object)stageName, (Object)counts);
    }
}

