/*
 * Decompiled with CFR 0.152.
 */
package io.scalecube.benchmarks;

import com.codahale.metrics.ConsoleReporter;
import com.codahale.metrics.Counter;
import com.codahale.metrics.CsvReporter;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.Meter;
import com.codahale.metrics.Timer;
import com.codahale.metrics.jvm.GarbageCollectorMetricSet;
import com.codahale.metrics.jvm.MemoryUsageGaugeSet;
import com.codahale.metrics.jvm.ThreadStatesGaugeSet;
import io.scalecube.benchmarks.BenchmarksSettings;
import io.scalecube.benchmarks.BenchmarksTask;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.LongStream;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

public class BenchmarksState<SELF extends BenchmarksState<SELF>> {
    private static final Logger LOGGER = LoggerFactory.getLogger(BenchmarksState.class);
    protected final BenchmarksSettings settings;
    private Scheduler scheduler;
    private List<Scheduler> schedulers;
    private ConsoleReporter consoleReporter;
    private CsvReporter csvReporter;
    private final AtomicBoolean started = new AtomicBoolean();

    public BenchmarksState(BenchmarksSettings settings) {
        this.settings = settings;
    }

    protected void beforeAll() throws Exception {
    }

    protected void afterAll() throws Exception {
    }

    public final void start() {
        if (!this.started.compareAndSet(false, true)) {
            throw new IllegalStateException("BenchmarksState is already started");
        }
        LOGGER.info("Benchmarks settings: " + this.settings);
        if (this.settings.consoleReporterEnabled()) {
            this.consoleReporter = ConsoleReporter.forRegistry(this.settings.registry()).outputTo(System.out).convertDurationsTo(this.settings.durationUnit()).convertRatesTo(this.settings.rateUnit()).build();
        }
        this.csvReporter = CsvReporter.forRegistry(this.settings.registry()).convertDurationsTo(this.settings.durationUnit()).convertRatesTo(this.settings.rateUnit()).build(this.settings.csvReporterDirectory());
        this.scheduler = Schedulers.fromExecutor(Executors.newFixedThreadPool(this.settings.nThreads()));
        this.schedulers = IntStream.rangeClosed(1, this.settings.nThreads()).mapToObj(i -> Schedulers.fromExecutorService(Executors.newSingleThreadScheduledExecutor())).collect(Collectors.toList());
        try {
            this.beforeAll();
        }
        catch (Exception ex) {
            throw new IllegalStateException("BenchmarksState beforeAll() failed: " + ex, ex);
        }
        this.settings.registry().register(this.settings.taskName() + "-gc", new GarbageCollectorMetricSet());
        this.settings.registry().register(this.settings.taskName() + "-memory", new MemoryUsageGaugeSet());
        this.settings.registry().register(this.settings.taskName() + "-threads", new ThreadStatesGaugeSet());
        if (this.settings.consoleReporterEnabled()) {
            this.consoleReporter.start(this.settings.reporterInterval().toMillis(), TimeUnit.MILLISECONDS);
        }
        this.csvReporter.start(this.settings.reporterInterval().toMillis(), TimeUnit.MILLISECONDS);
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            if (this.started.get()) {
                this.csvReporter.report();
                if (this.consoleReporter != null) {
                    this.consoleReporter.report();
                }
            }
        }));
    }

    public final void shutdown() {
        if (!this.started.compareAndSet(true, false)) {
            throw new IllegalStateException("BenchmarksState is not started");
        }
        if (this.consoleReporter != null) {
            this.consoleReporter.report();
            this.consoleReporter.stop();
        }
        if (this.csvReporter != null) {
            this.csvReporter.report();
            this.csvReporter.stop();
        }
        if (this.scheduler != null) {
            this.scheduler.dispose();
        }
        if (this.schedulers != null) {
            this.schedulers.forEach(Scheduler::dispose);
        }
        try {
            this.afterAll();
        }
        catch (Exception ex) {
            throw new IllegalStateException("BenchmarksState afterAll() failed: " + ex, ex);
        }
    }

    public Scheduler scheduler() {
        return this.scheduler;
    }

    public List<Scheduler> schedulers() {
        return this.schedulers;
    }

    public Timer timer(String name) {
        return this.settings.registry().timer(this.settings.taskName() + "-" + name);
    }

    public Meter meter(String name) {
        return this.settings.registry().meter(this.settings.taskName() + "-" + name);
    }

    public Histogram histogram(String name) {
        return this.settings.registry().histogram(this.settings.taskName() + "-" + name);
    }

    public Counter counter(String name) {
        return this.settings.registry().counter(this.settings.taskName() + "-" + name);
    }

    public final void runForSync(Function<SELF, Function<Long, Object>> func) {
        BenchmarksState self = this;
        try {
            self.start();
            Function<Long, Object> unitOfWork = func.apply(self);
            CountDownLatch latch = new CountDownLatch(1);
            Flux.fromStream(LongStream.range(0L, this.settings.numOfIterations()).boxed()).parallel().runOn(this.scheduler()).map(unitOfWork).doOnTerminate(latch::countDown).subscribe();
            latch.await(this.settings.executionTaskDuration().toMillis(), TimeUnit.MILLISECONDS);
        }
        catch (InterruptedException e) {
            throw Exceptions.propagate(e);
        }
        finally {
            self.shutdown();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public final void runForAsync(Function<SELF, Function<Long, Publisher<?>>> func) {
        BenchmarksState self = this;
        try {
            self.start();
            Function<Long, Publisher<?>> unitOfWork = func.apply(self);
            Flux<Long> fromStream = Flux.fromStream(LongStream.range(0L, this.settings.numOfIterations()).boxed());
            Flux.merge(fromStream.publishOn(this.scheduler()).map(unitOfWork)).take(this.settings.executionTaskDuration()).blockLast();
        }
        finally {
            self.shutdown();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public final <T> void runWithRampUp(BiFunction<Long, SELF, Publisher<T>> setUp, Function<SELF, BiFunction<Long, T, Publisher<?>>> func, BiFunction<SELF, T, Mono<Void>> cleanUp) {
        BenchmarksState self = this;
        try {
            self.start();
            BiFunction unitOfWork = func.apply(self);
            Flux.interval(Duration.ZERO, this.settings.rampUpInterval()).take(this.settings.rampUpDuration()).flatMap(rampUpIteration -> {
                int schedulerIndex = (int)((rampUpIteration & Long.MAX_VALUE) % (long)this.schedulers().size());
                Scheduler scheduler = this.schedulers().get(schedulerIndex);
                return Flux.range(0, Math.max(1, this.settings.injectorsPerRampUpInterval())).flatMap(iteration1 -> {
                    Flux setUpFactory = Flux.create(sink -> {
                        Flux<Object> deferSetUp = Flux.defer(() -> (Publisher)setUp.apply((Long)rampUpIteration, (Object)self));
                        deferSetUp.subscribe(sink::next, ex -> {
                            LOGGER.error("Exception occured on setUp at rampUpIteration: {}, cause: {}, task won't start", rampUpIteration, ex);
                            sink.complete();
                        }, sink::complete);
                    });
                    return setUpFactory.subscribeOn(scheduler).map(setUpResult -> new BenchmarksTask<BenchmarksState, Object>(self, setUpResult, unitOfWork, cleanUp, scheduler)).doOnNext(scheduler::schedule).flatMap(BenchmarksTask::completionMono);
                });
            }, Integer.MAX_VALUE, Integer.MAX_VALUE).blockLast();
        }
        finally {
            self.shutdown();
        }
    }
}

