/*
 * Decompiled with CFR 0.152.
 */
package io.servicetalk.concurrent.api;

import io.servicetalk.concurrent.api.Completable;
import io.servicetalk.concurrent.api.MulticastPublisher;
import io.servicetalk.concurrent.api.Publisher;
import io.servicetalk.concurrent.api.ReplayAccumulator;
import io.servicetalk.concurrent.api.SubscriberApiUtils;
import io.servicetalk.concurrent.internal.ConcurrentUtils;
import io.servicetalk.concurrent.internal.SubscriberUtils;
import io.servicetalk.concurrent.internal.TerminalNotification;
import io.servicetalk.utils.internal.ThrowableUtils;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.function.Function;
import java.util.function.Supplier;
import javax.annotation.Nullable;

/**
 * A {@link MulticastPublisher} that records signals in a {@link ReplayAccumulator} and delivers the
 * accumulated signals to each subscriber when its subscribe event is processed.
 *
 * @param <T> the type of items emitted by this publisher.
 */
final class ReplayPublisher<T>
extends MulticastPublisher<T> {
    /** Atomic accessor for {@link ReplayState#signalsQueued}. */
    private static final AtomicLongFieldUpdater<ReplayState> signalQueuedUpdater = AtomicLongFieldUpdater.newUpdater(ReplayState.class, "signalsQueued");
    /** Supplies a new accumulator every time {@link #resetState(int, int)} builds a new state. */
    private final Supplier<ReplayAccumulator<T>> accumulatorSupplier;

    /**
     * Creates the publisher without installing a {@link ReplayState}; use
     * {@link #newReplayPublisher(Publisher, Supplier, int, boolean, int, Function)} to obtain a usable instance.
     *
     * @param original the upstream publisher, forwarded to the superclass.
     * @param accumulatorSupplier supplies the accumulator used by each state, must not be {@code null}.
     * @param minSubscribers forwarded to the superclass.
     * @param cancelUpstream forwarded to the superclass.
     * @param maxQueueSize forwarded to the superclass.
     * @param terminalResubscribe forwarded to the superclass.
     * @throws NullPointerException if {@code accumulatorSupplier} is {@code null}.
     */
    private ReplayPublisher(Publisher<T> original, Supplier<ReplayAccumulator<T>> accumulatorSupplier, int minSubscribers, boolean cancelUpstream, int maxQueueSize, Function<Throwable, Completable> terminalResubscribe) {
        // The third superclass argument is always false for replay; its meaning is defined by MulticastPublisher.
        super(original, minSubscribers, false, cancelUpstream, maxQueueSize, terminalResubscribe);
        this.accumulatorSupplier = Objects.requireNonNull(accumulatorSupplier);
    }

    /**
     * Creates a new replay publisher and installs its initial {@link ReplayState}.
     *
     * @param original the upstream publisher.
     * @param accumulatorSupplier supplies the accumulator used by each state, must not be {@code null}.
     * @param minSubscribers passed to the constructor and to the initial state.
     * @param cancelUpstream passed to the constructor.
     * @param maxQueueSize passed to the constructor and to the initial state.
     * @param terminalResubscribe passed to the constructor.
     * @param <T> the type of items emitted by the publisher.
     * @return the new publisher.
     */
    static <T> MulticastPublisher<T> newReplayPublisher(Publisher<T> original, Supplier<ReplayAccumulator<T>> accumulatorSupplier, int minSubscribers, boolean cancelUpstream, int maxQueueSize, Function<Throwable, Completable> terminalResubscribe) {
        // Forward maxQueueSize in the constructor's maxQueueSize position. Passing minSubscribers there (as was done
        // before) made the superclass see a different queue size than the one handed to resetState below.
        ReplayPublisher<T> publisher = new ReplayPublisher<T>(original, accumulatorSupplier, minSubscribers, cancelUpstream, maxQueueSize, terminalResubscribe);
        // The state is installed after construction because resetState reads accumulatorSupplier, which is only
        // assigned once the superclass constructor has returned.
        publisher.resetState(maxQueueSize, minSubscribers);
        return publisher;
    }

    /**
     * Replaces the current state with a new {@link ReplayState} backed by a freshly supplied accumulator.
     *
     * @param maxQueueSize passed to the new state.
     * @param minSubscribers passed to the new state.
     */
    @Override
    void resetState(int maxQueueSize, int minSubscribers) {
        this.state = new ReplayState(maxQueueSize, minSubscribers, this.accumulatorSupplier.get());
    }

    /**
     * State that feeds every upstream item into the accumulator before handing it to the superclass, and that
     * delivers the accumulation to subscribers when their subscribe event is processed.
     */
    private final class ReplayState
    extends MulticastPublisher.State {
        private final ReplayAccumulator<T> accumulator;
        /**
         * Number of upstream signals added to {@code subscriptionEvents} by {@link #queueOnNext(Object)} or
         * {@link #queueTerminal(TerminalNotification)} and not yet handled by {@link #processOnNextEvent(Object)} or
         * {@link #processTerminal(TerminalNotification)}. Updated through {@code signalQueuedUpdater}.
         */
        volatile long signalsQueued;

        /**
         * @param maxQueueSize forwarded to the superclass.
         * @param minSubscribers forwarded to the superclass.
         * @param accumulator the accumulator for this state, must not be {@code null}.
         * @throws NullPointerException if {@code accumulator} is {@code null}.
         */
        ReplayState(int maxQueueSize, int minSubscribers, ReplayAccumulator<T> accumulator) {
            // NOTE(review): the explicit enclosing-instance argument looks like decompiler output; confirm against
            // the constructor declared by MulticastPublisher.State.
            super(ReplayPublisher.this, maxQueueSize, minSubscribers);
            this.accumulator = Objects.requireNonNull(accumulator);
        }

        /**
         * Accumulates and forwards {@code t} directly when no signals are queued and the subscription lock can be
         * acquired; otherwise queues it behind the pending events.
         *
         * @param t the item from upstream, may be {@code null}.
         */
        @Override
        public void onNext(@Nullable T t) {
            // Only take the direct path when nothing is queued, so this signal cannot overtake queued signals.
            if (this.signalsQueued == 0L && ConcurrentUtils.tryAcquireLock(subscriptionLockUpdater, this)) {
                try {
                    this.accumulator.accumulate(t);
                    super.onNext(t);
                }
                finally {
                    // A false return from releaseLock triggers another pass over the subscription events.
                    if (!ConcurrentUtils.releaseLock(subscriptionLockUpdater, this)) {
                        this.processSubscriptionEvents();
                    }
                }
            } else {
                this.queueOnNext(t);
            }
        }

        /**
         * Forwards the error directly when no signals are queued and the subscription lock can be acquired;
         * otherwise queues it as a terminal notification.
         *
         * @param t the error from upstream.
         */
        @Override
        public void onError(Throwable t) {
            if (this.signalsQueued == 0L && ConcurrentUtils.tryAcquireLock(subscriptionLockUpdater, this)) {
                try {
                    super.onError(t);
                }
                finally {
                    if (!ConcurrentUtils.releaseLock(subscriptionLockUpdater, this)) {
                        this.processSubscriptionEvents();
                    }
                }
            } else {
                this.queueTerminal(TerminalNotification.error(t));
            }
        }

        /**
         * Forwards completion directly when no signals are queued and the subscription lock can be acquired;
         * otherwise queues it as a terminal notification.
         */
        @Override
        public void onComplete() {
            if (this.signalsQueued == 0L && ConcurrentUtils.tryAcquireLock(subscriptionLockUpdater, this)) {
                try {
                    super.onComplete();
                }
                finally {
                    if (!ConcurrentUtils.releaseLock(subscriptionLockUpdater, this)) {
                        this.processSubscriptionEvents();
                    }
                }
            } else {
                this.queueTerminal(TerminalNotification.complete());
            }
        }

        /**
         * Handles an item previously queued by {@link #queueOnNext(Object)}: decrements the queued counter, unwraps
         * the item, accumulates it and forwards it to the superclass.
         *
         * @param wrapped the queued item as produced by {@code SubscriberApiUtils.wrapNull}.
         */
        @Override
        void processOnNextEvent(Object wrapped) {
            signalQueuedUpdater.decrementAndGet(this);
            Object unwrapped = SubscriberApiUtils.unwrapNullUnchecked(wrapped);
            this.accumulator.accumulate(unwrapped);
            super.onNext(unwrapped);
        }

        /**
         * Handles a terminal notification previously queued by {@link #queueTerminal(TerminalNotification)}:
         * decrements the queued counter and forwards the error or completion to the superclass.
         *
         * @param terminalNotification the queued terminal notification.
         */
        @Override
        void processTerminal(TerminalNotification terminalNotification) {
            signalQueuedUpdater.decrementAndGet(this);
            if (terminalNotification.cause() != null) {
                super.onError(terminalNotification.cause());
            } else {
                super.onComplete();
            }
        }

        /**
         * Delivers the accumulated signals to {@code subscriber} and, when a terminal signal is already known,
         * terminates the subscriber afterwards.
         *
         * @param subscriber the subscriber being added.
         * @param terminalSubscriber holder of the terminal signal, or {@code null} if none is supplied.
         * @return always {@code true}.
         */
        @Override
        boolean processSubscribeEvent(MulticastPublisher.MulticastFixedSubscriber<T> subscriber, @Nullable MulticastPublisher.TerminalSubscriber<?> terminalSubscriber) {
            // The superclass is only involved when no terminal signal is supplied; in the terminal case this method
            // terminates the subscriber itself, after the accumulation has been delivered.
            if (terminalSubscriber == null) {
                super.processSubscribeEvent(subscriber, (MulticastPublisher.TerminalSubscriber<?>)null);
            }
            Throwable caughtCause = null;
            try {
                this.accumulator.deliverAccumulation(subscriber::onNext);
            }
            catch (Throwable cause) {
                // Remember the failure so it can be reported to the subscriber in the finally block.
                caughtCause = cause;
            }
            finally {
                if (terminalSubscriber != null) {
                    if (caughtCause != null) {
                        if (terminalSubscriber.terminalError != null) {
                            // The delivery failure is the primary error; the terminal error is attached to it as
                            // suppressed rather than the other way around.
                            SubscriberUtils.safeOnError(subscriber, ThrowableUtils.addSuppressed(caughtCause, terminalSubscriber.terminalError));
                        } else {
                            SubscriberUtils.safeOnError(subscriber, caughtCause);
                        }
                    } else if (terminalSubscriber.terminalError != null) {
                        SubscriberUtils.safeOnError(subscriber, terminalSubscriber.terminalError);
                    } else {
                        SubscriberUtils.safeOnComplete(subscriber);
                    }
                } else if (caughtCause != null) {
                    SubscriberUtils.safeOnError(subscriber, caughtCause);
                }
            }
            return true;
        }

        /** Notifies the accumulator that accumulation is cancelled. */
        @Override
        void upstreamCancelled() {
            this.accumulator.cancelAccumulation();
        }

        /**
         * Queues an item for later handling by {@link #processOnNextEvent(Object)}.
         *
         * @param t the item from upstream, may be {@code null}.
         */
        private void queueOnNext(@Nullable T t) {
            // Increment before adding so that the fast paths observe a non-zero count once the event is visible.
            signalQueuedUpdater.incrementAndGet(this);
            this.subscriptionEvents.add(SubscriberApiUtils.wrapNull(t));
            this.processSubscriptionEvents();
        }

        /**
         * Queues a terminal notification for later handling by {@link #processTerminal(TerminalNotification)}.
         *
         * @param terminalNotification the terminal notification to queue.
         */
        private void queueTerminal(TerminalNotification terminalNotification) {
            signalQueuedUpdater.incrementAndGet(this);
            this.subscriptionEvents.add(terminalNotification);
            this.processSubscriptionEvents();
        }
    }
}

