/*
 * Decompiled with CFR 0.152.
 */
package io.scalecube.services.transport.rsocket;

import io.netty.channel.EventLoopGroup;
import io.rsocket.RSocket;
import io.rsocket.RSocketFactory;
import io.rsocket.transport.netty.client.TcpClientTransport;
import io.rsocket.util.ByteBufPayload;
import io.scalecube.services.codec.ServiceMessageCodec;
import io.scalecube.services.transport.api.ClientChannel;
import io.scalecube.services.transport.api.ClientTransport;
import io.scalecube.services.transport.rsocket.RSocketServiceClientAdapter;
import io.scalecube.transport.Address;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
import reactor.ipc.netty.options.ClientOptions;
import reactor.ipc.netty.options.NettyOptions;
import reactor.ipc.netty.tcp.TcpClient;

public class RSocketClientTransport
implements ClientTransport {
    private static final Logger LOGGER = LoggerFactory.getLogger(RSocketClientTransport.class);
    private final ThreadLocal<Map<Address, Mono<RSocket>>> rsockets = ThreadLocal.withInitial(ConcurrentHashMap::new);
    private final ServiceMessageCodec codec;
    private final EventLoopGroup eventLoopGroup;

    public RSocketClientTransport(ServiceMessageCodec codec, EventLoopGroup eventLoopGroup) {
        this.codec = codec;
        this.eventLoopGroup = eventLoopGroup;
    }

    @Override
    public ClientChannel create(Address address) {
        Map<Address, Mono<RSocket>> monoMap = this.rsockets.get();
        Mono rsocket = monoMap.computeIfAbsent(address, address1 -> this.connect((Address)address1, monoMap));
        return new RSocketServiceClientAdapter(rsocket, this.codec);
    }

    private Mono<RSocket> connect(Address address, Map<Address, Mono<RSocket>> monoMap) {
        TcpClient tcpClient = TcpClient.create(options -> ((ClientOptions.Builder)((ClientOptions.Builder)((NettyOptions.Builder)options.disablePool()).eventLoopGroup(this.eventLoopGroup)).host(address.host())).port(address.port()));
        TcpClientTransport tcpClientTransport = TcpClientTransport.create(tcpClient);
        Mono<RSocket> rsocketMono = RSocketFactory.connect().frameDecoder(frame -> ByteBufPayload.create(frame.sliceData().retain(), frame.sliceMetadata().retain())).transport(tcpClientTransport).start();
        return rsocketMono.doOnSuccess(rsocket -> {
            LOGGER.info("Connected successfully on {}", (Object)address);
            rsocket.onClose().doOnTerminate(() -> {
                monoMap.remove(address);
                LOGGER.info("Connection closed on {} and removed from the pool", (Object)address);
            }).subscribe();
        }).doOnError(throwable -> {
            LOGGER.warn("Connect failed on {}, cause: {}", (Object)address, throwable);
            monoMap.remove(address);
        }).cache();
    }
}

