/*
 * Decompiled with CFR 0.152.
 */
package io.scalecube.services.gateway.transport.http;

import io.netty.buffer.ByteBuf;
import io.scalecube.services.api.Qualifier;
import io.scalecube.services.api.ServiceMessage;
import io.scalecube.services.gateway.transport.GatewayClient;
import io.scalecube.services.gateway.transport.GatewayClientCodec;
import io.scalecube.services.gateway.transport.GatewayClientSettings;
import java.util.function.BiFunction;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.netty.NettyOutbound;
import reactor.netty.http.client.HttpClient;
import reactor.netty.http.client.HttpClientRequest;
import reactor.netty.http.client.HttpClientResponse;
import reactor.netty.resources.ConnectionProvider;
import reactor.netty.resources.LoopResources;

public final class HttpGatewayClient
implements GatewayClient {
    private static final Logger LOGGER = LoggerFactory.getLogger(HttpGatewayClient.class);
    private final GatewayClientCodec<ByteBuf> codec;
    private final HttpClient httpClient;
    private final LoopResources loopResources;
    private final MonoProcessor<Void> close = MonoProcessor.create();
    private final MonoProcessor<Void> onClose = MonoProcessor.create();

    public HttpGatewayClient(GatewayClientSettings settings, GatewayClientCodec<ByteBuf> codec) {
        this.codec = codec;
        this.loopResources = LoopResources.create((String)"http-gateway-client");
        this.httpClient = HttpClient.create((ConnectionProvider)ConnectionProvider.elastic((String)"http-gateway-client")).followRedirect(settings.followRedirect()).tcpConfiguration(tcpClient -> {
            if (settings.sslProvider() != null) {
                tcpClient = tcpClient.secure(settings.sslProvider());
            }
            return tcpClient.runOn(this.loopResources).host(settings.host()).port(settings.port());
        });
        this.close.then(this.doClose()).doFinally(s -> this.onClose.onComplete()).doOnTerminate(() -> LOGGER.info("Closed HttpGatewayClient resources")).subscribe(null, ex -> LOGGER.warn("Exception occurred on HttpGatewayClient close: " + ex));
    }

    @Override
    public Mono<ServiceMessage> requestResponse(ServiceMessage request) {
        return Mono.defer(() -> {
            BiFunction<HttpClientRequest, NettyOutbound, Publisher> sender = (httpRequest, out) -> {
                LOGGER.debug("Sending request {}", (Object)request);
                request.headers().forEach((arg_0, arg_1) -> ((HttpClientRequest)httpRequest).header(arg_0, arg_1));
                return out.sendObject((Publisher)Mono.just((Object)this.codec.encode(request))).then();
            };
            return ((HttpClient.RequestSender)this.httpClient.post().uri(request.qualifier())).send(sender).responseSingle((httpResponse, bbMono) -> bbMono.map(ByteBuf::retain).map(content -> this.toMessage((HttpClientResponse)httpResponse, (ByteBuf)content)));
        });
    }

    @Override
    public Flux<ServiceMessage> requestStream(ServiceMessage request) {
        return Flux.error((Throwable)new UnsupportedOperationException("requestStream is not supported by HTTP/1.x"));
    }

    @Override
    public Flux<ServiceMessage> requestChannel(Flux<ServiceMessage> requests) {
        return Flux.error((Throwable)new UnsupportedOperationException("requestChannel is not supported by HTTP/1.x"));
    }

    @Override
    public void close() {
        this.close.onComplete();
    }

    @Override
    public Mono<Void> onClose() {
        return this.onClose;
    }

    private Mono<Void> doClose() {
        return Mono.defer(() -> ((LoopResources)this.loopResources).disposeLater());
    }

    public GatewayClientCodec<ByteBuf> getCodec() {
        return this.codec;
    }

    private ServiceMessage toMessage(HttpClientResponse httpResponse, ByteBuf content) {
        int httpCode = httpResponse.status().code();
        String qualifier = this.isError(httpCode) ? Qualifier.asError((int)httpCode) : httpResponse.uri();
        ServiceMessage.Builder builder = ServiceMessage.builder().qualifier(qualifier).data((Object)content);
        httpResponse.responseHeaders().entries().forEach(entry -> builder.header((String)entry.getKey(), (String)entry.getValue()));
        ServiceMessage message = builder.build();
        LOGGER.debug("Received response {}", (Object)message);
        return message;
    }

    private boolean isError(int httpCode) {
        return httpCode >= 400 && httpCode <= 599;
    }
}

