/*
 * Decompiled with CFR 0.152.
 */
package io.datarouter.bytes.kvfile.merge;

import io.datarouter.bytes.ByteLength;
import io.datarouter.bytes.KvString;
import io.datarouter.bytes.blockfile.dto.BlockfileNameAndSize;
import io.datarouter.bytes.kvfile.merge.KvFileMergePlan;
import io.datarouter.scanner.Scanner;
import java.lang.invoke.CallSite;
import java.math.BigInteger;
import java.text.DecimalFormat;
import java.time.Duration;
import java.time.Instant;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Holds the counters and timestamps for one kv-file merge and renders them as log messages.
 *
 * <p>Each metric has a running total and a "since last log" counter. The "since last log" counters and
 * {@link #lastLogTimeNs} are reset together by {@link #resetCountersSinceLastLog()}. This class only reads
 * the counters when logging; callers are expected to increment them and to assign {@link #startTime} and
 * {@link #mergeStartTime}.
 */
public class KvFileMergerTracker {
    private static final Logger logger = LoggerFactory.getLogger(KvFileMergerTracker.class);

    private static final String COMMA_PATTERN = "###,###,###,###,###,###,###";
    private static final BigInteger NANOS_PER_SECOND = BigInteger.valueOf(1000000000L);
    private static final long NANOS_PER_MILLI = 1000000L;
    private static final String LINE_PREFIX = "\n  ";

    public final KvFileMergePlan plan;
    public final String filename;
    /**
     * {@link System#nanoTime()} reading taken by the last {@link #resetCountersSinceLastLog()} call.
     * Starts at 0, so "latest" rates are only meaningful after the first reset.
     */
    public final AtomicLong lastLogTimeNs;
    /** Start of the whole operation; assigned by the caller. */
    public Instant startTime;
    /** Start of the merge phase; assigned by the caller. Used as the base for average rates. */
    public Instant mergeStartTime;
    public final AtomicLong compressedBytesRead = new AtomicLong();
    public final AtomicLong compressedBytesReadSinceLastLog = new AtomicLong();
    public final AtomicLong decompressedBytesRead = new AtomicLong();
    public final AtomicLong decompressedBytesReadSinceLastLog = new AtomicLong();
    public final AtomicLong blocksRead = new AtomicLong();
    public final AtomicLong blocksReadSinceLastLog = new AtomicLong();
    public final AtomicLong recordsRead = new AtomicLong();
    public final AtomicLong recordsReadSinceLastLog = new AtomicLong();
    public final AtomicLong recordsWritten = new AtomicLong();
    public final AtomicLong recordsWrittenSinceLastLog = new AtomicLong();
    public final AtomicLong blocksWritten = new AtomicLong();
    public final AtomicLong blocksWrittenSinceLastLog = new AtomicLong();
    public final AtomicLong waitForReadersNs = new AtomicLong();
    public final AtomicLong waitForBlocksNs = new AtomicLong();
    public final AtomicLong waitForCollatorNs = new AtomicLong();

    /**
     * @param plan the merge plan whose sizes, levels and collator strategy are reported in progress logs
     * @param filename name reported as {@code filename} in progress logs
     */
    public KvFileMergerTracker(KvFileMergePlan plan, String filename) {
        this.plan = plan;
        this.filename = filename;
        this.lastLogTimeNs = new AtomicLong();
    }

    /**
     * Records the current {@link System#nanoTime()} as the last log time and zeroes every
     * "since last log" counter. Running totals are left untouched.
     */
    public void resetCountersSinceLastLog() {
        this.lastLogTimeNs.set(System.nanoTime());
        this.compressedBytesReadSinceLastLog.set(0L);
        this.decompressedBytesReadSinceLastLog.set(0L);
        this.blocksReadSinceLastLog.set(0L);
        this.recordsReadSinceLastLog.set(0L);
        this.recordsWrittenSinceLastLog.set(0L);
        this.blocksWrittenSinceLastLog.set(0L);
    }

    /**
     * Logs the running read totals (compressed bytes, decompressed bytes, blocks, records) at WARN level.
     */
    public void logInitializationStats() {
        Function<Long, String> fmtBytes = KvFileMergerTracker::formatBytes;
        Function<Number, String> fmtCommas = KvFileMergerTracker::formatWithCommas;
        KvString stats = new KvString()
                .add("compressedBytes", this.compressedBytesRead.get(), fmtBytes::apply)
                .add("decompressedBytes", this.decompressedBytesRead.get(), fmtBytes::apply)
                .add("blocks", this.blocksRead.get(), fmtCommas::apply)
                .add("records", this.recordsRead.get(), fmtCommas::apply);
        logger.warn("initialized {}", stats);
    }

    /**
     * Logs progress for a merge that is still running, with no output file to report.
     */
    public void logIntermediateProgress() {
        this.logProgress(false, null);
    }

    /**
     * Logs a multi-line progress report at WARN level: plan summary, read/write totals with latest and
     * average per-second rates, and timing information.
     *
     * <p>"Latest" rates use the time elapsed since {@link #lastLogTimeNs}; average rates use the time
     * elapsed since {@link #mergeStartTime}. A rate whose elapsed time is not positive is reported as 0.
     * A missing {@link #startTime} or {@link #mergeStartTime} is reported as a zero duration.
     *
     * @param complete when true the action is reported as "merged" and progress as 100%;
     *        otherwise "merging" with progress derived from compressed bytes read
     * @param newFile the output file to report, or null to print {@code ?}
     */
    public void logProgress(boolean complete, BlockfileNameAndSize newFile) {
        // Capture the clock once so both durations share the same end point.
        Instant now = Instant.now();
        Duration totalDuration = elapsedSince(this.startTime, now);
        Duration mergeDuration = elapsedSince(this.mergeStartTime, now);
        long nsSinceLastLog = System.nanoTime() - this.lastLogTimeNs.get();
        long mergeNs = mergeDuration.toNanos();

        Map<String, Object> planAttrs = new LinkedHashMap<>();
        planAttrs.put("progress", readCompletionPct(complete));
        planAttrs.put(
                "files",
                formatWithCommas(this.plan.files().size()) + "/" + formatWithCommas(this.plan.numCompactorFiles()));
        planAttrs.put("levels", this.plan.levels().stream().map(Object::toString).collect(Collectors.joining("/")));
        planAttrs.put(
                "bytes",
                this.plan.totalInputSize().toDisplay() + "/" + this.plan.numCompactorBytes().toDisplay());
        planAttrs.put("collator", this.plan.collatorStrategy().name());
        planAttrs.put("filename", this.filename);

        Map<String, String> compressedByteReadAttrs = rateAttrs(
                "compressedBytesRead",
                formatBytes(this.compressedBytesRead.get()),
                formatBytes(perSecond(this.compressedBytesReadSinceLastLog.get(), nsSinceLastLog)),
                formatBytes(perSecond(this.compressedBytesRead.get(), mergeNs)));
        Map<String, String> decompressedByteReadAttrs = rateAttrs(
                "decompressedBytesRead",
                formatBytes(this.decompressedBytesRead.get()),
                formatBytes(perSecond(this.decompressedBytesReadSinceLastLog.get(), nsSinceLastLog)),
                formatBytes(perSecond(this.decompressedBytesRead.get(), mergeNs)));
        Map<String, String> blockReadAttrs = countAttrs(
                this.blocksRead.get(), this.blocksReadSinceLastLog.get(), nsSinceLastLog, mergeNs);
        Map<String, String> recordReadAttrs = countAttrs(
                this.recordsRead.get(), this.recordsReadSinceLastLog.get(), nsSinceLastLog, mergeNs);
        Map<String, String> recordWriteAttrs = countAttrs(
                this.recordsWritten.get(), this.recordsWrittenSinceLastLog.get(), nsSinceLastLog, mergeNs);
        Map<String, String> blockWriteAttrs = countAttrs(
                this.blocksWritten.get(), this.blocksWrittenSinceLastLog.get(), nsSinceLastLog, mergeNs);

        long waitForReadersNsActual = this.waitForReadersNs.get();
        long waitForBlocksNsActual = this.waitForBlocksNs.get();
        // The block wait is subtracted from the collator wait before reporting.
        long waitForCollatorNsActual = this.waitForCollatorNs.get() - waitForBlocksNsActual;
        Map<String, String> otherAttrs = new LinkedHashMap<>();
        otherAttrs.put("totalDuration", nanosToMillisString(totalDuration.toNanos()));
        otherAttrs.put("mergeDuration", nanosToMillisString(mergeNs));
        otherAttrs.put("waitForReaders", nanosToMillisString(waitForReadersNsActual));
        otherAttrs.put("waitForBlocks", nanosToMillisString(waitForBlocksNsActual));
        otherAttrs.put("waitForCollator", nanosToMillisString(waitForCollatorNsActual));

        List<String> lines = List.of(
                "kvMergePlan             " + toLine(planAttrs),
                "kvReadCompressedBytes   " + toLine(compressedByteReadAttrs),
                "kvReadDecompressedBytes " + toLine(decompressedByteReadAttrs),
                "kvReadBlocks            " + toLine(blockReadAttrs),
                "kvReadRecords           " + toLine(recordReadAttrs),
                "kvWriteRecords          " + toLine(recordWriteAttrs),
                "kvWriteBlocks           " + toLine(blockWriteAttrs),
                "kvTime                  " + toLine(otherAttrs));
        String action = complete ? "merged" : "merging";
        String message = String.format(
                "%s, newFile=%s %s",
                action,
                Optional.ofNullable(newFile).map(BlockfileNameAndSize::toString).orElse("?"),
                lines.stream().collect(Collectors.joining(LINE_PREFIX, LINE_PREFIX, "")));
        logger.warn(message);
    }

    /** Formats the read-completion percentage, e.g. {@code 12.34%}; always 100% when complete. */
    private String readCompletionPct(boolean complete) {
        double ratio;
        if (complete) {
            ratio = 1.0;
        } else {
            long totalBytes = this.plan.totalInputSize().toBytes();
            // An empty plan would otherwise produce NaN or Infinity in the log line.
            ratio = totalBytes > 0L ? (double)this.compressedBytesRead.get() / (double)totalBytes : 0.0;
        }
        return new DecimalFormat("#.##").format(100.0 * ratio) + "%";
    }

    /** Returns the time from {@code start} to {@code now}, or zero when {@code start} was never assigned. */
    private static Duration elapsedSince(Instant start, Instant now) {
        return start == null ? Duration.ZERO : Duration.between(start, now);
    }

    /**
     * Converts a count over an elapsed time in nanoseconds to a per-second rate, truncating toward zero.
     * BigInteger is used because {@code count * 1e9} can exceed the range of a long.
     * Returns 0 when the elapsed time is not positive, instead of dividing by zero.
     */
    private static long perSecond(long count, long elapsedNs) {
        if (elapsedNs <= 0L) {
            return 0L;
        }
        return BigInteger.valueOf(count)
                .multiply(NANOS_PER_SECOND)
                .divide(BigInteger.valueOf(elapsedNs))
                .longValue();
    }

    /** Builds the count/perSec/perSecAvg attributes for a comma-formatted counter. */
    private static Map<String, String> countAttrs(long total, long sinceLastLog, long nsSinceLastLog, long mergeNs) {
        return rateAttrs(
                "count",
                formatWithCommas(total),
                formatWithCommas(perSecond(sinceLastLog, nsSinceLastLog)),
                formatWithCommas(perSecond(total, mergeNs)));
    }

    /** Builds an insertion-ordered map of a total plus its latest and average per-second rates. */
    private static Map<String, String> rateAttrs(String totalKey, String total, String perSec, String perSecAvg) {
        Map<String, String> attrs = new LinkedHashMap<>();
        attrs.put(totalKey, total);
        attrs.put("perSec", perSec);
        attrs.put("perSecAvg", perSecAvg);
        return attrs;
    }

    /** Renders a map as {@code [k1=v1, k2=v2]} in iteration order. */
    private static String toLine(Map<String, ?> attrs) {
        return attrs.entrySet().stream()
                .map(entry -> entry.getKey() + "=" + entry.getValue())
                .collect(Collectors.joining(", ", "[", "]"));
    }

    /** Formats a number with grouping commas. A new DecimalFormat is created per call because it is not thread-safe. */
    private static String formatWithCommas(Number number) {
        return new DecimalFormat(COMMA_PATTERN).format(number);
    }

    /** Formats a byte count using {@link ByteLength}'s display form. */
    private static String formatBytes(long bytes) {
        return ByteLength.ofBytes(bytes).toDisplay();
    }

    /** Formats a nanosecond value as whole milliseconds with grouping commas and an {@code ms} suffix. */
    private static String nanosToMillisString(long nanos) {
        return formatWithCommas(nanos / NANOS_PER_MILLI) + "ms";
    }
}

