/*
 * Decompiled with CFR 0.152.
 */
package io.datarouter.bytes.kvfile;

import io.datarouter.bytes.ByteLength;
import io.datarouter.bytes.ByteTool;
import io.datarouter.bytes.CountingInputStream;
import io.datarouter.bytes.kvfile.KvFileBlock;
import io.datarouter.bytes.kvfile.KvFileCompactorFileCache;
import io.datarouter.bytes.kvfile.KvFileEntry;
import io.datarouter.bytes.kvfile.KvFileNameAndSize;
import io.datarouter.bytes.kvfile.KvFileReader;
import io.datarouter.bytes.kvfile.KvFileStorage;
import io.datarouter.scanner.Scanner;
import io.datarouter.scanner.Threads;
import java.io.InputStream;
import java.text.DecimalFormat;
import java.time.Duration;
import java.time.Instant;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Merges the input files of a {@link KvFileCompactorFileCache.KvFileMergePlan} into one new file.
 *
 * <p>Each input file is opened as a {@link KvFileReader}; the readers are handed to the plan's collator
 * strategy, the resulting entries are batched into blocks, encoded to bytes, and written through the
 * configured {@link KvFileStorage}.  Timing and volume counters are collected along the way and logged
 * periodically and once more when the merge completes.
 */
public class KvFileMerger {
    private static final Logger logger = LoggerFactory.getLogger(KvFileMerger.class);

    // S3 multipart uploads are limited to this many parts, which caps the output size at partSize * 10,000.
    private static final long MAX_UPLOAD_PARTS_FOR_S3 = 10_000L;
    private static final long NANOS_PER_SECOND = 1_000_000_000L;
    private static final long NANOS_PER_MILLI = 1_000_000L;
    private static final String COMMA_PATTERN = "###,###,###,###,###,###,###";

    private final KvFileMergerParams params;
    private final KvFileCompactorFileCache.KvFileMergePlan plan;
    private final Supplier<Boolean> shouldStop;
    private final Instant startTime;
    // System.nanoTime() reading taken at construction and after each intermediate progress log.
    private final AtomicLong lastLogTimeNs;
    private final AtomicLong waitForChunksNs = new AtomicLong();
    private final AtomicLong waitForCollatorNs = new AtomicLong();
    private final AtomicLong waitForShouldStopNs = new AtomicLong();
    private final AtomicLong waitForEncoderNs = new AtomicLong();
    // NOTE(review): recordsRead and blocksRead are logged but never incremented in this class.
    private final AtomicLong recordsRead = new AtomicLong();
    private final AtomicLong recordsWritten = new AtomicLong();
    private final AtomicLong recordsWrittenSinceLastLog = new AtomicLong();
    private final AtomicLong blocksRead = new AtomicLong();
    private final AtomicLong blocksWritten = new AtomicLong();
    private final AtomicLong bytesRead = new AtomicLong();
    private final AtomicLong bytesWritten = new AtomicLong();

    /**
     * Creates a merger and warns if the plan's total input size exceeds half of the largest upload
     * that the configured part size allows.
     *
     * @param params storage, reader, encoder, writer and logging settings
     * @param plan the files to merge and the collator strategy to merge them with
     * @param shouldStop polled once per heartbeat period; returning true aborts the merge
     */
    public KvFileMerger(KvFileMergerParams params, KvFileCompactorFileCache.KvFileMergePlan plan, Supplier<Boolean> shouldStop) {
        this.params = params;
        this.plan = plan;
        this.shouldStop = shouldStop;
        this.startTime = Instant.now();
        // nanoTime has an arbitrary origin, so the baseline must itself be a nanoTime reading, not 0.
        this.lastLogTimeNs = new AtomicLong(System.nanoTime());
        ByteLength maxUploadSizeForS3 = ByteLength.ofBytes(MAX_UPLOAD_PARTS_FOR_S3 * params.writeParams().partSize().toBytes());
        if (plan.totalInputSize().toBytes() > maxUploadSizeForS3.toBytes() / 2L) {
            logger.warn(
                    "totalInputSize={} is greater than half of maxUploadSizeForS3={}.  Consider increasing partSize.",
                    plan.totalInputSize().toDisplay(),
                    maxUploadSizeForS3.toDisplay());
        }
    }

    /**
     * Runs the merge: read, collate, batch into blocks, encode, write.
     *
     * @return the name of the newly written file together with the number of encoded bytes
     * @throws RuntimeException if {@code shouldStop} returns true at a heartbeat check
     */
    public KvFileNameAndSize merge() {
        KvFileNameAndSize newFile = makeReaders()
                .listTo(plan.collatorStrategy().method)
                .batchByMinSize(params.encodeParams().minBlockSize().toBytes(), KvFileEntry::length)
                .batch(params.encodeParams().encodeBatchSize())
                .timeNanos(waitForCollatorNs::addAndGet)
                .periodic(params.heartbeatPeriod(), $ -> throwIfShouldStop())
                .timeNanos(waitForShouldStopNs::addAndGet)
                .periodic(params.logPeriod(), $ -> logIntermediateProgress())
                .apply(this::encodeBlocksToBytes)
                .timeNanos(waitForEncoderNs::addAndGet)
                .concat(Scanner::of)
                .apply(this::write);
        logProgress(true, newFile);
        return newFile;
    }

    /**
     * Builds one reader per input file, visiting files in ascending size order.
     *
     * <p>Each file gets a share of {@code totalThreads()} proportional to its share of the total input
     * size, with a minimum of one thread.  When the remaining-files counter reads 1, that file instead
     * receives whatever is left of the thread budget (again at least one).
     *
     * <p>NOTE(review): the two counters are read and updated in separate steps inside a
     * {@code parallelUnordered} stage, so the split is approximate when readers are built concurrently.
     */
    private Scanner<KvFileReader> makeReaders() {
        KvFileMergerByteReaderParams readerParams = params.byteReaderParams();
        AtomicInteger remainingFiles = new AtomicInteger(plan.files().size());
        AtomicInteger remainingThreads = new AtomicInteger(readerParams.totalThreads());
        return Scanner.of(plan.files())
                .sort(Comparator.comparing(KvFileNameAndSize::size))
                .parallelUnordered(readerParams.makeReadersThreads())
                .map(file -> {
                    int numThreads;
                    if (remainingFiles.get() == 1) {
                        numThreads = Math.max(1, remainingThreads.get());
                    } else {
                        double shareOfInput = (double)file.size() / (double)plan.totalInputSize().toBytes();
                        double proportionalThreads = shareOfInput * (double)readerParams.totalThreads();
                        numThreads = Math.max(1, (int)proportionalThreads);
                    }
                    remainingFiles.decrementAndGet();
                    remainingThreads.addAndGet(-numThreads);
                    logger.info(
                            "making KvReader size={}, threads={}, chunkSize={}",
                            ByteLength.ofBytes(file.size()).toDisplay(),
                            numThreads,
                            readerParams.chunkSize().toDisplay());
                    return makeReader(file, numThreads, readerParams.chunkSize());
                });
    }

    /**
     * Opens a single input file using one of three strategies:
     * <ol>
     * <li>files no larger than one chunk are read fully into memory up front,</li>
     * <li>otherwise, if {@code readParallel} is set, the file is fetched in chunks using {@code numThreads},</li>
     * <li>otherwise it is read through a single counting input stream.</li>
     * </ol>
     * All three paths add the bytes they read to {@code bytesRead}.
     */
    private KvFileReader makeReader(KvFileNameAndSize file, int numThreads, ByteLength chunkSize) {
        KvFileStorage storage = params.storageParams().storage();
        int parseBatchSize = params.kvReaderParams().parseBatchSize();
        Threads parseThreads = params.kvReaderParams().parseThreads();

        if (file.size() <= (long)chunkSize.toBytesInt()) {
            long startNs = System.nanoTime();
            byte[] fullBytes = storage.read(file.name());
            waitForChunksNs.addAndGet(System.nanoTime() - startNs);
            bytesRead.addAndGet(fullBytes.length);
            return new KvFileReader(fullBytes, file.name(), parseBatchSize, parseThreads);
        }

        if (params.byteReaderParams().readParallel()) {
            Threads readThreads = new Threads(params.byteReaderParams().readParallelExec(), numThreads);
            return storage.readParallel(file.name(), 0L, file.size(), readThreads, chunkSize)
                    .timeNanos(waitForChunksNs::addAndGet)
                    .each(chunk -> bytesRead.addAndGet(chunk.length))
                    .apply(chunks -> new KvFileReader(chunks, file.name(), parseBatchSize, parseThreads));
        }

        InputStream inputStream = storage.readInputStream(file.name());
        // NOTE(review): the 1 MiB argument is presumably how often the byte count is reported - confirm
        // against CountingInputStream.
        CountingInputStream countingInputStream = new CountingInputStream(
                inputStream,
                ByteLength.ofMiB(1L).toBytesInt(),
                bytesRead::addAndGet);
        return new KvFileReader(countingInputStream, file.name(), parseBatchSize, parseThreads);
    }

    /**
     * Encodes batches of entry blocks to bytes on the encode threads, preserving batch order.
     * Updates the records/blocks/bytes written counters as blocks are encoded.
     */
    private Scanner<List<byte[]>> encodeBlocksToBytes(Scanner<List<List<KvFileEntry>>> entryBlockBatches) {
        return entryBlockBatches
                .parallelOrdered(params.encodeParams().encodeThreads())
                .map(entryBlockBatch -> Scanner.of(entryBlockBatch)
                        .each(entryBlock -> {
                            recordsWritten.addAndGet(entryBlock.size());
                            recordsWrittenSinceLastLog.addAndGet(entryBlock.size());
                            blocksWritten.incrementAndGet();
                        })
                        .map(KvFileBlock::new)
                        .map(KvFileBlock::toBytes)
                        .each(blockBytes -> bytesWritten.addAndGet(blockBytes.length))
                        .list());
    }

    /**
     * Writes the encoded blocks to a newly named file.  Uses the parallel (multi-part) writer when the
     * plan's total input size exceeds one part, otherwise concatenates everything into a single write.
     */
    private KvFileNameAndSize write(Scanner<byte[]> blocks) {
        KvFileStorage storage = params.storageParams().storage();
        String filename = params.storageParams().filenameSupplier().get();
        boolean largerThanOnePart = plan.totalInputSize().toBytes() > params.writeParams().partSize().toBytes();
        if (largerThanOnePart) {
            Scanner<List<byte[]>> parts = groupBlocksIntoUploadParts(blocks);
            storage.writeParallel(filename, parts, params.writeParams().writeThreads());
        } else {
            byte[] bytes = blocks.listTo(ByteTool::concat);
            storage.write(filename, bytes);
        }
        return new KvFileNameAndSize(filename, bytesWritten.get());
    }

    /**
     * Groups consecutive blocks into upload parts.  A running byte total is kept; whenever it reaches
     * {@code partSize} the split id is bumped and the total is reset, and blocks are grouped by the
     * split id current at the time they pass through.
     */
    private Scanner<List<byte[]>> groupBlocksIntoUploadParts(Scanner<byte[]> blocks) {
        long partSizeBytes = params.writeParams().partSize().toBytes();
        AtomicLong splitId = new AtomicLong();
        AtomicLong splitLength = new AtomicLong();
        return blocks
                .each(blockBytes -> {
                    if (splitLength.addAndGet(blockBytes.length) >= partSizeBytes) {
                        splitId.incrementAndGet();
                        splitLength.set(0L);
                    }
                })
                .splitBy($ -> splitId.get())
                .map(Scanner::list);
    }

    /** Logs progress, then restarts the "since last log" window. */
    private void logIntermediateProgress() {
        logProgress(false, null);
        lastLogTimeNs.set(System.nanoTime());
        recordsWrittenSinceLastLog.set(0L);
    }

    /**
     * Logs one line of key=value statistics.
     *
     * @param complete true for the final "merged" message, false for an in-flight "merging" message
     * @param newFile the written file, or null while the merge is still running
     */
    private void logProgress(boolean complete, KvFileNameAndSize newFile) {
        Duration duration = Duration.between(startTime, Instant.now());
        String action = complete ? "merged" : "merging";
        long nsSinceLastLog = System.nanoTime() - lastLogTimeNs.get();

        LinkedHashMap<String, String> kvs = new LinkedHashMap<>();
        kvs.put("rwpsLatest", withCommas(ratePerSecond(recordsWrittenSinceLastLog.get(), nsSinceLastLog)));
        kvs.put("rwpsCumulative", withCommas(ratePerSecond(recordsWritten.get(), duration.toNanos())));
        kvs.put("recordsRead", withCommas(recordsRead.get()));
        kvs.put("recordsWritten", withCommas(recordsWritten.get()));
        kvs.put("blocksRead", withCommas(blocksRead.get()));
        kvs.put("blocksWritten", withCommas(blocksWritten.get()));
        kvs.put("bytesRead", ByteLength.ofBytes(bytesRead.get()).toDisplay());
        kvs.put("bytesWritten", ByteLength.ofBytes(bytesWritten.get()).toDisplay());
        kvs.put("duration", nanosToMillisString(duration.toNanos()));
        kvs.put("waitForChunks", nanosToMillisString(waitForChunksNs.get()));
        kvs.put("waitForCollator", nanosToMillisString(waitForCollatorNs.get()));
        kvs.put("waitForShouldStop", nanosToMillisString(waitForShouldStopNs.get()));
        kvs.put("waitForEncoder", nanosToMillisString(waitForEncoderNs.get()));
        kvs.put("collator", plan.collatorStrategy().name());
        if (newFile != null) {
            kvs.put("newFile", newFile.name());
        }

        String message = kvs.entrySet().stream()
                .map(entry -> entry.getKey() + "=" + entry.getValue())
                .collect(Collectors.joining(", ", action + "[", "]"));
        logger.warn(message);
    }

    /**
     * Returns {@code count} per second over {@code elapsedNs}.  The elapsed time is clamped to at least
     * one nanosecond so a zero-length (or non-positive) interval cannot cause a division by zero.
     */
    private static long ratePerSecond(long count, long elapsedNs) {
        long safeElapsedNs = Math.max(1L, elapsedNs);
        return count * NANOS_PER_SECOND / safeElapsedNs;
    }

    /** Formats a number with comma thousands separators. */
    private static String withCommas(long number) {
        // DecimalFormat is not thread-safe, so a fresh instance is used per call rather than a shared one.
        return new DecimalFormat(COMMA_PATTERN).format(number);
    }

    /** Formats a nanosecond count as whole milliseconds with comma separators and an "ms" suffix. */
    private static String nanosToMillisString(long nanos) {
        return withCommas(nanos / NANOS_PER_MILLI) + "ms";
    }

    /** Aborts the merge with a RuntimeException when the stop supplier reports true. */
    private void throwIfShouldStop() {
        if (shouldStop.get()) {
            throw new RuntimeException("stop requested");
        }
    }

    /**
     * Settings for reading raw bytes of the input files.
     * {@code memoryFanIn} and {@code streamingFanIn} are not read by this class.
     */
    public record KvFileMergerByteReaderParams(int memoryFanIn, int streamingFanIn, Threads makeReadersThreads, boolean readParallel, ExecutorService readParallelExec, ByteLength readBufferSize, ByteLength chunkSize) {
        /**
         * The thread budget shared by all readers: how many chunks fit in the read buffer.
         *
         * @throws ArithmeticException if {@code chunkSize} is zero bytes or the quotient overflows an int
         */
        public int totalThreads() {
            return Math.toIntExact(readBufferSize.toBytes() / chunkSize.toBytes());
        }
    }

    /** Settings for grouping entries into blocks and encoding them. */
    public record KvFileMergerEncodeParams(ByteLength minBlockSize, int encodeBatchSize, Threads encodeThreads) {
    }

    /** Settings passed through to each {@link KvFileReader}. */
    public record KvFileMergerKvReaderParams(int parseBatchSize, Threads parseThreads) {
    }

    /** Top-level bundle of all merger settings, plus the heartbeat and progress-log periods. */
    public record KvFileMergerParams(KvFileMergerStorageParams storageParams, KvFileMergerByteReaderParams byteReaderParams, KvFileMergerKvReaderParams kvReaderParams, KvFileMergerEncodeParams encodeParams, KvFileMergerWriteParams writeParams, Duration heartbeatPeriod, Duration logPeriod) {
    }

    /** The storage to read from and write to, and the supplier of the output file's name. */
    public record KvFileMergerStorageParams(KvFileStorage storage, Supplier<String> filenameSupplier) {
    }

    /** Settings for writing the output: threads for the parallel writer and the upload part size. */
    public record KvFileMergerWriteParams(Threads writeThreads, ByteLength partSize) {
    }
}

