/*
 * Decompiled with CFR 0.152.
 */
package reactor.aeron.server;

import java.nio.ByteBuffer;
import java.util.Optional;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.aeron.AeronEventLoop;
import reactor.aeron.AeronInbound;
import reactor.aeron.AeronResources;
import reactor.aeron.ByteBufferFlux;
import reactor.aeron.DataMessageSubscriber;
import reactor.aeron.InnerPoller;
import reactor.aeron.MessageType;
import reactor.aeron.OnDisposable;
import reactor.core.publisher.Mono;
import reactor.core.publisher.TopicProcessor;

final class AeronServerInbound
implements AeronInbound,
OnDisposable {
    private final String name;
    private final AeronResources resources;
    private final TopicProcessor<ByteBuffer> processor;
    private final ByteBufferFlux flux;
    private volatile InnerPoller subscription;

    AeronServerInbound(String name, AeronResources resources) {
        this.name = name;
        this.resources = resources;
        this.processor = TopicProcessor.builder().name(name).build();
        this.flux = new ByteBufferFlux((Publisher<ByteBuffer>)this.processor);
    }

    Mono<Void> start(String channel, int streamId, long sessionId, Runnable onCompleteHandler) {
        return Mono.defer(() -> {
            ServerDataMessageProcessor messageProcessor = new ServerDataMessageProcessor(this.name, sessionId, onCompleteHandler);
            AeronEventLoop eventLoop = this.resources.nextEventLoop();
            return this.resources.dataSubscription(this.name, channel, streamId, messageProcessor, eventLoop, null, image -> Optional.ofNullable(onCompleteHandler).ifPresent(Runnable::run)).doOnSuccess(result -> {
                this.subscription = result;
                messageProcessor.onSubscription(this.subscription);
                messageProcessor.subscribe((Subscriber<? super ByteBuffer>)this.processor);
            }).then().log("serverInbound");
        });
    }

    @Override
    public ByteBufferFlux receive() {
        return this.flux;
    }

    public void dispose() {
        if (this.subscription != null) {
            this.subscription.dispose();
        }
        this.processor.onComplete();
    }

    @Override
    public Mono<Void> onDispose() {
        return this.subscription != null ? this.subscription.onDispose() : Mono.empty();
    }

    public boolean isDisposed() {
        return this.subscription != null && this.subscription.isDisposed();
    }

    private static class ServerDataMessageProcessor
    implements DataMessageSubscriber,
    Publisher<ByteBuffer> {
        private static final Logger logger = LoggerFactory.getLogger(ServerDataMessageProcessor.class);
        private final String category;
        private final long sessionId;
        private final Runnable onCompleteHandler;
        private volatile Subscription subscription;
        private volatile Subscriber<? super ByteBuffer> subscriber;

        private ServerDataMessageProcessor(String category, long sessionId, Runnable onCompleteHandler) {
            this.category = category;
            this.sessionId = sessionId;
            this.onCompleteHandler = onCompleteHandler;
        }

        @Override
        public void onSubscription(Subscription subscription) {
            this.subscription = subscription;
        }

        @Override
        public void onNext(long sessionId, ByteBuffer buffer) {
            if (logger.isTraceEnabled()) {
                logger.trace("[{}] Received {} for sessionId: {}, buffer: {}", new Object[]{this.category, MessageType.NEXT, sessionId, buffer});
            }
            if (this.sessionId == sessionId) {
                this.subscriber.onNext((Object)buffer);
            } else {
                logger.error("[{}] Received {} for unexpected sessionId: {}", new Object[]{this.category, MessageType.NEXT, sessionId});
            }
        }

        @Override
        public void onComplete(long sessionId) {
            if (logger.isTraceEnabled()) {
                logger.trace("[{}] Received {} for sessionId: {}", new Object[]{this.category, MessageType.COMPLETE, sessionId});
            }
            if (this.sessionId == sessionId) {
                this.onCompleteHandler.run();
            } else {
                logger.error("[{}] Received {} for unexpected sessionId: {}", new Object[]{this.category, MessageType.COMPLETE, sessionId});
            }
        }

        public void subscribe(Subscriber<? super ByteBuffer> subscriber) {
            this.subscriber = subscriber;
            subscriber.onSubscribe(this.subscription);
        }
    }
}

