/*
 * Decompiled with CFR 0.152.
 */
package io.scalecube.services.transport.rsocket.server;

import io.rsocket.RSocketFactory;
import io.rsocket.transport.netty.server.NettyContextCloseable;
import io.rsocket.transport.netty.server.TcpServerTransport;
import io.rsocket.util.ByteBufPayload;
import io.scalecube.services.codec.ServiceMessageCodec;
import io.scalecube.services.methods.ServiceMethodRegistry;
import io.scalecube.services.transport.rsocket.server.RSocketServiceAcceptor;
import io.scalecube.services.transport.server.api.ServerTransport;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
import reactor.ipc.netty.NettyContext;
import reactor.ipc.netty.options.NettyOptions;
import reactor.ipc.netty.options.ServerOptions;
import reactor.ipc.netty.tcp.TcpServer;

public class RSocketServerTransport
implements ServerTransport {
    private static final Logger LOGGER = LoggerFactory.getLogger(RSocketServerTransport.class);
    private final ServiceMessageCodec codec;
    private NettyContextCloseable server;
    private List<NettyContext> channels = new CopyOnWriteArrayList<NettyContext>();

    public RSocketServerTransport(ServiceMessageCodec codec) {
        this.codec = codec;
    }

    @Override
    public InetSocketAddress bindAwait(InetSocketAddress address, ServiceMethodRegistry methodRegistry) {
        TcpServer tcpServer = TcpServer.create(options -> {
            ServerOptions.Builder cfr_ignored_0 = (ServerOptions.Builder)((NettyOptions.Builder)options.listenAddress(address)).afterNettyContextInit(nettyContext -> {
                LOGGER.info("Accepted connection on {}", (Object)nettyContext.channel());
                nettyContext.onClose(() -> {
                    LOGGER.info("Connection closed on {}", (Object)nettyContext.channel());
                    this.channels.remove(nettyContext);
                });
                this.channels.add((NettyContext)nettyContext);
            });
        });
        this.server = RSocketFactory.receive().frameDecoder(frame -> ByteBufPayload.create(frame.sliceData().retain(), frame.sliceMetadata().retain())).acceptor(new RSocketServiceAcceptor(this.codec, methodRegistry)).transport(TcpServerTransport.create(tcpServer)).start().block();
        return this.server.address();
    }

    @Override
    public Mono<Void> stop() {
        if (this.server != null) {
            this.server.dispose();
            List onCloseList = this.channels.stream().map(nettyContext -> {
                nettyContext.dispose();
                return nettyContext.onClose();
            }).collect(Collectors.toList());
            return this.server.onClose().then(Mono.when(onCloseList));
        }
        return Mono.empty();
    }
}

