/*
 * Decompiled with CFR 0.152.
 */
package reactor.aeron.server;

import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
import reactor.aeron.AeronInbound;
import reactor.aeron.AeronOptions;
import reactor.aeron.AeronOutbound;
import reactor.aeron.AeronResources;
import reactor.aeron.AeronUtils;
import reactor.aeron.Connection;
import reactor.aeron.ControlMessageSubscriber;
import reactor.aeron.DefaultAeronOutbound;
import reactor.aeron.MessagePublication;
import reactor.aeron.MessageType;
import reactor.aeron.OnDisposable;
import reactor.aeron.Protocol;
import reactor.aeron.server.AeronServerInbound;
import reactor.aeron.server.AeronServerSettings;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.core.scheduler.Schedulers;
import reactor.util.Logger;
import reactor.util.Loggers;

/**
 * Server-side subscriber for control messages.
 *
 * <p>For every {@code CONNECT} request it creates a {@link SessionHandler}, starts it and, once
 * started, passes it to the configured connection handler. A {@code COMPLETE} message disposes
 * the registered session with the matching id. Disposing this handler disposes every session
 * that is registered at that moment; {@link #onDispose()} completes when that has finished.
 */
final class AeronServerHandler implements ControlMessageSubscriber, OnDisposable {

    private static final Logger logger = Loggers.getLogger(AeronServerHandler.class);

    /** Source of server session stream ids; static, hence shared by every handler instance. */
    private static final AtomicInteger streamIdCounter = new AtomicInteger(1000);

    private final String category;
    private final AeronOptions options;
    private final AeronResources resources;
    private final Function<? super Connection, ? extends Publisher<Void>> handler;
    private final String serverChannel;

    /** Source of session ids; the first id handed out is 1. */
    private final AtomicLong nextSessionId = new AtomicLong(0L);

    /** Sessions that started successfully and have not yet been disposed. */
    private final List<SessionHandler> handlers = new CopyOnWriteArrayList<>();

    /** Completed by {@link #dispose()} to trigger the shutdown sequence. */
    private final MonoProcessor<Void> dispose = MonoProcessor.create();

    /** Completed once the shutdown sequence has terminated, successfully or not. */
    private final MonoProcessor<Void> onDispose = MonoProcessor.create();

    /**
     * Creates the handler from the given settings and wires up the dispose pipeline.
     *
     * @param settings provides the name (used as log category), options, resources and the
     *     connection handler
     */
    AeronServerHandler(AeronServerSettings settings) {
        this.category = settings.name();
        this.options = settings.options();
        this.resources = settings.aeronResources();
        this.handler = settings.handler();
        this.serverChannel = this.options.serverChannel();

        this.dispose
            .then(doDispose())
            .doFinally(s -> this.onDispose.onComplete())
            // A lone Throwable argument selects the (String, Throwable) overload, which performs
            // no placeholder substitution, so the message must not contain "{}".
            .subscribe(null, th -> logger.warn("AeronServerHandler disposed with error: ", th));
    }

    /** Requests an unbounded number of control messages. */
    @Override
    public void onSubscription(Subscription subscription) {
        subscription.request(Long.MAX_VALUE);
    }

    /**
     * Handles a {@code CONNECT} request: allocates a server session stream id and a session id,
     * starts a new {@link SessionHandler} on {@link Schedulers#single()} and, on success, applies
     * the connection handler to it.
     *
     * @param connectRequestId id of the connect request, sent back in the acknowledgement
     * @param clientChannel channel of the client
     * @param clientControlStreamId stream id used for the control publication to the client
     * @param clientSessionStreamId stream id passed to the session's outbound
     */
    @Override
    public void onConnect(
        UUID connectRequestId,
        String clientChannel,
        int clientControlStreamId,
        int clientSessionStreamId) {

        // Guarded because the channel is minified only for the sake of this log line.
        if (logger.isDebugEnabled()) {
            logger.debug(
                "[{}] Received {} for connectRequestId: {}, channel={}, clientControlStreamId={}, clientSessionStreamId={}",
                category,
                MessageType.CONNECT,
                connectRequestId,
                AeronUtils.minifyChannel(clientChannel),
                clientControlStreamId,
                clientSessionStreamId);
        }

        int serverSessionStreamId = streamIdCounter.incrementAndGet();
        long sessionId = nextSessionId.incrementAndGet();

        SessionHandler sessionHandler =
            new SessionHandler(
                clientChannel,
                clientSessionStreamId,
                clientControlStreamId,
                connectRequestId,
                sessionId,
                serverSessionStreamId);

        sessionHandler
            .start()
            .subscribeOn(Schedulers.single())
            .subscribe(
                connection -> {
                    try {
                        handler.apply(connection).subscribe(connection.disposeSubscriber());
                    } catch (RuntimeException ex) {
                        // The session has already started and registered itself. Without this it
                        // would stay open after the user handler failed; dispose it and let the
                        // error consumer below report the failure as before.
                        sessionHandler.dispose();
                        throw ex;
                    }
                },
                th ->
                    logger.error(
                        "[{}] Occurred exception on connect to {}, sessionId: {}, connectRequestId: {}, clientSessionStreamId: {}, clientControlStreamId: {}, serverSessionStreamId: {}, error: ",
                        category,
                        clientChannel,
                        sessionId,
                        connectRequestId,
                        clientSessionStreamId,
                        clientControlStreamId,
                        serverSessionStreamId,
                        th));
    }

    /** {@code CONNECT_ACK} is not expected by the server; the message is only logged as an error. */
    @Override
    public void onConnectAck(UUID connectRequestId, long sessionId, int serverSessionStreamId) {
        logger.error(
            "[{}] Received unsupported server request {}, connectRequestId: {}",
            category,
            MessageType.CONNECT_ACK,
            connectRequestId);
    }

    /**
     * Handles a {@code COMPLETE} message by disposing the registered session with the given id.
     * Nothing happens when no such session is registered.
     *
     * @param sessionId id of the session to dispose
     */
    @Override
    public void onComplete(long sessionId) {
        logger.info(
            "[{}] Received {} for sessionId: {}", category, MessageType.COMPLETE, sessionId);
        handlers.stream()
            .filter(session -> session.sessionId == sessionId)
            .findFirst()
            .ifPresent(SessionHandler::dispose);
    }

    /** Triggers the shutdown sequence; completion is observable through {@link #onDispose()}. */
    public void dispose() {
        dispose.onComplete();
    }

    /** Reports the disposed state of the {@link #onDispose()} processor. */
    public boolean isDisposed() {
        return onDispose.isDisposed();
    }

    /** Returns a {@link Mono} that completes once the shutdown sequence has terminated. */
    @Override
    public Mono<Void> onDispose() {
        return onDispose;
    }

    /**
     * Disposes every registered session and waits for all of them to finish, delaying errors
     * until every session has terminated.
     */
    private Mono<Void> doDispose() {
        return Mono.defer(
            () -> {
                // Sessions remove themselves from `handlers` as part of their own disposal; the
                // stream of a CopyOnWriteArrayList works on a snapshot, so that is safe here.
                List<Mono<Void>> closedSessions =
                    handlers.stream()
                        .map(
                            session -> {
                                session.dispose();
                                return session.onDispose();
                            })
                        .collect(Collectors.toList());
                return Mono.whenDelayError(closedSessions);
            });
    }

    /**
     * A single client session: owns the session's inbound and outbound and the control
     * publication used to acknowledge the connect request.
     */
    private class SessionHandler implements Connection {

        // Instance field rather than static: this is a non-static inner class.
        private final Logger logger = Loggers.getLogger(SessionHandler.class);

        private final DefaultAeronOutbound outbound;
        private final AeronServerInbound inbound;
        private final String clientChannel;
        private final int clientSessionStreamId;
        private final int serverSessionStreamId;
        private final UUID connectRequestId;
        private final long sessionId;

        /** Lazily created and cached control publication towards the client. */
        private final Mono<MessagePublication> controlPublication;

        /** Completed by {@link #dispose()} to trigger the session shutdown. */
        private final MonoProcessor<Void> dispose = MonoProcessor.create();

        /** Completed once the session shutdown has terminated, successfully or not. */
        private final MonoProcessor<Void> onDispose = MonoProcessor.create();

        /**
         * Creates the session's inbound and outbound and wires up its dispose pipeline. Nothing
         * is started here; see {@link #start()}.
         */
        private SessionHandler(
            String clientChannel,
            int clientSessionStreamId,
            int clientControlStreamId,
            UUID connectRequestId,
            long sessionId,
            int serverSessionStreamId) {

            this.clientSessionStreamId = clientSessionStreamId;
            this.clientChannel = clientChannel;
            this.outbound = new DefaultAeronOutbound(category, clientChannel, resources, options);
            this.connectRequestId = connectRequestId;
            this.sessionId = sessionId;
            this.serverSessionStreamId = serverSessionStreamId;
            this.inbound = new AeronServerInbound(category, resources);
            this.controlPublication =
                Mono.defer(() -> newControlPublication(clientChannel, clientControlStreamId))
                    .cache();

            this.dispose
                .then(doDispose())
                .doFinally(s -> this.onDispose.onComplete())
                // See the outer constructor: no "{}" because the Throwable overload is selected.
                .subscribe(
                    null, th -> this.logger.warn("SessionHandler disposed with error: ", th));
        }

        /** Requests a message publication to the client's control stream from the resources. */
        private Mono<MessagePublication> newControlPublication(
            String clientChannel, int clientControlStreamId) {
            return resources.messagePublication(
                category, clientChannel, clientControlStreamId, options, resources.nextEventLoop());
        }

        /**
         * Acknowledges the connect request, then starts the outbound and the inbound. On success
         * the session registers itself with the enclosing handler; on failure it disposes itself.
         *
         * @return a {@link Mono} emitting this session once it has started
         */
        private Mono<? extends Connection> start() {
            return connect()
                .then(outbound.start(sessionId, clientSessionStreamId))
                .then(inbound.start(serverChannel, serverSessionStreamId, sessionId, this::dispose))
                .thenReturn(this)
                .doOnSuccess(
                    connection -> {
                        handlers.add(this);
                        logger.debug(
                            "[{}] Client with connectRequestId: {} successfully connected, sessionId: {}",
                            category,
                            connectRequestId,
                            sessionId);
                    })
                .doOnError(
                    th -> {
                        logger.debug(
                            "[{}] Failed to connect to the client for sessionId: {}",
                            category,
                            sessionId,
                            th);
                        dispose();
                    });
        }

        @Override
        public AeronInbound inbound() {
            return inbound;
        }

        @Override
        public AeronOutbound outbound() {
            return outbound;
        }

        @Override
        public String toString() {
            return "ServerSession{"
                + "sessionId=" + sessionId
                + ", clientChannel=" + clientChannel
                + ", clientSessionStreamId=" + clientSessionStreamId
                + ", serverSessionStreamId=" + serverSessionStreamId
                + ", connectRequestId=" + connectRequestId
                + '}';
        }

        /** Triggers the session shutdown; completion is observable through {@link #onDispose()}. */
        public void dispose() {
            dispose.onComplete();
        }

        /** Reports the disposed state of the {@link #onDispose()} processor. */
        public boolean isDisposed() {
            return onDispose.isDisposed();
        }

        /** Returns a {@link Mono} that completes once the session shutdown has terminated. */
        @Override
        public Mono<Void> onDispose() {
            return onDispose;
        }

        /**
         * Unregisters the session, disposes outbound and inbound (in that order) and waits for
         * both to finish, delaying errors until both have terminated.
         */
        private Mono<Void> doDispose() {
            return Mono.defer(
                () -> {
                    logger.debug(
                        "[{}] About to close session with sessionId: {}", category, sessionId);
                    handlers.remove(this);

                    // Both fields are final and always assigned by the constructor, so no null
                    // handling is required.
                    outbound.dispose();
                    inbound.dispose();

                    return Mono.whenDelayError(outbound.onDispose(), inbound.onDispose())
                        .doFinally(
                            s ->
                                logger.debug(
                                    "[{}] Closed session with sessionId: {}",
                                    category,
                                    sessionId));
                });
        }

        /**
         * Sends {@code CONNECT_ACK} over the control publication, retrying every 100 ms.
         *
         * <p>The overall budget is {@code connectTimeout + backpressureTimeout} from the options;
         * the retry count is that budget divided by the retry interval. Any failure is rethrown
         * as a {@link RuntimeException} that names the publication and keeps the cause.
         */
        private Mono<Void> connect() {
            return Mono.defer(
                () -> {
                    Duration retryInterval = Duration.ofMillis(100L);
                    Duration connectTimeout =
                        options.connectTimeout().plus(options.backpressureTimeout());
                    long retryCount = connectTimeout.toMillis() / retryInterval.toMillis();

                    return controlPublication.flatMap(
                        publication ->
                            sendConnectAck(publication)
                                .retryBackoff(retryCount, retryInterval, retryInterval)
                                .timeout(connectTimeout)
                                .then()
                                .doOnSuccess(
                                    avoid ->
                                        logger.debug(
                                            "[{}] Sent {} to {}",
                                            category,
                                            MessageType.CONNECT_ACK,
                                            publication))
                                .onErrorResume(
                                    throwable -> {
                                        String errMessage =
                                            String.format(
                                                "Failed to send %s, publication %s is not connected",
                                                MessageType.CONNECT_ACK,
                                                publication);
                                        return Mono.error(
                                            new RuntimeException(errMessage, throwable));
                                    }));
                });
        }

        /** Builds the {@code CONNECT_ACK} body and enqueues it on the given publication. */
        private Mono<Void> sendConnectAck(MessagePublication publication) {
            ByteBuffer buffer =
                Protocol.createConnectAckBody(connectRequestId, serverSessionStreamId);
            return publication.enqueue(MessageType.CONNECT_ACK, buffer, sessionId);
        }
    }
}

