package io.trino.plugin.bigquery;

import com.google.cloud.bigquery.BigQueryException;
import com.google.cloud.bigquery.FieldValue;
import com.google.cloud.bigquery.TableDefinition;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.TableInfo;
import com.google.cloud.bigquery.storage.v1.ReadSession;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.protobuf.ByteString;
import io.airlift.log.Logger;
import io.airlift.units.Duration;
import io.trino.plugin.bigquery.ViewMaterializationCache;
import io.trino.spi.NodeManager;
import io.trino.spi.StandardErrorCode;
import io.trino.spi.TrinoException;
import io.trino.spi.connector.ColumnHandle;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.ConnectorSplit;
import io.trino.spi.connector.ConnectorSplitSource;
import io.trino.spi.predicate.TupleDomain;
import jakarta.annotation.Nullable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.OptionalLong;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Stream;
import org.apache.arrow.vector.ipc.ReadChannel;
import org.apache.arrow.vector.ipc.message.MessageSerializer;
import org.apache.arrow.vector.util.ByteArrayReadableSeekableByteChannel;

/**
 * {@link ConnectorSplitSource} that computes the splits for a BigQuery table handle lazily, on the
 * first {@link #getNextBatch(int)} call, and then hands them out in batches.
 *
 * <p>Depending on the table handle the split list is one of:
 * <ul>
 *   <li>a single "view stream" split, when the relation is not read through the storage API;</li>
 *   <li>a single empty-projection split carrying the result of a {@code COUNT(*)} query, when the
 *       projection is present but contains no columns;</li>
 *   <li>one split per stream of a storage read session otherwise.</li>
 * </ul>
 *
 * <p>The {@code splits} and {@code offset} fields are mutated without synchronization.
 */
public class BigQuerySplitSource implements ConnectorSplitSource {
    private static final Logger log = Logger.get(BigQuerySplitSource.class);
    private final ConnectorSession session;
    private final BigQueryTableHandle table;
    private final BigQueryClientFactory bigQueryClientFactory;
    private final BigQueryReadClientFactory bigQueryReadClientFactory;
    private final boolean viewEnabled;
    private final boolean arrowSerializationEnabled;
    private final Duration viewExpiration;
    private final NodeManager nodeManager;
    private final int maxReadRowsRetries;

    // null until the first getNextBatch call, and again after close()
    @Nullable
    private List<BigQuerySplit> splits;
    // index into splits of the first split that has not been handed out yet
    private int offset;

    /**
     * Creates a split source for the given table handle. No remote call is made here; splits are
     * computed on the first {@link #getNextBatch(int)} call.
     *
     * @param connectorSession session used for every client call; must not be null
     * @param bigQueryTableHandle table handle to generate splits for; must not be null
     * @param bigQueryClientFactory factory for the BigQuery client; must not be null
     * @param bigQueryReadClientFactory factory handed to {@code ReadSessionCreator}; must not be null
     * @param z the "view enabled" flag, forwarded to {@code ReadSessionCreator}
     * @param z2 the "arrow serialization enabled" flag; selects the Arrow schema instead of the
     *           Avro schema of a read session and is forwarded to {@code ReadSessionCreator}
     * @param duration the view expiration; must not be null
     * @param nodeManager used to obtain the number of required worker nodes; must not be null
     * @param i the "max read rows retries" value, forwarded to {@code ReadSessionCreator}
     * @throws NullPointerException if any reference argument is null
     */
    public BigQuerySplitSource(ConnectorSession connectorSession, BigQueryTableHandle bigQueryTableHandle, BigQueryClientFactory bigQueryClientFactory, BigQueryReadClientFactory bigQueryReadClientFactory, boolean z, boolean z2, Duration duration, NodeManager nodeManager, int i) {
        this.session = Objects.requireNonNull(connectorSession, "session is null");
        this.table = Objects.requireNonNull(bigQueryTableHandle, "table is null");
        this.bigQueryClientFactory = Objects.requireNonNull(bigQueryClientFactory, "bigQueryClientFactory cannot be null");
        this.bigQueryReadClientFactory = Objects.requireNonNull(bigQueryReadClientFactory, "bigQueryReadClientFactory cannot be null");
        this.viewEnabled = z;
        this.arrowSerializationEnabled = z2;
        this.viewExpiration = Objects.requireNonNull(duration, "viewExpiration is null");
        this.nodeManager = Objects.requireNonNull(nodeManager, "nodeManager cannot be null");
        this.maxReadRowsRetries = i;
    }

    /**
     * Returns the next batch of splits as an already-completed future. The full split list is
     * computed synchronously the first time this method is called.
     *
     * @param i maximum number of splits to include in the batch
     * @return a completed future holding the batch and whether it was the last one
     */
    @Override
    public CompletableFuture<ConnectorSplitSource.ConnectorSplitBatch> getNextBatch(int i) {
        if (this.splits == null) {
            this.splits = getSplits(this.session, this.table);
        }
        List<ConnectorSplit> batch = prepareNextBatch(i);
        return CompletableFuture.completedFuture(new ConnectorSplitSource.ConnectorSplitBatch(batch, isFinished()));
    }

    /**
     * Slices up to {@code maxSize} not-yet-returned splits off the computed split list and
     * advances {@code offset} past them.
     */
    private List<ConnectorSplit> prepareNextBatch(int maxSize) {
        List<BigQuerySplit> allSplits = Objects.requireNonNull(this.splits, "splits is null");
        // Clamp against the remaining count rather than computing offset + maxSize, which
        // overflows int when the caller passes a very large maxSize (e.g. Integer.MAX_VALUE)
        // and offset is already positive.
        int remaining = allSplits.size() - this.offset;
        int nextOffset = this.offset + Math.min(remaining, maxSize);
        List<ConnectorSplit> batch = allSplits.subList(this.offset, nextOffset).stream()
                .map(ConnectorSplit.class::cast)
                .collect(ImmutableList.toImmutableList());
        this.offset = nextOffset;
        return batch;
    }

    /**
     * @return {@code true} once the splits have been computed and all of them have been handed out
     */
    @Override
    public boolean isFinished() {
        return this.splits != null && this.offset >= this.splits.size();
    }

    /** Drops the reference to the computed splits. */
    @Override
    public void close() {
        this.splits = null;
    }

    /**
     * Computes the complete split list for the table handle.
     *
     * <p>For a query relation that does not use the storage API a single view-stream split is
     * returned right away. For a query relation that does use it, the native query (with filter
     * and limit applied) is first written to a destination table through
     * {@code ViewMaterializationCache.DestinationTableBuilder}, and that table is then read like
     * a named relation.
     */
    private List<BigQuerySplit> getSplits(ConnectorSession session, BigQueryTableHandle tableHandle) {
        TupleDomain<ColumnHandle> constraint = tableHandle.constraint();
        Optional<String> filter = BigQueryFilterQueryBuilder.buildFilter(constraint);
        OptionalLong limit = tableHandle.limit();

        TableId remoteTableId;
        TableDefinition.Type tableType;
        boolean useStorageApi;
        if (tableHandle.isQueryRelation()) {
            BigQueryQueryRelationHandle queryRelation = tableHandle.getRequiredQueryRelation();
            List<BigQueryColumnHandle> columns = tableHandle.projectedColumns().orElse(ImmutableList.of());
            useStorageApi = queryRelation.isUseStorageApi();
            if (!useStorageApi) {
                log.debug("Using Rest API for running query: %s", queryRelation.getQuery());
                return List.of(BigQuerySplit.forViewStream(columns, filter));
            }
            String query = BigQueryUtil.buildNativeQuery(queryRelation.getQuery(), filter, limit);
            TableId destinationTable = queryRelation.getDestinationTableName().toTableId();
            TableInfo tableInfo = new ViewMaterializationCache.DestinationTableBuilder(this.bigQueryClientFactory.create(session), this.viewExpiration, query, destinationTable).get();
            log.debug("Using Storage API for running query: %s", query);
            remoteTableId = tableInfo.getTableId();
            tableType = tableInfo.getDefinition().getType();
        } else {
            BigQueryNamedRelationHandle namedRelation = tableHandle.getRequiredNamedRelation();
            remoteTableId = namedRelation.getRemoteTableName().toTableId();
            tableType = TableDefinition.Type.valueOf(namedRelation.getType());
            useStorageApi = namedRelation.isUseStorageApi();
        }

        if (emptyProjectionIsRequired(tableHandle.projectedColumns())) {
            return createEmptyProjection(session, tableType, remoteTableId, filter, limit);
        }
        return readFromBigQuery(session, tableType, remoteTableId, tableHandle.projectedColumns(), constraint, useStorageApi);
    }

    /**
     * @return {@code true} when a projection is present but lists no columns
     */
    private static boolean emptyProjectionIsRequired(Optional<List<BigQueryColumnHandle>> projectedColumns) {
        return projectedColumns.map(List::isEmpty).orElse(false);
    }

    /**
     * Builds the splits for a non-empty projection.
     *
     * @return a single view-stream split when {@code useStorageApi} is false; otherwise one split
     *         per stream of a newly created read session
     * @throws IllegalArgumentException if {@code projectedColumns} is absent or empty
     */
    private List<BigQuerySplit> readFromBigQuery(ConnectorSession session, TableDefinition.Type type, TableId remoteTableId, Optional<List<BigQueryColumnHandle>> projectedColumns, TupleDomain<ColumnHandle> tableConstraint, boolean useStorageApi) {
        Preconditions.checkArgument(projectedColumns.isPresent() && !projectedColumns.get().isEmpty(), "Projected column is empty");
        Optional<String> filter = BigQueryFilterQueryBuilder.buildFilter(tableConstraint);
        log.debug("readFromBigQuery(tableId=%s, projectedColumns=%s, filter=[%s])", remoteTableId, projectedColumns, filter);
        List<BigQueryColumnHandle> columns = projectedColumns.get();

        if (!useStorageApi) {
            return ImmutableList.of(BigQuerySplit.forViewStream(columns, filter));
        }

        List<String> projectedColumnNames = getProjectedColumnNames(columns);
        ImmutableList.Builder<BigQueryColumnHandle> readColumns = ImmutableList.builder();
        readColumns.addAll(columns);
        if (type == TableDefinition.Type.VIEW || type == TableDefinition.Type.MATERIALIZED_VIEW) {
            // For views the read session additionally selects every constrained column that is
            // not already projected; the splits below still expose only the projected columns.
            tableConstraint.getDomains().ifPresent(domains -> domains.keySet().stream()
                    .map(BigQueryColumnHandle.class::cast)
                    .filter(column -> !projectedColumnNames.contains(column.name()))
                    .forEach(readColumns::add));
        }

        ReadSession readSession = createReadSession(session, remoteTableId, readColumns.build(), filter);
        String schemaString = getSchemaAsString(readSession);
        return readSession.getStreamsList().stream()
                .map(stream -> BigQuerySplit.forStream(stream.getName(), schemaString, columns, OptionalInt.of(stream.getSerializedSize())))
                .collect(ImmutableList.toImmutableList());
    }

    /**
     * Creates a read session through a new {@code ReadSessionCreator}, passing the current
     * number of required worker nodes as the last argument.
     *
     * @param connectorSession session to create the read session for
     * @param tableId table to read
     * @param list columns the read session should select
     * @param optional optional filter expression
     * @return the created read session
     */
    @VisibleForTesting
    ReadSession createReadSession(ConnectorSession connectorSession, TableId tableId, List<BigQueryColumnHandle> list, Optional<String> optional) {
        ReadSessionCreator creator = new ReadSessionCreator(this.bigQueryClientFactory, this.bigQueryReadClientFactory, this.viewEnabled, this.arrowSerializationEnabled, this.viewExpiration, this.maxReadRowsRetries);
        return creator.create(connectorSession, tableId, list, optional, this.nodeManager.getRequiredWorkerNodes().size());
    }

    /** Returns the names of the given column handles, in order. */
    private static List<String> getProjectedColumnNames(List<BigQueryColumnHandle> columns) {
        return columns.stream()
                .map(BigQueryColumnHandle::name)
                .collect(ImmutableList.toImmutableList());
    }

    /**
     * Builds the {@code COUNT(*)} query for an empty projection over the given table and
     * delegates to {@link #createEmptyProjection(ConnectorSession, String)}.
     *
     * @throws TrinoException with {@code NOT_SUPPORTED} if {@code type} is not a key of
     *         {@code BigQueryClient.TABLE_TYPES}
     */
    private List<BigQuerySplit> createEmptyProjection(ConnectorSession session, TableDefinition.Type type, TableId remoteTableId, Optional<String> filter, OptionalLong limit) {
        if (!BigQueryClient.TABLE_TYPES.containsKey(type)) {
            throw new TrinoException(StandardErrorCode.NOT_SUPPORTED, "Unsupported table type: " + type);
        }
        String sql = "SELECT COUNT(*) FROM (%s)".formatted(BigQueryClient.selectSql(remoteTableId, "true", filter, limit));
        return createEmptyProjection(session, sql);
    }

    /**
     * Runs the given count query and wraps its single value in one empty-projection split.
     *
     * @throws TrinoException with {@code BIGQUERY_FAILED_TO_EXECUTE_QUERY} if BigQuery reports
     *         an error
     */
    private List<BigQuerySplit> createEmptyProjection(ConnectorSession session, String sql) {
        BigQueryClient client = this.bigQueryClientFactory.create(session);
        log.debug("createEmptyProjection(sql=%s)", sql);
        try {
            // getOnlyElement enforces that the result has exactly one row with exactly one field.
            Iterable<?> onlyRow = (Iterable<?>) Iterables.getOnlyElement(client.executeQuery(session, sql).iterateAll());
            FieldValue count = (FieldValue) Iterables.getOnlyElement(onlyRow);
            return ImmutableList.of(BigQuerySplit.emptyProjection(count.getLongValue()));
        } catch (BigQueryException e) {
            throw new TrinoException(BigQueryErrorCode.BIGQUERY_FAILED_TO_EXECUTE_QUERY, "Failed to compute empty projection", e);
        }
    }

    /**
     * Returns the read session's schema as a string: the Arrow schema rendered as JSON when
     * Arrow serialization is enabled, otherwise the Avro schema string of the session.
     */
    private String getSchemaAsString(ReadSession readSession) {
        if (this.arrowSerializationEnabled) {
            return deserializeArrowSchema(readSession.getArrowSchema().getSerializedSchema());
        }
        return readSession.getAvroSchema().getSchema();
    }

    /**
     * Deserializes a serialized Arrow schema and renders it as JSON.
     *
     * @throws UncheckedIOException if the bytes cannot be read as a schema
     */
    private static String deserializeArrowSchema(ByteString serializedSchema) {
        try {
            ReadChannel channel = new ReadChannel(new ByteArrayReadableSeekableByteChannel(serializedSchema.toByteArray()));
            return MessageSerializer.deserializeSchema(channel).toJson();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
