/*
 * Decompiled with CFR 0.152.
 */
package io.aleph0.yap.core.transport.queue;

import io.aleph0.yap.core.build.QueueBuilder;
import io.aleph0.yap.core.transport.Channel;
import io.aleph0.yap.core.transport.Queue;
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DefaultQueue<T>
implements Queue<T> {
    private static final Logger LOGGER = LoggerFactory.getLogger(DefaultQueue.class);

    /** Backing store for pending messages; allocated as an {@link ArrayDeque} by the constructor. */
    private final java.util.Queue<T> queue;

    /** The channels this queue was bound to at construction, held as an unmodifiable list. */
    private final List<Channel<T>> subscriptions;

    /** Lock held by the publish and receive paths while they touch {@link #queue}. */
    private final ReentrantLock lock = new ReentrantLock();

    /** Signalled after a successful publish and when a binding closes; awaited by blocking receives. */
    private final Condition notEmpty = this.lock.newCondition();

    /** Signalled after a successful receive and when a binding closes; awaited by blocking publishes. */
    private final Condition notFull = this.lock.newCondition();

    /** Number of channels bound by the constructor. */
    private int numSubscribers;

    /** Number of bindings closed so far; incremented under {@link #lock} on a binding's first close. */
    private int closedSubscribers = 0;

    /** Count of messages successfully enqueued since the last metrics flush. */
    private final AtomicLong produced = new AtomicLong(0L);

    /** Count of blocking publishes that found the queue full since the last metrics flush. */
    private final AtomicLong stalls = new AtomicLong(0L);

    /** Count of messages handed to receivers since the last metrics flush. */
    private final AtomicLong consumed = new AtomicLong(0L);

    /** Count of blocking receives that found the queue empty since the last metrics flush. */
    private final AtomicLong waits = new AtomicLong(0L);

    /**
     * Creates a builder for {@link DefaultQueue} instances.
     *
     * @param <T> the type of message carried by the queue
     * @return a new builder, initially configured with the builder's default capacity
     */
    public static <T> Builder<T> builder() {
        // Diamond instead of the raw type so the call is checked against T.
        return new Builder<>();
    }

    /**
     * Creates a bounded queue and binds it to every given channel.
     *
     * <p>Each channel receives its own {@link Channel.Binding}. A binding enqueues messages while
     * the queue holds fewer than {@code capacity} elements, and counts itself as closed the first
     * time {@link Channel.Binding#close()} is called on it.
     *
     * @param capacity the maximum number of pending messages the bindings will enqueue
     * @param subscriptions the channels that publish into this queue
     * @throws NullPointerException if {@code subscriptions} or any of its elements is null
     */
    public DefaultQueue(final int capacity, List<Channel<T>> subscriptions) {
        this.queue = new ArrayDeque<>(capacity);
        // Snapshot the list so later changes to the caller's list cannot disagree with the
        // number of bindings created below.
        this.subscriptions = List.copyOf(subscriptions);
        this.numSubscribers = this.subscriptions.size();
        for (Channel<T> subscription : this.subscriptions) {
            subscription.bind(new Channel.Binding<T>() {
                /** Set once by close(); read and written only while holding the queue lock. */
                private boolean closed = false;

                /**
                 * Enqueues the message if there is room, without blocking.
                 *
                 * @return true if the message was enqueued, false if the queue was full
                 * @throws IllegalStateException if this binding has been closed
                 */
                @Override
                public boolean tryPublish(T message) {
                    final boolean accepted;
                    lock.lock();
                    try {
                        if (closed) {
                            throw new IllegalStateException("closed");
                        }
                        accepted = queue.size() < capacity && queue.offer(message);
                        if (accepted) {
                            produced.incrementAndGet();
                            notEmpty.signalAll();
                        }
                    } finally {
                        lock.unlock();
                    }
                    LOGGER.atDebug().addKeyValue("message", message)
                            .addKeyValue("result", accepted).log("tryPublish");
                    return accepted;
                }

                /**
                 * Enqueues the message, waiting for room if the queue is full.
                 *
                 * @throws IllegalStateException if this binding is closed on entry or becomes
                 *         closed while waiting
                 * @throws InterruptedException if interrupted while waiting for room
                 */
                @Override
                public void publish(T message) throws InterruptedException {
                    lock.lock();
                    try {
                        if (closed) {
                            throw new IllegalStateException("closed");
                        }
                        boolean stalled = false;
                        while (queue.size() >= capacity) {
                            if (!stalled) {
                                // Count one stall per blocked publish, not one per wake-up.
                                stalls.incrementAndGet();
                                stalled = true;
                            }
                            notFull.await();
                            if (closed) {
                                throw new IllegalStateException("closed");
                            }
                        }
                        queue.offer(message);
                        produced.incrementAndGet();
                        notEmpty.signalAll();
                    } finally {
                        lock.unlock();
                    }
                    LOGGER.atDebug().addKeyValue("message", message).log("publish");
                }

                /**
                 * Marks this binding closed. Only the first call has any effect; it wakes all
                 * waiting publishers and receivers so they can re-check their conditions.
                 */
                @Override
                public void close() {
                    lock.lock();
                    try {
                        if (!closed) {
                            closed = true;
                            ++closedSubscribers;
                            notEmpty.signalAll();
                            notFull.signalAll();
                        }
                    } finally {
                        lock.unlock();
                    }
                }
            });
        }
    }

    /**
     * Removes and returns the head of the queue without blocking.
     *
     * @return the next message, or {@code null} if the queue is currently empty
     */
    @Override
    public T tryReceive() {
        T result;
        int closedSnapshot;
        this.lock.lock();
        try {
            result = this.queue.poll();
            if (result != null) {
                this.consumed.incrementAndGet();
                // Room was freed, so wake any publisher blocked on a full queue.
                this.notFull.signalAll();
            }
            // closedSubscribers is written under the lock; capture it here so the log line
            // below does not read it unsynchronized.
            closedSnapshot = this.closedSubscribers;
        } finally {
            this.lock.unlock();
        }
        LOGGER.atDebug().addKeyValue("result", result)
                .addKeyValue("closedSubscribers", closedSnapshot)
                .addKeyValue("numSubscribers", this.numSubscribers).log("tryReceive");
        return result;
    }

    /**
     * Removes and returns the head of the queue, waiting up to {@code timeout} for a message.
     *
     * @param timeout the maximum time to wait for a message
     * @return the next message, or {@code null} if the queue is empty and every binding has been
     *         closed
     * @throws InterruptedException if interrupted while waiting
     * @throws TimeoutException if the timeout elapses while the queue is empty and at least one
     *         binding is still open
     */
    @Override
    public T receive(Duration timeout) throws InterruptedException, TimeoutException {
        T result;
        int closedSnapshot;
        final long expiration = System.nanoTime() + timeout.toNanos();
        this.lock.lock();
        try {
            result = this.queue.poll();
            if (result == null && this.closedSubscribers != this.numSubscribers) {
                this.waits.incrementAndGet();
                long remaining = expiration - System.nanoTime();
                while (result == null && this.closedSubscribers != this.numSubscribers) {
                    if (remaining <= 0L) {
                        throw new TimeoutException();
                    }
                    remaining = this.notEmpty.awaitNanos(remaining);
                    // Always re-poll after waking, even when the wait reports expiry: a message
                    // or a close may have arrived just as the deadline passed.
                    result = this.queue.poll();
                }
            }
            if (result != null) {
                this.consumed.incrementAndGet();
                this.notFull.signalAll();
            }
            // Capture under the lock so the log line below does not race with close().
            closedSnapshot = this.closedSubscribers;
        } finally {
            this.lock.unlock();
        }
        LOGGER.atDebug().addKeyValue("result", result)
                .addKeyValue("closedSubscribers", closedSnapshot)
                .addKeyValue("numSubscribers", this.numSubscribers).log("receive");
        return result;
    }

    /**
     * Removes and returns the head of the queue, waiting as long as necessary for a message.
     *
     * @return the next message, or {@code null} if the queue is empty and every binding has been
     *         closed
     * @throws InterruptedException if interrupted while waiting
     */
    @Override
    public T receive() throws InterruptedException {
        T result;
        int closedSnapshot;
        this.lock.lock();
        try {
            result = this.queue.poll();
            if (result == null && this.closedSubscribers != this.numSubscribers) {
                this.waits.incrementAndGet();
                do {
                    this.notEmpty.await();
                    result = this.queue.poll();
                } while (result == null && this.closedSubscribers != this.numSubscribers);
            }
            if (result != null) {
                this.consumed.incrementAndGet();
                this.notFull.signalAll();
            }
            // Capture under the lock so the log line below does not race with close().
            closedSnapshot = this.closedSubscribers;
        } finally {
            this.lock.unlock();
        }
        LOGGER.atDebug().addKeyValue("result", result)
                .addKeyValue("closedSubscribers", closedSnapshot)
                .addKeyValue("numSubscribers", this.numSubscribers).log("receive");
        return result;
    }

    /**
     * Reports whether this queue can never yield another message.
     *
     * @return {@code true} if the queue is empty and every binding has been closed
     */
    @Override
    public boolean isDrained() {
        this.lock.lock();
        try {
            // Compare against numSubscribers, the same count the receive methods use, rather
            // than re-reading the size of the subscriptions list.
            return this.queue.isEmpty() && this.closedSubscribers == this.numSubscribers;
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * Returns a snapshot of the queue depth and counters without resetting them.
     *
     * @return the current metrics
     */
    @Override
    public Queue.Metrics checkMetrics() {
        final long pending;
        final long producedCount;
        final long stallCount;
        final long consumedCount;
        final long waitCount;
        // The backing ArrayDeque is not thread-safe, so its size must be read under the lock.
        // Reading the counters here as well keeps the snapshot consistent with the depth.
        this.lock.lock();
        try {
            pending = this.queue.size();
            producedCount = this.produced.get();
            stallCount = this.stalls.get();
            consumedCount = this.consumed.get();
            waitCount = this.waits.get();
        } finally {
            this.lock.unlock();
        }
        return new Queue.Metrics(pending, producedCount, stallCount, consumedCount, waitCount);
    }

    /**
     * Returns a snapshot of the queue depth and counters, resetting the counters to zero.
     *
     * @return the metrics accumulated since the previous flush
     */
    @Override
    public Queue.Metrics flushMetrics() {
        final long pending;
        final long producedCount;
        final long stallCount;
        final long consumedCount;
        final long waitCount;
        this.lock.lock();
        try {
            pending = this.queue.size();
            // getAndSet reads and resets in one step, so an increment can never fall between
            // the read and the reset and be lost.
            producedCount = this.produced.getAndSet(0L);
            stallCount = this.stalls.getAndSet(0L);
            consumedCount = this.consumed.getAndSet(0L);
            waitCount = this.waits.getAndSet(0L);
        } finally {
            this.lock.unlock();
        }
        return new Queue.Metrics(pending, producedCount, stallCount, consumedCount, waitCount);
    }

    /**
     * Builder for {@link DefaultQueue} instances with a configurable capacity.
     *
     * @param <T> the type of message carried by the built queue
     */
    public static class Builder<T>
    implements QueueBuilder<T> {
        /** Capacity used when {@link #setCapacity(int)} is never called. */
        private static final int DEFAULT_CAPACITY = 100;

        private int capacity = DEFAULT_CAPACITY;

        /**
         * Sets the capacity passed to the queue constructor.
         *
         * @param capacity the capacity for queues built afterwards
         * @return this builder, for chaining
         */
        public Builder<T> setCapacity(int capacity) {
            this.capacity = capacity;
            return this;
        }

        /**
         * Builds a queue bound to the given channels using the configured capacity.
         *
         * @param subscriptions the channels that publish into the new queue
         * @return a new {@link DefaultQueue}
         */
        @Override
        public Queue<T> build(List<Channel<T>> subscriptions) {
            final int configuredCapacity = this.capacity;
            return new DefaultQueue<T>(configuredCapacity, subscriptions);
        }
    }

    /**
     * Immutable set of queue counters. Every component must be non-negative.
     *
     * <p>Note: this is a separate type from {@code Queue.Metrics}, which is what
     * {@code checkMetrics()} and {@code flushMetrics()} in this file return.
     *
     * @param depth number of pending elements
     * @param sent number of messages sent
     * @param blocks number of blocked sends
     * @param received number of messages received
     * @param waits number of receives that had to wait
     */
    public record Metrics(int depth, long sent, long blocks, long received, long waits) {
        /**
         * Validates the components in declaration order.
         *
         * @throws IllegalArgumentException if any component is negative
         */
        public Metrics {
            requireNonNegative("depth", depth);
            requireNonNegative("sent", sent);
            requireNonNegative("blocks", blocks);
            requireNonNegative("received", received);
            requireNonNegative("waits", waits);
        }

        /** Throws if {@code value} is negative, naming the offending component in the message. */
        private static void requireNonNegative(String component, long value) {
            if (value < 0L) {
                throw new IllegalArgumentException(component + " must be greater than or equal to 0");
            }
        }
    }
}

