/*
 * Decompiled with CFR 0.152.
 */
package io.xpipe.core.impl;

import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.type.TypeReference;
import io.xpipe.core.data.node.DataStructureNode;
import io.xpipe.core.data.node.DataStructureNodeAcceptor;
import io.xpipe.core.data.node.TupleNode;
import io.xpipe.core.data.type.TupleType;
import io.xpipe.core.data.typed.TypedDataStreamParser;
import io.xpipe.core.data.typed.TypedDataStructureNodeReader;
import io.xpipe.core.impl.StreamReadConnection;
import io.xpipe.core.impl.XpbtSource;
import io.xpipe.core.source.TableReadConnection;
import io.xpipe.core.store.StreamDataStore;
import io.xpipe.core.util.JacksonMapper;
import java.io.BufferedReader;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

/**
 * Table read connection for an {@link XpbtSource}.
 *
 * <p>As consumed by this class, the stream starts with a single header line holding a JSON
 * array of column names. Everything after that line is handed to a
 * {@link TypedDataStreamParser}, which decodes it one row at a time. A missing or blank
 * header line marks the source as empty.
 */
public class XpbtReadConnection extends StreamReadConnection implements TableReadConnection {
    /** Store backing this connection; only consulted by {@link #canRead()}. */
    private final StreamDataStore store;
    /** Row type derived from the header; set by {@link #init()}. */
    private TupleType dataType;
    /** Row parser; stays {@code null} when the source is empty. */
    private TypedDataStreamParser parser;
    /** {@code true} once {@link #init()} found no usable header line. */
    private boolean empty;

    /**
     * Creates a connection that reads from the store of the given source.
     *
     * @param source the source whose store is read
     */
    protected XpbtReadConnection(XpbtSource source) {
        super((StreamDataStore) source.getStore(), null);
        this.store = (StreamDataStore) source.getStore();
    }

    /**
     * Initializes the parent connection, reads the header line and prepares the row parser.
     *
     * <p>If the header line is absent or blank, the data type is set to
     * {@link TupleType#empty()} and the connection is flagged as empty. Otherwise the stream
     * is left positioned on the first byte after the header's line terminator.
     *
     * @throws Exception if the parent initialization, reading the stream, or parsing the
     *                   header JSON fails
     */
    @Override
    public void init() throws Exception {
        super.init();
        String header = this.readHeaderLine();
        if (header.trim().isEmpty()) {
            this.dataType = TupleType.empty();
            this.empty = true;
            return;
        }
        List<String> names = JacksonMapper.newMapper()
                .disable(JsonParser.Feature.AUTO_CLOSE_SOURCE)
                .readerFor(new TypeReference<List<String>>() {})
                .readValue(header);
        this.dataType = TupleType.tableType(names);
        this.parser = new TypedDataStreamParser(this.dataType);
    }

    /**
     * Reads the header line byte by byte and decodes it as UTF-8.
     *
     * <p>Reading the bytes directly keeps the stream position exact: no mark/reset is
     * needed, no reader can buffer past the header, and the header may have any length.
     * Reading stops at end of stream or at the first {@code '\n'} or {@code '\r'}; that one
     * terminator byte is consumed and not returned.
     *
     * <p>NOTE(review): only a single terminator byte is consumed, so a CRLF-terminated header
     * would leave the {@code '\n'} for the row parser. Confirm that writers always end the
     * header with a single {@code '\n'}.
     *
     * @return the header text without its terminator; empty if the stream is at its end or
     *         starts with a terminator
     * @throws IOException if reading from the stream fails
     */
    private String readHeaderLine() throws IOException {
        ByteArrayOutputStream headerBytes = new ByteArrayOutputStream();
        int next;
        while ((next = this.inputStream.read()) != -1 && next != '\n' && next != '\r') {
            headerBytes.write(next);
        }
        return new String(headerBytes.toByteArray(), StandardCharsets.UTF_8);
    }

    /**
     * Returns the row type determined by {@link #init()}.
     *
     * @return the table type built from the header names, or the empty tuple type when the
     *         source is empty
     */
    @Override
    public TupleType getDataType() {
        return this.dataType;
    }

    /**
     * Parses rows from the stream and passes each one to the acceptor.
     *
     * <p>Iteration ends when the parser returns {@code null} or when the acceptor returns
     * {@code false}. Does nothing if the source is empty.
     *
     * @param lineAcceptor receives every parsed row as a tuple
     * @throws Exception if parsing fails or the acceptor throws; the acceptor's exception is
     *                   propagated unchanged
     */
    @Override
    public void withRows(DataStructureNodeAcceptor<TupleNode> lineAcceptor) throws Exception {
        if (this.empty) {
            return;
        }
        TypedDataStructureNodeReader reader = TypedDataStructureNodeReader.of(this.dataType);
        while (true) {
            DataStructureNode node = this.parser.parseStructure(this.inputStream, reader);
            if (node == null) {
                return;
            }
            if (!lineAcceptor.accept(node.asTuple())) {
                return;
            }
        }
    }

    /**
     * Reports whether the backing store can be opened.
     *
     * @return the result of {@link StreamDataStore#canOpen()} on the backing store
     * @throws Exception if the store check fails
     */
    @Override
    public boolean canRead() throws Exception {
        return this.store.canOpen();
    }
}

