/*
 * Decompiled with CFR 0.152.
 */
package io.scalecube.services.gateway.transport.websocket;

import io.netty.buffer.ByteBuf;
import io.rsocket.transport.netty.client.WebsocketClientTransport;
import io.scalecube.services.api.ServiceMessage;
import io.scalecube.services.gateway.transport.GatewayClient;
import io.scalecube.services.gateway.transport.GatewayClientCodec;
import io.scalecube.services.gateway.transport.GatewayClientSettings;
import io.scalecube.services.gateway.transport.websocket.Signal;
import io.scalecube.services.gateway.transport.websocket.WebsocketSession;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.core.publisher.MonoSink;
import reactor.netty.Connection;
import reactor.netty.http.client.HttpClient;
import reactor.netty.resources.LoopResources;

/**
 * {@link GatewayClient} implementation that talks to a gateway over a single, lazily established
 * websocket connection.
 *
 * <p>Every request is tagged with a stream id (header {@code "sid"}) taken from a per-client
 * counter; responses for that stream id are read back from the shared {@link WebsocketSession}.
 * The connection is created on first use, cached, and dropped from the cache when the session
 * closes or the connect attempt fails, so the next request reconnects.
 */
public final class WebsocketGatewayClient
implements GatewayClient {
    // Logger is bound to this class so that log output is attributed to the gateway client.
    private static final Logger LOGGER = LoggerFactory.getLogger(WebsocketGatewayClient.class);
    private static final String STREAM_ID = "sid";
    private static final String SIGNAL = "sig";
    // Raw Mono type is unavoidable here: the updater needs a Class literal for the field type.
    private static final AtomicReferenceFieldUpdater<WebsocketGatewayClient, Mono> websocketMonoUpdater = AtomicReferenceFieldUpdater.newUpdater(WebsocketGatewayClient.class, Mono.class, "websocketMono");
    private final GatewayClientCodec<ByteBuf> codec;
    private final GatewayClientSettings settings;
    private final HttpClient httpClient;
    private final AtomicLong sidCounter = new AtomicLong();
    private final LoopResources loopResources;
    // Completed by close(); triggers the resource disposal pipeline wired in the constructor.
    private final MonoProcessor<Void> close = MonoProcessor.create();
    // Completed once the disposal pipeline has finished (successfully or not).
    private final MonoProcessor<Void> onClose = MonoProcessor.create();
    // Accessed only through websocketMonoUpdater; null means "no cached connection".
    private volatile Mono<?> websocketMono;

    /**
     * Creates a client for the host/port given in {@code settings}. No connection is opened here;
     * the websocket is established on the first request.
     *
     * @param settings client settings (host, port, redirect policy, optional SSL provider)
     * @param codec codec used to encode outgoing messages; also handed to the session
     */
    public WebsocketGatewayClient(GatewayClientSettings settings, GatewayClientCodec<ByteBuf> codec) {
        this.settings = settings;
        this.codec = codec;
        this.loopResources = LoopResources.create("websocket-gateway-client");
        this.httpClient = HttpClient.newConnection()
            .followRedirect(settings.followRedirect())
            .tcpConfiguration(tcpClient -> {
                if (settings.sslProvider() != null) {
                    tcpClient = tcpClient.secure(settings.sslProvider());
                }
                return tcpClient.runOn(this.loopResources).host(settings.host()).port(settings.port());
            });

        // Once close() completes the `close` processor: dispose resources, then signal onClose.
        this.close
            .then(this.doClose())
            .doFinally(signalType -> this.onClose.onComplete())
            .doOnTerminate(() -> LOGGER.info("Closed WebsocketGatewayClient resources"))
            .subscribe(
                null,
                // Pass the throwable as the trailing argument so the stack trace is logged too.
                ex -> LOGGER.warn("Exception occurred on WebsocketGatewayClient close: {}", ex.toString(), ex));
    }

    /**
     * Sends {@code request} under a fresh stream id and emits the first message received for that
     * stream id, or completes empty if the stream completes without a message. If the subscriber
     * cancels, a CANCEL signal is sent for the stream id.
     *
     * @param request message to send
     * @return mono of the response message
     */
    @Override
    public Mono<ServiceMessage> requestResponse(ServiceMessage request) {
        return Mono.defer(() -> {
            long sid = this.sidCounter.incrementAndGet();
            ByteBuf byteBuf = this.encodeRequest(request, sid);
            return this.getOrConnect().flatMap(session ->
                session.send(byteBuf, sid)
                    .then(Mono.<ServiceMessage>create(sink ->
                        session.receive(sid).subscribe(sink::success, sink::error, sink::success)))
                    .doOnCancel(() -> this.handleCancel(sid, session)));
        });
    }

    /**
     * Sends {@code request} under a fresh stream id and emits every message received for that
     * stream id. If the subscriber cancels, a CANCEL signal is sent for the stream id.
     *
     * @param request message to send
     * @return flux of response messages
     */
    @Override
    public Flux<ServiceMessage> requestStream(ServiceMessage request) {
        return Flux.defer(() -> {
            long sid = this.sidCounter.incrementAndGet();
            ByteBuf byteBuf = this.encodeRequest(request, sid);
            return this.getOrConnect().flatMapMany(session ->
                session.send(byteBuf, sid)
                    .thenMany(Flux.<ServiceMessage>create(sink ->
                        session.receive(sid).subscribe(sink::next, sink::error, sink::complete)))
                    .doOnCancel(() -> this.handleCancel(sid, session)));
        });
    }

    /**
     * Not supported by this transport.
     *
     * @param requests ignored
     * @return a flux that fails with {@link UnsupportedOperationException}
     */
    @Override
    public Flux<ServiceMessage> requestChannel(Flux<ServiceMessage> requests) {
        return Flux.error(new UnsupportedOperationException("Request channel is not supported by WebSocket transport implementation"));
    }

    /** Starts asynchronous disposal of the client's resources; see {@link #onClose()}. */
    @Override
    public void close() {
        this.close.onComplete();
    }

    /**
     * Returns a mono that completes when the disposal started by {@link #close()} has finished.
     *
     * @return completion signal of the close sequence
     */
    @Override
    public Mono<Void> onClose() {
        return this.onClose;
    }

    /** Disposes the event-loop resources owned by this client. */
    private Mono<Void> doClose() {
        return Mono.defer(this.loopResources::disposeLater);
    }

    /**
     * Returns the codec this client encodes requests with.
     *
     * @return the codec passed to the constructor
     */
    public GatewayClientCodec<ByteBuf> getCodec() {
        return this.codec;
    }

    /**
     * Returns the cached connection mono, installing a new one atomically if none is cached.
     * The updater is declared on the raw {@code Mono} type, hence the unchecked conversion.
     */
    @SuppressWarnings("unchecked")
    private Mono<WebsocketSession> getOrConnect() {
        return Mono.defer(() -> websocketMonoUpdater.updateAndGet(this, this::getOrConnect0));
    }

    /**
     * Update function for {@link #getOrConnect()}: keeps {@code prev} if present, otherwise builds
     * a cached mono that connects the websocket and wraps the connection in a session.
     *
     * @param prev currently cached connection mono, or {@code null}
     * @return {@code prev} when non-null, else a new cached connection mono
     */
    private Mono<WebsocketSession> getOrConnect0(Mono<WebsocketSession> prev) {
        if (prev != null) {
            return prev;
        }
        return this.httpClient.websocket().uri("/").connect()
            .map(connection -> {
                WebsocketSession session = new WebsocketSession(this.codec, connection);
                LOGGER.info("Created {} on {}:{}", session, this.settings.host(), this.settings.port());
                session.onClose()
                    .doOnTerminate(() -> {
                        // Drop the cached connection so the next request reconnects.
                        websocketMonoUpdater.getAndSet(this, null);
                        LOGGER.info("Closed {} on {}:{}", session, this.settings.host(), this.settings.port());
                    })
                    .subscribe(null, th -> LOGGER.warn("Exception on closing session={}", session.id(), th));
                return session;
            })
            .doOnError(ex -> {
                // The cause text fills the third placeholder; the trailing throwable carries the stack trace.
                LOGGER.warn("Failed to connect on {}:{}, cause: {}", this.settings.host(), this.settings.port(), ex.toString(), ex);
                // Drop the failed connection mono so a later request can retry.
                websocketMonoUpdater.getAndSet(this, null);
            })
            .cache();
    }

    /**
     * Sends a CANCEL signal for the given stream id over {@code session}. Send failures are
     * logged and not propagated.
     *
     * @param sid stream id being cancelled
     * @param session session to send the signal on
     * @return handle of the fire-and-forget send subscription
     */
    private Disposable handleCancel(long sid, WebsocketSession session) {
        ByteBuf byteBuf = this.codec.encode(
            ServiceMessage.builder()
                .header(STREAM_ID, sid)
                .header(SIGNAL, Signal.CANCEL.codeAsString())
                .build());
        return session.send(byteBuf, sid)
            .subscribe(null, th -> LOGGER.error("Exception on sending CANCEL signal for session={}", session.id(), th));
    }

    /**
     * Encodes a copy of {@code message} with the stream id header set to {@code sid}.
     *
     * @param message request to encode
     * @param sid stream id to attach
     * @return encoded request
     */
    private ByteBuf encodeRequest(ServiceMessage message, long sid) {
        return this.codec.encode(ServiceMessage.from(message).header(STREAM_ID, sid).build());
    }
}

