/*
 * Decompiled with CFR 0.152.
 */
package io.servicetalk.traffic.resilience.http;

import io.servicetalk.capacity.limiter.api.CapacityLimiter;
import io.servicetalk.capacity.limiter.api.Classification;
import io.servicetalk.capacity.limiter.api.RequestDroppedException;
import io.servicetalk.circuit.breaker.api.CircuitBreaker;
import io.servicetalk.concurrent.api.Single;
import io.servicetalk.concurrent.api.SingleOperator;
import io.servicetalk.concurrent.api.TerminalSignalConsumer;
import io.servicetalk.concurrent.internal.ThrowableUtils;
import io.servicetalk.context.api.ContextMap;
import io.servicetalk.http.api.HttpExecutionStrategies;
import io.servicetalk.http.api.HttpExecutionStrategy;
import io.servicetalk.http.api.HttpExecutionStrategyInfluencer;
import io.servicetalk.http.api.HttpRequestMetaData;
import io.servicetalk.http.api.HttpResponseMetaData;
import io.servicetalk.http.api.StreamingHttpRequest;
import io.servicetalk.http.api.StreamingHttpResponse;
import io.servicetalk.http.api.StreamingHttpResponseFactory;
import io.servicetalk.http.utils.BeforeFinallyHttpOperator;
import io.servicetalk.traffic.resilience.http.ClientPeerRejectionPolicy;
import io.servicetalk.traffic.resilience.http.TrafficResiliencyObserver;
import io.servicetalk.transport.api.ServerListenContext;
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

abstract class AbstractTrafficResilienceHttpFilter
implements HttpExecutionStrategyInfluencer {
    private static final RequestDroppedException CAPACITY_REJECTION = (RequestDroppedException)ThrowableUtils.unknownStackTrace((Throwable)new RequestDroppedException("Service under heavy load", null, false, true), AbstractTrafficResilienceHttpFilter.class, (String)"remoteRejection");
    private static final RequestDroppedException BREAKER_REJECTION = (RequestDroppedException)ThrowableUtils.unknownStackTrace((Throwable)new RequestDroppedException("Service Unavailable", null, false, true), AbstractTrafficResilienceHttpFilter.class, (String)"breakerRejection");
    protected static final Single<StreamingHttpResponse> DEFAULT_CAPACITY_REJECTION = Single.failed((Throwable)CAPACITY_REJECTION);
    protected static final Single<StreamingHttpResponse> DEFAULT_BREAKER_REJECTION = Single.failed((Throwable)BREAKER_REJECTION);
    private final Supplier<Function<HttpRequestMetaData, CapacityLimiter>> capacityPartitionsSupplier;
    private final Consumer<CapacityLimiter.Ticket> onSuccessTicketTerminal;
    private final Consumer<CapacityLimiter.Ticket> onCancellationTicketTerminal;
    private final BiConsumer<CapacityLimiter.Ticket, Throwable> onErrorTicketTerminal;
    private final boolean rejectWhenNotMatchedCapacityPartition;
    private final Supplier<Function<HttpRequestMetaData, Classification>> classifier;
    private final Predicate<HttpResponseMetaData> capacityRejectionPredicate;
    private final Predicate<HttpResponseMetaData> breakerRejectionPredicate;
    private final Supplier<Function<HttpRequestMetaData, CircuitBreaker>> circuitBreakerPartitionsSupplier;
    private final TrafficResiliencyObserver observer;

    AbstractTrafficResilienceHttpFilter(Supplier<Function<HttpRequestMetaData, CapacityLimiter>> capacityPartitionsSupplier, boolean rejectWhenNotMatchedCapacityPartition, Supplier<Function<HttpRequestMetaData, Classification>> classifier, Predicate<HttpResponseMetaData> capacityRejectionPredicate, Predicate<HttpResponseMetaData> breakerRejectionPredicate, Consumer<CapacityLimiter.Ticket> onSuccessTicketTerminal, Consumer<CapacityLimiter.Ticket> onCancellationTicketTerminal, BiConsumer<CapacityLimiter.Ticket, Throwable> onErrorTicketTerminal, Supplier<Function<HttpRequestMetaData, CircuitBreaker>> circuitBreakerPartitionsSupplier, TrafficResiliencyObserver observer) {
        this.capacityPartitionsSupplier = Objects.requireNonNull(capacityPartitionsSupplier, "capacityPartitionsSupplier");
        this.rejectWhenNotMatchedCapacityPartition = rejectWhenNotMatchedCapacityPartition;
        this.capacityRejectionPredicate = Objects.requireNonNull(capacityRejectionPredicate, "capacityRejectionPredicate");
        this.breakerRejectionPredicate = Objects.requireNonNull(breakerRejectionPredicate, "breakerRejectionPredicate");
        this.classifier = Objects.requireNonNull(classifier, "classifier");
        this.onSuccessTicketTerminal = Objects.requireNonNull(onSuccessTicketTerminal, "onSuccessTicketTerminal");
        this.onCancellationTicketTerminal = Objects.requireNonNull(onCancellationTicketTerminal, "onCancellationTicketTerminal");
        this.onErrorTicketTerminal = Objects.requireNonNull(onErrorTicketTerminal, "onErrorTicketTerminal");
        this.circuitBreakerPartitionsSupplier = Objects.requireNonNull(circuitBreakerPartitionsSupplier, "circuitBreakerPartitionsSupplier");
        this.observer = Objects.requireNonNull(observer, "observer");
    }

    public HttpExecutionStrategy requiredOffloads() {
        return HttpExecutionStrategies.offloadNone();
    }

    final Function<HttpRequestMetaData, CapacityLimiter> newCapacityPartitions() {
        return this.capacityPartitionsSupplier.get();
    }

    final Function<HttpRequestMetaData, CircuitBreaker> newCircuitBreakerPartitions() {
        return this.circuitBreakerPartitionsSupplier.get();
    }

    final Function<HttpRequestMetaData, Classification> newClassifier() {
        return this.classifier.get();
    }

    Function<HttpResponseMetaData, Duration> newDelayProvider() {
        return __ -> Duration.ZERO;
    }

    Single<StreamingHttpResponse> applyCapacityControl(Function<HttpRequestMetaData, CapacityLimiter> capacityPartitions, Function<HttpRequestMetaData, CircuitBreaker> circuitBreakerPartitions, Function<HttpRequestMetaData, Classification> classifier, Function<HttpResponseMetaData, Duration> delayProvider, @Nullable ServerListenContext serverListenContext, StreamingHttpRequest request, @Nullable StreamingHttpResponseFactory responseFactory, Function<StreamingHttpRequest, Single<StreamingHttpResponse>> delegate) {
        return Single.defer(() -> {
            long startTime = System.nanoTime();
            CapacityLimiter partition = (CapacityLimiter)capacityPartitions.apply((HttpRequestMetaData)request);
            if (partition == null) {
                this.observer.onRejectedUnmatchedPartition(request);
                return this.rejectWhenNotMatchedCapacityPartition ? this.handleLocalCapacityRejection(null, request, responseFactory).shareContextOnSubscribe() : AbstractTrafficResilienceHttpFilter.handlePassthrough(delegate, request).shareContextOnSubscribe();
            }
            ContextMap meta = request.context();
            Classification classification = (Classification)classifier.apply((HttpRequestMetaData)request);
            CapacityLimiter.Ticket ticket = partition.tryAcquire(classification, meta);
            if (ticket != null) {
                ticket = new TrackingDelegatingTicket(ticket, request.hashCode());
            }
            if (ticket == null) {
                this.observer.onRejectedLimit(request, partition.name(), meta, classification);
                return this.handleLocalCapacityRejection(serverListenContext, request, responseFactory).shareContextOnSubscribe();
            }
            CircuitBreaker breaker = (CircuitBreaker)circuitBreakerPartitions.apply((HttpRequestMetaData)request);
            if (breaker != null && !breaker.tryAcquirePermit()) {
                this.observer.onRejectedOpenCircuit(request, breaker.name(), meta, classification);
                ticket.ignored();
                return this.handleLocalBreakerRejection(request, responseFactory, breaker).shareContextOnSubscribe();
            }
            try {
                TrafficResiliencyObserver.TicketObserver ticketObserver = this.observer.onAllowedThrough(request, ticket.state());
                return this.handleAllow(delegate, delayProvider, request, this.wrapTicket(serverListenContext, ticket), ticketObserver, breaker, startTime).shareContextOnSubscribe();
            }
            catch (Throwable cause) {
                this.onError(cause, breaker, startTime, ticket);
                throw cause;
            }
        });
    }

    CapacityLimiter.Ticket wrapTicket(@Nullable ServerListenContext serverListenContext, CapacityLimiter.Ticket ticket) {
        return ticket;
    }

    abstract Single<StreamingHttpResponse> handleLocalCapacityRejection(@Nullable ServerListenContext var1, StreamingHttpRequest var2, @Nullable StreamingHttpResponseFactory var3);

    abstract Single<StreamingHttpResponse> handleLocalBreakerRejection(StreamingHttpRequest var1, @Nullable StreamingHttpResponseFactory var2, CircuitBreaker var3);

    RuntimeException peerRejection(StreamingHttpResponse resp) {
        return CAPACITY_REJECTION;
    }

    RuntimeException peerBreakerRejection(HttpResponseMetaData resp, CircuitBreaker breaker, Function<HttpResponseMetaData, Duration> delayProvider) {
        return BREAKER_REJECTION;
    }

    private static Single<StreamingHttpResponse> handlePassthrough(Function<StreamingHttpRequest, Single<StreamingHttpResponse>> delegate, StreamingHttpRequest request) {
        return delegate.apply(request);
    }

    private Single<StreamingHttpResponse> handleAllow(Function<StreamingHttpRequest, Single<StreamingHttpResponse>> delegate, Function<HttpResponseMetaData, Duration> delayProvider, StreamingHttpRequest request, final CapacityLimiter.Ticket ticket, final TrafficResiliencyObserver.TicketObserver ticketObserver, final @Nullable CircuitBreaker breaker, final long startTimeNs) {
        return delegate.apply(request).flatMap(resp -> {
            if (breaker != null && this.breakerRejectionPredicate.test((HttpResponseMetaData)resp)) {
                return resp.payloadBody().ignoreElements().concat(Single.failed((Throwable)this.peerBreakerRejection((HttpResponseMetaData)resp, breaker, delayProvider))).shareContextOnSubscribe();
            }
            if (this.capacityRejectionPredicate.test((HttpResponseMetaData)resp)) {
                RuntimeException rejection = this.peerRejection((StreamingHttpResponse)resp);
                if (ClientPeerRejectionPolicy.PassthroughRequestDroppedException.class.equals(rejection.getClass())) {
                    return Single.failed((Throwable)rejection).shareContextOnSubscribe();
                }
                return resp.payloadBody().ignoreElements().concat(Single.failed((Throwable)rejection)).shareContextOnSubscribe();
            }
            return Single.succeeded((Object)resp).shareContextOnSubscribe();
        }).liftSync((SingleOperator)new BeforeFinallyHttpOperator(new TerminalSignalConsumer(){

            public void onComplete() {
                try {
                    if (breaker != null) {
                        breaker.onSuccess(System.nanoTime() - startTimeNs, TimeUnit.NANOSECONDS);
                    }
                }
                finally {
                    AbstractTrafficResilienceHttpFilter.this.onSuccessTicketTerminal.accept(ticket);
                    ticketObserver.onComplete();
                }
            }

            public void onError(Throwable throwable) {
                AbstractTrafficResilienceHttpFilter.this.onError(throwable, breaker, startTimeNs, ticket);
                ticketObserver.onError(throwable);
            }

            public void cancel() {
                try {
                    if (breaker != null) {
                        breaker.ignorePermit();
                    }
                }
                finally {
                    AbstractTrafficResilienceHttpFilter.this.onCancellationTicketTerminal.accept(ticket);
                    ticketObserver.onCancel();
                }
            }
        }, true));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void onError(Throwable throwable, @Nullable CircuitBreaker breaker, long startTimeNs, CapacityLimiter.Ticket ticket) {
        try {
            if (breaker != null && !CAPACITY_REJECTION.equals(throwable)) {
                breaker.onError(System.nanoTime() - startTimeNs, TimeUnit.NANOSECONDS, throwable);
            }
        }
        finally {
            this.onErrorTicketTerminal.accept(ticket, throwable);
        }
    }

    static final class TrackingDelegatingTicket
    implements CapacityLimiter.Ticket {
        private static final Logger LOGGER = LoggerFactory.getLogger(TrackingDelegatingTicket.class);
        private static final int NOT_SIGNALED = 0;
        private static final int SIGNAL_COMPLETED = 1;
        private static final int SIGNAL_DROPPED = 2;
        private static final int SIGNAL_FAILED = 4;
        private static final int SIGNAL_IGNORED = 8;
        private static final AtomicIntegerFieldUpdater<TrackingDelegatingTicket> signaledUpdater = AtomicIntegerFieldUpdater.newUpdater(TrackingDelegatingTicket.class, "signaled");
        private final CapacityLimiter.Ticket delegate;
        private final int requestHashCode;
        private volatile int signaled;

        TrackingDelegatingTicket(CapacityLimiter.Ticket delegate, int requestHashCode) {
            this.delegate = delegate;
            this.requestHashCode = requestHashCode;
        }

        public CapacityLimiter.LimiterState state() {
            return this.delegate.state();
        }

        public int completed() {
            this.signal(1);
            return this.delegate.completed();
        }

        public int dropped() {
            this.signal(2);
            return this.delegate.dropped();
        }

        public int failed(Throwable error) {
            this.signal(4);
            return this.delegate.failed(error);
        }

        public int ignored() {
            this.signal(8);
            return this.delegate.ignored();
        }

        private void signal(int newSignal) {
            int oldValue;
            while (!signaledUpdater.compareAndSet(this, oldValue = this.signaled, oldValue | newSignal)) {
            }
            if (oldValue > 0) {
                LOGGER.warn("{} signaled completion more than once. Already signaled with {}, new signal {}.", new Object[]{this.getClass().getSimpleName(), oldValue, newSignal});
            }
        }

        public String toString() {
            return "TrackingDelegatingTicket{delegate=" + this.delegate + ", requestHashCode=" + this.requestHashCode + ", signaled=" + this.signaled + '}';
        }
    }
}

