/*
 * Decompiled with CFR 0.152.
 */
package io.servicetalk.concurrent.api;

import io.servicetalk.concurrent.Cancellable;
import io.servicetalk.concurrent.CompletableSource;
import io.servicetalk.concurrent.api.BufferStrategy;
import io.servicetalk.concurrent.api.Completable;
import io.servicetalk.concurrent.api.Executor;
import io.servicetalk.concurrent.api.ImmediateExecutor;
import io.servicetalk.concurrent.api.Publisher;
import io.servicetalk.concurrent.api.Single;
import io.servicetalk.concurrent.internal.SubscriberUtils;
import io.servicetalk.utils.internal.NumberUtils;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Supplier;
import javax.annotation.Nullable;

public final class BufferStrategies {
    private BufferStrategies() {
        // Holder for static factory methods only; never instantiated.
    }

    /**
     * Builds a count-or-time {@link BufferStrategy} that gathers items into an {@link Iterable}.
     * <p>
     * Convenience overload: forwards to the {@code (count, duration, executor)} variant, supplying
     * {@link ImmediateExecutor#IMMEDIATE_EXECUTOR} as the executor.
     *
     * @param count the item count passed through to the delegate overload.
     * @param duration the duration passed through to the delegate overload.
     * @param <T> type of the buffered items.
     * @return the {@link BufferStrategy} produced by the delegate overload.
     */
    public static <T> BufferStrategy<T, BufferStrategy.Accumulator<T, Iterable<T>>, Iterable<T>> forCountOrTime(int count, Duration duration) {
        final Executor timerExecutor = ImmediateExecutor.IMMEDIATE_EXECUTOR;
        return forCountOrTime(count, duration, timerExecutor);
    }

    /**
     * Builds a count-or-time {@link BufferStrategy} that gathers items into an {@link Iterable}.
     * <p>
     * Each accumulator handed to the delegate overload appends the items it receives to its own
     * {@link ArrayList} and returns that list from {@link BufferStrategy.Accumulator#finish()}.
     *
     * @param count the item count passed through to the delegate overload.
     * @param duration the duration passed through to the delegate overload.
     * @param executor the {@link Executor} passed through to the delegate overload.
     * @param <T> type of the buffered items.
     * @return the {@link BufferStrategy} produced by the delegate overload.
     */
    public static <T> BufferStrategy<T, BufferStrategy.Accumulator<T, Iterable<T>>, Iterable<T>> forCountOrTime(int count, Duration duration, Executor executor) {
        return BufferStrategies.forCountOrTime(count, duration, () -> new BufferStrategy.Accumulator<T, Iterable<T>>() {
            // Properly parameterized (instead of a raw List) so finish() needs no unchecked conversion.
            private final List<T> accumulate = new ArrayList<>();

            @Override
            public void accumulate(T t) {
                this.accumulate.add(t);
            }

            @Override
            public Iterable<T> finish() {
                return this.accumulate;
            }
        }, executor);
    }

    /**
     * Builds a count-or-time {@link BufferStrategy} whose buffers are created by {@code accumulatorSupplier}.
     * <p>
     * Convenience overload: forwards to the {@code (count, duration, accumulatorSupplier, executor)} variant,
     * supplying {@link ImmediateExecutor#IMMEDIATE_EXECUTOR} as the executor.
     *
     * @param count the item count passed through to the delegate overload.
     * @param duration the duration passed through to the delegate overload.
     * @param accumulatorSupplier factory for the accumulators, passed through to the delegate overload.
     * @param <T> type of the buffered items.
     * @param <BC> concrete accumulator type created by {@code accumulatorSupplier}.
     * @param <B> type of the value produced by an accumulator.
     * @return the {@link BufferStrategy} produced by the delegate overload.
     */
    public static <T, BC extends BufferStrategy.Accumulator<T, B>, B> BufferStrategy<T, BufferStrategy.Accumulator<T, B>, B> forCountOrTime(int count, Duration duration, Supplier<BC> accumulatorSupplier) {
        final Executor timerExecutor = ImmediateExecutor.IMMEDIATE_EXECUTOR;
        return forCountOrTime(count, duration, accumulatorSupplier, timerExecutor);
    }

    public static <T, BC extends BufferStrategy.Accumulator<T, B>, B> BufferStrategy<T, BufferStrategy.Accumulator<T, B>, B> forCountOrTime(final int count, final Duration duration, final Supplier<BC> accumulatorSupplier, final Executor executor) {
        NumberUtils.ensurePositive(count, "count");
        Objects.requireNonNull(duration);
        Objects.requireNonNull(accumulatorSupplier);
        Objects.requireNonNull(executor);
        return new BufferStrategy<T, BufferStrategy.Accumulator<T, B>, B>(){

            @Override
            public Publisher<BufferStrategy.Accumulator<T, B>> boundaries() {
                return Publisher.defer(() -> this.lambda$boundaries$2((Supplier)accumulatorSupplier, count, executor, duration));
            }

            @Override
            public int bufferSizeHint() {
                return count;
            }

            private /* synthetic */ Publisher lambda$boundaries$2(Supplier accumulatorSupplier2, int count2, Executor executor2, Duration duration2) {
                final State state = new State();
                CountingAccumulator firstAccum = new CountingAccumulator(state, (BufferStrategy.Accumulator)accumulatorSupplier2.get(), count2);
                state.beforeNewAccumulatorEmitted(firstAccum);
                return Single.succeeded(firstAccum).concat(new Completable(){

                    @Override
                    protected void handleSubscribe(CompletableSource.Subscriber subscriber) {
                        try {
                            subscriber.onSubscribe(Cancellable.IGNORE_CANCEL);
                        }
                        catch (Throwable t) {
                            SubscriberUtils.handleExceptionFromOnSubscribe(subscriber, t);
                            return;
                        }
                        state.countCompletableSubscribed(subscriber);
                    }
                }.ambWith(executor2.timer(duration2)).toSingle().map(arg_0 -> 2.lambda$null$0(state, (Supplier)accumulatorSupplier2, count2, arg_0)).repeat(__ -> true));
            }

            private static /* synthetic */ CountingAccumulator lambda$null$0(State state, Supplier accumulatorSupplier2, int count2, Void __) {
                CountingAccumulator accum = new CountingAccumulator(state, (BufferStrategy.Accumulator)accumulatorSupplier2.get(), count2);
                state.beforeNewAccumulatorEmitted(accum);
                return accum;
            }
        };
    }

    /**
     * Immutable pair of an accumulator and a {@link CompletableSource.Subscriber}, stored as a single value so
     * both can be swapped in or out together.
     *
     * @param <T> type of the items accepted by the accumulator.
     * @param <B> type of the value produced by the accumulator.
     */
    private static final class AccumulatorAndSubscriber<T, B> {
        /** The accumulator half of the pair. */
        final BufferStrategy.Accumulator<T, B> accumulator;
        /** The subscriber half of the pair. */
        final CompletableSource.Subscriber subscriber;

        AccumulatorAndSubscriber(BufferStrategy.Accumulator<T, B> pairedAccumulator, CompletableSource.Subscriber pairedSubscriber) {
            accumulator = pairedAccumulator;
            subscriber = pairedSubscriber;
        }
    }

    /**
     * {@link BufferStrategy.Accumulator} decorator that counts the items passed to the wrapped accumulator and,
     * when the count equals the configured threshold, notifies the associated {@link State}.
     *
     * @param <T> type of the accumulated items.
     * @param <B> type of the value produced by {@link #finish()}.
     */
    static final class CountingAccumulator<T, B> implements BufferStrategy.Accumulator<T, B> {
        @Nullable
        private final State<T, B> state;
        private final BufferStrategy.Accumulator<T, B> delegate;
        private final int sizeThreshold;
        private int size;

        /**
         * Creates a counting wrapper without a {@link State}; the threshold is set to {@code -1}.
         *
         * @param delegate the accumulator all calls are forwarded to.
         */
        CountingAccumulator(BufferStrategy.Accumulator<T, B> delegate) {
            this(null, delegate, -1);
        }

        /**
         * Creates a counting wrapper that reports to {@code state} once {@code sizeThreshold} items were added.
         *
         * @param state the {@link State} to notify when the threshold is reached.
         * @param delegate the accumulator all calls are forwarded to.
         * @param sizeThreshold the item count that triggers the notification.
         */
        CountingAccumulator(State<T, B> state, BufferStrategy.Accumulator<T, B> delegate, int sizeThreshold) {
            this.state = state;
            this.delegate = delegate;
            this.sizeThreshold = sizeThreshold;
        }

        @Override
        public void accumulate(@Nullable T item) {
            // Count first, then forward; the counter is already bumped if the delegate throws.
            size++;
            delegate.accumulate(item);
            if (size != sizeThreshold) {
                return;
            }
            assert state != null;
            state.countThresholdBreached(this);
        }

        @Override
        public B finish() {
            return delegate.finish();
        }

        /**
         * @return {@code true} if the item counter is still zero.
         */
        boolean isEmpty() {
            return size == 0;
        }
    }

    /**
     * Lock-free hand-off between an accumulator reaching its count threshold and the subscriber of the
     * "count" {@link Completable}.
     * <p>
     * The {@code state} field holds one of:
     * <ul>
     *     <li>{@code null} - nothing pending;</li>
     *     <li>a {@link BufferStrategy.Accumulator} - stored by {@link #beforeNewAccumulatorEmitted}, no
     *     subscriber registered yet;</li>
     *     <li>{@link #THRESHOLD_BREACHED_BEFORE_SUBSCRIBE} - that accumulator hit its threshold before a
     *     subscriber registered;</li>
     *     <li>a {@link CompletableSource.Subscriber} - a subscriber registered while no accumulator was stored;</li>
     *     <li>an {@link AccumulatorAndSubscriber} - both the accumulator and its subscriber are known.</li>
     * </ul>
     *
     * @param <T> type of the items accepted by the tracked accumulators.
     * @param <B> type of the value produced by the tracked accumulators.
     */
    private static final class State<T, B> {
        private static final Object THRESHOLD_BREACHED_BEFORE_SUBSCRIBE = new Object();
        @SuppressWarnings("rawtypes")
        private static final AtomicReferenceFieldUpdater<State, Object> stateUpdater = AtomicReferenceFieldUpdater.newUpdater(State.class, Object.class, "state");
        @Nullable
        private volatile Object state;

        private State() {
        }

        /**
         * Called by an accumulator whose item count reached the threshold.
         *
         * @param breachedAccumulator the accumulator that reached its threshold.
         */
        void countThresholdBreached(BufferStrategy.Accumulator<T, B> breachedAccumulator) {
            for (;;) {
                final Object current = state;
                if (current == null || current == THRESHOLD_BREACHED_BEFORE_SUBSCRIBE) {
                    // Nothing to signal, or a breach is already recorded.
                    return;
                }
                if (current instanceof BufferStrategy.Accumulator) {
                    // No subscriber yet: remember the breach, but only for the currently stored accumulator.
                    if (current != breachedAccumulator
                            || stateUpdater.compareAndSet(this, current, THRESHOLD_BREACHED_BEFORE_SUBSCRIBE)) {
                        return;
                    }
                } else if (current instanceof CompletableSource.Subscriber) {
                    if (stateUpdater.compareAndSet(this, current, null)) {
                        SubscriberUtils.safeOnComplete((CompletableSource.Subscriber) current);
                        return;
                    }
                } else if (current instanceof AccumulatorAndSubscriber) {
                    final AccumulatorAndSubscriber<?, ?> pair = (AccumulatorAndSubscriber<?, ?>) current;
                    if (pair.accumulator != breachedAccumulator) {
                        // The breach belongs to an accumulator that is no longer the stored one.
                        return;
                    }
                    if (stateUpdater.compareAndSet(this, current, null)) {
                        SubscriberUtils.safeOnComplete(pair.subscriber);
                        return;
                    }
                }
                // CAS lost (or unknown value observed): re-read the state and try again.
            }
        }

        /**
         * Registers the subscriber of the "count" {@link Completable}; completes it right away if a breach was
         * already recorded.
         *
         * @param countSubscriber the subscriber to complete on a count breach.
         */
        void countCompletableSubscribed(CompletableSource.Subscriber countSubscriber) {
            for (;;) {
                final Object current = state;
                if (current == THRESHOLD_BREACHED_BEFORE_SUBSCRIBE) {
                    if (stateUpdater.compareAndSet(this, current, null)) {
                        SubscriberUtils.safeOnComplete(countSubscriber);
                        return;
                    }
                } else {
                    final Object replacement;
                    if (current instanceof BufferStrategy.Accumulator) {
                        // Accumulators only enter the field through beforeNewAccumulatorEmitted, so the cast is safe.
                        @SuppressWarnings("unchecked")
                        final BufferStrategy.Accumulator<T, B> pending = (BufferStrategy.Accumulator<T, B>) current;
                        replacement = new AccumulatorAndSubscriber<>(pending, countSubscriber);
                    } else {
                        replacement = countSubscriber;
                    }
                    if (stateUpdater.compareAndSet(this, current, replacement)) {
                        return;
                    }
                }
            }
        }

        /**
         * Unconditionally stores {@code newAccumulator} as the current state.
         *
         * @param newAccumulator the accumulator about to be emitted.
         */
        void beforeNewAccumulatorEmitted(BufferStrategy.Accumulator<T, B> newAccumulator) {
            state = newAccumulator;
        }
    }
}

