/*
 * Decompiled with CFR 0.152.
 */
package io.servicetalk.concurrent.api;

import io.servicetalk.concurrent.Cancellable;
import io.servicetalk.concurrent.PublisherSource;
import io.servicetalk.concurrent.SingleSource;
import io.servicetalk.concurrent.api.AbstractAsynchronousPublisherOperator;
import io.servicetalk.concurrent.api.CompositeException;
import io.servicetalk.concurrent.api.DynamicCompositeCancellable;
import io.servicetalk.concurrent.api.Executor;
import io.servicetalk.concurrent.api.MapDynamicCompositeCancellable;
import io.servicetalk.concurrent.api.Publisher;
import io.servicetalk.concurrent.api.Single;
import io.servicetalk.concurrent.api.SubscriberApiUtils;
import io.servicetalk.concurrent.internal.ConcurrentSubscription;
import io.servicetalk.concurrent.internal.ConcurrentUtils;
import io.servicetalk.concurrent.internal.FlowControlUtils;
import io.servicetalk.concurrent.internal.QueueFullException;
import io.servicetalk.concurrent.internal.SubscriberUtils;
import io.servicetalk.concurrent.internal.TerminalNotification;
import io.servicetalk.utils.internal.PlatformDependent;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Function;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

final class PublisherFlatMapSingle<T, R>
extends AbstractAsynchronousPublisherOperator<T, R> {
    private static final Logger LOGGER = LoggerFactory.getLogger(PublisherFlatMapSingle.class);
    private final Function<? super T, ? extends Single<? extends R>> mapper;
    private final int maxConcurrency;
    private final boolean delayError;

    PublisherFlatMapSingle(Publisher<T> original, Function<? super T, ? extends Single<? extends R>> mapper, boolean delayError, Executor executor) {
        this(original, mapper, 16, delayError, executor);
    }

    PublisherFlatMapSingle(Publisher<T> original, Function<? super T, ? extends Single<? extends R>> mapper, int maxConcurrency, boolean delayError, Executor executor) {
        super(original, executor);
        this.mapper = Objects.requireNonNull(mapper);
        if (maxConcurrency <= 0) {
            throw new IllegalArgumentException("maxConcurrency: " + maxConcurrency + " (expected > 0)");
        }
        this.maxConcurrency = maxConcurrency;
        this.delayError = delayError;
    }

    @Override
    public PublisherSource.Subscriber<? super T> apply(PublisherSource.Subscriber<? super R> subscriber) {
        return new FlatMapSubscriber(this, subscriber);
    }

    private static final class FlatMapSubscriber<T, R>
    implements PublisherSource.Subscriber<T>,
    PublisherSource.Subscription {
        private static final AtomicReferenceFieldUpdater<FlatMapSubscriber, CompositeException> delayedErrorUpdater = AtomicReferenceFieldUpdater.newUpdater(FlatMapSubscriber.class, CompositeException.class, "delayedError");
        private static final AtomicIntegerFieldUpdater<FlatMapSubscriber> emittingUpdater = AtomicIntegerFieldUpdater.newUpdater(FlatMapSubscriber.class, "emitting");
        private static final AtomicLongFieldUpdater<FlatMapSubscriber> requestedUpdater = AtomicLongFieldUpdater.newUpdater(FlatMapSubscriber.class, "requested");
        private static final AtomicLongFieldUpdater<FlatMapSubscriber> sourceRequestedUpdater = AtomicLongFieldUpdater.newUpdater(FlatMapSubscriber.class, "sourceRequested");
        private static final AtomicLongFieldUpdater<FlatMapSubscriber> sourceEmittedUpdater = AtomicLongFieldUpdater.newUpdater(FlatMapSubscriber.class, "sourceEmitted");
        private static final AtomicIntegerFieldUpdater<FlatMapSubscriber> activeUpdater = AtomicIntegerFieldUpdater.newUpdater(FlatMapSubscriber.class, "active");
        private static final AtomicReferenceFieldUpdater<FlatMapSubscriber, TerminalNotification> terminalNotificationUpdater = AtomicReferenceFieldUpdater.newUpdater(FlatMapSubscriber.class, TerminalNotification.class, "terminalNotification");
        @Nullable
        private volatile CompositeException delayedError;
        private volatile int emitting;
        private volatile long requested;
        private volatile long sourceEmitted;
        private volatile long sourceRequested;
        private volatile int active;
        @Nullable
        private volatile PublisherSource.Subscription subscription;
        @Nullable
        private volatile TerminalNotification terminalNotification;
        private boolean targetTerminated;
        private final Queue<Object> pending;
        private final DynamicCompositeCancellable cancellable = new MapDynamicCompositeCancellable();
        private final PublisherFlatMapSingle<T, R> source;
        private final PublisherSource.Subscriber<? super R> target;
        private static final Object SINGLE_ERROR = new Object();

        FlatMapSubscriber(PublisherFlatMapSingle<T, R> source, PublisherSource.Subscriber<? super R> target) {
            this.source = source;
            this.target = target;
            this.pending = PlatformDependent.newUnboundedMpscQueue((int)Math.min(2, ((PublisherFlatMapSingle)source).maxConcurrency));
        }

        public void request(long n) {
            PublisherSource.Subscription s = this.subscription;
            assert (s != null);
            if (!SubscriberUtils.isRequestNValid((long)n)) {
                s.request(n);
                return;
            }
            requestedUpdater.accumulateAndGet(this, n, FlowControlUtils::addWithOverflowProtection);
            int actualSourceRequestN = SubscriberUtils.calculateSourceRequested(requestedUpdater, sourceRequestedUpdater, sourceEmittedUpdater, (int)((PublisherFlatMapSingle)this.source).maxConcurrency, (Object)this);
            if (actualSourceRequestN != 0) {
                s.request((long)actualSourceRequestN);
            }
        }

        public void cancel() {
            this.doCancel(true);
        }

        public void onSubscribe(PublisherSource.Subscription s) {
            if (!SubscriberUtils.checkDuplicateSubscription((PublisherSource.Subscription)this.subscription, (PublisherSource.Subscription)s)) {
                return;
            }
            this.subscription = ConcurrentSubscription.wrap((PublisherSource.Subscription)s);
            this.target.onSubscribe((PublisherSource.Subscription)this);
        }

        public void onNext(T t) {
            Single next = (Single)Objects.requireNonNull(((PublisherFlatMapSingle)this.source).mapper.apply(t));
            activeUpdater.incrementAndGet(this);
            next.subscribeInternal(new FlatMapSingleSubscriber());
        }

        public void onError(Throwable t) {
            if (!this.onError0(t, false, false)) {
                LOGGER.debug("Already terminated/cancelled, ignoring error notification.", t);
            }
        }

        public void onComplete() {
            if (SubscriberUtils.trySetTerminal((TerminalNotification)TerminalNotification.complete(), (boolean)false, terminalNotificationUpdater, (Object)this) && this.active == 0) {
                this.enqueueAndDrain(TerminalNotification.complete());
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private boolean onError0(Throwable throwable, boolean overrideComplete, boolean cancelSubscriberIfNecessary) {
            TerminalNotification notification = TerminalNotification.error((Throwable)throwable);
            if (SubscriberUtils.trySetTerminal((TerminalNotification)notification, (boolean)overrideComplete, terminalNotificationUpdater, (Object)this)) {
                try {
                    this.doCancel(cancelSubscriberIfNecessary);
                }
                finally {
                    this.enqueueAndDrain(notification);
                }
                return true;
            }
            return false;
        }

        private void enqueueAndDrain(Object item) {
            PublisherSource.Subscription s = this.subscription;
            assert (s != null);
            if (!this.pending.offer(item)) {
                QueueFullException exception = new QueueFullException("pending");
                if (item instanceof TerminalNotification) {
                    LOGGER.error("Queue should be unbounded, but an offer failed!", (Throwable)exception);
                    throw exception;
                }
                this.onError0((Throwable)exception, true, true);
            }
            this.drainPending(s);
        }

        private void drainPending(PublisherSource.Subscription subscription) {
            long drainedCount = ConcurrentUtils.drainSingleConsumerQueue(this.pending, this::sendToTarget, emittingUpdater, (Object)this);
            if (drainedCount != 0L) {
                sourceEmittedUpdater.addAndGet(this, drainedCount);
                int actualSourceRequestN = SubscriberUtils.calculateSourceRequested(requestedUpdater, sourceRequestedUpdater, sourceEmittedUpdater, (int)((PublisherFlatMapSingle)this.source).maxConcurrency, (Object)this);
                if (actualSourceRequestN != 0) {
                    subscription.request((long)actualSourceRequestN);
                }
            }
        }

        private void doCancel(boolean cancelSubscription) {
            this.cancellable.cancel();
            if (cancelSubscription) {
                PublisherSource.Subscription subscription = this.subscription;
                assert (subscription != null);
                subscription.cancel();
            }
        }

        private void sendToTarget(Object item) {
            if (this.targetTerminated || item == SINGLE_ERROR) {
                return;
            }
            if (item instanceof TerminalNotification) {
                this.targetTerminated = true;
                TerminalNotification terminalNotification = this.terminalNotification;
                assert (terminalNotification != null);
                CompositeException de = this.delayedError;
                if (de != null) {
                    de.addAllPendingSuppressed();
                    if (terminalNotification.cause() == de) {
                        terminalNotification.terminate(this.target);
                    } else {
                        terminalNotification.terminate(this.target, (Throwable)de);
                    }
                } else {
                    terminalNotification.terminate(this.target);
                }
            } else if (item == SubscriberApiUtils.NULL_TOKEN) {
                this.target.onNext(null);
            } else {
                Object rItem = item;
                this.target.onNext(rItem);
            }
        }

        private final class FlatMapSingleSubscriber
        implements SingleSource.Subscriber<R> {
            @Nullable
            private Cancellable singleCancellable;

            private FlatMapSingleSubscriber() {
            }

            public void onSubscribe(Cancellable singleCancellable) {
                this.singleCancellable = singleCancellable;
                FlatMapSubscriber.this.cancellable.add(singleCancellable);
            }

            public void onSuccess(@Nullable R result) {
                FlatMapSubscriber.this.enqueueAndDrain(result == null ? SubscriberApiUtils.NULL_TOKEN : result);
                if (this.onSingleTerminated()) {
                    FlatMapSubscriber.this.enqueueAndDrain(TerminalNotification.complete());
                }
            }

            public void onError(Throwable t) {
                if (!FlatMapSubscriber.this.source.delayError) {
                    FlatMapSubscriber.this.onError0(t, true, true);
                } else {
                    CompositeException de = FlatMapSubscriber.this.delayedError;
                    if (de == null) {
                        de = new CompositeException(t);
                        if (!delayedErrorUpdater.compareAndSet(FlatMapSubscriber.this, null, de)) {
                            de = FlatMapSubscriber.this.delayedError;
                            assert (de != null);
                            de.add(t);
                        }
                    } else {
                        de.add(t);
                    }
                    if (this.onSingleTerminated()) {
                        if (SubscriberUtils.trySetTerminal((TerminalNotification)TerminalNotification.error((Throwable)de), (boolean)true, (AtomicReferenceFieldUpdater)terminalNotificationUpdater, (Object)FlatMapSubscriber.this)) {
                            FlatMapSubscriber.this.enqueueAndDrain(TerminalNotification.complete());
                        }
                    } else {
                        FlatMapSubscriber.this.enqueueAndDrain(SINGLE_ERROR);
                    }
                }
            }

            private boolean onSingleTerminated() {
                assert (this.singleCancellable != null);
                FlatMapSubscriber.this.cancellable.remove(this.singleCancellable);
                return activeUpdater.decrementAndGet(FlatMapSubscriber.this) == 0 && FlatMapSubscriber.this.terminalNotification != null;
            }
        }
    }
}

