/*
 * Decompiled with CFR 0.152.
 */
package io.servicetalk.loadbalancer;

import io.servicetalk.client.api.ConnectionFactory;
import io.servicetalk.client.api.ConnectionLimitReachedException;
import io.servicetalk.client.api.LoadBalancedConnection;
import io.servicetalk.concurrent.api.AsyncCloseable;
import io.servicetalk.concurrent.api.AsyncCloseables;
import io.servicetalk.concurrent.api.AsyncContext;
import io.servicetalk.concurrent.api.Completable;
import io.servicetalk.concurrent.api.ListenableAsyncCloseable;
import io.servicetalk.concurrent.api.Publisher;
import io.servicetalk.concurrent.api.RetryStrategies;
import io.servicetalk.concurrent.api.Single;
import io.servicetalk.concurrent.internal.DelayedCancellable;
import io.servicetalk.concurrent.internal.FlowControlUtils;
import io.servicetalk.context.api.ContextMap;
import io.servicetalk.loadbalancer.Exceptions;
import io.servicetalk.loadbalancer.HealthCheckConfig;
import io.servicetalk.loadbalancer.Host;
import io.servicetalk.loadbalancer.RoundRobinLoadBalancer;
import java.lang.invoke.LambdaMetafactory;
import java.util.AbstractMap;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

final class DefaultHost<Addr, C extends LoadBalancedConnection>
implements Host<Addr, C> {
    private static final int MIN_RANDOM_SEARCH_SPACE = 64;
    private static final float RANDOM_SEARCH_FACTOR = 0.75f;
    private static final Object[] EMPTY_ARRAY = new Object[0];
    private static final Logger LOGGER = LoggerFactory.getLogger(DefaultHost.class);
    private static final ActiveState STATE_ACTIVE_NO_FAILURES = new ActiveState();
    private static final ConnState ACTIVE_EMPTY_CONN_STATE = new ConnState(EMPTY_ARRAY, STATE_ACTIVE_NO_FAILURES);
    private static final ConnState CLOSED_CONN_STATE = new ConnState(EMPTY_ARRAY, (Object)State.CLOSED);
    private static final AtomicReferenceFieldUpdater<DefaultHost, ConnState> connStateUpdater = AtomicReferenceFieldUpdater.newUpdater(DefaultHost.class, ConnState.class, "connState");
    private final String lbDescription;
    final Addr address;
    @Nullable
    private final HealthCheckConfig healthCheckConfig;
    private final ConnectionFactory<Addr, ? extends C> connectionFactory;
    private final int linearSearchSpace;
    private final ListenableAsyncCloseable closeable;
    private volatile ConnState connState = ACTIVE_EMPTY_CONN_STATE;

    DefaultHost(String lbDescription, Addr address, ConnectionFactory<Addr, ? extends C> connectionFactory, int linearSearchSpace, @Nullable HealthCheckConfig healthCheckConfig) {
        this.lbDescription = Objects.requireNonNull(lbDescription, "lbDescription");
        this.address = Objects.requireNonNull(address, "address");
        this.linearSearchSpace = linearSearchSpace;
        this.connectionFactory = Objects.requireNonNull(connectionFactory, "connectionFactory");
        this.healthCheckConfig = healthCheckConfig;
        this.closeable = AsyncCloseables.toAsyncCloseable(graceful -> graceful ? this.doClose(AsyncCloseable::closeAsyncGracefully) : this.doClose(AsyncCloseable::closeAsync));
    }

    @Override
    public Addr address() {
        return this.address;
    }

    @Override
    public boolean markActiveIfNotClosed() {
        Object oldState = DefaultHost.connStateUpdater.getAndUpdate((DefaultHost)this, (UnaryOperator<ConnState>)(UnaryOperator)LambdaMetafactory.metafactory(null, null, null, (Ljava/lang/Object;)Ljava/lang/Object;, lambda$markActiveIfNotClosed$1(io.servicetalk.loadbalancer.DefaultHost$ConnState ), (Lio/servicetalk/loadbalancer/DefaultHost$ConnState;)Lio/servicetalk/loadbalancer/DefaultHost$ConnState;)()).state;
        return oldState != State.CLOSED;
    }

    @Override
    public void markClosed() {
        ConnState oldState = this.closeConnState();
        Object[] toRemove = oldState.connections;
        this.cancelIfHealthCheck(oldState);
        LOGGER.debug("{}: closing {} connection(s) gracefully to the closed address: {}.", this.lbDescription, toRemove.length, this.address);
        for (Object conn : toRemove) {
            LoadBalancedConnection cConn = (LoadBalancedConnection)conn;
            cConn.closeAsyncGracefully().subscribe();
        }
    }

    private ConnState closeConnState() {
        ConnState oldState;
        do {
            oldState = this.connState;
        } while (oldState.state != State.CLOSED && !connStateUpdater.compareAndSet(this, oldState, new ConnState(oldState.connections, (Object)State.CLOSED)));
        return oldState;
    }

    @Override
    public void markExpired() {
        block2: {
            State nextState;
            ConnState oldState;
            do {
                oldState = connStateUpdater.get(this);
                if (oldState.state == State.EXPIRED || oldState.state == State.CLOSED) break block2;
            } while (!connStateUpdater.compareAndSet(this, oldState, new ConnState(oldState.connections, (Object)(nextState = oldState.connections.length == 0 ? State.CLOSED : State.EXPIRED))));
            this.cancelIfHealthCheck(oldState);
            if (nextState == State.CLOSED) {
                this.closeAsync().subscribe();
            }
        }
    }

    @Override
    @Nullable
    public C pickConnection(Predicate<C> selector, @Nullable ContextMap context) {
        Object[] connections = this.connState.connections;
        int linearAttempts = Math.min(connections.length, this.linearSearchSpace);
        for (int j = 0; j < linearAttempts; ++j) {
            LoadBalancedConnection connection = (LoadBalancedConnection)connections[j];
            if (!selector.test(connection)) continue;
            return (C)connection;
        }
        if (connections.length > linearAttempts) {
            int diff = connections.length - linearAttempts;
            int randomAttempts = diff < 64 ? diff : (int)((float)diff * 0.75f);
            ThreadLocalRandom rnd = ThreadLocalRandom.current();
            for (int j = 0; j < randomAttempts; ++j) {
                LoadBalancedConnection connection = (LoadBalancedConnection)connections[rnd.nextInt(linearAttempts, connections.length)];
                if (!selector.test(connection)) continue;
                return (C)connection;
            }
        }
        return null;
    }

    @Override
    public Single<C> newConnection(Predicate<C> selector, boolean forceNewConnectionAndReserve, @Nullable ContextMap context) {
        Single<C> establishConnection = this.connectionFactory.newConnection(this.address, context, null);
        if (this.healthCheckConfig != null) {
            establishConnection = establishConnection.beforeOnError(t -> this.markUnhealthy((Throwable)t));
        }
        return establishConnection.flatMap(newCnx -> {
            if (forceNewConnectionAndReserve && !newCnx.tryReserve()) {
                return newCnx.closeAsync().concat(Single.failed(Exceptions.StacklessConnectionRejectedException.newInstance("Newly created connection " + newCnx + " for " + this.lbDescription + " could not be reserved.", RoundRobinLoadBalancer.class, "selectConnection0(...)"))).shareContextOnSubscribe();
            }
            if (!selector.test(newCnx)) {
                Single failedSingle = Single.failed(Exceptions.StacklessConnectionRejectedException.newInstance("Newly created connection " + newCnx + " for " + this.lbDescription + " was rejected by the selection filter.", RoundRobinLoadBalancer.class, "selectConnection0(...)"));
                return (this.addConnection(newCnx, null) ? failedSingle : newCnx.closeAsync().concat(failedSingle)).shareContextOnSubscribe();
            }
            if (this.addConnection(newCnx, null)) {
                return Single.succeeded(newCnx).shareContextOnSubscribe();
            }
            return newCnx.closeAsync().concat(Single.failed(Exceptions.StacklessConnectionRejectedException.newInstance("Failed to add newly created connection " + newCnx + " for " + this.toString(), RoundRobinLoadBalancer.class, "selectConnection0(...)"))).shareContextOnSubscribe();
        });
    }

    private void markHealthy(HealthCheck<Addr, C> originalHealthCheckState) {
        ConnState oldState = connStateUpdater.getAndUpdate(this, previous -> {
            if (DefaultHost.isUnhealthy(previous)) {
                return new ConnState(previous.connections, STATE_ACTIVE_NO_FAILURES);
            }
            return previous;
        });
        if (oldState.state != originalHealthCheckState) {
            this.cancelIfHealthCheck(oldState);
        }
    }

    private void markUnhealthy(Throwable cause) {
        block4: {
            HealthCheck healthCheck;
            assert (this.healthCheckConfig != null);
            while (true) {
                ConnState previous;
                if (!DefaultHost.isActive(previous = connStateUpdater.get(this)) || previous.connections.length > 0 || cause instanceof ConnectionLimitReachedException) {
                    LOGGER.debug("{}: failed to open a new connection to the host on address {}. {}.", this.lbDescription, this.address, previous, cause);
                    break block4;
                }
                ActiveState previousState = (ActiveState)previous.state;
                if (previousState.failedConnections + 1 < this.healthCheckConfig.failedThreshold) {
                    ActiveState nextState = previousState.forNextFailedConnection();
                    if (!connStateUpdater.compareAndSet(this, previous, new ConnState(previous.connections, nextState))) continue;
                    LOGGER.debug("{}: failed to open a new connection to the host on address {} {} time(s) ({} consecutive failures will trigger health-checking).", this.lbDescription, this.address, nextState.failedConnections, this.healthCheckConfig.failedThreshold, cause);
                    break block4;
                }
                healthCheck = new HealthCheck(this.connectionFactory, this, cause);
                ConnState nextState = new ConnState(previous.connections, healthCheck);
                if (connStateUpdater.compareAndSet(this, previous, nextState)) break;
            }
            LOGGER.info("{}: failed to open a new connection to the host on address {} {} time(s) in a row. Error counting threshold reached, marking this host as UNHEALTHY for the selection algorithm and triggering background health-checking.", this.lbDescription, this.address, this.healthCheckConfig.failedThreshold, cause);
            healthCheck.schedule(cause);
        }
    }

    @Override
    public boolean isActiveAndHealthy() {
        return DefaultHost.isActive(this.connState);
    }

    @Override
    public boolean isUnhealthy() {
        return DefaultHost.isUnhealthy(this.connState);
    }

    private static boolean isActive(ConnState connState) {
        return ActiveState.class.equals(connState.state.getClass());
    }

    private static boolean isUnhealthy(ConnState connState) {
        return HealthCheck.class.equals(connState.state.getClass());
    }

    private boolean addConnection(C connection, @Nullable HealthCheck<Addr, C> currentHealthCheck) {
        ActiveState newState;
        Object[] newList;
        ConnState previous;
        int addAttempt = 0;
        do {
            Object[] existing;
            previous = connStateUpdater.get(this);
            if (previous.state == State.CLOSED) {
                return false;
            }
            ++addAttempt;
            for (Object o : existing = previous.connections) {
                if (!o.equals(connection)) continue;
                return true;
            }
            newList = Arrays.copyOf(existing, existing.length + 1);
            newList[existing.length] = connection;
        } while (!connStateUpdater.compareAndSet(this, previous, new ConnState(newList, newState = DefaultHost.isActive(previous) || DefaultHost.isUnhealthy(previous) ? STATE_ACTIVE_NO_FAILURES : previous.state)));
        if (DefaultHost.isUnhealthy(previous) && (currentHealthCheck == null || previous.state != currentHealthCheck)) {
            assert (newState == STATE_ACTIVE_NO_FAILURES);
            this.cancelIfHealthCheck(previous);
        }
        LOGGER.trace("{}: added a new connection {} to {} after {} attempt(s).", this.lbDescription, connection, this, addAttempt);
        connection.onClose().beforeFinally(() -> {
            int removeAttempt = 0;
            while (true) {
                int i;
                ConnState currentConnState = this.connState;
                if (currentConnState.state == State.CLOSED) break;
                assert (currentConnState.connections.length > 0);
                ++removeAttempt;
                Object[] connections = currentConnState.connections;
                for (i = 0; i < connections.length && !connections[i].equals(connection); ++i) {
                }
                if (i == connections.length) break;
                if (connections.length == 1) {
                    assert (!DefaultHost.isUnhealthy(currentConnState)) : "Cannot be UNHEALTHY with #connections > 0";
                    if (DefaultHost.isActive(currentConnState)) {
                        if (!connStateUpdater.compareAndSet(this, currentConnState, new ConnState(EMPTY_ARRAY, currentConnState.state))) continue;
                    } else {
                        if (currentConnState.state != State.EXPIRED || !connStateUpdater.compareAndSet(this, currentConnState, CLOSED_CONN_STATE)) continue;
                        this.closeAsync().subscribe();
                    }
                    break;
                }
                Object[] newList = new Object[connections.length - 1];
                System.arraycopy(connections, 0, newList, 0, i);
                System.arraycopy(connections, i + 1, newList, i, newList.length - i);
                if (connStateUpdater.compareAndSet(this, currentConnState, new ConnState(newList, currentConnState.state))) break;
            }
            LOGGER.trace("{}: removed connection {} from {} after {} attempt(s).", this.lbDescription, connection, this, removeAttempt);
        }).onErrorComplete(t -> {
            LOGGER.error("{}: unexpected error while processing connection.onClose() for {}.", this.lbDescription, connection, t);
            return true;
        }).subscribe();
        return true;
    }

    Map.Entry<Addr, List<C>> asEntry() {
        return new AbstractMap.SimpleImmutableEntry(this.address, Stream.of(this.connState.connections).map(conn -> (LoadBalancedConnection)conn).collect(Collectors.toList()));
    }

    @Override
    public Completable closeAsync() {
        return this.closeable.closeAsync();
    }

    @Override
    public Completable closeAsyncGracefully() {
        return this.closeable.closeAsyncGracefully();
    }

    @Override
    public Completable onClose() {
        return this.closeable.onClose();
    }

    @Override
    public Completable onClosing() {
        return this.closeable.onClosing();
    }

    private Completable doClose(Function<? super C, Completable> closeFunction) {
        return Completable.defer(() -> {
            ConnState oldState = this.closeConnState();
            this.cancelIfHealthCheck(oldState);
            Object[] connections = oldState.connections;
            return (connections.length == 0 ? Completable.completed() : Publisher.from(connections).flatMapCompletableDelayError(conn -> (Completable)closeFunction.apply((Object)((LoadBalancedConnection)conn)))).shareContextOnSubscribe();
        });
    }

    private void cancelIfHealthCheck(ConnState connState) {
        if (DefaultHost.isUnhealthy(connState)) {
            HealthCheck healthCheck = (HealthCheck)connState.state;
            LOGGER.debug("{}: health check cancelled for {}.", (Object)this.lbDescription, (Object)healthCheck.host);
            healthCheck.cancel();
        }
    }

    @Override
    public int score() {
        return 1;
    }

    public String toString() {
        ConnState connState = this.connState;
        return "Host{lbDescription=" + this.lbDescription + ", address=" + this.address + ", state=" + connState.state + ", #connections=" + connState.connections.length + '}';
    }

    private static /* synthetic */ ConnState lambda$markActiveIfNotClosed$1(ConnState oldConnState) {
        if (oldConnState.state == State.EXPIRED) {
            return new ConnState(oldConnState.connections, STATE_ACTIVE_NO_FAILURES);
        }
        return oldConnState;
    }

    private static final class ConnState {
        final Object[] connections;
        final Object state;

        ConnState(Object[] connections, Object state) {
            this.connections = connections;
            this.state = state;
        }

        public String toString() {
            return "ConnState{state=" + this.state + ", #connections=" + this.connections.length + '}';
        }
    }

    private static final class HealthCheck<ResolvedAddress, C extends LoadBalancedConnection>
    extends DelayedCancellable {
        private final ConnectionFactory<ResolvedAddress, ? extends C> connectionFactory;
        private final DefaultHost<ResolvedAddress, C> host;
        private final Throwable lastError;

        private HealthCheck(ConnectionFactory<ResolvedAddress, ? extends C> connectionFactory, DefaultHost<ResolvedAddress, C> host, Throwable lastError) {
            this.connectionFactory = connectionFactory;
            this.host = host;
            this.lastError = lastError;
        }

        public void schedule(Throwable originalCause) {
            assert (((DefaultHost)this.host).healthCheckConfig != null);
            this.delayedCancellable(RetryStrategies.retryWithConstantBackoffDeltaJitter(cause -> true, ((DefaultHost)this.host).healthCheckConfig.healthCheckInterval, ((DefaultHost)this.host).healthCheckConfig.jitter, ((DefaultHost)this.host).healthCheckConfig.executor).apply(0, originalCause).beforeOnSubscribe(__ -> AsyncContext.clear()).concat(this.connectionFactory.newConnection(this.host.address, null, null).retryWhen(RetryStrategies.retryWithConstantBackoffDeltaJitter(cause -> {
                LOGGER.debug("{}: health check failed for {}.", ((DefaultHost)this.host).lbDescription, this.host, cause);
                return true;
            }, ((DefaultHost)this.host).healthCheckConfig.healthCheckInterval, ((DefaultHost)this.host).healthCheckConfig.jitter, ((DefaultHost)this.host).healthCheckConfig.executor))).flatMapCompletable(newCnx -> {
                if (((DefaultHost)this.host).addConnection(newCnx, this)) {
                    LOGGER.info("{}: health check passed for {}, marked this host as ACTIVE for the selection algorithm.", (Object)((DefaultHost)this.host).lbDescription, (Object)this.host);
                    return Completable.completed();
                }
                assert (((DefaultHost)this.host).connState.state == State.CLOSED);
                LOGGER.debug("{}: health check passed for {}, but the host rejected a new connection {}. Closing it now.", ((DefaultHost)this.host).lbDescription, this.host, newCnx);
                return newCnx.closeAsync();
            }).onErrorComplete(t -> {
                LOGGER.error("{}: health check terminated with an unexpected error for {}. Marking this host as ACTIVE as a fallback to allow connection attempts.", ((DefaultHost)this.host).lbDescription, this.host, t);
                ((DefaultHost)this.host).markHealthy(this);
                return true;
            }).subscribe());
        }

        public String toString() {
            return "UNHEALTHY(" + this.lastError + ')';
        }
    }

    private static final class ActiveState {
        private final int failedConnections;

        ActiveState() {
            this(0);
        }

        private ActiveState(int failedConnections) {
            this.failedConnections = failedConnections;
        }

        ActiveState forNextFailedConnection() {
            return new ActiveState(FlowControlUtils.addWithOverflowProtection(this.failedConnections, 1));
        }

        public String toString() {
            return "ACTIVE(failedConnections=" + this.failedConnections + ')';
        }
    }

    private static enum State {
        EXPIRED,
        CLOSED;

    }
}

