/*
 * Decompiled with CFR 0.152.
 */
package io.datarouter.filesystem.snapshot.writer;

import io.datarouter.bytes.EmptyArray;
import io.datarouter.bytes.codec.bytestringcodec.CsvIntByteStringCodec;
import io.datarouter.filesystem.snapshot.block.root.RootBlock;
import io.datarouter.filesystem.snapshot.encode.BranchBlockEncoder;
import io.datarouter.filesystem.snapshot.encode.LeafBlockEncoder;
import io.datarouter.filesystem.snapshot.encode.ValueBlockEncoder;
import io.datarouter.filesystem.snapshot.entry.SnapshotEntry;
import io.datarouter.filesystem.snapshot.key.SnapshotKey;
import io.datarouter.filesystem.snapshot.storage.block.SnapshotBlockStorage;
import io.datarouter.filesystem.snapshot.storage.file.SnapshotFileStorage;
import io.datarouter.filesystem.snapshot.writer.SnapshotBlockWriter;
import io.datarouter.filesystem.snapshot.writer.SnapshotWriterConfig;
import io.datarouter.filesystem.snapshot.writer.SnapshotWriterTracker;
import io.datarouter.scanner.Scanner;
import io.datarouter.util.concurrent.CountDownLatchTool;
import io.datarouter.util.concurrent.LinkedBlockingDequeTool;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SnapshotWriter
implements AutoCloseable {
    private static final Logger logger = LoggerFactory.getLogger(SnapshotWriter.class);
    private final SnapshotKey snapshotKey;
    private final SnapshotWriterTracker tracker;
    private final SnapshotWriterConfig config;
    private final SnapshotBlockWriter blockWriter;
    private long lastStatusLogMs;
    private final Thread writerThread;
    private final long startTimeMs;
    private final LinkedBlockingDeque<Message> messages;
    private final CountDownLatch writerThreadCompletionLatch;
    private final List<BranchBlockEncoder> branchBlockEncoders;
    private final List<Integer> numBranchBlocksByLevel;
    private LeafBlockEncoder leafBlockEncoder;
    private long numKeys;
    private int numLeafBlocks;
    private SnapshotEntry lastEntry;
    private final int numColumns;
    private final List<ValueBlockEncoder> valueBlockEncoders;
    private final List<Integer> numValueBlocksByColumn;
    private final List<Integer> numValuesInBlockByColumn;

    public SnapshotWriter(SnapshotKey snapshotKey, SnapshotFileStorage snapshotFileStorage, SnapshotBlockStorage snapshotBlockStorage, SnapshotWriterConfig config, ExecutorService exec) {
        this.snapshotKey = snapshotKey;
        this.tracker = new SnapshotWriterTracker(snapshotKey);
        this.config = config;
        this.blockWriter = new SnapshotBlockWriter(snapshotKey, this.tracker, snapshotBlockStorage, snapshotFileStorage, config, exec);
        this.lastStatusLogMs = System.currentTimeMillis();
        this.messages = new LinkedBlockingDeque(config.batchQueueLength);
        this.writerThreadCompletionLatch = new CountDownLatch(1);
        this.branchBlockEncoders = new ArrayList<BranchBlockEncoder>();
        this.numBranchBlocksByLevel = new ArrayList<Integer>();
        this.leafBlockEncoder = config.leafBlockEncoderSupplier.get();
        this.numLeafBlocks = 0;
        this.numColumns = config.numColumns;
        this.valueBlockEncoders = new ArrayList<ValueBlockEncoder>();
        this.numValueBlocksByColumn = new ArrayList<Integer>();
        this.numValuesInBlockByColumn = new ArrayList<Integer>();
        IntStream.range(0, this.numColumns).forEach($ -> {
            this.valueBlockEncoders.add(snapshotWriterConfig.valueBlockEncoderSupplier.get());
            this.numValueBlocksByColumn.add(0);
            this.numValuesInBlockByColumn.add(0);
        });
        this.writerThread = this.startWriterThread();
        this.startTimeMs = System.currentTimeMillis();
    }

    private Thread startWriterThread() {
        Runnable writerRunnable = () -> {
            Message batch;
            do {
                long beforeNs = System.nanoTime();
                batch = (Message)LinkedBlockingDequeTool.pollForever(this.messages);
                long ns = System.nanoTime() - beforeNs;
                this.tracker.readStallNs.incrementBy(ns);
                batch.entries.forEach(this::add);
                this.tracker.entriesQueued.decrementBySize(batch.entries);
                this.tracker.entriesProcessed.incrementBySize(batch.entries);
            } while (!batch.isLast);
            this.writerThreadCompletionLatch.countDown();
        };
        String writerThreadName = String.join((CharSequence)"-", this.getClass().getSimpleName(), this.snapshotKey.toString());
        Thread thread = new Thread(writerRunnable, writerThreadName);
        thread.start();
        return thread;
    }

    @Override
    public void close() {
        this.writerThread.interrupt();
    }

    public void addBatch(List<SnapshotEntry> entries) {
        LinkedBlockingDequeTool.put(this.messages, (Object)Message.addBatch(entries));
        this.tracker.entriesQueued.incrementBySize(entries);
        this.logStatusOccasional();
    }

    private void add(SnapshotEntry entry) {
        int column;
        int[] valueIndexes;
        int[] valueBlockIds;
        int diff;
        if (this.numColumns != entry.columnValues.length) {
            String message = String.format("Expected %s values but found %s", this.numColumns, entry.columnValues.length);
            throw new IllegalArgumentException(message);
        }
        if (this.config.sorted && this.lastEntry != null && this.leafBlockEncoder.numRecords() == 0 && (diff = Arrays.compareUnsigned(entry.keySlab(), entry.keyFrom(), entry.keyTo(), this.lastEntry.keySlab(), this.lastEntry.keyFrom(), this.lastEntry.keyTo())) <= 0) {
            String message = String.format("key=[%s] must sort after lastKey=[%s]", CsvIntByteStringCodec.INSTANCE.encode(entry.key()), CsvIntByteStringCodec.INSTANCE.encode(this.lastEntry.key()));
            throw new IllegalArgumentException(message);
        }
        long keyId = this.numKeys;
        if (this.numColumns == 0) {
            valueBlockIds = EmptyArray.INT;
            valueIndexes = EmptyArray.INT;
        } else {
            valueBlockIds = new int[this.numColumns];
            valueIndexes = new int[this.numColumns];
            column = 0;
            while (column < this.numColumns) {
                valueBlockIds[column] = this.numValueBlocksByColumn.get(column);
                valueIndexes[column] = this.numValuesInBlockByColumn.get(column);
                ++column;
            }
        }
        this.leafBlockEncoder.add(this.numLeafBlocks, keyId, entry, valueBlockIds, valueIndexes);
        if (this.leafBlockEncoder.numBytes() >= this.config.leafBlockSize) {
            this.addBranchEntry(0, keyId, entry, this.numLeafBlocks);
            this.blockWriter.submitLeaf(this.leafBlockEncoder);
            this.leafBlockEncoder = this.config.leafBlockEncoderSupplier.get();
            ++this.numLeafBlocks;
        }
        column = 0;
        while (column < this.numColumns) {
            ValueBlockEncoder valueBlockEncoder = this.valueBlockEncoders.get(column);
            valueBlockEncoder.add(entry, column);
            this.numValuesInBlockByColumn.set(column, this.numValuesInBlockByColumn.get(column) + 1);
            if (valueBlockEncoder.numBytes() >= this.config.valueBlockSize) {
                this.blockWriter.submitValueBlock(column, this.numValueBlocksByColumn.get(column), valueBlockEncoder);
                this.valueBlockEncoders.set(column, this.config.valueBlockEncoderSupplier.get());
                this.numValueBlocksByColumn.set(column, this.numValueBlocksByColumn.get(column) + 1);
                this.numValuesInBlockByColumn.set(column, 0);
            }
            ++column;
        }
        ++this.numKeys;
        this.lastEntry = entry;
    }

    private void addBranchEntry(int level, long keyId, SnapshotEntry entry, int childBlockId) {
        if (level > this.branchBlockEncoders.size() - 1) {
            this.branchBlockEncoders.add(this.config.branchBlockEncoderFactory.apply(level));
            this.numBranchBlocksByLevel.add(0);
        }
        BranchBlockEncoder encoder = this.branchBlockEncoders.get(level);
        int blockId = this.numBranchBlocksByLevel.get(level);
        encoder.add(blockId, keyId, entry, childBlockId);
        if (encoder.numBytes() >= this.config.branchBlockSize) {
            this.addBranchEntry(level + 1, keyId, entry, this.numBranchBlocksByLevel.get(level));
            this.blockWriter.submitBranch(encoder);
            this.branchBlockEncoders.set(level, this.config.branchBlockEncoderFactory.apply(level));
            this.numBranchBlocksByLevel.set(level, this.numBranchBlocksByLevel.get(level) + 1);
        }
    }

    public Optional<RootBlock> complete() {
        LinkedBlockingDequeTool.put(this.messages, (Object)Message.last());
        CountDownLatchTool.await((CountDownLatch)this.writerThreadCompletionLatch);
        IntStream.range(0, this.valueBlockEncoders.size()).forEach(column -> {
            ValueBlockEncoder valueBlockEncoder = this.valueBlockEncoders.get(column);
            if (valueBlockEncoder.numRecords() > 0) {
                this.blockWriter.submitValueBlock(column, this.numValueBlocksByColumn.get(column), valueBlockEncoder);
                this.numValueBlocksByColumn.set(column, this.numValueBlocksByColumn.get(column) + 1);
            }
        });
        if (this.leafBlockEncoder.numRecords() > 0) {
            this.addBranchEntry(0, this.numKeys, this.lastEntry, this.numLeafBlocks);
            this.blockWriter.submitLeaf(this.leafBlockEncoder);
            ++this.numLeafBlocks;
        }
        IntStream.range(0, this.branchBlockEncoders.size()).forEach(level -> {
            BranchBlockEncoder branchEncoder = this.branchBlockEncoders.get(level);
            if (branchEncoder.numRecords() > 0) {
                if (level != this.branchBlockEncoders.size() - 1) {
                    this.addBranchEntry(level + 1, this.numKeys, this.lastEntry, this.numBranchBlocksByLevel.get(level));
                }
                this.blockWriter.submitBranch(branchEncoder);
                this.branchBlockEncoders.set(level, this.config.branchBlockEncoderFactory.apply(level));
                this.numBranchBlocksByLevel.set(level, this.numBranchBlocksByLevel.get(level) + 1);
            }
        });
        this.blockWriter.complete();
        if (this.numKeys == 0L) {
            return Optional.empty();
        }
        RootBlock root = this.blockWriter.flushRootBlock(this.startTimeMs, this.numBranchBlocksByLevel, this.numValueBlocksByColumn, this.branchBlockEncoders.size(), this.numKeys, this.numLeafBlocks);
        this.logStatus();
        String logTokens = (String)Scanner.of(root.toKeyValueStrings().entrySet()).map(kv -> String.valueOf((String)kv.getKey()) + "=" + (String)kv.getValue()).collect(Collectors.joining(", "));
        logger.warn("Completed group={}, id={}, {}", new Object[]{this.snapshotKey.groupId, this.snapshotKey.snapshotId, logTokens});
        return Optional.of(root);
    }

    private void logStatusOccasional() {
        long now = System.currentTimeMillis();
        long elapsedMs = now - this.lastStatusLogMs;
        if (elapsedMs > this.config.logPeriodMs) {
            this.logStatus();
        }
    }

    private void logStatus() {
        long elapsedMs = System.currentTimeMillis() - this.startTimeMs;
        logger.warn("{}", (Object)this.tracker.toLog(elapsedMs));
        this.lastStatusLogMs = System.currentTimeMillis();
    }

    private static class Message {
        final List<SnapshotEntry> entries;
        final boolean isLast;

        Message(List<SnapshotEntry> entries, boolean isLast) {
            this.entries = entries;
            this.isLast = isLast;
        }

        static Message addBatch(List<SnapshotEntry> entries) {
            return new Message(entries, false);
        }

        static Message last() {
            return new Message(List.of(), true);
        }
    }
}

