/*
 * Decompiled with CFR 0.152.
 */
package io.servicetalk.http.netty;

import io.servicetalk.concurrent.PublisherSource;
import io.servicetalk.concurrent.api.Publisher;
import io.servicetalk.concurrent.api.Single;
import io.servicetalk.context.api.ContextMap;
import io.servicetalk.http.api.HttpExecutionStrategies;
import io.servicetalk.http.api.HttpExecutionStrategy;
import io.servicetalk.http.api.HttpLifecycleObserver;
import io.servicetalk.http.api.HttpRequestMetaData;
import io.servicetalk.http.api.HttpResponseMetaData;
import io.servicetalk.http.api.HttpServiceContext;
import io.servicetalk.http.api.StreamingHttpRequest;
import io.servicetalk.http.api.StreamingHttpResponse;
import io.servicetalk.http.api.StreamingHttpResponseFactory;
import io.servicetalk.http.api.StreamingHttpService;
import io.servicetalk.http.api.StreamingHttpServiceFilter;
import io.servicetalk.http.api.StreamingHttpServiceFilterFactory;
import io.servicetalk.http.netty.HttpLifecycleObserverServiceFilter;
import io.servicetalk.http.netty.NoopHttpLifecycleObserver;
import io.servicetalk.transport.api.ConnectionInfo;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Service filter factory that tracks whether the message body of a response returned by the
 * wrapped service is ever subscribed to.
 *
 * <p>{@link #INSTANCE} stores the message body of each response in the request's
 * {@link ContextMap} and clears that entry once a subscriber is attached to the (transformed)
 * message body. {@link #CLEANER} inspects the same context entry when the exchange finishes and
 * logs a warning if a message body is still recorded there.
 */
final class HttpMessageDiscardWatchdogServiceFilter implements StreamingHttpServiceFilterFactory {
    private static final Logger LOGGER = LoggerFactory.getLogger(HttpMessageDiscardWatchdogServiceFilter.class);

    /** Shared instance of the filter factory that records response message bodies. */
    static final StreamingHttpServiceFilterFactory INSTANCE = new HttpMessageDiscardWatchdogServiceFilter();

    /** Lifecycle-observer based filter that checks for a still-recorded message body at exchange end. */
    static final StreamingHttpServiceFilterFactory CLEANER =
            new HttpLifecycleObserverServiceFilter(new CleanerHttpLifecycleObserver());

    /** Request-context key under which the most recent, not yet subscribed message body is kept. */
    private static final ContextMap.Key<AtomicReference<Publisher<?>>> MESSAGE_PUBLISHER_KEY =
            ContextMap.Key.newKey(HttpMessageDiscardWatchdogServiceFilter.class.getName() + ".messagePublisher",
                    HttpMessageDiscardWatchdogServiceFilter.generifyAtomicReference());

    private HttpMessageDiscardWatchdogServiceFilter() {
        // Singleton, use INSTANCE.
    }

    /**
     * Wraps {@code service} so that every response it produces has its message body recorded in
     * the request context until that body is subscribed to.
     *
     * @param service the service to wrap.
     * @return the wrapping filter.
     */
    @Override
    public StreamingHttpServiceFilter create(final StreamingHttpService service) {
        return new StreamingHttpServiceFilter(service) {

            @Override
            public Single<StreamingHttpResponse> handle(final HttpServiceContext ctx,
                                                        final StreamingHttpRequest request,
                                                        final StreamingHttpResponseFactory responseFactory) {
                return delegate().handle(ctx, request, responseFactory).map(response -> {
                    // Always record the message body in the request context. A body that is still
                    // recorded later on has not been subscribed to.
                    final AtomicReference<Publisher<?>> reference = request.context()
                            .computeIfAbsent(MESSAGE_PUBLISHER_KEY, key -> new AtomicReference<>());
                    assert reference != null;
                    if (reference.getAndSet(response.messageBody()) != null) {
                        // A previously recorded body is being replaced without ever having been
                        // cleared, i.e. this mapping ran again for the same request context.
                        LOGGER.warn("Discovered un-drained HTTP response message body which has been dropped by user code - this is a strong indication of a bug in a user-defined filter. Responses (or their message body) must be fully consumed before retrying.");
                    }
                    // Clear the record as soon as a subscriber is requested for the message body.
                    return response.transformMessageBody(msgPublisher -> msgPublisher.beforeSubscriber(() -> {
                        reference.set(null);
                        return NoopSubscriber.INSTANCE;
                    }));
                });
            }
        };
    }

    /**
     * @return {@link HttpExecutionStrategies#offloadNone()}, this filter requests no offloading.
     */
    @Override
    public HttpExecutionStrategy requiredOffloads() {
        return HttpExecutionStrategies.offloadNone();
    }

    /**
     * Returns {@code AtomicReference.class} typed as {@code Class<T>} so it can be used to create
     * a {@link ContextMap.Key} for a parameterized {@link AtomicReference}.
     *
     * @param <T> the (parameterized) type the caller wants the class token to be seen as.
     * @return the {@link AtomicReference} class token.
     */
    @SuppressWarnings("unchecked")
    static <T> Class<T> generifyAtomicReference() {
        // A class literal cannot carry type arguments, so the cast is unavoidable. It is unchecked
        // but harmless because generics are erased and the token is always AtomicReference.class.
        return (Class<T>) AtomicReference.class;
    }

    /**
     * Lifecycle observer that remembers the request context of an exchange and, when the exchange
     * finishes, warns if a response message body is still recorded in it.
     */
    private static final class CleanerHttpLifecycleObserver implements HttpLifecycleObserver {
        private CleanerHttpLifecycleObserver() {
        }

        @Override
        public HttpLifecycleObserver.HttpExchangeObserver onNewExchange() {
            return new HttpLifecycleObserver.HttpExchangeObserver() {
                // Captured in onRequest; stays null if onRequest is never invoked.
                @Nullable
                private ContextMap requestContext;

                @Override
                public HttpLifecycleObserver.HttpRequestObserver onRequest(
                        final HttpRequestMetaData requestMetaData) {
                    this.requestContext = requestMetaData.context();
                    return NoopHttpLifecycleObserver.NoopHttpRequestObserver.INSTANCE;
                }

                @Override
                public HttpLifecycleObserver.HttpResponseObserver onResponse(
                        final HttpResponseMetaData responseMetaData) {
                    return NoopHttpLifecycleObserver.NoopHttpResponseObserver.INSTANCE;
                }

                @Override
                public void onExchangeFinally() {
                    if (requestContext == null) {
                        return;
                    }
                    final AtomicReference<?> maybePublisher = requestContext.get(MESSAGE_PUBLISHER_KEY);
                    if (maybePublisher == null || maybePublisher.get() == null) {
                        return;
                    }
                    // A message body is still recorded, so nothing cleared it before the exchange ended.
                    LOGGER.warn("Discovered un-drained HTTP response message body which has been dropped by user code - this is a strong indication of a bug in a user-defined filter. Responses (or their message body) must be fully consumed before discarding.");
                }

                @Override
                public void onConnectionSelected(final ConnectionInfo info) {
                }

                @Override
                public void onResponseError(final Throwable cause) {
                }

                @Override
                public void onResponseCancel() {
                }
            };
        }
    }

    /**
     * Subscriber whose callbacks all do nothing. It is handed to {@code beforeSubscriber} where
     * only the side effect of the supplying function is of interest.
     */
    static final class NoopSubscriber implements PublisherSource.Subscriber<Object> {
        static final NoopSubscriber INSTANCE = new NoopSubscriber();

        private NoopSubscriber() {
            // Singleton, use INSTANCE.
        }

        @Override
        public void onSubscribe(final PublisherSource.Subscription subscription) {
        }

        @Override
        public void onNext(@Nullable final Object o) {
        }

        @Override
        public void onError(final Throwable t) {
        }

        @Override
        public void onComplete() {
        }
    }
}

