/*
 * Decompiled with CFR 0.152.
 */
package io.servicetalk.concurrent.api;

import io.servicetalk.concurrent.PublisherSource;
import io.servicetalk.concurrent.api.AbstractNoHandleSubscribePublisher;
import io.servicetalk.concurrent.api.AsyncContextProvider;
import io.servicetalk.concurrent.api.CapturedContext;
import io.servicetalk.concurrent.api.Completable;
import io.servicetalk.concurrent.api.DefaultPriorityQueue;
import io.servicetalk.concurrent.api.MulticastLeafSubscriber;
import io.servicetalk.concurrent.api.PublishAndSubscribeOnPublishers;
import io.servicetalk.concurrent.api.Publisher;
import io.servicetalk.concurrent.internal.ArrayUtils;
import io.servicetalk.concurrent.internal.ConcurrentUtils;
import io.servicetalk.concurrent.internal.DelayedSubscription;
import io.servicetalk.concurrent.internal.FlowControlUtils;
import io.servicetalk.concurrent.internal.RejectedSubscribeException;
import io.servicetalk.concurrent.internal.SubscriberUtils;
import io.servicetalk.concurrent.internal.TerminalNotification;
import io.servicetalk.utils.internal.PlatformDependent;
import io.servicetalk.utils.internal.ThrowableUtils;
import java.lang.reflect.Array;
import java.util.Comparator;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.BiConsumer;
import java.util.function.Function;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class MulticastPublisher<T>
extends AbstractNoHandleSubscribePublisher<T> {
    private static final Logger LOGGER = LoggerFactory.getLogger(MulticastPublisher.class);
    static final int DEFAULT_MULTICAST_QUEUE_LIMIT = 64;
    static final Function<Throwable, Completable> DEFAULT_MULTICAST_TERM_RESUB = t -> Completable.completed();
    private static final PublisherSource.Subscriber<?>[] EMPTY_SUBSCRIBERS = new PublisherSource.Subscriber[0];
    private static final AtomicReferenceFieldUpdater<State, PublisherSource.Subscriber[]> newSubscribersUpdater = AtomicReferenceFieldUpdater.newUpdater(State.class, PublisherSource.Subscriber[].class, "subscribers");
    private static final AtomicIntegerFieldUpdater<State> subscribeCountUpdater = AtomicIntegerFieldUpdater.newUpdater(State.class, "subscribeCount");
    private final Publisher<T> original;
    private final Function<Throwable, Completable> terminalResubscribe;
    private final int minSubscribers;
    private final boolean exactlyMinSubscribers;
    private final boolean cancelUpstream;
    @Nullable
    volatile State state;

    MulticastPublisher(Publisher<T> original, int minSubscribers, boolean exactlyMinSubscribers, boolean cancelUpstream, int maxQueueSize, Function<Throwable, Completable> terminalResubscribe) {
        if (minSubscribers < 1) {
            throw new IllegalArgumentException("minSubscribers: " + minSubscribers + " (expected >1)");
        }
        if (maxQueueSize < 1) {
            throw new IllegalArgumentException("maxQueueSize: " + maxQueueSize + " (expected >1)");
        }
        this.original = original;
        this.minSubscribers = minSubscribers;
        this.exactlyMinSubscribers = exactlyMinSubscribers;
        this.cancelUpstream = cancelUpstream;
        this.terminalResubscribe = Objects.requireNonNull(terminalResubscribe);
    }

    static <T> MulticastPublisher<T> newMulticastPublisher(Publisher<T> original, int minSubscribers, boolean exactlyMinSubscribers, boolean cancelUpstream, int maxQueueSize, Function<Throwable, Completable> terminalResubscribe) {
        MulticastPublisher<T> publisher = new MulticastPublisher<T>(original, minSubscribers, exactlyMinSubscribers, cancelUpstream, minSubscribers, terminalResubscribe);
        publisher.resetState(maxQueueSize, minSubscribers);
        return publisher;
    }

    @Override
    final void handleSubscribe(PublisherSource.Subscriber<? super T> subscriber, CapturedContext capturedContext, AsyncContextProvider contextProvider) {
        State cState = this.state;
        assert (cState != null);
        cState.addNewSubscriber(subscriber, capturedContext, contextProvider);
    }

    void resetState(int maxQueueSize, int minSubscribers) {
        this.state = new State(maxQueueSize, minSubscribers);
    }

    static final class TerminalSubscriber<T>
    implements PublisherSource.Subscriber<T> {
        @Nullable
        final Throwable terminalError;

        private TerminalSubscriber(@Nullable Throwable terminalError) {
            this.terminalError = terminalError;
        }

        void terminate(PublisherSource.Subscriber<?> sub) {
            if (this.terminalError == null) {
                SubscriberUtils.deliverCompleteFromSource(sub);
            } else {
                SubscriberUtils.deliverErrorFromSource(sub, (Throwable)this.terminalError);
            }
        }

        @Override
        public void onSubscribe(PublisherSource.Subscription subscription) {
            throw new UnsupportedOperationException();
        }

        @Override
        public void onNext(@Nullable T t) {
            throw new IllegalStateException("terminal signal already received in onNext: " + t, this.terminalError);
        }

        @Override
        public void onError(Throwable t) {
            throw new IllegalStateException("duplicate terminal signal in onError", t);
        }

        @Override
        public void onComplete() {
            throw new IllegalStateException("duplicate terminal signal in onComplete", this.terminalError);
        }
    }

    static final class MulticastFixedSubscriber<T>
    extends MulticastLeafSubscriber<T>
    implements DefaultPriorityQueue.Node {
        private final int index;
        private final State root;
        private final PublisherSource.Subscriber<? super T> subscriber;
        private final PublisherSource.Subscriber<? super T> ctxSubscriber;
        private long initPriorityQueueValue;
        private long priorityQueueValue;
        private int priorityQueueIndex = -1;

        MulticastFixedSubscriber(State root, PublisherSource.Subscriber<? super T> subscriber, CapturedContext capturedContext, AsyncContextProvider contextProvider, int index) {
            this.root = root;
            this.index = index;
            this.subscriber = Objects.requireNonNull(subscriber);
            this.ctxSubscriber = contextProvider.wrapPublisherSubscriber(subscriber, capturedContext);
        }

        @Override
        PublisherSource.Subscriber<? super T> subscriber() {
            return this.subscriber;
        }

        @Override
        PublisherSource.Subscriber<? super T> subscriberOnSubscriptionThread() {
            return this.ctxSubscriber;
        }

        @Override
        void requestUpstream(long n) {
            this.root.request(this, n);
        }

        @Override
        void cancelUpstream() {
            this.root.cancel(this);
        }

        @Override
        int outstandingDemandLimit() {
            return this.root.maxQueueSize;
        }

        @Override
        public int priorityQueueIndex(DefaultPriorityQueue<?> queue) {
            return this.priorityQueueIndex;
        }

        @Override
        public void priorityQueueIndex(DefaultPriorityQueue<?> queue, int i) {
            this.priorityQueueIndex = i;
        }

        public String toString() {
            return String.valueOf(this.index);
        }
    }

    private static abstract class MulticastRootSubscriber<T extends MulticastLeafSubscriber<?>> {
        static final AtomicIntegerFieldUpdater<MulticastRootSubscriber> subscriptionLockUpdater = AtomicIntegerFieldUpdater.newUpdater(MulticastRootSubscriber.class, "subscriptionLock");
        private final DelayedSubscription delayedSubscription = new DelayedSubscription();
        final Queue<Object> subscriptionEvents = PlatformDependent.newUnboundedMpscQueue((int)8);
        final int maxQueueSize;
        private volatile int subscriptionLock;

        MulticastRootSubscriber(int maxQueueSize) {
            this.maxQueueSize = maxQueueSize;
        }

        @Nullable
        abstract TerminalSubscriber<?> addSubscriber(T var1, @Nullable CapturedContext var2, AsyncContextProvider var3);

        abstract boolean removeSubscriber(T var1);

        abstract long processRequestEvent(T var1, long var2);

        abstract long processCancelEvent(T var1);

        abstract boolean processSubscribeEvent(T var1, @Nullable TerminalSubscriber<?> var2);

        abstract void processTerminal(TerminalNotification var1);

        abstract void processOnNextEvent(Object var1);

        void upstreamCancelled() {
        }

        final void onSubscribe0(PublisherSource.Subscription subscription) {
            this.delayedSubscription.delayedSubscription(subscription);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         * Enabled force condition propagation
         * Lifted jumps to return sites
         */
        final void request(T subscriber, long n) {
            if (ConcurrentUtils.tryAcquireLock(subscriptionLockUpdater, (Object)this)) {
                try {
                    if (SubscriberUtils.isRequestNValid((long)n)) {
                        long toRequest = this.processRequestEvent(subscriber, n);
                        if (toRequest == 0L) return;
                        this.requestUpstream(toRequest);
                        return;
                    }
                    this.delayedSubscription.request(n);
                    return;
                }
                finally {
                    if (!ConcurrentUtils.releaseLock(subscriptionLockUpdater, (Object)this)) {
                        this.processSubscriptionEvents();
                    }
                }
            } else {
                this.subscriptionEvents.add(new RequestEvent((MulticastLeafSubscriber)subscriber, n, null));
                this.processSubscriptionEvents();
            }
        }

        final void cancel(T subscriber) {
            boolean cancelUpstream = this.removeSubscriber(subscriber);
            if (ConcurrentUtils.tryAcquireLock(subscriptionLockUpdater, (Object)this)) {
                try {
                    this.processCancelEventInternal(subscriber, cancelUpstream);
                }
                finally {
                    if (!ConcurrentUtils.releaseLock(subscriptionLockUpdater, (Object)this)) {
                        this.processSubscriptionEvents();
                    }
                }
            } else {
                this.subscriptionEvents.add(new CancelEvent((MulticastLeafSubscriber)subscriber, cancelUpstream, null));
                this.processSubscriptionEvents();
            }
        }

        private void requestUpstream(long n) {
            assert (n > 0L) : n;
            this.delayedSubscription.request(n);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        final void processSubscriptionEvents() {
            boolean tryAcquire = true;
            Throwable delayedCause = null;
            while (tryAcquire && ConcurrentUtils.tryAcquireLock(subscriptionLockUpdater, (Object)this)) {
                block11: {
                    try {
                        Object event;
                        long toRequest = 0L;
                        while ((event = this.subscriptionEvents.poll()) != null) {
                            if (event instanceof RequestEvent) {
                                if (toRequest < 0L) continue;
                                RequestEvent rEvent = (RequestEvent)event;
                                toRequest = rEvent.n > 0L ? FlowControlUtils.addWithOverflowProtection((long)toRequest, (long)this.processRequestEvent(rEvent.subscriber, rEvent.n)) : rEvent.n;
                                continue;
                            }
                            if (event instanceof SubscribeEvent) {
                                SubscribeEvent sEvent = (SubscribeEvent)event;
                                this.processSubscribeEventInternal(sEvent.subscriber, sEvent.capturedContext, sEvent.contextProvider);
                                continue;
                            }
                            if (event instanceof CancelEvent) {
                                CancelEvent cEvent = (CancelEvent)event;
                                this.processCancelEventInternal(cEvent.subscriber, cEvent.cancelUpstream);
                                continue;
                            }
                            if (event instanceof TerminalNotification) {
                                this.processTerminal((TerminalNotification)event);
                                continue;
                            }
                            this.processOnNextEvent(event);
                        }
                        if (toRequest == 0L) break block11;
                        this.requestUpstream(toRequest);
                    }
                    catch (Throwable cause) {
                        try {
                            delayedCause = io.servicetalk.concurrent.internal.ThrowableUtils.catchUnexpected(delayedCause, (Throwable)cause);
                        }
                        catch (Throwable throwable) {
                            tryAcquire = !ConcurrentUtils.releaseLock(subscriptionLockUpdater, (Object)this);
                            throw throwable;
                        }
                        tryAcquire = !ConcurrentUtils.releaseLock(subscriptionLockUpdater, (Object)this);
                        continue;
                    }
                }
                tryAcquire = !ConcurrentUtils.releaseLock(subscriptionLockUpdater, (Object)this);
            }
            if (delayedCause != null) {
                ThrowableUtils.throwException(delayedCause);
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void processCancelEventInternal(T subscriber, boolean cancelUpstream) {
            long result = this.processCancelEvent(subscriber);
            if (result >= 0L) {
                if (cancelUpstream) {
                    try {
                        this.delayedSubscription.cancel();
                    }
                    finally {
                        this.upstreamCancelled();
                    }
                } else if (result > 0L) {
                    this.requestUpstream(result);
                }
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        void processSubscribeEventInternal(T subscriber, @Nullable CapturedContext capturedContext, AsyncContextProvider contextProvider) {
            TerminalSubscriber<?> terminalSubscriber = this.addSubscriber(subscriber, capturedContext, contextProvider);
            if (!this.processSubscribeEvent(subscriber, terminalSubscriber)) {
                return;
            }
            try {
                ((MulticastLeafSubscriber)subscriber).triggerOnSubscribe();
            }
            catch (Throwable cause) {
                try {
                    this.cancel(subscriber);
                }
                finally {
                    SubscriberUtils.safeOnError(subscriber, (Throwable)cause);
                }
            }
        }

        private static final class CancelEvent<T extends MulticastLeafSubscriber<?>> {
            final T subscriber;
            final boolean cancelUpstream;

            private CancelEvent(T subscriber, boolean cancelUpstream) {
                this.subscriber = subscriber;
                this.cancelUpstream = cancelUpstream;
            }

            /* synthetic */ CancelEvent(MulticastLeafSubscriber x0, boolean x1, 1 x2) {
                this(x0, x1);
            }
        }

        private static final class RequestEvent<T extends MulticastLeafSubscriber<?>> {
            final T subscriber;
            final long n;

            private RequestEvent(T subscriber, long n) {
                this.subscriber = subscriber;
                this.n = n == 0L ? -1L : n;
            }

            /* synthetic */ RequestEvent(MulticastLeafSubscriber x0, long x1, 1 x2) {
                this(x0, x1);
            }
        }

        static final class SubscribeEvent<T extends MulticastLeafSubscriber<?>> {
            private final T subscriber;
            @Nullable
            private final CapturedContext capturedContext;
            private final AsyncContextProvider contextProvider;

            SubscribeEvent(T subscriber, @Nullable CapturedContext capturedContext, AsyncContextProvider contextProvider) {
                this.subscriber = subscriber;
                this.capturedContext = capturedContext;
                this.contextProvider = contextProvider;
            }
        }
    }

    class State
    extends MulticastRootSubscriber<MulticastFixedSubscriber<T>>
    implements PublisherSource.Subscriber<T> {
        private final DefaultPriorityQueue<MulticastFixedSubscriber<T>> demandQueue;
        volatile int subscribeCount;
        volatile PublisherSource.Subscriber<? super T>[] subscribers;

        State(int maxQueueSize, int minSubscribers) {
            super(maxQueueSize);
            this.subscribers = EMPTY_SUBSCRIBERS;
            this.demandQueue = new DefaultPriorityQueue<MulticastFixedSubscriber>(Comparator.comparingLong(sub -> ((MulticastFixedSubscriber)sub).priorityQueueValue), minSubscribers);
        }

        @Override
        @Nullable
        final TerminalSubscriber<?> addSubscriber(MulticastFixedSubscriber<T> subscriber, @Nullable CapturedContext capturedContext, AsyncContextProvider contextProvider) {
            PublisherSource.Subscriber[] newSubs;
            PublisherSource.Subscriber<? super T>[] currSubs;
            do {
                if ((currSubs = this.subscribers).length == 1 && currSubs[0] instanceof TerminalSubscriber) {
                    return (TerminalSubscriber)currSubs[0];
                }
                newSubs = (PublisherSource.Subscriber[])Array.newInstance(PublisherSource.Subscriber.class, currSubs.length + 1);
                System.arraycopy(currSubs, 0, newSubs, 0, currSubs.length);
                newSubs[currSubs.length] = subscriber;
            } while (!newSubscribersUpdater.compareAndSet(this, currSubs, newSubs));
            if (capturedContext != null) {
                MulticastPublisher.this.original.delegateSubscribe(this, capturedContext, contextProvider);
            }
            return null;
        }

        @Override
        final boolean removeSubscriber(MulticastFixedSubscriber<T> subscriber) {
            PublisherSource.Subscriber[] newSubs;
            Object[] currSubs;
            do {
                int i;
                if ((i = ArrayUtils.indexOf(subscriber, (Object[])(currSubs = this.subscribers))) < 0) {
                    return false;
                }
                newSubs = (PublisherSource.Subscriber[])Array.newInstance(PublisherSource.Subscriber.class, currSubs.length - 1);
                if (i == 0) {
                    System.arraycopy(currSubs, 1, newSubs, 0, newSubs.length);
                    continue;
                }
                System.arraycopy(currSubs, 0, newSubs, 0, i);
                System.arraycopy(currSubs, i + 1, newSubs, i, newSubs.length - i);
            } while (!newSubscribersUpdater.compareAndSet(this, currSubs, newSubs));
            if (MulticastPublisher.this.cancelUpstream && newSubs.length == 0) {
                try {
                    MulticastPublisher.this.resetState(this.maxQueueSize, MulticastPublisher.this.minSubscribers);
                    return true;
                }
                catch (Throwable cause) {
                    LOGGER.warn("unexpected exception creating new state {}", (Object)MulticastPublisher.this, (Object)cause);
                }
            }
            return false;
        }

        @Override
        final long processRequestEvent(MulticastFixedSubscriber<T> subscriber, long n) {
            assert (n > 0L);
            MulticastFixedSubscriber oldMin = (MulticastFixedSubscriber)this.demandQueue.peek();
            long oldValue = subscriber.priorityQueueValue;
            subscriber.priorityQueueValue = FlowControlUtils.addWithOverflowProtection((long)subscriber.priorityQueueValue, (long)n);
            if (!this.demandQueue.priorityChanged(subscriber) || oldMin != subscriber) {
                return 0L;
            }
            MulticastFixedSubscriber newMin = (MulticastFixedSubscriber)this.demandQueue.peek();
            assert (newMin != null);
            return newMin.priorityQueueValue - oldValue;
        }

        @Override
        final long processCancelEvent(MulticastFixedSubscriber<T> subscriber) {
            MulticastFixedSubscriber min = (MulticastFixedSubscriber)this.demandQueue.peek();
            if (!this.demandQueue.removeTyped(subscriber)) {
                return -1L;
            }
            if (min == subscriber) {
                min = (MulticastFixedSubscriber)this.demandQueue.peek();
                return min == null ? 0L : min.priorityQueueValue - min.initPriorityQueueValue - (subscriber.priorityQueueValue - subscriber.initPriorityQueueValue);
            }
            return 0L;
        }

        @Override
        boolean processSubscribeEvent(MulticastFixedSubscriber<T> subscriber, @Nullable TerminalSubscriber<?> terminalSubscriber) {
            if (terminalSubscriber != null) {
                terminalSubscriber.terminate(subscriber.subscriber);
                return false;
            }
            MulticastFixedSubscriber currMin = (MulticastFixedSubscriber)this.demandQueue.peek();
            if (currMin != null) {
                subscriber.priorityQueueValue = (subscriber.initPriorityQueueValue = currMin.priorityQueueValue);
            }
            this.demandQueue.add(subscriber);
            return true;
        }

        @Override
        void processTerminal(TerminalNotification terminalNotification) {
            throw new UnsupportedOperationException("terminal queuing not supported. terminal=" + terminalNotification);
        }

        @Override
        void processOnNextEvent(Object wrapped) {
            throw new UnsupportedOperationException("onNext queuing not supported. wrapped=" + wrapped);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        final void addNewSubscriber(PublisherSource.Subscriber<? super T> subscriber, CapturedContext capturedContext, AsyncContextProvider contextProvider) {
            int sCount = subscribeCountUpdater.incrementAndGet(this);
            if (MulticastPublisher.this.exactlyMinSubscribers && sCount > MulticastPublisher.this.minSubscribers) {
                PublishAndSubscribeOnPublishers.deliverOnSubscribeAndOnError(subscriber, capturedContext, contextProvider, (Throwable)new RejectedSubscribeException("Only " + MulticastPublisher.this.minSubscribers + " subscribers are allowed!"));
                return;
            }
            MulticastFixedSubscriber multiSubscriber = new MulticastFixedSubscriber(this, subscriber, capturedContext, contextProvider, sCount);
            if (ConcurrentUtils.tryAcquireLock((AtomicIntegerFieldUpdater)subscriptionLockUpdater, (Object)this)) {
                try {
                    this.processSubscribeEventInternal(multiSubscriber, sCount == MulticastPublisher.this.minSubscribers ? capturedContext : null, contextProvider);
                }
                finally {
                    if (!ConcurrentUtils.releaseLock((AtomicIntegerFieldUpdater)subscriptionLockUpdater, (Object)this)) {
                        this.processSubscriptionEvents();
                    }
                }
            } else {
                this.subscriptionEvents.add(new MulticastRootSubscriber.SubscribeEvent(multiSubscriber, sCount == MulticastPublisher.this.minSubscribers ? capturedContext : null, contextProvider));
                this.processSubscriptionEvents();
            }
        }

        @Override
        public final void onSubscribe(PublisherSource.Subscription subscription) {
            this.onSubscribe0(subscription);
        }

        @Override
        public void onNext(@Nullable T t) {
            PublisherSource.Subscriber<? super T>[] subs;
            for (PublisherSource.Subscriber subscriber : subs = this.subscribers) {
                subscriber.onNext(t);
            }
        }

        @Override
        public void onError(Throwable t) {
            this.onTerminal(t, PublisherSource.Subscriber::onError);
        }

        @Override
        public void onComplete() {
            this.onTerminal(null, (subscriber, t) -> subscriber.onComplete());
        }

        private void onTerminal(@Nullable Throwable t, BiConsumer<PublisherSource.Subscriber<? super T>, Throwable> terminator) {
            block4: {
                PublisherSource.Subscriber<? super T>[] subs;
                this.safeTerminalStateReset(t).whenFinally(() -> {
                    try {
                        MulticastPublisher.this.resetState(this.maxQueueSize, MulticastPublisher.this.minSubscribers);
                    }
                    catch (Throwable cause) {
                        LOGGER.warn("unexpected exception from terminal resubscribe Completable {}", (Object)MulticastPublisher.this, (Object)cause);
                    }
                }).subscribe();
                PublisherSource.Subscriber[] newSubs = (PublisherSource.Subscriber[])Array.newInstance(PublisherSource.Subscriber.class, 1);
                newSubs[0] = new TerminalSubscriber(t);
                do {
                    subs = this.subscribers;
                } while (!newSubscribersUpdater.compareAndSet(this, subs, newSubs));
                Throwable delayedCause = null;
                for (PublisherSource.Subscriber subscriber : subs) {
                    try {
                        terminator.accept(subscriber, t);
                    }
                    catch (Throwable cause) {
                        delayedCause = io.servicetalk.concurrent.internal.ThrowableUtils.catchUnexpected(delayedCause, (Throwable)cause);
                    }
                }
                if (delayedCause == null) break block4;
                ThrowableUtils.throwException(delayedCause);
            }
        }

        private Completable safeTerminalStateReset(@Nullable Throwable t) {
            try {
                return (Completable)MulticastPublisher.this.terminalResubscribe.apply(t);
            }
            catch (Throwable cause) {
                LOGGER.warn("terminalStateReset {} threw", (Object)MulticastPublisher.this.terminalResubscribe, (Object)cause);
                return Completable.never();
            }
        }
    }
}

