/*
 * Decompiled with CFR 0.152.
 */
package io.servicetalk.concurrent.api;

import io.servicetalk.concurrent.Cancellable;
import io.servicetalk.concurrent.Executor;
import io.servicetalk.concurrent.PublisherSource;
import io.servicetalk.concurrent.api.AbstractNoHandleSubscribePublisher;
import io.servicetalk.concurrent.api.AsyncContextProvider;
import io.servicetalk.concurrent.api.CapturedContext;
import io.servicetalk.concurrent.api.Publisher;
import io.servicetalk.concurrent.api.TaskBasedAsyncCompletableOperator;
import io.servicetalk.concurrent.internal.EmptySubscriptions;
import io.servicetalk.concurrent.internal.FlowControlUtils;
import io.servicetalk.concurrent.internal.QueueFullException;
import io.servicetalk.concurrent.internal.SubscriberUtils;
import io.servicetalk.concurrent.internal.TerminalNotification;
import io.servicetalk.utils.internal.PlatformDependent;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.function.BooleanSupplier;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

abstract class TaskBasedAsyncPublisherOperator<T>
extends AbstractNoHandleSubscribePublisher<T> {
    private static final Logger LOGGER = LoggerFactory.getLogger(TaskBasedAsyncPublisherOperator.class);
    // Sentinel stored in the signal queue in place of a null onNext item. OffloadedSubscriber#onNext substitutes it
    // for null before enqueuing and deliverSignals maps it back to null before invoking the target. It is needed
    // because deliverSignals treats a null result from poll() as "queue is empty".
    private static final Object NULL_WRAPPER = new Object(){

        // Gives the sentinel a recognizable textual form.
        public String toString() {
            return "NULL_WRAPPER";
        }
    };
    private final Publisher<T> original;
    private final BooleanSupplier shouldOffload;
    private final Executor executor;

    /**
     * Creates a new operator.
     *
     * @param original the {@link Publisher} that subscribes are delegated to.
     * @param shouldOffload hint consulted to decide whether work is handed to {@code executor}; must not be null.
     * @param executor the {@link Executor} used for offloaded work; must not be null.
     * @throws NullPointerException if {@code shouldOffload} or {@code executor} is null.
     */
    TaskBasedAsyncPublisherOperator(final Publisher<T> original, final BooleanSupplier shouldOffload,
                                    final Executor executor) {
        // Validate first (same order as the assignments used to run), then store.
        Objects.requireNonNull(shouldOffload, "shouldOffload");
        Objects.requireNonNull(executor, "executor");
        this.original = original;
        this.shouldOffload = shouldOffload;
        this.executor = executor;
    }

    /**
     * Returns the offloading hint supplied at construction.
     *
     * @return the {@link BooleanSupplier} given to the constructor.
     */
    final BooleanSupplier shouldOffload() {
        return shouldOffload;
    }

    /**
     * Returns the executor supplied at construction.
     *
     * @return the {@link Executor} given to the constructor.
     */
    final Executor executor() {
        return executor;
    }

    /**
     * Forwards the subscribe to the wrapped {@code original} publisher without modification.
     *
     * @param subscriber the subscriber to hand to the original publisher.
     * @param capturedContext the captured context passed through to the original publisher.
     * @param contextProvider the context provider passed through to the original publisher.
     */
    @Override
    void handleSubscribe(final PublisherSource.Subscriber<? super T> subscriber,
                         final CapturedContext capturedContext,
                         final AsyncContextProvider contextProvider) {
        original.delegateSubscribe(subscriber, capturedContext, contextProvider);
    }

    private static final class OffloadedSubscription
    implements PublisherSource.Subscription {
        private static final int STATE_IDLE = 0;
        private static final int STATE_ENQUEUED = 1;
        private static final int STATE_EXECUTING = 2;
        public static final int CANCELLED = -1;
        public static final int TERMINATED = -2;
        private static final AtomicIntegerFieldUpdater<OffloadedSubscription> stateUpdater = AtomicIntegerFieldUpdater.newUpdater(OffloadedSubscription.class, "state");
        private static final AtomicLongFieldUpdater<OffloadedSubscription> requestedUpdater = AtomicLongFieldUpdater.newUpdater(OffloadedSubscription.class, "requested");
        private final Executor executor;
        private final BooleanSupplier shouldOffload;
        private final PublisherSource.Subscription target;
        private volatile int state = 0;
        private volatile long requested;
        private boolean hasOffloaded;

        /**
         * Wraps {@code target} so that its {@code request}/{@code cancel} calls can be run via {@code executor}.
         *
         * @param executor the executor used when offloading.
         * @param shouldOffload hint consulted before the first offload.
         * @param target the subscription that ultimately receives the calls; must not be null.
         * @throws NullPointerException if {@code target} is null.
         */
        OffloadedSubscription(final Executor executor, final BooleanSupplier shouldOffload,
                              final PublisherSource.Subscription target) {
            this.target = Objects.requireNonNull(target);
            this.shouldOffload = shouldOffload;
            this.executor = executor;
        }

        /**
         * Decides whether the next task should go to the executor. Once the hint has answered {@code true} the
         * answer is latched in {@code hasOffloaded} and the hint is not consulted again.
         *
         * @return {@code true} to offload, {@code false} to run in the calling thread.
         */
        private boolean shouldOffload() {
            if (hasOffloaded) {
                return true;
            }
            final boolean offload = TaskBasedAsyncCompletableOperator.safeShouldOffload(shouldOffload);
            if (offload) {
                hasOffloaded = true;
            }
            return offload;
        }

        /**
         * Records demand in {@code requested} and schedules {@code executeTask} when there is something to do.
         * <p>
         * For an {@code n} rejected by {@code SubscriberUtils.isRequestNValid}, {@code requested} is overwritten
         * with {@code n} itself when {@code n < -2} and with {@link Long#MIN_VALUE} otherwise (so it cannot collide
         * with the -1/-2 markers); a task is scheduled if the previous value was non-negative. In every other case
         * {@code n} is folded into {@code requested} via
         * {@code FlowControlUtils.addWithOverflowProtectionIfNotNegative} and a task is scheduled if the result is
         * positive.
         *
         * @param n the amount requested.
         */
        @Override
        public void request(long n) {
            final boolean schedule;
            if (!SubscriberUtils.isRequestNValid(n)
                    && requestedUpdater.getAndSet(this, n < -2L ? n : Long.MIN_VALUE) >= 0L) {
                schedule = true;
            } else {
                schedule = requestedUpdater.accumulateAndGet(this, n,
                        FlowControlUtils::addWithOverflowProtectionIfNotNegative) > 0L;
            }
            if (schedule) {
                enqueueTaskIfRequired(true);
            }
        }

        /**
         * Marks the subscription as cancelled (-1, {@code CANCELLED}) and schedules a task to propagate the
         * cancel, unless it was already marked cancelled.
         */
        @Override
        public void cancel() {
            final long previous = requestedUpdater.getAndSet(this, -1L);
            if (previous == -1L) {
                // Duplicate cancel, nothing more to do.
                return;
            }
            enqueueTaskIfRequired(false);
        }

        /**
         * Moves the state to ENQUEUED (1) and, if it was previously IDLE (0), starts {@code executeTask} either on
         * the executor or inline depending on {@link #shouldOffload()}.
         * <p>
         * If starting the task throws (for example the executor rejects it), the pending operation is applied to
         * the target directly in the calling thread: the accumulated demand is forwarded for a request, or the
         * target is cancelled for a cancel. Exactly one of the two fallbacks runs.
         *
         * @param forRequestN {@code true} when invoked from {@code request(long)}, {@code false} when invoked from
         * {@code cancel()}.
         */
        private void enqueueTaskIfRequired(boolean forRequestN) {
            final int oldState = stateUpdater.getAndSet(this, 1);
            if (oldState != 0) {
                // A task is already enqueued or executing; it will observe the updated `requested` value.
                return;
            }
            try {
                if (this.shouldOffload()) {
                    this.executor.execute(this::executeTask);
                } else {
                    this.executeTask();
                }
            } catch (Throwable t) {
                if (forRequestN) {
                    LOGGER.warn("Failed to execute task on the executor {}. Invoking Subscription (request()) in the caller thread. Subscription {}.", this.executor, this.target, t);
                    this.target.request(requestedUpdater.getAndSet(this, 0L));
                } else {
                    // Only the cancel path terminates the subscription; a rejected request must not cancel it.
                    this.requested = -2L;
                    LOGGER.warn("Failed to execute task on the executor {}. Invoking Subscription (cancel()) in the caller thread. Subscription {}.", this.executor, this.target, t);
                    this.target.cancel();
                }
                // The failure is intentionally not rethrown: the operation has been forwarded to the target.
            }
        }

        /**
         * Task body that applies the pending {@code request}/{@code cancel} operations recorded in
         * {@code requested} to {@code target}. Runs either on the executor or inline (see
         * {@code enqueueTaskIfRequired}).
         * <p>
         * Values read from {@code requested}: positive = accumulated demand, 0 = nothing pending,
         * -1 = CANCELLED, -2 = TERMINATED, any other negative value = an invalid request amount stored by
         * {@code request(long)}.
         */
        private void executeTask() {
            // STATE_EXECUTING
            this.state = 2;
            block5: while (true) {
                long r;
                // Take whatever has been recorded so far and reset the counter.
                if ((r = requestedUpdater.getAndSet(this, 0L)) > 0L) {
                    try {
                        this.target.request(r);
                        // Loop back and re-read: more demand (or a cancel) may have been recorded meanwhile.
                        continue;
                    }
                    catch (Throwable t) {
                        // request() threw: treat as cancelled so the branch below cancels the target.
                        r = -1L;
                        this.requested = -1L;
                        LOGGER.warn("Unexpected exception from request(). Subscription {}.", (Object)this.target, (Object)t);
                    }
                }
                if (r == -1L) {
                    // CANCELLED: mark TERMINATED, propagate the cancel and stop without resetting state.
                    this.requested = -2L;
                    SubscriberUtils.safeCancel((Cancellable)this.target);
                    return;
                }
                if (r == -2L) {
                    // TERMINATED: nothing more to do; return without resetting state.
                    return;
                }
                if (r != 0L) {
                    // Invalid request amount recorded by request(long): terminate and forward the invalid value
                    // to the target once.
                    this.requested = -2L;
                    try {
                        this.target.request(r);
                    }
                    catch (IllegalArgumentException t) {
                        // Deliberately ignored; presumably the target rejecting the invalid amount - TODO confirm.
                    }
                    catch (Throwable t) {
                        LOGGER.warn("Ignoring unexpected exception from request(). Subscription {}.", (Object)this.target, (Object)t);
                    }
                    return;
                }
                // r == 0: nothing pending. Decide whether to go idle or run another pass.
                while (true) {
                    int cState;
                    if ((cState = this.state) == 2) {
                        // Still EXECUTING: try to go back to IDLE; on CAS failure re-read the state.
                        if (!stateUpdater.compareAndSet(this, 2, 0)) continue;
                        return;
                    }
                    // Neither EXECUTING nor ENQUEUED: leave the outer loop, which ends the method.
                    if (cState != 1) break block5;
                    // ENQUEUED: new work was signalled while running; claim it and re-run the outer loop.
                    if (stateUpdater.compareAndSet(this, 1, 2)) break;
                }
            }
        }
    }

    /**
     * A {@link PublisherSource.Subscriber} that passes every signal straight through to the wrapped subscriber,
     * except that the {@link PublisherSource.Subscription} it hands over in {@code onSubscribe} is wrapped in an
     * {@code OffloadedSubscription} built from this instance's executor and offloading hint.
     *
     * @param <T> type of items delivered to the wrapped subscriber.
     */
    static final class OffloadedSubscriptionSubscriber<T> implements PublisherSource.Subscriber<T> {
        private final PublisherSource.Subscriber<T> subscriber;
        private final BooleanSupplier shouldOffload;
        private final Executor executor;

        /**
         * @param subscriber the subscriber receiving all signals; must not be null.
         * @param shouldOffload hint passed to each created {@code OffloadedSubscription}.
         * @param executor executor passed to each created {@code OffloadedSubscription}.
         * @throws NullPointerException if {@code subscriber} is null.
         */
        OffloadedSubscriptionSubscriber(final PublisherSource.Subscriber<T> subscriber,
                                        final BooleanSupplier shouldOffload,
                                        final Executor executor) {
            this.subscriber = Objects.requireNonNull(subscriber);
            this.shouldOffload = shouldOffload;
            this.executor = executor;
        }

        @Override
        public void onSubscribe(final PublisherSource.Subscription s) {
            final PublisherSource.Subscription offloaded = new OffloadedSubscription(executor, shouldOffload, s);
            subscriber.onSubscribe(offloaded);
        }

        @Override
        public void onNext(@Nullable final T t) {
            subscriber.onNext(t);
        }

        @Override
        public void onError(final Throwable t) {
            subscriber.onError(t);
        }

        @Override
        public void onComplete() {
            subscriber.onComplete();
        }
    }

    static final class OffloadedSubscriber<T>
    implements PublisherSource.Subscriber<T> {
        private static final int STATE_IDLE = 0;
        private static final int STATE_ENQUEUED = 1;
        private static final int STATE_EXECUTING = 2;
        private static final int STATE_TERMINATING = 3;
        private static final int STATE_TERMINATED = 4;
        private static final AtomicIntegerFieldUpdater<OffloadedSubscriber> stateUpdater = AtomicIntegerFieldUpdater.newUpdater(OffloadedSubscriber.class, "state");
        private volatile int state = 0;
        private final PublisherSource.Subscriber<? super T> target;
        private final BooleanSupplier shouldOffload;
        private final Executor executor;
        private final Queue<Object> signals;
        @Nullable
        private PublisherSource.Subscription subscription;
        private boolean hasOffloaded;

        /**
         * Creates a subscriber whose signal queue starts with an initial capacity of 2.
         *
         * @param target the subscriber that signals are eventually delivered to.
         * @param shouldOffload hint consulted before the first offload.
         * @param executor executor used to run signal delivery when offloading.
         */
        OffloadedSubscriber(final PublisherSource.Subscriber<? super T> target,
                            final BooleanSupplier shouldOffload,
                            final Executor executor) {
            this(target, shouldOffload, executor, 2);
        }

        /**
         * Creates a subscriber with an explicit initial capacity for the signal queue.
         *
         * @param target the subscriber that signals are eventually delivered to.
         * @param shouldOffload hint consulted before the first offload.
         * @param executor executor used to run signal delivery when offloading.
         * @param publisherSignalQueueInitialCapacity initial capacity handed to
         * {@code PlatformDependent.newUnboundedSpscQueue}.
         */
        OffloadedSubscriber(final PublisherSource.Subscriber<? super T> target,
                            final BooleanSupplier shouldOffload,
                            final Executor executor,
                            final int publisherSignalQueueInitialCapacity) {
            this.signals = PlatformDependent.newUnboundedSpscQueue(publisherSignalQueueInitialCapacity);
            this.executor = executor;
            this.shouldOffload = shouldOffload;
            this.target = target;
        }

        /**
         * Decides whether signal delivery should go to the executor. Once the hint has answered {@code true} the
         * answer is latched in {@code hasOffloaded} and the hint is not consulted again. If the hint throws, the
         * failure is logged and rethrown to the caller.
         *
         * @return {@code true} to offload, {@code false} to deliver in the calling thread.
         */
        private boolean shouldOffload() {
            if (hasOffloaded) {
                return true;
            }
            final boolean offload;
            try {
                offload = shouldOffload.getAsBoolean();
            } catch (Throwable cause) {
                LOGGER.warn("Offloading hint BooleanSupplier {} threw", shouldOffload, cause);
                throw cause;
            }
            if (offload) {
                hasOffloaded = true;
            }
            return offload;
        }

        /**
         * Remembers the upstream subscription (used later to cancel on failure) and enqueues it as the signal to
         * deliver to the target.
         *
         * @param s the upstream subscription.
         */
        @Override
        public void onSubscribe(final PublisherSource.Subscription s) {
            subscription = s;
            offerSignal(s);
        }

        /**
         * Enqueues an item for delivery; a null item is represented in the queue by {@code NULL_WRAPPER}.
         *
         * @param t the item, possibly null.
         */
        @Override
        public void onNext(@Nullable final T t) {
            final Object signal;
            if (t == null) {
                signal = NULL_WRAPPER;
            } else {
                signal = t;
            }
            offerSignal(signal);
        }

        /**
         * Enqueues an error terminal signal wrapping {@code t}.
         *
         * @param t the failure to deliver.
         */
        @Override
        public void onError(final Throwable t) {
            final Object terminal = TerminalNotification.error(t);
            offerSignal(terminal);
        }

        /**
         * Enqueues a completion terminal signal.
         */
        @Override
        public void onComplete() {
            final Object terminal = TerminalNotification.complete();
            offerSignal(terminal);
        }

        /**
         * Drains the signal queue and delivers every queued signal to {@code target}, in order.
         * <p>
         * Each polled signal is dispatched to exactly one callback: a {@link PublisherSource.Subscription} goes to
         * {@code onSubscribe}, a {@link TerminalNotification} to {@code onError}/{@code onComplete}, anything else
         * to {@code onNext} (with {@code NULL_WRAPPER} mapped back to null). The queue is polled until it is empty
         * before the state is inspected, so signals that were enqueued before this task started are not left
         * behind.
         * <p>
         * States used here: 0 = IDLE, 1 = ENQUEUED, 2 = EXECUTING, 4 = TERMINATED. After a terminal signal, or
         * after {@code onSubscribe}/{@code onNext} throws, the method returns without touching the queue again.
         */
        void deliverSignals() {
            // STATE_EXECUTING
            this.state = 2;
            while (true) {
                Object signal;
                while ((signal = this.signals.poll()) != null) {
                    if (signal instanceof PublisherSource.Subscription) {
                        final PublisherSource.Subscription s = (PublisherSource.Subscription) signal;
                        try {
                            this.target.onSubscribe(s);
                        } catch (Throwable t) {
                            this.clearSignalsFromExecutorThread();
                            SubscriberUtils.safeOnError(this.target, t);
                            SubscriberUtils.safeCancel(s);
                            // Terminated: the queue must not be touched any more.
                            return;
                        }
                    } else if (signal instanceof TerminalNotification) {
                        // STATE_TERMINATED
                        this.state = 4;
                        final Throwable cause = ((TerminalNotification) signal).cause();
                        if (cause != null) {
                            SubscriberUtils.safeOnError(this.target, cause);
                        } else {
                            SubscriberUtils.safeOnComplete(this.target);
                        }
                        // Terminated: the queue must not be touched any more.
                        return;
                    } else {
                        // Only items offered through onNext(T) reach this branch, so the cast is safe.
                        @SuppressWarnings("unchecked")
                        final T item = signal == NULL_WRAPPER ? null : (T) signal;
                        try {
                            this.target.onNext(item);
                        } catch (Throwable th) {
                            this.clearSignalsFromExecutorThread();
                            SubscriberUtils.safeOnError(this.target, th);
                            assert this.subscription != null;
                            SubscriberUtils.safeCancel(this.subscription);
                            // Terminated: the queue must not be touched any more.
                            return;
                        }
                    }
                }
                // Queue observed empty: go idle, or run another drain pass if more signals were offered meanwhile.
                for (;;) {
                    final int cState = this.state;
                    if (cState == 2) {
                        // Still EXECUTING: nothing new was offered; try to become IDLE.
                        if (stateUpdater.compareAndSet(this, 2, 0)) {
                            return;
                        }
                    } else if (cState == 1) {
                        // ENQUEUED: a signal was offered while draining; claim it and drain again.
                        if (stateUpdater.compareAndSet(this, 1, 2)) {
                            break;
                        }
                    } else {
                        // Any other state (e.g. TERMINATED): stop.
                        return;
                    }
                }
            }
        }

        /**
         * Discards all queued signals and moves the state to TERMINATED (4) by way of TERMINATING (3).
         * If the final compare-and-set fails, the state was changed concurrently (offerSignal replaces
         * TERMINATING with TERMINATED after enqueuing), so the queue is cleared again and the transition retried.
         */
        private void clearSignalsFromExecutorThread() {
            for (;;) {
                state = 3;
                signals.clear();
                if (stateUpdater.compareAndSet(this, 3, 4)) {
                    return;
                }
            }
        }

        /**
         * Enqueues {@code signal} and makes sure a {@code deliverSignals} pass will pick it up.
         * <p>
         * States used here: 0 = IDLE, 1 = ENQUEUED, 3 = TERMINATING, 4 = TERMINATED.
         *
         * @param signal the queue entry: a Subscription, a TerminalNotification, an item, or NULL_WRAPPER.
         * @throws QueueFullException if the queue refuses the signal.
         */
        private void offerSignal(Object signal) {
            int cState;
            if (!this.signals.offer(signal)) {
                throw new QueueFullException("signals");
            }
            do {
                if ((cState = this.state) == 4) {
                    // TERMINATED: nothing will be delivered any more; drop what was just enqueued.
                    this.signals.clear();
                    return;
                }
                // Not TERMINATING: jump to the loop condition, which tries to move the state to ENQUEUED.
                if (cState != 3) continue;
                // TERMINATING: force TERMINATED. If the state had already become TERMINATED in the meantime,
                // clear the queue here; otherwise clearSignalsFromExecutorThread's CAS fails and it clears again.
                if (stateUpdater.getAndSet(this, 4) == 4) {
                    this.signals.clear();
                }
                return;
            } while (!stateUpdater.compareAndSet(this, cState, 1));
            if (cState != 0) {
                // Previous state was not IDLE: a delivery pass is already pending or running and will see the
                // ENQUEUED state.
                return;
            }
            try {
                if (this.shouldOffload()) {
                    this.executor.execute(this::deliverSignals);
                } else {
                    this.deliverSignals();
                }
            }
            catch (Throwable t) {
                // Starting (or running inline) the delivery failed: terminate and notify the target directly from
                // the calling thread.
                this.state = 4;
                try {
                    if (signal instanceof PublisherSource.Subscription) {
                        // The target has not been given a Subscription yet; hand it an empty one before the
                        // onError below.
                        this.target.onSubscribe(EmptySubscriptions.EMPTY_SUBSCRIPTION);
                    }
                }
                finally {
                    // Runs even if the onSubscribe above throws.
                    SubscriberUtils.safeOnError(this.target, (Throwable)t);
                }
                this.signals.clear();
                assert (this.subscription != null);
                // Stop the upstream source.
                SubscriberUtils.safeCancel((Cancellable)this.subscription);
            }
        }
    }
}

