/*
 * Decompiled with CFR 0.152.
 */
package io.datarouter.bytes.blockfile.io.read.query;

import io.datarouter.bytes.blockfile.block.BlockfileBlockType;
import io.datarouter.bytes.blockfile.block.parsed.BlockfileDecodedBlock;
import io.datarouter.bytes.blockfile.block.parsed.BlockfileDecodedBlockBatch;
import io.datarouter.bytes.blockfile.block.parsed.ParsedValueBlock;
import io.datarouter.bytes.blockfile.block.tokens.BlockfileBaseTokens;
import io.datarouter.bytes.blockfile.block.tokens.BlockfileIndexTokens;
import io.datarouter.bytes.blockfile.block.tokens.BlockfileValueTokens;
import io.datarouter.bytes.blockfile.index.BlockfileIndexEntryRange;
import io.datarouter.bytes.blockfile.io.read.BlockfileReader;
import io.datarouter.bytes.blockfile.io.read.query.BlockfileRowKeyRangeReader;
import io.datarouter.bytes.io.InputStreamTool;
import io.datarouter.scanner.Scanner;
import java.io.InputStream;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Reads the blocks of a blockfile in the order they appear in the underlying stream and exposes
 * them as {@link Scanner}s at several levels of processing: raw parsed value blocks, decompressed
 * values, decoded blocks, and individual decoded items.
 *
 * <p>NOTE(review): the {@link InputStream}s obtained from the reader are never closed by this
 * class; confirm whether closing is handled by the caller or by the scanner pipeline.
 *
 * @param <T> the item type produced by the reader's value block decoder
 */
public class BlockfileSequentialReader<T> {
    private static final Logger logger = LoggerFactory.getLogger(BlockfileSequentialReader.class);
    private final BlockfileReader<T> reader;

    /**
     * @param reader source of the input streams, metadata, config and value block decoder
     */
    public BlockfileSequentialReader(BlockfileReader<T> reader) {
        this.reader = reader;
    }

    /**
     * Scans every value block of the file, decompressing batches of blocks on the configured
     * number of decode threads while preserving order.
     *
     * @return a flattened scanner over the decompressed values
     */
    public Scanner<byte[]> scanDecompressedValues() {
        return this.scanParsedValueBlocks()
                .batch(this.reader.config().decodeBatchSize())
                .parallelOrdered(this.reader.config().decodeThreads())
                .map(this.reader.valueBlockDecoder()::decompressValueBlocks)
                .concat(Scanner::of);
    }

    /**
     * @return a scanner over every decoded item in the file, one item at a time
     */
    public Scanner<T> scan() {
        return this.scanDecodedValues().concat(Scanner::of);
    }

    /**
     * @return a scanner yielding the list of decoded items of each value block
     */
    public Scanner<List<T>> scanDecodedValues() {
        return this.scanDecodedBlocks().map(BlockfileDecodedBlock::items);
    }

    /**
     * @return a scanner over the individual decoded blocks, flattened out of their batches
     */
    public Scanner<BlockfileDecodedBlock<T>> scanDecodedBlocks() {
        return this.scanDecodedBlockBatches().concatIter(BlockfileDecodedBlockBatch::blocks);
    }

    /**
     * Scans every value block of the file, decompressing and decoding batches of blocks on the
     * configured number of decode threads while preserving order.
     *
     * @return a scanner over the decoded batches
     */
    public Scanner<BlockfileDecodedBlockBatch<T>> scanDecodedBlockBatches() {
        return this.scanParsedValueBlocks()
                .batch(this.reader.config().decodeBatchSize())
                .parallelOrdered(this.reader.config().decodeThreads())
                .map(this.reader.valueBlockDecoder()::decompressAndDecodeValueBlocks);
    }

    /**
     * Opens a stream over the file, reads and caches the header, then scans the blocks that
     * follow, advancing until a FOOTER block is seen and keeping only the VALUE blocks.
     */
    private Scanner<ParsedValueBlock> scanParsedValueBlocks() {
        InputStream inputStream = this.reader.makeInputStream();
        // The header must be consumed first: it positions the stream at the first block and
        // provides the checksummer that scanParsedBlocks reads from the metadata.
        this.reader.metadata().readAndCacheHeader(inputStream);
        return this.scanParsedBlocks(inputStream)
                .advanceUntil(parsedBlock -> parsedBlock.blockType() == BlockfileBlockType.FOOTER)
                .include(parsedBlock -> parsedBlock.blockType() == BlockfileBlockType.VALUE)
                .map(ParsedBlock::parsedValueBlock);
    }

    /**
     * Scans the value blocks contained in the byte range spanned by an index entry range.
     *
     * <p>The stream is opened from the first entry's starting byte to the last entry's ending
     * byte, at most {@code indexEntryRange.numBlocks()} blocks are read, and non-VALUE blocks
     * are dropped.
     *
     * <p>NOTE(review): this method does not read the header itself, while the block parsing
     * it delegates to reads {@code reader.metadata().header()}; presumably the header is
     * already available to the metadata at this point - confirm.
     *
     * @param indexEntryRange index entries delimiting the bytes and number of blocks to read
     * @param keyRange currently unused by this method; kept for interface compatibility
     * @return a scanner over the parsed value blocks in the range
     */
    public Scanner<ParsedValueBlock> scanParsedValueBlocks(BlockfileIndexEntryRange indexEntryRange, BlockfileRowKeyRangeReader.BlockfileKeyRange keyRange) {
        long bytesFrom = indexEntryRange.first().byteRange().from();
        long bytesTo = indexEntryRange.last().byteRange().to();
        logger.debug(
                "scanning globalBlockIds(from={},to={}), bytes(from={},to={})",
                indexEntryRange.first().childGlobalBlockId(),
                indexEntryRange.last().childGlobalBlockId(),
                bytesFrom,
                bytesTo);
        InputStream inputStream = this.reader.makeInputStream(bytesFrom, bytesTo);
        return this.scanParsedBlocks(inputStream)
                // Per-block tracing is diagnostic output, so it is logged at debug level like
                // the range message above rather than flooding the warning log.
                .each(block -> logger.debug("block type={}", block.blockType()))
                .limit(indexEntryRange.numBlocks())
                .include(parsedBlock -> parsedBlock.blockType() == BlockfileBlockType.VALUE)
                .map(ParsedBlock::parsedValueBlock);
    }

    /**
     * Generates parsed blocks by repeatedly reading from the given stream. Each call reads the
     * block length bytes and the block type byte, then:
     * <ul>
     * <li>INDEX: reads and discards the rest of the block, returning a marker with no value</li>
     * <li>FOOTER: returns a marker with no value without reading further bytes</li>
     * <li>otherwise: reads the checksum and the compressed value into a ParsedValueBlock</li>
     * </ul>
     *
     * @param inputStream stream positioned at the start of a block
     * @return a scanner that parses one block per advance
     */
    private Scanner<ParsedBlock> scanParsedBlocks(InputStream inputStream) {
        // Lengths are resolved once, outside the generator, so they are not recomputed per block.
        int checksumLength = this.reader.metadata().header().checksummer().numBytes();
        int valueBlockMetadataLength = BlockfileValueTokens.lengthWithoutValue(checksumLength);
        int indexBlockMetadataLength = BlockfileIndexTokens.lengthWithoutValue();
        return Scanner.generate(() -> {
            byte[] lengthBytes = InputStreamTool.readNBytes(inputStream, BlockfileBaseTokens.NUM_LENGTH_BYTES);
            int blockLength = BlockfileBaseTokens.decodeLength(lengthBytes);
            byte blockTypeCode = InputStreamTool.readRequiredByte(inputStream);
            BlockfileBlockType blockType = BlockfileBlockType.decode(blockTypeCode);
            if (blockType == BlockfileBlockType.INDEX) {
                // Skip over the index block's remaining bytes so the stream lands on the next block.
                InputStreamTool.readNBytes(inputStream, blockLength - indexBlockMetadataLength);
                return new ParsedBlock(BlockfileBlockType.INDEX, null);
            }
            if (blockType == BlockfileBlockType.FOOTER) {
                return new ParsedBlock(BlockfileBlockType.FOOTER, null);
            }
            byte[] checksumValue = InputStreamTool.readNBytes(inputStream, checksumLength);
            int compressedValueLength = blockLength - valueBlockMetadataLength;
            byte[] compressedValue = InputStreamTool.readNBytes(inputStream, compressedValueLength);
            ParsedValueBlock parsedValueBlock = new ParsedValueBlock(lengthBytes, checksumValue, compressedValue);
            return new ParsedBlock(BlockfileBlockType.VALUE, parsedValueBlock);
        });
    }

    /**
     * A block read from the stream: its type plus the parsed value block, which is
     * {@code null} for INDEX and FOOTER blocks.
     */
    private record ParsedBlock(BlockfileBlockType blockType, ParsedValueBlock parsedValueBlock) {
    }
}

