/*
 * Decompiled with CFR 0.152.
 */
package io.scalecube.services.transport.rsocket.client;

import io.rsocket.RSocket;
import io.rsocket.RSocketFactory;
import io.rsocket.transport.netty.client.TcpClientTransport;
import io.rsocket.util.ByteBufPayload;
import io.scalecube.services.codec.ServiceMessageCodec;
import io.scalecube.services.transport.client.api.ClientChannel;
import io.scalecube.services.transport.client.api.ClientTransport;
import io.scalecube.services.transport.rsocket.client.RSocketServiceClientAdapter;
import io.scalecube.transport.Address;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
import reactor.ipc.netty.options.ClientOptions;
import reactor.ipc.netty.tcp.TcpClient;

public class RSocketClientTransport
implements ClientTransport {
    private static final Logger LOGGER = LoggerFactory.getLogger(RSocketClientTransport.class);
    private final ThreadLocal<Map<Address, Mono<RSocket>>> rSockets = ThreadLocal.withInitial(ConcurrentHashMap::new);
    private final ServiceMessageCodec codec;

    public RSocketClientTransport(ServiceMessageCodec codec) {
        this.codec = codec;
    }

    @Override
    public ClientChannel create(Address address) {
        Map<Address, Mono<RSocket>> monoMap = this.rSockets.get();
        Mono rSocket = monoMap.computeIfAbsent(address, address1 -> RSocketClientTransport.connect(address1, monoMap));
        return new RSocketServiceClientAdapter(rSocket, this.codec);
    }

    private static Mono<RSocket> connect(Address address, Map<Address, Mono<RSocket>> monoMap) {
        TcpClient tcpClient = TcpClient.create(options -> ((ClientOptions.Builder)((ClientOptions.Builder)options.disablePool()).host(address.host())).port(address.port()));
        TcpClientTransport tcpClientTransport = TcpClientTransport.create(tcpClient);
        Mono<RSocket> rSocketMono = RSocketFactory.connect().frameDecoder(frame -> ByteBufPayload.create(frame.sliceData().retain(), frame.sliceMetadata().retain())).transport(tcpClientTransport).start();
        return rSocketMono.doOnSuccess(rSocket -> {
            LOGGER.info("Connected successfully on {}", (Object)address);
            rSocket.onClose().doOnTerminate(() -> {
                monoMap.remove(address);
                LOGGER.info("Connection closed on {} and removed from the pool", (Object)address);
            }).subscribe();
        }).doOnError(throwable -> {
            LOGGER.warn("Connect failed on {}, cause: {}", (Object)address, throwable);
            monoMap.remove(address);
        }).cache();
    }
}

