/*
 * Decompiled with CFR 0.152.
 */
package io.rsocket.transport.netty.client;

import io.rsocket.DuplexConnection;
import io.rsocket.transport.ClientTransport;
import io.rsocket.transport.netty.RSocketLengthCodec;
import io.rsocket.transport.netty.TcpDuplexConnection;
import java.net.InetSocketAddress;
import java.util.Objects;
import java.util.function.BiFunction;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;
import reactor.ipc.netty.NettyInbound;
import reactor.ipc.netty.NettyOutbound;
import reactor.ipc.netty.tcp.TcpClient;

public final class TcpClientTransport
implements ClientTransport {
    private final TcpClient client;

    private TcpClientTransport(TcpClient client) {
        this.client = client;
    }

    public static TcpClientTransport create(int port) {
        TcpClient tcpClient = TcpClient.create(port);
        return TcpClientTransport.create(tcpClient);
    }

    public static TcpClientTransport create(String bindAddress, int port) {
        Objects.requireNonNull(bindAddress, "bindAddress must not be null");
        TcpClient tcpClient = TcpClient.create(bindAddress, port);
        return TcpClientTransport.create(tcpClient);
    }

    public static TcpClientTransport create(InetSocketAddress address) {
        Objects.requireNonNull(address, "address must not be null");
        TcpClient tcpClient = TcpClient.create(address.getHostString(), address.getPort());
        return TcpClientTransport.create(tcpClient);
    }

    public static TcpClientTransport create(TcpClient client) {
        Objects.requireNonNull(client, "client must not be null");
        return new TcpClientTransport(client);
    }

    @Override
    public Mono<DuplexConnection> connect() {
        return Mono.create((MonoSink<T> sink) -> this.client.newHandler((BiFunction<? super NettyInbound, ? super NettyOutbound, ? extends Publisher<Void>>)((BiFunction<NettyInbound, NettyOutbound, Publisher>)(in, out) -> {
            in.context().addHandler(new RSocketLengthCodec());
            TcpDuplexConnection connection = new TcpDuplexConnection((NettyInbound)in, (NettyOutbound)out, in.context());
            sink.success(connection);
            return connection.onClose();
        })).doOnError(sink::error).subscribe());
    }
}

