/*
 * Decompiled with CFR 0.152.
 */
package io.datarouter.bytes.kvfile.merge;

import io.datarouter.bytes.blockfile.dto.BlockfileNameAndSize;
import io.datarouter.bytes.blockfile.read.BlockfileMetadataReader;
import io.datarouter.bytes.blockfile.read.BlockfileReader;
import io.datarouter.bytes.blockfile.read.BlockfileReaderBuilder;
import io.datarouter.bytes.blockfile.write.BlockfileWriter;
import io.datarouter.bytes.kvfile.kv.KvFileEntry;
import io.datarouter.bytes.kvfile.merge.KvFileMergePlan;
import io.datarouter.bytes.kvfile.merge.KvFileMergerParams;
import io.datarouter.bytes.kvfile.merge.KvFileMergerThreadsCalculator;
import io.datarouter.bytes.kvfile.merge.KvFileMergerTracker;
import io.datarouter.scanner.Scanner;
import io.datarouter.scanner.Threads;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.function.Supplier;

public class KvFileMerger {
    private final KvFileMergerParams params;
    private final KvFileMergePlan plan;
    private final Supplier<Boolean> shouldStop;
    private final String filename;
    private final KvFileMergerTracker tracker;
    private final int numVcpus;
    private final List<KvFileMergerThreadsCalculator.ThreadsForFile> threadsForFileListDesc;

    public KvFileMerger(KvFileMergerParams params, KvFileMergePlan plan, Supplier<Boolean> shouldStop) {
        this.params = params;
        this.plan = plan;
        this.shouldStop = shouldStop;
        this.filename = params.storageParams().filenameSupplier().get();
        this.tracker = new KvFileMergerTracker(plan, this.filename);
        this.numVcpus = Runtime.getRuntime().availableProcessors();
        this.threadsForFileListDesc = new KvFileMergerThreadsCalculator(plan, params.readParams()).calc();
    }

    public BlockfileNameAndSize merge() {
        this.tracker.startTime = Instant.now();
        List blockBatchScanners = Scanner.of(this.plan.files()).parallelUnordered(new Threads(this.params.readParams().prefetchExec(), this.plan.files().size())).map(this::makeReader).list();
        List kvScanners = Scanner.of((Iterable)blockBatchScanners).map(inputScanner -> inputScanner.timeNanos(this.tracker.waitForBlocksNs::addAndGet).concatIter(BlockfileReader.BlockfileDecodedBlockBatch::blocks).map(BlockfileReader.BlockfileDecodedBlock::value).concat(Scanner::of)).list();
        this.tracker.logInitializationStats();
        this.tracker.resetCountersSinceLastLog();
        this.tracker.mergeStartTime = Instant.now();
        this.tracker.waitForReadersNs.addAndGet(Duration.between(this.tracker.startTime, this.tracker.mergeStartTime).toNanos());
        BlockfileWriter.BlockfileWriteResult writeResult = (BlockfileWriter.BlockfileWriteResult)this.plan.collatorStrategy().method.apply(kvScanners).batchByMinSize(this.params.writeParams().minBlockSize().toBytes(), KvFileEntry::length).timeNanos(this.tracker.waitForCollatorNs::addAndGet).each(batchForBlock -> {
            this.tracker.blocksWritten.incrementAndGet();
            this.tracker.blocksWrittenSinceLastLog.incrementAndGet();
            this.tracker.recordsWritten.addAndGet(batchForBlock.size());
            this.tracker.recordsWrittenSinceLastLog.addAndGet(batchForBlock.size());
        }).periodic(this.params.heartbeatPeriod(), $ -> {
            this.tracker.logIntermediateProgress();
            this.tracker.resetCountersSinceLastLog();
            this.throwIfShouldStop();
        }).apply(this.makeWriter(this.filename)::write);
        BlockfileNameAndSize newFile = new BlockfileNameAndSize(this.filename, writeResult.fileLength().toBytes());
        this.tracker.logProgress(true, newFile);
        return newFile;
    }

    private Scanner<BlockfileReader.BlockfileDecodedBlockBatch<List<KvFileEntry>>> makeReader(BlockfileNameAndSize file) {
        int numReadThreads = (Integer)Scanner.of(this.threadsForFileListDesc).include(forFile -> forFile.file().equals(file)).map(KvFileMergerThreadsCalculator.ThreadsForFile::threads).findFirst().orElseThrow();
        BlockfileMetadataReader<List<KvFileEntry>> metadataReader = this.params.storageParams().blockfile().newMetadataReaderBuilder(file.name()).setKnownFileLength(file.size()).build();
        BlockfileReaderBuilder<List<KvFileEntry>> readerBuilder = this.params.storageParams().blockfile().newReaderBuilder(metadataReader, $ -> this.params.blockFormat().newBlockCodec()::decodeAll).setReadChunkSize(this.params.readParams().readChunkSize()).setDecodeBatchSize(this.params.readParams().decodeBatchSize()).setDecodeThreads(new Threads(this.params.readParams().readExec(), this.numVcpus));
        if (numReadThreads > 0) {
            readerBuilder.setReadThreads(Threads.useExecForSingleThread((ExecutorService)this.params.readParams().readExec(), (int)numReadThreads));
        }
        BlockfileReader<List<KvFileEntry>> reader = readerBuilder.build();
        return reader.scanDecodedBlockBatches().each(blockBatch -> {
            this.tracker.blocksRead.addAndGet(blockBatch.blocks().size());
            this.tracker.blocksReadSinceLastLog.addAndGet(blockBatch.blocks().size());
            this.tracker.compressedBytesRead.addAndGet(blockBatch.totalCompressedSize());
            this.tracker.compressedBytesReadSinceLastLog.addAndGet(blockBatch.totalCompressedSize());
            this.tracker.decompressedBytesRead.addAndGet(blockBatch.totalDecompressedSize());
            this.tracker.decompressedBytesReadSinceLastLog.addAndGet(blockBatch.totalDecompressedSize());
            blockBatch.blocks().forEach(block -> {
                this.tracker.recordsRead.addAndGet(((List)block.value()).size());
                this.tracker.recordsReadSinceLastLog.addAndGet(((List)block.value()).size());
            });
        }).peekFirst($ -> {});
    }

    private BlockfileWriter<List<KvFileEntry>> makeWriter(String filename) {
        boolean multipartWrite = this.plan.totalInputSize().toBytes() > this.params.writeParams().multipartUploadThreshold().toBytes();
        Threads writeThreads = new Threads(this.params.writeParams().writeExec(), this.params.writeParams().writeThreads());
        return this.params.storageParams().blockfile().newWriterBuilder(filename, this.params.blockFormat().newBlockCodec()::encodeAll).setCompressor(this.params.writeParams().compressor()).setEncodeBatchSize(this.params.writeParams().encodeBatchSize()).setEncodeThreads(new Threads(this.params.writeParams().encodeExec(), this.numVcpus)).setMultipartWrite(multipartWrite).setWriteThreads(writeThreads).build();
    }

    private void throwIfShouldStop() {
        if (this.shouldStop.get().booleanValue()) {
            throw new RuntimeException("stop requested");
        }
    }
}

