/*
 * Decompiled with CFR 0.152.
 */
package io.scalecube.services.gateway.transport.rsocket;

import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.RSocketFactory;
import io.rsocket.transport.ClientTransport;
import io.rsocket.transport.netty.client.WebsocketClientTransport;
import io.scalecube.services.api.ServiceMessage;
import io.scalecube.services.exceptions.ConnectionClosedException;
import io.scalecube.services.gateway.transport.GatewayClient;
import io.scalecube.services.gateway.transport.GatewayClientCodec;
import io.scalecube.services.gateway.transport.GatewayClientSettings;
import java.nio.channels.ClosedChannelException;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.netty.http.client.HttpClient;
import reactor.netty.resources.LoopResources;

/**
 * {@link GatewayClient} implementation that sends requests to a gateway over RSocket using a
 * websocket transport.
 *
 * <p>The connection is established lazily: the first request builds a cached connection
 * {@link Mono} which is then shared by subsequent requests. The cached reference is cleared when
 * connecting fails or when the established RSocket terminates, so that the next request attempts a
 * fresh connection.
 */
public final class RSocketGatewayClient implements GatewayClient {
    private static final Logger LOGGER = LoggerFactory.getLogger(RSocketGatewayClient.class);

    /** Websocket path used when creating the client transport. */
    private static final String WEBSOCKET_PATH = "/";

    // The updater can only be parameterized with the raw Mono type because Mono.class is a
    // Class<Mono>; the field it manages is "rsocketMono" below.
    @SuppressWarnings("rawtypes")
    private static final AtomicReferenceFieldUpdater<RSocketGatewayClient, Mono> rSocketMonoUpdater =
        AtomicReferenceFieldUpdater.newUpdater(RSocketGatewayClient.class, Mono.class, "rsocketMono");

    private final GatewayClientSettings settings;
    private final GatewayClientCodec<Payload> codec;
    private final LoopResources loopResources;

    /** Completed by {@link #close()} to start the shutdown sequence. */
    private final MonoProcessor<Void> close = MonoProcessor.create();

    /** Completed once the shutdown sequence has finished (successfully or not). */
    private final MonoProcessor<Void> onClose = MonoProcessor.create();

    /** Cached connection; accessed only through {@link #rSocketMonoUpdater}. Null when not connected. */
    private volatile Mono<?> rsocketMono;

    /**
     * Creates the client and wires the shutdown sequence; no connection is opened here.
     *
     * @param settings client settings (host, port, content type, SSL provider, redirect policy)
     * @param codec codec used to convert between {@link ServiceMessage} and {@link Payload}
     */
    public RSocketGatewayClient(GatewayClientSettings settings, GatewayClientCodec<Payload> codec) {
        this.settings = settings;
        this.codec = codec;
        this.loopResources = LoopResources.create("rsocket-gateway-client");

        // When close() completes the "close" signal: release resources, then complete onClose.
        this.close
            .then(doClose())
            .doFinally(signal -> onClose.onComplete())
            .doOnTerminate(() -> LOGGER.info("Closed RSocketGatewayClient resources"))
            .subscribe(
                null,
                ex -> LOGGER.warn("Exception occurred on RSocketGatewayClient close: {}", ex.toString()));
    }

    /**
     * Sends a single request and returns the single response.
     *
     * <p>The request is encoded on subscription, before the connection is obtained. A
     * {@link ClosedChannelException} from the RSocket call is surfaced as a
     * {@link ConnectionClosedException}.
     *
     * @param request message to send
     * @return mono emitting the decoded response
     */
    @Override
    public Mono<ServiceMessage> requestResponse(ServiceMessage request) {
        return Mono.defer(() -> {
            Payload payload = toPayload(request);
            return getOrConnect()
                .flatMap(rsocket -> rsocket.requestResponse(payload)
                    .onErrorMap(ClosedChannelException.class, RSocketGatewayClient::toConnectionClosed))
                .map(this::toMessage);
        });
    }

    /**
     * Sends a single request and returns the stream of responses.
     *
     * <p>The request is encoded on subscription, before the connection is obtained. A
     * {@link ClosedChannelException} from the RSocket call is surfaced as a
     * {@link ConnectionClosedException}.
     *
     * @param request message to send
     * @return flux emitting the decoded responses
     */
    @Override
    public Flux<ServiceMessage> requestStream(ServiceMessage request) {
        return Flux.defer(() -> {
            Payload payload = toPayload(request);
            return getOrConnect()
                .flatMapMany(rsocket -> rsocket.requestStream(payload)
                    .onErrorMap(ClosedChannelException.class, RSocketGatewayClient::toConnectionClosed))
                .map(this::toMessage);
        });
    }

    /**
     * Sends a stream of requests and returns the stream of responses.
     *
     * <p>Each request is encoded as it is emitted by {@code requests}. A
     * {@link ClosedChannelException} from the RSocket call is surfaced as a
     * {@link ConnectionClosedException}.
     *
     * @param requests messages to send
     * @return flux emitting the decoded responses
     */
    @Override
    public Flux<ServiceMessage> requestChannel(Flux<ServiceMessage> requests) {
        return Flux.defer(() -> {
            Flux<Payload> reqPayloads = requests.map(this::toPayload);
            return getOrConnect()
                .flatMapMany(rsocket -> rsocket.requestChannel(reqPayloads)
                    .onErrorMap(ClosedChannelException.class, RSocketGatewayClient::toConnectionClosed))
                .map(this::toMessage);
        });
    }

    /**
     * Starts the shutdown sequence that was wired in the constructor. Completion can be observed
     * through {@link #onClose()}.
     */
    @Override
    public void close() {
        this.close.onComplete();
    }

    /**
     * Returns the shutdown-completion signal.
     *
     * @return mono that completes after the shutdown sequence triggered by {@link #close()} ends
     */
    @Override
    public Mono<Void> onClose() {
        return this.onClose;
    }

    /**
     * Releases the resources owned by this client.
     *
     * <p>Only the loop resources are disposed here; the RSocket itself is not explicitly disposed
     * by this method.
     */
    private Mono<Void> doClose() {
        return Mono.defer(() -> loopResources.disposeLater());
    }

    /**
     * Returns the codec this client was created with.
     *
     * @return payload codec
     */
    public GatewayClientCodec<Payload> getCodec() {
        return this.codec;
    }

    /**
     * Disposes the given RSocket and returns its close signal.
     *
     * <p>NOTE(review): this helper is not invoked anywhere in this class.
     */
    private Mono<? extends Void> dispose(RSocket rsocket) {
        rsocket.dispose();
        return rsocket.onClose();
    }

    /**
     * Returns the cached connection, installing a new (lazy, cached) one if none is present.
     */
    @SuppressWarnings("unchecked") // the updater is raw; only Mono<RSocket> values are ever stored
    private Mono<RSocket> getOrConnect() {
        return Mono.defer(
            () -> (Mono<RSocket>) rSocketMonoUpdater.updateAndGet(this, this::getOrConnect0));
    }

    /**
     * Update function for {@link #rSocketMonoUpdater}: keeps an existing connection mono, otherwise
     * builds a new one. Building the mono does not connect; the connection starts on subscription
     * and its outcome is cached.
     *
     * @param prev currently stored connection mono, or null
     * @return {@code prev} when non-null, otherwise a new cached connection mono
     */
    @SuppressWarnings({"rawtypes", "unchecked"}) // raw parameter is dictated by the raw updater
    private Mono<RSocket> getOrConnect0(Mono prev) {
        if (prev != null) {
            return prev;
        }
        ClientTransport transport = createRSocketTransport(settings);
        return RSocketFactory.connect()
            .metadataMimeType(settings.contentType())
            .transport(transport)
            .start()
            .doOnSuccess(this::onConnected)
            .doOnError(this::onConnectFailed)
            .cache();
    }

    /** Logs the successful connection and arranges for the cached mono to be cleared on close. */
    private void onConnected(RSocket rsocket) {
        LOGGER.info("Connected successfully on {}:{}", settings.host(), settings.port());
        rsocket.onClose()
            .doOnTerminate(() -> {
                // Drop the cached connection so that the next request reconnects.
                rSocketMonoUpdater.set(this, null);
                LOGGER.info("Connection closed on {}:{}", settings.host(), settings.port());
            })
            .subscribe(null, th -> LOGGER.warn("Exception on closing rsocket: {}", th.toString()));
    }

    /** Logs the connection failure and clears the cached mono so that a later request retries. */
    private void onConnectFailed(Throwable ex) {
        // Pass the throwable as text: a trailing Throwable argument may be taken by the logging
        // backend as the exception to attach rather than as the value for the last placeholder.
        LOGGER.warn(
            "Failed to connect on {}:{}, cause: {}", settings.host(), settings.port(), ex.toString());
        rSocketMonoUpdater.set(this, null);
    }

    /**
     * Builds the websocket transport for the given settings: new connection per client, redirect
     * policy from settings, optional SSL, running on this client's loop resources.
     *
     * @param settings settings supplying host, port, SSL provider and redirect policy
     * @return websocket client transport bound to {@link #WEBSOCKET_PATH}
     */
    private WebsocketClientTransport createRSocketTransport(GatewayClientSettings settings) {
        HttpClient httpClient = HttpClient.newConnection()
            .followRedirect(settings.followRedirect())
            .tcpConfiguration(tcpClient ->
                // Enable TLS only when an SSL provider was configured.
                (settings.sslProvider() != null ? tcpClient.secure(settings.sslProvider()) : tcpClient)
                    .runOn(loopResources)
                    .host(settings.host())
                    .port(settings.port()));
        return WebsocketClientTransport.create(httpClient, WEBSOCKET_PATH);
    }

    /** Maps a closed-channel failure to the service-level connection-closed exception. */
    private static ConnectionClosedException toConnectionClosed(ClosedChannelException ex) {
        return new ConnectionClosedException("Connection closed");
    }

    /** Encodes a service message into an RSocket payload using the configured codec. */
    private Payload toPayload(ServiceMessage message) {
        return this.codec.encode(message);
    }

    /** Decodes an RSocket payload into a service message using the configured codec. */
    private ServiceMessage toMessage(Payload payload) {
        return this.codec.decode(payload);
    }
}

