/*
 * Decompiled with CFR 0.152.
 */
package io.servicetalk.http.netty;

import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.DecoderException;
import io.servicetalk.buffer.netty.BufferUtils;
import io.servicetalk.concurrent.Cancellable;
import io.servicetalk.concurrent.CompletableSource;
import io.servicetalk.concurrent.PublisherSource;
import io.servicetalk.concurrent.api.AsyncCloseables;
import io.servicetalk.concurrent.api.Completable;
import io.servicetalk.concurrent.api.Executor;
import io.servicetalk.concurrent.api.ListenableAsyncCloseable;
import io.servicetalk.concurrent.api.Publisher;
import io.servicetalk.concurrent.api.Single;
import io.servicetalk.concurrent.api.SourceAdapters;
import io.servicetalk.concurrent.api.internal.SubscribableCompletable;
import io.servicetalk.concurrent.internal.DuplicateSubscribeException;
import io.servicetalk.concurrent.internal.RejectedSubscribeError;
import io.servicetalk.concurrent.internal.TerminalNotification;
import io.servicetalk.http.api.DefaultHttpExecutionContext;
import io.servicetalk.http.api.HttpExecutionContext;
import io.servicetalk.http.api.HttpExecutionStrategy;
import io.servicetalk.http.api.HttpHeaderNames;
import io.servicetalk.http.api.HttpHeaderValues;
import io.servicetalk.http.api.HttpHeaders;
import io.servicetalk.http.api.HttpHeadersFactory;
import io.servicetalk.http.api.HttpProtocolVersion;
import io.servicetalk.http.api.HttpRequestMetaData;
import io.servicetalk.http.api.HttpRequestMethod;
import io.servicetalk.http.api.HttpResponseMetaData;
import io.servicetalk.http.api.HttpResponseStatus;
import io.servicetalk.http.api.HttpServiceContext;
import io.servicetalk.http.api.StreamingHttpRequest;
import io.servicetalk.http.api.StreamingHttpRequests;
import io.servicetalk.http.api.StreamingHttpResponse;
import io.servicetalk.http.api.StreamingHttpService;
import io.servicetalk.http.netty.AbstractStreamingHttpConnection;
import io.servicetalk.http.netty.DefaultBlockingStreamingHttpResponseFactory;
import io.servicetalk.http.netty.DefaultHttpResponseFactory;
import io.servicetalk.http.netty.DefaultStreamingHttpResponseFactory;
import io.servicetalk.http.netty.H1ProtocolConfig;
import io.servicetalk.http.netty.HeaderUtils;
import io.servicetalk.http.netty.HttpDebugUtils;
import io.servicetalk.http.netty.HttpKeepAlive;
import io.servicetalk.http.netty.HttpRequestDecoder;
import io.servicetalk.http.netty.HttpResponseEncoder;
import io.servicetalk.http.netty.ReadOnlyHttpServerConfig;
import io.servicetalk.http.netty.SpliceFlatStreamToMetaSingle;
import io.servicetalk.tcp.netty.internal.ReadOnlyTcpServerConfig;
import io.servicetalk.tcp.netty.internal.TcpServerBinder;
import io.servicetalk.tcp.netty.internal.TcpServerChannelInitializer;
import io.servicetalk.transport.api.ConnectionAcceptor;
import io.servicetalk.transport.api.ConnectionObserver;
import io.servicetalk.transport.api.ExecutionContext;
import io.servicetalk.transport.api.ServerContext;
import io.servicetalk.transport.netty.internal.ChannelInitializer;
import io.servicetalk.transport.netty.internal.CloseHandler;
import io.servicetalk.transport.netty.internal.CopyByteBufHandlerChannelInitializer;
import io.servicetalk.transport.netty.internal.DefaultNettyConnection;
import io.servicetalk.transport.netty.internal.FlushStrategies;
import io.servicetalk.transport.netty.internal.FlushStrategy;
import io.servicetalk.transport.netty.internal.NettyConnection;
import io.servicetalk.transport.netty.internal.NettyConnectionContext;
import io.servicetalk.transport.netty.internal.SplittingFlushStrategy;
import java.net.SocketAddress;
import java.net.SocketOption;
import java.nio.channels.ClosedChannelException;
import java.util.ArrayDeque;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.net.ssl.SSLSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

final class NettyHttpServer {
    private static final Logger LOGGER = LoggerFactory.getLogger(NettyHttpServer.class);

    private NettyHttpServer() {
    }

    static Single<ServerContext> bind(HttpExecutionContext executionContext, ReadOnlyHttpServerConfig config, SocketAddress address, @Nullable ConnectionAcceptor connectionAcceptor, StreamingHttpService service, boolean drainRequestPayloadBody) {
        if (config.h1Config() == null) {
            return Single.failed(NettyHttpServer.newH1ConfigException());
        }
        ReadOnlyTcpServerConfig tcpServerConfig = config.tcpConfig();
        return TcpServerBinder.bind(address, tcpServerConfig, false, executionContext, connectionAcceptor, (channel, connectionObserver) -> NettyHttpServer.initChannel(channel, executionContext, config, new TcpServerChannelInitializer(tcpServerConfig, (ConnectionObserver)connectionObserver), service, drainRequestPayloadBody, connectionObserver), serverConnection -> serverConnection.process(true)).map(delegate -> {
            LOGGER.debug("Started HTTP/1.1 server for address {}.", (Object)delegate.listenAddress());
            return new NettyHttpServerContext((ServerContext)delegate, service);
        });
    }

    private static Throwable newH1ConfigException() {
        return new IllegalStateException("HTTP/1.x channel initialization failure due to missing HTTP/1.x configuration");
    }

    static Single<NettyHttpServerConnection> initChannel(Channel channel, HttpExecutionContext httpExecutionContext, ReadOnlyHttpServerConfig config, ChannelInitializer initializer, StreamingHttpService service, boolean drainRequestPayloadBody, ConnectionObserver observer) {
        return NettyHttpServer.initChannel(channel, httpExecutionContext, config, initializer, service, drainRequestPayloadBody, observer, CloseHandler.forPipelinedRequestResponse(false, channel.config()));
    }

    private static Single<NettyHttpServerConnection> initChannel(Channel channel, HttpExecutionContext httpExecutionContext, ReadOnlyHttpServerConfig config, ChannelInitializer initializer, StreamingHttpService service, boolean drainRequestPayloadBody, ConnectionObserver observer, CloseHandler closeHandler) {
        H1ProtocolConfig h1Config = config.h1Config();
        if (h1Config == null) {
            return Single.failed(NettyHttpServer.newH1ConfigException());
        }
        return HttpDebugUtils.showPipeline(DefaultNettyConnection.initChannel(channel, httpExecutionContext.bufferAllocator(), httpExecutionContext.executor(), HeaderUtils.LAST_CHUNK_PREDICATE, closeHandler, config.tcpConfig().flushStrategy(), config.tcpConfig().idleTimeoutMs(), initializer.andThen(NettyHttpServer.getChannelInitializer(BufferUtils.getByteBufAllocator(httpExecutionContext.bufferAllocator()), h1Config, closeHandler)), httpExecutionContext.executionStrategy(), HttpProtocolVersion.HTTP_1_1, observer, false, __ -> false).map(conn -> new NettyHttpServerConnection((NettyConnection<Object, Object>)conn, service, httpExecutionContext.executionStrategy(), HttpProtocolVersion.HTTP_1_1, h1Config.headersFactory(), drainRequestPayloadBody, config.allowDropTrailersReadFromTransport())), HttpProtocolVersion.HTTP_1_1, channel);
    }

    private static ChannelInitializer getChannelInitializer(ByteBufAllocator alloc, H1ProtocolConfig config, CloseHandler closeHandler) {
        return new CopyByteBufHandlerChannelInitializer(alloc).andThen(channel -> {
            ArrayDeque<HttpRequestMethod> methodQueue = new ArrayDeque<HttpRequestMethod>(2);
            ChannelPipeline pipeline = channel.pipeline();
            HttpRequestDecoder decoder = new HttpRequestDecoder(methodQueue, alloc, config.headersFactory(), config.maxStartLineLength(), config.maxHeaderFieldLength(), config.specExceptions().allowPrematureClosureBeforePayloadBody(), config.specExceptions().allowLFWithoutCR(), closeHandler);
            pipeline.addLast(decoder);
            pipeline.addLast(new HttpResponseEncoder(methodQueue, config.headersEncodedSizeEstimate(), config.trailersEncodedSizeEstimate(), closeHandler, decoder));
        });
    }

    private static final class ErrorLoggingHttpSubscriber
    implements CompletableSource.Subscriber {
        private static final Logger LOGGER = LoggerFactory.getLogger(ErrorLoggingHttpSubscriber.class);
        private final NettyConnection<Object, Object> connection;

        ErrorLoggingHttpSubscriber(NettyConnection<Object, Object> connection) {
            this.connection = connection;
        }

        @Override
        public void onSubscribe(Cancellable cancellable) {
        }

        @Override
        public void onComplete() {
        }

        @Override
        public void onError(Throwable t) {
            if (t instanceof CloseHandler.CloseEventObservedException) {
                CloseHandler.CloseEventObservedException ceoe = (CloseHandler.CloseEventObservedException)t;
                if (ceoe.event() == CloseHandler.CloseEvent.CHANNEL_CLOSED_INBOUND && t.getCause() instanceof ClosedChannelException) {
                    LOGGER.trace("{} Client closed the {} connection without sending {}.", this.connection, this.connection.protocol(), HttpProtocolVersion.HTTP_2_0.equals(this.connection.protocol()) ? "GO_AWAY" : "'Connection: close' header", t);
                    return;
                }
                if (t.getCause() instanceof DecoderException) {
                    ErrorLoggingHttpSubscriber.logDecoderException((DecoderException)t.getCause(), this.connection);
                    return;
                }
            } else if (t instanceof DecoderException) {
                ErrorLoggingHttpSubscriber.logDecoderException((DecoderException)t, this.connection);
                return;
            }
            LOGGER.debug("{} Unexpected error received, closing {} {} due to:", this.connection, this.connection.protocol(), HttpProtocolVersion.HTTP_2_0.equals(this.connection.protocol()) ? "stream" : "connection", t);
        }

        private static void logDecoderException(DecoderException e, NettyConnection<Object, Object> connection) {
            LOGGER.warn("{} Can not decode a message, no more requests will be received on this {} {}.", connection, connection.protocol(), HttpProtocolVersion.HTTP_2_0.equals(connection.protocol()) ? "stream" : "connection", e);
        }
    }

    private static final class SingleSubscriberProcessor
    extends SubscribableCompletable
    implements CompletableSource.Processor,
    Cancellable {
        private static final Object CANCELLED = new Object();
        private static final AtomicReferenceFieldUpdater<SingleSubscriberProcessor, Object> stateUpdater = AtomicReferenceFieldUpdater.newUpdater(SingleSubscriberProcessor.class, Object.class, "state");
        @Nullable
        private volatile Object state;

        private SingleSubscriberProcessor() {
        }

        @Override
        protected void handleSubscribe(CompletableSource.Subscriber subscriber) {
            Object cState;
            subscriber.onSubscribe(this);
            do {
                if ((cState = this.state) instanceof TerminalNotification) {
                    TerminalNotification terminalNotification = (TerminalNotification)cState;
                    terminalNotification.terminate(subscriber);
                } else {
                    if (!(cState instanceof CompletableSource.Subscriber)) continue;
                    subscriber.onError(new DuplicateSubscribeException(cState, subscriber));
                }
                break;
            } while (cState != CANCELLED && (cState != null || !stateUpdater.compareAndSet(this, null, subscriber)));
        }

        @Override
        public void onSubscribe(Cancellable cancellable) {
        }

        @Override
        public void onComplete() {
            Object oldState = stateUpdater.getAndSet(this, TerminalNotification.complete());
            if (oldState instanceof CompletableSource.Subscriber) {
                ((CompletableSource.Subscriber)oldState).onComplete();
            }
        }

        @Override
        public void onError(Throwable t) {
            Object oldState = stateUpdater.getAndSet(this, TerminalNotification.error(t));
            if (oldState instanceof CompletableSource.Subscriber) {
                ((CompletableSource.Subscriber)oldState).onError(t);
            }
        }

        @Override
        public void cancel() {
            this.state = CANCELLED;
        }
    }

    static final class NettyHttpServerConnection
    extends HttpServiceContext
    implements NettyConnectionContext {
        private static final Logger LOGGER = LoggerFactory.getLogger(NettyHttpServerConnection.class);
        private final StreamingHttpService service;
        private final HttpExecutionStrategy strategy;
        private final NettyConnection<Object, Object> connection;
        private final HttpHeadersFactory headersFactory;
        private final HttpExecutionContext executionContext;
        private final SplittingFlushStrategy splittingFlushStrategy;
        private final boolean drainRequestPayloadBody;
        private final boolean requireTrailerHeader;

        NettyHttpServerConnection(NettyConnection<Object, Object> connection, StreamingHttpService service, HttpExecutionStrategy strategy, HttpProtocolVersion version, HttpHeadersFactory headersFactory, boolean drainRequestPayloadBody, boolean requireTrailerHeader) {
            super(headersFactory, new DefaultHttpResponseFactory(headersFactory, connection.executionContext().bufferAllocator(), version), new DefaultStreamingHttpResponseFactory(headersFactory, connection.executionContext().bufferAllocator(), version), new DefaultBlockingStreamingHttpResponseFactory(headersFactory, connection.executionContext().bufferAllocator(), version));
            this.connection = connection;
            this.headersFactory = headersFactory;
            this.executionContext = new DefaultHttpExecutionContext(connection.executionContext().bufferAllocator(), connection.executionContext().ioExecutor(), connection.executionContext().executor(), strategy);
            this.service = service;
            this.strategy = strategy;
            this.splittingFlushStrategy = new SplittingFlushStrategy(connection.defaultFlushStrategy(), itemWritten -> {
                if (itemWritten instanceof HttpResponseMetaData) {
                    HttpResponseMetaData metadata = (HttpResponseMetaData)itemWritten;
                    return this.protocol().major() > 1 && HeaderUtils.emptyMessageBody(metadata) ? SplittingFlushStrategy.FlushBoundaryProvider.FlushBoundary.End : SplittingFlushStrategy.FlushBoundaryProvider.FlushBoundary.Start;
                }
                if (itemWritten instanceof HttpHeaders) {
                    return SplittingFlushStrategy.FlushBoundaryProvider.FlushBoundary.End;
                }
                return SplittingFlushStrategy.FlushBoundaryProvider.FlushBoundary.InProgress;
            });
            connection.updateFlushStrategy((current, isCurrentOriginal) -> this.splittingFlushStrategy);
            this.drainRequestPayloadBody = drainRequestPayloadBody;
            this.requireTrailerHeader = requireTrailerHeader;
        }

        void process(boolean handleMultipleRequests) {
            Single<StreamingHttpRequest> requestSingle = this.connection.read().liftSyncToSingle(new SpliceFlatStreamToMetaSingle((meta, payload) -> StreamingHttpRequests.newTransportRequest(meta.method(), meta.requestTarget(), meta.version(), meta.headers(), this.executionContext().bufferAllocator(), payload, this.requireTrailerHeader, this.headersFactory)));
            SourceAdapters.toSource(this.handleRequestAndWriteResponse(requestSingle, handleMultipleRequests)).subscribe(new ErrorLoggingHttpSubscriber(this.connection));
        }

        @Override
        public Cancellable updateFlushStrategy(NettyConnectionContext.FlushStrategyProvider strategyProvider) {
            return this.splittingFlushStrategy.updateFlushStrategy(strategyProvider);
        }

        @Override
        public FlushStrategy defaultFlushStrategy() {
            return this.connection.defaultFlushStrategy();
        }

        private Completable handleRequestAndWriteResponse(Single<StreamingHttpRequest> requestSingle, boolean handleMultipleRequests) {
            Publisher responseObjectPublisher = requestSingle.flatMapPublisher(rawRequest -> {
                final SingleSubscriberProcessor requestCompletion = new SingleSubscriberProcessor();
                AtomicBoolean payloadSubscribed = this.drainRequestPayloadBody ? new AtomicBoolean() : null;
                AtomicBoolean responseSent = HeaderUtils.REQ_EXPECT_CONTINUE.test((HttpRequestMetaData)rawRequest) ? new AtomicBoolean() : null;
                StreamingHttpRequest request = rawRequest.transformMessageBody(payload -> payload.afterSubscriber(() -> {
                    if (this.drainRequestPayloadBody) {
                        payloadSubscribed.set(true);
                    }
                    if (responseSent != null && !responseSent.get()) {
                        Channel channel = this.nettyChannel();
                        if (channel.eventLoop().inEventLoop()) {
                            channel.write(this.streamingResponseFactory().continueResponse());
                        } else {
                            channel.eventLoop().execute(() -> channel.write(this.streamingResponseFactory().continueResponse()));
                        }
                    }
                    return new PublisherSource.Subscriber<Object>(){

                        @Override
                        public void onSubscribe(PublisherSource.Subscription s) {
                        }

                        @Override
                        public void onNext(Object obj) {
                        }

                        @Override
                        public void onError(Throwable t) {
                            if (!drainRequestPayloadBody || !(t instanceof RejectedSubscribeError)) {
                                requestCompletion.onComplete();
                            }
                        }

                        @Override
                        public void onComplete() {
                            requestCompletion.onComplete();
                        }
                    };
                }));
                HttpRequestMethod requestMethod = request.method();
                Publisher<Object> responsePublisher = this.strategy.invokeService(this.executionContext().executor(), request, req -> this.service.handle(this, (StreamingHttpRequest)req, this.streamingResponseFactory()).flatMapPublisher(response -> {
                    FlushStrategy flushStrategy;
                    if (responseSent != null) {
                        responseSent.set(true);
                    }
                    if ((flushStrategy = NettyHttpServerConnection.determineFlushStrategyForApi(response)) != null) {
                        this.splittingFlushStrategy.updateFlushStrategy((prev, isOriginal) -> isOriginal ? flushStrategy : prev, 1);
                    }
                    return NettyHttpServerConnection.handleResponse(this.protocol(), requestMethod, response);
                }), (cause, executor) -> {
                    StreamingHttpResponse errorResponse = this.newErrorResponse((Throwable)cause, (Executor)executor, request);
                    return HeaderUtils.flatEmptyMessage(this.protocol(), errorResponse, errorResponse.messageBody());
                });
                if (this.drainRequestPayloadBody) {
                    responsePublisher = responsePublisher.concat(Completable.defer(() -> payloadSubscribed.get() ? Completable.completed() : request.messageBody().ignoreElements().onErrorComplete()));
                }
                return responsePublisher.concat(requestCompletion);
            });
            return this.connection.write(handleMultipleRequests ? responseObjectPublisher.repeat(val -> true) : responseObjectPublisher);
        }

        @Nonnull
        private static Publisher<Object> handleResponse(HttpProtocolVersion protocolVersion, HttpRequestMethod requestMethod, StreamingHttpResponse response) {
            if (HeaderUtils.canAddResponseContentLength(response, requestMethod)) {
                return HeaderUtils.setResponseContentLength(protocolVersion, response);
            }
            Publisher<Object> flatResponse = HeaderUtils.emptyMessageBody(response, response.messageBody()) ? HeaderUtils.flatEmptyMessage(protocolVersion, response, response.messageBody()) : Single.succeeded(response).concat(response.messageBody()).scanWith(HeaderUtils::insertTrailersMapper);
            HeaderUtils.addResponseTransferEncodingIfNecessary(response, requestMethod);
            return flatResponse;
        }

        @Nullable
        private static FlushStrategy determineFlushStrategyForApi(HttpResponseMetaData response) {
            return AbstractStreamingHttpConnection.isSafeToAggregateOrEmpty(response) ? FlushStrategies.flushOnEnd() : null;
        }

        private StreamingHttpResponse newErrorResponse(Throwable cause, Executor executor, StreamingHttpRequest request) {
            HttpResponseStatus status;
            if (cause instanceof RejectedExecutionException) {
                status = HttpResponseStatus.SERVICE_UNAVAILABLE;
                LOGGER.error("Task rejected by Executor {} for connection={}, request='{} {} {}'. Returning: {}", executor, this, request.method(), request.requestTarget(), request.version(), status, cause);
            } else {
                status = HttpResponseStatus.INTERNAL_SERVER_ERROR;
                LOGGER.error("Internal server error service={} connection={}", this.service, this, cause);
                LOGGER.error("Unexpected exception during service processing for connection={}, request='{} {} {}'. Trying to return: {}", this, request.method(), request.requestTarget(), request.version(), status, cause);
            }
            StreamingHttpResponse response = this.streamingResponseFactory().newResponse(status).version(request.version()).setHeader(HttpHeaderNames.CONTENT_LENGTH, HttpHeaderValues.ZERO);
            HttpKeepAlive.responseKeepAlive(request).addConnectionHeaderIfNecessary(response);
            return response;
        }

        @Override
        public SocketAddress localAddress() {
            return this.connection.localAddress();
        }

        @Override
        public SocketAddress remoteAddress() {
            return this.connection.remoteAddress();
        }

        @Override
        @Nullable
        public SSLSession sslSession() {
            return this.connection.sslSession();
        }

        @Override
        public HttpExecutionContext executionContext() {
            return this.executionContext;
        }

        @Override
        @Nullable
        public <T> T socketOption(SocketOption<T> option) {
            return this.connection.socketOption(option);
        }

        @Override
        public HttpProtocolVersion protocol() {
            return (HttpProtocolVersion)this.connection.protocol();
        }

        @Override
        public Single<Throwable> transportError() {
            return this.connection.transportError();
        }

        @Override
        public Completable onClosing() {
            return this.connection.onClosing();
        }

        @Override
        public Completable onClose() {
            return this.connection.onClose();
        }

        @Override
        public Completable closeAsync() {
            return this.connection.closeAsync();
        }

        @Override
        public Completable closeAsyncGracefully() {
            return this.connection.closeAsyncGracefully();
        }

        @Override
        public Channel nettyChannel() {
            return this.connection.nettyChannel();
        }

        public String toString() {
            return this.connection.toString();
        }
    }

    static final class NettyHttpServerContext
    implements ServerContext {
        private final ServerContext delegate;
        private final ListenableAsyncCloseable asyncCloseable;

        NettyHttpServerContext(ServerContext delegate, StreamingHttpService service) {
            this.delegate = delegate;
            this.asyncCloseable = AsyncCloseables.toListenableAsyncCloseable(AsyncCloseables.newCompositeCloseable().appendAll(service, delegate));
        }

        @Override
        public SocketAddress listenAddress() {
            return this.delegate.listenAddress();
        }

        @Override
        public ExecutionContext executionContext() {
            return this.delegate.executionContext();
        }

        @Override
        public Completable closeAsync() {
            return this.asyncCloseable.closeAsync().whenFinally(() -> LOGGER.debug("Stopped HTTP server for address {}.", (Object)this.listenAddress()));
        }

        @Override
        public Completable closeAsyncGracefully() {
            return this.asyncCloseable.closeAsyncGracefully();
        }

        @Override
        public Completable onClose() {
            return this.asyncCloseable.onClose();
        }

        public String toString() {
            return this.delegate.toString();
        }
    }
}

