/*
 * Decompiled with CFR 0.152.
 */
package io.datarouter.bytes.blockfile.write;

import io.datarouter.bytes.BinaryDictionary;
import io.datarouter.bytes.ByteLength;
import io.datarouter.bytes.ByteTool;
import io.datarouter.bytes.Codec;
import io.datarouter.bytes.EmptyArray;
import io.datarouter.bytes.blockfile.checksum.BlockfileChecksummer;
import io.datarouter.bytes.blockfile.compress.BlockfileCompressor;
import io.datarouter.bytes.blockfile.dto.BlockfileTokens;
import io.datarouter.bytes.blockfile.enums.BlockfileSection;
import io.datarouter.bytes.blockfile.section.BlockfileFooter;
import io.datarouter.bytes.blockfile.section.BlockfileHeader;
import io.datarouter.bytes.blockfile.section.BlockfileTrailer;
import io.datarouter.bytes.blockfile.storage.BlockfileStorage;
import io.datarouter.bytes.blockfile.write.BlockfileListener;
import io.datarouter.bytes.codec.intcodec.RawIntCodec;
import io.datarouter.bytes.io.MultiByteArrayInputStream;
import io.datarouter.scanner.ObjectScanner;
import io.datarouter.scanner.PagedList;
import io.datarouter.scanner.Scanner;
import io.datarouter.scanner.Threads;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.function.Supplier;

public class BlockfileWriter<T> {
    public static final int NUM_VALUE_LENGTH_BYTES = RawIntCodec.INSTANCE.length();
    public static final int NUM_SECTION_BYTES = 1;
    public static final int NUM_HEADER_METADATA_BYTES = RawIntCodec.INSTANCE.length() + 1;
    public static final int NUM_FOOTER_METADATA_BYTES = RawIntCodec.INSTANCE.length() + 1;
    public static final int NUM_TRAILER_BYTES = RawIntCodec.INSTANCE.length() + RawIntCodec.INSTANCE.length();
    private final BlockfileWriterConfig<T> config;
    private final String name;
    private final AtomicLong dataBlockCounter = new AtomicLong();
    private final AtomicInteger footerBlockLength = new AtomicInteger();
    private final AtomicLong fileLengthBytesCounter = new AtomicLong();

    public BlockfileWriter(BlockfileWriterConfig<T> config, String name) {
        this.config = config;
        this.name = name;
    }

    public BlockfileWriteResult write(Scanner<T> items) {
        BlockfileTokens headerTokens = this.makeHeaderTokens();
        Scanner tokenGroups = ObjectScanner.of((Object)headerTokens).append(this.makeBlockScanner(items)).append(this.makeFooterScanner()).append(this.makeTrailerScanner(headerTokens)).each(token -> {
            long l = this.fileLengthBytesCounter.addAndGet(token.totalLength());
        });
        for (BlockfileListener listener : this.config.listeners()) {
            tokenGroups = tokenGroups.each(listener::accept);
        }
        Scanner tokens = tokenGroups.concatIter(BlockfileTokens::toList);
        if (this.config.multipartWrite()) {
            this.config.storage().write(this.name, (InputStream)tokens.apply(MultiByteArrayInputStream::new), this.config.writeThreads());
        } else {
            List allTokens = (List)tokens.collect(PagedList::new);
            this.config.storage().write(this.name, ByteTool.concat(allTokens));
        }
        this.config.listeners().forEach(BlockfileListener::complete);
        return new BlockfileWriteResult(this.dataBlockCounter.get(), ByteLength.ofBytes(this.fileLengthBytesCounter.get()));
    }

    private BlockfileTokens makeHeaderTokens() {
        BlockfileHeader header = new BlockfileHeader(this.config.userDictionary(), this.config.compressor(), this.config.checksummer().numBytes(), this.config.checksummer());
        byte[] headerValueBytes = this.config.headerCodec().encode(header);
        int headerBlockLength = NUM_HEADER_METADATA_BYTES + headerValueBytes.length;
        return new BlockfileTokens(BlockfileSection.HEADER, RawIntCodec.INSTANCE.encode(headerBlockLength), EmptyArray.BYTE, headerValueBytes);
    }

    private Scanner<BlockfileTokens> makeBlockScanner(Scanner<T> items) {
        return items.batch(this.config.encodeBatchSize()).parallelOrdered(this.config.encodeThreads()).map(this::encodeBlocks).each(blockBatch -> {
            long l = this.dataBlockCounter.addAndGet(blockBatch.size());
        }).concat(Scanner::of);
    }

    private Scanner<BlockfileTokens> makeFooterScanner() {
        return Scanner.of(this.config.footerUserDictionarySupplier()).map(Supplier::get).map(footerUserDictionary -> {
            BlockfileFooter footer = new BlockfileFooter((BinaryDictionary)footerUserDictionary, this.dataBlockCounter.get());
            byte[] footerValueBytes = BlockfileFooter.VALUE_CODEC.encode(footer);
            BlockfileTokens footerTokens = BlockfileWriter.encodeFooter(footerValueBytes);
            this.footerBlockLength.set(footerTokens.totalLength());
            return footerTokens;
        });
    }

    private Scanner<BlockfileTokens> makeTrailerScanner(BlockfileTokens headerTokens) {
        Supplier<BlockfileTrailer> trailerSupplier = () -> new BlockfileTrailer(headerTokens.totalLength(), this.footerBlockLength.get());
        return Scanner.of(trailerSupplier).map(Supplier::get).map(BlockfileTrailer::encode);
    }

    public int numBlockMetadataBytes() {
        return NUM_VALUE_LENGTH_BYTES + this.config.checksummer().numBytes() + 1;
    }

    public List<BlockfileTokens> encodeBlocks(List<T> blocks) {
        Codec<byte[], byte[]> compressorCodec = this.config.compressor().codecSupplier().get();
        return (List)Scanner.of(blocks).map(block -> this.encodeBlock(compressorCodec, block)).collect(() -> new ArrayList(blocks.size()));
    }

    public BlockfileTokens encodeBlock(Codec<byte[], byte[]> compressorCodec, T item) {
        byte[] encodedBytes = this.config.encoder().apply(item);
        byte[] compressedBytes = compressorCodec.encode(encodedBytes);
        int blockLength = this.numBlockMetadataBytes() + compressedBytes.length;
        byte[] blockLengthBytes = RawIntCodec.INSTANCE.encode(blockLength);
        byte[] checksumBytes = this.config.checksummer().encoder().apply(compressedBytes);
        return new BlockfileTokens(BlockfileSection.BLOCK, blockLengthBytes, checksumBytes, compressedBytes);
    }

    public static BlockfileTokens encodeFooter(byte[] footerValueBytes) {
        int footerBlockLength = NUM_FOOTER_METADATA_BYTES + footerValueBytes.length;
        return new BlockfileTokens(BlockfileSection.FOOTER, RawIntCodec.INSTANCE.encode(footerBlockLength), EmptyArray.BYTE, footerValueBytes);
    }

    public BlockfileWriterConfig<T> config() {
        return this.config;
    }

    public record BlockfileWriteResult(long numDataBlocks, ByteLength fileLength) {
    }

    public record BlockfileWriterConfig<T>(BlockfileStorage storage, Function<T, byte[]> encoder, BlockfileHeader.BlockfileHeaderCodec headerCodec, BlockfileCompressor compressor, BlockfileChecksummer checksummer, BinaryDictionary userDictionary, Supplier<BinaryDictionary> footerUserDictionarySupplier, List<BlockfileListener> listeners, int encodeBatchSize, Threads encodeThreads, boolean multipartWrite, Threads writeThreads) {
    }
}

