package org.apache.beam.sdk.io.fileschematransform;

import com.google.auto.service.AutoService;
import java.io.IOException;
import java.io.Reader;
import java.lang.invoke.SerializedLambda;
import java.nio.channels.Channels;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.fs.MatchResult;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.schemas.io.Providers;
import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.Watch;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionRowTuple;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.CharStreams;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@AutoService({SchemaTransformProvider.class})
/* loaded from: input_file:org/apache/beam/sdk/io/fileschematransform/FileReadSchemaTransformProvider.class */
public class FileReadSchemaTransformProvider extends TypedSchemaTransformProvider<FileReadSchemaTransformConfiguration> {
    // Logger used by this provider and by the nested FileReadSchemaTransform.
    private static final Logger LOG = LoggerFactory.getLogger(FileReadSchemaTransformProvider.class);
    // Value returned by identifier().
    private static final String IDENTIFIER = "beam:schematransform:org.apache.beam:file_read:v1";
    // Tag of the optional input PCollection; the only entry of inputCollectionNames().
    static final String INPUT_TAG = "input";
    // Tag under which expand() publishes its result; the only entry of outputCollectionNames().
    static final String OUTPUT_TAG = "output";
    // Name of the String field read from each input Row to obtain a filepattern.
    static final String FILEPATTERN_ROW_FIELD_NAME = "filepattern";

    /* JADX INFO: Access modifiers changed from: package-private */
    @VisibleForTesting
    /* loaded from: input_file:org/apache/beam/sdk/io/fileschematransform/FileReadSchemaTransformProvider$FileReadSchemaTransform.class */
    public static class FileReadSchemaTransform extends SchemaTransform {
        // Read configuration. Not final: expand() replaces it with a copy whose schema has been resolved.
        private FileReadSchemaTransformConfiguration configuration;
        // True when the configured filepattern is null or empty, i.e. filepatterns come from the input PCollection.
        private boolean useInputPCollection;

        /**
         * Creates a transform for the given configuration.
         *
         * <p>Whether filepatterns are taken from the input PCollection is decided here, once: it is
         * the case exactly when the configured filepattern is null or empty.
         *
         * @param fileReadSchemaTransformConfiguration the read configuration to use
         */
        FileReadSchemaTransform(FileReadSchemaTransformConfiguration fileReadSchemaTransformConfiguration) {
            String configuredFilepattern = fileReadSchemaTransformConfiguration.getFilepattern();
            this.useInputPCollection = Strings.isNullOrEmpty(configuredFilepattern);
            this.configuration = fileReadSchemaTransformConfiguration;
        }

        /**
         * Builds the read pipeline: obtain filepatterns, match them to files, read the matches and
         * convert the files to rows with the provider selected by the configured format.
         *
         * <p>Exactly one source of filepatterns must be present: either the {@code filepattern}
         * configuration parameter or an input PCollection of rows carrying a {@code filepattern}
         * field. When a schema is configured it is first resolved through
         * {@link #resolveSchemaStringOrFilePath(String)} and stored back into the configuration.
         *
         * @param pCollectionRowTuple the input tuple; empty when the filepattern parameter is used
         * @return a tuple holding the rows read, under the {@code output} tag
         * @throws IllegalArgumentException if both or neither filepattern source is provided
         */
        @Override
        public PCollectionRowTuple expand(PCollectionRowTuple pCollectionRowTuple) {
            Preconditions.checkArgument(pCollectionRowTuple.getAll().isEmpty() ^ Strings.isNullOrEmpty(this.configuration.getFilepattern()), "Either an input PCollection of file patterns or the filepattern parameter must be set,but not both.");

            // The schema may be given inline or as a path to a file; normalise it to schema text.
            if (!Strings.isNullOrEmpty(this.configuration.getSchema())) {
                String resolvedSchema = resolveSchemaStringOrFilePath(this.configuration.getSafeSchema());
                this.configuration = this.configuration.toBuilder().setSchema(resolvedSchema).build();
            }

            PCollection<MatchResult.Metadata> matchedFiles;
            if (this.useInputPCollection) {
                // Rows whose filepattern cannot be extracted are routed to the failure handler,
                // which logs them; only the successfully extracted patterns continue downstream.
                PCollection<String> filepatterns =
                    pCollectionRowTuple
                        .get(INPUT_TAG)
                        .apply(
                            "Get filepatterns",
                            MapElements.into(TypeDescriptors.strings())
                                .via((Row row) -> Objects.requireNonNull(row.getString(FILEPATTERN_ROW_FIELD_NAME)))
                                .exceptionsInto(TypeDescriptors.nulls())
                                .exceptionsVia(exceptionElement -> {
                                    LOG.warn("Could not acquire a faulty filepattern: {}. This will be skipped.", Optional.ofNullable(exceptionElement.element().getString(FILEPATTERN_ROW_FIELD_NAME)).orElse("[null filepattern]"));
                                    return null;
                                }))
                        .output();
                // With an input PCollection, buildMatchTransform() produces a FileIO.MatchAll.
                matchedFiles = filepatterns.apply("Match files", (FileIO.MatchAll) buildMatchTransform());
            } else {
                // With a configured filepattern, buildMatchTransform() produces a FileIO.Match.
                matchedFiles = pCollectionRowTuple.getPipeline().apply((FileIO.Match) buildMatchTransform());
            }

            PCollection<Row> rows =
                matchedFiles
                    .apply(FileIO.readMatches())
                    .apply("Read files", getProvider().buildTransform(this.configuration));
            return PCollectionRowTuple.of(OUTPUT_TAG, rows);
        }

        /**
         * Builds the transform that turns filepatterns into matched file metadata.
         *
         * <p>The result is a {@code FileIO.MatchAll} when filepatterns come from the input
         * PCollection, and a {@code FileIO.Match} on the configured filepattern otherwise. When a
         * positive poll interval is configured the match runs continuously at that interval; it stops
         * after the configured number of seconds without new output, or never if that value is absent
         * or not positive.
         *
         * @return the match transform to apply
         */
        @VisibleForTesting
        PTransform<?, PCollection<MatchResult.Metadata>> buildMatchTransform() {
            Long pollIntervalMillis = this.configuration.getPollIntervalMillis();
            boolean pollContinuously = pollIntervalMillis != null && pollIntervalMillis > 0;

            if (!pollContinuously) {
                if (this.useInputPCollection) {
                    return FileIO.matchAll();
                }
                return FileIO.match().filepattern(this.configuration.getSafeFilepattern());
            }

            Duration pollInterval = Duration.millis(pollIntervalMillis);
            Long terminateAfterSeconds = this.configuration.getTerminateAfterSecondsSinceNewOutput();

            // Declared with the common supertype: the two conditions are unrelated concrete classes.
            Watch.Growth.TerminationCondition<String, ?> termination;
            if (terminateAfterSeconds != null && terminateAfterSeconds > 0) {
                termination = Watch.Growth.<String>afterTimeSinceNewOutput(Duration.standardSeconds(terminateAfterSeconds));
            } else {
                termination = Watch.Growth.<String>never();
            }

            if (this.useInputPCollection) {
                return FileIO.matchAll().continuously(pollInterval, termination);
            }
            return FileIO.match().filepattern(this.configuration.getSafeFilepattern()).continuously(pollInterval, termination);
        }

        /**
         * Resolves the configured schema value to schema text.
         *
         * <p>The value is first treated as a file path. If it matches exactly one file, that file is
         * read as UTF-8 and its content returned. If any step of that lookup raises an
         * {@link IllegalArgumentException} (including the precondition checks below), the value is
         * returned unchanged and is assumed to be the schema text itself.
         *
         * @param str a schema string, or a path to a file containing the schema
         * @return the schema text
         * @throws RuntimeException wrapping the {@link IOException} if matching or reading the file fails
         */
        private String resolveSchemaStringOrFilePath(String str) {
            try {
                try {
                    FileReadSchemaTransformProvider.LOG.info("Attempting to locate input schema as a file path.");
                    MatchResult match = FileSystems.match(str);
                    Preconditions.checkArgument(match.status() == MatchResult.Status.OK);
                    Preconditions.checkArgument(!match.metadata().isEmpty(), "Failed to match any files for the input schema file path.");
                    List<ResourceId> schemaFiles = match.metadata().stream().map(MatchResult.Metadata::resourceId).collect(Collectors.toList());
                    Preconditions.checkArgument(schemaFiles.size() == 1, "Expected exactly 1 schema file, but got " + schemaFiles.size() + " files.");
                    // Closing the reader also closes the channel opened for the schema file.
                    try (Reader schemaReader = Channels.newReader(FileSystems.open(schemaFiles.get(0)), StandardCharsets.UTF_8.name())) {
                        return CharStreams.toString(schemaReader);
                    }
                } catch (IllegalArgumentException e) {
                    // NOTE(review): this also swallows the "no files" / "more than one file" checks above,
                    // so those messages never reach the caller - confirm that is intended.
                    FileReadSchemaTransformProvider.LOG.info("Input schema is not a valid file path. Will attempt to use it as a schema string.");
                    return str;
                }
            } catch (IOException e2) {
                throw new RuntimeException("Error when parsing input schema file: ", e2);
            }
        }

        /**
         * Looks up the format provider registered for the configured file format.
         *
         * @return the provider whose key equals the configured format
         * @throws IllegalArgumentException if no provider is registered for the format
         * @throws IllegalStateException if the format is registered but maps to no provider
         */
        private FileReadSchemaTransformFormatProvider getProvider() {
            String format = this.configuration.getFormat();
            Map<String, FileReadSchemaTransformFormatProvider> providers = Providers.loadProviders(FileReadSchemaTransformFormatProvider.class);
            // Template form: the message (including the key set) is only built when the check fails.
            Preconditions.checkArgument(providers.containsKey(format), "Received unsupported file format: %s. Supported formats are %s", format, providers.keySet());
            FileReadSchemaTransformFormatProvider provider = providers.get(format);
            Preconditions.checkState(provider != null);
            return provider;
        }

        /*
         * Synthetic hook that recreates the serializable lambdas used in expand() from their
         * serialized form. It selects a lambda by implementation method name, verifies the recorded
         * functional-interface and implementation signatures, and returns an equivalent lambda.
         * Throws IllegalArgumentException when the serialized form matches neither lambda.
         *
         * NOTE(review): this looks like decompiler output rather than hand-written code
         * (`boolean z = -1` and `switch (z)` on a boolean are not valid Java) - confirm before
         * attempting to compile or edit it.
         */
        private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
            String implMethodName = serializedLambda.getImplMethodName();
            boolean z = -1;
            // First pass: map the implementation method name to a selector, using the hash as a fast path.
            switch (implMethodName.hashCode()) {
                case -1986886077:
                    if (implMethodName.equals("lambda$expand$9f70608a$1")) {
                        z = false;
                        break;
                    }
                    break;
                case -751122031:
                    if (implMethodName.equals("lambda$expand$2b1d20b8$1")) {
                        z = true;
                        break;
                    }
                    break;
            }
            // Second pass: check the full recorded signature before handing back the matching lambda.
            switch (z) {
                case false:
                    // Failure handler: ProcessFunction taking a WithFailures.ExceptionElement and returning Void.
                    if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/transforms/ProcessFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/io/fileschematransform/FileReadSchemaTransformProvider$FileReadSchemaTransform") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/beam/sdk/transforms/WithFailures$ExceptionElement;)Ljava/lang/Void;")) {
                        return exceptionElement -> {
                            FileReadSchemaTransformProvider.LOG.warn("Could not acquire a faulty filepattern: {}. This will be skipped.", (String) Optional.ofNullable(((Row) exceptionElement.element()).getString(FileReadSchemaTransformProvider.FILEPATTERN_ROW_FIELD_NAME)).orElse("[null filepattern]"));
                            return null;
                        };
                    }
                    break;
                case true:
                    // Filepattern extractor: SerializableFunction taking a Row and returning its filepattern String.
                    if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/transforms/SerializableFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/io/fileschematransform/FileReadSchemaTransformProvider$FileReadSchemaTransform") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/beam/sdk/values/Row;)Ljava/lang/String;")) {
                        return row -> {
                            return (String) Objects.requireNonNull(row.getString(FileReadSchemaTransformProvider.FILEPATTERN_ROW_FIELD_NAME));
                        };
                    }
                    break;
            }
            throw new IllegalArgumentException("Invalid lambda deserialization");
        }
    }

    /**
     * Returns the configuration type this provider accepts.
     *
     * @return {@code FileReadSchemaTransformConfiguration.class}
     */
    @Override
    protected Class<FileReadSchemaTransformConfiguration> configurationClass() {
        return FileReadSchemaTransformConfiguration.class;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Creates the file-read transform for the given configuration.
     *
     * @param fileReadSchemaTransformConfiguration the read configuration
     * @return a new {@code FileReadSchemaTransform} wrapping that configuration
     */
    @Override
    public SchemaTransform from(FileReadSchemaTransformConfiguration fileReadSchemaTransformConfiguration) {
        return new FileReadSchemaTransform(fileReadSchemaTransformConfiguration);
    }

    /**
     * Returns the identifier of this schema transform.
     *
     * @return the constant {@code beam:schematransform:org.apache.beam:file_read:v1}
     */
    @Override
    public String identifier() {
        return IDENTIFIER;
    }

    /**
     * Returns the names of the input collections this transform reads.
     *
     * @return an immutable single-element list containing the {@code input} tag
     */
    @Override
    public List<String> inputCollectionNames() {
        return Collections.singletonList(INPUT_TAG);
    }

    /**
     * Returns the names of the output collections this transform produces.
     *
     * @return an immutable single-element list containing the {@code output} tag
     */
    @Override
    public List<String> outputCollectionNames() {
        return Collections.singletonList(OUTPUT_TAG);
    }
}
