/*
 * Decompiled with CFR 0.152.
 */
package reactor.aeron;

import io.aeron.Publication;
import io.aeron.logbuffer.BufferClaim;
import java.nio.ByteBuffer;
import java.time.Duration;
import org.agrona.DirectBuffer;
import org.agrona.MutableDirectBuffer;
import org.agrona.concurrent.ManyToOneConcurrentLinkedQueue;
import org.agrona.concurrent.UnsafeBuffer;
import reactor.aeron.AeronEventLoop;
import reactor.aeron.AeronOptions;
import reactor.aeron.AeronUtils;
import reactor.aeron.MessageType;
import reactor.aeron.OnDisposable;
import reactor.aeron.Protocol;
import reactor.core.Exceptions;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.core.publisher.MonoSink;
import reactor.util.Logger;
import reactor.util.Loggers;

public final class MessagePublication
implements OnDisposable,
AutoCloseable {
    private static final Logger logger = Loggers.getLogger(MessagePublication.class);
    private final ThreadLocal<BufferClaim> bufferClaims = ThreadLocal.withInitial(BufferClaim::new);
    private final String category;
    private final int mtuLength;
    private final Publication publication;
    private final AeronOptions options;
    private final AeronEventLoop eventLoop;
    private final ManyToOneConcurrentLinkedQueue<PublishTask> publishTasks = new ManyToOneConcurrentLinkedQueue();
    private final MonoProcessor<Void> onDispose = MonoProcessor.create();

    MessagePublication(String category, int mtuLength, Publication publication, AeronOptions options, AeronEventLoop eventLoop) {
        this.category = category;
        this.mtuLength = mtuLength;
        this.publication = publication;
        this.options = options;
        this.eventLoop = eventLoop;
    }

    public Mono<Void> enqueue(MessageType messageType, ByteBuffer buffer, long sessionId) {
        return Mono.create(sink -> {
            boolean result = false;
            if (!this.isDisposed()) {
                result = this.publishTasks.offer((Object)new PublishTask(messageType, buffer, sessionId, (MonoSink)sink));
            }
            if (!result) {
                sink.error((Throwable)Exceptions.failWithRejected());
            }
        });
    }

    public int proceed() {
        PublishTask task = (PublishTask)this.publishTasks.peek();
        if (task == null) {
            return 0;
        }
        long result = task.run();
        if (result > 0L) {
            this.publishTasks.poll();
            task.success();
            return 1;
        }
        if (result == -4L) {
            logger.warn("[{}] Publication CLOSED: {}", new Object[]{this.category, this.toString()});
            this.dispose();
            return 0;
        }
        RuntimeException ex = null;
        if (result == -1L && task.isTimeoutElapsed(this.options.connectTimeout())) {
            logger.warn("[{}] Publication NOT_CONNECTED: {} during {} millis", new Object[]{this.category, this.toString(), this.options.connectTimeout()});
            ex = new RuntimeException("Failed to connect within timeout");
        }
        if (result == -2L && task.isTimeoutElapsed(this.options.backpressureTimeout())) {
            logger.warn("[{}] Publication BACK_PRESSURED during {}: {}", new Object[]{this.category, this.toString(), this.options.backpressureTimeout()});
            ex = new RuntimeException("Failed to resolve backpressure within timeout");
        }
        if (result == -3L && task.isTimeoutElapsed(this.options.connectTimeout())) {
            logger.warn("[{}] Publication ADMIN_ACTION: {} during {} millis", new Object[]{this.category, this.toString(), this.options.connectTimeout()});
            ex = new RuntimeException("Failed to resolve admin_action within timeout");
        }
        if (ex != null) {
            this.publishTasks.poll();
            task.error(ex);
        }
        return 0;
    }

    public String toString() {
        return AeronUtils.format(this.publication);
    }

    @Override
    public void close() {
        if (!this.eventLoop.inEventLoop()) {
            throw new IllegalStateException("Can only close aeron publication from within event loop");
        }
        try {
            this.publication.close();
        }
        finally {
            this.disposePublishTasks();
            this.onDispose.onComplete();
        }
    }

    public void dispose() {
        this.eventLoop.dispose(this).subscribe(null, th -> {});
    }

    public boolean isDisposed() {
        return this.publication.isClosed();
    }

    @Override
    public Mono<Void> onDispose() {
        return this.onDispose;
    }

    public boolean isConnected() {
        return this.publication.isConnected();
    }

    private void disposePublishTasks() {
        PublishTask task;
        while ((task = (PublishTask)this.publishTasks.poll()) != null) {
            task.error(new RuntimeException("Publication closed"));
        }
    }

    private class PublishTask {
        private final MessageType msgType;
        private final ByteBuffer msgBody;
        private final long sessionId;
        private final MonoSink<Void> sink;
        private final long start = System.currentTimeMillis();

        private PublishTask(MessageType msgType, ByteBuffer msgBody, long sessionId, MonoSink<Void> sink) {
            this.msgType = msgType;
            this.msgBody = msgBody;
            this.sessionId = sessionId;
            this.sink = sink;
        }

        private long run() {
            int capacity = this.msgBody.remaining() + 9;
            if (capacity < MessagePublication.this.mtuLength) {
                BufferClaim bufferClaim = (BufferClaim)MessagePublication.this.bufferClaims.get();
                long result = MessagePublication.this.publication.tryClaim(capacity, bufferClaim);
                if (result > 0L) {
                    try {
                        MutableDirectBuffer buffer = bufferClaim.buffer();
                        int index = bufferClaim.offset();
                        index = Protocol.putHeader(buffer, index, this.msgType, this.sessionId);
                        buffer.putBytes(index, this.msgBody, this.msgBody.position(), this.msgBody.limit());
                        bufferClaim.commit();
                    }
                    catch (Exception ex) {
                        bufferClaim.abort();
                        throw new RuntimeException("Unexpected exception", ex);
                    }
                }
                return result;
            }
            UnsafeBuffer buffer = new UnsafeBuffer(new byte[this.msgBody.remaining() + 9]);
            int index = Protocol.putHeader((MutableDirectBuffer)buffer, 0, this.msgType, this.sessionId);
            buffer.putBytes(index, this.msgBody, this.msgBody.remaining());
            return MessagePublication.this.publication.offer((DirectBuffer)buffer);
        }

        private boolean isTimeoutElapsed(Duration timeout) {
            return System.currentTimeMillis() - this.start > timeout.toMillis();
        }

        private void success() {
            this.sink.success();
        }

        private void error(Throwable ex) {
            this.sink.error(ex);
        }
    }
}

