/*
 * Decompiled with CFR 0.152.
 */
package io.servicetalk.concurrent.api;

import io.servicetalk.concurrent.Cancellable;
import io.servicetalk.concurrent.PublisherSource;
import io.servicetalk.concurrent.SingleSource;
import io.servicetalk.concurrent.api.AbstractAsynchronousPublisherOperator;
import io.servicetalk.concurrent.api.CancellableSet;
import io.servicetalk.concurrent.api.CompositeExceptionUtils;
import io.servicetalk.concurrent.api.Publisher;
import io.servicetalk.concurrent.api.Single;
import io.servicetalk.concurrent.api.SubscriberApiUtils;
import io.servicetalk.concurrent.internal.ConcurrentSubscription;
import io.servicetalk.concurrent.internal.ConcurrentUtils;
import io.servicetalk.concurrent.internal.FlowControlUtils;
import io.servicetalk.concurrent.internal.QueueFullException;
import io.servicetalk.concurrent.internal.SubscriberUtils;
import io.servicetalk.concurrent.internal.TerminalNotification;
import io.servicetalk.utils.internal.PlatformDependent;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Function;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Operator that applies a {@code mapper} to every item emitted by the original {@link Publisher}, subscribes to
 * the {@link Single} the mapper returns, and forwards each {@link Single} result to the downstream subscriber.
 * <p>
 * Error handling depends on {@code maxDelayedErrors}:
 * <ul>
 *     <li>{@code 0}: the first mapped {@link Single} that fails cancels upstream and all outstanding
 *     {@link Single}s, and the error is delivered downstream.</li>
 *     <li>{@code > 0}: errors from mapped {@link Single}s are recorded (the first one is kept, later ones are handed
 *     to {@link CompositeExceptionUtils}) and delivered with the terminal signal.</li>
 * </ul>
 * {@code maxConcurrency} is handed to {@link ConcurrentUtils} as the limit when computing how much to request from
 * upstream (presumably bounding the number of outstanding mapped {@link Single}s; see that utility for the exact
 * contract).
 *
 * @param <T> type of items emitted by the original {@link Publisher}
 * @param <R> type of items emitted downstream
 */
final class PublisherFlatMapSingle<T, R> extends AbstractAsynchronousPublisherOperator<T, R> {
    private static final Logger LOGGER = LoggerFactory.getLogger(PublisherFlatMapSingle.class);
    /** Concurrency limit applied when the caller does not supply one. */
    private static final int DEFAULT_MAX_CONCURRENCY = 16;

    private final Function<? super T, ? extends Single<? extends R>> mapper;
    private final int maxConcurrency;
    private final int maxDelayedErrors;

    /**
     * Creates the operator with the default concurrency limit.
     *
     * @param original   upstream {@link Publisher}
     * @param mapper     maps each upstream item to a {@link Single}
     * @param delayError converted to a maximum delayed error count by
     *                   {@link CompositeExceptionUtils#maxDelayedErrors(boolean)}
     */
    PublisherFlatMapSingle(Publisher<T> original, Function<? super T, ? extends Single<? extends R>> mapper,
                           boolean delayError) {
        this(original, mapper, delayError, DEFAULT_MAX_CONCURRENCY);
    }

    /**
     * Creates the operator with an explicit concurrency limit.
     *
     * @param original       upstream {@link Publisher}
     * @param mapper         maps each upstream item to a {@link Single}
     * @param delayError     converted to a maximum delayed error count by
     *                       {@link CompositeExceptionUtils#maxDelayedErrors(boolean)}
     * @param maxConcurrency limit used when computing upstream demand, must be {@code > 0}
     */
    PublisherFlatMapSingle(Publisher<T> original, Function<? super T, ? extends Single<? extends R>> mapper,
                           boolean delayError, int maxConcurrency) {
        this(original, mapper, CompositeExceptionUtils.maxDelayedErrors(delayError), maxConcurrency);
    }

    /**
     * Creates the operator with explicit error and concurrency limits.
     *
     * @param original         upstream {@link Publisher}
     * @param mapper           maps each upstream item to a {@link Single}
     * @param maxDelayedErrors {@code 0} to fail on the first mapped error, otherwise the limit passed to
     *                         {@link CompositeExceptionUtils} when accumulating errors; must be {@code >= 0}
     * @param maxConcurrency   limit used when computing upstream demand, must be {@code > 0}
     * @throws IllegalArgumentException if {@code maxConcurrency <= 0} or {@code maxDelayedErrors < 0}
     * @throws NullPointerException     if {@code mapper} is {@code null}
     */
    PublisherFlatMapSingle(Publisher<T> original, Function<? super T, ? extends Single<? extends R>> mapper,
                           int maxDelayedErrors, int maxConcurrency) {
        super(original);
        if (maxConcurrency <= 0) {
            throw new IllegalArgumentException("maxConcurrency: " + maxConcurrency + " (expected > 0)");
        }
        if (maxDelayedErrors < 0) {
            throw new IllegalArgumentException("maxDelayedErrors: " + maxDelayedErrors + " (expected >=0)");
        }
        this.mapper = Objects.requireNonNull(mapper);
        this.maxConcurrency = maxConcurrency;
        this.maxDelayedErrors = maxDelayedErrors;
    }

    @Override
    public PublisherSource.Subscriber<? super T> apply(PublisherSource.Subscriber<? super R> subscriber) {
        return new FlatMapSubscriber<>(this, subscriber);
    }

    /**
     * Subscriber to the original {@link Publisher}; it is also the {@link PublisherSource.Subscription} handed to
     * the downstream subscriber.
     * <p>
     * All signals to {@link #target} are delivered from {@link #sendToTarget(Object)}, which is only invoked while
     * the {@code emitting} lock is held. Signals produced while another thread holds the lock are placed on
     * {@link #pending} and drained by the lock holder.
     *
     * @param <T> type of items emitted by upstream
     * @param <R> type of items emitted downstream
     */
    private static final class FlatMapSubscriber<T, R>
            implements PublisherSource.Subscriber<T>, PublisherSource.Subscription {
        // Field updaters must be parameterized with the raw owner type because a class literal cannot carry
        // type arguments.
        @SuppressWarnings("rawtypes")
        private static final AtomicReferenceFieldUpdater<FlatMapSubscriber, Throwable> pendingErrorUpdater =
                AtomicReferenceFieldUpdater.newUpdater(FlatMapSubscriber.class, Throwable.class, "pendingError");
        @SuppressWarnings("rawtypes")
        private static final AtomicIntegerFieldUpdater<FlatMapSubscriber> pendingErrorCountUpdater =
                AtomicIntegerFieldUpdater.newUpdater(FlatMapSubscriber.class, "pendingErrorCount");
        @SuppressWarnings("rawtypes")
        private static final AtomicIntegerFieldUpdater<FlatMapSubscriber> emittingUpdater =
                AtomicIntegerFieldUpdater.newUpdater(FlatMapSubscriber.class, "emitting");
        @SuppressWarnings("rawtypes")
        private static final AtomicLongFieldUpdater<FlatMapSubscriber> requestedUpdater =
                AtomicLongFieldUpdater.newUpdater(FlatMapSubscriber.class, "requested");
        @SuppressWarnings("rawtypes")
        private static final AtomicLongFieldUpdater<FlatMapSubscriber> sourceRequestedUpdater =
                AtomicLongFieldUpdater.newUpdater(FlatMapSubscriber.class, "sourceRequested");
        @SuppressWarnings("rawtypes")
        private static final AtomicLongFieldUpdater<FlatMapSubscriber> sourceEmittedUpdater =
                AtomicLongFieldUpdater.newUpdater(FlatMapSubscriber.class, "sourceEmitted");
        @SuppressWarnings("rawtypes")
        private static final AtomicIntegerFieldUpdater<FlatMapSubscriber> activeMappedSourcesUpdater =
                AtomicIntegerFieldUpdater.newUpdater(FlatMapSubscriber.class, "activeMappedSources");

        /**
         * Marker emitted when a mapped {@link Single} fails while errors are being delayed. It is never delivered
         * to the target, but it still passes through the emit/drain path so it is counted in
         * {@link #sourceEmitted} and upstream demand is recalculated.
         */
        private static final Object SINGLE_ERROR = new Object();

        /** First error reported by a mapped {@link Single}; overrides any other terminal signal once set. */
        @Nullable
        private volatile Throwable pendingError;
        /** Bookkeeping for {@link CompositeExceptionUtils#addPendingError}; only accessed through its updater. */
        private volatile int pendingErrorCount;
        /** Lock state manipulated through {@link ConcurrentUtils#tryAcquireLock} / {@code releaseLock}. */
        private volatile int emitting;
        /** Demand accumulated from the downstream subscriber. */
        private volatile long requested;
        /** Number of signals that went through the emit/drain path (including markers and terminals). */
        private volatile long sourceEmitted;
        /** Maintained exclusively by {@link ConcurrentUtils#calculateSourceRequested}. */
        private volatile long sourceRequested;
        /**
         * Number of mapped {@link Single}s that have been subscribed but not yet terminated. When upstream
         * completes the value is negated, and each subsequent {@link Single} termination moves it back towards
         * {@code 0}; the transition from {@code -1} to {@code 0} identifies the last outstanding {@link Single}.
         */
        private volatile int activeMappedSources;
        /** Only read and written inside {@link #sendToTarget(Object)}, i.e. while holding the emitting lock. */
        private boolean targetTerminated;
        @Nullable
        private PublisherSource.Subscription subscription;
        private final Queue<Object> pending;
        private final CancellableSet cancellableSet = new CancellableSet();
        private final PublisherFlatMapSingle<T, R> source;
        private final PublisherSource.Subscriber<? super R> target;

        FlatMapSubscriber(PublisherFlatMapSingle<T, R> source, PublisherSource.Subscriber<? super R> target) {
            this.source = source;
            this.target = target;
            // The initial capacity hint is capped at 2 regardless of how large maxConcurrency is.
            this.pending = PlatformDependent.newUnboundedMpscQueue(Math.min(2, source.maxConcurrency));
        }

        @Override
        public void request(long n) {
            assert subscription != null;
            if (SubscriberUtils.isRequestNValid(n)) {
                requestedUpdater.accumulateAndGet(this, n, FlowControlUtils::addWithOverflowProtection);
                requestFromSourceIfNecessary();
            } else {
                // Forward the invalid value so upstream can react to it, and also terminate the target from here:
                // an upstream onComplete alone does not terminate the target while mapped Singles are active, so
                // the error is enforced locally rather than relying on upstream to report it.
                subscription.request(n);
                enqueueAndDrain(TerminalNotification.error(SubscriberUtils.newExceptionForInvalidRequestN(n)));
            }
        }

        @Override
        public void cancel() {
            doCancel(true);
        }

        @Override
        public void onSubscribe(PublisherSource.Subscription s) {
            if (!SubscriberUtils.checkDuplicateSubscription(subscription, s)) {
                return;
            }
            // request/cancel on the upstream subscription are issued from multiple threads (downstream and mapped
            // Singles), so it is wrapped before first use.
            subscription = ConcurrentSubscription.wrap(s);
            target.onSubscribe(this);
        }

        @Override
        public void onNext(T t) {
            final Single<? extends R> next = Objects.requireNonNull(source.mapper.apply(t),
                    () -> "Mapper " + source.mapper + " returned null");
            // Register the new Single before subscribing so its termination always finds a non-zero count.
            for (;;) {
                final int currValue = activeMappedSources;
                if (currValue < 0) {
                    throw new IllegalStateException("onNext(" + t + ") after terminal signal delivered to " + this);
                }
                if (currValue == Integer.MAX_VALUE) {
                    throw new IllegalStateException("Overflow of mapped Publishers for " + this);
                }
                if (activeMappedSourcesUpdater.compareAndSet(this, currValue, currValue + 1)) {
                    break;
                }
            }
            next.subscribeInternal(new FlatMapSingleSubscriber());
        }

        @Override
        public void onError(Throwable t) {
            // Upstream already terminated, so only the mapped Singles need to be cancelled.
            onError0(t, false);
        }

        @Override
        public void onComplete() {
            // If Singles are still active, the last one to terminate emits the terminal signal instead.
            if (terminateActiveMappedSources()) {
                enqueueAndDrain(TerminalNotification.complete());
            }
        }

        /**
         * Marks upstream as completed by negating {@link #activeMappedSources}.
         *
         * @return {@code true} if no mapped {@link Single} was active at the time of the call
         */
        private boolean terminateActiveMappedSources() {
            for (;;) {
                final int prevActiveSources = activeMappedSources;
                assert prevActiveSources >= 0;
                if (activeMappedSourcesUpdater.compareAndSet(this, prevActiveSources, -prevActiveSources)) {
                    return prevActiveSources == 0;
                }
            }
        }

        /**
         * Records the termination of one mapped {@link Single}.
         *
         * @return {@code true} if upstream has completed and this was the last active mapped {@link Single}
         */
        private boolean decrementActiveMappedSources() {
            for (;;) {
                final int prevActiveSources = activeMappedSources;
                assert prevActiveSources != 0;
                if (prevActiveSources > 0) {
                    // Upstream has not completed yet: plain decrement, never the final termination.
                    if (activeMappedSourcesUpdater.compareAndSet(this, prevActiveSources, prevActiveSources - 1)) {
                        return false;
                    }
                } else if (activeMappedSourcesUpdater.compareAndSet(this, prevActiveSources,
                        prevActiveSources + 1)) {
                    // Upstream completed (count is negative): move towards zero.
                    return prevActiveSources == -1;
                }
            }
        }

        /**
         * Cancels outstanding work and queues {@code throwable} as the terminal signal.
         *
         * @param throwable      the error to deliver downstream
         * @param cancelUpstream whether the upstream subscription must be cancelled as well
         */
        private void onError0(Throwable throwable, boolean cancelUpstream) {
            try {
                doCancel(cancelUpstream);
            } finally {
                enqueueAndDrain(TerminalNotification.error(throwable));
            }
        }

        /**
         * Terminates the target with {@code cause} directly, bypassing the queue. Must only be called by the
         * thread that currently holds the emitting lock. Callers return without releasing the lock afterwards, so
         * signals arriving later are queued but (presumably) never drained.
         *
         * @param cause the error to deliver downstream
         */
        private void onErrorHoldingLock(Throwable cause) {
            try {
                doCancel(true);
            } finally {
                sendToTarget(TerminalNotification.error(cause));
            }
        }

        /**
         * Delivers {@code item} directly if the emitting lock is free, otherwise queues it for the lock holder.
         *
         * @param item a wrapped result, {@link #SINGLE_ERROR}, or a {@link TerminalNotification}
         */
        private void tryEmitItem(Object item) {
            if (!ConcurrentUtils.tryAcquireLock(emittingUpdater, this)) {
                enqueueAndDrain(item);
                return;
            }
            try {
                sendToTarget(item);
                doDrainPostProcessing(1L);
            } catch (Throwable cause) {
                // Deliberately leave the lock held; see onErrorHoldingLock.
                onErrorHoldingLock(cause);
                return;
            }
            // A failed release indicates another thread attempted to acquire the lock meanwhile and may have
            // queued items, so drain on its behalf.
            if (!ConcurrentUtils.releaseLock(emittingUpdater, this)) {
                drainPending();
            }
        }

        /**
         * Queues {@code item} and attempts to drain the queue.
         *
         * @param item the signal to queue
         * @throws QueueFullException if the queue rejects the item
         */
        private void enqueueAndDrain(Object item) {
            if (!pending.offer(item)) {
                enqueueFailed(item);
            }
            drainPending();
        }

        /**
         * Delivers all queued signals while holding the emitting lock, repeating for as long as releasing the
         * lock reports that another thread attempted to acquire it in the meantime.
         */
        private void drainPending() {
            boolean tryAcquire = true;
            while (tryAcquire && ConcurrentUtils.tryAcquireLock(emittingUpdater, this)) {
                long drainCount = 0L;
                try {
                    Object t;
                    while ((t = pending.poll()) != null) {
                        ++drainCount;
                        sendToTarget(t);
                    }
                    if (drainCount != 0L) {
                        doDrainPostProcessing(drainCount);
                    }
                } catch (Throwable cause) {
                    // Deliberately leave the lock held; see onErrorHoldingLock.
                    onErrorHoldingLock(cause);
                    return;
                }
                tryAcquire = !ConcurrentUtils.releaseLock(emittingUpdater, this);
            }
        }

        /**
         * Accounts for {@code drainCount} processed signals and requests more from upstream if permitted.
         *
         * @param drainCount number of signals that were just passed to {@link #sendToTarget(Object)}
         */
        private void doDrainPostProcessing(long drainCount) {
            sourceEmittedUpdater.addAndGet(this, drainCount);
            requestFromSourceIfNecessary();
        }

        /**
         * Asks {@link ConcurrentUtils#calculateSourceRequested} how much may be requested from upstream given the
         * current downstream demand, emitted count and {@code maxConcurrency}, and issues that request if non-zero.
         */
        private void requestFromSourceIfNecessary() {
            assert subscription != null;
            final long actualSourceRequestN = ConcurrentUtils.calculateSourceRequested(requestedUpdater,
                    sourceRequestedUpdater, sourceEmittedUpdater, source.maxConcurrency, this);
            if (actualSourceRequestN != 0L) {
                subscription.request(actualSourceRequestN);
            }
        }

        private static void enqueueFailed(Object item) {
            LOGGER.error("Queue should be unbounded, but an offer failed for item {}!", item);
            throw new QueueFullException("pending");
        }

        /**
         * Cancels every outstanding mapped {@link Single} and, optionally, the upstream subscription.
         *
         * @param cancelUpstream whether to cancel the upstream subscription first
         */
        private void doCancel(boolean cancelUpstream) {
            try {
                if (cancelUpstream) {
                    final PublisherSource.Subscription upstream = subscription;
                    assert upstream != null;
                    upstream.cancel();
                }
            } finally {
                // Mapped Singles are cancelled even if cancelling upstream throws.
                cancellableSet.cancel();
            }
        }

        /**
         * Delivers a single signal to the target. Must be called while holding the emitting lock.
         *
         * @param item a wrapped result, {@link #SINGLE_ERROR}, or a {@link TerminalNotification}
         */
        private void sendToTarget(Object item) {
            if (targetTerminated || item == SINGLE_ERROR) {
                return;
            }
            if (item instanceof TerminalNotification) {
                targetTerminated = true;
                // A recorded error from a mapped Single takes precedence over whatever terminal was queued.
                final Throwable currPendingError = pendingError;
                if (currPendingError != null) {
                    target.onError(currPendingError);
                } else {
                    ((TerminalNotification) item).terminate(target);
                }
            } else {
                target.onNext(SubscriberApiUtils.unwrapNullUnchecked(item));
            }
        }

        /**
         * Subscriber for one mapped {@link Single}. {@link #singleCancellable} doubles as the
         * "not yet terminated" flag: it is cleared on the first terminal signal so duplicates can be detected.
         */
        private final class FlatMapSingleSubscriber implements SingleSource.Subscriber<R> {
            @Nullable
            private Cancellable singleCancellable;

            @Override
            public void onSubscribe(Cancellable singleCancellable) {
                this.singleCancellable = singleCancellable;
                cancellableSet.add(singleCancellable);
            }

            @Override
            public void onSuccess(@Nullable R result) {
                if (singleCancellable == null) {
                    SubscriberUtils.logDuplicateTerminalOnSuccess(this, result);
                    return;
                }
                cancellableSet.remove(singleCancellable);
                singleCancellable = null;
                // Emit before decrementing: if the count dropped first, a concurrent upstream onComplete could
                // observe zero active sources and queue the terminal signal ahead of this result.
                tryEmitItem(SubscriberApiUtils.wrapNull(result));
                if (decrementActiveMappedSources()) {
                    enqueueAndDrain(TerminalNotification.complete());
                }
            }

            @Override
            public void onError(Throwable t) {
                if (singleCancellable == null) {
                    SubscriberUtils.logDuplicateTerminal(this, t);
                    return;
                }
                cancellableSet.remove(singleCancellable);
                singleCancellable = null;
                Throwable currPendingError = pendingError;
                if (source.maxDelayedErrors == 0) {
                    // Fail fast: only the thread that wins the CAS cancels everything and propagates the error.
                    if (currPendingError == null
                            && pendingErrorUpdater.compareAndSet(FlatMapSubscriber.this, null, t)) {
                        onError0(t, true);
                    }
                } else {
                    if (currPendingError == null) {
                        if (pendingErrorUpdater.compareAndSet(FlatMapSubscriber.this, null, t)) {
                            currPendingError = t;
                        } else {
                            // Lost the race to record the first error; attach this one to the winner.
                            currPendingError = pendingError;
                            assert currPendingError != null;
                            CompositeExceptionUtils.addPendingError(pendingErrorCountUpdater,
                                    FlatMapSubscriber.this, source.maxDelayedErrors, currPendingError, t);
                        }
                    } else {
                        CompositeExceptionUtils.addPendingError(pendingErrorCountUpdater,
                                FlatMapSubscriber.this, source.maxDelayedErrors, currPendingError, t);
                    }
                    if (decrementActiveMappedSources()) {
                        enqueueAndDrain(TerminalNotification.error(currPendingError));
                    } else {
                        // Nothing to deliver, but run the emit path so upstream demand is recalculated.
                        tryEmitItem(SINGLE_ERROR);
                    }
                }
            }
        }
    }
}

