/*
 * Decompiled with CFR 0.152.
 */
package io.servicetalk.http.api;

import io.servicetalk.buffer.api.Buffer;
import io.servicetalk.concurrent.Cancellable;
import io.servicetalk.concurrent.CompletableSource;
import io.servicetalk.concurrent.PublisherSource;
import io.servicetalk.concurrent.api.Completable;
import io.servicetalk.concurrent.api.Publisher;
import io.servicetalk.concurrent.api.SourceAdapters;
import io.servicetalk.concurrent.api.internal.SubscribableCompletable;
import io.servicetalk.concurrent.internal.ConcurrentSubscription;
import io.servicetalk.http.api.BlockingStreamingHttpRequest;
import io.servicetalk.http.api.BlockingStreamingHttpServerResponse;
import io.servicetalk.http.api.BlockingStreamingHttpService;
import io.servicetalk.http.api.BlockingUtils;
import io.servicetalk.http.api.HttpHeaders;
import io.servicetalk.http.api.HttpPayloadWriter;
import io.servicetalk.http.api.HttpServiceContext;
import io.servicetalk.http.api.StreamingHttpResponse;
import io.servicetalk.http.api.StreamingHttpService;
import io.servicetalk.utils.internal.PlatformDependent;
import java.io.IOException;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

final class StreamingHttpServiceToBlockingStreamingHttpService
implements BlockingStreamingHttpService {
    private final StreamingHttpService original;

    StreamingHttpServiceToBlockingStreamingHttpService(StreamingHttpService original) {
        this.original = Objects.requireNonNull(original);
    }

    @Override
    public void handle(HttpServiceContext ctx, BlockingStreamingHttpRequest request, BlockingStreamingHttpServerResponse svcResponse) throws Exception {
        BlockingUtils.futureGetCancelOnInterrupt(this.handleBlockingRequest(ctx, request, svcResponse).toFuture());
    }

    @Nonnull
    private Completable handleBlockingRequest(HttpServiceContext ctx, BlockingStreamingHttpRequest request, BlockingStreamingHttpServerResponse svcResponse) {
        return this.original.handle(ctx, request.toStreamingRequest(), ctx.streamingResponseFactory()).flatMapCompletable(streamingHttpResponse -> {
            this.copyMeta((StreamingHttpResponse)streamingHttpResponse, svcResponse);
            return new PayloadBodyAndTrailersToPayloadWriter(streamingHttpResponse.payloadBodyAndTrailers(), svcResponse.sendMetaData());
        });
    }

    private void copyMeta(StreamingHttpResponse streamingResponse, BlockingStreamingHttpServerResponse svcResponse) {
        svcResponse.setHeaders(streamingResponse.headers());
        svcResponse.status(streamingResponse.status());
        svcResponse.version(streamingResponse.version());
    }

    @Override
    public void close() throws Exception {
        this.original.closeAsync().toFuture().get();
    }

    public void closeGracefully() throws Exception {
        this.original.closeAsyncGracefully().toFuture().get();
    }

    private static class PayloadBodyAndTrailersToPayloadWriter
    extends SubscribableCompletable {
        private final Publisher<Object> payloadBodyAndTrailers;
        private final HttpPayloadWriter<Buffer> payloadWriter;

        PayloadBodyAndTrailersToPayloadWriter(Publisher<Object> payloadBodyAndTrailers, HttpPayloadWriter<Buffer> payloadWriter) {
            this.payloadBodyAndTrailers = payloadBodyAndTrailers;
            this.payloadWriter = payloadWriter;
        }

        protected void handleSubscribe(CompletableSource.Subscriber subscriber) {
            SourceAdapters.toSource(this.payloadBodyAndTrailers).subscribe((PublisherSource.Subscriber)new PayloadPump(subscriber, this.payloadWriter));
        }

        private static final class PayloadPump
        implements PublisherSource.Subscriber<Object> {
            private static final Logger LOGGER = LoggerFactory.getLogger(PayloadPump.class);
            private static final AtomicIntegerFieldUpdater<PayloadPump> terminatedUpdater = AtomicIntegerFieldUpdater.newUpdater(PayloadPump.class, "terminated");
            private final CompletableSource.Subscriber subscriber;
            private final HttpPayloadWriter<Buffer> payloadWriter;
            private volatile int terminated;
            @Nullable
            private ConcurrentSubscription subscription;

            PayloadPump(CompletableSource.Subscriber subscriber, HttpPayloadWriter<Buffer> payloadWriter) {
                this.subscriber = subscriber;
                this.payloadWriter = payloadWriter;
            }

            public void onSubscribe(PublisherSource.Subscription inSubscription) {
                this.subscription = ConcurrentSubscription.wrap((PublisherSource.Subscription)inSubscription);
                this.subscriber.onSubscribe((Cancellable)this.subscription);
                this.subscription.request(Long.MAX_VALUE);
            }

            public void onNext(@Nullable Object bufferOrTrailers) {
                assert (bufferOrTrailers != null);
                try {
                    if (bufferOrTrailers instanceof Buffer) {
                        this.payloadWriter.write((Buffer)bufferOrTrailers);
                        return;
                    }
                    if (bufferOrTrailers instanceof HttpHeaders) {
                        this.payloadWriter.setTrailers((HttpHeaders)bufferOrTrailers);
                        return;
                    }
                    assert (false) : "Expected only buffer or trailer in payloadBodyAndTrailers()";
                }
                catch (IOException e) {
                    try {
                        if (this.tryTerminate()) {
                            this.subscriber.onError((Throwable)e);
                        } else {
                            PlatformDependent.throwException((Throwable)e);
                        }
                    }
                    finally {
                        assert (this.subscription != null);
                        this.subscription.cancel();
                    }
                }
            }

            public void onError(Throwable t) {
                if (this.tryTerminate()) {
                    this.subscriber.onError(t);
                } else {
                    LOGGER.error("Failed to deliver onError() after termination", t);
                }
            }

            public void onComplete() {
                try {
                    this.payloadWriter.close();
                }
                catch (IOException e) {
                    if (this.tryTerminate()) {
                        this.subscriber.onError((Throwable)e);
                    }
                    LOGGER.warn("Failed to deliver IOException from payloadWriter.close() after termination", (Throwable)e);
                }
                if (this.tryTerminate()) {
                    this.subscriber.onComplete();
                }
            }

            boolean tryTerminate() {
                return terminatedUpdater.compareAndSet(this, 0, 1);
            }
        }
    }
}

