/*
 * Decompiled with CFR 0.152.
 */
package io.servicetalk.http.netty;

import io.servicetalk.client.api.DelegatingServiceDiscoverer;
import io.servicetalk.client.api.ServiceDiscoverer;
import io.servicetalk.client.api.ServiceDiscovererEvent;
import io.servicetalk.concurrent.api.BiIntFunction;
import io.servicetalk.concurrent.api.Completable;
import io.servicetalk.concurrent.api.Publisher;
import io.servicetalk.concurrent.api.RetryStrategies;
import io.servicetalk.transport.api.ExecutionContext;
import io.servicetalk.transport.api.ExecutionStrategy;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.function.UnaryOperator;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

final class RetryingServiceDiscoverer<U, R, E extends ServiceDiscovererEvent<R>>
extends DelegatingServiceDiscoverer<U, R, E> {
    private static final Logger LOGGER = LoggerFactory.getLogger(RetryingServiceDiscoverer.class);
    private static final Duration SD_RETRY_STRATEGY_INIT_DURATION = Duration.ofSeconds(2L);
    private static final Duration SD_RETRY_STRATEGY_MAX_DELAY = Duration.ofSeconds(128L);
    private final String targetResource;
    private final BiIntFunction<Throwable, ? extends Completable> retryStrategy;
    private final UnaryOperator<E> makeUnavailable;

    RetryingServiceDiscoverer(String targetResource, ServiceDiscoverer<U, R, E> delegate, @Nullable BiIntFunction<Throwable, ? extends Completable> retryStrategy, ExecutionContext<? extends ExecutionStrategy> executionContext, UnaryOperator<E> makeUnavailable) {
        super(delegate);
        this.targetResource = targetResource;
        if (retryStrategy == null) {
            retryStrategy = RetryStrategies.retryWithExponentialBackoffFullJitter(__ -> true, SD_RETRY_STRATEGY_INIT_DURATION, SD_RETRY_STRATEGY_MAX_DELAY, executionContext.executor());
        }
        this.retryStrategy = retryStrategy;
        this.makeUnavailable = makeUnavailable;
    }

    @Override
    public Publisher<Collection<E>> discover(U address) {
        return Publisher.defer(() -> {
            ServiceDiscovererEventsCache eventsCache = new ServiceDiscovererEventsCache(this.targetResource, this.makeUnavailable);
            return this.delegate().discover(address).map(eventsCache::consumeAndFilter).beforeOnError(eventsCache::errorSeen).retryWhen(false, this.retryStrategy);
        });
    }

    private static final class ServiceDiscovererEventsCache<R, E extends ServiceDiscovererEvent<R>> {
        private static final Map NONE_RETAINED = Collections.emptyMap();
        private final String targetResource;
        private final UnaryOperator<E> makeUnavailable;
        private final Map<R, E> currentState = new HashMap<R, E>();
        private Map<R, E> retainedState = ServiceDiscovererEventsCache.noneRetained();

        private ServiceDiscovererEventsCache(String targetResource, UnaryOperator<E> makeUnavailable) {
            this.targetResource = targetResource;
            this.makeUnavailable = makeUnavailable;
        }

        void errorSeen(Throwable t) {
            if (this.retainedState == NONE_RETAINED) {
                this.retainedState = new HashMap<R, E>(this.currentState);
                this.currentState.clear();
            }
            LOGGER.debug("{} observed an error from ServiceDiscoverer", (Object)this.targetResource, (Object)t);
        }

        Collection<E> consumeAndFilter(Collection<E> events) {
            if (this.retainedState == NONE_RETAINED) {
                for (ServiceDiscovererEvent event : events) {
                    if (ServiceDiscovererEvent.Status.UNAVAILABLE.equals(event.status())) {
                        this.currentState.remove(event.address());
                        continue;
                    }
                    this.currentState.put(event.address(), event);
                }
                return events;
            }
            assert (this.currentState.isEmpty());
            ArrayList<ServiceDiscovererEvent<Object>> toReturn = new ArrayList<ServiceDiscovererEvent<Object>>(events.size() + this.retainedState.size());
            int unavailableCounter = 0;
            for (ServiceDiscovererEvent event : events) {
                Object address = event.address();
                toReturn.add(event);
                this.retainedState.remove(address);
                if (!ServiceDiscovererEvent.Status.UNAVAILABLE.equals(event.status())) {
                    this.currentState.put(address, event);
                    continue;
                }
                ++unavailableCounter;
            }
            if (unavailableCounter > 0) {
                LOGGER.warn("{} received {} UNAVAILABLE events but expected a new 'state of the world'. This is an indicator of a buggy ServiceDiscoverer implementation that doesn't honor the API contract.", (Object)this.targetResource, (Object)unavailableCounter);
            }
            for (ServiceDiscovererEvent event : this.retainedState.values()) {
                assert (event.status() != ServiceDiscovererEvent.Status.UNAVAILABLE);
                toReturn.add((ServiceDiscovererEvent<Object>)this.makeUnavailable.apply(event));
            }
            this.retainedState = ServiceDiscovererEventsCache.noneRetained();
            return toReturn;
        }

        private static <R, E extends ServiceDiscovererEvent<R>> Map<R, E> noneRetained() {
            return NONE_RETAINED;
        }
    }
}

