/*
 * Decompiled with CFR 0.152.
 */
package io.servicetalk.http.api;

import io.servicetalk.buffer.api.Buffer;
import io.servicetalk.concurrent.Cancellable;
import io.servicetalk.concurrent.CompletableSource;
import io.servicetalk.concurrent.PublisherSource;
import io.servicetalk.concurrent.SingleSource;
import io.servicetalk.concurrent.api.Completable;
import io.servicetalk.concurrent.api.Processors;
import io.servicetalk.concurrent.api.Publisher;
import io.servicetalk.concurrent.api.Single;
import io.servicetalk.concurrent.api.SourceAdapters;
import io.servicetalk.concurrent.api.internal.ConnectablePayloadWriter;
import io.servicetalk.concurrent.internal.SubscriberUtils;
import io.servicetalk.concurrent.internal.ThreadInterruptingCancellable;
import io.servicetalk.http.api.AbstractServiceAdapterHolder;
import io.servicetalk.http.api.BlockingStreamingHttpService;
import io.servicetalk.http.api.BlockingUtils;
import io.servicetalk.http.api.DefaultBlockingStreamingHttpServerResponse;
import io.servicetalk.http.api.HeaderUtils;
import io.servicetalk.http.api.HttpExecutionStrategies;
import io.servicetalk.http.api.HttpExecutionStrategy;
import io.servicetalk.http.api.HttpExecutionStrategyInfluencer;
import io.servicetalk.http.api.HttpHeaderNames;
import io.servicetalk.http.api.HttpHeaderValues;
import io.servicetalk.http.api.HttpHeaders;
import io.servicetalk.http.api.HttpPayloadWriter;
import io.servicetalk.http.api.HttpProtocolVersion;
import io.servicetalk.http.api.HttpRequestMethod;
import io.servicetalk.http.api.HttpResponseMetaData;
import io.servicetalk.http.api.HttpResponseStatus;
import io.servicetalk.http.api.HttpServiceContext;
import io.servicetalk.http.api.StreamingHttpRequest;
import io.servicetalk.http.api.StreamingHttpResponse;
import io.servicetalk.http.api.StreamingHttpResponseFactory;
import io.servicetalk.http.api.StreamingHttpResponses;
import java.io.IOException;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

final class BlockingStreamingToStreamingService
extends AbstractServiceAdapterHolder {
    private static final HttpExecutionStrategy DEFAULT_STRATEGY = HttpExecutionStrategies.OFFLOAD_RECEIVE_META_STRATEGY;
    private static final Logger LOGGER = LoggerFactory.getLogger(BlockingStreamingToStreamingService.class);
    private final BlockingStreamingHttpService original;

    BlockingStreamingToStreamingService(BlockingStreamingHttpService original, HttpExecutionStrategyInfluencer influencer) {
        super(BlockingStreamingToStreamingService.serviceInvocationStrategy(influencer));
        this.original = Objects.requireNonNull(original);
    }

    private static HttpExecutionStrategy serviceInvocationStrategy(HttpExecutionStrategyInfluencer influencer) {
        HttpExecutionStrategy httpExecutionStrategy = influencer.influenceStrategy(DEFAULT_STRATEGY);
        assert (httpExecutionStrategy.isMetadataReceiveOffloaded()) : "This will deadlock!";
        return httpExecutionStrategy;
    }

    @Override
    public Single<StreamingHttpResponse> handle(final HttpServiceContext ctx, final StreamingHttpRequest request, StreamingHttpResponseFactory responseFactory) {
        return new Single<StreamingHttpResponse>(){

            protected void handleSubscribe(SingleSource.Subscriber<? super StreamingHttpResponse> subscriber) {
                final ThreadInterruptingCancellable tiCancellable = new ThreadInterruptingCancellable(Thread.currentThread());
                try {
                    subscriber.onSubscribe((Cancellable)tiCancellable);
                }
                catch (Throwable cause) {
                    SubscriberUtils.handleExceptionFromOnSubscribe(subscriber, (Throwable)cause);
                    return;
                }
                CompletableSource.Processor payloadProcessor = Processors.newCompletableProcessor();
                DefaultBlockingStreamingHttpServerResponse response = null;
                BufferHttpPayloadWriter payloadWriterOuter = null;
                try {
                    BufferHttpPayloadWriter payloadWriter = payloadWriterOuter = new BufferHttpPayloadWriter(ctx.headersFactory().newTrailers(), (CompletableSource.Subscriber)payloadProcessor);
                    Consumer<HttpResponseMetaData> sendMeta = metaData -> {
                        StreamingHttpResponse result;
                        try {
                            HttpHeaders headers = metaData.headers();
                            boolean addTrailers = HeaderUtils.isTransferEncodingChunked(headers);
                            if (!(addTrailers || HttpProtocolVersion.HTTP_1_0.equals(metaData.version()) || HeaderUtils.hasContentLength(headers) || request.method() == HttpRequestMethod.HEAD)) {
                                headers.add(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED);
                                addTrailers = true;
                            }
                            Publisher payload = SourceAdapters.fromSource((CompletableSource)payloadProcessor).merge(payloadWriter.connect().map(buffer -> buffer));
                            if (addTrailers) {
                                payload = payload.concat(1.succeeded((Object)payloadWriter.trailers()));
                            }
                            payload = payload.beforeSubscription(() -> new PublisherSource.Subscription(){

                                public void request(long n) {
                                }

                                public void cancel() {
                                    tiCancellable.cancel();
                                }
                            });
                            result = StreamingHttpResponses.newTransportResponse(metaData.status(), metaData.version(), metaData.headers(), ctx.executionContext().bufferAllocator(), (Publisher<Object>)payload, ctx.headersFactory());
                        }
                        catch (Throwable t) {
                            subscriber.onError(t);
                            throw t;
                        }
                        subscriber.onSuccess((Object)result);
                    };
                    response = new DefaultBlockingStreamingHttpServerResponse(HttpResponseStatus.OK, request.version(), ctx.headersFactory().newHeaders(), payloadWriter, ctx.executionContext().bufferAllocator(), sendMeta);
                    BlockingStreamingToStreamingService.this.original.handle(ctx, request.toBlockingStreamingRequest(), response);
                }
                catch (Throwable cause) {
                    tiCancellable.setDone(cause);
                    if (response == null || response.markMetaSent()) {
                        SubscriberUtils.safeOnError(subscriber, (Throwable)cause);
                    } else if (payloadWriterOuter.markSubscriberComplete()) {
                        payloadProcessor.onError(cause);
                    } else {
                        LOGGER.error("An exception occurred after the response was sent", cause);
                    }
                    return;
                }
                tiCancellable.setDone();
            }
        };
    }

    @Override
    public Completable closeAsync() {
        return BlockingUtils.blockingToCompletable(this.original::close);
    }

    private static final class BufferHttpPayloadWriter
    implements HttpPayloadWriter<Buffer> {
        private static final AtomicIntegerFieldUpdater<BufferHttpPayloadWriter> subscriberCompleteUpdater = AtomicIntegerFieldUpdater.newUpdater(BufferHttpPayloadWriter.class, "subscriberComplete");
        private volatile int subscriberComplete;
        private final ConnectablePayloadWriter<Buffer> payloadWriter = new ConnectablePayloadWriter();
        private final HttpHeaders trailers;
        private final CompletableSource.Subscriber subscriber;

        BufferHttpPayloadWriter(HttpHeaders trailers, CompletableSource.Subscriber subscriber) {
            this.trailers = trailers;
            this.subscriber = subscriber;
        }

        public void write(Buffer object) throws IOException {
            this.payloadWriter.write((Object)object);
        }

        public void flush() throws IOException {
            this.payloadWriter.flush();
        }

        public void close() throws IOException {
            try {
                this.payloadWriter.close();
            }
            finally {
                if (this.markSubscriberComplete()) {
                    this.subscriber.onComplete();
                }
            }
        }

        @Override
        public HttpHeaders trailers() {
            return this.trailers;
        }

        Publisher<Buffer> connect() {
            return this.payloadWriter.connect();
        }

        boolean markSubscriberComplete() {
            return subscriberCompleteUpdater.compareAndSet(this, 0, 1);
        }
    }
}

