/*
 * Decompiled with CFR 0.152.
 */
package io.servicetalk.concurrent.api;

import io.servicetalk.concurrent.Cancellable;
import io.servicetalk.concurrent.Executor;
import io.servicetalk.concurrent.PublisherSource;
import io.servicetalk.concurrent.api.AbstractNoHandleSubscribePublisher;
import io.servicetalk.concurrent.api.AsyncContextProvider;
import io.servicetalk.concurrent.api.Publisher;
import io.servicetalk.concurrent.api.TimeoutPublisher;
import io.servicetalk.concurrent.internal.FlowControlUtils;
import io.servicetalk.context.api.ContextMap;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;

/**
 * A {@link Publisher} that subscribes to {@code original} through a {@link TimeoutDemandSubscriber}, which
 * signals a {@link TimeoutException} ("no demand timeout") when a timer scheduled on {@code timeoutExecutor}
 * fires while the tracked outstanding demand is exactly {@code 0}.
 *
 * @param <T> type of the items emitted by this publisher.
 */
final class TimeoutDemandPublisher<T> extends AbstractNoHandleSubscribePublisher<T> {
    private final Publisher<T> original;
    private final Executor timeoutExecutor;
    /** Timeout duration in nanoseconds; never negative (see constructor). */
    private final long durationNs;

    /**
     * Creates a new instance.
     *
     * @param original the publisher to subscribe to; must not be {@code null}.
     * @param duration the amount of time without demand that is tolerated, expressed in {@code unit}.
     * @param unit the unit of {@code duration}.
     * @param timeoutExecutor the executor used to schedule the timer; must not be {@code null}.
     * @throws NullPointerException if {@code original}, {@code unit} or {@code timeoutExecutor} is {@code null}.
     */
    TimeoutDemandPublisher(Publisher<T> original, long duration, TimeUnit unit, Executor timeoutExecutor) {
        this.original = Objects.requireNonNull(original);
        this.timeoutExecutor = Objects.requireNonNull(timeoutExecutor);
        // Negative durations are clamped to zero.
        this.durationNs = Math.max(0L, unit.toNanos(duration));
    }

    /**
     * Wraps {@code subscriber} in a {@link TimeoutDemandSubscriber} and delegates the subscribe to
     * {@code original} with the same context map and context provider.
     */
    @Override
    void handleSubscribe(PublisherSource.Subscriber<? super T> subscriber, ContextMap contextMap,
                         AsyncContextProvider contextProvider) {
        this.original.delegateSubscribe(
                TimeoutDemandSubscriber.newInstance(this, subscriber, contextMap, contextProvider),
                contextMap, contextProvider);
    }

    /**
     * Subscriber that tracks outstanding demand in {@link #demand} and keeps a timer armed only while that
     * demand is {@code 0}.
     * <p>
     * State of {@link #demand}:
     * <ul>
     *     <li>{@code > 0}: items have been requested and not yet delivered.</li>
     *     <li>{@code 0}: no outstanding demand; a timer is (being) armed.</li>
     *     <li>{@link #DEMAND_TIMER_FIRED}: the timer fired while demand was {@code 0}.</li>
     * </ul>
     *
     * @param <X> type of the items received.
     */
    private static final class TimeoutDemandSubscriber<X> extends TimeoutPublisher.AbstractTimeoutSubscriber<X> {
        /** Sentinel stored in {@link #demand} once the timer has fired with no outstanding demand. */
        private static final long DEMAND_TIMER_FIRED = -1L;
        // A raw type is unavoidable here: the class literal of a generic class is always raw.
        @SuppressWarnings("rawtypes")
        private static final AtomicLongFieldUpdater<TimeoutDemandSubscriber> demandUpdater =
                AtomicLongFieldUpdater.newUpdater(TimeoutDemandSubscriber.class, "demand");
        private final TimeoutDemandPublisher<X> parent;
        /** Only mutated through {@link #demandUpdater}. */
        private volatile long demand;

        private TimeoutDemandSubscriber(TimeoutDemandPublisher<X> parent,
                                        PublisherSource.Subscriber<? super X> target,
                                        AsyncContextProvider contextProvider) {
            super(target, contextProvider);
            this.parent = parent;
        }

        /**
         * Creates a subscriber and then calls the superclass {@code initTimer} with the parent's duration and
         * executor. Construction and timer initialization are separate steps, so {@code this} is not handed
         * to {@code initTimer} from inside the constructor.
         * NOTE(review): {@code initTimer} is defined in the superclass and presumably arms the initial timer
         * (demand starts at 0) - confirm against {@code AbstractTimeoutSubscriber}.
         */
        static <X> TimeoutDemandSubscriber<X> newInstance(TimeoutDemandPublisher<X> parent,
                                                          PublisherSource.Subscriber<? super X> target,
                                                          ContextMap contextMap,
                                                          AsyncContextProvider contextProvider) {
            TimeoutDemandSubscriber<X> s = new TimeoutDemandSubscriber<>(parent, target, contextProvider);
            s.initTimer(parent.durationNs, parent.timeoutExecutor, contextMap);
            return s;
        }

        /**
         * Forwards the item to the target, then decrements the demand; when the demand reaches exactly
         * {@code 0} a new timer is started.
         */
        @Override
        public void onNext(X x) {
            this.target.onNext(x);
            if (demandUpdater.decrementAndGet(this) == 0L) {
                startTimer();
            }
        }

        /**
         * Accounts for {@code n} additional requested items (only when {@code n > 0}) and always forwards the
         * request to the upstream subscription. If the previous demand was {@code 0} the pending timer is
         * stopped non-terminally, so that a later timer may be started again.
         */
        @Override
        public void request(long n) {
            PublisherSource.Subscription subscription = this.subscription;
            assert subscription != null;
            // NOTE(review): addWithOverflowProtectionIfNotNegative is expected (by name) to leave a negative
            // value such as DEMAND_TIMER_FIRED untouched - confirm against FlowControlUtils.
            if (n > 0L && demandUpdater.getAndAccumulate(this, n,
                    FlowControlUtils::addWithOverflowProtectionIfNotNegative) == 0L) {
                stopTimer(false);
            }
            subscription.request(n);
        }

        /**
         * Timer callback. Does nothing unless the demand is still {@code 0}; otherwise atomically moves the
         * demand to {@link #DEMAND_TIMER_FIRED}, stops the timer terminally and hands a
         * {@link TimeoutException} to the superclass {@code offloadTimeout}.
         */
        @Override
        void timerFires() {
            for (;;) {
                final long currDemand = this.demand;
                if (currDemand != 0L) {
                    // Demand arrived (or the timer already fired) in the meantime; nothing to do.
                    return;
                }
                if (demandUpdater.compareAndSet(this, currDemand, DEMAND_TIMER_FIRED)) {
                    try {
                        stopTimer(true);
                    } finally {
                        // The timeout is reported even if stopping the timer throws.
                        offloadTimeout(new TimeoutException("no demand timeout after "
                                + TimeUnit.NANOSECONDS.toMillis(parent.durationNs) + "ms"),
                                parent.timeoutExecutor);
                    }
                    return;
                }
                // Lost the race on demand; re-read and re-evaluate.
            }
        }

        /**
         * Schedules a new timer and publishes it in {@code timerCancellable}, unless the timer state is
         * already {@code LOCAL_IGNORE_CANCEL}. After publishing, the demand is re-validated: if demand became
         * positive concurrently, the freshly scheduled timer is cancelled and cleared again.
         */
        private void startTimer() {
            for (;;) {
                final Cancellable cancellable = this.timerCancellable;
                if (cancellable == LOCAL_IGNORE_CANCEL) {
                    return;
                }
                final Cancellable nextTimer = parent.timeoutExecutor.schedule(
                        this::timerFires, parent.durationNs, TimeUnit.NANOSECONDS);
                if (!timerCancellableUpdater.compareAndSet(this, cancellable, nextTimer)) {
                    // The timer state changed under us; discard this timer and retry.
                    nextTimer.cancel();
                    continue;
                }
                assert cancellable == null;
                for (;;) {
                    final long currDemand = this.demand;
                    if (currDemand > 0L) {
                        // Demand showed up while the timer was being installed; the timer is not needed.
                        nextTimer.cancel();
                        timerCancellableUpdater.compareAndSet(this, nextTimer, null);
                        return;
                    }
                    // A no-op CAS confirms that the demand did not change since it was read.
                    if (demandUpdater.compareAndSet(this, currDemand, currDemand)) {
                        return;
                    }
                }
            }
        }

        /**
         * Atomically replaces the current timer reference and cancels the timer that was replaced, if any.
         * Does nothing when the state is already {@code LOCAL_IGNORE_CANCEL}.
         *
         * @param terminal {@code true} to store {@code LOCAL_IGNORE_CANCEL} (after which {@link #startTimer()}
         * and this method become no-ops), {@code false} to store {@code null}.
         */
        @Override
        void stopTimer(boolean terminal) {
            for (;;) {
                final Cancellable cancellable = this.timerCancellable;
                if (cancellable == LOCAL_IGNORE_CANCEL) {
                    return;
                }
                if (timerCancellableUpdater.compareAndSet(this, cancellable,
                        terminal ? LOCAL_IGNORE_CANCEL : null)) {
                    if (cancellable != null) {
                        cancellable.cancel();
                    }
                    return;
                }
            }
        }
    }
}

