/*
 * Decompiled with CFR 0.152.
 */
package io.servicetalk.concurrent.api;

import io.servicetalk.concurrent.PublisherSource;
import io.servicetalk.concurrent.api.AbstractNoHandleSubscribePublisher;
import io.servicetalk.concurrent.api.AsyncContextMap;
import io.servicetalk.concurrent.api.AsyncContextProvider;
import io.servicetalk.concurrent.api.Executor;
import io.servicetalk.concurrent.api.MulticastUtils;
import io.servicetalk.concurrent.api.PublishAndSubscribeOnPublishers;
import io.servicetalk.concurrent.api.Publisher;
import io.servicetalk.concurrent.api.SubscriberApiUtils;
import io.servicetalk.concurrent.internal.ConcurrentSubscription;
import io.servicetalk.concurrent.internal.DelayedSubscription;
import io.servicetalk.concurrent.internal.QueueFullException;
import io.servicetalk.concurrent.internal.RejectedSubscribeException;
import io.servicetalk.concurrent.internal.SignalOffloader;
import io.servicetalk.concurrent.internal.TerminalNotification;
import io.servicetalk.utils.internal.PlatformDependent;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceArray;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link Publisher} that subscribes to {@code original} a single time and fans every signal out to a fixed
 * number of downstream subscribers.
 * <p>
 * The upstream subscription is only started once the expected number of subscribers has arrived (see
 * {@link #handleSubscribe}). Demand is forwarded upstream as the high-water mark of what any individual subscriber
 * has requested, and the upstream is cancelled once every subscriber has cancelled.
 * <p>
 * {@code terminatedPrematurely}, {@code inOnNext} and {@code reentryQueue} are plain (non-volatile) fields that are
 * only touched from the {@link PublisherSource.Subscriber} callbacks of this class.
 * NOTE(review): this presumably relies on the upstream delivering signals serially - confirm.
 *
 * @param <T> type of the items emitted.
 */
final class MulticastPublisher<T> extends AbstractNoHandleSubscribePublisher<T>
        implements PublisherSource.Subscriber<T> {
    private static final Logger LOGGER = LoggerFactory.getLogger(MulticastPublisher.class);
    /** Queue size used by the constructor that does not take an explicit {@code maxQueueSize}. */
    private static final int DEFAULT_MAX_QUEUE_SIZE = 10;
    @SuppressWarnings("rawtypes")
    private static final AtomicIntegerFieldUpdater<MulticastPublisher> notCancelledCountUpdater =
            AtomicIntegerFieldUpdater.newUpdater(MulticastPublisher.class, "notCancelledCount");
    @SuppressWarnings("rawtypes")
    private static final AtomicIntegerFieldUpdater<MulticastPublisher> subscriberCountUpdater =
            AtomicIntegerFieldUpdater.newUpdater(MulticastPublisher.class, "subscriberCount");
    @SuppressWarnings("rawtypes")
    private static final AtomicLongFieldUpdater<MulticastPublisher> sourceRequestedUpdater =
            AtomicLongFieldUpdater.newUpdater(MulticastPublisher.class, "sourceRequested");
    /**
     * Placeholder stored in a subscriber slot after that subscriber has cancelled or has been terminated. It
     * cancels any subscription handed to it and ignores every other signal.
     */
    private static final PublisherSource.Subscriber<Object> NOOP_SUBSCRIBER =
            new PublisherSource.Subscriber<Object>() {
                @Override
                public void onSubscribe(PublisherSource.Subscription s) {
                    s.cancel();
                }

                @Override
                public void onNext(Object o) {
                }

                @Override
                public void onError(Throwable ignore) {
                }

                @Override
                public void onComplete() {
                }
            };

    /** Set once the re-entry queue overflowed; every later upstream signal is dropped. */
    private boolean terminatedPrematurely;
    /** {@code true} while {@link #onNext(Object)} is dispatching to the subscribers. */
    private boolean inOnNext;
    /** Signals that arrived re-entrantly while {@link #inOnNext} was set; created on first use. */
    @Nullable
    private Queue<Object> reentryQueue;
    private final DelayedSubscription delayedSubscription = new DelayedSubscription();
    private final ConcurrentSubscription subscription = ConcurrentSubscription.wrap(delayedSubscription);
    /** Number of subscriber slots that have not been cancelled yet; the upstream is cancelled at zero. */
    private volatile int notCancelledCount;
    /** Number of subscriber slots handed out so far. */
    private volatile int subscriberCount;
    /** Highest cumulative demand that has been forwarded to the upstream. */
    private volatile long sourceRequested;
    private final int maxQueueSize;
    private final AtomicReferenceArray<PublisherSource.Subscriber<? super T>> subscribers;
    private final Publisher<T> original;

    /**
     * Creates a new instance with the default queue size.
     *
     * @param original the upstream {@link Publisher} to multicast.
     * @param expectedSubscribers the exact number of subscribers to wait for, must be {@code >= 2}.
     * @param executor passed to the super class constructor.
     * @throws IllegalArgumentException if {@code expectedSubscribers < 2}.
     */
    MulticastPublisher(Publisher<T> original, int expectedSubscribers, Executor executor) {
        this(original, expectedSubscribers, DEFAULT_MAX_QUEUE_SIZE, executor);
    }

    /**
     * Creates a new instance.
     *
     * @param original the upstream {@link Publisher} to multicast.
     * @param expectedSubscribers the exact number of subscribers to wait for, must be {@code >= 2}.
     * @param maxQueueSize upper bound for the re-entry queue and for each individual subscriber's queue, must be
     * {@code >= 1}.
     * @param executor passed to the super class constructor.
     * @throws IllegalArgumentException if {@code expectedSubscribers < 2} or {@code maxQueueSize < 1}.
     */
    MulticastPublisher(Publisher<T> original, int expectedSubscribers, int maxQueueSize, Executor executor) {
        super(executor);
        if (expectedSubscribers < 2) {
            throw new IllegalArgumentException("expectedSubscribers: " + expectedSubscribers + " (expected >=2)");
        }
        if (maxQueueSize < 1) {
            throw new IllegalArgumentException("maxQueueSize: " + maxQueueSize + " (expected >=1)");
        }
        this.original = original;
        this.notCancelledCount = expectedSubscribers;
        this.maxQueueSize = maxQueueSize;
        this.subscribers = new AtomicReferenceArray<>(expectedSubscribers);
    }

    /**
     * Claims the next free subscriber slot for {@code subscriber}. When all slots are already taken the subscriber
     * is rejected with a {@link RejectedSubscribeException}. The subscriber that claims the last slot triggers the
     * subscription to the upstream.
     */
    @Override
    void handleSubscribe(PublisherSource.Subscriber<? super T> subscriber, SignalOffloader signalOffloader,
                         AsyncContextMap contextMap, AsyncContextProvider contextProvider) {
        final int maxSubscribers = subscribers.length();
        int index;
        do {
            index = subscriberCount;
            if (index == maxSubscribers || index < 0) {
                final Throwable rejection =
                        new RejectedSubscribeException("Only " + maxSubscribers + " subscribers are allowed!");
                PublishAndSubscribeOnPublishers.deliverOnSubscribeAndOnError(subscriber, signalOffloader,
                        contextMap, contextProvider, rejection);
                return;
            }
        } while (!subscriberCountUpdater.compareAndSet(this, index, index + 1));

        // NOTE(review): the slot is written after the counter was advanced, so a concurrently subscribing last
        // subscriber may start the upstream before this slot is populated - confirm callers cannot hit this.
        final MulticastSubscriber<T> multicastSubscriber = new MulticastSubscriber<>(this, subscriber, index);
        subscribers.set(index, multicastSubscriber);
        multicastSubscriber.onSubscribe(subscription);
        if (index == maxSubscribers - 1) {
            original.delegateSubscribe(this, signalOffloader, contextMap, contextProvider);
        }
    }

    @Override
    public void onSubscribe(PublisherSource.Subscription s) {
        delayedSubscription.delayedSubscription(s);
    }

    /**
     * Adds an item to the re-entry queue unless the queue already holds {@link #maxQueueSize} entries.
     *
     * @param o the item, {@code null} is stored as {@link SubscriberApiUtils#NULL_TOKEN}.
     * @return {@code false} if the queue is full and the item was not added.
     */
    private boolean offerNext(@Nullable Object o) {
        assert reentryQueue != null;
        return reentryQueue.size() < maxQueueSize && reentryQueue.offer(o == null ? SubscriberApiUtils.NULL_TOKEN : o);
    }

    /** Adds a terminal signal to the re-entry queue; not subject to the {@link #maxQueueSize} bound. */
    private void offerTerminal(TerminalNotification notification) {
        assert reentryQueue != null;
        reentryQueue.offer(notification);
    }

    /**
     * Dispatches {@code t} to every subscriber. A call that arrives while a previous call is still dispatching is
     * not delivered immediately but queued, and drained by the outer call once it finished its own dispatch.
     * If that queue overflows the upstream is cancelled and a {@link QueueFullException} is queued as terminal
     * signal.
     */
    @Override
    public void onNext(T t) {
        if (terminatedPrematurely) {
            return;
        }
        if (inOnNext) {
            enqueueReentrantItem(t);
            return;
        }
        inOnNext = true;
        try {
            onNext0(t);
            drainReentryQueue();
        } finally {
            inOnNext = false;
        }
    }

    /** Queues an item received re-entrantly; on overflow cancels the upstream and queues an error instead. */
    private void enqueueReentrantItem(@Nullable T t) {
        if (reentryQueue == null) {
            reentryQueue = new ArrayDeque<>(Math.min(4, maxQueueSize));
        }
        if (!offerNext(t)) {
            terminatedPrematurely = true;
            subscription.cancel();
            offerTerminal(TerminalNotification.error(new QueueFullException("global", maxQueueSize)));
        }
    }

    /**
     * Delivers everything that was queued re-entrantly, in arrival order. Stops at the first terminal signal;
     * exceptions thrown while delivering that terminal signal are logged and not propagated.
     */
    @SuppressWarnings("unchecked")
    private void drainReentryQueue() {
        final Queue<Object> queue = reentryQueue;
        if (queue == null) {
            return;
        }
        Object next;
        // The queue does not hold null (see NULL_TOKEN), so a null from poll() means it is empty.
        while ((next = queue.poll()) != null) {
            if (next instanceof TerminalNotification) {
                try {
                    terminateFromQueuedEvent((TerminalNotification) next);
                } catch (Throwable cause) {
                    LOGGER.error("Error from terminal callbacks to subscriber {}", this, cause);
                }
                return;
            }
            // Only items of type T (or NULL_TOKEN) are ever queued by offerNext, so the cast is safe.
            onNext0(next == SubscriberApiUtils.NULL_TOKEN ? null : (T) next);
        }
    }

    /** Delivers a queued terminal signal: completion when it carries no cause, error otherwise. */
    private void terminateFromQueuedEvent(TerminalNotification terminalNotification) {
        final Throwable cause = terminalNotification.cause();
        if (cause == null) {
            onComplete0();
        } else {
            onError0(cause);
        }
    }

    /**
     * Terminates all subscribers with {@code t}. If items are still pending in the re-entry queue the error is
     * queued behind them, otherwise it is delivered immediately.
     */
    @Override
    public void onError(Throwable t) {
        if (terminatedPrematurely) {
            return;
        }
        // NOTE(review): a re-entrant terminal signal with an empty queue is delivered right away, i.e. while the
        // outer onNext may still be iterating the subscribers - confirm this ordering is intended.
        if (hasPendingReentrantSignals()) {
            offerTerminal(TerminalNotification.error(t));
        } else {
            onError0(t);
        }
    }

    /**
     * Replaces every subscriber slot with the no-op subscriber and calls {@code onError} on the previous
     * occupant. Exceptions thrown by subscribers are collected (first one as primary, the rest suppressed) and
     * rethrown after all subscribers were notified.
     */
    private void onError0(Throwable t) {
        Throwable overallCause = null;
        final int length = subscribers.length();
        for (int i = 0; i < length; ++i) {
            final PublisherSource.Subscriber<? super T> subscriber = subscribers.getAndSet(i, noopSubscriber());
            try {
                subscriber.onError(t);
            } catch (Throwable cause) {
                if (overallCause == null) {
                    overallCause = cause;
                } else {
                    overallCause.addSuppressed(cause);
                }
            }
        }
        if (overallCause != null) {
            PlatformDependent.throwException(overallCause);
        }
    }

    /**
     * Completes all subscribers. If items are still pending in the re-entry queue the completion is queued behind
     * them, otherwise it is delivered immediately.
     */
    @Override
    public void onComplete() {
        if (terminatedPrematurely) {
            return;
        }
        if (hasPendingReentrantSignals()) {
            offerTerminal(TerminalNotification.complete());
        } else {
            onComplete0();
        }
    }

    /**
     * Replaces every subscriber slot with the no-op subscriber and calls {@code onComplete} on the previous
     * occupant. Exceptions thrown by subscribers are collected (first one as primary, the rest suppressed) and
     * rethrown after all subscribers were notified.
     */
    private void onComplete0() {
        Throwable overallCause = null;
        final int length = subscribers.length();
        for (int i = 0; i < length; ++i) {
            final PublisherSource.Subscriber<? super T> subscriber = subscribers.getAndSet(i, noopSubscriber());
            try {
                subscriber.onComplete();
            } catch (Throwable cause) {
                if (overallCause == null) {
                    overallCause = cause;
                } else {
                    overallCause.addSuppressed(cause);
                }
            }
        }
        if (overallCause != null) {
            PlatformDependent.throwException(overallCause);
        }
    }

    /** @return {@code true} if an {@code onNext} dispatch is in progress and its re-entry queue is not empty. */
    private boolean hasPendingReentrantSignals() {
        return inOnNext && reentryQueue != null && !reentryQueue.isEmpty();
    }

    /** Hands {@code t} to the current occupant of every subscriber slot, in slot order. */
    private void onNext0(@Nullable T t) {
        final int length = subscribers.length();
        for (int i = 0; i < length; ++i) {
            subscribers.get(i).onNext(t);
        }
    }

    /**
     * Raises the upstream demand to the cumulative demand of {@code subscriber} if that is higher than what has
     * been requested upstream so far; only the difference is requested.
     *
     * @param subscriber the subscriber whose demand changed.
     */
    void requestIndividualSubscriber(MulticastSubscriber<T> subscriber) {
        final long individualSourceRequested = subscriber.sourceRequested();
        for (;;) {
            final long currentRequested = sourceRequested;
            if (currentRequested >= individualSourceRequested) {
                return;
            }
            if (sourceRequestedUpdater.compareAndSet(this, currentRequested, individualSourceRequested)) {
                subscription.request(individualSourceRequested - currentRequested);
                return;
            }
        }
    }

    /**
     * Replaces the subscriber at {@code subscriberIndex} with the no-op subscriber. Cancels the upstream when
     * this was the last slot that had not been cancelled. Calling this again for the same slot has no effect.
     *
     * @param subscriberIndex index of the slot to cancel.
     */
    void cancelIndividualSubscriber(int subscriberIndex) {
        final PublisherSource.Subscriber<? super T> previous =
                subscribers.getAndSet(subscriberIndex, noopSubscriber());
        if (previous != NOOP_SUBSCRIBER && notCancelledCountUpdater.decrementAndGet(this) == 0) {
            subscription.cancel();
        }
    }

    /** @return {@link #NOOP_SUBSCRIBER} typed for the caller; safe because it ignores every item. */
    @SuppressWarnings("unchecked")
    private static <X> PublisherSource.Subscriber<X> noopSubscriber() {
        return (PublisherSource.Subscriber<X>) NOOP_SUBSCRIBER;
    }

    /**
     * The per-subscriber view of a {@link MulticastPublisher}: it occupies one slot of the enclosing publisher
     * and routes demand and cancellation of its target back to that publisher.
     *
     * @param <T> type of the items emitted.
     */
    private static final class MulticastSubscriber<T> extends MulticastUtils.IndividualMulticastSubscriber<T>
            implements PublisherSource.Subscriber<T> {
        private static final Logger LOGGER = LoggerFactory.getLogger(MulticastSubscriber.class);
        private final MulticastPublisher<T> source;
        final int subscriberIndex;

        /**
         * @param source the publisher this subscriber belongs to; also supplies the queue size.
         * @param target the downstream subscriber.
         * @param subscriberIndex the slot of {@code source} occupied by this subscriber.
         */
        MulticastSubscriber(MulticastPublisher<T> source, PublisherSource.Subscriber<? super T> target,
                            int subscriberIndex) {
            super(source.maxQueueSize, target);
            this.source = source;
            this.subscriberIndex = subscriberIndex;
        }

        /** Ignores {@code s} and hands this object to the target as its subscription. */
        @Override
        public void onSubscribe(PublisherSource.Subscription s) {
            final PublisherSource.Subscriber<? super T> target = this.target;
            assert target != null;
            target.onSubscribe(this);
        }

        @Override
        String queueIdentifier() {
            return source + " " + subscriberIndex;
        }

        /** Delegates to the publisher, which derives the amount from this subscriber's cumulative demand. */
        @Override
        void requestFromSource(int requestN) {
            source.requestIndividualSubscriber(this);
        }

        /** Forwards the invalid {@code n} unchanged to the shared upstream subscription. */
        @Override
        void handleInvalidRequestN(long n) {
            source.subscription.request(n);
        }

        @Override
        void cancelSourceFromExternal(Throwable cause) {
            LOGGER.error("Unexpected exception thrown from {} subscriber", queueIdentifier(), cause);
            source.cancelIndividualSubscriber(subscriberIndex);
        }

        @Override
        void cancelSourceFromSource(boolean subscriberLockAcquired, Throwable cause) {
            LOGGER.error("Unexpected exception thrown from {} subscriber", queueIdentifier(), cause);
            source.cancelIndividualSubscriber(subscriberIndex);
        }

        @Override
        public void cancel() {
            source.cancelIndividualSubscriber(subscriberIndex);
        }
    }
}

