package reactor.io.netty.http;

import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.HttpUtil;
import io.netty.handler.codec.http.LastHttpContent;
import java.io.IOException;
import java.net.URI;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.publisher.DirectProcessor;
import reactor.core.publisher.Flux;
import reactor.core.subscriber.BaseSubscriber;
import reactor.core.util.BackpressureUtils;
import reactor.core.util.EmptySubscription;
import reactor.io.ipc.ChannelHandler;
import reactor.io.netty.common.ChannelBridge;
import reactor.io.netty.common.NettyChannel;
import reactor.io.netty.common.NettyChannelHandler;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:reactor/io/netty/http/NettyHttpClientHandler.class */
public class NettyHttpClientHandler extends NettyChannelHandler<HttpClientChannel> {
    HttpClientChannel httpChannel;
    DirectProcessor<Void> connectSignal;
    Subscriber<? super HttpClientResponse> replySubscriber;

    /* JADX INFO: Access modifiers changed from: package-private */
    public NettyHttpClientHandler(ChannelHandler<ByteBuf, ByteBuf, NettyChannel> channelHandler, ChannelBridge<HttpClientChannel> channelBridge, Channel channel) {
        super(channelHandler, channelBridge, channel);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public NettyHttpClientHandler(ChannelHandler<ByteBuf, ByteBuf, NettyChannel> channelHandler, ChannelBridge<HttpClientChannel> channelBridge, Channel channel, NettyHttpClientHandler nettyHttpClientHandler) {
        super(channelHandler, channelBridge, channel, nettyHttpClientHandler);
        this.httpChannel = nettyHttpClientHandler.httpChannel;
        this.replySubscriber = nettyHttpClientHandler.replySubscriber;
    }

    @Override // reactor.io.netty.common.NettyChannelHandler
    public void channelActive(final ChannelHandlerContext channelHandlerContext) throws Exception {
        channelHandlerContext.fireChannelActive();
        if (this.httpChannel != null) {
            return;
        }
        this.httpChannel = (HttpClientChannel) this.bridgeFactory.createChannelBridge(channelHandlerContext.channel(), this.input, new Object[0]);
        this.httpChannel.keepAlive(true);
        HttpUtil.setTransferEncodingChunked(this.httpChannel.nettyRequest, true);
        ((Publisher) this.handler.apply(this.httpChannel)).subscribe(new BaseSubscriber<Void>() { // from class: reactor.io.netty.http.NettyHttpClientHandler.1
            public void onSubscribe(Subscription subscription) {
                channelHandlerContext.read();
                BackpressureUtils.validate((Subscription) null, subscription);
                subscription.request(Long.MAX_VALUE);
            }

            public void onError(Throwable th) {
                super.onError(th);
                if ((th instanceof IOException) && th.getMessage() != null && th.getMessage().contains("Broken pipe")) {
                    if (NettyHttpClientHandler.log.isDebugEnabled()) {
                        NettyHttpClientHandler.log.debug("Connection closed remotely", th);
                    }
                } else if (channelHandlerContext.channel().isOpen()) {
                    channelHandlerContext.channel().close();
                }
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void bridgeReply(Subscriber<? super HttpClientResponse> subscriber, DirectProcessor<Void> directProcessor) {
        this.replySubscriber = subscriber;
        this.connectSignal = directProcessor;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // reactor.io.netty.common.NettyChannelHandler
    public void doOnTerminate(ChannelHandlerContext channelHandlerContext, ChannelFuture channelFuture, ChannelPromise channelPromise, Throwable th) {
        channelHandlerContext.channel().write(LastHttpContent.EMPTY_LAST_CONTENT);
        super.doOnTerminate(channelHandlerContext, channelFuture, channelPromise, th);
    }

    @Override // reactor.io.netty.common.NettyChannelHandler
    public void channelRead(ChannelHandlerContext channelHandlerContext, Object obj) throws Exception {
        if (!HttpResponse.class.isAssignableFrom(obj.getClass())) {
            if (LastHttpContent.EMPTY_LAST_CONTENT != obj) {
                doRead(obj);
            }
            postRead(channelHandlerContext, obj);
            return;
        }
        HttpResponse httpResponse = (HttpResponse) obj;
        if (this.httpChannel != null) {
            this.httpChannel.setNettyResponse(httpResponse);
        }
        if (log.isDebugEnabled()) {
            log.debug("Received response (auto-read:{}) : {}", new Object[]{Boolean.valueOf(channelHandlerContext.channel().config().isAutoRead()), this.httpChannel.headers().toString()});
        }
        if (checkResponseCode(channelHandlerContext, httpResponse)) {
            channelHandlerContext.fireChannelRead(obj);
            if (this.replySubscriber != null) {
                Flux.just(this.httpChannel).subscribe(this.replySubscriber);
            } else {
                log.debug("No Response/ HttpInbound subscriber on {}, msg is dropped {}", new Object[]{channelHandlerContext.channel(), obj});
            }
        }
        postRead(channelHandlerContext, obj);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public final NettyWebSocketClientHandler withWebsocketSupport(URI uri, String str, boolean z) {
        if (this.httpChannel.markHeadersAsFlushed()) {
            return new NettyWebSocketClientHandler(uri, str, this, z);
        }
        log.error("Cannot enable websocket if headers have already been sent");
        return null;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public final boolean checkResponseCode(ChannelHandlerContext channelHandlerContext, HttpResponse httpResponse) throws Exception {
        int code = httpResponse.status().code();
        if (code >= 400) {
            HttpException httpException = new HttpException(this.httpChannel);
            if (this.connectSignal != null) {
                this.connectSignal.onError(httpException);
                return false;
            }
            if (this.replySubscriber == null) {
                return false;
            }
            EmptySubscription.error(this.replySubscriber, httpException);
            return false;
        }
        if (code < 300 || !this.httpChannel.isFollowRedirect()) {
            if (this.connectSignal == null) {
                return true;
            }
            this.connectSignal.onComplete();
            return true;
        }
        RedirectException redirectException = new RedirectException(this.httpChannel);
        if (this.connectSignal != null) {
            this.connectSignal.onError(redirectException);
            return false;
        }
        if (this.replySubscriber == null) {
            return false;
        }
        EmptySubscription.error(this.replySubscriber, redirectException);
        return false;
    }

    protected void postRead(ChannelHandlerContext channelHandlerContext, Object obj) {
        if (obj instanceof LastHttpContent) {
            channelHandlerContext.channel().close();
            downstream().complete();
        }
    }

    @Override // reactor.io.netty.common.NettyChannelHandler
    public String getName() {
        return this.httpChannel != null ? this.httpChannel.getName() : "HTTP Client Connection";
    }
}
