/*
 * Decompiled with CFR 0.152.
 */
package io.aleph0.yap.core.worker;

import io.aleph0.yap.core.Sink;
import io.aleph0.yap.core.Source;
import io.aleph0.yap.core.worker.MeasuredProcessorWorker;
import io.aleph0.yap.core.worker.ProcessorWorkerFactory;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.BinaryOperator;
import java.util.function.Supplier;

/**
 * A {@link ProcessorWorkerFactory} that creates workers from a {@link Supplier} and reports
 * metrics aggregated across the workers that are currently running.
 *
 * <p>Each worker handed out by {@link #newProcessorWorker()} is a thin wrapper around a
 * supplier-provided delegate. The wrapper adds itself to this factory's worker list when its
 * {@code process} method starts and removes itself when that method exits (normally or
 * exceptionally). {@link #checkMetrics()} and {@link #flushMetrics()} combine the metrics of the
 * workers in that list using the configured {@link BinaryOperator}.
 *
 * <p>NOTE(review): a worker is removed from the list as soon as {@code process} returns, so any
 * metrics it has not flushed by then are no longer reachable through this factory. Confirm that
 * this is acceptable to callers.
 *
 * @param <InputT> the type of items the workers consume
 * @param <OutputT> the type of items the workers produce
 * @param <MetricsT> the type of the metrics reported by the workers
 */
public class DefaultProcessorWorkerFactory<InputT, OutputT, MetricsT>
implements ProcessorWorkerFactory<InputT, OutputT, MetricsT> {
    /**
     * Wrappers whose {@code process} method is currently executing. A
     * {@link CopyOnWriteArrayList} is used so the list can be iterated for metrics while
     * workers add and remove themselves.
     */
    private final List<MeasuredProcessorWorker<InputT, OutputT, MetricsT>> workers = new CopyOnWriteArrayList<>();

    /** Creates the delegate worker that backs each wrapper. */
    private final Supplier<MeasuredProcessorWorker<InputT, OutputT, MetricsT>> workerSupplier;

    /** Combines the metrics of two workers into one value. */
    private final BinaryOperator<MetricsT> metricsAggregator;

    /**
     * Creates a factory.
     *
     * @param workerSupplier supplies a delegate worker for every call to
     *        {@link #newProcessorWorker()}
     * @param metricsAggregator combines two metrics values into one; used to fold the metrics
     *        of all running workers
     * @throws NullPointerException if either argument is {@code null}
     */
    public DefaultProcessorWorkerFactory(Supplier<MeasuredProcessorWorker<InputT, OutputT, MetricsT>> workerSupplier, BinaryOperator<MetricsT> metricsAggregator) {
        this.workerSupplier = Objects.requireNonNull(workerSupplier, "workerSupplier");
        this.metricsAggregator = Objects.requireNonNull(metricsAggregator, "metricsAggregator");
    }

    /**
     * Obtains a delegate from the supplier and returns a wrapper that tracks it in this factory
     * for the duration of its {@code process} call.
     *
     * @return a new worker that forwards every call to the supplied delegate
     * @throws NullPointerException if the supplier returns {@code null}
     */
    @Override
    public MeasuredProcessorWorker<InputT, OutputT, MetricsT> newProcessorWorker() {
        // Fail fast here rather than with an anonymous NPE once process() is invoked.
        final MeasuredProcessorWorker<InputT, OutputT, MetricsT> delegate =
            Objects.requireNonNull(this.workerSupplier.get(), "workerSupplier returned null");
        return new MeasuredProcessorWorker<InputT, OutputT, MetricsT>() {
            @Override
            public void process(Source<InputT> source, Sink<OutputT> sink) throws Exception {
                // Register the wrapper (not the delegate) so removal in finally matches by identity.
                DefaultProcessorWorkerFactory.this.workers.add(this);
                try {
                    delegate.process(source, sink);
                } finally {
                    // Always deregister, even when the delegate throws.
                    DefaultProcessorWorkerFactory.this.workers.remove(this);
                }
            }

            @Override
            public MetricsT checkMetrics() {
                return delegate.checkMetrics();
            }

            @Override
            public MetricsT flushMetrics() {
                return delegate.flushMetrics();
            }
        };
    }

    /**
     * Calls {@code checkMetrics()} on every currently running worker and folds the results with
     * the metrics aggregator.
     *
     * @return the aggregated metrics, or {@code null} if no worker is currently running
     */
    @Override
    public MetricsT checkMetrics() {
        MetricsT aggregated = null;
        for (MeasuredProcessorWorker<InputT, OutputT, MetricsT> worker : this.workers) {
            aggregated = this.merge(aggregated, worker.checkMetrics());
        }
        return aggregated;
    }

    /**
     * Calls {@code flushMetrics()} on every currently running worker and folds the results with
     * the metrics aggregator.
     *
     * @return the aggregated metrics, or {@code null} if no worker is currently running
     */
    @Override
    public MetricsT flushMetrics() {
        MetricsT aggregated = null;
        for (MeasuredProcessorWorker<InputT, OutputT, MetricsT> worker : this.workers) {
            aggregated = this.merge(aggregated, worker.flushMetrics());
        }
        return aggregated;
    }

    /**
     * Folds one more metrics value into the running aggregate.
     *
     * @param accumulated the aggregate so far, or {@code null} if nothing has been accumulated
     * @param next the metrics value to add
     * @return {@code next} when there is no aggregate yet, otherwise the result of applying the
     *         metrics aggregator to {@code accumulated} and {@code next}
     */
    private MetricsT merge(MetricsT accumulated, MetricsT next) {
        if (accumulated == null) {
            // The first value seeds the aggregate; the aggregator only sees pairs.
            return next;
        }
        return this.metricsAggregator.apply(accumulated, next);
    }
}

