/*
 * Decompiled with CFR 0.152.
 */
package io.servicetalk.concurrent.api;

import io.servicetalk.concurrent.PublisherSource;
import io.servicetalk.concurrent.api.SubscriberApiUtils;
import io.servicetalk.concurrent.internal.FlowControlUtils;
import io.servicetalk.concurrent.internal.QueueFullException;
import io.servicetalk.concurrent.internal.SubscriberUtils;
import io.servicetalk.concurrent.internal.TerminalNotification;
import io.servicetalk.utils.internal.PlatformDependent;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.function.Consumer;
import java.util.function.IntConsumer;
import java.util.function.LongSupplier;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

final class MulticastUtils {
    private static final Logger LOGGER = LoggerFactory.getLogger(MulticastUtils.class);

    private MulticastUtils() {
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    static <T, R> long drainToSubscriber(SpscQueue<T> toDrain, PublisherSource.Subscriber<? super T> target, AtomicIntegerFieldUpdater<R> subscriberStateUpdater, LongSupplier requestedSupplier, Consumer<TerminalNotification> terminalConsumer, Consumer<Throwable> nonTerminalErrorConsumer, IntConsumer onNextCountConsumer, R flagOwner) {
        long totalDrainCount = 0L;
        long requestCount = requestedSupplier.getAsLong();
        while (subscriberStateUpdater.compareAndSet(flagOwner, 0, 1)) {
            block21: {
                int i = 0;
                try {
                    Object next;
                    while ((long)i < requestCount && (next = ((SpscQueue)toDrain).unboundedSpsc.poll()) != null) {
                        if (next instanceof TerminalNotification) {
                            TerminalNotification terminalNotification = (TerminalNotification)next;
                            try {
                                terminalNotification.terminate(target);
                            }
                            catch (Throwable throwable) {
                                LOGGER.error("Error from terminal callbacks to subscriber {}", target, (Object)throwable);
                            }
                            finally {
                                terminalConsumer.accept(terminalNotification);
                            }
                            long throwable = -(totalDrainCount += (long)i);
                            return throwable;
                        }
                        Object t = next == SubscriberApiUtils.NULL_TOKEN ? null : next;
                        ++i;
                        toDrain.decrementSize();
                        try {
                            target.onNext(t);
                        }
                        catch (Throwable cause) {
                            nonTerminalErrorConsumer.accept(cause);
                            break;
                        }
                    }
                    totalDrainCount += (long)i;
                    if (!(((SpscQueue)toDrain).unboundedSpsc.peek() instanceof TerminalNotification)) break block21;
                    TerminalNotification terminalNotification = (TerminalNotification)((SpscQueue)toDrain).unboundedSpsc.poll();
                    try {
                        terminalNotification.terminate(target);
                    }
                    catch (Throwable throwable) {
                        LOGGER.error("Error from terminal callbacks to subscriber {}", target, (Object)throwable);
                    }
                    finally {
                        terminalConsumer.accept(terminalNotification);
                    }
                    long l = -totalDrainCount;
                    return l;
                }
                finally {
                    if (i != 0) {
                        onNextCountConsumer.accept(i);
                    }
                    subscriberStateUpdater.set(flagOwner, 0);
                }
            }
            if (!toDrain.isEmpty() && (requestCount = requestedSupplier.getAsLong()) != 0L) continue;
        }
        return totalDrainCount;
    }

    static final class SpscQueue<T> {
        private static final AtomicIntegerFieldUpdater<SpscQueue> sizeUpdater = AtomicIntegerFieldUpdater.newUpdater(SpscQueue.class, "size");
        private final int maxCapacity;
        private final Queue<Object> unboundedSpsc;
        private volatile int size;

        SpscQueue(int maxCapacity) {
            this.maxCapacity = maxCapacity;
            this.unboundedSpsc = PlatformDependent.newUnboundedSpscQueue((int)2);
        }

        boolean offerNext(@Nullable T item) {
            int currentSize;
            do {
                if ((currentSize = this.size) != this.maxCapacity) continue;
                return false;
            } while (!sizeUpdater.compareAndSet(this, currentSize, currentSize + 1));
            this.unboundedSpsc.offer(item == null ? SubscriberApiUtils.NULL_TOKEN : item);
            return true;
        }

        void addTerminal(TerminalNotification terminalNotification) {
            this.unboundedSpsc.offer(terminalNotification);
        }

        void decrementSize() {
            sizeUpdater.decrementAndGet(this);
        }

        boolean isEmpty() {
            return this.unboundedSpsc.isEmpty();
        }
    }

    static abstract class IndividualMulticastSubscriber<T>
    implements PublisherSource.Subscription {
        private static final Logger LOGGER = LoggerFactory.getLogger(IndividualMulticastSubscriber.class);
        private static final AtomicIntegerFieldUpdater<IndividualMulticastSubscriber> subscriberStateUpdater = AtomicIntegerFieldUpdater.newUpdater(IndividualMulticastSubscriber.class, "subscriberState");
        private static final AtomicLongFieldUpdater<IndividualMulticastSubscriber> requestedUpdater = AtomicLongFieldUpdater.newUpdater(IndividualMulticastSubscriber.class, "requested");
        private static final AtomicLongFieldUpdater<IndividualMulticastSubscriber> sourceRequestedUpdater = AtomicLongFieldUpdater.newUpdater(IndividualMulticastSubscriber.class, "sourceRequested");
        private static final AtomicLongFieldUpdater<IndividualMulticastSubscriber> sourceEmittedUpdater = AtomicLongFieldUpdater.newUpdater(IndividualMulticastSubscriber.class, "sourceEmitted");
        private boolean terminatedPrematurely;
        private volatile int subscriberState;
        private volatile long requested;
        private volatile long sourceRequested;
        private volatile long sourceEmitted;
        @Nullable
        volatile PublisherSource.Subscriber<? super T> target;
        @Nullable
        private volatile SpscQueue<T> subscriberQueue;
        private final int maxQueueSize;

        IndividualMulticastSubscriber(int maxQueueSize) {
            this.maxQueueSize = maxQueueSize;
        }

        IndividualMulticastSubscriber(int maxQueueSize, PublisherSource.Subscriber<? super T> target) {
            this.maxQueueSize = maxQueueSize;
            this.target = Objects.requireNonNull(target);
        }

        @Nullable
        final SpscQueue<T> subscriberQueue() {
            return this.subscriberQueue;
        }

        private void drainPendingFromSource(SpscQueue<T> subscriberQueue) {
            this.drainPendingFromSource(subscriberQueue, this.target);
        }

        private void drainPendingFromSource(SpscQueue<T> subscriberQueue, @Nullable PublisherSource.Subscriber<? super T> groupSinkTarget) {
            if (groupSinkTarget == null) {
                return;
            }
            long drainCount = MulticastUtils.drainToSubscriber(subscriberQueue, groupSinkTarget, subscriberStateUpdater, () -> this.requested - this.sourceEmitted, terminalNotification -> {}, this::cancelSourceFromSource, this::drainPendingHandleEmitted, this);
            if (drainCount > 0L) {
                this.updateRequestN(drainCount);
            }
        }

        final void drainPendingFromExternal(SpscQueue<T> subscriberQueue, PublisherSource.Subscriber<? super T> groupSinkTarget) {
            this.updateRequestN(MulticastUtils.drainToSubscriber(subscriberQueue, groupSinkTarget, subscriberStateUpdater, () -> this.requested - this.sourceEmitted, terminalNotification -> {}, this::cancelSourceFromExternal, this::drainPendingHandleEmitted, this));
        }

        private void drainPendingHandleEmitted(int onNextCount) {
            sourceEmittedUpdater.addAndGet(this, onNextCount);
        }

        private void cancelSourceFromSource(Throwable cause) {
            SpscQueue<T> subscriberQueue = this.subscriberQueue;
            assert (subscriberQueue != null);
            this.cancelSourceFromSource(true, cause, this.target, subscriberQueue);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void cancelSourceFromSource(boolean subscriberLockAcquired, Throwable cause, @Nullable PublisherSource.Subscriber<? super T> groupSinkTarget, @Nullable SpscQueue<T> groupSinkQueue) {
            this.terminatedPrematurely = true;
            this.cancelSourceFromSource(subscriberLockAcquired, cause);
            if (groupSinkTarget == null) {
                assert (groupSinkQueue != null);
                groupSinkQueue.addTerminal(TerminalNotification.error((Throwable)cause));
                this.drainPendingFromSource(groupSinkQueue);
            } else if (groupSinkQueue == null || groupSinkQueue.isEmpty()) {
                if (subscriberLockAcquired || subscriberStateUpdater.compareAndSet(this, 0, 1)) {
                    try {
                        groupSinkTarget.onError(cause);
                    }
                    catch (Throwable onErrorError) {
                        LOGGER.error("Subscriber {} threw from onError for exception {}", new Object[]{groupSinkTarget, cause, onErrorError});
                    }
                    finally {
                        if (!subscriberLockAcquired) {
                            this.subscriberState = 0;
                        }
                    }
                }
            } else {
                groupSinkQueue.addTerminal(TerminalNotification.error((Throwable)cause));
                this.drainPendingFromSource(groupSinkQueue);
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public final void onNext(@Nullable T next) {
            if (this.terminatedPrematurely) {
                return;
            }
            PublisherSource.Subscriber<? super T> target = this.target;
            SpscQueue<T> subscriberQueue = this.subscriberQueue;
            if (target == null) {
                if (subscriberQueue == null) {
                    subscriberQueue = new SpscQueue(this.maxQueueSize);
                    this.subscriberQueue = subscriberQueue;
                }
                if (!subscriberQueue.offerNext(next)) {
                    this.cancelSourceFromSource(false, (Throwable)new QueueFullException(this.queueIdentifier(), this.maxQueueSize), this.target, subscriberQueue);
                }
                this.drainPendingFromSource(subscriberQueue);
            } else if (subscriberQueue != null && !subscriberQueue.isEmpty()) {
                if (!subscriberQueue.offerNext(next)) {
                    this.cancelSourceFromSource(false, (Throwable)new QueueFullException(this.queueIdentifier(), this.maxQueueSize), this.target, subscriberQueue);
                }
                this.drainPendingFromSource(subscriberQueue, target);
            } else if (subscriberStateUpdater.compareAndSet(this, 0, 1)) {
                if (this.sourceEmitted != this.requested) {
                    try {
                        sourceEmittedUpdater.incrementAndGet(this);
                        target.onNext(next);
                        this.updateRequestN(1L);
                    }
                    catch (Throwable cause) {
                        this.cancelSourceFromSource(true, new IllegalStateException("Unexpected exception thrown from onNext for identifier " + this.queueIdentifier(), cause), target, this.subscriberQueue);
                    }
                    finally {
                        this.subscriberState = 0;
                    }
                    if (subscriberQueue == null) {
                        subscriberQueue = this.subscriberQueue;
                    }
                } else {
                    this.subscriberState = 0;
                    if (subscriberQueue == null) {
                        subscriberQueue = new SpscQueue(this.maxQueueSize);
                        this.subscriberQueue = subscriberQueue;
                    }
                    if (!subscriberQueue.offerNext(next)) {
                        this.cancelSourceFromSource(true, (Throwable)new QueueFullException(this.queueIdentifier(), this.maxQueueSize), this.target, subscriberQueue);
                    }
                }
                if (subscriberQueue != null && !subscriberQueue.isEmpty()) {
                    this.drainPendingFromSource(subscriberQueue, target);
                }
            } else {
                if (subscriberQueue == null) {
                    subscriberQueue = new SpscQueue(this.maxQueueSize);
                    this.subscriberQueue = subscriberQueue;
                }
                if (!subscriberQueue.offerNext(next)) {
                    this.cancelSourceFromSource(false, (Throwable)new QueueFullException(this.queueIdentifier(), this.maxQueueSize), this.target, subscriberQueue);
                }
                this.drainPendingFromSource(subscriberQueue, target);
            }
        }

        public final void onError(Throwable cause) {
            this.terminateFromSource(TerminalNotification.error((Throwable)cause));
        }

        public final void onComplete() {
            this.terminateFromSource(TerminalNotification.complete());
        }

        private void terminateFromSource(TerminalNotification terminalNotification) {
            if (this.terminatedPrematurely) {
                return;
            }
            PublisherSource.Subscriber<? super T> target = this.target;
            SpscQueue<T> subscriberQueue = this.subscriberQueue;
            if (target == null) {
                if (subscriberQueue == null) {
                    subscriberQueue = new SpscQueue(this.maxQueueSize);
                    this.subscriberQueue = subscriberQueue;
                }
                subscriberQueue.addTerminal(terminalNotification);
                this.drainPendingFromSource(subscriberQueue);
            } else if (subscriberQueue != null && !subscriberQueue.isEmpty()) {
                subscriberQueue.addTerminal(terminalNotification);
                this.drainPendingFromSource(subscriberQueue);
            } else if (subscriberStateUpdater.compareAndSet(this, 0, 1)) {
                try {
                    terminalNotification.terminate(target);
                }
                catch (Throwable onErrorError) {
                    LOGGER.error("Subscriber {} threw for terminal {}", new Object[]{target, terminalNotification, onErrorError});
                }
            } else {
                if (subscriberQueue == null) {
                    subscriberQueue = new SpscQueue(this.maxQueueSize);
                    this.subscriberQueue = subscriberQueue;
                }
                subscriberQueue.addTerminal(terminalNotification);
                this.drainPendingFromSource(subscriberQueue, target);
            }
        }

        private void updateRequestN(long drainedCount) {
            int actualSourceRequestN = SubscriberUtils.calculateSourceRequested(requestedUpdater, sourceRequestedUpdater, sourceEmittedUpdater, (int)this.maxQueueSize, (Object)this);
            if ((long)actualSourceRequestN > drainedCount) {
                this.requestFromSource(actualSourceRequestN - (int)drainedCount);
            }
        }

        final long sourceRequested() {
            return this.sourceRequested;
        }

        abstract String queueIdentifier();

        abstract void requestFromSource(int var1);

        abstract void handleInvalidRequestN(long var1);

        abstract void cancelSourceFromExternal(Throwable var1);

        abstract void cancelSourceFromSource(boolean var1, Throwable var2);

        public void request(long n) {
            PublisherSource.Subscriber<? super T> target;
            if (!SubscriberUtils.isRequestNValid((long)n)) {
                this.handleInvalidRequestN(n);
                return;
            }
            requestedUpdater.accumulateAndGet(this, n, FlowControlUtils::addWithOverflowProtection);
            SpscQueue<T> subscriberQueue = this.subscriberQueue;
            if (subscriberQueue != null && (target = this.target) != null) {
                this.drainPendingFromExternal(subscriberQueue, target);
            } else {
                this.updateRequestN(0L);
            }
        }
    }
}

