/*
 * Decompiled with CFR 0.152.
 */
package io.datarouter.bytes.blockfile.read;

import io.datarouter.bytes.ByteLength;
import io.datarouter.bytes.Codec;
import io.datarouter.bytes.blockfile.enums.BlockfileSection;
import io.datarouter.bytes.blockfile.read.BlockfileMetadataReader;
import io.datarouter.bytes.blockfile.section.BlockfileFooter;
import io.datarouter.bytes.blockfile.section.BlockfileHeader;
import io.datarouter.bytes.blockfile.section.BlockfileTrailer;
import io.datarouter.bytes.blockfile.storage.BlockfileStorage;
import io.datarouter.bytes.blockfile.write.BlockfileWriter;
import io.datarouter.bytes.codec.bytestringcodec.HexByteStringCodec;
import io.datarouter.bytes.codec.intcodec.RawIntCodec;
import io.datarouter.bytes.io.InputStreamTool;
import io.datarouter.scanner.Scanner;
import io.datarouter.scanner.Threads;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;

/**
 * Sequentially reads the value blocks of a blockfile and exposes them as {@link Scanner}s of
 * raw, decompressed, or decoded values.
 *
 * <p>Header, footer and trailer access is delegated to the supplied {@link BlockfileMetadataReader}.
 * Every {@code scan*} method opens a fresh input stream on the file, reads the header from that
 * stream (caching it on the metadata reader), and then reads value blocks until a section other
 * than {@link BlockfileSection#BLOCK} is encountered.
 *
 * @param <T> the type produced by the configured decoder for each decompressed block
 */
public class BlockfileReader<T> {
    private final BlockfileMetadataReader<T> metadataReader;
    private final BlockfileReaderConfig<T> config;

    /**
     * @param metadataReader supplies the file name and the header/footer/trailer sections
     * @param config storage, decoder and threading settings used while reading
     */
    public BlockfileReader(BlockfileMetadataReader<T> metadataReader, BlockfileReaderConfig<T> config) {
        this.metadataReader = metadataReader;
        this.config = config;
    }

    /** @return the metadata reader this reader delegates to */
    public BlockfileMetadataReader<T> metadataReader() {
        return this.metadataReader;
    }

    /** @return the header block length as reported by the metadata reader */
    public int headerBlockLength() {
        return this.metadataReader.headerBlockLength();
    }

    /** @return the file header as reported by the metadata reader */
    public BlockfileHeader header() {
        return this.metadataReader.header();
    }

    /** @return the file footer as reported by the metadata reader */
    public BlockfileFooter footer() {
        return this.metadataReader.footer();
    }

    /** @return the file trailer as reported by the metadata reader */
    public BlockfileTrailer trailer() {
        return this.metadataReader.trailer();
    }

    /**
     * Opens a stream over the whole file.
     *
     * <p>When the file length is known and does not exceed one read chunk, the file is fetched with
     * a single {@code storage.read} call and wrapped in memory; otherwise the storage's
     * {@code readInputStream} is used with the configured read threads and chunk size.
     */
    private InputStream makeInputStream() {
        Optional<Long> knownFileLength = this.config.knownFileLength();
        boolean fitsInOneChunk = knownFileLength.isPresent()
                && knownFileLength.orElseThrow() <= this.config.readChunkSize().toBytes();
        if (fitsInOneChunk) {
            return new ByteArrayInputStream(this.config.storage().read(this.metadataReader.name()));
        }
        return this.config.storage().readInputStream(
                this.metadataReader.name(),
                this.config.readThreads(),
                this.config.readChunkSize());
    }

    /**
     * Consumes the header block from the start of {@code inputStream}, decodes it with the
     * configured header codec, and stores the result on the metadata reader.
     *
     * @param inputStream a stream positioned at the first byte of the file
     */
    private void readAndCacheHeader(InputStream inputStream) {
        byte[] blockLengthBytes = InputStreamTool.readNBytes(inputStream, BlockfileWriter.NUM_VALUE_LENGTH_BYTES);
        int blockLength = RawIntCodec.INSTANCE.decode(blockLengthBytes);
        int valueLength = blockLength - BlockfileWriter.NUM_HEADER_METADATA_BYTES;
        // One byte sits between the length and the value; it is read and discarded here.
        // NOTE(review): presumably the header's section code - confirm against BlockfileWriter.
        InputStreamTool.readRequiredByte(inputStream);
        byte[] valueBytes = InputStreamTool.readNBytes(inputStream, valueLength);
        BlockfileHeader decoded = this.config.headerCodec().decode(valueBytes);
        this.metadataReader.setHeader(new BlockfileMetadataReader.DecodedHeader(decoded, blockLength));
    }

    /**
     * Opens the file, reads the header, and returns a scanner over the still-compressed value
     * blocks that follow it. Scanning stops at the first block whose section code is not
     * {@link BlockfileSection#BLOCK}, at which point the input stream is closed.
     *
     * <p>The stream is only closed on that normal end-of-blocks path. If the consumer stops
     * scanning early, or a read fails, the stream is left open.
     */
    private Scanner<ParsedBlock> scanParsedBlocks() {
        InputStream inputStream = this.makeInputStream();
        this.readAndCacheHeader(inputStream);
        int checksumLength = this.metadataReader.header().checksumLength();
        int blockMetadataLength = this.metadataReader.numBlockMetadataBytes();
        return Scanner.generate(() -> this.readNextBlock(inputStream, checksumLength, blockMetadataLength))
                .advanceWhile(Optional::isPresent)
                .map(Optional::get);
    }

    /**
     * Reads one block from the stream: length bytes, checksum bytes, a section code byte, then
     * the compressed value.
     *
     * @param inputStream stream positioned at the start of a block
     * @param checksumLength number of checksum bytes stored in each block
     * @param blockMetadataLength number of non-value bytes included in the stored block length
     * @return the parsed block, or empty (after closing the stream) if the section code is not
     *         {@link BlockfileSection#BLOCK}
     */
    private Optional<ParsedBlock> readNextBlock(
            InputStream inputStream,
            int checksumLength,
            int blockMetadataLength) {
        byte[] blockLengthBytes = InputStreamTool.readNBytes(inputStream, BlockfileWriter.NUM_VALUE_LENGTH_BYTES);
        int blockLength = RawIntCodec.INSTANCE.decode(blockLengthBytes);
        byte[] checksumValue = InputStreamTool.readNBytes(inputStream, checksumLength);
        byte sectionCode = InputStreamTool.readRequiredByte(inputStream);
        if (sectionCode != BlockfileSection.BLOCK.codeByte) {
            // No more value blocks; nothing else reads from this stream, so release it now.
            closeInputStream(inputStream);
            return Optional.empty();
        }
        int compressedValueLength = blockLength - blockMetadataLength;
        byte[] compressedValue = InputStreamTool.readNBytes(inputStream, compressedValueLength);
        return Optional.of(new ParsedBlock(blockLengthBytes, checksumValue, compressedValue));
    }

    /**
     * Closes the stream, rethrowing any {@link IOException} unchecked.
     *
     * @throws UncheckedIOException if closing the stream fails
     */
    private static void closeInputStream(InputStream inputStream) {
        try {
            inputStream.close();
        } catch (IOException e) {
            throw new UncheckedIOException("failed to close blockfile input stream", e);
        }
    }

    /**
     * @return a scanner of each block's decompressed bytes; blocks are grouped into batches of
     *         {@code decodeBatchSize} and decompressed through {@code parallelOrdered} on the
     *         configured decode threads
     */
    public Scanner<byte[]> scanDecompressedValues() {
        return this.scanParsedBlocks()
                .batch(this.config.decodeBatchSize())
                .parallelOrdered(this.config.decodeThreads())
                .map(this::decompressBlocks)
                .concat(Scanner::of);
    }

    /**
     * @return a scanner of decoded batches, each holding up to {@code decodeBatchSize} blocks
     *         together with the batch's total compressed and decompressed value sizes
     */
    public Scanner<BlockfileDecodedBlockBatch<T>> scanDecodedBlockBatches() {
        return this.scanParsedBlocks()
                .batch(this.config.decodeBatchSize())
                .parallelOrdered(this.config.decodeThreads())
                .map(this::decompressAndDecodeBlocks);
    }

    /** @return a scanner of individual decoded blocks, flattened from the decoded batches */
    public Scanner<BlockfileDecodedBlock<T>> scanDecodedBlocks() {
        return this.scanDecodedBlockBatches().concatIter(BlockfileDecodedBlockBatch::blocks);
    }

    /** @return a scanner of the decoded value of every block */
    public Scanner<T> scanDecodedValues() {
        return this.scanDecodedBlocks().map(BlockfileDecodedBlock::value);
    }

    /**
     * Decompresses each block, validating its checksum first when enabled in the config.
     *
     * @param parsedBlocks blocks as read from the file
     * @return the decompressed bytes of each block, in input order
     * @throws RuntimeException if checksum validation is enabled and a checksum does not match
     */
    private List<byte[]> decompressBlocks(List<ParsedBlock> parsedBlocks) {
        Codec<byte[], byte[]> compressorCodec = this.metadataReader.header().compressor().codecSupplier().get();
        List<byte[]> decompressedByteArrays = new ArrayList<>(parsedBlocks.size());
        for (ParsedBlock parsedBlock : parsedBlocks) {
            if (this.config.validateChecksums()) {
                this.validateChecksum(parsedBlock);
            }
            decompressedByteArrays.add(compressorCodec.decode(parsedBlock.compressedValue()));
        }
        return decompressedByteArrays;
    }

    /**
     * Decompresses and decodes each block, validating its checksum first when enabled in the
     * config.
     *
     * <p>Per-block sizes include the block metadata bytes; the batch totals count value bytes
     * only.
     *
     * @param parsedBlocks blocks as read from the file
     * @return a batch containing the decoded blocks in input order plus size totals
     * @throws RuntimeException if checksum validation is enabled and a checksum does not match
     */
    private BlockfileDecodedBlockBatch<T> decompressAndDecodeBlocks(List<ParsedBlock> parsedBlocks) {
        int numMetadataBytes = this.metadataReader.numBlockMetadataBytes();
        Codec<byte[], byte[]> compressorCodec = this.metadataReader.header().compressor().codecSupplier().get();
        List<BlockfileDecodedBlock<T>> decodedBlocks = new ArrayList<>(parsedBlocks.size());
        long totalCompressedSize = 0L;
        long totalDecompressedSize = 0L;
        for (ParsedBlock parsedBlock : parsedBlocks) {
            if (this.config.validateChecksums()) {
                this.validateChecksum(parsedBlock);
            }
            byte[] compressedBytes = parsedBlock.compressedValue();
            byte[] decompressedBytes = compressorCodec.decode(compressedBytes);
            T value = this.config.decoder().apply(decompressedBytes);
            decodedBlocks.add(new BlockfileDecodedBlock<>(
                    numMetadataBytes + compressedBytes.length,
                    numMetadataBytes + decompressedBytes.length,
                    value));
            totalCompressedSize += compressedBytes.length;
            totalDecompressedSize += decompressedBytes.length;
        }
        return new BlockfileDecodedBlockBatch<>(totalCompressedSize, totalDecompressedSize, decodedBlocks);
    }

    /**
     * Recomputes the checksum of the block's compressed value with the header's checksummer and
     * compares it to the checksum stored in the block.
     *
     * @throws RuntimeException if the stored and computed checksums differ
     */
    private void validateChecksum(ParsedBlock parsedBlock) {
        byte[] expected = parsedBlock.checksum();
        byte[] actual = this.metadataReader.header().checksummer().encoder().apply(parsedBlock.compressedValue());
        if (Arrays.equals(expected, actual)) {
            return;
        }
        String message = String.format(
                "invalid checksum: expected=%s, actual=%s",
                HexByteStringCodec.INSTANCE.encode(expected),
                HexByteStringCodec.INSTANCE.encode(actual));
        throw new RuntimeException(message);
    }

    /** @return the configuration this reader was created with */
    public BlockfileReaderConfig<T> config() {
        return this.config;
    }

    /**
     * A single decoded block.
     *
     * @param compressedSize compressed value length plus block metadata bytes
     * @param decompressedSize decompressed value length plus block metadata bytes
     * @param value the decoder's output for the block
     */
    public record BlockfileDecodedBlock<T>(int compressedSize, int decompressedSize, T value) {
    }

    /**
     * A group of blocks that were decoded together.
     *
     * @param totalCompressedSize sum of the blocks' compressed value lengths (metadata excluded)
     * @param totalDecompressedSize sum of the blocks' decompressed value lengths (metadata excluded)
     * @param blocks the decoded blocks, in file order
     */
    public record BlockfileDecodedBlockBatch<T>(long totalCompressedSize, long totalDecompressedSize, List<BlockfileDecodedBlock<T>> blocks) {
        /** @return a new list holding the value of every block in this batch, in order */
        public List<T> values() {
            return Scanner.of(this.blocks)
                    .map(BlockfileDecodedBlock::value)
                    .collect(() -> new ArrayList<>(this.blocks.size()));
        }
    }

    /**
     * Settings for a {@link BlockfileReader}.
     *
     * @param storage where the file's bytes are read from
     * @param decoder converts a block's decompressed bytes to a value
     * @param headerCodec decodes the header block's value bytes
     * @param readThreads threads passed to the storage when streaming the file
     * @param readChunkSize chunk size passed to the storage when streaming the file; also the
     *        threshold at or below which a file of known length is read in one call
     * @param decodeBatchSize number of blocks grouped together for decompression/decoding
     * @param decodeThreads threads used to decompress/decode batches
     * @param validateChecksums whether each block's checksum is verified before decompression
     * @param knownFileLength the file's length in bytes, if the caller already knows it
     */
    public record BlockfileReaderConfig<T>(BlockfileStorage storage, Function<byte[], T> decoder, BlockfileHeader.BlockfileHeaderCodec headerCodec, Threads readThreads, ByteLength readChunkSize, int decodeBatchSize, Threads decodeThreads, boolean validateChecksums, Optional<Long> knownFileLength) {
    }

    /**
     * A block as read from the file, before checksum validation or decompression.
     *
     * @param length the raw bytes of the block's length field
     * @param checksum the checksum bytes stored in the block
     * @param compressedValue the block's compressed value bytes
     */
    record ParsedBlock(byte[] length, byte[] checksum, byte[] compressedValue) {
    }
}

