/*
 * Decompiled with CFR 0.152.
 */
package io.servicetalk.concurrent.api;

import io.servicetalk.concurrent.Cancellable;
import io.servicetalk.concurrent.PublisherSource;
import io.servicetalk.concurrent.api.AbstractAsynchronousPublisherOperator;
import io.servicetalk.concurrent.api.BufferStrategy;
import io.servicetalk.concurrent.api.Executor;
import io.servicetalk.concurrent.api.Publisher;
import io.servicetalk.concurrent.api.SourceAdapters;
import io.servicetalk.concurrent.internal.ConcurrentSubscription;
import io.servicetalk.concurrent.internal.DelayedSubscription;
import io.servicetalk.concurrent.internal.SubscriberUtils;
import io.servicetalk.concurrent.internal.TerminalNotification;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

final class PublisherBuffer<T, B>
extends AbstractAsynchronousPublisherOperator<T, B> {
    private final BufferStrategy<T, ?, B> bufferStrategy;

    PublisherBuffer(Publisher<T> original, Executor executor, BufferStrategy<T, ?, B> bufferStrategy) {
        super(original, executor);
        this.bufferStrategy = Objects.requireNonNull(bufferStrategy);
    }

    @Override
    public PublisherSource.Subscriber<? super T> apply(final PublisherSource.Subscriber<? super B> subscriber) {
        final int bufferSizeHint = this.bufferStrategy.bufferSizeHint();
        if (bufferSizeHint <= 0) {
            return new PublisherSource.Subscriber<T>(){

                @Override
                public void onSubscribe(PublisherSource.Subscription subscription) {
                    subscription.cancel();
                    SubscriberUtils.deliverErrorFromSource(subscriber, (Throwable)new IllegalArgumentException("bufferSizeHint: " + bufferSizeHint + " (expected > 0)"));
                }

                @Override
                public void onNext(@Nullable T t) {
                }

                @Override
                public void onError(Throwable t) {
                }

                @Override
                public void onComplete() {
                }
            };
        }
        return new ItemsSubscriber(this.bufferStrategy.boundaries(), subscriber, bufferSizeHint);
    }

    private static final class ItemsTerminated<T, B> {
        @Nullable
        final BufferStrategy.Accumulator<T, B> accumulator;
        final TerminalNotification terminalNotification;

        ItemsTerminated(@Nullable Object maybeAccumulator, TerminalNotification terminalNotification) {
            BufferStrategy.Accumulator accumulator;
            this.accumulator = maybeAccumulator instanceof BufferStrategy.Accumulator ? (accumulator = (BufferStrategy.Accumulator)maybeAccumulator) : null;
            this.terminalNotification = terminalNotification;
        }
    }

    private static final class State {
        private static final Object ADDING = new Object();
        private static final Object TERMINATED = new Object();
        private static final AtomicReferenceFieldUpdater<State, Object> maybeAccumulatorUpdater = AtomicReferenceFieldUpdater.newUpdater(State.class, Object.class, "maybeAccumulator");
        private final int firstItemsRequestN;
        @Nullable
        private volatile Object maybeAccumulator;

        State(int firstItemsRequestN) {
            this.firstItemsRequestN = firstItemsRequestN;
        }

        <T, B> void accumulate(@Nullable T item) {
            Object cMaybeAccumulator;
            do {
                cMaybeAccumulator = this.maybeAccumulator;
                assert (cMaybeAccumulator != null);
                if (cMaybeAccumulator == TERMINATED || cMaybeAccumulator instanceof ItemsTerminated) {
                    return;
                }
                assert (cMaybeAccumulator != ADDING);
            } while (!maybeAccumulatorUpdater.compareAndSet(this, cMaybeAccumulator, ADDING));
            BufferStrategy.Accumulator accumulator = (BufferStrategy.Accumulator)cMaybeAccumulator;
            accumulator.accumulate(item);
            maybeAccumulatorUpdater.accumulateAndGet(this, accumulator, (prev, next) -> prev == TERMINATED ? TERMINATED : next);
        }

        <T, B> void nextAccumulator(BufferStrategy.Accumulator<T, B> nextAccumulator, PublisherSource.Subscriber<? super B> target, PublisherSource.Subscription bSubscription, PublisherSource.Subscription itemsSubscription) {
            Object cMaybeAccumulator;
            Objects.requireNonNull(nextAccumulator);
            while (true) {
                if ((cMaybeAccumulator = this.maybeAccumulator) == TERMINATED) {
                    return;
                }
                if (cMaybeAccumulator == null) {
                    if (!maybeAccumulatorUpdater.compareAndSet(this, null, nextAccumulator)) continue;
                    itemsSubscription.request(this.firstItemsRequestN);
                    bSubscription.request(1L);
                    return;
                }
                if (cMaybeAccumulator == ADDING) {
                    if (!maybeAccumulatorUpdater.compareAndSet(this, ADDING, nextAccumulator)) continue;
                    bSubscription.request(1L);
                    return;
                }
                if (cMaybeAccumulator instanceof ItemsTerminated) {
                    bSubscription.cancel();
                    if (!maybeAccumulatorUpdater.compareAndSet(this, cMaybeAccumulator, TERMINATED)) continue;
                    ItemsTerminated itemsTerminated = (ItemsTerminated)cMaybeAccumulator;
                    State.terminateTarget(itemsTerminated.accumulator, target, itemsTerminated.terminalNotification, bSubscription);
                    return;
                }
                assert (cMaybeAccumulator instanceof BufferStrategy.Accumulator);
                if (maybeAccumulatorUpdater.compareAndSet(this, cMaybeAccumulator, nextAccumulator)) break;
            }
            BufferStrategy.Accumulator oldAccumulator = (BufferStrategy.Accumulator)cMaybeAccumulator;
            target.onNext(oldAccumulator.finish());
        }

        <T, B> void itemsTerminated(TerminalNotification terminalNotification) {
            ItemsTerminated itemsTerminated;
            Object cMaybeAccumulator;
            do {
                if ((cMaybeAccumulator = this.maybeAccumulator) == TERMINATED) {
                    return;
                }
                assert (!(cMaybeAccumulator instanceof ItemsTerminated));
            } while (!maybeAccumulatorUpdater.compareAndSet(this, cMaybeAccumulator, itemsTerminated = new ItemsTerminated(cMaybeAccumulator, terminalNotification)));
        }

        void boundariesTerminated(Throwable cause, PublisherSource.Subscriber<?> target) {
            this.maybeAccumulator = TERMINATED;
            target.onError(cause);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private static <T, B> void terminateTarget(@Nullable BufferStrategy.Accumulator<T, B> accumulator, PublisherSource.Subscriber<? super B> target, TerminalNotification terminalNotification, Cancellable bCancellable) {
            try {
                Throwable cause;
                if (accumulator != null) {
                    try {
                        target.onNext(accumulator.finish());
                    }
                    catch (Throwable t) {
                        SubscriberUtils.safeOnError(target, t);
                        bCancellable.cancel();
                        return;
                    }
                }
                if ((cause = terminalNotification.cause()) == null) {
                    SubscriberUtils.safeOnComplete(target);
                } else {
                    SubscriberUtils.safeOnError(target, cause);
                }
            }
            finally {
                bCancellable.cancel();
            }
        }
    }

    private static final class BoundariesSubscriber<T, B>
    implements PublisherSource.Subscriber<BufferStrategy.Accumulator<T, B>> {
        private final State state;
        private final PublisherSource.Subscriber<? super B> target;
        private final PublisherSource.Subscription tSubscription;
        @Nullable
        private ConcurrentSubscription subscription;

        BoundariesSubscriber(State state, PublisherSource.Subscriber<? super B> target, PublisherSource.Subscription tSubscription) {
            this.state = state;
            this.target = target;
            this.tSubscription = tSubscription;
        }

        @Override
        public void onSubscribe(final PublisherSource.Subscription bSubscription) {
            this.subscription = ConcurrentSubscription.wrap(new PublisherSource.Subscription(){

                @Override
                public void request(long n) {
                    bSubscription.request(n);
                }

                @Override
                public void cancel() {
                    try {
                        bSubscription.cancel();
                    }
                    finally {
                        tSubscription.cancel();
                    }
                }
            });
            this.target.onSubscribe(this.subscription);
        }

        @Override
        public void onNext(@Nonnull BufferStrategy.Accumulator<T, B> accumulator) {
            assert (this.subscription != null);
            this.state.nextAccumulator(accumulator, this.target, this.subscription, this.tSubscription);
        }

        @Override
        public void onError(Throwable t) {
            try {
                this.state.boundariesTerminated(t, this.target);
            }
            finally {
                this.tSubscription.cancel();
            }
        }

        @Override
        public void onComplete() {
            try {
                this.state.boundariesTerminated(new IllegalStateException("Boundaries source completed unexpectedly."), this.target);
            }
            finally {
                this.tSubscription.cancel();
            }
        }
    }

    private static final class ItemsSubscriber<T, B>
    implements PublisherSource.Subscriber<T> {
        private final State state;
        private final DelayedSubscription tSubscription;
        private final int bufferSizeHint;
        @Nullable
        private ConcurrentSubscription subscription;
        private int itemsPending;

        ItemsSubscriber(Publisher<? extends BufferStrategy.Accumulator<T, B>> boundaries, PublisherSource.Subscriber<? super B> target, int bufferSizeHint) {
            this.state = new State(bufferSizeHint);
            this.tSubscription = new DelayedSubscription();
            this.bufferSizeHint = bufferSizeHint;
            this.itemsPending = bufferSizeHint;
            SourceAdapters.toSource(boundaries).subscribe(new BoundariesSubscriber(this.state, target, this.tSubscription));
        }

        @Override
        public void onSubscribe(PublisherSource.Subscription subscription) {
            this.subscription = ConcurrentSubscription.wrap(subscription);
            this.tSubscription.delayedSubscription(this.subscription);
        }

        @Override
        public void onNext(@Nullable T t) {
            assert (this.itemsPending > 0);
            --this.itemsPending;
            this.state.accumulate(t);
            if (this.itemsPending == 0) {
                assert (this.subscription != null);
                this.itemsPending = this.bufferSizeHint;
                this.subscription.request(this.bufferSizeHint);
            }
        }

        @Override
        public void onError(Throwable t) {
            this.state.itemsTerminated(TerminalNotification.error(t));
        }

        @Override
        public void onComplete() {
            this.state.itemsTerminated(TerminalNotification.complete());
        }
    }
}

