package io.airlift.concurrent;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Ticker;
import com.google.common.base.Verify;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import com.google.errorprone.annotations.ThreadSafe;
import jakarta.annotation.Nullable;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.lang.runtime.ObjectMethods;
import java.util.Objects;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.ToLongFunction;

@ThreadSafe
/* loaded from: input_file:io/airlift/concurrent/DynamicSizeBoundQueue.class */
public class DynamicSizeBoundQueue<T> {
    private final AtomicLong size;
    private final Queue<ElementAndSize<T>> queue;
    private final AtomicReference<SettableFuture<Void>> enqueueFuture;
    private final AtomicReference<SettableFuture<Void>> dequeueFuture;
    private final long maxSize;
    private final ToLongFunction<T> elementSizeFunction;
    private final Ticker ticker;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/airlift/concurrent/DynamicSizeBoundQueue$ElementAndSize.class */
    public static final class ElementAndSize<T> extends Record {
        private final T element;
        private final long size;

        private ElementAndSize(T t, long j) {
            Objects.requireNonNull(t, "element is null");
            Preconditions.checkArgument(j > 0, "size must be positive");
            this.element = t;
            this.size = j;
        }

        @Override // java.lang.Record
        public final String toString() {
            return (String) ObjectMethods.bootstrap(MethodHandles.lookup(), "toString", MethodType.methodType(String.class, ElementAndSize.class), ElementAndSize.class, "element;size", "FIELD:Lio/airlift/concurrent/DynamicSizeBoundQueue$ElementAndSize;->element:Ljava/lang/Object;", "FIELD:Lio/airlift/concurrent/DynamicSizeBoundQueue$ElementAndSize;->size:J").dynamicInvoker().invoke(this) /* invoke-custom */;
        }

        @Override // java.lang.Record
        public final int hashCode() {
            return (int) ObjectMethods.bootstrap(MethodHandles.lookup(), "hashCode", MethodType.methodType(Integer.TYPE, ElementAndSize.class), ElementAndSize.class, "element;size", "FIELD:Lio/airlift/concurrent/DynamicSizeBoundQueue$ElementAndSize;->element:Ljava/lang/Object;", "FIELD:Lio/airlift/concurrent/DynamicSizeBoundQueue$ElementAndSize;->size:J").dynamicInvoker().invoke(this) /* invoke-custom */;
        }

        @Override // java.lang.Record
        public final boolean equals(Object obj) {
            return (boolean) ObjectMethods.bootstrap(MethodHandles.lookup(), "equals", MethodType.methodType(Boolean.TYPE, ElementAndSize.class, Object.class), ElementAndSize.class, "element;size", "FIELD:Lio/airlift/concurrent/DynamicSizeBoundQueue$ElementAndSize;->element:Ljava/lang/Object;", "FIELD:Lio/airlift/concurrent/DynamicSizeBoundQueue$ElementAndSize;->size:J").dynamicInvoker().invoke(this, obj) /* invoke-custom */;
        }

        public T element() {
            return this.element;
        }

        public long size() {
            return this.size;
        }
    }

    public DynamicSizeBoundQueue(long j, ToLongFunction<T> toLongFunction) {
        this(j, toLongFunction, Ticker.systemTicker());
    }

    public DynamicSizeBoundQueue(long j, ToLongFunction<T> toLongFunction, Ticker ticker) {
        this.size = new AtomicLong();
        this.queue = new ConcurrentLinkedQueue();
        this.enqueueFuture = new AtomicReference<>();
        this.dequeueFuture = new AtomicReference<>();
        Preconditions.checkArgument(j > 0, "maxSize must be positive");
        this.maxSize = j;
        this.elementSizeFunction = (ToLongFunction) Objects.requireNonNull(toLongFunction, "elementSizeFunction is null");
        this.ticker = (Ticker) Objects.requireNonNull(ticker, "ticker is null");
    }

    public long getMaxSize() {
        return this.maxSize;
    }

    public long getSize() {
        return this.size.get();
    }

    public boolean offer(T t) {
        return offer(t, this.elementSizeFunction.applyAsLong(t));
    }

    private boolean offer(T t, long j) {
        Objects.requireNonNull(t, "element is null");
        Preconditions.checkArgument(j > 0, "element size must be positive");
        if (!tryAcquireSizeReservation(j)) {
            return false;
        }
        this.queue.add(new ElementAndSize<>(t, j));
        notifyIfNecessary(this.enqueueFuture);
        return true;
    }

    private boolean tryAcquireSizeReservation(long j) {
        if (this.size.get() >= this.maxSize) {
            return false;
        }
        try {
            if (getAndAddOverflowChecked(this.size, j) < this.maxSize) {
                return true;
            }
            Verify.verify(this.size.addAndGet(-j) >= 0);
            return false;
        } catch (ArithmeticException e) {
            return false;
        }
    }

    private static long getAndAddOverflowChecked(AtomicLong atomicLong, long j) {
        return atomicLong.getAndAccumulate(j, Math::addExact);
    }

    public boolean offer(T t, long j, TimeUnit timeUnit) throws InterruptedException {
        long applyAsLong = this.elementSizeFunction.applyAsLong(t);
        long nanos = timeUnit.toNanos(j);
        while (true) {
            long j2 = nanos;
            if (offer(t, applyAsLong)) {
                return true;
            }
            ListenableFuture<Void> orCreateFuture = getOrCreateFuture(this.dequeueFuture);
            if (offer(t, applyAsLong)) {
                return true;
            }
            long read = this.ticker.read();
            if (j2 <= 0 || !awaitDequeueFuture(orCreateFuture, j2, TimeUnit.NANOSECONDS)) {
                return false;
            }
            nanos = j2 - (this.ticker.read() - read);
        }
    }

    public void put(T t) throws InterruptedException {
        long applyAsLong = this.elementSizeFunction.applyAsLong(t);
        while (!offer(t, applyAsLong)) {
            ListenableFuture<Void> orCreateFuture = getOrCreateFuture(this.dequeueFuture);
            if (offer(t, applyAsLong)) {
                return;
            } else {
                awaitDequeueFuture(orCreateFuture);
            }
        }
    }

    public Optional<ListenableFuture<Void>> offerWithBackoff(T t) {
        long applyAsLong = this.elementSizeFunction.applyAsLong(t);
        if (offer(t, applyAsLong)) {
            return Optional.empty();
        }
        return offer(t, applyAsLong) ? Optional.empty() : Optional.of(Futures.nonCancellationPropagating(getOrCreateFuture(this.dequeueFuture)));
    }

    public void forcePut(T t) {
        long applyAsLong = this.elementSizeFunction.applyAsLong(t);
        Preconditions.checkArgument(applyAsLong > 0, "element size must be positive");
        try {
            getAndAddOverflowChecked(this.size, applyAsLong);
            this.queue.add(new ElementAndSize<>(t, applyAsLong));
            notifyIfNecessary(this.enqueueFuture);
        } catch (ArithmeticException e) {
            throw new IllegalStateException("Forced element triggered queue size numeric overflow");
        }
    }

    @Nullable
    public T poll() {
        ElementAndSize<T> poll = this.queue.poll();
        if (poll == null) {
            return null;
        }
        Verify.verify(this.size.addAndGet(-poll.size()) >= 0);
        notifyIfNecessary(this.dequeueFuture);
        return poll.element();
    }

    public T poll(long j, TimeUnit timeUnit) throws InterruptedException {
        long nanos = timeUnit.toNanos(j);
        while (true) {
            long j2 = nanos;
            T poll = poll();
            if (poll != null) {
                return poll;
            }
            ListenableFuture<Void> orCreateFuture = getOrCreateFuture(this.enqueueFuture);
            T poll2 = poll();
            if (poll2 != null) {
                return poll2;
            }
            long read = this.ticker.read();
            if (j2 <= 0 || !awaitEnqueueFuture(orCreateFuture, j2, TimeUnit.NANOSECONDS)) {
                return null;
            }
            nanos = j2 - (this.ticker.read() - read);
        }
    }

    public T take() throws InterruptedException {
        while (true) {
            T poll = poll();
            if (poll != null) {
                return poll;
            }
            ListenableFuture<Void> orCreateFuture = getOrCreateFuture(this.enqueueFuture);
            T poll2 = poll();
            if (poll2 != null) {
                return poll2;
            }
            awaitEnqueueFuture(orCreateFuture);
        }
    }

    private static ListenableFuture<Void> getOrCreateFuture(AtomicReference<SettableFuture<Void>> atomicReference) {
        return atomicReference.updateAndGet(settableFuture -> {
            return (SettableFuture) Objects.requireNonNullElseGet(settableFuture, SettableFuture::create);
        });
    }

    private static void notifyIfNecessary(AtomicReference<SettableFuture<Void>> atomicReference) {
        SettableFuture<Void> andSet = atomicReference.getAndSet(null);
        if (andSet != null) {
            andSet.set((Object) null);
        }
    }

    @VisibleForTesting
    void preEnqueueAwaitHook() {
    }

    @VisibleForTesting
    void preDequeueAwaitHook() {
    }

    private void awaitDequeueFuture(Future<?> future) throws InterruptedException {
        preDequeueAwaitHook();
        awaitFutureUnchecked(future);
    }

    private boolean awaitDequeueFuture(Future<?> future, long j, TimeUnit timeUnit) throws InterruptedException {
        preDequeueAwaitHook();
        return awaitFutureUnchecked(future, j, timeUnit);
    }

    private void awaitEnqueueFuture(Future<?> future) throws InterruptedException {
        preEnqueueAwaitHook();
        awaitFutureUnchecked(future);
    }

    private boolean awaitEnqueueFuture(Future<?> future, long j, TimeUnit timeUnit) throws InterruptedException {
        preEnqueueAwaitHook();
        return awaitFutureUnchecked(future, j, timeUnit);
    }

    private static void awaitFutureUnchecked(Future<?> future) throws InterruptedException {
        try {
            future.get();
        } catch (ExecutionException e) {
            throw new RuntimeException(e);
        }
    }

    private static boolean awaitFutureUnchecked(Future<?> future, long j, TimeUnit timeUnit) throws InterruptedException {
        try {
            future.get(j, timeUnit);
            return true;
        } catch (ExecutionException e) {
            throw new RuntimeException(e);
        } catch (TimeoutException e2) {
            return false;
        }
    }
}
