/*
 * Decompiled with CFR 0.152.
 */
package io.servicetalk.concurrent.api;

import io.servicetalk.concurrent.PublisherSource;
import io.servicetalk.concurrent.SingleSource;
import io.servicetalk.concurrent.api.AbstractNoHandleSubscribeSingle;
import io.servicetalk.concurrent.api.AsyncContextProvider;
import io.servicetalk.concurrent.api.PublishAndSubscribeOnSingles;
import io.servicetalk.concurrent.api.Publisher;
import io.servicetalk.concurrent.internal.DelayedCancellable;
import io.servicetalk.concurrent.internal.SignalOffloader;
import io.servicetalk.context.api.ContextMap;
import java.util.Objects;
import java.util.function.BiFunction;
import java.util.function.Supplier;
import javax.annotation.Nullable;

final class ReduceSingle<R, T>
extends AbstractNoHandleSubscribeSingle<R> {
    private final Publisher<T> source;
    private final Supplier<? extends R> resultFactory;
    private final BiFunction<? super R, ? super T, R> reducer;

    ReduceSingle(Publisher<T> source, Supplier<? extends R> resultFactory, BiFunction<? super R, ? super T, R> reducer) {
        super(source.executor());
        this.source = Objects.requireNonNull(source);
        this.resultFactory = Objects.requireNonNull(resultFactory);
        this.reducer = Objects.requireNonNull(reducer);
    }

    @Override
    void handleSubscribe(SingleSource.Subscriber<? super R> singleSubscriber, SignalOffloader signalOffloader, ContextMap contextMap, AsyncContextProvider contextProvider) {
        R r;
        try {
            r = this.resultFactory.get();
        }
        catch (Throwable t) {
            PublishAndSubscribeOnSingles.deliverOnSubscribeAndOnError(singleSubscriber, signalOffloader, contextMap, contextProvider, t);
            return;
        }
        PublisherSource.Subscriber offloadedSubscription = signalOffloader.offloadSubscription(contextProvider.wrapSubscription(new ReduceSubscriber<R, T>(r, this.reducer, singleSubscriber), contextMap));
        this.source.delegateSubscribe(offloadedSubscription, signalOffloader, contextMap, contextProvider);
    }

    private static final class ReduceSubscriber<R, T>
    extends DelayedCancellable
    implements PublisherSource.Subscriber<T> {
        private final BiFunction<? super R, ? super T, R> reducer;
        private final SingleSource.Subscriber<? super R> subscriber;
        @Nullable
        private R result;

        ReduceSubscriber(@Nullable R result, BiFunction<? super R, ? super T, R> reducer, SingleSource.Subscriber<? super R> subscriber) {
            this.result = result;
            this.reducer = reducer;
            this.subscriber = subscriber;
        }

        @Override
        public void onSubscribe(PublisherSource.Subscription s) {
            this.subscriber.onSubscribe(this);
            s.request(Long.MAX_VALUE);
            this.delayedCancellable(s);
        }

        @Override
        public void onNext(T t) {
            this.result = this.reducer.apply(this.result, t);
        }

        @Override
        public void onError(Throwable t) {
            this.subscriber.onError(t);
        }

        @Override
        public void onComplete() {
            this.subscriber.onSuccess(this.result);
        }
    }
}

