/*
 * Decompiled with CFR 0.152.
 */
package io.aleph0.yap.core.build;

import io.aleph0.yap.core.ConsumerWorker;
import io.aleph0.yap.core.Pipeline;
import io.aleph0.yap.core.ProcessorWorker;
import io.aleph0.yap.core.ProducerWorker;
import io.aleph0.yap.core.Source;
import io.aleph0.yap.core.build.ConsumerTaskBuilder;
import io.aleph0.yap.core.build.PipelineControllerBuilder;
import io.aleph0.yap.core.build.ProcessorTaskBuilder;
import io.aleph0.yap.core.build.ProducerTaskBuilder;
import io.aleph0.yap.core.build.TaskBuilder;
import io.aleph0.yap.core.pipeline.DefaultPipeline;
import io.aleph0.yap.core.pipeline.DefaultPipelineController;
import io.aleph0.yap.core.pipeline.PipelineController;
import io.aleph0.yap.core.pipeline.PipelineManager;
import io.aleph0.yap.core.pipeline.PipelineWrapper;
import io.aleph0.yap.core.task.TaskController;
import io.aleph0.yap.core.task.TaskManager;
import io.aleph0.yap.core.transport.Queue;
import io.aleph0.yap.core.transport.Topic;
import io.aleph0.yap.core.transport.channel.DefaultChannel;
import io.aleph0.yap.core.util.DirectedGraphs;
import io.aleph0.yap.core.util.NoMetrics;
import io.aleph0.yap.core.worker.ConsumerWorkerFactory;
import io.aleph0.yap.core.worker.MeasuredConsumerWorker;
import io.aleph0.yap.core.worker.MeasuredProcessorWorker;
import io.aleph0.yap.core.worker.MeasuredProducerWorker;
import io.aleph0.yap.core.worker.ProcessorWorkerFactory;
import io.aleph0.yap.core.worker.ProducerWorkerFactory;
import io.aleph0.yap.core.worker.SingletonConsumerWorkerFactory;
import io.aleph0.yap.core.worker.SingletonProcessorWorkerFactory;
import io.aleph0.yap.core.worker.SingletonProducerWorkerFactory;
import java.lang.runtime.SwitchBootstraps;
import java.time.Duration;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

public class PipelineBuilder {
    /** Source of the numeric ids handed to pipelines created by {@link #build()}; starts at 1. */
    private static final AtomicInteger sequence = new AtomicInteger(1);

    /** Executor passed to the pipeline and to every task; a virtual-thread-per-task executor unless replaced. */
    private ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();

    /** Builder for the pipeline-level controller; replaced through {@link #setPipelineController}. */
    private PipelineControllerBuilder controller = DefaultPipelineController.builder();

    /** Registered task builders keyed by task id, kept in registration order. */
    private final Map<String, TaskBuilder> tasks = new LinkedHashMap<>();

    /** Wrappers applied to the built pipeline, in the order they were added. */
    private final List<PipelineWrapper> wrappers = new ArrayList<>();

    /** Listeners attached to the built pipeline, in the order they were added. */
    private final List<Pipeline.LifecycleListener> lifecycleListeners = new ArrayList<>();

    /**
     * Replaces the executor service that {@link #build()} hands to the pipeline and its tasks.
     *
     * @param executor the executor to use; must not be {@code null}
     * @return this builder, for call chaining
     * @throws IllegalArgumentException if {@code executor} is {@code null}
     */
    public PipelineBuilder setExecutor(ExecutorService executor) {
        if (executor != null) {
            this.executor = executor;
            return this;
        }
        throw new IllegalArgumentException("executor must not be null");
    }

    /**
     * Replaces the controller builder that {@link #build()} uses to create the pipeline controller.
     *
     * @param controller the controller builder to use; must not be {@code null}
     * @return this builder, for call chaining
     * @throws IllegalArgumentException if {@code controller} is {@code null}
     */
    public PipelineBuilder setPipelineController(PipelineControllerBuilder controller) {
        if (controller != null) {
            this.controller = controller;
            return this;
        }
        throw new IllegalArgumentException("controller must not be null");
    }

    /**
     * Registers a producer task for a plain worker that reports no metrics.
     *
     * <p>The worker is adapted with {@link MeasuredProducerWorker#withNoMetrics} and the call is
     * forwarded to the overload that accepts a measured worker.
     *
     * @param id the id of the new task
     * @param worker the worker that produces the task's output
     * @return the builder for the newly registered producer task
     */
    public <OutputT> ProducerTaskBuilder<OutputT, NoMetrics> addProducer(
            String id, ProducerWorker<OutputT> worker) {
        return addProducer(id, MeasuredProducerWorker.withNoMetrics(worker));
    }

    /**
     * Registers a producer task backed by a single measured worker instance.
     *
     * <p>The worker is wrapped in a {@link SingletonProducerWorkerFactory} and the call is forwarded
     * to the factory-based overload.
     *
     * @param id the id of the new task
     * @param worker the measured worker; must not be {@code null}
     * @return the builder for the newly registered producer task
     * @throws NullPointerException if {@code worker} is {@code null}
     */
    public <OutputT, MetricsT> ProducerTaskBuilder<OutputT, MetricsT> addProducer(
            String id, MeasuredProducerWorker<OutputT, MetricsT> worker) {
        Objects.requireNonNull(worker, "worker");
        SingletonProducerWorkerFactory<OutputT, MetricsT> singleton =
                new SingletonProducerWorkerFactory<OutputT, MetricsT>(worker);
        return addProducer(id, singleton);
    }

    /**
     * Registers a producer task whose workers are created by the given factory.
     *
     * @param id the id of the new task; must not be {@code null} and must not already be registered
     * @param workerFactory the factory for the task's workers; must not be {@code null}
     * @return the builder for the newly registered producer task
     * @throws NullPointerException if {@code id} or {@code workerFactory} is {@code null}
     * @throws IllegalArgumentException if a task with the same id was already added
     */
    public <OutputT, MetricsT> ProducerTaskBuilder<OutputT, MetricsT> addProducer(
            String id, ProducerWorkerFactory<OutputT, MetricsT> workerFactory) {
        // Argument checks run in the same order as before: id, then factory, then uniqueness.
        Objects.requireNonNull(id, "id");
        Objects.requireNonNull(workerFactory, "workerFactory");
        if (tasks.containsKey(id)) {
            throw new IllegalArgumentException("Task with id " + id + " already exists");
        }

        ProducerTaskBuilder<OutputT, MetricsT> taskBuilder =
                new ProducerTaskBuilder<OutputT, MetricsT>(id, workerFactory);
        tasks.put(id, taskBuilder);
        return taskBuilder;
    }

    /**
     * Registers a processor task for a plain worker that reports no metrics.
     *
     * <p>The worker is adapted with {@link MeasuredProcessorWorker#withNoMetrics} and the call is
     * forwarded to the overload that accepts a measured worker.
     *
     * @param id the id of the new task
     * @param worker the worker that turns inputs into outputs
     * @return the builder for the newly registered processor task
     */
    public <InputT, OutputT> ProcessorTaskBuilder<InputT, OutputT, NoMetrics> addProcessor(
            String id, ProcessorWorker<InputT, OutputT> worker) {
        return addProcessor(id, MeasuredProcessorWorker.withNoMetrics(worker));
    }

    /**
     * Registers a processor task backed by a single measured worker instance.
     *
     * <p>The worker is wrapped in a {@link SingletonProcessorWorkerFactory} and the call is
     * forwarded to the factory-based overload.
     *
     * @param id the id of the new task
     * @param worker the measured worker; must not be {@code null}
     * @return the builder for the newly registered processor task
     * @throws NullPointerException if {@code worker} is {@code null}
     */
    public <InputT, OutputT, MetricsT> ProcessorTaskBuilder<InputT, OutputT, MetricsT> addProcessor(
            String id, MeasuredProcessorWorker<InputT, OutputT, MetricsT> worker) {
        Objects.requireNonNull(worker, "worker");
        SingletonProcessorWorkerFactory<InputT, OutputT, MetricsT> singleton =
                new SingletonProcessorWorkerFactory<InputT, OutputT, MetricsT>(worker);
        return addProcessor(id, singleton);
    }

    /**
     * Registers a processor task whose workers are created by the given factory.
     *
     * @param id the id of the new task; must not be {@code null} and must not already be registered
     * @param workerFactory the factory for the task's workers; must not be {@code null}
     * @return the builder for the newly registered processor task
     * @throws NullPointerException if {@code id} or {@code workerFactory} is {@code null}
     * @throws IllegalArgumentException if a task with the same id was already added
     */
    public <InputT, OutputT, MetricsT> ProcessorTaskBuilder<InputT, OutputT, MetricsT> addProcessor(
            String id, ProcessorWorkerFactory<InputT, OutputT, MetricsT> workerFactory) {
        // Argument checks run in the same order as before: id, then factory, then uniqueness.
        Objects.requireNonNull(id, "id");
        Objects.requireNonNull(workerFactory, "workerFactory");
        if (tasks.containsKey(id)) {
            throw new IllegalArgumentException("Task with id " + id + " already exists");
        }

        ProcessorTaskBuilder<InputT, OutputT, MetricsT> taskBuilder =
                new ProcessorTaskBuilder<InputT, OutputT, MetricsT>(id, workerFactory);
        tasks.put(id, taskBuilder);
        return taskBuilder;
    }

    /**
     * Registers a consumer task for a plain worker that reports no metrics.
     *
     * <p>The worker is adapted with {@link MeasuredConsumerWorker#withNoMetrics} and the call is
     * forwarded to the overload that accepts a measured worker.
     *
     * @param id the id of the new task
     * @param worker the worker that consumes the task's input
     * @return the builder for the newly registered consumer task
     */
    public <InputT> ConsumerTaskBuilder<InputT, NoMetrics> addConsumer(
            String id, ConsumerWorker<InputT> worker) {
        return addConsumer(id, MeasuredConsumerWorker.withNoMetrics(worker));
    }

    /**
     * Registers a consumer task backed by a single measured worker instance.
     *
     * <p>The worker is wrapped in a {@link SingletonConsumerWorkerFactory} and the call is forwarded
     * to the factory-based overload.
     *
     * @param id the id of the new task
     * @param worker the measured worker; must not be {@code null}
     * @return the builder for the newly registered consumer task
     * @throws NullPointerException if {@code worker} is {@code null}
     */
    public <InputT, MetricsT> ConsumerTaskBuilder<InputT, MetricsT> addConsumer(
            String id, MeasuredConsumerWorker<InputT, MetricsT> worker) {
        Objects.requireNonNull(worker, "worker");
        SingletonConsumerWorkerFactory<InputT, MetricsT> singleton =
                new SingletonConsumerWorkerFactory<InputT, MetricsT>(worker);
        return addConsumer(id, singleton);
    }

    /**
     * Registers a consumer task whose workers are created by the given factory.
     *
     * @param id the id of the new task; must not be {@code null} and must not already be registered
     * @param workerFactory the factory for the task's workers; must not be {@code null}
     * @return the builder for the newly registered consumer task
     * @throws NullPointerException if {@code id} or {@code workerFactory} is {@code null}
     * @throws IllegalArgumentException if a task with the same id was already added
     */
    public <InputT, MetricsT> ConsumerTaskBuilder<InputT, MetricsT> addConsumer(
            String id, ConsumerWorkerFactory<InputT, MetricsT> workerFactory) {
        // Argument checks run in the same order as before: id, then factory, then uniqueness.
        Objects.requireNonNull(id, "id");
        Objects.requireNonNull(workerFactory, "workerFactory");
        if (tasks.containsKey(id)) {
            throw new IllegalArgumentException("Task with id " + id + " already exists");
        }

        ConsumerTaskBuilder<InputT, MetricsT> taskBuilder =
                new ConsumerTaskBuilder<InputT, MetricsT>(id, workerFactory);
        tasks.put(id, taskBuilder);
        return taskBuilder;
    }

    /**
     * Adds a wrapper that {@link #build()} applies to the finished pipeline. Wrappers are applied
     * in the order they were added, each one receiving the result of the previous one.
     *
     * @param wrapper the wrapper to apply; must not be {@code null}
     * @return this builder, for call chaining
     * @throws NullPointerException if {@code wrapper} is {@code null}
     */
    public PipelineBuilder addWrapper(PipelineWrapper wrapper) {
        // Fail fast: a null wrapper would otherwise only surface as an NPE inside build(),
        // far from the call that introduced it.
        if (wrapper == null) {
            throw new NullPointerException("wrapper");
        }
        this.wrappers.add(wrapper);
        return this;
    }

    /**
     * Adds a lifecycle listener that {@link #build()} registers on the finished pipeline, after
     * all wrappers have been applied. Listeners are registered in the order they were added.
     *
     * @param listener the listener to register; must not be {@code null}
     * @return this builder, for call chaining
     * @throws NullPointerException if {@code listener} is {@code null}
     */
    public PipelineBuilder addLifecycleListener(Pipeline.LifecycleListener listener) {
        // Fail fast, consistent with the other add* methods, instead of handing a null
        // listener to the pipeline at build time.
        if (listener == null) {
            throw new NullPointerException("listener");
        }
        this.lifecycleListeners.add(listener);
        return this;
    }

    /**
     * Assembles a {@link Pipeline} from the tasks, wrappers and lifecycle listeners registered on
     * this builder.
     *
     * <p>The work happens in consecutive passes over the registered tasks, in registration order:
     * <ol>
     *   <li>one {@link DefaultChannel} is created for every (publishing task, subscriber id) pair
     *       declared by a producer or processor;</li>
     *   <li>a {@link Topic} is built for every producer and processor from its outgoing channels;</li>
     *   <li>a {@link Queue} is built for every processor and consumer from its incoming channels;</li>
     *   <li>a {@link TaskManager} is created for every task;</li>
     *   <li>the resulting task graph is checked for connectivity and cycles, the pipeline is
     *       created with the next id from the shared sequence, wrappers are applied and listeners
     *       are attached.</li>
     * </ol>
     *
     * <p>Implementation note: this method was restored from decompiler output that invoked the
     * {@code typeSwitch} bootstrap directly and switched on unassigned locals. It now uses
     * pattern-matching {@code switch} over {@link TaskBuilder}, which relies on that type being
     * sealed over the three builder kinds (as the compiler-generated {@code MatchException}
     * defaults in the decompiled code indicate).
     *
     * @return the assembled pipeline; it has not been started
     * @throws IllegalArgumentException if a producer or processor has no subscribers, if a
     *         processor or consumer has nothing publishing to it, if the task graph is not weakly
     *         connected, or if the task graph contains a cycle
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    public Pipeline build() {
        // Pass 1: create the channels and index each one by both of its endpoints.
        final Map<String, List> channelsByPublisher = new LinkedHashMap<>();
        final Map<String, List> channelsBySubscriber = new LinkedHashMap<>();
        for (TaskBuilder task : this.tasks.values()) {
            switch (task) {
                case ProducerTaskBuilder<?, ?> producer ->
                    connect(producer.id, producer.subscribers, channelsByPublisher, channelsBySubscriber);
                case ProcessorTaskBuilder<?, ?, ?> processor ->
                    connect(processor.id, processor.subscribers, channelsByPublisher, channelsBySubscriber);
                case ConsumerTaskBuilder<?, ?> consumer -> {
                    // Consumers publish nothing, so they contribute no outgoing channels.
                }
            }
        }

        // Pass 2: every publishing task gets a topic over its outgoing channels.
        final Map<String, Topic> topics = new LinkedHashMap<>();
        for (TaskBuilder task : this.tasks.values()) {
            switch (task) {
                case ProducerTaskBuilder<?, ?> producer -> {
                    if (!channelsByPublisher.containsKey(producer.id)) {
                        throw new IllegalArgumentException("Producer " + producer.id + " has no subscribers");
                    }
                    topics.put(producer.id, producer.topic.build(channelsByPublisher.get(producer.id)));
                }
                case ProcessorTaskBuilder<?, ?, ?> processor -> {
                    if (!channelsByPublisher.containsKey(processor.id)) {
                        throw new IllegalArgumentException("Processor " + processor.id + " has no subscribers");
                    }
                    topics.put(processor.id, processor.topic.build(channelsByPublisher.get(processor.id)));
                }
                case ConsumerTaskBuilder<?, ?> consumer -> {
                    // Consumers have no topic.
                }
            }
        }

        // Pass 3: every receiving task gets a queue over its incoming channels.
        final Map<String, Queue> queues = new LinkedHashMap<>();
        for (TaskBuilder task : this.tasks.values()) {
            switch (task) {
                case ProducerTaskBuilder<?, ?> producer -> {
                    // Producers have no queue.
                }
                case ProcessorTaskBuilder<?, ?, ?> processor -> {
                    if (!channelsBySubscriber.containsKey(processor.id)) {
                        throw new IllegalArgumentException("Processor " + processor.id + " has no producers");
                    }
                    queues.put(processor.id, processor.queue.build(channelsBySubscriber.get(processor.id)));
                }
                case ConsumerTaskBuilder<?, ?> consumer -> {
                    if (!channelsBySubscriber.containsKey(consumer.id)) {
                        throw new IllegalArgumentException("Consumer " + consumer.id + " has no producers");
                    }
                    queues.put(consumer.id, consumer.queue.build(channelsBySubscriber.get(consumer.id)));
                }
            }
        }

        // Pass 4: one task manager per task, wired to the topic and/or queue built above.
        final List<TaskManager<?>> taskManagers = new ArrayList<>();
        for (TaskBuilder task : this.tasks.values()) {
            final String taskId = task.getId();
            final TaskManager<?> taskManager = switch (task) {
                case ProducerTaskBuilder<?, ?> producer ->
                    newProducerTask(taskId, producer, topics.get(taskId));
                case ProcessorTaskBuilder<?, ?, ?> processor ->
                    newProcessorTask(taskId, processor, queues.get(taskId), topics.get(taskId));
                case ConsumerTaskBuilder<?, ?> consumer ->
                    newConsumerTask(taskId, consumer, queues.get(taskId));
            };
            taskManagers.add(taskManager);
        }

        // Validate the shape of the task graph (task id -> ids of the tasks it publishes to).
        final Map<String, Set<String>> graph = taskManagers.stream()
                .map(t -> Map.entry(t.getId(), t.getSubscribers()))
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
        if (!DirectedGraphs.isWeaklyConnected(graph)) {
            throw new IllegalArgumentException("Pipeline contains disconnected node(s)");
        }
        DirectedGraphs.findCycle(graph).ifPresent(cycle -> {
            throw new IllegalArgumentException("Pipeline contains cycle(s): " + String.join(" -> ", cycle));
        });

        final PipelineController pipelineController = this.controller.build(graph);
        // The id is only drawn once validation has passed, so failed builds do not consume ids.
        final int pipelineId = sequence.getAndIncrement();
        Pipeline result = new DefaultPipeline(
                new PipelineManager(pipelineId, this.executor, pipelineController, (List) taskManagers));
        for (PipelineWrapper wrapper : this.wrappers) {
            result = wrapper.wrapPipeline(result);
        }
        for (Pipeline.LifecycleListener listener : this.lifecycleListeners) {
            result.addLifecycleListener(listener);
        }
        return result;
    }

    /** Creates one channel per subscriber of {@code publisherId} and records it under both endpoints. */
    @SuppressWarnings({"rawtypes", "unchecked"})
    private static void connect(String publisherId, Iterable<String> subscriberIds,
            Map<String, List> channelsByPublisher, Map<String, List> channelsBySubscriber) {
        for (String subscriberId : subscriberIds) {
            final DefaultChannel channel = new DefaultChannel();
            channelsByPublisher.computeIfAbsent(publisherId, k -> new ArrayList<>()).add(channel);
            channelsBySubscriber.computeIfAbsent(subscriberId, k -> new ArrayList<>()).add(channel);
        }
    }

    /** Creates the task manager for a producer: its workers publish every produced message to {@code topic}. */
    @SuppressWarnings({"rawtypes", "unchecked"})
    private TaskManager<?> newProducerTask(String taskId, ProducerTaskBuilder<?, ?> producer, Topic topic) {
        final TaskController taskController = producer.controller.build(topic);
        final TaskManager.WorkerBodyFactory<Object> workerBodyFactory = new TaskManager.WorkerBodyFactory<Object>() {
            @Override
            public TaskManager.WorkerBody newWorkerBody() {
                final MeasuredProducerWorker worker = producer.workerFactory.newProducerWorker();
                return () -> worker.produce(message -> {
                    throwIfInterrupted();
                    topic.publish(message);
                });
            }

            @Override
            public Object checkMetrics() {
                return producer.workerFactory.checkMetrics();
            }

            @Override
            public Object flushMetrics() {
                return producer.workerFactory.flushMetrics();
            }
        };
        return new TaskManager<Object>(taskId, producer.subscribers, this.executor, taskController,
                workerBodyFactory, null, topic);
    }

    /** Creates the task manager for a processor: its workers read from {@code queue} and publish to {@code topic}. */
    @SuppressWarnings({"rawtypes", "unchecked"})
    private TaskManager<?> newProcessorTask(String taskId, ProcessorTaskBuilder<?, ?, ?> processor,
            Queue queue, Topic topic) {
        final TaskController taskController = processor.controller.build(queue, topic);
        final TaskManager.WorkerBodyFactory<Object> workerBodyFactory = new TaskManager.WorkerBodyFactory<Object>() {
            @Override
            public TaskManager.WorkerBody newWorkerBody() {
                final MeasuredProcessorWorker worker = processor.workerFactory.newProcessorWorker();
                // The source adapter is created when the body runs, as in the original code.
                return () -> worker.process(interruptibleSource(queue), message -> {
                    throwIfInterrupted();
                    topic.publish(message);
                });
            }

            @Override
            public Object checkMetrics() {
                return processor.workerFactory.checkMetrics();
            }

            @Override
            public Object flushMetrics() {
                return processor.workerFactory.flushMetrics();
            }
        };
        return new TaskManager<Object>(taskId, processor.subscribers, this.executor, taskController,
                workerBodyFactory, queue, topic);
    }

    /** Creates the task manager for a consumer: its workers read from {@code queue} and publish nothing. */
    @SuppressWarnings({"rawtypes", "unchecked"})
    private TaskManager<?> newConsumerTask(String taskId, ConsumerTaskBuilder<?, ?> consumer, Queue queue) {
        final TaskController taskController = consumer.controller.build(queue);
        final TaskManager.WorkerBodyFactory<Object> workerBodyFactory = new TaskManager.WorkerBodyFactory<Object>() {
            @Override
            public TaskManager.WorkerBody newWorkerBody() {
                final MeasuredConsumerWorker worker = consumer.workerFactory.newConsumerWorker();
                // The source adapter is created when the body runs, as in the original code.
                return () -> worker.consume(interruptibleSource(queue));
            }

            @Override
            public Object checkMetrics() {
                return consumer.workerFactory.checkMetrics();
            }

            @Override
            public Object flushMetrics() {
                return consumer.workerFactory.flushMetrics();
            }
        };
        return new TaskManager<Object>(taskId, Set.of(), this.executor, taskController,
                workerBodyFactory, queue, null);
    }

    /** Adapts {@code queue} to a {@link Source} whose blocking takes check for interruption first. */
    @SuppressWarnings({"rawtypes", "unchecked"})
    private static Source interruptibleSource(Queue queue) {
        return new Source() {
            @Override
            public Object tryTake() {
                return queue.tryReceive();
            }

            @Override
            public Object take(Duration timeout) throws InterruptedException, TimeoutException {
                throwIfInterrupted();
                return queue.receive(timeout);
            }

            @Override
            public Object take() throws InterruptedException {
                throwIfInterrupted();
                return queue.receive();
            }
        };
    }

    /** Throws {@link InterruptedException} if the current thread is interrupted, leaving the flag set. */
    private static void throwIfInterrupted() throws InterruptedException {
        if (Thread.interrupted()) {
            // Thread.interrupted() clears the flag; set it again before throwing.
            Thread.currentThread().interrupt();
            throw new InterruptedException();
        }
    }

    /**
     * Builds the pipeline with {@link #build()} and starts it before returning.
     *
     * @return the pipeline, after {@code start()} has been called on it
     */
    public Pipeline buildAndStart() {
        final Pipeline pipeline = build();
        pipeline.start();
        return pipeline;
    }

    /**
     * Builds and starts the pipeline, exposing its outcome as a plain {@link Future}.
     *
     * <p>This is {@link #buildAndStartAsCompletableFuture()} with the result viewed through the
     * narrower {@link Future} type.
     *
     * @return a future tracking the started pipeline
     */
    public Future<?> buildAndStartAsFuture() {
        final CompletableFuture<Void> future = buildAndStartAsCompletableFuture();
        return future;
    }

    /**
     * Builds and starts the pipeline, exposing its outcome as a {@link CompletableFuture}.
     *
     * <p>The future completes with {@code null} when the pipeline reports completion and
     * completes exceptionally with the reported cause when the pipeline reports failure.
     * Calling {@code cancel} on the future first cancels the pipeline and then cancels the
     * future itself.
     *
     * @return a future tracking the started pipeline
     */
    public CompletableFuture<Void> buildAndStartAsCompletableFuture() {
        final Pipeline built = build();

        // Subclassed so that cancelling the future also cancels the underlying pipeline.
        final CompletableFuture<Void> outcome = new CompletableFuture<Void>() {
            @Override
            public boolean cancel(boolean mayInterruptIfRunning) {
                built.cancel();
                return super.cancel(mayInterruptIfRunning);
            }
        };

        // The listener is attached before start() so the terminal events are observed.
        final Pipeline.LifecycleListener bridge = new Pipeline.LifecycleListener() {
            @Override
            public void onPipelineCompleted(int pipelineId) {
                outcome.complete(null);
            }

            @Override
            public void onPipelineFailed(int pipelineId, Throwable cause) {
                outcome.completeExceptionally(cause);
            }
        };
        built.addLifecycleListener(bridge);

        built.start();
        return outcome;
    }
}

