/*
 * Decompiled with CFR 0.152.
 */
package io.scalecube.services.gateway.transport.websocket;

import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
import io.scalecube.reactor.RetryNonSerializedEmitFailureHandler;
import io.scalecube.services.api.ServiceMessage;
import io.scalecube.services.gateway.transport.GatewayClient;
import io.scalecube.services.gateway.transport.GatewayClientCodec;
import io.scalecube.services.gateway.transport.GatewayClientSettings;
import io.scalecube.services.gateway.transport.websocket.WebsocketGatewayClientSession;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.netty.Connection;
import reactor.netty.http.client.HttpClient;
import reactor.netty.resources.ConnectionProvider;
import reactor.netty.resources.LoopResources;

/**
 * {@link GatewayClient} implementation that talks to a gateway over a websocket connection.
 *
 * <p>The websocket session is established lazily on the first request and then shared by
 * subsequent requests through a cached {@link Mono}. The cached reference is cleared when the
 * connection attempt fails or when the session reports that it has closed, so the next request
 * triggers a fresh connection attempt.
 *
 * <p>Every outgoing request is tagged with a stream id header ({@value #STREAM_ID}) taken from a
 * per-client counter; the same id is used to register, cancel and remove the response processor
 * on the session.
 */
public final class WebsocketGatewayClient implements GatewayClient {
    private static final Logger LOGGER = LoggerFactory.getLogger(WebsocketGatewayClient.class);

    /** Name of the message header that carries the stream id. */
    private static final String STREAM_ID = "sid";

    // The updater has to be declared against the raw Mono class because class literals
    // cannot carry type arguments.
    @SuppressWarnings("rawtypes")
    private static final AtomicReferenceFieldUpdater<WebsocketGatewayClient, Mono> websocketMonoUpdater =
        AtomicReferenceFieldUpdater.newUpdater(WebsocketGatewayClient.class, Mono.class, "websocketMono");

    private final AtomicLong sidCounter = new AtomicLong();
    private final GatewayClientCodec<ByteBuf> codec;
    private final GatewayClientSettings settings;
    private final HttpClient httpClient;
    private final LoopResources loopResources;
    private final boolean ownsLoopResources;

    /** Completed by {@link #close()} to start the shutdown sequence. */
    private final Sinks.One<Void> close = Sinks.one();

    /** Completed once the shutdown sequence has finished (successfully or not). */
    private final Sinks.One<Void> onClose = Sinks.one();

    /** Cached connection mono; accessed only through {@link #websocketMonoUpdater}. */
    private volatile Mono<?> websocketMono;

    /**
     * Creates a client that allocates its own {@link LoopResources}; those resources are disposed
     * when the client is closed.
     *
     * @param settings client settings (host, port, headers, ssl, keepalive, ...)
     * @param codec codec used to encode outgoing requests
     */
    public WebsocketGatewayClient(GatewayClientSettings settings, GatewayClientCodec<ByteBuf> codec) {
        this(settings, codec, LoopResources.create("websocket-gateway-client"), true);
    }

    /**
     * Creates a client that runs on externally supplied {@link LoopResources}; those resources are
     * NOT disposed when the client is closed.
     *
     * @param settings client settings (host, port, headers, ssl, keepalive, ...)
     * @param codec codec used to encode outgoing requests
     * @param loopResources loop resources to run the underlying http client on
     */
    public WebsocketGatewayClient(
            GatewayClientSettings settings, GatewayClientCodec<ByteBuf> codec, LoopResources loopResources) {
        this(settings, codec, loopResources, false);
    }

    private WebsocketGatewayClient(
            GatewayClientSettings settings,
            GatewayClientCodec<ByteBuf> codec,
            LoopResources loopResources,
            boolean ownsLoopResources) {
        this.settings = settings;
        this.codec = codec;
        this.loopResources = loopResources;
        this.ownsLoopResources = ownsLoopResources;

        HttpClient client = HttpClient.create(ConnectionProvider.newConnection())
            .headers(headers -> settings.headers().forEach((name, value) -> headers.add(name, value)))
            .followRedirect(settings.followRedirect())
            .wiretap(settings.wiretap())
            .runOn(loopResources)
            .host(settings.host())
            .port(settings.port());
        if (settings.sslProvider() != null) {
            client = client.secure(settings.sslProvider());
        }
        this.httpClient = client;

        // Shutdown sequence: wait for close() to be called, release owned resources,
        // then signal onClose regardless of the outcome.
        this.close.asMono()
            .then(doClose())
            .doFinally(s -> onClose.emitEmpty(RetryNonSerializedEmitFailureHandler.RETRY_NON_SERIALIZED))
            .doOnTerminate(() -> LOGGER.info("Closed client"))
            .subscribe(null, ex -> LOGGER.warn("Failed to close client, cause: {}", ex.toString()));
    }

    /**
     * Sends the request over the (lazily established) websocket session and returns the single
     * response registered under the request's stream id.
     *
     * @param request request message
     * @return mono of the response message
     */
    @Override
    public Mono<ServiceMessage> requestResponse(ServiceMessage request) {
        return getOrConnect().flatMap(session -> {
            long sid = sidCounter.incrementAndGet();
            return session.send(encodeRequest(request, sid))
                .doOnSubscribe(s -> LOGGER.debug("Sending request {}", request))
                .then(session.<ServiceMessage>newMonoProcessor(sid).asMono())
                .doOnCancel(() -> session.cancel(sid, request.qualifier()))
                .doFinally(s -> session.removeProcessor(sid));
        });
    }

    /**
     * Sends the request over the (lazily established) websocket session and returns the stream of
     * responses registered under the request's stream id.
     *
     * @param request request message
     * @return flux of response messages
     */
    @Override
    public Flux<ServiceMessage> requestStream(ServiceMessage request) {
        return getOrConnect().flatMapMany(session -> {
            long sid = sidCounter.incrementAndGet();
            return session.send(encodeRequest(request, sid))
                .doOnSubscribe(s -> LOGGER.debug("Sending request {}", request))
                .thenMany(session.<ServiceMessage>newUnicastProcessor(sid).asFlux())
                .doOnCancel(() -> session.cancel(sid, request.qualifier()))
                .doFinally(s -> session.removeProcessor(sid));
        });
    }

    /**
     * Not supported by this client.
     *
     * @param requests ignored
     * @return a flux that fails with {@link UnsupportedOperationException}
     */
    @Override
    public Flux<ServiceMessage> requestChannel(Flux<ServiceMessage> requests) {
        return Flux.error(new UnsupportedOperationException("requestChannel is not supported"));
    }

    /** Triggers the shutdown sequence; completion is observable through {@link #onClose()}. */
    @Override
    public void close() {
        close.emitEmpty(RetryNonSerializedEmitFailureHandler.RETRY_NON_SERIALIZED);
    }

    /**
     * Returns a mono that completes when the shutdown sequence started by {@link #close()} is done.
     *
     * @return completion signal of the shutdown sequence
     */
    @Override
    public Mono<Void> onClose() {
        return onClose.asMono();
    }

    /** Disposes the loop resources if this client created them; otherwise does nothing. */
    private Mono<Void> doClose() {
        return ownsLoopResources ? Mono.defer(loopResources::disposeLater) : Mono.empty();
    }

    /** Returns the cached connection mono, installing a new one if none is currently set. */
    @SuppressWarnings("unchecked")
    private Mono<WebsocketGatewayClientSession> getOrConnect() {
        return websocketMonoUpdater.updateAndGet(this, this::getOrConnect0);
    }

    /**
     * Update function for {@link #getOrConnect()}: keeps an existing mono, or builds a new cached
     * connection mono when there is none.
     */
    private Mono<WebsocketGatewayClientSession> getOrConnect0(Mono<WebsocketGatewayClientSession> prev) {
        if (prev != null) {
            return prev;
        }
        Duration keepAliveInterval = settings.keepAliveInterval();
        return httpClient.websocket()
            .uri("/")
            .connect()
            .map(connection -> withKeepAlive(connection, keepAliveInterval))
            .map(connection -> newSession(connection))
            .doOnError(ex -> {
                // Log the cause as text: passing the Throwable itself as the last argument makes
                // SLF4J treat it as the exception to log instead of a value for the last placeholder.
                LOGGER.warn(
                    "Failed to connect on {}:{}, cause: {}",
                    settings.host(), settings.port(), ex.toString());
                // Drop the failed mono so the next request attempts a new connection.
                websocketMonoUpdater.set(this, null);
            })
            .cache();
    }

    /**
     * Tells whether keepalive pings should be scheduled for the given interval.
     *
     * <p>Uses value comparison rather than identity against {@link Duration#ZERO}; a {@code null},
     * zero or negative interval means keepalive is disabled.
     */
    private static boolean isKeepAliveEnabled(Duration interval) {
        return interval != null && !interval.isZero() && !interval.isNegative();
    }

    /** Registers read-idle and write-idle keepalive callbacks on the connection when enabled. */
    private Connection withKeepAlive(Connection connection, Duration keepAliveInterval) {
        if (!isKeepAliveEnabled(keepAliveInterval)) {
            return connection;
        }
        long idleMillis = keepAliveInterval.toMillis();
        return connection
            .onReadIdle(idleMillis, () -> onReadIdle(connection))
            .onWriteIdle(idleMillis, () -> onWriteIdle(connection));
    }

    /** Wraps the connection in a session and clears the cached mono once that session closes. */
    private WebsocketGatewayClientSession newSession(Connection connection) {
        WebsocketGatewayClientSession session = new WebsocketGatewayClientSession(codec, connection);
        LOGGER.info("Created session: {}", session);
        session.onClose()
            .doOnTerminate(() -> {
                // Drop the cached mono so the next request establishes a new session.
                websocketMonoUpdater.set(this, null);
                LOGGER.info("Closed session: {}", session);
            })
            .subscribe(
                null,
                th -> LOGGER.warn("Exception on closing session: {}, cause: {}", session, th.toString()));
        return session;
    }

    private void onWriteIdle(Connection connection) {
        sendKeepalive(connection, "writeIdle");
    }

    private void onReadIdle(Connection connection) {
        sendKeepalive(connection, "readIdle");
    }

    /**
     * Sends a websocket ping frame on the connection; a send failure is only logged.
     *
     * @param connection connection to ping
     * @param idleKind label of the idle event that triggered the ping, used in log messages
     */
    private void sendKeepalive(Connection connection, String idleKind) {
        LOGGER.debug("Sending keepalive on {}", idleKind);
        connection.outbound()
            .sendObject(new PingWebSocketFrame())
            .then()
            .subscribe(null, ex -> LOGGER.warn("Can't send keepalive on {}: {}", idleKind, ex.toString()));
    }

    /** Encodes a copy of the message with the stream id header set to {@code sid}. */
    private ByteBuf encodeRequest(ServiceMessage message, long sid) {
        return codec.encode(ServiceMessage.from(message).header(STREAM_ID, sid).build());
    }
}

