/*
 * Decompiled with CFR 0.152.
 */
package io.servicetalk.http.api;

import io.servicetalk.concurrent.Executor;
import io.servicetalk.concurrent.api.Completable;
import io.servicetalk.concurrent.api.Single;
import io.servicetalk.http.api.ExecutionContextOverridingServiceContext;
import io.servicetalk.http.api.HttpExecutionStrategy;
import io.servicetalk.http.api.HttpServiceContext;
import io.servicetalk.http.api.StreamingHttpRequest;
import io.servicetalk.http.api.StreamingHttpResponse;
import io.servicetalk.http.api.StreamingHttpResponseFactory;
import io.servicetalk.http.api.StreamingHttpService;
import java.util.function.BooleanSupplier;
import java.util.function.Function;
import javax.annotation.Nullable;

/**
 * A {@link StreamingHttpService} that wraps a delegate service, exposes the configured
 * {@link HttpExecutionStrategy} and executor to it through an
 * {@link ExecutionContextOverridingServiceContext}, and applies whatever offloads are reported
 * by {@code ctx.executionContext().executionStrategy().missing(strategy)}.
 *
 * <p>Instances are created by the package-private constructor or through
 * {@link #offloadService(HttpExecutionStrategy, io.servicetalk.concurrent.api.Executor, BooleanSupplier, StreamingHttpService)}.
 */
public class StreamingHttpServiceToOffloadedStreamingHttpService implements StreamingHttpService {
    private final StreamingHttpService delegate;
    /** Executor used for offloading; when {@code null} the executor of the request's execution context is used. */
    @Nullable
    private final io.servicetalk.concurrent.api.Executor executor;
    /** Consulted before offloading and also handed to the {@code publishOn}/{@code subscribeOn} operators. */
    private final BooleanSupplier shouldOffload;
    private final HttpExecutionStrategy strategy;

    /**
     * Creates a new instance.
     *
     * @param strategy the execution strategy the delegate should observe in its execution context
     * @param executor the executor to offload to, or {@code null} to use the one from the service context
     * @param shouldOffload supplier consulted to decide whether offloading should take place
     * @param delegate the service that ultimately handles requests
     */
    StreamingHttpServiceToOffloadedStreamingHttpService(HttpExecutionStrategy strategy, @Nullable io.servicetalk.concurrent.api.Executor executor, BooleanSupplier shouldOffload, StreamingHttpService delegate) {
        this.strategy = strategy;
        this.executor = executor;
        this.shouldOffload = shouldOffload;
        this.delegate = delegate;
    }

    /**
     * Handles the request by invoking the delegate with a context that overrides the execution
     * strategy and executor, adding offloading operators for the parts flagged by the strategy
     * difference computed from the incoming context.
     *
     * @param ctx the context of the service invocation
     * @param request the request to handle
     * @param responseFactory factory passed through to the delegate
     * @return the delegate's response, with offloading operators applied where required
     */
    @Override
    public Single<StreamingHttpResponse> handle(HttpServiceContext ctx, StreamingHttpRequest request, StreamingHttpResponseFactory responseFactory) {
        // Difference between the context's current strategy and the configured one; the flags on
        // the result drive every offloading decision below.
        final HttpExecutionStrategy additionalOffloads = ctx.executionContext().executionStrategy().missing(this.strategy);
        final io.servicetalk.concurrent.api.Executor useExecutor = null != this.executor ? this.executor : ctx.executionContext().executor();
        // The delegate is always called with a context carrying the configured strategy and the chosen executor.
        final HttpServiceContext wrappedCtx = new ExecutionContextOverridingServiceContext(ctx, this.strategy, useExecutor);

        if (!additionalOffloads.isRequestResponseOffloaded()) {
            // Nothing further to offload: call the delegate directly.
            return this.delegate.handle(wrappedCtx, request, responseFactory);
        }

        if (additionalOffloads.isDataReceiveOffloaded()) {
            // NOTE(review): the explicit cast to io.servicetalk.concurrent.Executor looks redundant — confirm before removing.
            request = request.transformMessageBody(p -> p.publishOn((Executor) useExecutor, this.shouldOffload));
        }

        // Parameterized (not raw) so that the map(...) lambda below sees a StreamingHttpResponse.
        final Single<StreamingHttpResponse> resp;
        if (additionalOffloads.isMetadataReceiveOffloaded() && this.shouldOffload.getAsBoolean()) {
            // 'request' may have been reassigned above, so capture an effectively-final copy for the lambdas.
            final StreamingHttpRequest finalRequest = request;
            if (additionalOffloads.isSendOffloaded()) {
                // Inside the deferred supplier shouldOffload is consulted again to pick between
                // submitting the delegate call to the executor and calling the delegate inline.
                resp = Single.defer(() -> {
                    if (this.shouldOffload.getAsBoolean()) {
                        return this.offloadHandle(useExecutor, wrappedCtx, finalRequest, responseFactory).shareContextOnSubscribe();
                    }
                    return this.delegate.handle(wrappedCtx, finalRequest, responseFactory).shareContextOnSubscribe();
                });
            } else {
                resp = this.offloadHandle(useExecutor, wrappedCtx, finalRequest, responseFactory);
            }
        } else {
            resp = this.delegate.handle(wrappedCtx, request, responseFactory);
        }

        if (!additionalOffloads.isSendOffloaded()) {
            return resp;
        }
        // Apply subscribeOn both to the response Single and to the response message body.
        return resp.map(r -> r.transformMessageBody(p -> p.subscribeOn((Executor) useExecutor, this.shouldOffload)))
                .subscribeOn((Executor) useExecutor, this.shouldOffload);
    }

    /**
     * Submits the delegate's {@code handle} call to {@code executor} and flattens the result.
     *
     * @param executor the executor the call is submitted to
     * @param ctx the (already wrapped) service context passed to the delegate
     * @param request the request passed to the delegate
     * @param responseFactory the response factory passed to the delegate
     * @return the flattened response of the submitted delegate call
     */
    private Single<StreamingHttpResponse> offloadHandle(io.servicetalk.concurrent.api.Executor executor, HttpServiceContext ctx, StreamingHttpRequest request, StreamingHttpResponseFactory responseFactory) {
        // submit(...) yields a Single of the delegate's Single; flatMap(identity()) unwraps the nesting.
        return executor.submit(() -> this.delegate.handle(ctx, request, responseFactory).shareContextOnSubscribe())
                .flatMap(Function.identity());
    }

    /** Delegates to {@code delegate.closeAsync()}. */
    @Override
    public Completable closeAsync() {
        return this.delegate.closeAsync();
    }

    /** Delegates to {@code delegate.closeAsyncGracefully()}. */
    @Override
    public Completable closeAsyncGracefully() {
        return this.delegate.closeAsyncGracefully();
    }

    /**
     * Wraps {@code service} so that it observes {@code strategy} and the chosen executor in its
     * execution context.
     *
     * <p>When {@code strategy.isRequestResponseOffloaded()} is {@code true} a
     * {@link StreamingHttpServiceToOffloadedStreamingHttpService} is returned; otherwise the
     * returned service only overrides the context and forwards every call to {@code service}.
     *
     * @param strategy the execution strategy to expose to {@code service}
     * @param executor the executor to use, or {@code null} to use the one from the service context
     * @param shouldOffload supplier consulted to decide whether offloading should take place
     * @param service the service to wrap
     * @return the wrapping service
     */
    public static StreamingHttpService offloadService(final HttpExecutionStrategy strategy, final @Nullable io.servicetalk.concurrent.api.Executor executor, BooleanSupplier shouldOffload, final StreamingHttpService service) {
        if (strategy.isRequestResponseOffloaded()) {
            return new StreamingHttpServiceToOffloadedStreamingHttpService(strategy, executor, shouldOffload, service);
        }
        // No offloading requested by the strategy: only the execution context is overridden.
        return new StreamingHttpService() {

            @Override
            public Single<StreamingHttpResponse> handle(HttpServiceContext ctx, StreamingHttpRequest request, StreamingHttpResponseFactory responseFactory) {
                final io.servicetalk.concurrent.api.Executor useExecutor = null != executor ? executor : ctx.executionContext().executor();
                final HttpServiceContext wrappedCtx = new ExecutionContextOverridingServiceContext(ctx, strategy, useExecutor);
                return service.handle(wrappedCtx, request, responseFactory);
            }

            @Override
            public Completable closeAsync() {
                return service.closeAsync();
            }

            @Override
            public Completable closeAsyncGracefully() {
                return service.closeAsyncGracefully();
            }
        };
    }
}

