/*
 * Decompiled with CFR 0.152.
 */
package io.aleph0.yap.core.task;

import io.aleph0.yap.core.Measureable;
import io.aleph0.yap.core.task.TaskController;
import io.aleph0.yap.core.task.action.CancelTaskAction;
import io.aleph0.yap.core.task.action.FailTaskAction;
import io.aleph0.yap.core.task.action.StartWorkerTaskAction;
import io.aleph0.yap.core.task.action.StopWorkerTaskAction;
import io.aleph0.yap.core.task.action.SucceedTaskAction;
import io.aleph0.yap.core.task.action.TaskAction;
import io.aleph0.yap.core.transport.Queue;
import io.aleph0.yap.core.transport.Topic;
import java.lang.runtime.SwitchBootstraps;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TaskManager<WorkerMetricsT>
implements Measureable<Metrics<WorkerMetricsT>> {
    private static final Logger LOGGER = LoggerFactory.getLogger(TaskManager.class);
    private final AtomicInteger sequence = new AtomicInteger(0);
    private final AtomicLong metricCommencements = new AtomicLong(0L);
    private final AtomicLong metricCompletions = new AtomicLong(0L);
    private final AtomicLong metricNormalCompletions = new AtomicLong(0L);
    private final AtomicLong metricStopCompletions = new AtomicLong(0L);
    private final AtomicLong metricExceptionalCompletions = new AtomicLong(0L);
    private final Map<Integer, Future<?>> workers = new HashMap();
    private final BlockingQueue<WorkerEvent> events = new LinkedBlockingQueue<WorkerEvent>();
    private final List<LifecycleListener> lifecycleListeners = new CopyOnWriteArrayList<LifecycleListener>();
    private final String id;
    private final Set<String> subscribers;
    private final ExecutorService executor;
    private final TaskController controller;
    private final WorkerBodyFactory<WorkerMetricsT> workerBodyFactory;
    private final Queue<?> queue;
    private final Topic<?> topic;
    private volatile TaskState state = TaskState.READY;
    private ExecutionException failureCause = null;

    public TaskManager(String id, Set<String> subscribers, ExecutorService executor, TaskController controller, WorkerBodyFactory<WorkerMetricsT> workerBodyFactory, Queue<?> queue, Topic<?> topic) {
        this.id = Objects.requireNonNull(id);
        this.subscribers = Collections.unmodifiableSet(subscribers);
        this.executor = Objects.requireNonNull(executor);
        this.controller = Objects.requireNonNull(controller);
        this.workerBodyFactory = Objects.requireNonNull(workerBodyFactory);
        this.queue = queue;
        this.topic = topic;
    }

    public String getId() {
        return this.id;
    }

    public Set<String> getSubscribers() {
        return this.subscribers;
    }

    public void addLifecycleListener(LifecycleListener listener) {
        if (listener == null) {
            throw new NullPointerException();
        }
        this.lifecycleListeners.add(listener);
    }

    public void removeLifecycleListener(LifecycleListener listener) {
        this.lifecycleListeners.remove(listener);
    }

    public void run() throws Exception {
        try {
            try {
                this.state = this.state.to(TaskState.RUNNING);
                LOGGER.atDebug().addKeyValue("task", (Object)this.id).log("Task runner started");
                List<TaskAction> actions = this.controller.onTaskStart();
                this.notifyLifecycleListeners(l -> l.onTaskStarted(this.id));
                this.eventLoop(actions);
                switch (this.state) {
                    case SUCCEEDED: {
                        LOGGER.atDebug().addKeyValue("task", (Object)this.id).log("Task completed");
                        this.controller.onTaskSucceeded();
                        this.notifyLifecycleListeners(listener -> listener.onTaskCompleted(this.id));
                        break;
                    }
                    case CANCELED: {
                        LOGGER.atWarn().addKeyValue("task", (Object)this.id).log("Task cancelled, but without cancel request");
                        this.controller.onTaskCancelled();
                        this.notifyLifecycleListeners(listener -> listener.onTaskCancelled(this.id));
                        break;
                    }
                    case FAILED: {
                        LOGGER.atDebug().addKeyValue("task", (Object)this.id).setCause((Throwable)this.failureCause).log("Task failed");
                        this.controller.onTaskFailed(this.failureCause);
                        this.notifyLifecycleListeners(listener -> listener.onTaskFailed(this.id, this.failureCause));
                        break;
                    }
                    default: {
                        throw new IllegalStateException("Task manager in invalid state after run: " + String.valueOf((Object)this.state));
                    }
                }
                LOGGER.atDebug().addKeyValue("task", (Object)this.id).log("Task runner completed normally");
            }
            catch (InterruptedException e) {
                Thread.interrupted();
                LOGGER.atInfo().addKeyValue("pipeline", (Object)this.id).log("Pipeline manager interrupted, treating as cancel request");
                if (this.state.getPhase() != TaskPhase.FINISHED) {
                    List<TaskAction> actions = this.controller.onCancelRequested();
                    this.eventLoop(actions);
                }
                switch (this.state) {
                    case SUCCEEDED: {
                        LOGGER.atWarn().addKeyValue("task", (Object)this.id).log("Task succeeded, but after cancel request");
                        this.controller.onTaskSucceeded();
                        this.notifyLifecycleListeners(listener -> listener.onTaskCompleted(this.id));
                        break;
                    }
                    case CANCELED: {
                        LOGGER.atInfo().addKeyValue("task", (Object)this.id).log("Task cancelled");
                        this.controller.onTaskCancelled();
                        this.notifyLifecycleListeners(listener -> listener.onTaskCancelled(this.id));
                        break;
                    }
                    case FAILED: {
                        LOGGER.atWarn().addKeyValue("task", (Object)this.id).setCause((Throwable)this.failureCause).log("Task failed, but after cancel request");
                        this.controller.onTaskFailed(this.failureCause);
                        this.notifyLifecycleListeners(listener -> listener.onTaskFailed(this.id, this.failureCause));
                        break;
                    }
                    default: {
                        throw new IllegalStateException("Task manager in invalid state after interrupt: " + String.valueOf((Object)this.state));
                    }
                }
                if (this.topic != null) {
                    this.topic.close();
                }
            }
            catch (Exception e) {
                LOGGER.atError().addKeyValue("task", (Object)this.id).setCause((Throwable)e).log("Task manager failed; hard failing task, canceling all workers, and propagating exception...");
                this.state = TaskState.FAILED;
                for (Future<?> workerFuture : this.workers.values()) {
                    workerFuture.cancel(true);
                }
                ExecutionException cause = new ExecutionException(e);
                this.notifyLifecycleListeners(listener -> listener.onTaskFailed(this.id, cause));
                throw e;
            }
        }
        finally {
            if (this.topic != null) {
                this.topic.close();
            }
        }
    }

    private void eventLoop(List<TaskAction> initialActions) throws InterruptedException {
        for (TaskAction action : initialActions) {
            this.performTaskAction(action);
        }
        while (this.state == TaskState.RUNNING) {
            if (Thread.interrupted()) {
                Thread.currentThread().interrupt();
                throw new InterruptedException();
            }
            WorkerEvent event = this.events.poll(this.controller.getHeartbeatInterval().toMillis(), TimeUnit.MILLISECONDS);
            List<TaskAction> actions = this.handleWorkerEvent(event);
            for (TaskAction action : actions) {
                this.performTaskAction(action);
            }
        }
    }

    List<TaskAction> handleWorkerEvent(WorkerEvent event) {
        List<TaskAction> result;
        WorkerEvent workerEvent = event;
        switch (SwitchBootstraps.typeSwitch("typeSwitch", new Object[]{WorkerStartedEvent.class, WorkerCompletedEvent.class, WorkerFailedEvent.class, WorkerStoppedEvent.class}, (Object)workerEvent, 0)) {
            case 0: {
                WorkerStartedEvent started = (WorkerStartedEvent)workerEvent;
                LOGGER.atDebug().addKeyValue("id", (Object)started.id).log("Worker started");
                result = this.controller.onWorkerStarted(started.id);
                this.metricCommencements.incrementAndGet();
                this.notifyLifecycleListeners(l -> l.onTaskWorkerStarted(this.id, workerStartedEvent.id));
                break;
            }
            case 1: {
                WorkerCompletedEvent completed = (WorkerCompletedEvent)workerEvent;
                LOGGER.atDebug().addKeyValue("id", (Object)completed.id).log("Worker completed normally");
                this.workers.remove(completed.id);
                result = this.controller.onWorkerCompletedNormally(completed.id);
                this.metricCompletions.incrementAndGet();
                this.metricNormalCompletions.incrementAndGet();
                this.notifyLifecycleListeners(l -> l.onTaskWorkerCompletedNormally(this.id, workerCompletedEvent.id));
                break;
            }
            case 2: {
                WorkerFailedEvent failed = (WorkerFailedEvent)workerEvent;
                LOGGER.atError().addKeyValue("id", (Object)failed.id).setCause(failed.cause).log("Worker completed exceptionally");
                this.workers.remove(failed.id);
                result = this.controller.onWorkerCompletedExceptionally(failed.id, failed.cause);
                this.metricCompletions.incrementAndGet();
                this.metricExceptionalCompletions.incrementAndGet();
                this.notifyLifecycleListeners(l -> l.onTaskWorkerCompletedExceptionally(this.id, workerFailedEvent.id, workerFailedEvent.cause));
                break;
            }
            case 3: {
                WorkerStoppedEvent stopped = (WorkerStoppedEvent)workerEvent;
                LOGGER.atDebug().addKeyValue("id", (Object)stopped.id).log("Worker stopped");
                this.workers.remove(stopped.id);
                result = this.controller.onWorkerStopped(stopped.id);
                this.metricCompletions.incrementAndGet();
                this.metricStopCompletions.incrementAndGet();
                this.notifyLifecycleListeners(l -> l.onTaskWorkerStopped(this.id, workerStoppedEvent.id));
                break;
            }
            case -1: {
                LOGGER.atDebug().log("Heartbeat");
                result = this.controller.onHeartbeat();
                break;
            }
            default: {
                throw new MatchException(null, null);
            }
        }
        return result;
    }

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    void performTaskAction(TaskAction action) throws InterruptedException {
        TaskAction taskAction = action;
        switch (SwitchBootstraps.typeSwitch("typeSwitch", new Object[]{StartWorkerTaskAction.class, StopWorkerTaskAction.class, SucceedTaskAction.class, CancelTaskAction.class, FailTaskAction.class}, (Object)taskAction, 0)) {
            case 0: {
                StartedWorker<WorkerMetricsT> w = this.startWorker();
                this.workers.put(w.id, w.future);
                LOGGER.atDebug().addKeyValue("id", (Object)w.id).log("Started worker");
                return;
            }
            case 1: {
                int id = this.stopAnyWorker();
                LOGGER.atDebug().addKeyValue("id", (Object)id).log("Stopped worker");
                return;
            }
            case 2: {
                this.state = this.state.to(TaskState.SUCCEEDED);
                LOGGER.atDebug().log("Succeeded task");
                return;
            }
            case 3: {
                this.state = this.state.to(TaskState.CANCELED);
                LOGGER.atDebug().log("Canceled task");
                return;
            }
            case 4: {
                ExecutionException cause;
                try {
                    cause = ((FailTaskAction)taskAction).cause();
                    this.state = this.state.to(TaskState.FAILED);
                    this.failureCause = cause;
                }
                catch (Throwable throwable) {
                    throw new MatchException(throwable.toString(), throwable);
                }
                LOGGER.atDebug().setCause((Throwable)cause).log("Failed task");
                return;
            }
            case -1: {
                LOGGER.atDebug().log("No action");
                return;
            }
        }
        throw new MatchException(null, null);
    }

    protected StartedWorker<WorkerMetricsT> startWorker() throws RejectedExecutionException {
        int id = this.sequence.getAndIncrement();
        WorkerBody body = this.workerBodyFactory.newWorkerBody();
        WorkerRunner worker = new WorkerRunner(id, body);
        Future<Object> future = this.executor.submit(worker, null);
        return new StartedWorker(id, worker, future);
    }

    protected int stopAnyWorker() {
        if (this.workers.isEmpty()) {
            throw new IllegalStateException("no workers");
        }
        Iterator<Map.Entry<Integer, Future<?>>> iterator = this.workers.entrySet().iterator();
        Map.Entry<Integer, Future<?>> entry = iterator.next();
        int workerId = entry.getKey();
        Future<?> workerFuture = entry.getValue();
        iterator.remove();
        workerFuture.cancel(true);
        return workerId;
    }

    protected void stopAllWorkers() {
        for (Future<?> future : this.workers.values()) {
            future.cancel(true);
        }
    }

    @Override
    public Metrics<WorkerMetricsT> checkMetrics() {
        long stalls;
        long produced;
        long waits;
        long consumed;
        long pending;
        if (this.queue != null) {
            Queue.Metrics m = (Queue.Metrics)this.queue.checkMetrics();
            pending = m.pending();
            consumed = m.consumed();
            waits = m.waits();
        } else {
            waits = 0L;
            consumed = 0L;
            pending = 0L;
        }
        if (this.topic != null) {
            Topic.Metrics m = (Topic.Metrics)this.topic.checkMetrics();
            produced = m.published();
            stalls = m.stalls();
        } else {
            stalls = 0L;
            produced = 0L;
        }
        WorkerMetricsT worker = this.workerBodyFactory.checkMetrics();
        TaskState state = this.state;
        TaskPhase phase = state.getPhase();
        long workers = this.workers.size();
        long commencements = this.metricCommencements.get();
        long completions = this.metricCompletions.get();
        long normalCompletions = this.metricNormalCompletions.get();
        long stopCompletions = this.metricStopCompletions.get();
        long exceptionalCompletions = this.metricExceptionalCompletions.get();
        return new Metrics<WorkerMetricsT>(this.id, phase, state, worker, workers, commencements, completions, normalCompletions, stopCompletions, exceptionalCompletions, pending, consumed, waits, produced, stalls);
    }

    @Override
    public Metrics<WorkerMetricsT> flushMetrics() {
        Object result = this.checkMetrics();
        if (this.queue != null) {
            this.queue.flushMetrics();
        }
        if (this.topic != null) {
            this.topic.flushMetrics();
        }
        this.workerBodyFactory.flushMetrics();
        this.metricCommencements.set(0L);
        this.metricCompletions.set(0L);
        this.metricNormalCompletions.set(0L);
        this.metricStopCompletions.set(0L);
        this.metricExceptionalCompletions.set(0L);
        return result;
    }

    private void notifyLifecycleListeners(Consumer<LifecycleListener> event) {
        for (LifecycleListener lifecycleListener : this.lifecycleListeners) {
            try {
                event.accept(lifecycleListener);
            }
            catch (Exception e) {
                LOGGER.atError().setCause((Throwable)e).log("Error notifying lifecycle listener");
            }
        }
    }

    public static interface LifecycleListener {
        default public void onTaskStarted(String task) {
        }

        default public void onTaskWorkerStarted(String task, int worker) {
        }

        default public void onTaskWorkerStopRequested(String task, int worker) {
        }

        default public void onTaskWorkerStopped(String task, int worker) {
        }

        default public void onTaskWorkerCompletedNormally(String task, int worker) {
        }

        default public void onTaskWorkerCompletedExceptionally(String task, int worker, Throwable cause) {
        }

        default public void onTaskCancelRequested(String task, int worker) {
        }

        default public void onTaskCompleted(String task) {
        }

        default public void onTaskCancelled(String task) {
        }

        default public void onTaskFailed(String task, ExecutionException cause) {
        }
    }

    public record Metrics<WorkerMetricsT>(String id, TaskPhase phase, TaskState state, WorkerMetricsT worker, long workers, long commencements, long completions, long normalCompletions, long stopCompletions, long exceptionalCompletions, long pending, long consumed, long waits, long produced, long stalls) {
        public Metrics {
            Objects.requireNonNull(id);
            Objects.requireNonNull(phase);
            Objects.requireNonNull(state);
            Objects.requireNonNull(worker);
            if (workers < 0L) {
                throw new IllegalArgumentException("workers must be greater than or equal to 0");
            }
            if (commencements < 0L) {
                throw new IllegalArgumentException("commencements must be greater than or equal to 0");
            }
            if (completions < 0L) {
                throw new IllegalArgumentException("completions must be greater than or equal to 0");
            }
            if (normalCompletions < 0L) {
                throw new IllegalArgumentException("normalCompletions must be greater than or equal to 0");
            }
            if (stopCompletions < 0L) {
                throw new IllegalArgumentException("stopCompletions must be greater than or equal to 0");
            }
            if (exceptionalCompletions < 0L) {
                throw new IllegalArgumentException("exceptionalCompletions must be greater than or equal to 0");
            }
            if (pending < 0L) {
                throw new IllegalArgumentException("pending must be greater than or equal to 0");
            }
            if (consumed < 0L) {
                throw new IllegalArgumentException("consumed must be greater than or equal to 0");
            }
            if (waits < 0L) {
                throw new IllegalArgumentException("waits must be greater than or equal to 0");
            }
            if (produced < 0L) {
                throw new IllegalArgumentException("produced must be greater than or equal to 0");
            }
            if (stalls < 0L) {
                throw new IllegalArgumentException("stalls must be greater than or equal to 0");
            }
        }
    }

    private record StartedWorker<MetricsT>(int id, WorkerRunner worker, Future<?> future) {
    }

    public static enum TaskPhase {
        READY,
        RUNNING,
        FINISHED;

    }

    public static enum TaskState {
        READY(TaskPhase.READY){

            @Override
            public TaskState to(TaskState target) {
                if (target == RUNNING) {
                    return target;
                }
                throw new IllegalStateException("Invalid transition from READY to " + String.valueOf((Object)target));
            }
        }
        ,
        RUNNING(TaskPhase.RUNNING){

            @Override
            public TaskState to(TaskState target) {
                if (target == SUCCEEDED || target == CANCELED || target == FAILED) {
                    return target;
                }
                throw new IllegalStateException("Invalid transition from RUNNING to " + String.valueOf((Object)target));
            }
        }
        ,
        SUCCEEDED(TaskPhase.FINISHED){

            @Override
            public TaskState to(TaskState target) {
                throw new IllegalStateException("Invalid transition from SUCCEEDED to " + String.valueOf((Object)target));
            }
        }
        ,
        CANCELED(TaskPhase.FINISHED){

            @Override
            public TaskState to(TaskState target) {
                throw new IllegalStateException("Invalid transition from CANCELED to " + String.valueOf((Object)target));
            }
        }
        ,
        FAILED(TaskPhase.FINISHED){

            @Override
            public TaskState to(TaskState target) {
                throw new IllegalStateException("Invalid transition from FAILED to " + String.valueOf((Object)target));
            }
        };

        private final TaskPhase phase;

        private TaskState(TaskPhase phase) {
            this.phase = Objects.requireNonNull(phase);
        }

        public TaskPhase getPhase() {
            return this.phase;
        }

        public abstract TaskState to(TaskState var1);
    }

    @FunctionalInterface
    public static interface WorkerBody {
        public void run() throws Exception;
    }

    public static interface WorkerBodyFactory<MetricsT> {
        public WorkerBody newWorkerBody();

        public MetricsT checkMetrics();

        public MetricsT flushMetrics();
    }

    record WorkerCompletedEvent(int id) implements WorkerEvent
    {
    }

    static sealed interface WorkerEvent
    permits WorkerStartedEvent, WorkerCompletedEvent, WorkerFailedEvent, WorkerStoppedEvent {
    }

    record WorkerFailedEvent(int id, Throwable cause) implements WorkerEvent
    {
    }

    private class WorkerRunner
    implements Runnable {
        private final int id;
        private final WorkerBody body;

        public WorkerRunner(int id, WorkerBody body) {
            this.id = id;
            this.body = Objects.requireNonNull(body);
        }

        @Override
        public void run() {
            try {
                this.offer(new WorkerStartedEvent(this.id));
                LOGGER.atDebug().addKeyValue("id", (Object)this.id).log("Worker started");
                this.body.run();
                this.offer(new WorkerCompletedEvent(this.id));
                LOGGER.atDebug().addKeyValue("id", (Object)this.id).log("Worker completed normally");
            }
            catch (InterruptedException e) {
                this.offer(new WorkerStoppedEvent(this.id));
                LOGGER.atInfo().addKeyValue("id", (Object)this.id).log("Worker stopped");
            }
            catch (Throwable t) {
                this.offer(new WorkerFailedEvent(this.id, t));
                LOGGER.atError().addKeyValue("id", (Object)this.id).setCause(t).log("Worker completed exceptionally");
            }
        }

        private void offer(WorkerEvent event) {
            boolean success = TaskManager.this.events.offer(event);
            assert (success);
        }
    }

    record WorkerStartedEvent(int id) implements WorkerEvent
    {
    }

    record WorkerStoppedEvent(int id) implements WorkerEvent
    {
    }
}

