package io.openlineage.flink;

import io.openlineage.flink.tracker.OpenLineageContinousJobTracker;
import io.openlineage.flink.tracker.OpenLineageContinousJobTrackerFactory;
import io.openlineage.flink.visitor.lifecycle.FlinkExecutionContext;
import io.openlineage.flink.visitor.lifecycle.FlinkExecutionContextFactory;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import javax.annotation.Nullable;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobID;
import org.apache.flink.core.execution.DetachedJobExecutionResult;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.JobListener;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/openlineage/flink/OpenLineageFlinkJobListener.class */
/**
 * Flink {@link JobListener} that forwards job lifecycle notifications to OpenLineage.
 *
 * <p>On construction the listener replaces the {@code transformations} field of the supplied
 * {@link StreamExecutionEnvironment} (via reflection) with an {@link ArchivedList}, so that the
 * transformations remain readable after the list has been cleared. When a job is submitted a
 * {@link FlinkExecutionContext} is created for it, remembered by job id, notified, and handed to
 * the {@link OpenLineageContinousJobTracker}. When the job finishes, the remembered context is
 * removed and notified about completion or failure.
 *
 * <p>Both listener callbacks are best-effort: failures are logged and never rethrown.
 */
public class OpenLineageFlinkJobListener implements JobListener {
    private static final Logger log = LoggerFactory.getLogger(OpenLineageFlinkJobListener.class);

    /** Name of the reflectively accessed field on {@link StreamExecutionEnvironment}. */
    private static final String TRANSFORMATIONS_FIELD = "transformations";

    private final StreamExecutionEnvironment executionEnvironment;
    private final OpenLineageContinousJobTracker openLineageContinousJobTracker;

    /** Contexts of submitted jobs that have not been reported as finished yet, keyed by job id. */
    private final Map<JobID, FlinkExecutionContext> jobContexts;

    /**
     * An {@link ArrayList} that keeps a copy of its content from just before the most recent
     * {@link #clear()} call (or from construction time if it was never cleared).
     *
     * <p>NOTE(review): presumably needed because the execution environment clears its
     * transformation list before the listener gets a chance to read it - confirm against Flink's
     * {@code StreamExecutionEnvironment}.
     *
     * @param <T> element type
     */
    public static class ArchivedList<T> extends ArrayList<T> {
        /** Snapshot taken at construction time or by the last {@link #clear()}. */
        List<T> value;

        /**
         * Creates a list holding the elements of {@code collection} and archives a copy of them.
         *
         * @param collection initial elements
         */
        public ArchivedList(Collection<T> collection) {
            super(collection);
            this.value = new ArrayList<>(collection);
        }

        /** Archives a copy of the current elements, then empties this list. */
        @Override
        public void clear() {
            // Snapshot first: after super.clear() the elements are gone.
            this.value = new ArrayList<>(this);
            super.clear();
        }

        /**
         * Returns the archived snapshot.
         *
         * @return the list captured at construction time or by the most recent {@link #clear()}
         */
        public List<T> getValue() {
            return this.value;
        }
    }

    /**
     * Creates a listener whose tracker is obtained from
     * {@link OpenLineageContinousJobTrackerFactory} using the environment's configuration.
     *
     * @param streamExecutionEnvironment environment whose jobs are observed
     */
    public OpenLineageFlinkJobListener(StreamExecutionEnvironment streamExecutionEnvironment) {
        this(streamExecutionEnvironment, OpenLineageContinousJobTrackerFactory.getTracker(streamExecutionEnvironment.getConfiguration()));
    }

    /**
     * Creates a listener with an explicit tracker and swaps the environment's transformation
     * list for an {@link ArchivedList}.
     *
     * @param streamExecutionEnvironment environment whose jobs are observed
     * @param openLineageContinousJobTracker tracker started for every submitted job
     */
    public OpenLineageFlinkJobListener(StreamExecutionEnvironment streamExecutionEnvironment, OpenLineageContinousJobTracker openLineageContinousJobTracker) {
        this.jobContexts = new HashMap<>();
        this.executionEnvironment = streamExecutionEnvironment;
        this.openLineageContinousJobTracker = openLineageContinousJobTracker;
        makeTransformationsArchivedList(streamExecutionEnvironment);
    }

    /**
     * {@link JobListener} callback invoked on job submission. Does nothing when no job client is
     * available; otherwise delegates to {@link #start(JobClient)} and logs any failure.
     *
     * @param jobClient client of the submitted job, or {@code null}
     * @param th submission failure, or {@code null}; not inspected here
     */
    public void onJobSubmitted(@Nullable JobClient jobClient, @Nullable Throwable th) {
        if (jobClient == null) {
            return;
        }
        try {
            start(jobClient);
        } catch (Exception | NoClassDefFoundError | NoSuchFieldError e) {
            // Lineage reporting must never break the job itself.
            log.error("Failed to notify OpenLineage about start", e);
        }
    }

    /**
     * Builds the execution context for a submitted job from the archived transformations,
     * registers it under the job id, notifies it and starts the continuous job tracker.
     *
     * @param jobClient client of the submitted job
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    void start(JobClient jobClient) {
        try {
            Field transformationsField = FieldUtils.getField(StreamExecutionEnvironment.class, TRANSFORMATIONS_FIELD, true);
            // The constructor replaced the field value with an ArchivedList; read its snapshot.
            // Kept raw because the element type expected by the factory is not visible here.
            List archivedTransformations = ((ArchivedList) transformationsField.get(this.executionEnvironment)).getValue();
            FlinkExecutionContext context = FlinkExecutionContextFactory.getContext(jobClient.getJobID(), archivedTransformations);
            this.jobContexts.put(jobClient.getJobID(), context);
            context.onJobSubmitted();
            log.info("Job submitted");
            log.info("OpenLineageContinousJobTracker is starting");
            this.openLineageContinousJobTracker.startTracking(context);
        } catch (IllegalAccessException e) {
            log.error("Can't access the field. ", e);
        }
    }

    /**
     * {@link JobListener} callback invoked when job execution ends. Delegates to
     * {@link #finish(JobExecutionResult, Throwable)} and logs any failure.
     *
     * @param jobExecutionResult result of the job, or {@code null}
     * @param th execution failure, or {@code null}
     */
    public void onJobExecuted(@Nullable JobExecutionResult jobExecutionResult, @Nullable Throwable th) {
        try {
            finish(jobExecutionResult, th);
        } catch (Exception | NoClassDefFoundError | NoSuchFieldError e) {
            // Lineage reporting must never break the job itself.
            log.error("Failed to notify OpenLineage about complete", e);
        }
    }

    /**
     * Removes the context of the finished job and notifies it.
     *
     * <ul>
     *   <li>Detached result: the context is dropped and a warning is logged; no event is sent.
     *   <li>Regular result: the context registered for the result's job id is told the job
     *       completed. If none is registered, a warning is logged.
     *   <li>No result: if exactly one context is registered, it is told the job failed with
     *       {@code th}; with zero or several registered contexts nothing happens, because the
     *       failing job cannot be identified.
     * </ul>
     *
     * @param jobExecutionResult result of the job, or {@code null}
     * @param th execution failure, or {@code null}
     */
    void finish(@Nullable JobExecutionResult jobExecutionResult, @Nullable Throwable th) {
        if (jobExecutionResult instanceof DetachedJobExecutionResult) {
            this.jobContexts.remove(jobExecutionResult.getJobID());
            log.warn("Job running in detached mode. Set execution.attached to true if you want to emit completed events.");
            return;
        }

        if (jobExecutionResult != null) {
            FlinkExecutionContext completed = this.jobContexts.remove(jobExecutionResult.getJobID());
            if (completed == null) {
                // Happens when start() failed or was never invoked for this job.
                log.warn("No OpenLineage context registered for job {}; skipping completed event", jobExecutionResult.getJobID());
                return;
            }
            completed.onJobCompleted(jobExecutionResult);
            return;
        }

        if (this.jobContexts.size() == 1) {
            // Without a result there is no job id; a single registered job is unambiguous.
            JobID onlyJobId = this.jobContexts.keySet().iterator().next();
            FlinkExecutionContext failed = this.jobContexts.remove(onlyJobId);
            if (failed != null) {
                failed.onJobFailed(th);
            }
        }
    }

    /**
     * Replaces the environment's {@code transformations} field value with an
     * {@link ArchivedList} holding the same elements (or no elements if the field was
     * {@code null}). Problems are logged; the field is left untouched in that case.
     *
     * @param streamExecutionEnvironment environment to modify
     */
    @SuppressWarnings("unchecked")
    private void makeTransformationsArchivedList(StreamExecutionEnvironment streamExecutionEnvironment) {
        try {
            Field field = FieldUtils.getField(StreamExecutionEnvironment.class, TRANSFORMATIONS_FIELD, true);
            if (field == null) {
                log.error("Failed to rewrite transformations: field '{}' not found", TRANSFORMATIONS_FIELD);
                return;
            }

            Object current = FieldUtils.readField(field, streamExecutionEnvironment, true);
            Collection<Object> existing;
            if (current == null) {
                existing = new ArrayList<>();
            } else if (current instanceof Collection) {
                // Any collection can seed the ArchivedList; no need to insist on ArrayList.
                existing = (Collection<Object>) current;
            } else {
                log.error("Failed to rewrite transformations: unexpected field type {}", current.getClass().getName());
                return;
            }

            FieldUtils.writeField(field, streamExecutionEnvironment, new ArchivedList<>(existing), true);
        } catch (IllegalAccessException e) {
            log.error("Failed to rewrite transformations", e);
        }
    }
}
