/*
 * Decompiled with CFR 0.152.
 */
package io.servicetalk.loadbalancer;

import io.servicetalk.client.api.ConnectionFactory;
import io.servicetalk.client.api.ConnectionLimitReachedException;
import io.servicetalk.client.api.LoadBalancedConnection;
import io.servicetalk.client.api.LoadBalancerReadyEvent;
import io.servicetalk.client.api.ServiceDiscovererEvent;
import io.servicetalk.concurrent.PublisherSource;
import io.servicetalk.concurrent.api.AsyncCloseable;
import io.servicetalk.concurrent.api.AsyncCloseables;
import io.servicetalk.concurrent.api.AsyncContext;
import io.servicetalk.concurrent.api.Completable;
import io.servicetalk.concurrent.api.CompositeCloseable;
import io.servicetalk.concurrent.api.ListenableAsyncCloseable;
import io.servicetalk.concurrent.api.Processors;
import io.servicetalk.concurrent.api.Publisher;
import io.servicetalk.concurrent.api.RetryStrategies;
import io.servicetalk.concurrent.api.Single;
import io.servicetalk.concurrent.api.SourceAdapters;
import io.servicetalk.concurrent.internal.DelayedCancellable;
import io.servicetalk.concurrent.internal.FlowControlUtils;
import io.servicetalk.concurrent.internal.SequentialCancellable;
import io.servicetalk.context.api.ContextMap;
import io.servicetalk.loadbalancer.Exceptions;
import io.servicetalk.loadbalancer.HealthCheckConfig;
import io.servicetalk.loadbalancer.NormalizedTimeSourceExecutor;
import io.servicetalk.loadbalancer.TestableLoadBalancer;
import io.servicetalk.utils.internal.RandomUtils;
import java.lang.invoke.LambdaMetafactory;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.Objects;
import java.util.Spliterator;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

final class RoundRobinLoadBalancer<ResolvedAddress, C extends LoadBalancedConnection>
implements TestableLoadBalancer<ResolvedAddress, C> {
    private static final Logger LOGGER = LoggerFactory.getLogger(RoundRobinLoadBalancer.class);
    private static final Object[] EMPTY_ARRAY = new Object[0];
    private static final AtomicReferenceFieldUpdater<RoundRobinLoadBalancer, List> usedHostsUpdater = AtomicReferenceFieldUpdater.newUpdater(RoundRobinLoadBalancer.class, List.class, "usedHosts");
    private static final AtomicIntegerFieldUpdater<RoundRobinLoadBalancer> indexUpdater = AtomicIntegerFieldUpdater.newUpdater(RoundRobinLoadBalancer.class, "index");
    private static final AtomicLongFieldUpdater<RoundRobinLoadBalancer> nextResubscribeTimeUpdater = AtomicLongFieldUpdater.newUpdater(RoundRobinLoadBalancer.class, "nextResubscribeTime");
    private static final long RESUBSCRIBING = -1L;
    private static final int MIN_RANDOM_SEARCH_SPACE = 64;
    private static final float RANDOM_SEARCH_FACTOR = 0.75f;
    private volatile long nextResubscribeTime = -1L;
    private volatile int index;
    private volatile List<Host<ResolvedAddress, C>> usedHosts = Collections.emptyList();
    @Nullable
    private volatile EventSubscriber currentSubscriber;
    private final String id;
    private final String targetResource;
    private final Publisher<? extends Collection<? extends ServiceDiscovererEvent<ResolvedAddress>>> eventPublisher;
    private final PublisherSource.Processor<Object, Object> eventStreamProcessor = Processors.newPublisherProcessorDropHeadOnOverflow(32);
    private final Publisher<Object> eventStream;
    private final SequentialCancellable discoveryCancellable = new SequentialCancellable();
    private final ConnectionFactory<ResolvedAddress, ? extends C> connectionFactory;
    private final int linearSearchSpace;
    @Nullable
    private final HealthCheckConfig healthCheckConfig;
    private final ListenableAsyncCloseable asyncCloseable;

    RoundRobinLoadBalancer(String id, String targetResourceName, Publisher<? extends Collection<? extends ServiceDiscovererEvent<ResolvedAddress>>> eventPublisher, ConnectionFactory<ResolvedAddress, ? extends C> connectionFactory, int linearSearchSpace, @Nullable HealthCheckConfig healthCheckConfig) {
        this.id = id + '@' + Integer.toHexString(System.identityHashCode(this));
        this.targetResource = Objects.requireNonNull(targetResourceName);
        this.eventPublisher = Objects.requireNonNull(eventPublisher);
        this.eventStream = SourceAdapters.fromSource(this.eventStreamProcessor).replay(1);
        this.connectionFactory = Objects.requireNonNull(connectionFactory);
        this.linearSearchSpace = linearSearchSpace;
        this.healthCheckConfig = healthCheckConfig;
        this.asyncCloseable = AsyncCloseables.toAsyncCloseable(graceful -> {
            List<Host<ResolvedAddress, C>> currentList;
            this.discoveryCancellable.cancel();
            this.eventStreamProcessor.onComplete();
            while (!RoundRobinLoadBalancer.isClosedList(currentList = this.usedHosts) && !usedHostsUpdater.compareAndSet(this, currentList, new ClosedList(currentList))) {
            }
            CompositeCloseable compositeCloseable = AsyncCloseables.newCompositeCloseable().appendAll(currentList).appendAll(connectionFactory);
            LOGGER.debug("{} is closing {}gracefully. Last seen addresses (size={}): {}.", this, graceful ? "" : "non", currentList.size(), currentList);
            return (graceful ? compositeCloseable.closeAsyncGracefully() : compositeCloseable.closeAsync()).beforeOnError(t -> {
                if (!graceful) {
                    this.usedHosts = new ClosedList<Host<ResolvedAddress, C>>(Collections.emptyList());
                }
            }).beforeOnComplete(() -> {
                this.usedHosts = new ClosedList<Host<ResolvedAddress, C>>(Collections.emptyList());
            });
        });
        this.eventStream.ignoreElements().subscribe();
        this.subscribeToEvents(false);
    }

    private void subscribeToEvents(boolean resubscribe) {
        EventSubscriber eventSubscriber;
        assert (this.nextResubscribeTime == -1L);
        if (resubscribe) {
            assert (this.healthCheckConfig != null) : "Resubscribe can happen only when health-checking is configured";
            LOGGER.debug("{}: resubscribing to the ServiceDiscoverer event publisher.", (Object)this);
            this.discoveryCancellable.cancelCurrent();
        }
        this.currentSubscriber = eventSubscriber = new EventSubscriber(resubscribe);
        SourceAdapters.toSource(this.eventPublisher).subscribe(eventSubscriber);
        if (this.healthCheckConfig != null) {
            assert (this.healthCheckConfig.executor instanceof NormalizedTimeSourceExecutor);
            this.nextResubscribeTime = RoundRobinLoadBalancer.nextResubscribeTime(this.healthCheckConfig, this);
        }
    }

    private static <R, C extends LoadBalancedConnection> long nextResubscribeTime(HealthCheckConfig config, RoundRobinLoadBalancer<R, C> lb) {
        long lower = config.healthCheckResubscribeLowerBound;
        long upper = config.healthCheckResubscribeUpperBound;
        long currentTime = config.executor.currentTime(TimeUnit.NANOSECONDS);
        long result = currentTime + RandomUtils.nextLongInclusive(lower, upper);
        LOGGER.debug("{}: current time {}, next resubscribe attempt can be performed at {}.", lb, currentTime, result);
        return result;
    }

    private static <ResolvedAddress, C extends LoadBalancedConnection> boolean allUnhealthy(List<Host<ResolvedAddress, C>> usedHosts) {
        boolean allUnhealthy = !usedHosts.isEmpty();
        for (Host<ResolvedAddress, C> host : usedHosts) {
            if (Host.isUnhealthy(((Host)host).connState)) continue;
            allUnhealthy = false;
            break;
        }
        return allUnhealthy;
    }

    private static <ResolvedAddress, C extends LoadBalancedConnection> boolean contains(Host<ResolvedAddress, C> host, Collection<? extends ServiceDiscovererEvent<ResolvedAddress>> events) {
        for (ServiceDiscovererEvent<ResolvedAddress> event : events) {
            if (!host.address.equals(event.address())) continue;
            return true;
        }
        return false;
    }

    private static <T> Single<T> failedLBClosed(String targetResource) {
        return Single.failed(new IllegalStateException("LoadBalancer for " + targetResource + " has closed"));
    }

    @Override
    public Single<C> selectConnection(Predicate<C> selector, @Nullable ContextMap context) {
        return Single.defer(() -> this.selectConnection0(selector, context, false).shareContextOnSubscribe());
    }

    @Override
    public Single<C> newConnection(@Nullable ContextMap context) {
        return Single.defer(() -> this.selectConnection0(c -> true, context, true).shareContextOnSubscribe());
    }

    @Override
    public Publisher<Object> eventStream() {
        return this.eventStream;
    }

    public String toString() {
        return "RoundRobinLoadBalancer{id=" + this.id + ", targetResource=" + this.targetResource + '}';
    }

    private Single<C> selectConnection0(Predicate<C> selector, @Nullable ContextMap context, boolean forceNewConnectionAndReserve) {
        List<Host<ResolvedAddress, C>> usedHosts = this.usedHosts;
        if (usedHosts.isEmpty()) {
            return RoundRobinLoadBalancer.isClosedList(usedHosts) ? RoundRobinLoadBalancer.failedLBClosed(this.targetResource) : Single.failed(Exceptions.StacklessNoAvailableHostException.newInstance("No hosts are available to connect for " + this.targetResource + ".", RoundRobinLoadBalancer.class, "selectConnection0(...)"));
        }
        int cursor = (indexUpdater.getAndIncrement(this) & Integer.MAX_VALUE) % usedHosts.size();
        ThreadLocalRandom rnd = ThreadLocalRandom.current();
        Host<ResolvedAddress, C> pickedHost = null;
        for (int i = 0; i < usedHosts.size(); ++i) {
            int localCursor = (cursor + i) % usedHosts.size();
            Host<ResolvedAddress, C> host = usedHosts.get(localCursor);
            assert (host != null) : "Host can't be null.";
            if (!forceNewConnectionAndReserve) {
                Object[] connections = ((Host)host).connState.connections;
                int linearAttempts = Math.min(connections.length, this.linearSearchSpace);
                for (int j = 0; j < linearAttempts; ++j) {
                    LoadBalancedConnection connection = (LoadBalancedConnection)connections[j];
                    if (!selector.test(connection)) continue;
                    return Single.succeeded(connection);
                }
                if (connections.length > linearAttempts) {
                    int diff = connections.length - linearAttempts;
                    int randomAttempts = diff < 64 ? diff : (int)((float)diff * 0.75f);
                    for (int j = 0; j < randomAttempts; ++j) {
                        LoadBalancedConnection connection = (LoadBalancedConnection)connections[rnd.nextInt(linearAttempts, connections.length)];
                        if (!selector.test(connection)) continue;
                        return Single.succeeded(connection);
                    }
                }
            }
            if (!host.isActiveAndHealthy()) continue;
            pickedHost = host;
            break;
        }
        if (pickedHost == null) {
            long currNextResubscribeTime;
            if (this.healthCheckConfig != null && RoundRobinLoadBalancer.allUnhealthy(usedHosts) && (currNextResubscribeTime = this.nextResubscribeTime) >= 0L && this.healthCheckConfig.executor.currentTime(TimeUnit.NANOSECONDS) >= currNextResubscribeTime && nextResubscribeTimeUpdater.compareAndSet(this, currNextResubscribeTime, -1L)) {
                this.subscribeToEvents(true);
            }
            return Single.failed(Exceptions.StacklessNoActiveHostException.newInstance("Failed to pick an active host for " + this.targetResource + ". Either all are busy, expired, or unhealthy: " + usedHosts, RoundRobinLoadBalancer.class, "selectConnection0(...)"));
        }
        Host<ResolvedAddress, C> host = pickedHost;
        Single<C> establishConnection = this.connectionFactory.newConnection(host.address, context, null);
        if (((Host)host).healthCheckConfig != null) {
            establishConnection = establishConnection.beforeOnError(t -> host.markUnhealthy((Throwable)t, this.connectionFactory));
        }
        return establishConnection.flatMap(newCnx -> {
            if (forceNewConnectionAndReserve && !newCnx.tryReserve()) {
                return newCnx.closeAsync().concat(Single.failed(Exceptions.StacklessConnectionRejectedException.newInstance("Newly created connection " + newCnx + " for " + this.targetResource + " could not be reserved.", RoundRobinLoadBalancer.class, "selectConnection0(...)"))).shareContextOnSubscribe();
            }
            if (!selector.test(newCnx)) {
                Single failedSingle = Single.failed(Exceptions.StacklessConnectionRejectedException.newInstance("Newly created connection " + newCnx + " for " + this.targetResource + " was rejected by the selection filter.", RoundRobinLoadBalancer.class, "selectConnection0(...)"));
                return (host.addConnection(newCnx, null) ? failedSingle : newCnx.closeAsync().concat(failedSingle)).shareContextOnSubscribe();
            }
            if (host.addConnection(newCnx, null)) {
                return Single.succeeded(newCnx).shareContextOnSubscribe();
            }
            return newCnx.closeAsync().concat(RoundRobinLoadBalancer.isClosedList(this.usedHosts) ? RoundRobinLoadBalancer.failedLBClosed(this.targetResource) : Single.failed(Exceptions.StacklessConnectionRejectedException.newInstance("Failed to add newly created connection " + newCnx + " for " + this.targetResource + " for " + host, RoundRobinLoadBalancer.class, "selectConnection0(...)"))).shareContextOnSubscribe();
        });
    }

    @Override
    public Completable onClose() {
        return this.asyncCloseable.onClose();
    }

    @Override
    public Completable onClosing() {
        return this.asyncCloseable.onClosing();
    }

    @Override
    public Completable closeAsync() {
        return this.asyncCloseable.closeAsync();
    }

    @Override
    public Completable closeAsyncGracefully() {
        return this.asyncCloseable.closeAsyncGracefully();
    }

    @Override
    public List<Map.Entry<ResolvedAddress, List<C>>> usedAddresses() {
        return this.usedHosts.stream().map(Host::asEntry).collect(Collectors.toList());
    }

    private static boolean isClosedList(List<?> list) {
        return list.getClass().equals(ClosedList.class);
    }

    private static final class ClosedList<T>
    implements List<T> {
        private final List<T> delegate;

        private ClosedList(List<T> delegate) {
            this.delegate = Objects.requireNonNull(delegate);
        }

        @Override
        public int size() {
            return this.delegate.size();
        }

        @Override
        public boolean isEmpty() {
            return this.delegate.isEmpty();
        }

        @Override
        public boolean contains(Object o) {
            return this.delegate.contains(o);
        }

        @Override
        public Iterator<T> iterator() {
            return this.delegate.iterator();
        }

        @Override
        public void forEach(Consumer<? super T> action) {
            this.delegate.forEach(action);
        }

        @Override
        public Object[] toArray() {
            return this.delegate.toArray();
        }

        @Override
        public <T1> T1[] toArray(T1[] a) {
            return this.delegate.toArray(a);
        }

        @Override
        public boolean add(T t) {
            return this.delegate.add(t);
        }

        @Override
        public boolean remove(Object o) {
            return this.delegate.remove(o);
        }

        @Override
        public boolean containsAll(Collection<?> c) {
            return this.delegate.containsAll(c);
        }

        @Override
        public boolean addAll(Collection<? extends T> c) {
            return this.delegate.addAll(c);
        }

        @Override
        public boolean addAll(int index, Collection<? extends T> c) {
            return this.delegate.addAll(c);
        }

        @Override
        public boolean removeAll(Collection<?> c) {
            return this.delegate.removeAll(c);
        }

        @Override
        public boolean removeIf(Predicate<? super T> filter) {
            return this.delegate.removeIf(filter);
        }

        @Override
        public boolean retainAll(Collection<?> c) {
            return this.delegate.retainAll(c);
        }

        @Override
        public void replaceAll(UnaryOperator<T> operator) {
            this.delegate.replaceAll(operator);
        }

        @Override
        public void sort(Comparator<? super T> c) {
            this.delegate.sort(c);
        }

        @Override
        public void clear() {
            this.delegate.clear();
        }

        @Override
        public T get(int index) {
            return this.delegate.get(index);
        }

        @Override
        public T set(int index, T element) {
            return this.delegate.set(index, element);
        }

        @Override
        public void add(int index, T element) {
            this.delegate.add(index, element);
        }

        @Override
        public T remove(int index) {
            return this.delegate.remove(index);
        }

        @Override
        public int indexOf(Object o) {
            return this.delegate.indexOf(o);
        }

        @Override
        public int lastIndexOf(Object o) {
            return this.delegate.lastIndexOf(o);
        }

        @Override
        public ListIterator<T> listIterator() {
            return this.delegate.listIterator();
        }

        @Override
        public ListIterator<T> listIterator(int index) {
            return this.delegate.listIterator(index);
        }

        @Override
        public List<T> subList(int fromIndex, int toIndex) {
            return new ClosedList<T>(this.delegate.subList(fromIndex, toIndex));
        }

        @Override
        public Spliterator<T> spliterator() {
            return this.delegate.spliterator();
        }

        @Override
        public Stream<T> stream() {
            return this.delegate.stream();
        }

        @Override
        public Stream<T> parallelStream() {
            return this.delegate.parallelStream();
        }
    }

    private static final class Host<Addr, C extends LoadBalancedConnection>
    implements ListenableAsyncCloseable {
        private static final ActiveState STATE_ACTIVE_NO_FAILURES = new ActiveState();
        private static final ConnState ACTIVE_EMPTY_CONN_STATE = new ConnState(RoundRobinLoadBalancer.access$1200(), STATE_ACTIVE_NO_FAILURES);
        private static final ConnState CLOSED_CONN_STATE = new ConnState(RoundRobinLoadBalancer.access$1200(), (Object)State.CLOSED);
        private static final AtomicReferenceFieldUpdater<Host, ConnState> connStateUpdater = AtomicReferenceFieldUpdater.newUpdater(Host.class, ConnState.class, "connState");
        private final String lbDescription;
        final Addr address;
        @Nullable
        private final HealthCheckConfig healthCheckConfig;
        private final ListenableAsyncCloseable closeable;
        private volatile ConnState connState = ACTIVE_EMPTY_CONN_STATE;

        Host(String lbDescription, Addr address, @Nullable HealthCheckConfig healthCheckConfig) {
            this.lbDescription = lbDescription;
            this.address = address;
            this.healthCheckConfig = healthCheckConfig;
            this.closeable = AsyncCloseables.toAsyncCloseable(graceful -> graceful ? this.doClose(AsyncCloseable::closeAsyncGracefully) : this.doClose(AsyncCloseable::closeAsync));
        }

        boolean markActiveIfNotClosed() {
            Object oldState = Host.connStateUpdater.getAndUpdate((Host)this, (UnaryOperator<ConnState>)(UnaryOperator)LambdaMetafactory.metafactory(null, null, null, (Ljava/lang/Object;)Ljava/lang/Object;, lambda$markActiveIfNotClosed$1(io.servicetalk.loadbalancer.RoundRobinLoadBalancer$Host$ConnState ), (Lio/servicetalk/loadbalancer/RoundRobinLoadBalancer$Host$ConnState;)Lio/servicetalk/loadbalancer/RoundRobinLoadBalancer$Host$ConnState;)()).state;
            return oldState != State.CLOSED;
        }

        void markClosed() {
            ConnState oldState = this.closeConnState();
            Object[] toRemove = oldState.connections;
            this.cancelIfHealthCheck(oldState);
            LOGGER.debug("{}: closing {} connection(s) gracefully to the closed address: {}.", this.lbDescription, toRemove.length, this.address);
            for (Object conn : toRemove) {
                LoadBalancedConnection cConn = (LoadBalancedConnection)conn;
                cConn.closeAsyncGracefully().subscribe();
            }
        }

        private ConnState closeConnState() {
            ConnState oldState;
            do {
                oldState = this.connState;
            } while (oldState.state != State.CLOSED && !connStateUpdater.compareAndSet(this, oldState, new ConnState(oldState.connections, (Object)State.CLOSED)));
            return oldState;
        }

        void markExpired() {
            block2: {
                State nextState;
                ConnState oldState;
                do {
                    oldState = connStateUpdater.get(this);
                    if (oldState.state == State.EXPIRED || oldState.state == State.CLOSED) break block2;
                } while (!connStateUpdater.compareAndSet(this, oldState, new ConnState(oldState.connections, (Object)(nextState = oldState.connections.length == 0 ? State.CLOSED : State.EXPIRED))));
                this.cancelIfHealthCheck(oldState);
                if (nextState == State.CLOSED) {
                    this.closeAsync().subscribe();
                }
            }
        }

        void markHealthy(HealthCheck<Addr, C> originalHealthCheckState) {
            ConnState oldState = connStateUpdater.getAndUpdate(this, previous -> {
                if (Host.isUnhealthy(previous)) {
                    return new ConnState(previous.connections, STATE_ACTIVE_NO_FAILURES);
                }
                return previous;
            });
            if (oldState.state != originalHealthCheckState) {
                this.cancelIfHealthCheck(oldState);
            }
        }

        void markUnhealthy(Throwable cause, ConnectionFactory<Addr, ? extends C> connectionFactory) {
            block4: {
                HealthCheck healthCheck;
                assert (this.healthCheckConfig != null);
                while (true) {
                    ConnState previous;
                    if (!Host.isActive(previous = connStateUpdater.get(this)) || previous.connections.length > 0 || cause instanceof ConnectionLimitReachedException) {
                        LOGGER.debug("{}: failed to open a new connection to the host on address {}. {}.", this.lbDescription, this.address, previous, cause);
                        break block4;
                    }
                    ActiveState previousState = (ActiveState)previous.state;
                    if (previousState.failedConnections + 1 < this.healthCheckConfig.failedThreshold) {
                        ActiveState nextState = previousState.forNextFailedConnection();
                        if (!connStateUpdater.compareAndSet(this, previous, new ConnState(previous.connections, nextState))) continue;
                        LOGGER.debug("{}: failed to open a new connection to the host on address {} {} time(s) ({} consecutive failures will trigger health-checking).", this.lbDescription, this.address, nextState.failedConnections, this.healthCheckConfig.failedThreshold, cause);
                        break block4;
                    }
                    healthCheck = new HealthCheck(connectionFactory, this, cause);
                    ConnState nextState = new ConnState(previous.connections, healthCheck);
                    if (connStateUpdater.compareAndSet(this, previous, nextState)) break;
                }
                LOGGER.info("{}: failed to open a new connection to the host on address {} {} time(s) in a row. Error counting threshold reached, marking this host as UNHEALTHY for the selection algorithm and triggering background health-checking.", this.lbDescription, this.address, this.healthCheckConfig.failedThreshold, cause);
                healthCheck.schedule(cause);
            }
        }

        boolean isActiveAndHealthy() {
            return Host.isActive(this.connState);
        }

        static boolean isActive(ConnState connState) {
            return ActiveState.class.equals(connState.state.getClass());
        }

        static boolean isUnhealthy(ConnState connState) {
            return HealthCheck.class.equals(connState.state.getClass());
        }

        boolean addConnection(C connection, @Nullable HealthCheck<Addr, C> currentHealthCheck) {
            ActiveState newState;
            Object[] newList;
            ConnState previous;
            int addAttempt = 0;
            do {
                Object[] existing;
                previous = connStateUpdater.get(this);
                if (previous.state == State.CLOSED) {
                    return false;
                }
                ++addAttempt;
                for (Object o : existing = previous.connections) {
                    if (!o.equals(connection)) continue;
                    return true;
                }
                newList = Arrays.copyOf(existing, existing.length + 1);
                newList[existing.length] = connection;
            } while (!connStateUpdater.compareAndSet(this, previous, new ConnState(newList, newState = Host.isActive(previous) || Host.isUnhealthy(previous) ? STATE_ACTIVE_NO_FAILURES : previous.state)));
            if (Host.isUnhealthy(previous) && (currentHealthCheck == null || previous.state != currentHealthCheck)) {
                assert (newState == STATE_ACTIVE_NO_FAILURES);
                this.cancelIfHealthCheck(previous);
            }
            LOGGER.trace("{}: added a new connection {} to {} after {} attempt(s).", this.lbDescription, connection, this, addAttempt);
            connection.onClose().beforeFinally(() -> {
                int removeAttempt = 0;
                while (true) {
                    int i;
                    ConnState currentConnState = this.connState;
                    if (currentConnState.state == State.CLOSED) break;
                    assert (currentConnState.connections.length > 0);
                    ++removeAttempt;
                    Object[] connections = currentConnState.connections;
                    for (i = 0; i < connections.length && !connections[i].equals(connection); ++i) {
                    }
                    if (i == connections.length) break;
                    if (connections.length == 1) {
                        assert (!Host.isUnhealthy(currentConnState)) : "Cannot be UNHEALTHY with #connections > 0";
                        if (Host.isActive(currentConnState)) {
                            if (!connStateUpdater.compareAndSet(this, currentConnState, new ConnState(EMPTY_ARRAY, currentConnState.state))) continue;
                        } else {
                            if (currentConnState.state != State.EXPIRED || !connStateUpdater.compareAndSet(this, currentConnState, CLOSED_CONN_STATE)) continue;
                            this.closeAsync().subscribe();
                        }
                        break;
                    }
                    Object[] newList = new Object[connections.length - 1];
                    System.arraycopy(connections, 0, newList, 0, i);
                    System.arraycopy(connections, i + 1, newList, i, newList.length - i);
                    if (connStateUpdater.compareAndSet(this, currentConnState, new ConnState(newList, currentConnState.state))) break;
                }
                LOGGER.trace("{}: removed connection {} from {} after {} attempt(s).", this.lbDescription, connection, this, removeAttempt);
            }).onErrorComplete(t -> {
                LOGGER.error("{}: unexpected error while processing connection.onClose() for {}.", this.lbDescription, connection, t);
                return true;
            }).subscribe();
            return true;
        }

        Map.Entry<Addr, List<C>> asEntry() {
            return new AbstractMap.SimpleImmutableEntry(this.address, Stream.of(this.connState.connections).map(conn -> (LoadBalancedConnection)conn).collect(Collectors.toList()));
        }

        @Override
        public Completable closeAsync() {
            return this.closeable.closeAsync();
        }

        @Override
        public Completable closeAsyncGracefully() {
            return this.closeable.closeAsyncGracefully();
        }

        @Override
        public Completable onClose() {
            return this.closeable.onClose();
        }

        @Override
        public Completable onClosing() {
            return this.closeable.onClosing();
        }

        private Completable doClose(Function<? super C, Completable> closeFunction) {
            return Completable.defer(() -> {
                ConnState oldState = this.closeConnState();
                this.cancelIfHealthCheck(oldState);
                Object[] connections = oldState.connections;
                return (connections.length == 0 ? Completable.completed() : Publisher.from(connections).flatMapCompletableDelayError(conn -> (Completable)closeFunction.apply((Object)((LoadBalancedConnection)conn)))).shareContextOnSubscribe();
            });
        }

        private void cancelIfHealthCheck(ConnState connState) {
            if (Host.isUnhealthy(connState)) {
                HealthCheck healthCheck = (HealthCheck)connState.state;
                LOGGER.debug("{}: health check cancelled for {}.", (Object)this.lbDescription, (Object)healthCheck.host);
                healthCheck.cancel();
            }
        }

        public String toString() {
            ConnState connState = this.connState;
            return "Host{lbDescription=" + this.lbDescription + ", address=" + this.address + ", state=" + connState.state + ", #connections=" + connState.connections.length + '}';
        }

        private static /* synthetic */ ConnState lambda$markActiveIfNotClosed$1(ConnState oldConnState) {
            if (oldConnState.state == State.EXPIRED) {
                return new ConnState(oldConnState.connections, STATE_ACTIVE_NO_FAILURES);
            }
            return oldConnState;
        }

        private static final class ConnState {
            final Object[] connections;
            final Object state;

            ConnState(Object[] connections, Object state) {
                this.connections = connections;
                this.state = state;
            }

            public String toString() {
                return "ConnState{state=" + this.state + ", #connections=" + this.connections.length + '}';
            }
        }

        private static final class HealthCheck<ResolvedAddress, C extends LoadBalancedConnection>
        extends DelayedCancellable {
            private final ConnectionFactory<ResolvedAddress, ? extends C> connectionFactory;
            private final Host<ResolvedAddress, C> host;
            private final Throwable lastError;

            private HealthCheck(ConnectionFactory<ResolvedAddress, ? extends C> connectionFactory, Host<ResolvedAddress, C> host, Throwable lastError) {
                this.connectionFactory = connectionFactory;
                this.host = host;
                this.lastError = lastError;
            }

            public void schedule(Throwable originalCause) {
                assert (((Host)this.host).healthCheckConfig != null);
                this.delayedCancellable(RetryStrategies.retryWithConstantBackoffDeltaJitter(cause -> true, ((Host)this.host).healthCheckConfig.healthCheckInterval, ((Host)this.host).healthCheckConfig.jitter, ((Host)this.host).healthCheckConfig.executor).apply(0, originalCause).beforeOnSubscribe(__ -> AsyncContext.clear()).concat(this.connectionFactory.newConnection(this.host.address, null, null).retryWhen(RetryStrategies.retryWithConstantBackoffDeltaJitter(cause -> {
                    LOGGER.debug("{}: health check failed for {}.", ((Host)this.host).lbDescription, this.host, cause);
                    return true;
                }, ((Host)this.host).healthCheckConfig.healthCheckInterval, ((Host)this.host).healthCheckConfig.jitter, ((Host)this.host).healthCheckConfig.executor))).flatMapCompletable(newCnx -> {
                    if (this.host.addConnection((LoadBalancedConnection)newCnx, this)) {
                        LOGGER.info("{}: health check passed for {}, marked this host as ACTIVE for the selection algorithm.", (Object)((Host)this.host).lbDescription, (Object)this.host);
                        return Completable.completed();
                    }
                    assert (((Host)this.host).connState.state == State.CLOSED);
                    LOGGER.debug("{}: health check passed for {}, but the host rejected a new connection {}. Closing it now.", ((Host)this.host).lbDescription, this.host, newCnx);
                    return newCnx.closeAsync();
                }).onErrorComplete(t -> {
                    LOGGER.error("{}: health check terminated with an unexpected error for {}. Marking this host as ACTIVE as a fallback to allow connection attempts.", ((Host)this.host).lbDescription, this.host, t);
                    this.host.markHealthy(this);
                    return true;
                }).subscribe());
            }

            public String toString() {
                return "UNHEALTHY(" + this.lastError + ')';
            }
        }

        private static final class ActiveState {
            private final int failedConnections;

            ActiveState() {
                this(0);
            }

            private ActiveState(int failedConnections) {
                this.failedConnections = failedConnections;
            }

            ActiveState forNextFailedConnection() {
                return new ActiveState(FlowControlUtils.addWithOverflowProtection(this.failedConnections, 1));
            }

            public String toString() {
                return "ACTIVE(failedConnections=" + this.failedConnections + ')';
            }
        }

        private static enum State {
            EXPIRED,
            CLOSED;

        }
    }

    private final class EventSubscriber
    implements PublisherSource.Subscriber<Collection<? extends ServiceDiscovererEvent<ResolvedAddress>>> {
        private boolean firstEventsAfterResubscribe;

        EventSubscriber(boolean resubscribe) {
            this.firstEventsAfterResubscribe = resubscribe;
        }

        @Override
        public void onSubscribe(PublisherSource.Subscription s) {
            s.request(Long.MAX_VALUE);
            RoundRobinLoadBalancer.this.discoveryCancellable.nextCancellable(s);
        }

        @Override
        public void onNext(@Nullable Collection<? extends ServiceDiscovererEvent<ResolvedAddress>> events) {
            if (events == null) {
                LOGGER.debug("{}: unexpectedly received null instead of events.", (Object)RoundRobinLoadBalancer.this);
                return;
            }
            if (RoundRobinLoadBalancer.this.currentSubscriber != this) {
                LOGGER.debug("{}: received new events after cancelling previous subscription, discarding: {}", (Object)RoundRobinLoadBalancer.this, (Object)events);
                return;
            }
            for (ServiceDiscovererEvent event : events) {
                ServiceDiscovererEvent.Status eventStatus = event.status();
                LOGGER.debug("{}: received new ServiceDiscoverer event {}. Inferred status: {}.", RoundRobinLoadBalancer.this, event, eventStatus);
                List usedAddresses = (List)usedHostsUpdater.updateAndGet(RoundRobinLoadBalancer.this, oldHosts -> {
                    if (RoundRobinLoadBalancer.isClosedList(oldHosts)) {
                        return oldHosts;
                    }
                    Object addr = Objects.requireNonNull(event.address());
                    List oldHostsTyped = oldHosts;
                    if (ServiceDiscovererEvent.Status.AVAILABLE.equals(eventStatus)) {
                        return this.addHostToList(oldHostsTyped, addr);
                    }
                    if (ServiceDiscovererEvent.Status.EXPIRED.equals(eventStatus)) {
                        if (oldHostsTyped.isEmpty()) {
                            return Collections.emptyList();
                        }
                        return this.markHostAsExpired(oldHostsTyped, addr);
                    }
                    if (ServiceDiscovererEvent.Status.UNAVAILABLE.equals(eventStatus)) {
                        return this.listWithHostRemoved(oldHostsTyped, host -> {
                            boolean match = host.address.equals(addr);
                            if (match) {
                                host.markClosed();
                            }
                            return match;
                        });
                    }
                    LOGGER.error("{}: Unexpected Status in event: {} (mapped to {}). Leaving usedHosts unchanged: {}", RoundRobinLoadBalancer.this, event, eventStatus, oldHosts);
                    return oldHosts;
                });
                LOGGER.debug("{}: now using addresses (size={}): {}.", RoundRobinLoadBalancer.this, usedAddresses.size(), usedAddresses);
                if (ServiceDiscovererEvent.Status.AVAILABLE.equals(eventStatus)) {
                    if (usedAddresses.size() != 1) continue;
                    RoundRobinLoadBalancer.this.eventStreamProcessor.onNext(LoadBalancerReadyEvent.LOAD_BALANCER_READY_EVENT);
                    continue;
                }
                if (!usedAddresses.isEmpty()) continue;
                RoundRobinLoadBalancer.this.eventStreamProcessor.onNext(LoadBalancerReadyEvent.LOAD_BALANCER_NOT_READY_EVENT);
            }
            if (this.firstEventsAfterResubscribe) {
                if (events.isEmpty()) {
                    return;
                }
                this.firstEventsAfterResubscribe = false;
                List currentHosts = RoundRobinLoadBalancer.this.usedHosts;
                for (Host host : currentHosts) {
                    if (RoundRobinLoadBalancer.contains(host, events)) continue;
                    host.closeAsyncGracefully().subscribe();
                }
            }
        }

        private List<Host<ResolvedAddress, C>> markHostAsExpired(List<Host<ResolvedAddress, C>> oldHostsTyped, ResolvedAddress addr) {
            for (Host host : oldHostsTyped) {
                if (!host.address.equals(addr)) continue;
                host.markExpired();
                break;
            }
            return oldHostsTyped;
        }

        private Host<ResolvedAddress, C> createHost(ResolvedAddress addr) {
            Host host = new Host(RoundRobinLoadBalancer.this.toString(), addr, RoundRobinLoadBalancer.this.healthCheckConfig);
            host.onClose().afterFinally(() -> {
                List cfr_ignored_0 = (List)usedHostsUpdater.updateAndGet(RoundRobinLoadBalancer.this, previousHosts -> {
                    List previousHostsTyped = previousHosts;
                    return this.listWithHostRemoved(previousHostsTyped, current -> current == host);
                });
            }).subscribe();
            return host;
        }

        private List<Host<ResolvedAddress, C>> addHostToList(List<Host<ResolvedAddress, C>> oldHostsTyped, ResolvedAddress addr) {
            if (oldHostsTyped.isEmpty()) {
                return Collections.singletonList(this.createHost(addr));
            }
            for (Host host : oldHostsTyped) {
                if (!host.address.equals(addr)) continue;
                if (!host.markActiveIfNotClosed()) break;
                return oldHostsTyped;
            }
            ArrayList newHosts = new ArrayList(oldHostsTyped.size() + 1);
            newHosts.addAll(oldHostsTyped);
            newHosts.add(this.createHost(addr));
            return newHosts;
        }

        private List<Host<ResolvedAddress, C>> listWithHostRemoved(List<Host<ResolvedAddress, C>> oldHostsTyped, Predicate<Host<ResolvedAddress, C>> hostPredicate) {
            if (oldHostsTyped.isEmpty()) {
                return oldHostsTyped;
            }
            ArrayList newHosts = new ArrayList(oldHostsTyped.size() - 1);
            for (int i = 0; i < oldHostsTyped.size(); ++i) {
                Host current = oldHostsTyped.get(i);
                if (hostPredicate.test(current)) {
                    for (int x = i + 1; x < oldHostsTyped.size(); ++x) {
                        newHosts.add(oldHostsTyped.get(x));
                    }
                    return newHosts.isEmpty() ? Collections.emptyList() : newHosts;
                }
                newHosts.add(current);
            }
            return newHosts;
        }

        @Override
        public void onError(Throwable t) {
            List hosts = RoundRobinLoadBalancer.this.usedHosts;
            if (RoundRobinLoadBalancer.this.healthCheckConfig == null) {
                RoundRobinLoadBalancer.this.eventStreamProcessor.onError(t);
            }
            LOGGER.error("{}: service discoverer {} emitted an error. Last seen addresses (size={}): {}.", RoundRobinLoadBalancer.this, RoundRobinLoadBalancer.this.eventPublisher, hosts.size(), hosts, t);
        }

        @Override
        public void onComplete() {
            List hosts = RoundRobinLoadBalancer.this.usedHosts;
            if (RoundRobinLoadBalancer.this.healthCheckConfig == null) {
                RoundRobinLoadBalancer.this.eventStreamProcessor.onComplete();
            }
            LOGGER.error("{}: service discoverer completed. Last seen addresses (size={}): {}.", RoundRobinLoadBalancer.this, hosts.size(), hosts);
        }
    }
}

