/*
 * Decompiled with CFR 0.152.
 */
package io.servicetalk.concurrent.api;

import io.servicetalk.concurrent.Cancellable;
import io.servicetalk.concurrent.PublisherSource;
import io.servicetalk.concurrent.api.AbstractAsynchronousPublisherOperator;
import io.servicetalk.concurrent.api.BufferStrategies;
import io.servicetalk.concurrent.api.BufferStrategy;
import io.servicetalk.concurrent.api.Publisher;
import io.servicetalk.concurrent.api.SourceAdapters;
import io.servicetalk.concurrent.internal.ConcurrentSubscription;
import io.servicetalk.concurrent.internal.DelayedSubscription;
import io.servicetalk.concurrent.internal.FlowControlUtils;
import io.servicetalk.concurrent.internal.SubscriberUtils;
import io.servicetalk.concurrent.internal.TerminalNotification;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

final class PublisherBuffer<T, B>
extends AbstractAsynchronousPublisherOperator<T, B> {
    private final BufferStrategy<T, ?, B> bufferStrategy;

    /**
     * Creates an operator that buffers the items of {@code original} as directed by {@code bufferStrategy}.
     *
     * @param original the source {@link Publisher} handed to the super-class constructor
     * @param bufferStrategy supplies the boundaries {@link Publisher} and the buffer size hint read in
     * {@link #apply(PublisherSource.Subscriber)}
     * @throws NullPointerException if {@code bufferStrategy} is {@code null}
     */
    PublisherBuffer(Publisher<T> original, BufferStrategy<T, ?, B> bufferStrategy) {
        super(original);
        // Name the offending argument so a null strategy is easy to diagnose from the stack trace alone.
        this.bufferStrategy = Objects.requireNonNull(bufferStrategy, "bufferStrategy");
    }

    /**
     * Builds the subscriber that is attached to the original (items) source on behalf of {@code subscriber}.
     * <p>
     * If the strategy's {@code bufferSizeHint()} is not positive, the returned subscriber cancels the upstream
     * subscription as soon as it is received and reports an {@link IllegalArgumentException} to
     * {@code subscriber} through {@link SubscriberUtils#deliverErrorFromSource}; all other upstream signals are
     * ignored. Otherwise an {@link ItemsSubscriber} is created, which also subscribes to the strategy's
     * boundaries.
     *
     * @param subscriber the downstream subscriber that receives the finished buffers
     * @return the subscriber to use for the items source
     */
    @Override
    public PublisherSource.Subscriber<? super T> apply(final PublisherSource.Subscriber<? super B> subscriber) {
        final int bufferSizeHint = this.bufferStrategy.bufferSizeHint();
        if (bufferSizeHint <= 0) {
            return new PublisherSource.Subscriber<T>() {

                @Override
                public void onSubscribe(PublisherSource.Subscription subscription) {
                    try {
                        subscription.cancel();
                    } finally {
                        // Report the error even if cancel() throws.
                        SubscriberUtils.deliverErrorFromSource(subscriber,
                                new IllegalArgumentException("bufferSizeHint: " + bufferSizeHint + " (expected > 0)"));
                    }
                }

                @Override
                public void onNext(@Nullable T t) {
                    // Upstream was cancelled in onSubscribe; nothing to do with stray items.
                }

                @Override
                public void onError(Throwable t) {
                    // The downstream has already been failed in onSubscribe.
                }

                @Override
                public void onComplete() {
                    // The downstream has already been failed in onSubscribe.
                }
            };
        }
        // Explicit type arguments instead of a raw type, so the result is checked against the return type.
        return new ItemsSubscriber<T, B>(this.bufferStrategy.boundaries(), subscriber, bufferSizeHint);
    }

    /**
     * Returns {@code accumulator} as a {@link BufferStrategies.CountingAccumulator}.
     * <p>
     * An instance whose runtime class is exactly {@code CountingAccumulator} is returned as-is; any other
     * accumulator is wrapped in a new {@code CountingAccumulator}.
     *
     * @param accumulator the accumulator to adapt
     * @return {@code accumulator} itself or a new wrapper around it
     */
    private static <T, B> BufferStrategies.CountingAccumulator<T, B> toCounting(BufferStrategy.Accumulator<T, B> accumulator) {
        if (accumulator.getClass().equals(BufferStrategies.CountingAccumulator.class)) {
            // Already the counting type: avoid double wrapping.
            return (BufferStrategies.CountingAccumulator<T, B>) accumulator;
        }
        return new BufferStrategies.CountingAccumulator<T, B>(accumulator);
    }

    /**
     * Immutable pair stored in {@code State.maybeAccumulator} after the items source has terminated: the
     * accumulator that was current at that moment and the terminal signal that still has to be forwarded to
     * the target.
     */
    private static final class ItemsTerminated<T, B> {
        /** Accumulator that was current when the items source terminated. */
        final BufferStrategies.CountingAccumulator<T, B> accumulator;
        /** Terminal signal (completion or error) received from the items source. */
        final TerminalNotification terminalNotification;

        ItemsTerminated(BufferStrategies.CountingAccumulator<T, B> lastAccumulator, TerminalNotification signal) {
            accumulator = lastAccumulator;
            terminalNotification = signal;
        }
    }

    /**
     * Wrapper stored in {@code State.maybeAccumulator} around the accumulator of the next boundary. The
     * wrapped accumulator is always a {@link BufferStrategies.CountingAccumulator}, obtained through
     * {@link PublisherBuffer#toCounting}.
     */
    private static final class NextAccumulatorHolder<T, B> {
        /** The next boundary's accumulator, in its counting form. */
        final BufferStrategies.CountingAccumulator<T, B> accumulator;

        NextAccumulatorHolder(BufferStrategy.Accumulator<T, B> next) {
            accumulator = toCounting(next);
        }
    }

    private static final class State {
        private static final Object ADDING = new Object();
        private static final Object TERMINATED = new Object();
        private static final AtomicReferenceFieldUpdater<State, Object> maybeAccumulatorUpdater = AtomicReferenceFieldUpdater.newUpdater(State.class, Object.class, "maybeAccumulator");
        private static final long NEED_TERMINATE = Long.MIN_VALUE;
        private static final long NEED_REQUEST_ITEMS = -9223372036854775807L;
        private static final long LAST_SPECIAL_STATE = -9223372036854775807L;
        private static final AtomicLongFieldUpdater<State> pendingUpdater = AtomicLongFieldUpdater.newUpdater(State.class, "pending");
        @Nullable
        private volatile Object maybeAccumulator;
        private volatile long pending;
        final int itemsRequestN;

        State(int itemsRequestN) {
            this.itemsRequestN = itemsRequestN;
        }

        <T, B> boolean requested(long n, PublisherSource.Subscriber<? super B> target) {
            assert (SubscriberUtils.isRequestNValid(n));
            long oldPending = pendingUpdater.getAndAccumulate(this, n, (prev, nValue) -> prev <= -9223372036854775807L ? nValue : FlowControlUtils.addWithOverflowProtection(prev, nValue));
            if (oldPending == Long.MIN_VALUE) {
                ItemsTerminated it = (ItemsTerminated)this.maybeAccumulator;
                assert (it != null);
                this.maybeAccumulator = TERMINATED;
                this.terminateTarget(it.accumulator, target, it.terminalNotification);
            }
            return oldPending == -9223372036854775807L;
        }

        boolean requestMore() {
            long pending = pendingUpdater.accumulateAndGet(this, -9223372036854775807L, (prev, next) -> prev > 0L || prev == Long.MIN_VALUE ? prev : next);
            return pending > 0L;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        <T, B> void accumulate(@Nullable T item, PublisherSource.Subscriber<? super B> target) {
            Object cMaybeAccumulator;
            do {
                cMaybeAccumulator = this.maybeAccumulator;
                assert (cMaybeAccumulator != null);
                assert (!(cMaybeAccumulator instanceof ItemsTerminated));
                if (cMaybeAccumulator == TERMINATED) {
                    return;
                }
                assert (cMaybeAccumulator != ADDING);
            } while (!maybeAccumulatorUpdater.compareAndSet(this, cMaybeAccumulator, ADDING));
            BufferStrategies.CountingAccumulator accumulator = NextAccumulatorHolder.class.equals(cMaybeAccumulator.getClass()) ? ((NextAccumulatorHolder)cMaybeAccumulator).accumulator : (BufferStrategies.CountingAccumulator)cMaybeAccumulator;
            accumulator.accumulate(item);
            Object nextState = maybeAccumulatorUpdater.accumulateAndGet(this, accumulator, (prev, next) -> prev == ADDING ? next : prev);
            if (nextState == accumulator || nextState == TERMINATED) {
                return;
            }
            NextAccumulatorHolder holder = (NextAccumulatorHolder)nextState;
            try {
                this.deliverOnNext(accumulator, target);
            }
            finally {
                this.unwrapHolderState(holder);
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        <T, B> void nextAccumulator(BufferStrategy.Accumulator<T, B> nextAccumulator, PublisherSource.Subscriber<? super B> target, PublisherSource.Subscription bSubscription, PublisherSource.Subscription tSubscription) {
            Object nextState;
            NextAccumulatorHolder<T, B> holder;
            Object cMaybeAccumulator;
            Objects.requireNonNull(nextAccumulator);
            while (true) {
                if ((cMaybeAccumulator = this.maybeAccumulator) == TERMINATED) {
                    return;
                }
                if (cMaybeAccumulator == null) {
                    if (!maybeAccumulatorUpdater.compareAndSet(this, null, PublisherBuffer.toCounting(nextAccumulator))) continue;
                    tSubscription.request(this.itemsRequestN);
                    bSubscription.request(1L);
                    return;
                }
                if (ItemsTerminated.class.equals(cMaybeAccumulator.getClass())) {
                    return;
                }
                if (cMaybeAccumulator == ADDING) {
                    if (!maybeAccumulatorUpdater.compareAndSet(this, ADDING, new NextAccumulatorHolder<T, B>(nextAccumulator))) continue;
                    return;
                }
                if (NextAccumulatorHolder.class.equals(cMaybeAccumulator.getClass())) {
                    bSubscription.request(1L);
                    return;
                }
                assert (cMaybeAccumulator instanceof BufferStrategies.CountingAccumulator);
                holder = new NextAccumulatorHolder<T, B>(nextAccumulator);
                if (maybeAccumulatorUpdater.compareAndSet(this, cMaybeAccumulator, holder)) break;
            }
            BufferStrategy.Accumulator oldAccumulator = (BufferStrategy.Accumulator)cMaybeAccumulator;
            try {
                this.deliverOnNext(oldAccumulator, target);
            }
            finally {
                nextState = this.unwrapHolderState(holder);
            }
            if (ItemsTerminated.class.equals(nextState.getClass())) {
                ItemsTerminated it = (ItemsTerminated)nextState;
                this.terminateIfPossible(it.accumulator, target, it.terminalNotification);
            }
        }

        <T, B> Object unwrapHolderState(NextAccumulatorHolder<T, B> holder) {
            return maybeAccumulatorUpdater.accumulateAndGet(this, holder.accumulator, (prev, next) -> prev == holder ? next : prev);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        <T, B> void itemsTerminated(TerminalNotification terminalNotification, PublisherSource.Subscriber<? super B> target, Cancellable bCancellable) {
            BufferStrategies.CountingAccumulator accumulator;
            while (true) {
                Object cMaybeAccumulator = this.maybeAccumulator;
                assert (cMaybeAccumulator != ADDING);
                if (cMaybeAccumulator == TERMINATED) {
                    return;
                }
                if (cMaybeAccumulator != null && NextAccumulatorHolder.class.equals(cMaybeAccumulator.getClass())) {
                    ItemsTerminated itemsTerminated = new ItemsTerminated(((NextAccumulatorHolder)cMaybeAccumulator).accumulator, terminalNotification);
                    if (!maybeAccumulatorUpdater.compareAndSet(this, cMaybeAccumulator, itemsTerminated)) continue;
                    bCancellable.cancel();
                    return;
                }
                if (cMaybeAccumulator == null) {
                    if (!maybeAccumulatorUpdater.compareAndSet(this, null, TERMINATED)) continue;
                    try {
                        bCancellable.cancel();
                    }
                    finally {
                        this.terminateTarget(null, target, terminalNotification);
                    }
                    return;
                }
                accumulator = (BufferStrategies.CountingAccumulator)cMaybeAccumulator;
                if (maybeAccumulatorUpdater.compareAndSet(this, cMaybeAccumulator, new ItemsTerminated(accumulator, terminalNotification))) break;
            }
            try {
                bCancellable.cancel();
            }
            finally {
                this.terminateIfPossible(accumulator, target, terminalNotification);
            }
        }

        void boundariesTerminated(Throwable cause, PublisherSource.Subscriber<?> target) {
            this.maybeAccumulator = TERMINATED;
            SubscriberUtils.safeOnError(target, cause);
        }

        private <T, B> void deliverOnNext(BufferStrategy.Accumulator<T, B> accumulator, PublisherSource.Subscriber<? super B> target) {
            long pending = pendingUpdater.accumulateAndGet(this, 1L, (prev, decrement) -> prev <= -9223372036854775807L ? prev : prev - decrement);
            assert (pending >= 0L || pending == Long.MIN_VALUE);
            target.onNext(accumulator.finish());
        }

        private <T, B> void terminateIfPossible(BufferStrategies.CountingAccumulator<T, B> accumulator, PublisherSource.Subscriber<? super B> target, TerminalNotification terminalNotification) {
            if (accumulator.isEmpty()) {
                this.maybeAccumulator = TERMINATED;
                this.terminateTarget(null, target, terminalNotification);
            } else {
                long demand = pendingUpdater.accumulateAndGet(this, Long.MIN_VALUE, (prev, next) -> prev > 0L ? prev : next);
                if (demand > 0L) {
                    this.maybeAccumulator = TERMINATED;
                    this.terminateTarget(accumulator, target, terminalNotification);
                }
            }
        }

        private <T, B> void terminateTarget(@Nullable BufferStrategy.Accumulator<T, B> accumulator, PublisherSource.Subscriber<? super B> target, TerminalNotification terminalNotification) {
            Throwable cause;
            if (accumulator != null) {
                try {
                    this.deliverOnNext(accumulator, target);
                }
                catch (Throwable t) {
                    SubscriberUtils.safeOnError(target, t);
                    return;
                }
            }
            if ((cause = terminalNotification.cause()) == null) {
                SubscriberUtils.safeOnComplete(target);
            } else {
                SubscriberUtils.safeOnError(target, cause);
            }
        }
    }

    /**
     * Subscriber attached to the boundaries source. Each boundary it receives is handed to
     * {@link State#nextAccumulator}. It is also the place where the target's {@code onSubscribe} is invoked,
     * with a subscription that routes demand to both the boundaries and the items source.
     * <p>
     * Any termination of the boundaries source (error or completion) cancels the items source and fails the
     * target.
     */
    private static final class BoundariesSubscriber<T, B>
    implements PublisherSource.Subscriber<BufferStrategy.Accumulator<T, B>> {
        private final State state;
        private final PublisherSource.Subscriber<? super B> target;
        private final DelayedSubscription bSubscription;
        private final PublisherSource.Subscription tSubscription;

        /**
         * @param state state shared with the items subscriber
         * @param target the downstream subscriber
         * @param bSubscription delayed subscription that is bound to the boundaries source in
         * {@link #onSubscribe}
         * @param tSubscription subscription to the items source
         */
        BoundariesSubscriber(State state, PublisherSource.Subscriber<? super B> target, DelayedSubscription bSubscription, PublisherSource.Subscription tSubscription) {
            this.state = state;
            this.target = target;
            this.bSubscription = bSubscription;
            this.tSubscription = tSubscription;
        }

        @Override
        public void onSubscribe(PublisherSource.Subscription subscription) {
            final ConcurrentSubscription cs = ConcurrentSubscription.wrap(subscription);
            this.bSubscription.delayedSubscription(new PublisherSource.Subscription() {

                @Override
                public void request(long n) {
                    cs.request(n);
                }

                @Override
                public void cancel() {
                    // Cancelling the boundaries always cancels the items source as well.
                    try {
                        cs.cancel();
                    } finally {
                        tSubscription.cancel();
                    }
                }
            });
            this.target.onSubscribe(new PublisherSource.Subscription() {

                @Override
                public void request(long n) {
                    if (SubscriberUtils.isRequestNValid(n)) {
                        boolean needRequestItems = state.requested(n, target);
                        bSubscription.request(n);
                        if (needRequestItems) {
                            // Read the enclosing subscriber's state directly; "this" here is the anonymous
                            // Subscription and cannot be cast to BoundariesSubscriber.
                            tSubscription.request(state.itemsRequestN);
                        }
                    } else {
                        // Invalid demand is passed through unchanged to the boundaries subscription.
                        bSubscription.request(n);
                    }
                }

                @Override
                public void cancel() {
                    bSubscription.cancel();
                }
            });
        }

        @Override
        public void onNext(@Nonnull BufferStrategy.Accumulator<T, B> accumulator) {
            this.state.nextAccumulator(accumulator, this.target, this.bSubscription, this.tSubscription);
        }

        @Override
        public void onError(Throwable t) {
            try {
                this.tSubscription.cancel();
            } finally {
                this.state.boundariesTerminated(t, this.target);
            }
        }

        @Override
        public void onComplete() {
            // Completion of the boundaries is reported to the target as an error.
            try {
                this.tSubscription.cancel();
            } finally {
                this.state.boundariesTerminated(new IllegalStateException("Boundaries source completed unexpectedly."), this.target);
            }
        }
    }

    /**
     * Subscriber attached to the items source. Every item is added to the current buffer through
     * {@link State#accumulate}; items are requested from upstream in batches of {@code bufferSizeHint}.
     * <p>
     * Constructing an instance immediately subscribes a {@link BoundariesSubscriber} to the boundaries
     * publisher, sharing the same {@link State} and subscriptions.
     */
    private static final class ItemsSubscriber<T, B>
    implements PublisherSource.Subscriber<T> {
        private static final AtomicIntegerFieldUpdater<ItemsSubscriber> itemsPendingUpdater = AtomicIntegerFieldUpdater.newUpdater(ItemsSubscriber.class, "itemsPending");
        private final State state;
        private final PublisherSource.Subscriber<? super B> target;
        private final DelayedSubscription bSubscription;
        private final DelayedSubscription tSubscription;
        private final int bufferSizeHint;
        /** Items requested from upstream in the current batch and not yet received. */
        private volatile int itemsPending;

        /**
         * @param boundaries publisher of accumulators; each emitted accumulator starts a new buffer
         * @param target the downstream subscriber receiving finished buffers
         * @param bufferSizeHint number of items requested from the items source per batch
         */
        ItemsSubscriber(Publisher<? extends BufferStrategy.Accumulator<T, B>> boundaries, PublisherSource.Subscriber<? super B> target, int bufferSizeHint) {
            this.state = new State(bufferSizeHint);
            this.target = target;
            this.bSubscription = new DelayedSubscription();
            this.tSubscription = new DelayedSubscription();
            this.bufferSizeHint = bufferSizeHint;
            // Explicit type arguments instead of a raw type keep the subscribe call type-checked.
            SourceAdapters.toSource(boundaries).subscribe(new BoundariesSubscriber<T, B>(this.state, target, this.bSubscription, this.tSubscription));
        }

        @Override
        public void onSubscribe(PublisherSource.Subscription subscription) {
            final ConcurrentSubscription cs = ConcurrentSubscription.wrap(subscription);
            this.tSubscription.delayedSubscription(new PublisherSource.Subscription() {

                @Override
                public void request(long n) {
                    // Items are only ever requested one full batch at a time.
                    assert (n == (long)bufferSizeHint);
                    itemsPending = (int)n;
                    cs.request(n);
                }

                @Override
                public void cancel() {
                    cs.cancel();
                }
            });
        }

        @Override
        public void onNext(@Nullable T t) {
            int cItemsPending = itemsPendingUpdater.decrementAndGet(this);
            assert (cItemsPending >= 0);
            try {
                this.state.accumulate(t, this.target);
            } finally {
                // Batch exhausted: request the next one only if the state reports remaining demand.
                if (cItemsPending == 0 && this.state.requestMore()) {
                    this.tSubscription.request(this.bufferSizeHint);
                }
            }
        }

        @Override
        public void onError(Throwable t) {
            this.state.itemsTerminated(TerminalNotification.error(t), this.target, this.bSubscription);
        }

        @Override
        public void onComplete() {
            this.state.itemsTerminated(TerminalNotification.complete(), this.target, this.bSubscription);
        }
    }
}

