/*
 * Decompiled with CFR 0.152.
 */
package io.servicetalk.http.api;

import io.servicetalk.buffer.api.Buffer;
import io.servicetalk.concurrent.Cancellable;
import io.servicetalk.concurrent.CompletableSource;
import io.servicetalk.concurrent.PublisherSource;
import io.servicetalk.concurrent.api.Completable;
import io.servicetalk.concurrent.api.Publisher;
import io.servicetalk.concurrent.api.SourceAdapters;
import io.servicetalk.concurrent.api.internal.BlockingUtils;
import io.servicetalk.concurrent.api.internal.SubscribableCompletable;
import io.servicetalk.concurrent.internal.ConcurrentSubscription;
import io.servicetalk.http.api.BlockingStreamingHttpRequest;
import io.servicetalk.http.api.BlockingStreamingHttpServerResponse;
import io.servicetalk.http.api.BlockingStreamingHttpService;
import io.servicetalk.http.api.HttpHeaders;
import io.servicetalk.http.api.HttpPayloadWriter;
import io.servicetalk.http.api.HttpServiceContext;
import io.servicetalk.http.api.StreamingHttpResponse;
import io.servicetalk.http.api.StreamingHttpService;
import io.servicetalk.utils.internal.NumberUtils;
import io.servicetalk.utils.internal.ThrowableUtils;
import java.io.IOException;
import java.util.Objects;
import java.util.concurrent.Future;
import javax.annotation.Nullable;

/**
 * Adapts an asynchronous {@link StreamingHttpService} to the {@link BlockingStreamingHttpService} API.
 * <p>
 * Each call to {@code handle} invokes the wrapped service, copies the resulting response meta-data onto the
 * blocking response, and then pumps the response message body into the blocking {@link HttpPayloadWriter},
 * blocking the calling thread until that whole sequence has terminated.
 */
final class StreamingHttpServiceToBlockingStreamingHttpService implements BlockingStreamingHttpService {
    /** Demand batch size used when the caller does not supply one. */
    private static final int DEFAULT_DEMAND_BATCH_SIZE = 64;

    private final StreamingHttpService original;
    private final int demandBatchSize;

    /**
     * Creates an adapter that uses the default demand batch size.
     *
     * @param original the streaming service to delegate to; must not be {@code null}.
     */
    StreamingHttpServiceToBlockingStreamingHttpService(StreamingHttpService original) {
        this(original, DEFAULT_DEMAND_BATCH_SIZE);
    }

    /**
     * Creates an adapter with an explicit demand batch size.
     *
     * @param original the streaming service to delegate to; must not be {@code null}.
     * @param demandBatchSize number of message body items requested up-front from the response message body;
     * validated with {@code NumberUtils.ensurePositive}.
     */
    StreamingHttpServiceToBlockingStreamingHttpService(StreamingHttpService original, int demandBatchSize) {
        this.original = Objects.requireNonNull(original);
        this.demandBatchSize = NumberUtils.ensurePositive(demandBatchSize, "demandBatchSize");
    }

    /**
     * Handles a blocking request by delegating to the wrapped streaming service and waiting for the response
     * meta-data and message body to be fully written to {@code svcResponse}.
     *
     * @param ctx the service context, also used to obtain the streaming response factory.
     * @param request the blocking request, converted with {@code toStreamingRequest()} before delegation.
     * @param svcResponse the blocking response that receives meta-data and payload.
     * @throws Exception whatever {@code BlockingUtils.futureGetCancelOnInterrupt} throws while waiting
     * (NOTE(review): by its name it cancels the future when the waiting thread is interrupted - confirm).
     */
    @Override
    public void handle(HttpServiceContext ctx, BlockingStreamingHttpRequest request,
                       BlockingStreamingHttpServerResponse svcResponse) throws Exception {
        BlockingUtils.futureGetCancelOnInterrupt(handleBlockingRequest(ctx, request, svcResponse).toFuture());
    }

    /**
     * Builds the asynchronous pipeline for a single request: invoke the wrapped service, copy the response
     * meta-data, then drain the message body into the payload writer returned by {@code sendMetaData()}.
     */
    private Completable handleBlockingRequest(HttpServiceContext ctx, BlockingStreamingHttpRequest request,
                                              BlockingStreamingHttpServerResponse svcResponse) {
        return original.handle(ctx, request.toStreamingRequest(), ctx.streamingResponseFactory())
                .flatMapCompletable(streamingHttpResponse -> {
                    // Meta-data has to be copied before sendMetaData() is invoked below.
                    copyMeta(streamingHttpResponse, svcResponse);
                    return new MessageBodyToPayloadWriter(streamingHttpResponse.messageBody(),
                            svcResponse.sendMetaData(), demandBatchSize);
                });
    }

    /** Copies headers, status and version from the streaming response onto the blocking response. */
    private void copyMeta(StreamingHttpResponse streamingResponse, BlockingStreamingHttpServerResponse svcResponse) {
        svcResponse.setHeaders(streamingResponse.headers());
        svcResponse.status(streamingResponse.status());
        svcResponse.version(streamingResponse.version());
    }

    /**
     * Closes the wrapped service and blocks until {@code closeAsync()} has terminated.
     *
     * @throws Exception if waiting on the close future fails.
     */
    @Override
    public void close() throws Exception {
        original.closeAsync().toFuture().get();
    }

    /**
     * Closes the wrapped service via {@code closeAsyncGracefully()} and blocks until it has terminated.
     *
     * @throws Exception if waiting on the close future fails.
     */
    public void closeGracefully() throws Exception {
        original.closeAsyncGracefully().toFuture().get();
    }

    /**
     * A {@link Completable} that, once subscribed, subscribes to the response message body and writes every
     * emitted item to an {@link HttpPayloadWriter}. It terminates when the message body terminates and the
     * writer has been closed.
     */
    private static final class MessageBodyToPayloadWriter extends SubscribableCompletable {
        private final Publisher<Object> messageBody;
        private final HttpPayloadWriter<Buffer> payloadWriter;
        private final int demandBatchSize;

        /**
         * @param messageBody the response message body; items are handled by {@link PayloadPump}.
         * @param payloadWriter the writer that receives the payload and trailers.
         * @param demandBatchSize number of items requested up-front from {@code messageBody}.
         */
        MessageBodyToPayloadWriter(Publisher<Object> messageBody, HttpPayloadWriter<Buffer> payloadWriter,
                                   int demandBatchSize) {
            this.messageBody = messageBody;
            this.payloadWriter = payloadWriter;
            this.demandBatchSize = demandBatchSize;
        }

        /** Subscribes a new {@link PayloadPump} to the message body on behalf of {@code subscriber}. */
        protected void handleSubscribe(CompletableSource.Subscriber subscriber) {
            SourceAdapters.toSource(messageBody)
                    .subscribe(new PayloadPump(subscriber, payloadWriter, demandBatchSize));
        }

        /**
         * Subscriber that forwards message body items to the payload writer and replenishes demand in batches.
         */
        private static final class PayloadPump implements PublisherSource.Subscriber<Object> {
            private final CompletableSource.Subscriber subscriber;
            private final HttpPayloadWriter<Buffer> payloadWriter;
            @Nullable
            private PublisherSource.Subscription subscription;
            private final int demandBatchSize;
            /** Countdown of items still to be received before more demand is requested. */
            private int itemsToNextRequest;

            PayloadPump(CompletableSource.Subscriber subscriber, HttpPayloadWriter<Buffer> payloadWriter,
                        int demandBatchSize) {
                this.subscriber = subscriber;
                this.payloadWriter = payloadWriter;
                this.demandBatchSize = demandBatchSize;
            }

            /**
             * Wraps the incoming subscription, exposes it to the completable subscriber as its
             * {@link Cancellable} and requests the first batch of items.
             */
            public void onSubscribe(PublisherSource.Subscription inSubscription) {
                // NOTE(review): the wrapper presumably makes cancel() issued by the completable subscriber safe
                // against request(n) calls made from this class - confirm against ConcurrentSubscription docs.
                final PublisherSource.Subscription wrapped = ConcurrentSubscription.wrap(inSubscription);
                subscription = wrapped;
                subscriber.onSubscribe(wrapped);
                itemsToNextRequest = demandBatchSize;
                wrapped.request(demandBatchSize);
            }

            /**
             * Writes a {@link Buffer} to the payload writer or installs {@link HttpHeaders} as trailers, then
             * requests more items if the batch countdown says so.
             *
             * @param bufferOrTrailers the emitted item.
             * @throws IllegalArgumentException if the item is neither a {@link Buffer} nor {@link HttpHeaders}
             * (this includes {@code null}).
             */
            public void onNext(@Nullable Object bufferOrTrailers) {
                if (bufferOrTrailers instanceof Buffer) {
                    try {
                        payloadWriter.write((Buffer) bufferOrTrailers);
                    } catch (IOException e) {
                        // This method cannot declare IOException, so it is handed to ThrowableUtils to rethrow.
                        ThrowableUtils.throwException(e);
                    }
                } else if (bufferOrTrailers instanceof HttpHeaders) {
                    payloadWriter.setTrailers((HttpHeaders) bufferOrTrailers);
                } else {
                    throw new IllegalArgumentException("unsupported type: " + bufferOrTrailers);
                }
                requestMoreIfRequired();
            }

            /**
             * Closes the payload writer with the failure and propagates an error to the completable subscriber.
             * <p>
             * If closing the writer itself fails, the close failure is propagated and the original stream
             * error is attached to it as a suppressed exception so that it is not lost.
             *
             * @param t the error that terminated the message body.
             */
            public void onError(Throwable t) {
                try {
                    payloadWriter.close(t);
                } catch (Throwable cause) {
                    // Throwable#addSuppressed rejects self-suppression and null, hence the guard.
                    if (t != null && cause != t) {
                        cause.addSuppressed(t);
                    }
                    subscriber.onError(cause);
                    return;
                }
                subscriber.onError(t);
            }

            /**
             * Closes the payload writer and completes the completable subscriber; a failure while closing is
             * reported through {@code onError} instead.
             */
            public void onComplete() {
                try {
                    payloadWriter.close();
                } catch (Throwable cause) {
                    subscriber.onError(cause);
                    return;
                }
                subscriber.onComplete();
            }

            /**
             * Decrements the countdown and, once it reaches half of the batch size
             * ({@code demandBatchSize >>> 1}), requests as many items as were consumed since the last request
             * and resets the countdown to the full batch size.
             */
            private void requestMoreIfRequired() {
                if (--itemsToNextRequest == demandBatchSize >>> 1) {
                    final int toRequest = demandBatchSize - itemsToNextRequest;
                    itemsToNextRequest = demandBatchSize;
                    assert subscription != null;
                    subscription.request(toRequest);
                }
            }
        }
    }
}

