/*
 * Decompiled with CFR 0.152.
 */
package io.servicetalk.concurrent.api;

import io.servicetalk.concurrent.PublisherSource;
import io.servicetalk.concurrent.api.AbstractSynchronousPublisherOperator;
import io.servicetalk.concurrent.api.Executor;
import io.servicetalk.concurrent.api.GroupedPublisher;
import io.servicetalk.concurrent.api.MulticastUtils;
import io.servicetalk.concurrent.api.Publisher;
import io.servicetalk.concurrent.internal.ConcurrentSubscription;
import io.servicetalk.concurrent.internal.DuplicateSubscribeException;
import io.servicetalk.concurrent.internal.FlowControlUtils;
import io.servicetalk.concurrent.internal.QueueFullException;
import io.servicetalk.concurrent.internal.SubscriberUtils;
import io.servicetalk.concurrent.internal.TerminalNotification;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Consumer;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

abstract class AbstractPublisherGroupBy<Key, T>
extends AbstractSynchronousPublisherOperator<T, GroupedPublisher<Key, T>> {
    /** Logger used by the nested sink/subscriber classes to report unexpected failures. */
    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractPublisherGroupBy.class);
    /** The {@code expectedGroupCountHint} supplied at construction; validated to be {@code > 0}. */
    final int initialCapacityForGroups;
    /** The {@code groupQueueSize} supplied at construction; validated to be {@code > 0}. */
    final int groupQueueSize;

    /**
     * Creates a new instance with an expected group count hint of {@code 4}.
     *
     * @param original the source {@link Publisher}, forwarded to the superclass.
     * @param groupQueueSize queue size to use; must be {@code > 0}.
     * @param executor the {@link Executor}, forwarded to the superclass.
     * @throws IllegalArgumentException if {@code groupQueueSize} is not positive.
     */
    AbstractPublisherGroupBy(Publisher<T> original, int groupQueueSize, Executor executor) {
        this(original, groupQueueSize, 4, executor);
    }

    /**
     * Creates a new instance.
     *
     * @param original the source {@link Publisher}, forwarded to the superclass.
     * @param groupQueueSize queue size to use; must be {@code > 0}.
     * @param expectedGroupCountHint expected number of groups; must be {@code > 0}.
     * @param executor the {@link Executor}, forwarded to the superclass.
     * @throws IllegalArgumentException if {@code expectedGroupCountHint} or {@code groupQueueSize} is not positive
     * (the hint is checked first).
     */
    AbstractPublisherGroupBy(Publisher<T> original, int groupQueueSize, int expectedGroupCountHint, Executor executor) {
        super(original, executor);
        this.initialCapacityForGroups = requirePositive(expectedGroupCountHint, "expectedGroupCountHint");
        this.groupQueueSize = requirePositive(groupQueueSize, "groupQueueSize");
    }

    /**
     * Returns {@code value} if it is strictly positive, otherwise throws.
     *
     * @param value the value to validate.
     * @param name the argument name used as the prefix of the exception message.
     * @return {@code value}.
     * @throws IllegalArgumentException if {@code value <= 0}.
     */
    private static int requirePositive(int value, String name) {
        if (value > 0) {
            return value;
        }
        throw new IllegalArgumentException(name + " " + value + " (expected >0)");
    }

    /**
     * A {@link GroupedPublisher} that can also be subscribed to through the {@link PublisherSource} interface.
     * The {@link PublisherSource} entry point simply forwards to {@code subscribeInternal}.
     *
     * @param <Key> type of the group key.
     * @param <T> type of the items emitted by the group.
     */
    private static abstract class GroupedPublisherSource<Key, T>
    extends GroupedPublisher<Key, T>
    implements PublisherSource<T> {
        /**
         * @param executor the {@link Executor} forwarded to {@link GroupedPublisher}.
         * @param key the key identifying this group.
         */
        GroupedPublisherSource(Executor executor, Key key) {
            super(executor, key);
        }

        /**
         * Subscribes {@code subscriber} by delegating to {@code subscribeInternal}.
         *
         * @param subscriber the subscriber to attach.
         */
        public final void subscribe(PublisherSource.Subscriber<? super T> subscriber) {
            subscribeInternal(subscriber);
        }
    }

    /**
     * Sink for a single group. It owns the {@link GroupedPublisher} handed to the downstream subscriber of groups
     * and relays demand, cancellation and failures to the owning {@link AbstractSourceSubscriber}.
     *
     * @param <Key> type of the group key.
     * @param <T> type of the items emitted by the group.
     */
    private static final class GroupSink<Key, T>
    extends MulticastUtils.IndividualMulticastSubscriber<T> {
        /** The publisher exposed downstream for this group. */
        final GroupedPublisher<Key, T> groupedPublisher;
        /** The subscriber to the original source which created this sink. */
        private final AbstractSourceSubscriber<Key, T> sourceSubscriber;

        /**
         * @param executor the {@link Executor} passed to the group's publisher.
         * @param key the key of this group.
         * @param maxQueueSize forwarded to the superclass constructor.
         * @param sourceSubscriber the owner that receives demand/cancel/failure callbacks from this sink.
         */
        GroupSink(Executor executor, Key key, int maxQueueSize, AbstractSourceSubscriber<Key, T> sourceSubscriber) {
            super(maxQueueSize);
            this.sourceSubscriber = sourceSubscriber;
            this.groupedPublisher = new GroupedPublisherSource<Key, T>(executor, key) {

                @Override
                protected void handleSubscribe(PublisherSource.Subscriber<? super T> subscriber) {
                    Objects.requireNonNull(subscriber);
                    // All sink state lives on the enclosing GroupSink, not on this anonymous publisher, so every
                    // access must be qualified with GroupSink.this. An unqualified "target" would resolve to the
                    // local variable being declared and "this" would be the publisher rather than the Subscription.
                    final PublisherSource.Subscriber<? super T> existing = GroupSink.this.target;
                    if (existing != null) {
                        // Only a single subscriber per group is supported.
                        SubscriberUtils.deliverErrorFromSource(subscriber,
                                new DuplicateSubscribeException(existing, subscriber));
                        return;
                    }
                    // onSubscribe is invoked before the subscriber is published to the sink so that it is the
                    // first signal the subscriber observes.
                    subscriber.onSubscribe(GroupSink.this);
                    GroupSink.this.target = subscriber;
                    // Deliver anything which was queued while there was no subscriber.
                    final MulticastUtils.SpscQueue<T> pending = GroupSink.this.subscriberQueue();
                    if (pending != null) {
                        GroupSink.this.drainPendingFromExternal(pending, subscriber);
                    }
                }
            };
        }

        /** Returns the string form of the group's key, used to identify this sink's queue. */
        @Override
        String queueIdentifier() {
            return this.groupedPublisher.key().toString();
        }

        /** Forwards demand from the group's subscriber to the source. */
        @Override
        void requestFromSource(int requestN) {
            this.sourceSubscriber.requestFromGroup(requestN);
        }

        /** Forwards an invalid {@code n} to the source subscription unchanged. */
        @Override
        void handleInvalidRequestN(long n) {
            this.sourceSubscriber.requestFromGroup(n);
        }

        /** Cancels the source on behalf of this group and logs {@code cause}. */
        @Override
        void cancelSourceFromExternal(Throwable cause) {
            this.sourceSubscriber.cancelSourceFromExternal(cause);
            LOGGER.error("Unexpected exception thrown from group {} subscriber", this.groupedPublisher.key(), (Object)cause);
        }

        /** Removes this group from the owner and then cancels the source with {@code cause}. */
        @Override
        void cancelSourceFromSource(boolean subscriberLockAcquired, Throwable cause) {
            this.sourceSubscriber.removeGroup(this);
            this.sourceSubscriber.cancelSourceFromSource(subscriberLockAcquired, cause);
        }

        /** Cancelling a single group only removes it from the owner; the source itself is not cancelled here. */
        public void cancel() {
            this.sourceSubscriber.removeGroup(this);
        }
    }

    static abstract class AbstractSourceSubscriber<Key, T>
    implements PublisherSource.Subscriber<T>,
    PublisherSource.Subscription {
        // Atomic updaters for the volatile fields of the same names declared below.
        private static final AtomicLongFieldUpdater<AbstractSourceSubscriber> groupRequestedUpdater = AtomicLongFieldUpdater.newUpdater(AbstractSourceSubscriber.class, "groupRequested");
        private static final AtomicIntegerFieldUpdater<AbstractSourceSubscriber> subscriberStateUpdater = AtomicIntegerFieldUpdater.newUpdater(AbstractSourceSubscriber.class, "subscriberState");
        private static final AtomicReferenceFieldUpdater<AbstractSourceSubscriber, Throwable> cancelCauseUpdater = AtomicReferenceFieldUpdater.newUpdater(AbstractSourceSubscriber.class, Throwable.class, "cancelCause");
        // Set to true by cancelSourceFromSource(...); signal handlers such as onNext/onError ignore further
        // signals once it is set. Not volatile.
        private boolean terminatedPrematurely;
        // Set at most once (CAS from null) by cancelSourceFromExternal; read by onNextGroup when it releases
        // subscriberState.
        @Nullable
        private volatile Throwable cancelCause;
        // Outstanding demand for groups: increased by request(n), decreased when a group is delivered to target.
        private volatile long groupRequested;
        // Guards emission to target. In this class: 0 is the released state, 1 is set (via CAS from 0) while a
        // thread emits to target, and 2 is set (via CAS from 0) by onError/onComplete when terminating directly.
        private volatile int subscriberState;
        // The upstream subscription wrapped by ConcurrentSubscription; assigned in onSubscribe.
        @Nullable
        private volatile PublisherSource.Subscription subscription;
        // Lazily created queue of groups (and a terminal notification) that could not be delivered immediately.
        @Nullable
        private volatile MulticastUtils.SpscQueue<GroupedPublisher<Key, T>> groupQueue;
        // Passed to every GroupSink created by onNextGroup.
        private final Executor executor;
        // The downstream subscriber which receives the groups.
        private final PublisherSource.Subscriber<? super GroupedPublisher<Key, T>> target;
        // Active group sinks indexed by key.
        private final Map<Key, GroupSink<Key, T>> groups;

        /**
         * @param executor the {@link Executor} handed to each group created by this subscriber.
         * @param initialCapacityForGroups initial capacity of the map which tracks active groups.
         * @param target the downstream subscriber which receives the groups.
         */
        AbstractSourceSubscriber(Executor executor, int initialCapacityForGroups, PublisherSource.Subscriber<? super GroupedPublisher<Key, T>> target) {
            // Load factor 1.0 and concurrency level 1, exactly as before.
            groups = new ConcurrentHashMap<>(initialCapacityForGroups, 1.0f, 1);
            this.target = target;
            this.executor = executor;
        }

        /**
         * Stores the upstream subscription (wrapped by {@link ConcurrentSubscription}) and hands this object to
         * {@code target} as its subscription. Nothing is done if the duplicate-subscription check fails.
         *
         * @param s the upstream subscription.
         */
        public final void onSubscribe(PublisherSource.Subscription s) {
            if (SubscriberUtils.checkDuplicateSubscription(subscription, s)) {
                subscription = ConcurrentSubscription.wrap(s);
                target.onSubscribe(this);
            }
        }

        /**
         * Invoked by {@link #onNext(Object)} for every item received from the source, unless this subscriber has
         * been terminated prematurely.
         *
         * @param var1 the item received from the source.
         */
        abstract void onNext0(@Nullable T var1);

        /**
         * Returns the queue size used when creating the queue of pending groups, and used as the default queue
         * size for a new group sink.
         *
         * @return the queue size.
         */
        abstract int groupQueueSize();

        /**
         * Routes {@code t} to the sink registered for {@code key}. If no sink exists yet, one is created and its
         * {@link GroupedPublisher} is either delivered to {@code target} straight away or queued for later.
         *
         * <p>Decompiler note (CFR): "Removed try catching itself - possible behaviour change", so the control
         * flow below may differ slightly from the original source.
         *
         * @param key the group key for {@code t}.
         * @param t the item to deliver to the group.
         */
        final void onNextGroup(Key key, @Nullable T t) {
            GroupSink<Key, T> groupSink = this.groups.get(key);
            if (groupSink == null) {
                // First item for this key: determine the queue size for the new sink.
                int groupSinkQueueSize = this.groupQueueSize();
                if (key instanceof GroupedPublisher.QueueSizeProvider) {
                    try {
                        groupSinkQueueSize = ((GroupedPublisher.QueueSizeProvider)key).calculateMaxQueueSize(this.groupQueueSize());
                        if (groupSinkQueueSize < 0) {
                            throw new IllegalStateException("groupSinkQueueSize: " + groupSinkQueueSize + " (expected >=0)");
                        }
                    }
                    catch (Throwable cause) {
                        // The key failed to provide a usable size: terminate and drop the item.
                        this.cancelSourceFromSource(false, cause);
                        return;
                    }
                }
                groupSink = new GroupSink(this.executor, key, groupSinkQueueSize, this);
                GroupSink<Key, T> oldVal = this.groups.put(key, groupSink);
                assert (oldVal == null);
                MulticastUtils.SpscQueue groupQueue = this.groupQueue;
                if (groupQueue != null && !groupQueue.isEmpty()) {
                    // Other groups are already pending: queue behind them, then try to drain.
                    // NOTE(review): after a failed offer the method keeps going (drain + groupSink.onNext below)
                    // instead of returning - confirm this is intended.
                    if (!groupQueue.offerNext(groupSink.groupedPublisher)) {
                        this.cancelSourceFromSource(false, (Throwable)new QueueFullException("global", this.groupQueueSize()), groupQueue);
                    }
                    this.drainPendingGroupsFromSource(groupQueue);
                } else if (subscriberStateUpdater.compareAndSet(this, 0, 1)) {
                    // subscriberState was acquired (0 -> 1): this thread may emit to target.
                    block26: {
                        try {
                            long groupRequested;
                            do {
                                // Demand available: fall through to the CAS in the loop condition to consume one.
                                if ((groupRequested = this.groupRequested) != 0L) continue;
                                // No demand: queue the group (creating the queue on first use) and skip delivery.
                                if (groupQueue == null) {
                                    groupQueue = new MulticastUtils.SpscQueue(this.groupQueueSize());
                                    this.groupQueue = groupQueue;
                                }
                                if (!groupQueue.offerNext(groupSink.groupedPublisher)) {
                                    this.cancelSourceFromSource(true, (Throwable)new QueueFullException("global", this.groupQueueSize()), groupQueue);
                                }
                                break block26;
                            } while (!groupRequestedUpdater.compareAndSet(this, groupRequested, groupRequested - 1L));
                            // One unit of demand was consumed: deliver the new group directly.
                            try {
                                this.target.onNext(groupSink.groupedPublisher);
                            }
                            catch (Throwable cause) {
                                this.cancelSourceFromSource(true, new IllegalStateException("Unexpected exception thrown from onNext", cause), groupQueue);
                            }
                        }
                        finally {
                            // Release subscriberState. If a cancel cause has been recorded, fail all groups
                            // with it; cancelCause is re-read after the release to catch one set in between.
                            Throwable cause = this.cancelCause;
                            if (cause == null) {
                                this.subscriberState = 0;
                                cause = this.cancelCause;
                                if (cause != null && subscriberStateUpdater.compareAndSet(this, 0, 1)) {
                                    this.sendErrorToAllGroups(cause);
                                    this.subscriberState = 0;
                                }
                            } else {
                                this.sendErrorToAllGroups(cause);
                                this.subscriberState = 0;
                            }
                        }
                    }
                    // Something may have been queued while subscriberState was held: drain it now.
                    if (groupQueue == null) {
                        groupQueue = this.groupQueue;
                    }
                    if (groupQueue != null && !groupQueue.isEmpty()) {
                        this.drainPendingGroupsFromSource(groupQueue);
                    }
                } else {
                    // subscriberState is held elsewhere: queue the group and attempt a drain.
                    if (groupQueue == null) {
                        groupQueue = new MulticastUtils.SpscQueue(this.groupQueueSize());
                        this.groupQueue = groupQueue;
                    }
                    if (!groupQueue.offerNext(groupSink.groupedPublisher)) {
                        this.cancelSourceFromSource(false, (Throwable)new QueueFullException("global", this.groupQueueSize()), groupQueue);
                    }
                    this.drainPendingGroupsFromSource(groupQueue);
                }
            }
            // Finally hand the item to the (existing or newly created) sink.
            groupSink.onNext(t);
        }

        /**
         * Passes {@code t} to {@link #onNext0(Object)} unless this subscriber was terminated prematurely.
         *
         * @param t the item received from the source.
         */
        public final void onNext(T t) {
            if (!terminatedPrematurely) {
                onNext0(t);
            }
        }

        /**
         * Handles an error from the source. The error is ignored after premature termination. It is delivered
         * directly to {@code target} and all groups when no groups are pending and the state can be moved from
         * {@code 0} to {@code 2}; otherwise it is queued as a terminal notification and a drain is attempted.
         *
         * @param t the error from the source.
         */
        public final void onError(Throwable t) {
            if (terminatedPrematurely) {
                return;
            }
            final MulticastUtils.SpscQueue<GroupedPublisher<Key, T>> pendingGroups = groupQueue;
            final boolean deliverDirectly = pendingGroups == null
                    || (pendingGroups.isEmpty() && subscriberStateUpdater.compareAndSet(this, 0, 2));
            if (!deliverDirectly) {
                pendingGroups.addTerminal(TerminalNotification.error(t));
                drainPendingGroupsFromSource(pendingGroups);
                return;
            }
            try {
                target.onError(t);
            } finally {
                // Groups are terminated even if target.onError throws.
                sendErrorToAllGroups(t);
            }
        }

        /**
         * Handles completion of the source. Completion is delivered directly to {@code target} and all groups
         * when no groups are pending and the state can be moved from {@code 0} to {@code 2}; otherwise it is
         * queued as a terminal notification and a drain is attempted.
         *
         * <p>As with {@link #onNext(Object)} and {@link #onError(Throwable)}, the signal is ignored once this
         * subscriber has terminated prematurely. By then an error has already been delivered (or queued) for
         * {@code target}, and cancellation of the source is only best effort, so a late completion must not
         * produce a second terminal signal.
         */
        public final void onComplete() {
            if (this.terminatedPrematurely) {
                return;
            }
            MulticastUtils.SpscQueue<GroupedPublisher<Key, T>> q = this.groupQueue;
            if (q == null || q.isEmpty() && subscriberStateUpdater.compareAndSet(this, 0, 2)) {
                try {
                    this.target.onComplete();
                }
                finally {
                    // Groups are completed even if target.onComplete throws.
                    this.sendCompleteToAllGroups();
                }
            } else {
                q.addTerminal(TerminalNotification.complete());
                this.drainPendingGroupsFromSource(q);
            }
        }

        /**
         * Handles demand for groups from {@code target}. An invalid {@code n} is forwarded upstream unchanged.
         * Otherwise the demand is recorded, any pending groups are drained, and the part of {@code n} which
         * was not satisfied from the queue is requested upstream.
         *
         * @param n the number of groups requested.
         */
        public final void request(long n) {
            final PublisherSource.Subscription upstream = subscription;
            assert upstream != null : "Subscription can not be null";
            if (!SubscriberUtils.isRequestNValid(n)) {
                upstream.request(n);
                return;
            }
            groupRequestedUpdater.accumulateAndGet(this, n, FlowControlUtils::addWithOverflowProtection);
            final MulticastUtils.SpscQueue<GroupedPublisher<Key, T>> pendingGroups = groupQueue;
            if (pendingGroups == null) {
                // Nothing was ever queued, so the whole demand goes upstream.
                upstream.request(n);
                return;
            }
            final long delivered = drainPendingGroupsFromSubscription(pendingGroups);
            // NOTE(review): a negative result presumably means the drain did not deliver normally - confirm
            // against MulticastUtils.drainToSubscriber.
            if (delivered >= 0L && delivered < n) {
                upstream.request(n - delivered);
            }
        }

        /**
         * Forwards demand originating from a group directly to the upstream subscription.
         *
         * @param n the amount to request upstream.
         */
        final void requestFromGroup(long n) {
            final PublisherSource.Subscription upstream = subscription;
            assert upstream != null : "Subscription can not be null";
            upstream.request(n);
        }

        /**
         * Cancels the source on behalf of {@code target}, using a {@link CancellationException} as the cause.
         */
        public final void cancel() {
            final CancellationException cause =
                    new CancellationException("Group subscriber cancelled its subscription");
            cancelSourceFromExternal(cause);
        }

        /**
         * Cancels the upstream subscription and records {@code cause} as the cancel cause if none is set yet.
         * If this call recorded the cause and also acquires the subscriber state, all groups are failed with
         * {@code cause}, the state is released, and any pending groups are drained.
         *
         * @param cause the reason for the cancellation.
         */
        final void cancelSourceFromExternal(Throwable cause) {
            final PublisherSource.Subscription upstream = subscription;
            assert upstream != null : "Subscription can not be null";
            upstream.cancel();
            if (!cancelCauseUpdater.compareAndSet(this, null, cause)
                    || !subscriberStateUpdater.compareAndSet(this, 0, 1)) {
                // Either a cause was already recorded, or the state is held elsewhere.
                return;
            }
            sendErrorToAllGroups(cause);
            subscriberState = 0;
            final MulticastUtils.SpscQueue<GroupedPublisher<Key, T>> pendingGroups = groupQueue;
            if (pendingGroups != null) {
                drainPendingGroupsFromSubscription(pendingGroups);
            }
        }

        /**
         * Removes {@code groupSink} from the active groups, but only if it is still the sink mapped to its key.
         *
         * @param groupSink the sink to remove.
         */
        final void removeGroup(GroupSink<Key, T> groupSink) {
            final Key key = groupSink.groupedPublisher.key();
            groups.remove(key, groupSink);
        }

        /**
         * Cancels the source with {@code throwable}, passing {@code true} for {@code subscriberLockAcquired}.
         *
         * @param throwable the failure to propagate.
         */
        final void cancelSourceFromSource(Throwable throwable) {
            this.cancelSourceFromSource(true, throwable);
        }

        /**
         * Cancels the source with {@code throwable}, using the current {@code groupQueue} as the pending queue.
         *
         * @param subscriberLockAcquired whether the caller already holds the subscriber state.
         * @param throwable the failure to propagate.
         */
        final void cancelSourceFromSource(boolean subscriberLockAcquired, Throwable throwable) {
            this.cancelSourceFromSource(subscriberLockAcquired, throwable, this.groupQueue);
        }

        /**
         * Cancels the upstream subscription, marks this subscriber as terminated prematurely and propagates
         * {@code throwable} to {@code target} and the groups, either directly or through the pending queue.
         *
         * <p>Decompiler note (CFR): "Removed try catching itself - possible behaviour change".
         *
         * @param subscriberLockAcquired whether the caller already holds the subscriber state.
         * @param throwable the failure to propagate.
         * @param pendingGroupsQ the queue of pending groups, or {@code null} if none has been created yet.
         */
        private void cancelSourceFromSource(boolean subscriberLockAcquired, Throwable throwable, @Nullable MulticastUtils.SpscQueue<GroupedPublisher<Key, T>> pendingGroupsQ) {
            PublisherSource.Subscription s = this.subscription;
            assert (s != null) : "Subscription can not be null in cancel()";
            this.terminatedPrematurely = true;
            s.cancel();
            if (pendingGroupsQ != null && !pendingGroupsQ.isEmpty()) {
                // Groups are still pending: queue the error behind them and try to drain.
                pendingGroupsQ.addTerminal(TerminalNotification.error((Throwable)throwable));
                this.drainPendingGroupsFromSource(pendingGroupsQ);
            } else if (subscriberLockAcquired || subscriberStateUpdater.compareAndSet(this, 0, 1)) {
                // The state is held (by the caller or acquired just now): deliver the error directly.
                try {
                    this.target.onError(throwable);
                }
                catch (Throwable onErrorError) {
                    LOGGER.error("Subscriber {} threw from onError for exception {}", new Object[]{this.target, throwable, onErrorError});
                }
                finally {
                    this.sendErrorToAllGroups(throwable);
                    // Only release the state if it was acquired here; otherwise the caller releases it.
                    if (!subscriberLockAcquired) {
                        this.subscriberState = 0;
                    }
                }
            } else {
                // The state is held elsewhere: queue the error (creating the queue if needed) and try to drain.
                if (pendingGroupsQ == null) {
                    pendingGroupsQ = new MulticastUtils.SpscQueue(this.groupQueueSize());
                    this.groupQueue = pendingGroupsQ;
                }
                pendingGroupsQ.addTerminal(TerminalNotification.error((Throwable)throwable));
                this.drainPendingGroupsFromSource(pendingGroupsQ);
            }
        }

        /**
         * Cancels the upstream subscription, fails all groups with {@code cause} and logs it. The subscriber
         * state is expected to be held (non-zero) when this is called.
         *
         * @param cause the failure thrown by the subscriber.
         */
        private void cancelSourceFromSubscription(Throwable cause) {
            final PublisherSource.Subscription upstream = subscription;
            assert upstream != null : "Subscription can not be null in cancel()";
            upstream.cancel();
            assert subscriberState != 0;
            sendErrorToAllGroups(cause);
            LOGGER.error("Unexpected exception thrown from subscriber", cause);
        }

        /**
         * Drains {@code q}, routing non-terminal failures to {@link #cancelSourceFromSource(Throwable)}.
         *
         * @param q the queue of pending groups.
         */
        private void drainPendingGroupsFromSource(MulticastUtils.SpscQueue<GroupedPublisher<Key, T>> q) {
            drainPendingGroups(q, cause -> cancelSourceFromSource(cause));
        }

        /**
         * Drains {@code q}, routing non-terminal failures to {@link #cancelSourceFromSubscription(Throwable)}.
         *
         * @param q the queue of pending groups.
         * @return the value returned by {@link #drainPendingGroups}.
         */
        private long drainPendingGroupsFromSubscription(MulticastUtils.SpscQueue<GroupedPublisher<Key, T>> q) {
            final long drained = drainPendingGroups(q, cause -> cancelSourceFromSubscription(cause));
            return drained;
        }

        /**
         * Delegates to {@code MulticastUtils.drainToSubscriber} to deliver queued groups to {@code target}.
         * A terminal notification found in the queue is fanned out to every group: completion when it has no
         * cause, otherwise an error with that cause.
         *
         * @param q the queue of pending groups.
         * @param nonTerminalErrorConsumer invoked with failures which are not terminal notifications.
         * @return the value returned by {@code MulticastUtils.drainToSubscriber}.
         */
        private long drainPendingGroups(MulticastUtils.SpscQueue<GroupedPublisher<Key, T>> q, Consumer<Throwable> nonTerminalErrorConsumer) {
            return MulticastUtils.drainToSubscriber(
                    q,
                    target,
                    subscriberStateUpdater,
                    () -> groupRequestedUpdater.get(this),
                    terminal -> {
                        final Throwable failure = terminal.cause();
                        if (failure != null) {
                            sendErrorToAllGroups(failure);
                        } else {
                            sendCompleteToAllGroups();
                        }
                    },
                    nonTerminalErrorConsumer,
                    this::drainPendingGroupsDecrementRequestN,
                    this);
        }

        /**
         * Reduces the outstanding group demand by the number of groups that were just delivered.
         *
         * @param onNextCount the number of groups delivered; must be positive.
         */
        private void drainPendingGroupsDecrementRequestN(int onNextCount) {
            assert onNextCount > 0;
            final long delta = -onNextCount;
            groupRequestedUpdater.addAndGet(this, delta);
        }

        /**
         * Removes every active group and terminates it with {@code cause}. Each group is removed from the map
         * before it is notified.
         *
         * @param cause the error to deliver to each group.
         */
        private void sendErrorToAllGroups(Throwable cause) {
            for (Iterator<GroupSink<Key, T>> sinks = groups.values().iterator(); sinks.hasNext();) {
                final GroupSink<Key, T> current = sinks.next();
                sinks.remove();
                current.onError(cause);
            }
        }

        /**
         * Removes every active group and completes it. Each group is removed from the map before it is
         * notified.
         */
        private void sendCompleteToAllGroups() {
            for (Iterator<GroupSink<Key, T>> sinks = groups.values().iterator(); sinks.hasNext();) {
                final GroupSink<Key, T> current = sinks.next();
                sinks.remove();
                current.onComplete();
            }
        }
    }
}

