/*
 * Decompiled with CFR 0.152.
 */
package io.datarouter.scanner;

import io.datarouter.scanner.BaseScanner;
import io.datarouter.scanner.Scanner;
import java.util.LinkedHashSet;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.function.Function;

/**
 * Scanner that applies a mapping function to the items of an input scanner by submitting one task
 * per item to an {@link ExecutorService}, and exposes the mapped results one at a time through
 * {@link #advance()}.
 *
 * <p>The constructor submits up to {@code numThreads} tasks up front. Each call to
 * {@link #advance()} then takes one future out of the in-flight set and tries to submit one
 * replacement task, so at most {@code numThreads} futures are tracked at any time.
 *
 * <p>When {@code allowUnorderedResults} is {@code false}, results are returned in the order the
 * items were read from the input (the oldest in-flight future is awaited). When it is
 * {@code true}, results are returned in completion order via an {@link ExecutorCompletionService}.
 *
 * @param <T> type of the input items
 * @param <R> type of the mapped results
 */
public class ParallelMappingScanner<T, R> extends BaseScanner<R> {
    private final Scanner<T> input;
    private final Function<? super T, ? extends R> mapper;
    private final ExecutorService executor;
    private final boolean allowUnorderedResults;
    // Insertion-ordered so that the ordered mode can always await the oldest submitted task.
    private final LinkedHashSet<Future<R>> runningFutures;
    // Only created in unordered mode; null otherwise.
    private final CompletionService<R> completionService;

    /**
     * Creates the scanner and immediately submits up to {@code numThreads} mapping tasks.
     *
     * @param input scanner supplying the items to map
     * @param allowUnorderedResults {@code true} to return results in completion order,
     *        {@code false} to return them in input order
     * @param executor executor the mapping tasks are submitted to
     * @param numThreads maximum number of tasks kept in flight; if less than 1 no task is ever
     *        submitted and the scanner yields nothing
     * @param mapper function applied to each input item
     */
    public ParallelMappingScanner(
            Scanner<T> input,
            boolean allowUnorderedResults,
            ExecutorService executor,
            int numThreads,
            Function<? super T, ? extends R> mapper) {
        this.input = input;
        this.mapper = mapper;
        this.allowUnorderedResults = allowUnorderedResults;
        this.executor = executor;
        this.runningFutures = new LinkedHashSet<>();
        this.completionService = allowUnorderedResults ? new ExecutorCompletionService<>(executor) : null;
        this.submitCallables(numThreads);
    }

    /**
     * Moves to the next mapped result, blocking until it is available.
     *
     * @return {@code true} if {@code current} now holds a result, {@code false} if no tasks remain
     *         in flight (in which case {@code current} is set to {@code null})
     * @throws RuntimeException if a mapping task failed (wrapping the {@link ExecutionException}),
     *         if the calling thread was interrupted while waiting (wrapping the
     *         {@link InterruptedException}), or if reading the input or submitting a task threw;
     *         all futures still in flight are cancelled before the exception is rethrown
     */
    @Override
    public boolean advance() {
        if (this.runningFutures.isEmpty()) {
            this.current = null;
            return false;
        }
        try {
            this.current = this.nextResult();
            return true;
        } catch (RuntimeException e) {
            // Don't leave the remaining tasks running once the scan has failed.
            this.runningFutures.forEach(runningFuture -> runningFuture.cancel(true));
            throw e;
        }
    }

    /**
     * Reads up to {@code limit} items from the input and submits one mapping task for each,
     * stopping early when the input is exhausted.
     */
    private void submitCallables(int limit) {
        for (int i = 0; i < limit; ++i) {
            if (!this.input.advance()) {
                return;
            }
            T item = this.input.current();
            this.submitCallable(this.makeCallable(item));
        }
    }

    /** Wraps the application of the mapper to a single item in a {@link Callable}. */
    private Callable<R> makeCallable(T item) {
        return () -> this.mapper.apply(item);
    }

    /**
     * Submits the task through the completion service in unordered mode, or directly to the
     * executor in ordered mode, and records the resulting future as in flight.
     */
    private void submitCallable(Callable<R> callable) {
        Future<R> future = this.allowUnorderedResults
                ? this.completionService.submit(callable)
                : this.executor.submit(callable);
        this.runningFutures.add(future);
    }

    /**
     * Takes the next future out of the in-flight set, submits a replacement task, then waits for
     * and returns the future's result.
     *
     * <p>The future is removed from {@code runningFutures} before it is awaited, so the cleanup in
     * {@link #advance()} cannot see it; it is therefore cancelled here on every failure path where
     * it may still be running.
     */
    private R nextResult() {
        Future<R> future;
        try {
            future = this.nextFuture();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        this.runningFutures.remove(future);
        try {
            // Refill before blocking so the executor stays busy while we wait.
            this.submitCallables(1);
            return future.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            future.cancel(true);
            throw new RuntimeException(e);
        } catch (ExecutionException e) {
            // The task already finished (with a failure); nothing to cancel.
            throw new RuntimeException(e);
        } catch (RuntimeException e) {
            // Reading the input or submitting the replacement failed; cancel is a no-op if the
            // future has already completed or been cancelled.
            future.cancel(true);
            throw e;
        }
    }

    /**
     * Picks the future to return next: the next completed one in unordered mode (blocking until
     * one completes), or the oldest submitted one in ordered mode.
     */
    private Future<R> nextFuture() throws InterruptedException {
        return this.allowUnorderedResults
                ? this.completionService.take()
                : this.runningFutures.iterator().next();
    }
}

