/*
 * Decompiled with CFR 0.152.
 */
package io.scalecube.services.gateway.transport.rsocket;

import io.netty.handler.codec.http.HttpHeaders;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.core.RSocketConnector;
import io.rsocket.frame.decoder.PayloadDecoder;
import io.rsocket.transport.ClientTransport;
import io.rsocket.transport.netty.client.WebsocketClientTransport;
import io.rsocket.util.EmptyPayload;
import io.scalecube.reactor.RetryNonSerializedEmitFailureHandler;
import io.scalecube.services.api.ServiceMessage;
import io.scalecube.services.gateway.transport.GatewayClient;
import io.scalecube.services.gateway.transport.GatewayClientCodec;
import io.scalecube.services.gateway.transport.GatewayClientSettings;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.netty.http.client.HttpClient;
import reactor.netty.resources.ConnectionProvider;
import reactor.netty.resources.LoopResources;

/**
 * {@link GatewayClient} implementation that sends requests to a gateway through an RSocket
 * connection carried over a websocket transport.
 *
 * <p>The RSocket connection is created lazily on the first request and its (cached) connect
 * {@link Mono} is shared by subsequent requests. The shared reference is cleared when connecting
 * fails or when the connection is closed, so that the next request triggers a new connect attempt.
 *
 * <p>NOTE(review): {@link #close()} only disposes the {@link LoopResources} created by this client
 * itself; nothing in this class disposes an already established RSocket explicitly, and externally
 * supplied loop resources are left untouched -- confirm this is the intended contract.
 */
public final class RSocketGatewayClient implements GatewayClient {
    private static final Logger LOGGER = LoggerFactory.getLogger(RSocketGatewayClient.class);

    // The raw Mono type is unavoidable here: newUpdater() takes a class literal and there is no
    // class literal for a parameterized type.
    @SuppressWarnings("rawtypes")
    private static final AtomicReferenceFieldUpdater<RSocketGatewayClient, Mono> rsocketMonoUpdater =
        AtomicReferenceFieldUpdater.newUpdater(RSocketGatewayClient.class, Mono.class, "rsocketMono");

    private final GatewayClientSettings settings;
    private final GatewayClientCodec<Payload> codec;
    private final LoopResources loopResources;
    /** {@code true} when {@link #loopResources} was created by this client and must be disposed by it. */
    private final boolean ownsLoopResources;
    /** Completed by {@link #close()} to start the shutdown sequence. */
    private final Sinks.One<Void> close = Sinks.one();
    /** Completed once the shutdown sequence has finished; exposed through {@link #onClose()}. */
    private final Sinks.One<Void> onClose = Sinks.one();
    /** Cached connect Mono; accessed only through {@link #rsocketMonoUpdater}. */
    @SuppressWarnings("unused")
    private volatile Mono<?> rsocketMono;

    /**
     * Creates a client that allocates its own {@link LoopResources} (named
     * {@code "rsocket-gateway-client"}) and disposes them on {@link #close()}.
     *
     * @param settings client settings (host, port, headers, content type, ssl, ...)
     * @param codec codec used to convert between {@link ServiceMessage} and {@link Payload}
     */
    public RSocketGatewayClient(GatewayClientSettings settings, GatewayClientCodec<Payload> codec) {
        this(settings, codec, LoopResources.create("rsocket-gateway-client"), true);
    }

    /**
     * Creates a client that runs on the given {@link LoopResources}. The supplied resources are not
     * disposed by this client.
     *
     * @param settings client settings (host, port, headers, content type, ssl, ...)
     * @param codec codec used to convert between {@link ServiceMessage} and {@link Payload}
     * @param loopResources loop resources the underlying http client runs on
     */
    public RSocketGatewayClient(
            GatewayClientSettings settings,
            GatewayClientCodec<Payload> codec,
            LoopResources loopResources) {
        this(settings, codec, loopResources, false);
    }

    private RSocketGatewayClient(
            GatewayClientSettings settings,
            GatewayClientCodec<Payload> codec,
            LoopResources loopResources,
            boolean ownsLoopResources) {
        this.settings = settings;
        this.codec = codec;
        this.loopResources = loopResources;
        this.ownsLoopResources = ownsLoopResources;

        // Shutdown sequence: wait for close() -> release resources -> signal onClose().
        this.close
            .asMono()
            .then(this.doClose())
            .doFinally(
                s -> this.onClose.emitEmpty(RetryNonSerializedEmitFailureHandler.RETRY_NON_SERIALIZED))
            .doOnTerminate(() -> LOGGER.info("Closed RSocketGatewayClient resources"))
            .subscribe(
                null,
                // toString() on purpose: keeps the exception in the message text, same as the
                // other warn logs of this class.
                ex -> LOGGER.warn("Exception occurred on RSocketGatewayClient close: {}", ex.toString()));
    }

    /**
     * Sends a single request and expects a single response.
     *
     * @param request request message
     * @return mono emitting the decoded response message
     */
    @Override
    public Mono<ServiceMessage> requestResponse(ServiceMessage request) {
        return this.getOrConnect()
            .flatMap(
                rsocket ->
                    rsocket
                        .requestResponse(this.toPayload(request))
                        .doOnSubscribe(s -> LOGGER.debug("Sending request {}", request)))
            .map(this::toMessage);
    }

    /**
     * Sends a single request and expects a stream of responses.
     *
     * @param request request message
     * @return flux emitting the decoded response messages
     */
    @Override
    public Flux<ServiceMessage> requestStream(ServiceMessage request) {
        return this.getOrConnect()
            .flatMapMany(
                rsocket ->
                    rsocket
                        .requestStream(this.toPayload(request))
                        .doOnSubscribe(s -> LOGGER.debug("Sending request {}", request)))
            .map(this::toMessage);
    }

    /**
     * Sends a stream of requests and expects a stream of responses.
     *
     * @param requests request messages
     * @return flux emitting the decoded response messages
     */
    @Override
    public Flux<ServiceMessage> requestChannel(Flux<ServiceMessage> requests) {
        return this.getOrConnect()
            .flatMapMany(
                rsocket ->
                    // No raw Publisher cast: Flux<Payload> already is a Publisher<Payload>, and a
                    // raw argument would erase the return type of requestChannel().
                    rsocket.requestChannel(
                        requests
                            .doOnNext(r -> LOGGER.debug("Sending request {}", r))
                            .map(this::toPayload)))
            .map(this::toMessage);
    }

    /** Triggers the shutdown sequence; completion is observable through {@link #onClose()}. */
    @Override
    public void close() {
        this.close.emitEmpty(RetryNonSerializedEmitFailureHandler.RETRY_NON_SERIALIZED);
    }

    /**
     * Returns a mono that completes when the shutdown sequence started by {@link #close()} is done.
     *
     * @return mono completing after the client resources were released
     */
    @Override
    public Mono<Void> onClose() {
        return this.onClose.asMono();
    }

    /** Disposes the loop resources if, and only if, they were created by this client. */
    private Mono<Void> doClose() {
        if (!this.ownsLoopResources) {
            return Mono.empty();
        }
        return Mono.defer(() -> this.loopResources.disposeLater());
    }

    /**
     * Returns the shared connect mono, installing a new one atomically when none is present.
     *
     * <p>The unchecked conversion is confined to this method: the updater is declared with the raw
     * {@code Mono} type, but the field only ever holds values produced by
     * {@link #getOrConnect0(Mono)}, i.e. {@code Mono<RSocket>}.
     */
    @SuppressWarnings("unchecked")
    private Mono<RSocket> getOrConnect() {
        return Mono.defer(() -> rsocketMonoUpdater.updateAndGet(this, this::getOrConnect0));
    }

    /**
     * Update function for {@link #rsocketMonoUpdater}: keeps an existing connect mono, otherwise
     * builds a new (cached) one.
     *
     * @param prev currently stored connect mono, or {@code null} if there is none
     * @return {@code prev} when present, otherwise a new cached connect mono
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    private Mono<RSocket> getOrConnect0(Mono prev) {
        if (prev != null) {
            return prev;
        }

        // Declared as Payload (not EmptyPayload) because it may hold the encoded headers message.
        Payload setupPayload = EmptyPayload.INSTANCE;
        if (!this.settings.headers().isEmpty()) {
            setupPayload =
                this.codec.encode(ServiceMessage.builder().headers(this.settings.headers()).build());
        }

        return RSocketConnector.create()
            .payloadDecoder(PayloadDecoder.DEFAULT)
            .setupPayload(setupPayload)
            .metadataMimeType(this.settings.contentType())
            .connect(this.createClientTransport(this.settings))
            .doOnSuccess(
                rsocket -> {
                    LOGGER.info(
                        "Connected successfully on {}:{}", this.settings.host(), this.settings.port());
                    // Forget the cached connection once it is closed so the next request reconnects.
                    rsocket
                        .onClose()
                        .doOnTerminate(
                            () -> {
                                rsocketMonoUpdater.getAndSet(this, null);
                                LOGGER.info(
                                    "Connection closed on {}:{}",
                                    this.settings.host(),
                                    this.settings.port());
                            })
                        .subscribe(
                            null,
                            th -> LOGGER.warn("Exception on closing rsocket: {}", th.toString()));
                })
            .doOnError(
                ex -> {
                    LOGGER.warn(
                        "Failed to connect on {}:{}, cause: {}",
                        this.settings.host(),
                        this.settings.port(),
                        ex.toString());
                    // Drop the failed attempt so it is not reused by later requests.
                    rsocketMonoUpdater.getAndSet(this, null);
                })
            .cache();
    }

    /**
     * Builds the websocket transport (path {@code "/"}) from the given settings: request headers,
     * redirect/wiretap flags, host, port and, when configured, the ssl provider.
     *
     * @param settings client settings
     * @return websocket client transport running on this client's loop resources
     */
    private WebsocketClientTransport createClientTransport(GatewayClientSettings settings) {
        HttpClient httpClient =
            HttpClient.create(ConnectionProvider.newConnection())
                .headers(
                    headers -> settings.headers().forEach((name, value) -> headers.add(name, value)))
                .followRedirect(settings.followRedirect())
                .wiretap(settings.wiretap())
                .runOn(this.loopResources)
                .host(settings.host())
                .port(settings.port());

        if (settings.sslProvider() != null) {
            httpClient = httpClient.secure(settings.sslProvider());
        }

        return WebsocketClientTransport.create(httpClient, "/");
    }

    /** Encodes a service message into an RSocket payload using the configured codec. */
    private Payload toPayload(ServiceMessage message) {
        return this.codec.encode(message);
    }

    /** Decodes an RSocket payload into a service message using the configured codec and logs it. */
    private ServiceMessage toMessage(Payload payload) {
        ServiceMessage message = this.codec.decode(payload);
        LOGGER.debug("Received response {}", message);
        return message;
    }
}

