/*
 * Decompiled with CFR 0.152.
 */
package io.servicetalk.concurrent.api;

import io.servicetalk.concurrent.Cancellable;
import io.servicetalk.concurrent.PublisherSource;
import io.servicetalk.concurrent.SingleSource;
import io.servicetalk.concurrent.api.Publisher;
import io.servicetalk.concurrent.api.PublisherToSingleOperator;
import io.servicetalk.concurrent.api.StacklessCancellationException;
import io.servicetalk.concurrent.api.SubscribableSources;
import io.servicetalk.concurrent.internal.DelayedSubscription;
import io.servicetalk.concurrent.internal.DuplicateSubscribeException;
import io.servicetalk.concurrent.internal.EmptySubscriptions;
import io.servicetalk.concurrent.internal.SubscriberUtils;
import java.util.Objects;
import java.util.concurrent.CancellationException;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.BiFunction;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

final class FirstAndTailToPackedSingle<Packed, T>
implements PublisherToSingleOperator<T, Packed> {
    private static final Logger LOGGER = LoggerFactory.getLogger(FirstAndTailToPackedSingle.class);
    private final BiFunction<T, Publisher<T>, Packed> packer;

    FirstAndTailToPackedSingle(BiFunction<T, Publisher<T>, Packed> packer) {
        this.packer = Objects.requireNonNull(packer);
    }

    @Override
    public PublisherSource.Subscriber<T> apply(SingleSource.Subscriber<? super Packed> subscriber) {
        return new SplicingSubscriber(this, subscriber);
    }

    private static final class SplicingSubscriber<Data, T>
    implements PublisherSource.Subscriber<T> {
        private static final AtomicReferenceFieldUpdater<SplicingSubscriber, Object> maybeTailSubUpdater = AtomicReferenceFieldUpdater.newUpdater(SplicingSubscriber.class, Object.class, "maybeTailSub");
        private static final String CANCELED = "CANCELED";
        private static final String PENDING = "PENDING";
        private static final String EMPTY_COMPLETED = "EMPTY_COMPLETED";
        private static final String EMPTY_COMPLETED_DELIVERED = "EMPTY_COMPLETED_DELIVERED";
        @Nullable
        private volatile Object maybeTailSub;
        @Nullable
        private PublisherSource.Subscriber<T> tailSubscriber;
        private boolean firstElementSeenInOnNext;
        @Nullable
        private PublisherSource.Subscription rawSubscription;
        private boolean onSubscribeSent;
        private final FirstAndTailToPackedSingle<Data, T> parent;
        private final SingleSource.Subscriber<? super Data> dataSubscriber;

        private SplicingSubscriber(FirstAndTailToPackedSingle<Data, T> parent, SingleSource.Subscriber<? super Data> dataSubscriber) {
            this.parent = parent;
            this.dataSubscriber = dataSubscriber;
        }

        private void cancelData(PublisherSource.Subscription subscription) {
            Object current = maybeTailSubUpdater.getAndUpdate(this, curr -> curr == null || curr == PENDING ? CANCELED : curr);
            if (current == null || current == PENDING) {
                subscription.cancel();
            }
        }

        @Override
        public void onSubscribe(PublisherSource.Subscription inStreamSubscription) {
            if (!SubscriberUtils.checkDuplicateSubscription((PublisherSource.Subscription)this.rawSubscription, (PublisherSource.Subscription)inStreamSubscription)) {
                return;
            }
            this.rawSubscription = inStreamSubscription;
            this.rawSubscription.request(1L);
            if (!this.onSubscribeSent) {
                this.onSubscribeSent = true;
                this.dataSubscriber.onSubscribe(() -> this.cancelData(inStreamSubscription));
            }
        }

        @Override
        public void onNext(@Nullable T next) {
            if (this.firstElementSeenInOnNext) {
                if (this.tailSubscriber != null) {
                    this.tailSubscriber.onNext(next);
                } else {
                    Object subscriber = this.maybeTailSub;
                    if (subscriber instanceof PublisherSource.Subscriber) {
                        this.tailSubscriber = (PublisherSource.Subscriber)subscriber;
                        this.tailSubscriber.onNext(next);
                    }
                }
            } else {
                Object data;
                this.ensureResultSubscriberOnSubscribe();
                this.firstElementSeenInOnNext = true;
                try {
                    Publisher<T> tail;
                    if (maybeTailSubUpdater.compareAndSet(this, null, PENDING)) {
                        tail = this.newTailPublisher();
                    } else {
                        Object maybeTailSub = this.maybeTailSub;
                        assert (maybeTailSub == CANCELED) : "Expected CANCELED but got: " + maybeTailSub;
                        boolean cas = maybeTailSubUpdater.compareAndSet(this, CANCELED, EMPTY_COMPLETED_DELIVERED);
                        assert (cas) : "Could not transition from CANCELED to EMPTY_COMPLETED_DELIVERED";
                        tail = Publisher.failed(StacklessCancellationException.newInstance("Canceled prematurely from SplicingSubscriber.cancelData(..), current state: " + maybeTailSub, this.getClass(), "onNext(...)"));
                    }
                    data = ((FirstAndTailToPackedSingle)this.parent).packer.apply(next, tail);
                    assert (data != null) : "Packer function must return non-null Data";
                }
                catch (Throwable t) {
                    assert (this.rawSubscription != null) : "Expected rawSubscription but got null";
                    this.rawSubscription.cancel();
                    this.dataSubscriber.onError(t);
                    return;
                }
                this.dataSubscriber.onSuccess(data);
            }
        }

        @Nonnull
        private Publisher<T> newTailPublisher() {
            return new SubscribableSources.SubscribablePublisher<T>(){

                @Override
                protected void handleSubscribe(PublisherSource.Subscriber<? super T> newSubscriber) {
                    DelayedSubscription delayedSubscription = new DelayedSubscription();
                    try {
                        newSubscriber.onSubscribe((PublisherSource.Subscription)delayedSubscription);
                    }
                    catch (Throwable t) {
                        SubscriberUtils.handleExceptionFromOnSubscribe(newSubscriber, (Throwable)t);
                        if (maybeTailSubUpdater.compareAndSet(this, SplicingSubscriber.PENDING, SplicingSubscriber.EMPTY_COMPLETED_DELIVERED)) {
                            PublisherSource.Subscription subscription = rawSubscription;
                            assert (subscription != null) : "Expected rawSubscription but got null";
                            subscription.cancel();
                        }
                        return;
                    }
                    if (maybeTailSubUpdater.compareAndSet(this, SplicingSubscriber.PENDING, newSubscriber)) {
                        assert (rawSubscription != null) : "Expected rawSubscription but got null";
                        delayedSubscription.delayedSubscription(rawSubscription);
                    } else {
                        Object maybeSubscriber = maybeTailSub;
                        delayedSubscription.delayedSubscription(EmptySubscriptions.EMPTY_SUBSCRIPTION);
                        if (maybeSubscriber == SplicingSubscriber.EMPTY_COMPLETED && maybeTailSubUpdater.compareAndSet(this, SplicingSubscriber.EMPTY_COMPLETED, SplicingSubscriber.EMPTY_COMPLETED_DELIVERED)) {
                            newSubscriber.onComplete();
                        } else if (maybeSubscriber instanceof Throwable && maybeTailSubUpdater.compareAndSet(this, maybeSubscriber, SplicingSubscriber.EMPTY_COMPLETED_DELIVERED)) {
                            newSubscriber.onError((Throwable)maybeSubscriber);
                        } else if (maybeSubscriber == SplicingSubscriber.CANCELED && maybeTailSubUpdater.compareAndSet(this, maybeSubscriber, SplicingSubscriber.EMPTY_COMPLETED_DELIVERED)) {
                            newSubscriber.onError(new CancellationException("Canceled prematurely from SplicingSubscriber.cancelData(..), current state: " + maybeSubscriber));
                        } else {
                            newSubscriber.onError((Throwable)new DuplicateSubscribeException(maybeSubscriber, newSubscriber, "tail can only be subscribed to once"));
                        }
                    }
                }
            };
        }

        @Override
        public void onError(Throwable t) {
            if (this.tailSubscriber != null) {
                this.tailSubscriber.onError(t);
            } else {
                Object maybeSubscriber = maybeTailSubUpdater.getAndSet(this, t);
                if (!this.firstElementSeenInOnNext) {
                    this.ensureResultSubscriberOnSubscribe();
                    this.dataSubscriber.onError(t);
                } else if (maybeSubscriber instanceof PublisherSource.Subscriber) {
                    if (maybeTailSubUpdater.compareAndSet(this, t, EMPTY_COMPLETED_DELIVERED)) {
                        ((PublisherSource.Subscriber)maybeSubscriber).onError(t);
                    } else {
                        this.terminateWithIllegalStateException((PublisherSource.Subscriber)maybeSubscriber);
                    }
                } else if (maybeSubscriber == EMPTY_COMPLETED_DELIVERED) {
                    LOGGER.debug("Discarding a terminal error from upstream because the tail publisher was already terminated", t);
                } else {
                    LOGGER.debug("Terminal error queued for delayed delivery to the tail publisher. If the tail is not subscribed, this event will not be delivered.", t);
                }
            }
        }

        @Override
        public void onComplete() {
            if (this.tailSubscriber != null) {
                this.tailSubscriber.onComplete();
            } else {
                Object maybeSubscriber = maybeTailSubUpdater.getAndSet(this, EMPTY_COMPLETED);
                if (maybeSubscriber instanceof PublisherSource.Subscriber) {
                    if (maybeTailSubUpdater.compareAndSet(this, EMPTY_COMPLETED, EMPTY_COMPLETED_DELIVERED)) {
                        ((PublisherSource.Subscriber)maybeSubscriber).onComplete();
                    } else {
                        this.terminateWithIllegalStateException((PublisherSource.Subscriber)maybeSubscriber);
                    }
                } else if (!this.firstElementSeenInOnNext) {
                    this.ensureResultSubscriberOnSubscribe();
                    this.dataSubscriber.onError(new IllegalStateException("Stream unexpectedly completed without emitting any items"));
                } else if (maybeSubscriber == EMPTY_COMPLETED_DELIVERED) {
                    LOGGER.debug("Discarding a terminal complete from upstream because the tail publisher was already terminated");
                }
            }
        }

        private void ensureResultSubscriberOnSubscribe() {
            assert (!this.firstElementSeenInOnNext) : "Already seen first element";
            if (!this.onSubscribeSent) {
                this.onSubscribeSent = true;
                this.dataSubscriber.onSubscribe(Cancellable.IGNORE_CANCEL);
            }
        }

        private void terminateWithIllegalStateException(PublisherSource.Subscriber<T> subscriber) {
            subscriber.onError(new IllegalStateException("Duplicate Subscribers are not allowed. Existing: " + subscriber + ", failed the race with a duplicate, but neither has seen onNext()"));
        }
    }
}

