/*
 * Decompiled with CFR 0.152.
 */
package io.servicetalk.http.netty;

import io.servicetalk.concurrent.api.Publisher;
import io.servicetalk.concurrent.api.Single;
import io.servicetalk.context.api.ContextMap;
import io.servicetalk.http.api.FilterableStreamingHttpClient;
import io.servicetalk.http.api.FilterableStreamingHttpConnection;
import io.servicetalk.http.api.HttpExecutionStrategies;
import io.servicetalk.http.api.HttpExecutionStrategy;
import io.servicetalk.http.api.StreamingHttpClientFilter;
import io.servicetalk.http.api.StreamingHttpClientFilterFactory;
import io.servicetalk.http.api.StreamingHttpConnectionFilter;
import io.servicetalk.http.api.StreamingHttpConnectionFilterFactory;
import io.servicetalk.http.api.StreamingHttpRequest;
import io.servicetalk.http.api.StreamingHttpRequester;
import io.servicetalk.http.api.StreamingHttpResponse;
import io.servicetalk.http.netty.HttpMessageDiscardWatchdogServiceFilter;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

final class HttpMessageDiscardWatchdogClientFilter
implements StreamingHttpConnectionFilterFactory {
    private static final ContextMap.Key<AtomicReference<Publisher<?>>> MESSAGE_PUBLISHER_KEY = ContextMap.Key.newKey(HttpMessageDiscardWatchdogClientFilter.class.getName() + ".messagePublisher", HttpMessageDiscardWatchdogServiceFilter.generifyAtomicReference());
    private static final Logger LOGGER = LoggerFactory.getLogger(HttpMessageDiscardWatchdogClientFilter.class);
    static final HttpMessageDiscardWatchdogClientFilter INSTANCE = new HttpMessageDiscardWatchdogClientFilter();
    static final StreamingHttpClientFilterFactory CLIENT_CLEANER = new CleanerStreamingHttpClientFilterFactory();

    private HttpMessageDiscardWatchdogClientFilter() {
    }

    @Override
    public StreamingHttpConnectionFilter create(FilterableStreamingHttpConnection connection) {
        return new StreamingHttpConnectionFilter(connection){

            @Override
            public Single<StreamingHttpResponse> request(StreamingHttpRequest request) {
                return this.delegate().request(request).map(response -> {
                    AtomicReference reference = request.context().computeIfAbsent(MESSAGE_PUBLISHER_KEY, key -> new AtomicReference());
                    assert (reference != null);
                    if (reference.getAndSet(response.messageBody()) != null) {
                        LOGGER.warn("Discovered un-drained HTTP response message body which has been dropped by user code - this is a strong indication of a bug in a user-defined filter. Response payload (message) body must be fully consumed before retrying.");
                    }
                    return response.transformMessageBody(msgPublisher -> msgPublisher.beforeSubscriber(() -> {
                        reference.set(null);
                        return HttpMessageDiscardWatchdogServiceFilter.NoopSubscriber.INSTANCE;
                    }));
                });
            }
        };
    }

    @Override
    public HttpExecutionStrategy requiredOffloads() {
        return HttpExecutionStrategies.offloadNone();
    }

    private static final class CleanerStreamingHttpClientFilterFactory
    implements StreamingHttpClientFilterFactory {
        private CleanerStreamingHttpClientFilterFactory() {
        }

        @Override
        public StreamingHttpClientFilter create(FilterableStreamingHttpClient client) {
            return new StreamingHttpClientFilter(client){

                @Override
                protected Single<StreamingHttpResponse> request(StreamingHttpRequester delegate, StreamingHttpRequest request) {
                    return delegate.request(request).onErrorResume(cause -> {
                        AtomicReference maybePublisher = (AtomicReference)request.context().get(MESSAGE_PUBLISHER_KEY);
                        if (maybePublisher != null && maybePublisher.getAndSet(null) != null) {
                            LOGGER.warn("Discovered un-drained HTTP response message body which has been dropped by user code - this is a strong indication of a bug in a user-defined filter. Response payload (message) body must be fully consumed before discarding.");
                        }
                        return Single.failed(cause).shareContextOnSubscribe();
                    });
                }
            };
        }

        @Override
        public HttpExecutionStrategy requiredOffloads() {
            return HttpExecutionStrategies.offloadNone();
        }
    }
}

