/*
 * Decompiled with CFR 0.152.
 */
package io.datarouter.bytes.blockfile.io.write;

import io.datarouter.bytes.BinaryDictionary;
import io.datarouter.bytes.ByteLength;
import io.datarouter.bytes.ByteTool;
import io.datarouter.bytes.RecordByteArrayField;
import io.datarouter.bytes.blockfile.block.decoded.BlockfileFooterBlock;
import io.datarouter.bytes.blockfile.block.decoded.BlockfileHeaderBlock;
import io.datarouter.bytes.blockfile.block.tokens.BlockfileBaseTokens;
import io.datarouter.bytes.blockfile.block.tokens.BlockfileFooterTokens;
import io.datarouter.bytes.blockfile.block.tokens.BlockfileHeaderTokens;
import io.datarouter.bytes.blockfile.block.tokens.BlockfileValueTokens;
import io.datarouter.bytes.blockfile.encoding.checksum.BlockfileChecksummer;
import io.datarouter.bytes.blockfile.encoding.compression.BlockfileCompressor;
import io.datarouter.bytes.blockfile.encoding.indexblock.BlockfileIndexBlockCodec;
import io.datarouter.bytes.blockfile.encoding.indexblock.BlockfileIndexBlockFormat;
import io.datarouter.bytes.blockfile.encoding.valueblock.BlockfileValueBlockCodec;
import io.datarouter.bytes.blockfile.encoding.valueblock.BlockfileValueBlockEncoder;
import io.datarouter.bytes.blockfile.encoding.valueblock.BlockfileValueBlockFormat;
import io.datarouter.bytes.blockfile.index.BlockfileByteRange;
import io.datarouter.bytes.blockfile.index.BlockfileRowIdRange;
import io.datarouter.bytes.blockfile.index.BlockfileRowRange;
import io.datarouter.bytes.blockfile.io.storage.BlockfileStorage;
import io.datarouter.bytes.blockfile.io.write.BlockfileIndexer;
import io.datarouter.bytes.blockfile.io.write.BlockfileWriterState;
import io.datarouter.bytes.blockfile.io.write.listener.BlockfileListener;
import io.datarouter.bytes.blockfile.row.BlockfileRow;
import io.datarouter.bytes.io.MultiByteArrayInputStream;
import io.datarouter.scanner.ObjectScanner;
import io.datarouter.scanner.PagedList;
import io.datarouter.scanner.Scanner;
import io.datarouter.scanner.Threads;
import java.io.InputStream;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.Supplier;

/**
 * Writes a single blockfile to the configured {@link BlockfileStorage} under a fixed name.
 *
 * <p>The emitted token stream is assembled in this order: one header block, the value blocks (each followed
 * by any index blocks the indexer has completed so far), the index blocks still pending once all value blocks
 * are consumed, and finally one footer block.
 *
 * <p>The writer holds a mutable {@link BlockfileWriterState} that is advanced while writing.
 * NOTE(review): an instance therefore looks intended for a single write - confirm before reusing one.
 *
 * @param <T> type of the items accepted by {@link #writeItems}
 */
public class BlockfileWriter<T> {
    private final BlockfileWriterConfig config;
    private final String name;
    private final BlockfileIndexBlockCodec indexBlockCodec;
    private final BlockfileWriterState state;
    private final BlockfileIndexer indexer;
    private final BlockfileValueBlockEncoder valueBlockEncoder;
    private final BlockfileHeaderBlock header;

    /**
     * Builds the writer and the collaborators derived from the config: index block codec, writer state,
     * indexer, value block encoder and the header block.
     *
     * @param config settings controlling encoding, indexing and persistence
     * @param name name passed to the storage when the file is persisted
     */
    public BlockfileWriter(BlockfileWriterConfig config, String name) {
        this.config = config;
        this.name = name;
        this.indexBlockCodec = config.indexBlockFormat().supplier().get();
        this.state = new BlockfileWriterState();
        this.indexer = new BlockfileIndexer(
                this.state,
                config.indexFanOut(),
                config.optTargetIndexBlockSize(),
                this.indexBlockCodec);
        BlockfileValueBlockEncoder.BlockfileValueBlockEncoderConfig encoderConfig =
                new BlockfileValueBlockEncoder.BlockfileValueBlockEncoderConfig(
                        config.valueBlockFormat(),
                        config.compressor(),
                        config.checksummer());
        this.valueBlockEncoder = new BlockfileValueBlockEncoder(encoderConfig);
        this.header = new BlockfileHeaderBlock(
                config.userDictionary(),
                config.valueBlockFormat(),
                config.indexBlockFormat(),
                config.compressor(),
                config.checksummer());
    }

    /**
     * @return the config this writer was constructed with
     */
    public BlockfileWriterConfig config() {
        return this.config;
    }

    /**
     * @return the mutable state tracked while writing
     */
    public BlockfileWriterState state() {
        return this.state;
    }

    /**
     * Converts each item to a row, groups the rows with {@code Scanner.batchByMinSize} using each row's
     * {@code length()}, and writes the resulting groups via {@link #writeBlocks}.
     *
     * @param rowEncoder converts an item into a {@link BlockfileRow}
     * @param targetValueBlockSize size, in bytes, passed to {@code batchByMinSize}
     * @param rows the items to write
     * @return the number of value blocks and the length of the written file
     */
    public BlockfileWriteResult writeItems(
            Function<T, BlockfileRow> rowEncoder,
            ByteLength targetValueBlockSize,
            Scanner<T> rows) {
        return rows
                .map(rowEncoder)
                .batchByMinSize(targetValueBlockSize.toBytes(), BlockfileRow::length)
                .apply(this::writeBlocks);
    }

    /**
     * Groups already-encoded rows with {@code Scanner.batchByMinSize} using each row's {@code length()} and
     * writes the resulting groups via {@link #writeBlocks}.
     *
     * @param targetValueBlockSize size, in bytes, passed to {@code batchByMinSize}
     * @param rows the rows to write
     * @return the number of value blocks and the length of the written file
     */
    public BlockfileWriteResult writeRows(ByteLength targetValueBlockSize, Scanner<BlockfileRow> rows) {
        return rows
                .batchByMinSize(targetValueBlockSize.toBytes(), BlockfileRow::length)
                .apply(this::writeBlocks);
    }

    /**
     * Writes the complete file. Each incoming list of rows becomes one value block.
     *
     * <p>Every configured listener has its {@code accept} attached to the token stream, and has
     * {@code complete} invoked after the persist step has returned.
     *
     * @param items one list of rows per value block
     * @return the number of value blocks and the final cursor position reported by the writer state
     */
    public BlockfileWriteResult writeBlocks(Scanner<List<BlockfileRow>> items) {
        BlockfileHeaderTokens headerTokens = this.makeHeaderTokens();
        this.state.appendHeaderBlock(headerTokens);
        // Typing the first scanner as BlockfileBaseTokens lets the following appends type-check.
        Scanner<BlockfileBaseTokens> headerScanner = ObjectScanner.of(headerTokens);
        Scanner<BlockfileBaseTokens> tokenGroups = headerScanner
                .append(this.makeValueAndIndexScanner(items))
                .append(this.makeFinalIndexScanner())
                .append(this.makeFooterScanner());
        for (BlockfileListener listener : this.config.listeners()) {
            tokenGroups = tokenGroups.each(listener::accept);
        }
        tokenGroups
                .concatIter(BlockfileBaseTokens::toList)
                .then(this::persist);
        this.config.listeners().forEach(BlockfileListener::complete);
        return new BlockfileWriteResult(this.state.numValueBlocks(), ByteLength.ofBytes(this.state.cursor()));
    }

    /**
     * Sends the token byte arrays to storage.
     *
     * <p>With {@code multipartWrite} enabled the tokens are wrapped in a {@link MultiByteArrayInputStream}
     * and handed to the storage together with the write threads. Otherwise all tokens are collected,
     * concatenated into one array, and written in a single call.
     */
    private void persist(Scanner<byte[]> tokens) {
        if (this.config.multipartWrite()) {
            InputStream inputStream = tokens.apply(MultiByteArrayInputStream::new);
            this.config.storage().write(this.name, inputStream, this.config.writeThreads());
        } else {
            List<byte[]> allTokens = tokens.collect(PagedList::new);
            byte[] bytes = ByteTool.concat(allTokens);
            this.config.storage().write(this.name, bytes);
        }
    }

    /**
     * Encodes the header with the configured header codec. The block length passed to the tokens is the
     * prefix size plus the encoded header size.
     */
    private BlockfileHeaderTokens makeHeaderTokens() {
        byte[] headerValueBytes = this.config.headerCodec().encode(this.header);
        int headerBlockLength = BlockfileBaseTokens.NUM_PREFIX_BYTES + headerValueBytes.length;
        return new BlockfileHeaderTokens(headerBlockLength, headerValueBytes);
    }

    /**
     * Builds the scanner of value blocks, each followed by the index blocks the indexer reports as completed.
     *
     * <p>Value block ids and first-item ids are taken from the writer state before the rows are batched and
     * handed to {@code parallelOrdered} for encoding. The state and the indexer are updated in the
     * {@code each} stage that follows the encoding.
     */
    private Scanner<BlockfileBaseTokens> makeValueAndIndexScanner(Scanner<List<BlockfileRow>> blocksOfRows) {
        return blocksOfRows
                .map(blockOfRows -> new BlockfileValueBlockCodec.BlockfileValueBlockRows(
                        this.state.takeValueBlockId(),
                        this.state.getNumItemsAndAdd(blockOfRows.size()),
                        blockOfRows))
                .batch(this.config.encodeBatchSize())
                .parallelOrdered(this.config.encodeThreads())
                .map(this.valueBlockEncoder::encodeValueBlocks)
                .concat(Scanner::of)
                .each(valueBlock -> {
                    BlockfileRowIdRange rowIdRange = valueBlock.toRowIdRange();
                    BlockfileRowRange rowRange = valueBlock.toRowKeyRange();
                    // NOTE(review): cast kept from the decompiled output; presumably redundant - confirm
                    // against the element type returned by BlockfileValueBlockEncoder.encodeValueBlocks.
                    BlockfileByteRange byteRange = this.state.appendValueBlock((BlockfileValueTokens)valueBlock);
                    this.indexer.onValueBlockWrite(
                            this.state.previousGlobalBlockId(),
                            valueBlock.valueBlockId(),
                            rowIdRange,
                            rowRange,
                            byteRange);
                })
                .map(BlockfileBaseTokens.class::cast)
                .concat(valueBlock -> ObjectScanner.of(valueBlock)
                        .append(this.indexer.drainCompletedBlocks().map(BlockfileBaseTokens.class::cast)));
    }

    /**
     * @return the blocks returned by the indexer's {@code drainAllBlocks}, typed as base tokens
     */
    private Scanner<BlockfileBaseTokens> makeFinalIndexScanner() {
        return this.indexer.drainAllBlocks().map(BlockfileBaseTokens.class::cast);
    }

    /**
     * Builds a scanner that produces the footer tokens and records them in the writer state.
     *
     * <p>The dictionary supplier and the writer state are read inside the scanner stages rather than when
     * this method is called.
     * NOTE(review): presumably so the footer reflects the state after the preceding blocks were consumed -
     * confirm that Scanner stages run lazily.
     */
    private Scanner<BlockfileBaseTokens> makeFooterScanner() {
        return Scanner.of(this.config.footerUserDictionarySupplier())
                .map(Supplier::get)
                .map(footerUserDictionary -> {
                    byte[] headerBytes = this.config.headerCodec().encode(this.header);
                    BlockfileFooterBlock footer = new BlockfileFooterBlock(
                            new RecordByteArrayField(headerBytes),
                            footerUserDictionary,
                            this.state.headerBlockLocation(),
                            this.state.latestIndexBlockLocation(),
                            this.state.numValueBlocks(),
                            this.state.numIndexBlocks());
                    byte[] footerValueBytes = BlockfileFooterBlock.VALUE_CODEC.encode(footer);
                    return BlockfileWriter.encodeFooter(footerValueBytes);
                })
                .each(this.state::appendFooterBlock);
    }

    /**
     * Wraps encoded footer bytes in footer tokens. The block length is the prefix size plus the value size
     * plus {@code NUM_LENGTH_BYTES}.
     * NOTE(review): the extra length bytes presumably account for a trailing length field - confirm against
     * {@link BlockfileFooterTokens}.
     *
     * @param footerValueBytes the encoded footer value
     * @return the footer tokens
     */
    public static BlockfileBaseTokens encodeFooter(byte[] footerValueBytes) {
        int footerBlockLength = BlockfileBaseTokens.NUM_PREFIX_BYTES
                + footerValueBytes.length
                + BlockfileBaseTokens.NUM_LENGTH_BYTES;
        return new BlockfileFooterTokens(footerBlockLength, footerValueBytes);
    }

    /**
     * Summary of a completed write.
     *
     * @param numValueBlocks number of value blocks reported by the writer state
     * @param fileLength final cursor position reported by the writer state
     */
    public record BlockfileWriteResult(long numValueBlocks, ByteLength fileLength) {
    }

    /**
     * Settings for a {@link BlockfileWriter}.
     *
     * @param storage destination the file is written to
     * @param headerCodec encodes the header block, for both the header tokens and the footer
     * @param valueBlockFormat passed to the value block encoder and stored in the header
     * @param indexBlockFormat supplies the index block codec and is stored in the header
     * @param compressor passed to the value block encoder and stored in the header
     * @param checksummer passed to the value block encoder and stored in the header
     * @param userDictionary stored in the header block
     * @param footerUserDictionarySupplier invoked while building the footer block
     * @param listeners receive every token group and a completion callback
     * @param encodeBatchSize number of value blocks grouped into each call to the value block encoder
     * @param encodeThreads passed to {@code parallelOrdered} for value block encoding
     * @param multipartWrite when true the file is written from a stream, otherwise from one byte array
     * @param writeThreads passed to the storage for the stream-based write
     * @param indexFanOut passed to the indexer
     * @param optTargetIndexBlockSize passed to the indexer
     */
    public record BlockfileWriterConfig(
            BlockfileStorage storage,
            BlockfileHeaderBlock.BlockfileHeaderCodec headerCodec,
            BlockfileValueBlockFormat valueBlockFormat,
            BlockfileIndexBlockFormat indexBlockFormat,
            BlockfileCompressor compressor,
            BlockfileChecksummer checksummer,
            BinaryDictionary userDictionary,
            Supplier<BinaryDictionary> footerUserDictionarySupplier,
            List<BlockfileListener> listeners,
            int encodeBatchSize,
            Threads encodeThreads,
            boolean multipartWrite,
            Threads writeThreads,
            int indexFanOut,
            Optional<ByteLength> optTargetIndexBlockSize) {
    }
}

