/*
 * Decompiled with CFR 0.152.
 */
package io.aleph0.yap.core.worker;

import io.aleph0.yap.core.Source;
import io.aleph0.yap.core.worker.ConsumerWorkerFactory;
import io.aleph0.yap.core.worker.MeasuredConsumerWorker;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.BinaryOperator;

public class DefaultConsumerWorkerFactory<InputT, MetricsT>
implements ConsumerWorkerFactory<InputT, MetricsT> {
    private final List<MeasuredConsumerWorker<InputT, MetricsT>> workers = new CopyOnWriteArrayList<MeasuredConsumerWorker<InputT, MetricsT>>();
    private final MeasuredConsumerWorker<InputT, MetricsT> workerSupplier;
    private final BinaryOperator<MetricsT> metricsAggregator;

    public DefaultConsumerWorkerFactory(MeasuredConsumerWorker<InputT, MetricsT> workerSupplier, BinaryOperator<MetricsT> metricsAggregator) {
        this.workerSupplier = Objects.requireNonNull(workerSupplier, "workerSupplier");
        this.metricsAggregator = Objects.requireNonNull(metricsAggregator, "metricsAggregator");
    }

    @Override
    public MeasuredConsumerWorker<InputT, MetricsT> newConsumerWorker() {
        return new MeasuredConsumerWorker<InputT, MetricsT>(){

            @Override
            public void consume(Source<InputT> source) throws Exception {
                DefaultConsumerWorkerFactory.this.workers.add(this);
                try {
                    DefaultConsumerWorkerFactory.this.workerSupplier.consume(source);
                }
                finally {
                    DefaultConsumerWorkerFactory.this.workers.remove(this);
                }
            }

            @Override
            public MetricsT checkMetrics() {
                return DefaultConsumerWorkerFactory.this.workerSupplier.checkMetrics();
            }

            @Override
            public MetricsT flushMetrics() {
                return DefaultConsumerWorkerFactory.this.workerSupplier.flushMetrics();
            }
        };
    }

    @Override
    public MetricsT checkMetrics() {
        Object result = null;
        for (MeasuredConsumerWorker<InputT, MetricsT> worker : this.workers) {
            Object metrics = worker.checkMetrics();
            if (result == null) {
                result = metrics;
                continue;
            }
            result = this.metricsAggregator.apply(result, metrics);
        }
        return (MetricsT)result;
    }

    @Override
    public MetricsT flushMetrics() {
        Object result = null;
        for (MeasuredConsumerWorker<InputT, MetricsT> worker : this.workers) {
            Object metrics = worker.flushMetrics();
            if (result == null) {
                result = metrics;
                continue;
            }
            result = this.metricsAggregator.apply(result, metrics);
        }
        return (MetricsT)result;
    }
}

