/*
 * Decompiled with CFR 0.152.
 */
package io.aleph0.yap.core.pipeline;

import io.aleph0.yap.core.Pipeline;
import io.aleph0.yap.core.pipeline.PipelineWrapper;
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link Pipeline} decorator that periodically flushes the wrapped pipeline's metrics and hands
 * them to a {@link MetricsReporter}.
 *
 * <p>Reporting is scheduled when the delegate signals {@code onPipelineStarted} and is cancelled
 * when the delegate signals completion, cancellation, or failure. Every other {@link Pipeline}
 * operation is forwarded to the delegate.
 */
public class MonitoredPipeline
implements Pipeline {
    private static final Logger LOGGER = LoggerFactory.getLogger(MonitoredPipeline.class);

    /**
     * Scheduler shared by every instance that is not given its own. It is never shut down, so its
     * worker must be a daemon thread; otherwise the first scheduled report would keep the JVM
     * alive after all application threads have finished.
     */
    private static final ScheduledExecutorService DEFAULT_SCHEDULER =
        Executors.newSingleThreadScheduledExecutor(task -> {
            Thread worker = new Thread(task, "monitored-pipeline-metrics");
            worker.setDaemon(true);
            return worker;
        });

    /** Reporting period used when the caller does not supply one. */
    public static final Duration DEFAULT_PERIOD = Duration.ofMinutes(1L);

    private final ScheduledExecutorService scheduler;
    private final Pipeline delegate;
    private final MetricsReporter reporter;
    private final Duration period;

    /** Guards every read-modify-write of {@link #reporting}. */
    private final Object reportingLock = new Object();

    /** Handle of the currently scheduled reporting task, or {@code null} when none is scheduled. */
    private volatile ScheduledFuture<?> reporting;

    /**
     * Returns a wrapper that monitors pipelines with the {@link MetricsReporter#stderr() stderr}
     * reporter and {@link #DEFAULT_PERIOD}.
     *
     * @return a wrapper producing {@code MonitoredPipeline} instances
     */
    public static PipelineWrapper newWrapper() {
        return pipeline -> new MonitoredPipeline(pipeline, MetricsReporter.stderr());
    }

    /**
     * Returns a wrapper that monitors pipelines with the given reporter and
     * {@link #DEFAULT_PERIOD}.
     *
     * @param reporter receives each metrics snapshot
     * @return a wrapper producing {@code MonitoredPipeline} instances
     */
    public static PipelineWrapper newWrapper(MetricsReporter reporter) {
        return pipeline -> new MonitoredPipeline(pipeline, reporter);
    }

    /**
     * Returns a wrapper that monitors pipelines with the given reporter and period.
     *
     * @param reporter receives each metrics snapshot
     * @param period interval between reports
     * @return a wrapper producing {@code MonitoredPipeline} instances
     */
    public static PipelineWrapper newWrapper(MetricsReporter reporter, Duration period) {
        return pipeline -> new MonitoredPipeline(pipeline, reporter, period);
    }

    /**
     * Creates a monitored pipeline that reports every {@link #DEFAULT_PERIOD} on the shared
     * default scheduler.
     *
     * @param delegate the pipeline to monitor
     * @param reporter receives each metrics snapshot
     * @throws NullPointerException if any argument is {@code null}
     */
    public MonitoredPipeline(Pipeline delegate, MetricsReporter reporter) {
        this(delegate, reporter, DEFAULT_PERIOD);
    }

    /**
     * Creates a monitored pipeline that reports on the shared default scheduler.
     *
     * @param delegate the pipeline to monitor
     * @param reporter receives each metrics snapshot
     * @param period interval between reports; must be at least one millisecond
     * @throws NullPointerException if any argument is {@code null}
     * @throws IllegalArgumentException if {@code period} is shorter than one millisecond
     */
    public MonitoredPipeline(Pipeline delegate, MetricsReporter reporter, Duration period) {
        this(DEFAULT_SCHEDULER, delegate, reporter, period);
    }

    /**
     * Creates a monitored pipeline that reports on the given scheduler.
     *
     * @param scheduler executor on which the periodic reporting task runs
     * @param delegate the pipeline to monitor
     * @param reporter receives each metrics snapshot
     * @param period interval between reports; must be at least one millisecond
     * @throws NullPointerException if any argument is {@code null}
     * @throws IllegalArgumentException if {@code period} is shorter than one millisecond
     */
    public MonitoredPipeline(ScheduledExecutorService scheduler, Pipeline delegate, MetricsReporter reporter, Duration period) {
        this.scheduler = Objects.requireNonNull(scheduler, "scheduler");
        this.delegate = Objects.requireNonNull(delegate, "delegate");
        this.reporter = Objects.requireNonNull(reporter, "reporter");
        this.period = Objects.requireNonNull(period, "period");
        // The task is scheduled in whole milliseconds and scheduleAtFixedRate rejects a period
        // <= 0, so fail here rather than later inside a lifecycle callback.
        if (period.toMillis() <= 0L) {
            throw new IllegalArgumentException("period must be at least one millisecond: " + period);
        }
    }

    /** {@inheritDoc} */
    @Override
    public int getId() {
        return this.delegate.getId();
    }

    /** {@inheritDoc} */
    @Override
    public void addLifecycleListener(Pipeline.LifecycleListener listener) {
        this.delegate.addLifecycleListener(listener);
    }

    /** {@inheritDoc} */
    @Override
    public void removeLifecycleListener(Pipeline.LifecycleListener listener) {
        this.delegate.removeLifecycleListener(listener);
    }

    /**
     * Registers a lifecycle listener on the delegate that starts and stops metrics reporting,
     * then starts the delegate.
     */
    @Override
    public void start() {
        this.delegate.addLifecycleListener(new Pipeline.LifecycleListener() {

            @Override
            public void onPipelineStarted(int pipeline) {
                MonitoredPipeline.this.startReporting();
            }

            @Override
            public void onPipelineCompleted(int pipeline) {
                MonitoredPipeline.this.stopReporting();
            }

            @Override
            public void onPipelineCancelled(int pipeline) {
                MonitoredPipeline.this.stopReporting();
            }

            @Override
            public void onPipelineFailed(int pipeline, Throwable cause) {
                MonitoredPipeline.this.stopReporting();
            }
        });
        this.delegate.start();
    }

    /** {@inheritDoc} */
    @Override
    public void cancel() {
        this.delegate.cancel();
    }

    /** {@inheritDoc} */
    @Override
    public void await() throws InterruptedException, ExecutionException, CancellationException {
        this.delegate.await();
    }

    /** {@inheritDoc} */
    @Override
    public Pipeline.Metrics checkMetrics() {
        return (Pipeline.Metrics)this.delegate.checkMetrics();
    }

    /** {@inheritDoc} */
    @Override
    public Pipeline.Metrics flushMetrics() {
        return (Pipeline.Metrics)this.delegate.flushMetrics();
    }

    /**
     * Schedules the periodic reporting task, first cancelling any schedule that is still
     * registered so that a repeated start notification cannot leak an uncancellable task.
     */
    private void startReporting() {
        synchronized (this.reportingLock) {
            ScheduledFuture<?> previous = this.reporting;
            if (previous != null) {
                previous.cancel(false);
                this.reporting = null;
            }
            this.reporting = this.scheduler.scheduleAtFixedRate(
                this::reportMetrics, 0L, this.period.toMillis(), TimeUnit.MILLISECONDS);
        }
    }

    /**
     * Cancels the periodic reporting task, if one is scheduled. A report that is already running
     * is allowed to finish. Safe to call more than once.
     */
    private void stopReporting() {
        synchronized (this.reportingLock) {
            // Work on a snapshot so the null check and the cancel see the same handle.
            ScheduledFuture<?> current = this.reporting;
            if (current != null) {
                current.cancel(false);
                this.reporting = null;
            }
        }
    }

    /**
     * Flushes the delegate's metrics and passes them to the reporter. Any {@link Exception} is
     * logged instead of propagated, because an exception escaping a fixed-rate task suppresses
     * all of its subsequent executions.
     */
    private void reportMetrics() {
        try {
            LOGGER.atDebug()
                .addKeyValue("pipeline", this.getId())
                .addKeyValue("reporter", this.reporter)
                .addKeyValue("period", this.period)
                .log("Collecting and reporting metrics");
            Pipeline.Metrics metrics = (Pipeline.Metrics)this.delegate.flushMetrics();
            this.reporter.reportMetrics(metrics);
        }
        catch (Exception e) {
            LOGGER.atError()
                .addKeyValue("pipeline", this.getId())
                .addKeyValue("reporter", this.reporter)
                .addKeyValue("period", this.period)
                .setCause(e)
                .log("Error collecting or reporting metrics");
        }
    }

    /** Receives the metrics snapshots produced by a {@link MonitoredPipeline}. */
    @FunctionalInterface
    public static interface MetricsReporter {
        /**
         * Returns a reporter that prints each snapshot's {@code toString()} to standard output.
         *
         * @return a stdout-printing reporter
         */
        public static MetricsReporter stdout() {
            return metrics -> System.out.println(metrics.toString());
        }

        /**
         * Returns a reporter that prints each snapshot's {@code toString()} to standard error.
         *
         * @return a stderr-printing reporter
         */
        public static MetricsReporter stderr() {
            return metrics -> System.err.println(metrics.toString());
        }

        /**
         * Handles one metrics snapshot.
         *
         * @param var1 the metrics flushed from the monitored pipeline
         */
        public void reportMetrics(Pipeline.Metrics var1);
    }
}

