/*
 * Decompiled with CFR 0.152.
 */
package reactor.aeron;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ThreadFactory;
import java.util.function.Consumer;
import org.agrona.concurrent.IdleStrategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.aeron.InnerPoller;
import reactor.aeron.MessagePublication;
import reactor.aeron.OnDisposable;
import reactor.core.Exceptions;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.core.publisher.MonoSink;

/**
 * Single-threaded event loop for Aeron resources.
 *
 * <p>One worker thread repeatedly (1) drains and runs the queued command tasks, (2) calls
 * {@code proceed()} on every registered {@link MessagePublication}, (3) calls {@code poll()} on
 * every registered {@link InnerPoller}, and (4) hands the summed results to the configured
 * {@link IdleStrategy}.
 *
 * <p>The {@code publications} and {@code subscriptions} lists are plain {@link ArrayList}s. They
 * are only read and modified from code that runs inside the worker (the loop itself and the
 * command tasks it executes), which is why every public mutator goes through the command queue
 * instead of touching the lists directly.
 */
public final class AeronEventLoop implements OnDisposable {
    private static final Logger logger = LoggerFactory.getLogger(AeronEventLoop.class);

    private final IdleStrategy idleStrategy;

    /** Completed by {@link #dispose()} to request shutdown of the loop. */
    private final MonoProcessor<Void> dispose = MonoProcessor.create();

    /** Completed by the worker once its shutdown sequence has finished. */
    private final MonoProcessor<Void> onDispose = MonoProcessor.create();

    /** Creates and starts the worker on first subscription; the result is cached. */
    private final Mono<Worker> workerMono;

    /** The worker thread; {@code null} until the worker has been created. */
    private volatile Thread thread;

    /** Commands submitted from arbitrary threads, consumed by the worker. */
    private final Queue<CommandTask> commandTasks = new ConcurrentLinkedQueue<>();

    private final List<MessagePublication> publications = new ArrayList<>();
    private final List<InnerPoller> subscriptions = new ArrayList<>();

    /**
     * Creates an event loop. No thread is created here; the worker is created when
     * {@code workerMono} is first subscribed.
     *
     * @param idleStrategy strategy invoked at the end of every loop iteration with the amount of
     *     work reported by publications and subscriptions
     */
    AeronEventLoop(IdleStrategy idleStrategy) {
        this.idleStrategy = idleStrategy;
        this.workerMono = Mono.fromCallable(this::createWorker).cache();
    }

    /** Creates the worker, starts its thread and records that thread in {@link #thread}. */
    private Worker createWorker() {
        Worker worker = new Worker();
        Thread workerThread = defaultThreadFactory().newThread(worker);
        this.thread = workerThread;
        workerThread.start();
        return worker;
    }

    /**
     * Tells whether the caller is running on the worker thread.
     *
     * @return {@code true} if the current thread is the worker thread; {@code false} otherwise,
     *     including when the worker has not been created yet
     */
    boolean inEventLoop() {
        return this.thread == Thread.currentThread();
    }

    /**
     * Queues an arbitrary command to be run by the worker.
     *
     * <p>The consumer receives the sink of the returned {@link Mono}. Nothing in this class
     * completes the sink on the consumer's behalf; it is only signalled with an error here if the
     * consumer throws or the task is cancelled during shutdown.
     *
     * @param consumer command body, invoked by the worker with the result sink
     * @return mono driven by the sink handed to {@code consumer}
     */
    public Mono<Void> execute(Consumer<MonoSink<Void>> consumer) {
        return worker().flatMap(w -> command(consumer));
    }

    /**
     * Adds a publication to the set processed on every loop iteration.
     *
     * @param publication publication to register; a {@code null} value makes the returned mono
     *     fail with {@link NullPointerException}
     * @return mono that completes once the worker has added the publication
     */
    public Mono<Void> register(MessagePublication publication) {
        return worker().flatMap(w -> command(sink -> register(publication, sink)));
    }

    private void register(MessagePublication publication, MonoSink<Void> sink) {
        // Validation deliberately happens inside the command so the failure is delivered
        // through the sink (CommandTask.run turns the exception into sink.error).
        Objects.requireNonNull(publication, "messagePublication must be not null");
        this.publications.add(publication);
        sink.success();
    }

    /**
     * Adds a subscription poller to the set polled on every loop iteration.
     *
     * @param innerPoller poller to register; a {@code null} value makes the returned mono fail
     *     with {@link NullPointerException}
     * @return mono that completes once the worker has added the poller
     */
    public Mono<Void> register(InnerPoller innerPoller) {
        return worker().flatMap(w -> command(sink -> register(innerPoller, sink)));
    }

    private void register(InnerPoller innerPoller, MonoSink<Void> sink) {
        Objects.requireNonNull(innerPoller, "innerPoller must be not null");
        this.subscriptions.add(innerPoller);
        sink.success();
    }

    /**
     * Removes the given publication from the loop and closes it.
     *
     * @param publication publication to remove and close; {@code null} is tolerated and only
     *     completes the returned mono
     * @return mono that completes once the worker has removed and closed the publication
     */
    public Mono<Void> dispose(MessagePublication publication) {
        return worker().flatMap(w -> command(sink -> dispose(publication, sink)));
    }

    private void dispose(MessagePublication publication, MonoSink<Void> sink) {
        // Identity comparison on purpose: remove exactly this instance.
        this.publications.removeIf(p -> p == publication);
        if (publication != null) {
            publication.close();
        }
        sink.success();
    }

    /**
     * Removes the given subscription poller from the loop and closes it.
     *
     * @param innerPoller poller to remove and close; {@code null} is tolerated and only completes
     *     the returned mono
     * @return mono that completes once the worker has removed and closed the poller
     */
    public Mono<Void> dispose(InnerPoller innerPoller) {
        return worker().flatMap(w -> command(sink -> dispose(innerPoller, sink)));
    }

    private void dispose(InnerPoller innerPoller, MonoSink<Void> sink) {
        // Identity comparison on purpose: remove exactly this instance.
        this.subscriptions.removeIf(s -> s == innerPoller);
        if (innerPoller != null) {
            innerPoller.close();
        }
        sink.success();
    }

    /**
     * Requests shutdown by completing the {@code dispose} processor; the worker observes this at
     * the top of its next iteration and then runs its shutdown sequence.
     *
     * <p>NOTE(review): {@code onDispose} is completed only by the worker thread. If no worker was
     * ever started, this method alone does not complete it - confirm that this is intended.
     */
    public void dispose() {
        this.dispose.onComplete();
    }

    /**
     * Returns the shutdown signal.
     *
     * @return mono completed by the worker after its shutdown sequence has run
     */
    @Override
    public Mono<Void> onDispose() {
        return this.onDispose;
    }

    /**
     * Reports the state of the {@code onDispose} processor, i.e. whether the worker has finished
     * shutting down (not merely whether {@link #dispose()} has been called).
     *
     * @return {@code onDispose.isDisposed()}
     */
    public boolean isDisposed() {
        return this.onDispose.isDisposed();
    }

    /** Builds threads named {@code aeron-event-loop} that log uncaught throwables. */
    private static ThreadFactory defaultThreadFactory() {
        return runnable -> {
            Thread created = new Thread(runnable);
            created.setName("aeron-event-loop");
            created.setUncaughtExceptionHandler(
                (t, e) -> logger.error("Uncaught exception occurred: {}", e, e));
            return created;
        };
    }

    /** Resolves the (lazily created) worker, or fails once {@link #dispose()} has been called. */
    private Mono<Worker> worker() {
        return this.workerMono.takeUntilOther(listenDispose());
    }

    /** Wraps {@code consumer} in a {@link CommandTask} that is queued when the mono is subscribed. */
    private Mono<Void> command(Consumer<MonoSink<Void>> consumer) {
        return Mono.create(sink -> this.commandTasks.add(new CommandTask(sink, consumer)));
    }

    /**
     * Produces a mono that fails with a "rejected" error once {@code dispose} has completed.
     *
     * <p>{@code dispose} is a {@code Mono<Void>} and therefore never emits a value, so the cast in
     * the mapper is never exercised at runtime; it exists only to give the chain the type
     * {@code Mono<T>}.
     */
    @SuppressWarnings("unchecked")
    private <T> Mono<T> listenDispose() {
        return this.dispose
            .map(avoid -> (T) avoid)
            .switchIfEmpty(Mono.error(Exceptions::failWithRejected));
    }

    /** A queued command together with the sink used to report its outcome. */
    private static class CommandTask
    implements Runnable {
        private final MonoSink<Void> sink;
        private final Consumer<MonoSink<Void>> consumer;

        private CommandTask(MonoSink<Void> sink, Consumer<MonoSink<Void>> consumer) {
            this.sink = sink;
            this.consumer = consumer;
        }

        /** Runs the command; an {@link Exception} is logged and forwarded to the sink. */
        @Override
        public void run() {
            try {
                this.consumer.accept(this.sink);
            }
            catch (Exception ex) {
                // Pass the text for the placeholder and the exception as the trailing argument.
                // A lone Throwable argument selects warn(String, Throwable), which leaves "{}"
                // unformatted in the message.
                logger.warn("Exception occurred on command task: {}", ex.toString(), ex);
                this.sink.error(ex);
            }
        }

        /** Fails the sink with a cancellation error; used for tasks still queued at shutdown. */
        private void cancel() {
            this.sink.error(Exceptions.failWithCancel());
        }
    }

    /** The loop body executed on the worker thread. */
    private class Worker
    implements Runnable {
        private Worker() {
        }

        /**
         * Runs loop iterations until {@code dispose} is completed, then shuts down.
         *
         * <p>The shutdown sequence is in a {@code finally} so that it also runs when an iteration
         * is aborted by a throwable (for example an {@link Error} from a command, which
         * {@link CommandTask#run()} does not catch). Without it the thread would end with queued
         * commands unsignalled, resources unclosed and {@code onDispose} never completed. The
         * throwable itself still propagates to the thread's uncaught-exception handler.
         */
        @Override
        public void run() {
            try {
                while (!AeronEventLoop.this.dispose.isDisposed()) {
                    processCommandTasks();
                    // Publications first, then subscriptions.
                    int workCount = processPublications() + processSubscriptions();
                    AeronEventLoop.this.idleStrategy.idle(workCount);
                }
            }
            finally {
                shutdown();
            }
        }

        /** Runs every command currently queued. */
        private void processCommandTasks() {
            CommandTask task;
            while ((task = AeronEventLoop.this.commandTasks.poll()) != null) {
                task.run();
            }
        }

        /** Calls {@code proceed()} on each publication and returns the summed results. */
        private int processPublications() {
            List<MessagePublication> items = AeronEventLoop.this.publications;
            int result = 0;
            // Indexed loop with the size read once, as the list is not modified while iterating.
            for (int i = 0, n = items.size(); i < n; ++i) {
                result += items.get(i).proceed();
            }
            return result;
        }

        /** Calls {@code poll()} on each subscription and returns the summed results. */
        private int processSubscriptions() {
            List<InnerPoller> items = AeronEventLoop.this.subscriptions;
            int result = 0;
            for (int i = 0, n = items.size(); i < n; ++i) {
                result += items.get(i).poll();
            }
            return result;
        }

        /**
         * Cancels queued commands, closes subscriptions and publications, and finally completes
         * {@code onDispose} regardless of failures in the preceding steps.
         */
        private void shutdown() {
            try {
                if (!AeronEventLoop.this.dispose.isDisposed()) {
                    // Only reachable when the loop was aborted by a throwable: mark the loop as
                    // disposed so that later callers of worker() are rejected instead of queuing
                    // commands that no thread will ever run.
                    AeronEventLoop.this.dispose();
                }
                disposeCommandTasks();
                disposeSubscriptions();
                disposePublications();
            }
            finally {
                AeronEventLoop.this.onDispose.onComplete();
            }
        }

        /** Cancels every command still waiting in the queue. */
        private void disposeCommandTasks() {
            CommandTask task;
            while ((task = AeronEventLoop.this.commandTasks.poll()) != null) {
                task.cancel();
            }
        }

        /** Closes and removes every publication; a failing close is logged and skipped. */
        private void disposePublications() {
            Iterator<MessagePublication> it = AeronEventLoop.this.publications.iterator();
            while (it.hasNext()) {
                MessagePublication publication = it.next();
                try {
                    publication.close();
                }
                catch (Exception ex) {
                    logger.warn("Exception occurred on closing publication: {}, cause: {}", publication, ex);
                }
                it.remove();
            }
        }

        /** Closes and removes every subscription poller; a failing close is logged and skipped. */
        private void disposeSubscriptions() {
            Iterator<InnerPoller> it = AeronEventLoop.this.subscriptions.iterator();
            while (it.hasNext()) {
                InnerPoller innerPoller = it.next();
                try {
                    innerPoller.close();
                }
                catch (Exception ex) {
                    logger.warn("Exception occurred on closing subscription: {}, cause: {}", innerPoller, ex);
                }
                it.remove();
            }
        }
    }
}

