/*
 * Decompiled with CFR 0.152.
 */
package io.servicetalk.concurrent.api;

import io.servicetalk.concurrent.PublisherSource;
import io.servicetalk.concurrent.api.AbstractNoHandleSubscribePublisher;
import io.servicetalk.concurrent.api.AsyncContextProvider;
import io.servicetalk.concurrent.api.GroupedPublisher;
import io.servicetalk.concurrent.api.MulticastLeafSubscriber;
import io.servicetalk.concurrent.api.Publisher;
import io.servicetalk.concurrent.internal.ConcurrentSubscription;
import io.servicetalk.concurrent.internal.DuplicateSubscribeException;
import io.servicetalk.concurrent.internal.SubscriberUtils;
import io.servicetalk.concurrent.internal.ThrowableUtils;
import io.servicetalk.context.api.ContextMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.function.BiConsumer;
import javax.annotation.Nullable;

abstract class AbstractPublisherGroupBy<Key, T>
extends AbstractNoHandleSubscribePublisher<GroupedPublisher<Key, T>> {
    /** Upstream publisher passed to the constructor; stored as-is (the constructor does not null-check it). */
    final Publisher<T> original;
    /** Expected-group-count hint passed to the constructor; the constructor rejects values {@code <= 0}. */
    final int initialCapacityForGroups;
    /** Queue limit passed to the constructor; the constructor rejects values {@code <= 0}. */
    final int queueLimit;

    /**
     * Creates an instance with a default expected-group-count hint of {@code 4}.
     *
     * @param original the upstream {@link Publisher}, forwarded unchanged to the three-argument constructor.
     * @param queueLimit the queue limit, forwarded unchanged to the three-argument constructor.
     * @throws IllegalArgumentException if {@code queueLimit <= 0}.
     */
    AbstractPublisherGroupBy(final Publisher<T> original, final int queueLimit) {
        this(original, queueLimit, 4);
    }

    /**
     * Creates an instance after validating both numeric arguments.
     *
     * @param original the upstream {@link Publisher}; stored without a null check.
     * @param queueLimit the queue limit; must be {@code > 0}.
     * @param expectedGroupCountHint hint stored as {@link #initialCapacityForGroups}; must be {@code > 0}.
     * @throws IllegalArgumentException if {@code expectedGroupCountHint <= 0} or {@code queueLimit <= 0}.
     */
    AbstractPublisherGroupBy(final Publisher<T> original, final int queueLimit, final int expectedGroupCountHint) {
        // The hint is validated before the queue limit, so its error is the one reported when both are invalid.
        if (expectedGroupCountHint < 1) {
            throw new IllegalArgumentException("expectedGroupCountHint " + expectedGroupCountHint + " (expected >0)");
        }
        if (queueLimit < 1) {
            throw new IllegalArgumentException("queueLimit " + queueLimit + " (expected >0)");
        }
        this.original = original;
        this.queueLimit = queueLimit;
        this.initialCapacityForGroups = expectedGroupCountHint;
    }

    /**
     * {@link GroupedPublisher} implementation for a single key. Every subscribe attempt is handed to the
     * {@link GroupMulticastSubscriber} that backs the group, together with the context objects captured at
     * construction time.
     */
    private static final class DefaultGroupedPublisher<Key, T> extends GroupedPublisher<Key, T>
            implements PublisherSource<T> {
        private final GroupMulticastSubscriber<Key, T> groupSink;
        private final ContextMap contextMap;
        private final AsyncContextProvider contextProvider;

        /**
         * @param key the group key, passed to the {@link GroupedPublisher} super constructor.
         * @param groupSink the sink that subscribers of this publisher are attached to.
         * @param contextMap context passed along on every {@link #handleSubscribe} call.
         * @param contextProvider provider passed along on every {@link #handleSubscribe} call.
         */
        DefaultGroupedPublisher(final Key key, final GroupMulticastSubscriber<Key, T> groupSink,
                                final ContextMap contextMap, final AsyncContextProvider contextProvider) {
            super(key);
            this.contextProvider = contextProvider;
            this.contextMap = contextMap;
            this.groupSink = groupSink;
        }

        /** {@link PublisherSource} entry point; delegates to the inherited {@code subscribeInternal}. */
        @Override
        public void subscribe(final PublisherSource.Subscriber<? super T> subscriber) {
            subscribeInternal(subscriber);
        }

        /**
         * Attaches {@code sub} to the group sink. {@code true} is passed for {@code triggerOnSubscribe}, which
         * selects the sink's compare-and-set guarded path (a second subscriber gets a
         * {@link DuplicateSubscribeException}).
         */
        @Override
        protected void handleSubscribe(final PublisherSource.Subscriber<? super T> sub) {
            groupSink.subscriber(sub, true, contextMap, contextProvider);
        }
    }

    /**
     * {@link MulticastLeafSubscriber} used both for the root (stream of groups) and for each individual group.
     * Demand, cancellation and the outstanding-demand limit are all delegated to the owning
     * {@link AbstractGroupBySubscriber}.
     */
    private static final class GroupMulticastSubscriber<Key, T> extends MulticastLeafSubscriber<T> {
        private static final AtomicIntegerFieldUpdater<GroupMulticastSubscriber> subscriberStateUpdater =
                AtomicIntegerFieldUpdater.newUpdater(GroupMulticastSubscriber.class, "subscriberState");
        private final AbstractGroupBySubscriber<?, ?> root;
        private final Key key;
        /** {@code 0} until the first guarded {@link #subscriber} call wins the CAS, {@code 1} afterwards. */
        private volatile int subscriberState;
        @Nullable
        private PublisherSource.Subscriber<? super T> subscriber;
        @Nullable
        private PublisherSource.Subscriber<? super T> ctxSubscriber;

        /**
         * @param root the owning group-by subscriber that upstream interactions are delegated to.
         * @param key the key identifying this sink; also used by {@link #toString()}.
         */
        GroupMulticastSubscriber(final AbstractGroupBySubscriber<?, ?> root, final Key key) {
            this.key = key;
            this.root = root;
        }

        /** Returns the string form of the key. */
        @Override
        public String toString() {
            return key.toString();
        }

        /**
         * Installs the downstream subscriber for this sink.
         *
         * @param subscriber the downstream subscriber.
         * @param triggerOnSubscribe {@code false} to store the subscriber without any duplicate guard and without
         * calling {@code triggerOnSubscribe()}; {@code true} to allow only the first caller (via CAS on
         * {@code subscriberState}) and then call {@code triggerOnSubscribe()}.
         * @param contextMap passed to {@code contextProvider.wrapPublisherSubscriber}.
         * @param contextProvider used to build the wrapped subscriber returned by
         * {@link #subscriberOnSubscriptionThread()}.
         */
        void subscriber(final PublisherSource.Subscriber<? super T> subscriber, final boolean triggerOnSubscribe,
                        final ContextMap contextMap, final AsyncContextProvider contextProvider) {
            if (triggerOnSubscribe) {
                if (!subscriberStateUpdater.compareAndSet(this, 0, 1)) {
                    // Lost the race: this.subscriber may still be null here because the winner assigns the field
                    // only after its CAS succeeded.
                    SubscriberUtils.deliverErrorFromSource(subscriber,
                            (Throwable) new DuplicateSubscribeException(this.subscriber, subscriber));
                    return;
                }
            } else {
                assert this.subscriber == null && this.ctxSubscriber == null;
            }

            this.subscriber = subscriber;
            this.ctxSubscriber = contextProvider.wrapPublisherSubscriber(subscriber, contextMap);
            if (triggerOnSubscribe) {
                triggerOnSubscribe();
            }
        }

        /** Returns the subscriber exactly as it was passed to {@link #subscriber}, or {@code null} if none yet. */
        @Override
        @Nullable
        PublisherSource.Subscriber<? super T> subscriber() {
            return subscriber;
        }

        /** Returns the subscriber wrapped by the context provider, or {@code null} if none installed yet. */
        @Override
        @Nullable
        PublisherSource.Subscriber<? super T> subscriberOnSubscriptionThread() {
            return ctxSubscriber;
        }

        /** Forwards demand to the owning group-by subscriber. */
        @Override
        void requestUpstream(final long n) {
            root.requestUpstream(n);
        }

        /** Asks the owning group-by subscriber to remove this sink. */
        @Override
        void cancelUpstream() {
            root.removeSubscriber(this);
        }

        /** Returns the owning group-by subscriber's queue limit. */
        @Override
        int outstandingDemandLimit() {
            return root.queueLimit;
        }
    }

    static abstract class AbstractGroupBySubscriber<Key, T>
    implements PublisherSource.Subscriber<T> {
        private boolean rootCancelled;
        private final int queueLimit;
        private final ContextMap contextMap;
        private final AsyncContextProvider contextProvider;
        private final Map<Key, GroupMulticastSubscriber<Key, T>> groups;
        private final GroupMulticastSubscriber<String, GroupedPublisher<Key, T>> target;
        @Nullable
        private PublisherSource.Subscription subscription;

        AbstractGroupBySubscriber(PublisherSource.Subscriber<? super GroupedPublisher<Key, T>> target, int queueLimit, int initialCapacityForGroups, ContextMap contextMap, AsyncContextProvider contextProvider) {
            this.queueLimit = queueLimit;
            this.contextMap = contextMap;
            this.contextProvider = contextProvider;
            this.target = new GroupMulticastSubscriber(this, "root");
            this.target.subscriber(target, false, contextMap, contextProvider);
            this.groups = new ConcurrentHashMap<Key, GroupMulticastSubscriber<Key, T>>(initialCapacityForGroups);
        }

        @Override
        public final void onSubscribe(PublisherSource.Subscription subscription) {
            if (SubscriberUtils.checkDuplicateSubscription(this.subscription, subscription)) {
                this.subscription = ConcurrentSubscription.wrap(subscription);
                this.target.triggerOnSubscribe();
            }
        }

        @Override
        public final void onError(Throwable t) {
            Throwable delayedCause = this.onTerminal(t, MulticastLeafSubscriber::onError);
            this.target.onError(delayedCause == null ? t : delayedCause);
        }

        @Override
        public final void onComplete() {
            Throwable delayedCause = this.onTerminal(null, (groupSink, t) -> groupSink.onComplete());
            if (delayedCause == null) {
                this.target.onComplete();
            } else {
                this.target.onError(delayedCause);
            }
        }

        final void onNext(Key key, @Nullable T t) {
            GroupMulticastSubscriber<Key, T> groupSub = this.groups.get(key);
            if (groupSub != null) {
                groupSub.onNext(t);
            } else {
                groupSub = new GroupMulticastSubscriber(this, key);
                DefaultGroupedPublisher<Key, T> groupedPublisher = new DefaultGroupedPublisher<Key, T>(key, groupSub, this.contextMap, this.contextProvider);
                GroupMulticastSubscriber<Key, T> oldVal = this.groups.put(key, groupSub);
                assert (oldVal == null);
                groupSub.onNext(t);
                this.target.onNext(groupedPublisher);
            }
        }

        private void requestUpstream(long n) {
            assert (this.subscription != null);
            this.subscription.request(n);
        }

        private void removeSubscriber(GroupMulticastSubscriber<?, ?> subscriber) {
            assert (this.subscription != null);
            if (subscriber == this.target) {
                this.rootCancelled = true;
                if (this.groups.isEmpty()) {
                    this.subscription.cancel();
                }
            } else {
                GroupMulticastSubscriber<?, ?> sub = subscriber;
                if (this.groups.remove(((GroupMulticastSubscriber)sub).key, sub) && this.rootCancelled && this.groups.isEmpty()) {
                    this.subscription.cancel();
                }
            }
        }

        @Nullable
        private Throwable onTerminal(@Nullable Throwable t, BiConsumer<GroupMulticastSubscriber<Key, T>, Throwable> terminator) {
            Throwable delayedCause = null;
            for (GroupMulticastSubscriber<Key, T> groupSink : this.groups.values()) {
                try {
                    terminator.accept(groupSink, t);
                }
                catch (Throwable cause) {
                    delayedCause = ThrowableUtils.catchUnexpected(delayedCause, cause);
                }
            }
            return delayedCause;
        }
    }
}

