/*
 * Decompiled with CFR 0.152.
 */
package io.servicetalk.concurrent.api;

import io.servicetalk.concurrent.Cancellable;
import io.servicetalk.concurrent.Executor;
import io.servicetalk.concurrent.api.ReplayAccumulator;
import io.servicetalk.concurrent.api.ReplayStrategyBuilder;
import io.servicetalk.concurrent.api.SubscriberApiUtils;
import io.servicetalk.concurrent.internal.ConcurrentUtils;
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Consumer;
import javax.annotation.Nullable;

public final class ReplayStrategies {
    private ReplayStrategies() {
    }

    public static <T> ReplayStrategyBuilder<T> historyBuilder(int history) {
        return new ReplayStrategyBuilder(() -> new MostRecentReplayAccumulator(history));
    }

    public static <T> ReplayStrategyBuilder<T> historyTtlBuilder(int historyHint, Duration ttl, Executor executor) {
        return ReplayStrategies.historyTtlBuilder(historyHint, ttl, executor, true);
    }

    public static <T> ReplayStrategyBuilder<T> historyTtlBuilder(int historyHint, Duration ttl, Executor executor, boolean lazyEviction) {
        return new ReplayStrategyBuilder(lazyEviction ? () -> new LazyTimeLimitedReplayAccumulator(historyHint, ttl, executor) : () -> new EagerTimeLimitedReplayAccumulator(historyHint, ttl, executor));
    }

    private static final class TimeStampSignal<T> {
        final long timeStamp;
        @Nullable
        final T signal;

        private TimeStampSignal(long timeStamp, @Nullable T signal) {
            this.timeStamp = timeStamp;
            this.signal = signal;
        }
    }

    private static final class EagerTimeLimitedReplayAccumulator<T>
    implements ReplayAccumulator<T> {
        private static final Cancellable CANCELLED = () -> {};
        private static final AtomicReferenceFieldUpdater<EagerTimeLimitedReplayAccumulator, Cancellable> timerCancellableUpdater = AtomicReferenceFieldUpdater.newUpdater(EagerTimeLimitedReplayAccumulator.class, Cancellable.class, "timerCancellable");
        private static final AtomicIntegerFieldUpdater<EagerTimeLimitedReplayAccumulator> queueLockUpdater = AtomicIntegerFieldUpdater.newUpdater(EagerTimeLimitedReplayAccumulator.class, "queueLock");
        private static final AtomicIntegerFieldUpdater<EagerTimeLimitedReplayAccumulator> queueSizeUpdater = AtomicIntegerFieldUpdater.newUpdater(EagerTimeLimitedReplayAccumulator.class, "queueSize");
        private final Executor executor;
        private final Queue<TimeStampSignal<T>> items;
        private final long ttlNanos;
        private final int maxItems;
        private volatile int queueSize;
        private volatile int queueLock;
        @Nullable
        private volatile Cancellable timerCancellable;

        EagerTimeLimitedReplayAccumulator(int maxItems, Duration ttl, Executor executor) {
            if (ttl.isNegative()) {
                throw new IllegalArgumentException("ttl: " + ttl + "(expected non-negative)");
            }
            if (maxItems <= 0) {
                throw new IllegalArgumentException("maxItems: " + maxItems + "(expected >0)");
            }
            this.executor = Objects.requireNonNull(executor);
            this.ttlNanos = ttl.toNanos();
            this.maxItems = maxItems;
            this.items = new ConcurrentLinkedQueue<TimeStampSignal<T>>();
        }

        @Override
        public void accumulate(@Nullable T t) {
            long scheduleTimerNanos = -1L;
            TimeStampSignal signal = new TimeStampSignal(this.executor.currentTime(TimeUnit.NANOSECONDS), t);
            if (ConcurrentUtils.tryAcquireLock(queueLockUpdater, this)) {
                block7: {
                    while (true) {
                        int qSize;
                        if ((qSize = this.queueSize) < this.maxItems) {
                            if (!queueSizeUpdater.compareAndSet(this, qSize, qSize + 1)) continue;
                            this.items.add(signal);
                            if (qSize == 0) {
                                scheduleTimerNanos = this.ttlNanos;
                            }
                            break block7;
                        }
                        if (queueSizeUpdater.compareAndSet(this, qSize, qSize)) break;
                    }
                    this.items.poll();
                    this.items.add(signal);
                }
                if (!ConcurrentUtils.releaseLock(queueLockUpdater, this)) {
                    scheduleTimerNanos = this.tryDrainQueue();
                }
            } else {
                queueSizeUpdater.incrementAndGet(this);
                this.items.add(signal);
                scheduleTimerNanos = this.tryDrainQueue();
            }
            if (scheduleTimerNanos >= 0L) {
                this.schedulerTimer(scheduleTimerNanos);
            }
        }

        @Override
        public void deliverAccumulation(Consumer<T> consumer) {
            for (TimeStampSignal timeStampSignal : this.items) {
                consumer.accept(timeStampSignal.signal);
            }
        }

        @Override
        public void cancelAccumulation() {
            Cancellable cancellable = timerCancellableUpdater.getAndSet(this, CANCELLED);
            if (cancellable != null) {
                cancellable.cancel();
            }
        }

        private long tryDrainQueue() {
            long scheduleTimerNanos = -1L;
            boolean tryAcquire = true;
            while (tryAcquire && ConcurrentUtils.tryAcquireLock(queueLockUpdater, this)) {
                int qSize;
                while ((qSize = this.queueSize) > this.maxItems) {
                    if (!queueSizeUpdater.compareAndSet(this, qSize, qSize - 1)) continue;
                    this.items.poll();
                }
                scheduleTimerNanos = this.doExpire();
                tryAcquire = !ConcurrentUtils.releaseLock(queueLockUpdater, this);
            }
            return scheduleTimerNanos;
        }

        private void schedulerTimer(long nanos) {
            Cancellable currentCancellable;
            while ((currentCancellable = this.timerCancellable) != CANCELLED) {
                Cancellable nextCancellable = this.executor.schedule(this::timerFire, nanos, TimeUnit.NANOSECONDS);
                if (timerCancellableUpdater.compareAndSet(this, currentCancellable, nextCancellable)) {
                    if (currentCancellable == null) break;
                    currentCancellable.cancel();
                    break;
                }
                nextCancellable.cancel();
            }
        }

        private long doExpire() {
            long delta;
            long nanoTime = this.executor.currentTime(TimeUnit.NANOSECONDS);
            while (true) {
                TimeStampSignal<T> item;
                if ((item = this.items.peek()) == null) {
                    return -1L;
                }
                delta = nanoTime - item.timeStamp;
                if (delta < this.ttlNanos) break;
                int qSize = queueSizeUpdater.decrementAndGet(this);
                assert (qSize >= 0);
                this.items.poll();
            }
            return delta <= 0L ? this.ttlNanos : this.ttlNanos - delta;
        }

        private void timerFire() {
            long scheduleTimerNanos;
            if (ConcurrentUtils.tryAcquireLock(queueLockUpdater, this)) {
                scheduleTimerNanos = this.doExpire();
                if (!ConcurrentUtils.releaseLock(queueLockUpdater, this)) {
                    scheduleTimerNanos = this.tryDrainQueue();
                }
            } else {
                scheduleTimerNanos = this.tryDrainQueue();
            }
            if (scheduleTimerNanos >= 0L) {
                this.schedulerTimer(scheduleTimerNanos);
            }
        }
    }

    private static final class LazyTimeLimitedReplayAccumulator<T>
    implements ReplayAccumulator<T> {
        private final Executor executor;
        private final long ttlNanos;
        private final int maxItems;
        private final Deque<TimeStampSignal<T>> items;

        LazyTimeLimitedReplayAccumulator(int maxItems, Duration ttl, Executor executor) {
            if (ttl.isNegative()) {
                throw new IllegalArgumentException("ttl: " + ttl + "(expected non-negative)");
            }
            if (maxItems <= 0) {
                throw new IllegalArgumentException("maxItems: " + maxItems + "(expected >0)");
            }
            this.maxItems = maxItems;
            this.executor = Objects.requireNonNull(executor);
            this.ttlNanos = ttl.toNanos();
            this.items = new ArrayDeque<TimeStampSignal<T>>(Math.min(maxItems, 16));
        }

        @Override
        public void accumulate(@Nullable T t) {
            long nanoTime = this.executor.currentTime(TimeUnit.NANOSECONDS);
            this.trimExpired(nanoTime);
            if (this.items.size() >= this.maxItems) {
                this.items.poll();
            }
            this.items.add(new TimeStampSignal(nanoTime, t));
        }

        @Override
        public void deliverAccumulation(Consumer<T> consumer) {
            if (this.items.isEmpty()) {
                return;
            }
            this.trimExpired(this.executor.currentTime(TimeUnit.NANOSECONDS));
            for (TimeStampSignal<T> next : this.items) {
                consumer.accept(next.signal);
            }
        }

        private void trimExpired(long nanoTime) {
            TimeStampSignal<T> next;
            while ((next = this.items.peek()) != null && nanoTime - next.timeStamp >= this.ttlNanos) {
                this.items.poll();
            }
        }
    }

    private static final class MostRecentReplayAccumulator<T>
    implements ReplayAccumulator<T> {
        private final int maxItems;
        private final Deque<Object> items;

        MostRecentReplayAccumulator(int maxItems) {
            if (maxItems <= 0) {
                throw new IllegalArgumentException("maxItems: " + maxItems + "(expected >0)");
            }
            this.maxItems = maxItems;
            this.items = new ArrayDeque<Object>(Math.min(maxItems, 16));
        }

        @Override
        public void accumulate(@Nullable T t) {
            if (this.items.size() >= this.maxItems) {
                this.items.poll();
            }
            this.items.add(SubscriberApiUtils.wrapNull(t));
        }

        @Override
        public void deliverAccumulation(Consumer<T> consumer) {
            for (Object item : this.items) {
                consumer.accept(SubscriberApiUtils.unwrapNullUnchecked(item));
            }
        }
    }
}

