/*
 * Decompiled with CFR 0.152.
 */
package io.datarouter.bytes.blockfile.io.merge;

import io.datarouter.bytes.blockfile.block.parsed.BlockfileDecodedBlock;
import io.datarouter.bytes.blockfile.block.parsed.BlockfileDecodedBlockBatch;
import io.datarouter.bytes.blockfile.io.merge.BlockfileMergePlan;
import io.datarouter.bytes.blockfile.io.merge.BlockfileMergerParams;
import io.datarouter.bytes.blockfile.io.merge.BlockfileMergerThreadsCalculator;
import io.datarouter.bytes.blockfile.io.merge.BlockfileMergerTracker;
import io.datarouter.bytes.blockfile.io.read.BlockfileReader;
import io.datarouter.bytes.blockfile.io.read.BlockfileReaderBuilder;
import io.datarouter.bytes.blockfile.io.storage.BlockfileNameAndSize;
import io.datarouter.bytes.blockfile.io.write.BlockfileWriter;
import io.datarouter.bytes.blockfile.row.BlockfileRow;
import io.datarouter.scanner.Scanner;
import io.datarouter.scanner.Threads;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.function.Function;
import java.util.function.Supplier;

public class BlockfileMerger {
    private final BlockfileMergerParams params;
    private final BlockfileMergePlan plan;
    private final Supplier<Boolean> shouldStop;
    private final String filename;
    private final BlockfileMergerTracker tracker;
    private final int numVcpus;
    private final List<BlockfileMergerThreadsCalculator.ThreadsForFile> threadsForFileListDesc;

    public BlockfileMerger(BlockfileMergerParams params, BlockfileMergePlan plan, Supplier<Boolean> shouldStop) {
        this.params = params;
        this.plan = plan;
        this.shouldStop = shouldStop;
        this.filename = params.storageParams().filenameSupplier().get();
        this.tracker = new BlockfileMergerTracker(plan, this.filename);
        this.numVcpus = Runtime.getRuntime().availableProcessors();
        this.threadsForFileListDesc = new BlockfileMergerThreadsCalculator(plan, params.readParams()).calc();
    }

    public BlockfileNameAndSize merge() {
        this.tracker.startTime = Instant.now();
        List blockBatchScanners = Scanner.of(this.plan.files()).parallelUnordered(new Threads(this.params.readParams().prefetchExec(), this.plan.files().size())).map(this::makeReader).list();
        List kvScanners = Scanner.of((Iterable)blockBatchScanners).map(inputScanner -> inputScanner.timeNanos(this.tracker.waitForBlocksNs::addAndGet).concatIter(BlockfileDecodedBlockBatch::blocks).map(BlockfileDecodedBlock::items).concat(Scanner::of)).list();
        this.tracker.logInitializationStats();
        this.tracker.resetCountersSinceLastLog();
        this.tracker.mergeStartTime = Instant.now();
        this.tracker.waitForReadersNs.addAndGet(Duration.between(this.tracker.startTime, this.tracker.mergeStartTime).toNanos());
        BlockfileWriter.BlockfileWriteResult writeResult = (BlockfileWriter.BlockfileWriteResult)this.plan.collatorStrategy().method.apply(kvScanners).batchByMinSize(this.params.writeParams().minBlockSize().toBytes(), BlockfileRow::length).timeNanos(this.tracker.waitForCollatorNs::addAndGet).each(batchForBlock -> {
            this.tracker.blocksWritten.incrementAndGet();
            this.tracker.blocksWrittenSinceLastLog.incrementAndGet();
            this.tracker.recordsWritten.addAndGet(batchForBlock.size());
            this.tracker.recordsWrittenSinceLastLog.addAndGet(batchForBlock.size());
        }).periodic(this.params.heartbeatPeriod(), $ -> {
            this.tracker.logIntermediateProgress();
            this.tracker.resetCountersSinceLastLog();
            this.throwIfShouldStop();
        }).apply(this.makeWriter(this.filename)::writeBlocks);
        BlockfileNameAndSize newFile = new BlockfileNameAndSize(this.filename, writeResult.fileLength().toBytes());
        this.tracker.logProgress(true, newFile);
        return newFile;
    }

    private Scanner<BlockfileDecodedBlockBatch<BlockfileRow>> makeReader(BlockfileNameAndSize file) {
        int numReadThreads = (Integer)Scanner.of(this.threadsForFileListDesc).include(forFile -> forFile.file().equals(file)).map(BlockfileMergerThreadsCalculator.ThreadsForFile::threads).findFirst().orElseThrow();
        BlockfileReaderBuilder<BlockfileRow> readerBuilder = this.params.storageParams().blockfileGroup().newReaderBuilderKnownFileLength(file.name(), file.size(), Function.identity()).setReadChunkSize(this.params.readParams().readChunkSize()).setDecodeBatchSize(this.params.readParams().decodeBatchSize()).setDecodeThreads(new Threads(this.params.readParams().readExec(), this.numVcpus));
        if (numReadThreads > 0) {
            readerBuilder.setReadThreads(Threads.useExecForSingleThread((ExecutorService)this.params.readParams().readExec(), (int)numReadThreads));
        }
        BlockfileReader<BlockfileRow> reader = readerBuilder.build();
        return reader.sequential().scanDecodedBlockBatches().each(blockBatch -> {
            this.tracker.blocksRead.addAndGet(blockBatch.blocks().size());
            this.tracker.blocksReadSinceLastLog.addAndGet(blockBatch.blocks().size());
            this.tracker.compressedBytesRead.addAndGet(blockBatch.totalCompressedSize());
            this.tracker.compressedBytesReadSinceLastLog.addAndGet(blockBatch.totalCompressedSize());
            this.tracker.decompressedBytesRead.addAndGet(blockBatch.totalDecompressedSize());
            this.tracker.decompressedBytesReadSinceLastLog.addAndGet(blockBatch.totalDecompressedSize());
            blockBatch.blocks().forEach(block -> {
                this.tracker.recordsRead.addAndGet(block.items().size());
                this.tracker.recordsReadSinceLastLog.addAndGet(block.items().size());
            });
        }).peekFirst($ -> {});
    }

    private BlockfileWriter<BlockfileRow> makeWriter(String filename) {
        boolean multipartWrite = this.plan.totalInputSize().toBytes() > this.params.writeParams().multipartUploadThreshold().toBytes();
        Threads writeThreads = new Threads(this.params.writeParams().writeExec(), this.params.writeParams().writeThreads());
        return this.params.storageParams().blockfileGroup().newWriterBuilder(filename).setValueBlockFormat(this.params.writeParams().valueBlockFormat()).setIndexBlockFormat(this.params.writeParams().indexBlockFormat()).setCompressor(this.params.writeParams().compressor()).setEncodeBatchSize(this.params.writeParams().encodeBatchSize()).setEncodeThreads(new Threads(this.params.writeParams().encodeExec(), this.numVcpus)).setMultipartWrite(multipartWrite).setWriteThreads(writeThreads).build();
    }

    private void throwIfShouldStop() {
        if (this.shouldStop.get().booleanValue()) {
            throw new RuntimeException("stop requested");
        }
    }
}

