/*
 * Decompiled with CFR 0.152.
 */
package io.servicetalk.concurrent.api;

import io.servicetalk.concurrent.PublisherSource;
import io.servicetalk.concurrent.api.AbstractPublisherGroupBy;
import io.servicetalk.concurrent.api.Executor;
import io.servicetalk.concurrent.api.GroupedPublisher;
import io.servicetalk.concurrent.api.Publisher;
import java.util.Iterator;
import java.util.Objects;
import java.util.function.Function;
import javax.annotation.Nullable;

/**
 * Group-by operator in which a single source item may belong to several groups: the
 * {@code keySelector} maps every item to an {@link Iterator} of keys, and the item is handed to
 * {@code onNextGroup} once for each key the iterator yields.
 *
 * @param <Key> type of the keys that identify a group.
 * @param <T> type of the items emitted by the original {@link Publisher}.
 */
final class PublisherGroupToMany<Key, T> extends AbstractPublisherGroupBy<Key, T> {
    /** Maps an item to the keys of all the groups it should be delivered to. */
    private final Function<? super T, ? extends Iterator<? extends Key>> keySelector;
    /** Executor forwarded to the superclass and to every {@link SourceSubscriber} created by {@link #apply}. */
    private final Executor executor;

    /**
     * Creates a new instance.
     *
     * @param original the source {@link Publisher}, forwarded to the superclass.
     * @param keySelector maps each item to an {@link Iterator} over the keys of its groups.
     * @param groupQueueSize forwarded to the superclass (presumably the per-group queue capacity;
     *        confirm against {@code AbstractPublisherGroupBy}).
     * @param executor forwarded to the superclass and used when creating the source subscriber.
     * @throws NullPointerException if {@code keySelector} is {@code null}.
     */
    PublisherGroupToMany(Publisher<T> original,
                         Function<? super T, ? extends Iterator<? extends Key>> keySelector,
                         int groupQueueSize, Executor executor) {
        super(original, groupQueueSize, executor);
        this.keySelector = Objects.requireNonNull(keySelector);
        this.executor = executor;
    }

    /**
     * Creates a new instance with a hint for the number of groups.
     *
     * @param original the source {@link Publisher}, forwarded to the superclass.
     * @param keySelector maps each item to an {@link Iterator} over the keys of its groups.
     * @param groupQueueSize forwarded to the superclass (presumably the per-group queue capacity;
     *        confirm against {@code AbstractPublisherGroupBy}).
     * @param expectedGroupCountHint forwarded to the superclass (presumably used to size the
     *        group storage; confirm against {@code AbstractPublisherGroupBy}).
     * @param executor forwarded to the superclass and used when creating the source subscriber.
     * @throws NullPointerException if {@code keySelector} is {@code null}.
     */
    PublisherGroupToMany(Publisher<T> original,
                         Function<? super T, ? extends Iterator<? extends Key>> keySelector,
                         int groupQueueSize, int expectedGroupCountHint, Executor executor) {
        super(original, groupQueueSize, expectedGroupCountHint, executor);
        this.keySelector = Objects.requireNonNull(keySelector);
        this.executor = executor;
    }

    /**
     * Wraps the downstream subscriber of groups in a new {@link SourceSubscriber} bound to this
     * operator.
     *
     * @param subscriber the subscriber that receives the {@link GroupedPublisher}s.
     * @return the subscriber to be subscribed to the original source.
     */
    @Override
    public PublisherSource.Subscriber<? super T> apply(
            PublisherSource.Subscriber<? super GroupedPublisher<Key, T>> subscriber) {
        // Diamond instead of a raw type so Key/T are carried through to the subscriber.
        return new SourceSubscriber<>(executor, this, subscriber);
    }

    /**
     * Subscriber to the original source that fans every item out to each of the groups selected
     * by the enclosing operator's {@code keySelector}.
     *
     * @param <Key> type of the keys that identify a group.
     * @param <T> type of the items emitted by the original source.
     */
    private static final class SourceSubscriber<Key, T>
            extends AbstractPublisherGroupBy.AbstractSourceSubscriber<Key, T> {
        /** The operator that created this subscriber; supplies the key selector and queue size. */
        private final PublisherGroupToMany<Key, T> source;

        /**
         * Creates a new instance.
         *
         * @param executor forwarded to the superclass.
         * @param source the operator this subscriber belongs to; its
         *        {@code initialCapacityForGroups} is forwarded to the superclass.
         * @param target the subscriber that receives the {@link GroupedPublisher}s.
         */
        SourceSubscriber(Executor executor, PublisherGroupToMany<Key, T> source,
                         PublisherSource.Subscriber<? super GroupedPublisher<Key, T>> target) {
            super(executor, source.initialCapacityForGroups, target);
            this.source = source;
        }

        /**
         * Resolves the keys for {@code t} and passes {@code t} to {@code onNextGroup} once per key.
         *
         * <p>If the key selector throws, or returns {@code null}, the failure is reported through
         * {@code cancelSourceFromSource(false, cause)} and the item is not delivered to any group.
         *
         * @param t the item received from the original source.
         */
        @Override
        void onNext0(@Nullable T t) {
            final Iterator<? extends Key> keys;
            try {
                keys = Objects.requireNonNull(source.keySelector.apply(t));
            } catch (Throwable cause) {
                cancelSourceFromSource(false, cause);
                return;
            }
            // NOTE(review): only the selector call above is guarded. Exceptions thrown by the
            // iterator itself or by onNextGroup propagate to the caller of onNext0 - confirm the
            // superclass deals with that.
            keys.forEachRemaining(key -> onNextGroup(key, t));
        }

        /**
         * @return the {@code groupQueueSize} of the operator that created this subscriber.
         */
        @Override
        int groupQueueSize() {
            return source.groupQueueSize;
        }
    }
}

