/*
 * Decompiled with CFR 0.152.
 */
package io.servicetalk.http.netty;

import io.servicetalk.client.api.ConsumableEvent;
import io.servicetalk.client.api.internal.IgnoreConsumedEvent;
import io.servicetalk.concurrent.api.Completable;
import io.servicetalk.concurrent.api.Executors;
import io.servicetalk.concurrent.api.Publisher;
import io.servicetalk.concurrent.api.Single;
import io.servicetalk.http.api.ClientInvoker;
import io.servicetalk.http.api.FilterableStreamingHttpConnection;
import io.servicetalk.http.api.HttpApiConversions;
import io.servicetalk.http.api.HttpConnectionContext;
import io.servicetalk.http.api.HttpContextKeys;
import io.servicetalk.http.api.HttpEventKey;
import io.servicetalk.http.api.HttpExecutionContext;
import io.servicetalk.http.api.HttpExecutionStrategy;
import io.servicetalk.http.api.HttpHeadersFactory;
import io.servicetalk.http.api.HttpMetaData;
import io.servicetalk.http.api.HttpRequestMetaData;
import io.servicetalk.http.api.HttpRequestMethod;
import io.servicetalk.http.api.HttpResponseMetaData;
import io.servicetalk.http.api.StreamingHttpRequest;
import io.servicetalk.http.api.StreamingHttpRequestResponseFactory;
import io.servicetalk.http.api.StreamingHttpResponse;
import io.servicetalk.http.api.StreamingHttpResponseFactory;
import io.servicetalk.http.api.StreamingHttpResponses;
import io.servicetalk.http.netty.DefaultNettyHttpConnectionContext;
import io.servicetalk.http.netty.HeaderUtils;
import io.servicetalk.http.netty.SpliceFlatStreamToMetaSingle;
import io.servicetalk.transport.api.IoThreadFactory;
import io.servicetalk.transport.netty.internal.FlushStrategies;
import io.servicetalk.transport.netty.internal.FlushStrategy;
import io.servicetalk.transport.netty.internal.NettyConnectionContext;
import java.util.Objects;
import javax.annotation.Nullable;

abstract class AbstractStreamingHttpConnection<CC extends NettyConnectionContext>
implements FilterableStreamingHttpConnection,
ClientInvoker<FlushStrategy> {
    private static final IgnoreConsumedEvent<Integer> ZERO_MAX_CONCURRENCY_EVENT = new IgnoreConsumedEvent<Integer>(0);
    final CC connection;
    private final HttpConnectionContext connectionContext;
    private final Publisher<? extends ConsumableEvent<Integer>> maxConcurrencySetting;
    private final StreamingHttpRequestResponseFactory reqRespFactory;
    private final HttpHeadersFactory headersFactory;
    private final boolean allowDropTrailersReadFromTransport;

    AbstractStreamingHttpConnection(CC conn, int maxPipelinedRequests, HttpExecutionContext executionContext, StreamingHttpRequestResponseFactory reqRespFactory, HttpHeadersFactory headersFactory, boolean allowDropTrailersReadFromTransport) {
        this.connection = (NettyConnectionContext)Objects.requireNonNull(conn);
        this.connectionContext = new DefaultNettyHttpConnectionContext((NettyConnectionContext)conn, executionContext);
        this.reqRespFactory = Objects.requireNonNull(reqRespFactory);
        this.maxConcurrencySetting = Publisher.from(new IgnoreConsumedEvent<Integer>(maxPipelinedRequests)).concat(this.connection.onClosing()).concat(Single.succeeded(ZERO_MAX_CONCURRENCY_EVENT)).publishOn(executionContext.executionStrategy().isEventOffloaded() ? executionContext.executor() : Executors.immediate(), IoThreadFactory.IoThread::currentThreadIsIoThread);
        this.headersFactory = headersFactory;
        this.allowDropTrailersReadFromTransport = allowDropTrailersReadFromTransport;
    }

    @Override
    public final HttpConnectionContext connectionContext() {
        return this.connectionContext;
    }

    @Override
    public final <T> Publisher<? extends T> transportEventStream(HttpEventKey<T> eventKey) {
        return eventKey == HttpEventKey.MAX_CONCURRENCY ? this.maxConcurrencySetting : Publisher.failed(new IllegalArgumentException("Unknown key: " + eventKey));
    }

    @Override
    public final Single<StreamingHttpResponse> invokeClient(Publisher<Object> flattenedRequest, @Nullable FlushStrategy flushStrategy) {
        return this.writeAndRead(flattenedRequest, flushStrategy).liftSyncToSingle(new SpliceFlatStreamToMetaSingle(this::newSplicedResponse));
    }

    @Override
    public Single<StreamingHttpResponse> request(StreamingHttpRequest request) {
        return Single.defer(() -> {
            Publisher<Object> flatRequest;
            if (HeaderUtils.canAddRequestContentLength(request)) {
                flatRequest = HeaderUtils.setRequestContentLength(this.connectionContext().protocol(), request);
            } else {
                if (HeaderUtils.emptyMessageBody(request, request.messageBody())) {
                    flatRequest = HeaderUtils.flatEmptyMessage(this.connectionContext().protocol(), request, request.messageBody());
                } else {
                    flatRequest = Single.succeeded(request).concat(request.messageBody(), true);
                    if (HeaderUtils.shouldAppendTrailers(this.connectionContext().protocol(), request)) {
                        flatRequest = flatRequest.scanWith(HeaderUtils::appendTrailersMapper);
                    }
                }
                HeaderUtils.addRequestTransferEncodingIfNecessary(request);
            }
            HttpExecutionStrategy strategy = AbstractStreamingHttpConnection.requestExecutionStrategy(request, this.executionContext().executionStrategy());
            if (strategy.isSendOffloaded()) {
                flatRequest = flatRequest.subscribeOn(this.connectionContext.executionContext().executor(), IoThreadFactory.IoThread::currentThreadIsIoThread);
            }
            Single<StreamingHttpResponse> resp = this.invokeClient(flatRequest, AbstractStreamingHttpConnection.determineFlushStrategyForApi(request));
            if (strategy.isMetadataReceiveOffloaded()) {
                resp = resp.publishOn(this.connectionContext.executionContext().executor(), IoThreadFactory.IoThread::currentThreadIsIoThread);
            }
            if (strategy.isDataReceiveOffloaded()) {
                resp = resp.map(response -> response.transformMessageBody(payload -> payload.publishOn(this.connectionContext.executionContext().executor(), IoThreadFactory.IoThread::currentThreadIsIoThread)));
            }
            return resp.shareContextOnSubscribe();
        });
    }

    static HttpExecutionStrategy requestExecutionStrategy(HttpRequestMetaData metaData, HttpExecutionStrategy fallback) {
        HttpExecutionStrategy strategy = metaData.context().get(HttpContextKeys.HTTP_EXECUTION_STRATEGY_KEY);
        return strategy != null ? strategy : fallback;
    }

    @Nullable
    private static FlushStrategy determineFlushStrategyForApi(HttpRequestMetaData request) {
        return AbstractStreamingHttpConnection.isSafeToAggregateOrEmpty(request) && !HeaderUtils.REQ_EXPECT_CONTINUE.test(request) ? FlushStrategies.flushOnEnd() : null;
    }

    static boolean isSafeToAggregateOrEmpty(HttpMetaData request) {
        return HttpApiConversions.isPayloadEmpty(request) || HttpApiConversions.isSafeToAggregate(request);
    }

    @Override
    public final HttpExecutionContext executionContext() {
        return this.connectionContext.executionContext();
    }

    protected abstract Publisher<Object> writeAndRead(Publisher<Object> var1, @Nullable FlushStrategy var2);

    private StreamingHttpResponse newSplicedResponse(HttpResponseMetaData meta, Publisher<Object> pub) {
        return StreamingHttpResponses.newTransportResponse(meta.status(), meta.version(), meta.headers(), this.connectionContext.executionContext().bufferAllocator(), pub, this.allowDropTrailersReadFromTransport, this.headersFactory);
    }

    @Override
    public final StreamingHttpRequest newRequest(HttpRequestMethod method, String requestTarget) {
        return this.reqRespFactory.newRequest(method, requestTarget);
    }

    @Override
    public final StreamingHttpResponseFactory httpResponseFactory() {
        return this.reqRespFactory;
    }

    @Override
    public final Completable onClose() {
        return this.connectionContext.onClose();
    }

    @Override
    public final Completable closeAsync() {
        return this.connectionContext.closeAsync();
    }

    @Override
    public final Completable closeAsyncGracefully() {
        return this.connectionContext.closeAsyncGracefully();
    }

    public String toString() {
        return this.getClass().getName() + '(' + this.connectionContext + ')';
    }
}

