/*
 * Decompiled with CFR 0.152.
 */
package io.servicetalk.concurrent.api;

import io.servicetalk.concurrent.PublisherSource;
import io.servicetalk.concurrent.api.SubscriberApiUtils;
import io.servicetalk.concurrent.internal.ConcurrentUtils;
import io.servicetalk.concurrent.internal.FlowControlUtils;
import io.servicetalk.concurrent.internal.SubscriberUtils;
import io.servicetalk.concurrent.internal.TerminalNotification;
import io.servicetalk.utils.internal.PlatformDependent;
import io.servicetalk.utils.internal.ThrowableUtils;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Supplier;
import javax.annotation.Nullable;

abstract class MulticastLeafSubscriber<T>
implements PublisherSource.Subscriber<T>,
PublisherSource.Subscription {
    private static final AtomicLongFieldUpdater<MulticastLeafSubscriber> requestedUpdater = AtomicLongFieldUpdater.newUpdater(MulticastLeafSubscriber.class, "requested");
    private static final AtomicLongFieldUpdater<MulticastLeafSubscriber> sourceRequestedUpdater = AtomicLongFieldUpdater.newUpdater(MulticastLeafSubscriber.class, "sourceRequested");
    private static final AtomicLongFieldUpdater<MulticastLeafSubscriber> sourceEmittedUpdater = AtomicLongFieldUpdater.newUpdater(MulticastLeafSubscriber.class, "sourceEmitted");
    private static final AtomicIntegerFieldUpdater<MulticastLeafSubscriber> emittingLockUpdater = AtomicIntegerFieldUpdater.newUpdater(MulticastLeafSubscriber.class, "emittingLock");
    @Nullable
    private Queue<Object> signalQueue;
    private volatile long requested;
    private volatile long sourceRequested;
    private volatile long sourceEmitted;
    private volatile int emittingLock;
    private boolean cancelled;

    MulticastLeafSubscriber() {
        ConcurrentUtils.tryAcquireLock(emittingLockUpdater, this);
    }

    @Nullable
    abstract PublisherSource.Subscriber<? super T> subscriber();

    @Nullable
    abstract PublisherSource.Subscriber<? super T> subscriberOnSubscriptionThread();

    abstract void requestUpstream(long var1);

    abstract void cancelUpstream();

    abstract int outstandingDemandLimit();

    final void triggerOnSubscribe() {
        PublisherSource.Subscriber<T> subscriber = this.subscriberOnSubscriptionThread();
        assert (subscriber != null);
        try {
            subscriber.onSubscribe(this);
        }
        finally {
            if (!ConcurrentUtils.releaseLock(emittingLockUpdater, this)) {
                this.drainSignalQueue(subscriber);
            }
        }
    }

    @Override
    public final void request(long n) {
        if (this.cancelled) {
            return;
        }
        if (SubscriberUtils.isRequestNValid(n)) {
            requestedUpdater.accumulateAndGet(this, n, FlowControlUtils::addWithOverflowProtection);
            this.drainSignalQueueSupplier(this.subscriberOnSubscriptionThread(), this::subscriberOnSubscriptionThread);
        } else {
            this.requestUpstream(n);
        }
    }

    @Override
    public final void cancel() {
        this.cancelled = true;
        this.cancelUpstream();
    }

    @Override
    public final void onSubscribe(PublisherSource.Subscription subscription) {
        throw new UnsupportedOperationException();
    }

    @Override
    public final void onNext(@Nullable T t) {
        PublisherSource.Subscriber<T> subscriber = this.subscriber();
        if (subscriber == null) {
            this.getOrCreateSignalQueue(8).add(SubscriberApiUtils.wrapNull(t));
            this.drainSignalQueueSupplier(null, this::subscriber);
        } else if (this.hasSignalsQueued()) {
            this.getOrCreateSignalQueue(8).add(SubscriberApiUtils.wrapNull(t));
            this.drainSignalQueue(subscriber);
        } else if (ConcurrentUtils.tryAcquireLock(emittingLockUpdater, this)) {
            if (this.sourceEmitted < this.requested) {
                try {
                    sourceEmittedUpdater.getAndIncrement(this);
                    subscriber.onNext(t);
                }
                finally {
                    if (ConcurrentUtils.releaseLock(emittingLockUpdater, this)) {
                        this.updateRequestN();
                    } else {
                        this.drainSignalQueue(subscriber);
                    }
                }
            } else {
                ConcurrentUtils.releaseLock(emittingLockUpdater, this);
                this.getOrCreateSignalQueue(8).add(SubscriberApiUtils.wrapNull(t));
                this.drainSignalQueue(subscriber);
            }
        } else {
            this.getOrCreateSignalQueue(8).add(SubscriberApiUtils.wrapNull(t));
            this.drainSignalQueue(subscriber);
        }
    }

    @Override
    public final void onError(Throwable t) {
        this.onTerminal(t, (cause, sub) -> sub.onError((Throwable)cause), TerminalNotification::error);
    }

    @Override
    public final void onComplete() {
        this.onTerminal(null, (cause, sub) -> sub.onComplete(), cause -> TerminalNotification.complete());
    }

    private void updateRequestN() {
        long actualSourceRequestN = ConcurrentUtils.calculateSourceRequested(requestedUpdater, sourceRequestedUpdater, sourceEmittedUpdater, this.outstandingDemandLimit(), this);
        if (actualSourceRequestN > 0L) {
            this.requestUpstream(actualSourceRequestN);
        }
    }

    private Queue<Object> getOrCreateSignalQueue(int size) {
        if (this.signalQueue == null) {
            this.signalQueue = PlatformDependent.newUnboundedSpscQueue(size);
        }
        return this.signalQueue;
    }

    private void onTerminal(@Nullable Throwable t, BiConsumer<Throwable, PublisherSource.Subscriber<? super T>> emitter, Function<Throwable, TerminalNotification> terminalFunc) {
        PublisherSource.Subscriber<T> subscriber = this.subscriber();
        if (subscriber == null) {
            this.getOrCreateSignalQueue(1).add(terminalFunc.apply(t));
            this.drainSignalQueueSupplier(null, this::subscriber);
        } else if (this.hasSignalsQueued()) {
            this.getOrCreateSignalQueue(1).add(terminalFunc.apply(t));
            this.drainSignalQueue(subscriber);
        } else if (ConcurrentUtils.tryAcquireLock(emittingLockUpdater, this)) {
            emitter.accept(t, subscriber);
        } else {
            this.getOrCreateSignalQueue(1).add(terminalFunc.apply(t));
            this.drainSignalQueue(subscriber);
        }
    }

    private boolean hasSignalsQueued() {
        return this.signalQueue != null && !this.signalQueue.isEmpty();
    }

    private void drainSignalQueue(PublisherSource.Subscriber<? super T> subscriber) {
        this.drainSignalQueueSupplier(subscriber, () -> null);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void drainSignalQueueSupplier(@Nullable PublisherSource.Subscriber<? super T> subscriber, Supplier<PublisherSource.Subscriber<? super T>> subFunc) {
        Throwable delayedCause = null;
        boolean tryAcquire = true;
        boolean acquired = false;
        while (tryAcquire && ConcurrentUtils.tryAcquireLock(emittingLockUpdater, this)) {
            block11: {
                try {
                    Object signal;
                    acquired = true;
                    if (subscriber == null) {
                        subscriber = subFunc.get();
                    }
                    if (subscriber == null || this.signalQueue == null || this.signalQueue.isEmpty()) break block11;
                    long innerOnNextCount = 0L;
                    long outstandingDemand = this.requested - this.sourceEmitted;
                    while (innerOnNextCount < outstandingDemand && (signal = this.signalQueue.poll()) != null) {
                        try {
                            if (signal instanceof TerminalNotification) {
                                ((TerminalNotification)signal).terminate(subscriber);
                                continue;
                            }
                            ++innerOnNextCount;
                            subscriber.onNext(SubscriberApiUtils.unwrapNullUnchecked(signal));
                        }
                        catch (Throwable cause) {
                            delayedCause = io.servicetalk.concurrent.internal.ThrowableUtils.catchUnexpected(delayedCause, cause);
                        }
                    }
                    if (innerOnNextCount != 0L) {
                        sourceEmittedUpdater.addAndGet(this, innerOnNextCount);
                    }
                    if (innerOnNextCount != outstandingDemand || !((signal = this.signalQueue.peek()) instanceof TerminalNotification)) break block11;
                    this.signalQueue.poll();
                    ((TerminalNotification)signal).terminate(subscriber);
                }
                catch (Throwable throwable) {
                    tryAcquire = !ConcurrentUtils.releaseLock(emittingLockUpdater, this);
                    throw throwable;
                }
            }
            tryAcquire = !ConcurrentUtils.releaseLock(emittingLockUpdater, this);
        }
        if (acquired) {
            this.updateRequestN();
        }
        if (delayedCause != null) {
            ThrowableUtils.throwException(delayedCause);
        }
    }
}

