/*
 * Decompiled with CFR 0.152.
 */
package io.scalecube.services.gateway.transport.http;

import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.http.HttpHeaders;
import io.scalecube.reactor.RetryNonSerializedEmitFailureHandler;
import io.scalecube.services.api.ServiceMessage;
import io.scalecube.services.gateway.transport.GatewayClient;
import io.scalecube.services.gateway.transport.GatewayClientCodec;
import io.scalecube.services.gateway.transport.GatewayClientSettings;
import java.util.function.BiFunction;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.netty.NettyOutbound;
import reactor.netty.http.client.HttpClient;
import reactor.netty.http.client.HttpClientRequest;
import reactor.netty.http.client.HttpClientResponse;
import reactor.netty.resources.ConnectionProvider;
import reactor.netty.resources.LoopResources;

/**
 * {@link GatewayClient} implementation that sends each request as an HTTP POST through a
 * reactor-netty {@link HttpClient}.
 *
 * <p>Only {@link #requestResponse(ServiceMessage)} is supported; {@link
 * #requestStream(ServiceMessage)} and {@link #requestChannel(Flux)} always fail with {@link
 * UnsupportedOperationException}.
 *
 * <p>The client always owns the {@link ConnectionProvider} it creates and releases it on {@link
 * #close()}. The {@link LoopResources} are released on close only when they were created by this
 * client (two-argument constructor); externally supplied loop resources are left untouched.
 */
public final class HttpGatewayClient
implements GatewayClient {
    private static final Logger LOGGER = LoggerFactory.getLogger(HttpGatewayClient.class);
    private static final String RESOURCES_NAME = "http-gateway-client";

    private final GatewayClientCodec<ByteBuf> codec;
    private final HttpClient httpClient;
    private final ConnectionProvider connectionProvider;
    private final LoopResources loopResources;
    private final boolean ownsLoopResources;
    private final Sinks.One<Void> close = Sinks.one();
    private final Sinks.One<Void> onClose = Sinks.one();

    /**
     * Creates a client that allocates its own {@link LoopResources}; they are disposed when the
     * client is closed.
     *
     * @param settings client settings (host, port, headers, redirect/wiretap flags, SSL provider)
     * @param codec codec used to encode outgoing requests
     */
    public HttpGatewayClient(GatewayClientSettings settings, GatewayClientCodec<ByteBuf> codec) {
        this(settings, codec, LoopResources.create(RESOURCES_NAME), true);
    }

    /**
     * Creates a client that runs on externally managed {@link LoopResources}; they are NOT
     * disposed when the client is closed.
     *
     * @param settings client settings (host, port, headers, redirect/wiretap flags, SSL provider)
     * @param codec codec used to encode outgoing requests
     * @param loopResources loop resources the underlying HTTP client runs on
     */
    public HttpGatewayClient(GatewayClientSettings settings, GatewayClientCodec<ByteBuf> codec, LoopResources loopResources) {
        this(settings, codec, loopResources, false);
    }

    private HttpGatewayClient(GatewayClientSettings settings, GatewayClientCodec<ByteBuf> codec, LoopResources loopResources, boolean ownsLoopResources) {
        this.codec = codec;
        this.loopResources = loopResources;
        this.ownsLoopResources = ownsLoopResources;
        // Keep a handle on the connection pool so it can be released in doClose().
        this.connectionProvider = ConnectionProvider.create(RESOURCES_NAME);

        HttpClient client = HttpClient.create(this.connectionProvider)
            .headers(headers -> settings.headers().forEach((name, value) -> headers.add(name, value)))
            .followRedirect(settings.followRedirect())
            .wiretap(settings.wiretap())
            .runOn(loopResources)
            .host(settings.host())
            .port(settings.port());
        if (settings.sslProvider() != null) {
            client = client.secure(settings.sslProvider());
        }
        this.httpClient = client;

        // Cleanup pipeline: runs once close() emits, and signals onClose when it terminates.
        this.close
            .asMono()
            .then(this.doClose())
            .doFinally(signal -> this.onClose.emitEmpty(RetryNonSerializedEmitFailureHandler.RETRY_NON_SERIALIZED))
            .doOnTerminate(() -> LOGGER.info("Closed HttpGatewayClient resources"))
            .subscribe(
                null,
                // Same rendered message as before, with the throwable attached so the stack trace is logged.
                ex -> LOGGER.warn("Exception occurred on HttpGatewayClient close: {}", ex.toString(), ex));
    }

    /**
     * Sends {@code request} as an HTTP POST to {@code "/" + request.qualifier()}, with the request
     * headers copied to the HTTP request and the codec-encoded message as the body.
     *
     * <p>The response body buffer is retained before being placed into the resulting message, so
     * the consumer of the message is responsible for releasing it.
     *
     * @param request message to send
     * @return the response converted to a {@link ServiceMessage}
     */
    @Override
    public Mono<ServiceMessage> requestResponse(ServiceMessage request) {
        return Mono.defer(() -> {
            BiFunction<HttpClientRequest, NettyOutbound, Publisher<Void>> sender = (httpRequest, out) -> {
                LOGGER.debug("Sending request {}", request);
                request.headers().forEach((name, value) -> httpRequest.header(name, value));
                return out.sendObject(Mono.just(this.codec.encode(request))).then();
            };
            return this.httpClient
                .post()
                .uri("/" + request.qualifier())
                .send(sender)
                .responseSingle((httpResponse, body) ->
                    body.map(ByteBuf::retain).map(content -> this.toMessage(httpResponse, content)));
        });
    }

    /**
     * Not supported by this client.
     *
     * @return a {@link Flux} that fails with {@link UnsupportedOperationException}
     */
    @Override
    public Flux<ServiceMessage> requestStream(ServiceMessage request) {
        return Flux.error(new UnsupportedOperationException("requestStream is not supported by HTTP/1.x"));
    }

    /**
     * Not supported by this client.
     *
     * @return a {@link Flux} that fails with {@link UnsupportedOperationException}
     */
    @Override
    public Flux<ServiceMessage> requestChannel(Flux<ServiceMessage> requests) {
        return Flux.error(new UnsupportedOperationException("requestChannel is not supported by HTTP/1.x"));
    }

    /** Triggers the asynchronous cleanup pipeline; completion is observable via {@link #onClose()}. */
    @Override
    public void close() {
        this.close.emitEmpty(RetryNonSerializedEmitFailureHandler.RETRY_NON_SERIALIZED);
    }

    /**
     * Returns a {@link Mono} that completes once the cleanup triggered by {@link #close()} has
     * terminated.
     */
    @Override
    public Mono<Void> onClose() {
        return this.onClose.asMono();
    }

    /**
     * Releases the resources held by this client: always the connection provider created in the
     * constructor, then the loop resources when this client owns them.
     *
     * <p>The two steps run sequentially and errors are delayed, so a failure while disposing the
     * connection provider does not prevent the loop resources from being disposed.
     */
    private Mono<Void> doClose() {
        Mono<Void> disposeConnections = Mono.defer(() -> this.connectionProvider.disposeLater());
        Mono<Void> disposeLoop = this.ownsLoopResources
            ? Mono.defer(() -> this.loopResources.disposeLater())
            : Mono.empty();
        return Flux.concatDelayError(disposeConnections, disposeLoop).then();
    }

    /**
     * Builds a {@link ServiceMessage} from an HTTP response: qualifier is the response URI, data
     * is the body buffer, and every response header is copied into the message headers. For
     * 4xx/5xx statuses an {@code errorType} header carrying the status code is set first.
     */
    private ServiceMessage toMessage(HttpClientResponse httpResponse, ByteBuf content) {
        ServiceMessage.Builder builder = ServiceMessage.builder().qualifier(httpResponse.uri()).data(content);
        int httpCode = httpResponse.status().code();
        if (this.isError(httpCode)) {
            builder.header("errorType", String.valueOf(httpCode));
        }
        httpResponse.responseHeaders().entries().forEach(entry -> builder.header(entry.getKey(), entry.getValue()));
        ServiceMessage message = builder.build();
        LOGGER.debug("Received response {}", message);
        return message;
    }

    /** Returns {@code true} for HTTP status codes in the inclusive range 400..599. */
    private boolean isError(int httpCode) {
        return httpCode >= 400 && httpCode <= 599;
    }
}

