/*
 * Decompiled with CFR 0.152.
 */
package io.servicetalk.loadbalancer;

import io.servicetalk.client.api.ConnectTimeoutException;
import io.servicetalk.client.api.ConnectionFactory;
import io.servicetalk.client.api.ConnectionLimitReachedException;
import io.servicetalk.client.api.DelegatingConnectionFactory;
import io.servicetalk.client.api.LoadBalancedConnection;
import io.servicetalk.concurrent.api.AsyncCloseables;
import io.servicetalk.concurrent.api.AsyncContext;
import io.servicetalk.concurrent.api.Completable;
import io.servicetalk.concurrent.api.ListenableAsyncCloseable;
import io.servicetalk.concurrent.api.Publisher;
import io.servicetalk.concurrent.api.RetryStrategies;
import io.servicetalk.concurrent.api.Single;
import io.servicetalk.concurrent.api.TerminalSignalConsumer;
import io.servicetalk.concurrent.internal.DefaultContextMap;
import io.servicetalk.concurrent.internal.DelayedCancellable;
import io.servicetalk.concurrent.internal.FlowControlUtils;
import io.servicetalk.context.api.ContextMap;
import io.servicetalk.loadbalancer.ConnectTracker;
import io.servicetalk.loadbalancer.ConnectionPoolStrategy;
import io.servicetalk.loadbalancer.Exceptions;
import io.servicetalk.loadbalancer.HealthCheckConfig;
import io.servicetalk.loadbalancer.HealthIndicator;
import io.servicetalk.loadbalancer.Host;
import io.servicetalk.loadbalancer.LoadBalancerObserver;
import io.servicetalk.loadbalancer.RequestTracker;
import io.servicetalk.transport.api.TransportObserver;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Predicate;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

final class DefaultHost<Addr, C extends LoadBalancedConnection>
implements Host<Addr, C> {
    private static final Logger LOGGER = LoggerFactory.getLogger(DefaultHost.class);
    // Atomic updater for the volatile "connState" field below (resolved by field name).
    private static final AtomicReferenceFieldUpdater<DefaultHost, ConnState> connStateUpdater = AtomicReferenceFieldUpdater.newUpdater(DefaultHost.class, ConnState.class, "connState");
    // Description of the owning load balancer; used in log messages, exception messages and toString().
    private final String lbDescription;
    // Address of this host; passed to the connection factory when opening connections.
    private final Addr address;
    // When null, newConnection() does not install the onConnectionError hook, so connect failures are not counted.
    @Nullable
    private final HealthCheckConfig healthCheckConfig;
    // Chooses a connection out of the current connection list in pickConnection().
    private final ConnectionPoolStrategy<C> connectionPoolStrategy;
    // Optional indicator: consulted by isHealthy()/score(), published into the connect context by newConnection(),
    // and cancelled when the host transitions to CLOSED.
    @Nullable
    private final HealthIndicator<Addr, C> healthIndicator;
    // Receives notifications about host state transitions (expired, revived, unhealthy, removed).
    private final LoadBalancerObserver.HostObserver hostObserver;
    // Factory used to open new connections; wrapped in InstrumentedConnectionFactory when a healthIndicator is set.
    private final ConnectionFactory<Addr, ? extends C> connectionFactory;
    // Backs closeAsync()/closeAsyncGracefully()/onClose()/onClosing(); built from doClose in the constructor.
    private final ListenableAsyncCloseable closeable;
    // Current connection-state snapshot. It starts ACTIVE with no connections and is replaced wholesale
    // through connStateUpdater rather than being modified in place.
    private volatile ConnState connState = new ConnState(Collections.emptyList(), State.ACTIVE, 0, null);

    /**
     * Creates a new host.
     *
     * @param lbDescription description of the owning load balancer, used in log and exception messages.
     * @param address address of this host; must not be {@code null}.
     * @param connectionPoolStrategy strategy used to select among the host's existing connections.
     * @param connectionFactory factory used to open new connections. When {@code healthIndicator} is non-null the
     * factory is wrapped so that connect attempts are reported to the indicator.
     * @param hostObserver observer notified about host state transitions.
     * @param healthCheckConfig health-check settings, or {@code null} to skip connect-failure counting.
     * @param healthIndicator optional health indicator for this host, or {@code null}.
     * @throws NullPointerException if any non-nullable argument is {@code null}.
     */
    DefaultHost(String lbDescription, Addr address, ConnectionPoolStrategy<C> connectionPoolStrategy, ConnectionFactory<Addr, ? extends C> connectionFactory, LoadBalancerObserver.HostObserver hostObserver, @Nullable HealthCheckConfig healthCheckConfig, @Nullable HealthIndicator<Addr, C> healthIndicator) {
        this.lbDescription = Objects.requireNonNull(lbDescription, "lbDescription");
        this.address = Objects.requireNonNull(address, "address");
        this.healthIndicator = healthIndicator;
        this.connectionPoolStrategy = Objects.requireNonNull(connectionPoolStrategy, "connectionPoolStrategy");
        Objects.requireNonNull(connectionFactory, "connectionFactory");
        // Only instrument the factory when there is an indicator to report connect events to. No temporary of the
        // wrapper type is kept: the un-instrumented branch is a plain ConnectionFactory.
        this.connectionFactory = healthIndicator == null ? connectionFactory :
                new InstrumentedConnectionFactory<>(connectionFactory, healthIndicator);
        assert healthCheckConfig == null || healthCheckConfig.failedThreshold > 0;
        this.healthCheckConfig = healthCheckConfig;
        this.hostObserver = Objects.requireNonNull(hostObserver, "hostObserver");
        this.closeable = AsyncCloseables.toAsyncCloseable(this::doClose);
    }

    /**
     * Returns the address this host was constructed with.
     */
    @Override
    public Addr address() {
        return address;
    }

    /**
     * Moves an {@code EXPIRED} host back to {@code ACTIVE} (resetting the failure count); every other state is
     * left untouched.
     *
     * @return {@code false} only if the host was already {@code CLOSED} when this method observed it.
     */
    @Override
    public boolean markActiveIfNotClosed() {
        final ConnState before = connStateUpdater.getAndUpdate(this,
                current -> current.state == State.EXPIRED ? current.toActiveNoFailures() : current);
        final boolean wasExpired = before.state == State.EXPIRED;
        if (wasExpired) {
            // Report the revival together with the number of connections that survived the expiry.
            hostObserver.onExpiredHostRevived(before.connections.size());
        }
        return before.state != State.CLOSED;
    }

    /**
     * Atomically transitions this host to {@code CLOSED}.
     * <p>
     * The closed state keeps the connection list of the state it replaces (see {@code ConnState#toClosed()}).
     *
     * @return the state observed right before the transition, or the current state if it was already
     * {@code CLOSED}.
     */
    private ConnState closeConnState() {
        for (;;) {
            final ConnState snapshot = this.connState;
            if (snapshot.state == State.CLOSED) {
                // Somebody else closed the host already; nothing to transition.
                return snapshot;
            }
            if (connStateUpdater.compareAndSet(this, snapshot, snapshot.toClosed())) {
                // Only the caller that wins the transition cancels the indicator.
                if (healthIndicator != null) {
                    healthIndicator.cancel();
                }
                return snapshot;
            }
            // Lost the race against a concurrent state change: re-read and retry.
        }
    }

    /**
     * Marks this host as {@code EXPIRED}.
     * <p>
     * If the host has no connections at the moment of the transition it is closed right away; otherwise it stays
     * {@code EXPIRED} until its connections go away.
     *
     * @return {@code true} if the host is (or is being) closed as a result, {@code false} if it was already
     * {@code EXPIRED} or still has connections.
     */
    @Override
    public boolean markExpired() {
        for (;;) {
            final ConnState oldState = connStateUpdater.get(this);
            if (oldState.state == State.EXPIRED) {
                return false;
            }
            if (oldState.state == State.CLOSED) {
                return true;
            }
            // Decide from the same snapshot we CAS against, so the decision matches the installed state.
            final boolean closeImmediately = oldState.connections.isEmpty();
            if (!connStateUpdater.compareAndSet(this, oldState, oldState.toExpired())) {
                continue;
            }
            cancelIfHealthCheck(oldState);
            hostObserver.onHostMarkedExpired(oldState.connections.size());
            if (closeImmediately) {
                // No connections left to wait for: close the host now.
                closeAsync().subscribe();
                return true;
            }
            return false;
        }
    }

    /**
     * Selects an existing connection of this host.
     *
     * @param selector predicate handed to the pool strategy to accept or reject candidate connections.
     * @param context request context; not used by this implementation.
     * @return whatever the configured {@link ConnectionPoolStrategy} selects from the current connection list,
     * possibly {@code null}.
     */
    @Override
    @Nullable
    public C pickConnection(Predicate<C> selector, @Nullable ContextMap context) {
        // Read the volatile state once so the strategy works on a single consistent list.
        final List<C> connections = connState.connections;
        return connectionPoolStrategy.select(connections, selector);
    }

    /**
     * Opens a new connection to this host and registers it with the host on success.
     *
     * @param selector predicate the new connection must pass before it is handed to the caller.
     * @param forceNewConnectionAndReserve when {@code true} the new connection must be reservable via
     * {@code tryReserve()}; otherwise it is closed and the result fails.
     * @param context context passed to the connection factory; a fresh {@link DefaultContextMap} is used when
     * {@code null}.
     * @return a {@link Single} that succeeds with the new connection, or fails with a
     * {@code StacklessConnectionRejectedException} when the connection could not be reserved, was rejected by
     * {@code selector}, or could not be added to this host.
     */
    @Override
    public Single<C> newConnection(Predicate<C> selector, boolean forceNewConnectionAndReserve, @Nullable ContextMap context) {
        return Single.defer(() -> {
            ContextMap actualContext = context;
            if (actualContext == null) {
                actualContext = new DefaultContextMap();
            }
            if (healthIndicator != null) {
                // Make the per-host tracker available through the context.
                actualContext.put(RequestTracker.REQUEST_TRACKER_KEY, healthIndicator);
            }
            // The factory is declared with "? extends C", so the result must be typed with the wildcard as well.
            Single<? extends C> establishConnection = connectionFactory.newConnection(address, actualContext, null);
            if (healthCheckConfig != null) {
                // Count connect failures (and possibly trigger health checking) before the error is propagated.
                establishConnection = establishConnection.beforeOnError(this::onConnectionError);
            }
            return establishConnection.flatMap(newCnx -> {
                if (forceNewConnectionAndReserve && !newCnx.tryReserve()) {
                    return newCnx.closeAsync().<C>concat(Single.failed(Exceptions.StacklessConnectionRejectedException.newInstance("Newly created connection " + newCnx + " for " + lbDescription + " could not be reserved.", DefaultHost.class, "newConnection(...)"))).shareContextOnSubscribe();
                }
                // The selector is evaluated before the connection is added to this host.
                if (!selector.test(newCnx)) {
                    final Single<C> failedSingle = Single.failed(Exceptions.StacklessConnectionRejectedException.newInstance("Newly created connection " + newCnx + " for " + lbDescription + " was rejected by the selection filter.", DefaultHost.class, "newConnection(...)"));
                    // Keep track of the rejected connection if the host accepts it; otherwise close it.
                    if (addConnection(newCnx, null)) {
                        return failedSingle.shareContextOnSubscribe();
                    }
                    return newCnx.closeAsync().concat(failedSingle).shareContextOnSubscribe();
                }
                if (addConnection(newCnx, null)) {
                    return Single.succeeded(newCnx).shareContextOnSubscribe();
                }
                return newCnx.closeAsync().<C>concat(Single.failed(Exceptions.StacklessConnectionRejectedException.newInstance("Failed to add newly created connection " + newCnx + " for " + this, DefaultHost.class, "newConnection(...)"))).shareContextOnSubscribe();
            }).shareContextOnSubscribe();
        });
    }

    /**
     * Forces an {@code UNHEALTHY} host back to {@code ACTIVE} with no recorded failures. Invoked by a
     * {@code HealthCheck} whose pipeline terminated with an unexpected error.
     *
     * @param originalHealthCheckState the health check that requested the transition. If the state being replaced
     * carries a different health check, that one is cancelled.
     */
    private void markHealthy(HealthCheck originalHealthCheckState) {
        final ConnState before = connStateUpdater.getAndUpdate(this,
                current -> current.isUnhealthy() ? current.toActiveNoFailures() : current);
        final boolean replacedForeignCheck = before.healthCheck != originalHealthCheckState;
        if (replacedForeignCheck) {
            // No-op unless "before" was UNHEALTHY.
            cancelIfHealthCheck(before);
        }
        if (before.isUnhealthy()) {
            hostObserver.onHostRevived();
        }
    }

    /**
     * Records a failed connection attempt.
     * <p>
     * The failure is only counted while the host is {@code ACTIVE}, has no connections, and {@code cause} is not a
     * {@link ConnectionLimitReachedException}; otherwise it is merely logged. Once the configured threshold is
     * reached the host becomes {@code UNHEALTHY} and its health check is scheduled.
     *
     * @param cause the connect failure.
     */
    private void onConnectionError(Throwable cause) {
        assert healthCheckConfig != null;
        for (;;) {
            final ConnState current = connStateUpdater.get(this);
            final boolean countable = current.isActive() && current.connections.isEmpty()
                    && !(cause instanceof ConnectionLimitReachedException);
            if (!countable) {
                LOGGER.debug("{}: failed to open a new connection to the host on address {}. {}.", lbDescription, address, current, cause);
                return;
            }
            final ConnState updated = current.toNextFailedConnection(cause);
            if (!connStateUpdater.compareAndSet(this, current, updated)) {
                // State changed underneath us; re-evaluate against the fresh state.
                continue;
            }
            if (updated.state == State.ACTIVE) {
                LOGGER.debug("{}: failed to open a new connection to the host on address {} {} time(s) ({} consecutive failures will trigger health-checking).", lbDescription, address, updated.failedConnections, healthCheckConfig.failedThreshold, cause);
                return;
            }
            assert updated.state == State.UNHEALTHY && updated.healthCheck != null;
            LOGGER.info("{}: failed to open a new connection to the host on address {} {} time(s) in a row. Error counting threshold reached, marking this host as UNHEALTHY for the selection algorithm and triggering background health-checking.", lbDescription, address, healthCheckConfig.failedThreshold, cause);
            hostObserver.onHostMarkedUnhealthy(cause);
            updated.healthCheck.schedule(cause);
            return;
        }
    }

    /**
     * Reports whether this host is healthy.
     *
     * @return {@code false} when the host is {@code UNHEALTHY} or {@code CLOSED}; otherwise the verdict of the
     * health indicator, or {@code true} when no indicator is configured.
     */
    @Override
    public boolean isHealthy() {
        final State current = connState.state;
        if (current == State.UNHEALTHY || current == State.CLOSED) {
            return false;
        }
        return healthIndicator == null || healthIndicator.isHealthy();
    }

    /**
     * Reports whether new connections may be opened to this host.
     *
     * @return {@code true} unless the host is {@code EXPIRED} or {@code CLOSED}.
     */
    @Override
    public boolean canMakeNewConnections() {
        final State current = connState.state;
        return !(current == State.EXPIRED || current == State.CLOSED);
    }

    /**
     * Adds {@code connection} to this host and arranges for it to be removed again when it closes.
     * <p>
     * Adding a connection always results in an {@code ACTIVE} state with a reset failure count. If the replaced
     * state was {@code UNHEALTHY}, its health check is cancelled (unless it is {@code currentHealthCheck}) and the
     * observer is told the host was revived.
     *
     * @param connection the connection to add.
     * @param currentHealthCheck the health check on whose behalf the connection is added, or {@code null}.
     * @return {@code false} if the host is {@code CLOSED}; {@code true} if the connection was added or was
     * already present.
     */
    private boolean addConnection(C connection, @Nullable HealthCheck currentHealthCheck) {
        int addAttempt = 0;
        for (;;) {
            final ConnState previous = connStateUpdater.get(this);
            if (previous.state == State.CLOSED) {
                return false;
            }
            ++addAttempt;
            final ConnState withConnection = previous.addNewConnection(connection);
            if (withConnection == previous) {
                // Already tracked: no state change and no second onClose() listener.
                return true;
            }
            if (!connStateUpdater.compareAndSet(this, previous, withConnection)) {
                continue;
            }
            if (previous.isUnhealthy()) {
                if (currentHealthCheck == null || previous.healthCheck != currentHealthCheck) {
                    cancelIfHealthCheck(previous);
                }
                hostObserver.onHostRevived();
            }
            break;
        }
        LOGGER.trace("{}: added a new connection {} to {} after {} attempt(s).", lbDescription, connection, this, addAttempt);
        // Prune the connection from the host's state once it closes.
        connection.onClose().beforeFinally(() -> {
            int removeAttempt = 0;
            for (;;) {
                final ConnState current = this.connState;
                if (current.state == State.CLOSED) {
                    break;
                }
                ++removeAttempt;
                // Deliberately not called "nextState"/"previous": a lambda local must not redeclare a local of
                // the enclosing method.
                final ConnState withoutConnection = current.removeConnection(connection);
                if (withoutConnection == current) {
                    // The connection is not tracked (any more); nothing to remove.
                    break;
                }
                if (!withoutConnection.connections.isEmpty()) {
                    if (connStateUpdater.compareAndSet(this, current, withoutConnection)) {
                        break;
                    }
                    continue;
                }
                // Removing the last connection.
                if (current.isActive()) {
                    if (connStateUpdater.compareAndSet(this, current, withoutConnection)) {
                        break;
                    }
                } else if (current.state == State.EXPIRED
                        && connStateUpdater.compareAndSet(this, current, withoutConnection.toClosed())) {
                    // Last connection of an expired host: close the host itself.
                    this.closeAsync().subscribe();
                    hostObserver.onExpiredHostRemoved(withoutConnection.connections.size());
                    break;
                }
                // CAS lost (or state not handled above): re-read and retry.
            }
            LOGGER.trace("{}: removed connection {} from {} after {} attempt(s).", lbDescription, connection, this, removeAttempt);
        }).onErrorComplete(t -> {
            LOGGER.error("{}: unexpected error while processing connection.onClose() for {}.", lbDescription, connection, t);
            return true;
        }).subscribe();
        return true;
    }

    /**
     * Returns an immutable entry pairing this host's address with its current connection list.
     * The list is the one held by the state read at call time.
     */
    Map.Entry<Addr, List<C>> asEntry() {
        return new AbstractMap.SimpleImmutableEntry<>(address, connState.connections);
    }

    /**
     * Closes this host; delegates to the closeable built from {@code doClose}.
     */
    @Override
    public Completable closeAsync() {
        return closeable.closeAsync();
    }

    /**
     * Gracefully closes this host; delegates to the closeable built from {@code doClose}.
     */
    @Override
    public Completable closeAsyncGracefully() {
        return closeable.closeAsyncGracefully();
    }

    /**
     * Returns the {@code onClose()} {@link Completable} of the underlying closeable.
     */
    @Override
    public Completable onClose() {
        return closeable.onClose();
    }

    /**
     * Returns the {@code onClosing()} {@link Completable} of the underlying closeable.
     */
    @Override
    public Completable onClosing() {
        return closeable.onClosing();
    }

    /**
     * Close implementation behind {@link #closeAsync()} and {@link #closeAsyncGracefully()}.
     * <p>
     * Transitions the host to {@code CLOSED}, cancels a pending health check, notifies the observer when the
     * previous state was {@code ACTIVE} or {@code EXPIRED}, and closes every connection of the previous state.
     *
     * @param graceful whether connections are closed with {@code closeAsyncGracefully()} instead of
     * {@code closeAsync()}.
     * @return a {@link Completable} that completes when all connections have been closed; errors from individual
     * connections are delayed until all of them were attempted.
     */
    private Completable doClose(boolean graceful) {
        return Completable.defer(() -> {
            final ConnState oldState = closeConnState();
            cancelIfHealthCheck(oldState);
            // Typed list: with a raw List the lambda below would receive Object and could not call closeAsync().
            final List<C> connections = oldState.connections;
            LOGGER.debug("{}: closing {} connection(s) {}gracefully to the closed address: {}.", lbDescription, connections.size(), graceful ? "" : "un", address);
            if (oldState.state == State.ACTIVE) {
                hostObserver.onActiveHostRemoved(connections.size());
            } else if (oldState.state == State.EXPIRED) {
                hostObserver.onExpiredHostRemoved(connections.size());
            }
            final Completable closeAll = connections.isEmpty() ? Completable.completed() :
                    Publisher.fromIterable(connections).flatMapCompletableDelayError(
                            conn -> graceful ? conn.closeAsyncGracefully() : conn.closeAsync());
            return closeAll.shareContextOnSubscribe();
        });
    }

    /**
     * Cancels the health check carried by {@code connState} if, and only if, that state is {@code UNHEALTHY}.
     *
     * @param connState the state whose health check should be cancelled.
     */
    private void cancelIfHealthCheck(ConnState connState) {
        if (!connState.isUnhealthy()) {
            return;
        }
        LOGGER.debug("{}: health check cancelled for {}.", lbDescription, this);
        assert connState.healthCheck != null;
        connState.healthCheck.cancel();
    }

    /**
     * Returns the score of this host.
     *
     * @return the health indicator's score, or {@code 1} when no indicator is configured.
     */
    @Override
    public int score() {
        if (healthIndicator == null) {
            return 1;
        }
        return healthIndicator.score();
    }

    /**
     * Describes this host using a single read of the current state, so that the printed state and connection
     * count belong together.
     */
    public String toString() {
        final ConnState snapshot = connState;
        final State state = snapshot.state;
        final int connectionCount = snapshot.connections.size();
        return "Host{lbDescription=" + lbDescription
                + ", address=" + address
                + ", state=" + state
                + ", #connections=" + connectionCount
                + '}';
    }

    /**
     * Translates the terminal signal of a connect attempt into a {@link ConnectTracker} callback, passing along
     * the start time captured when the attempt began.
     */
    private static class ConnectSignalConsumer implements TerminalSignalConsumer {
        private final long connectStartTime;
        private final ConnectTracker connectTracker;

        /**
         * @param connectStartTime value obtained from the tracker when the connect attempt started.
         * @param connectTracker tracker to report the outcome to.
         */
        ConnectSignalConsumer(long connectStartTime, ConnectTracker connectTracker) {
            this.connectTracker = connectTracker;
            this.connectStartTime = connectStartTime;
        }

        @Override
        public void onComplete() {
            connectTracker.onConnectSuccess(connectStartTime);
        }

        @Override
        public void onError(Throwable t) {
            final ConnectTracker.ErrorClass errorClass;
            if (t instanceof ConnectTimeoutException) {
                errorClass = ConnectTracker.ErrorClass.CONNECT_TIMEOUT;
            } else {
                errorClass = ConnectTracker.ErrorClass.CONNECT_ERROR;
            }
            reportFailure(errorClass);
        }

        @Override
        public void cancel() {
            reportFailure(ConnectTracker.ErrorClass.CANCELLED);
        }

        /** Reports a failed or cancelled connect attempt of the given class to the tracker. */
        private void reportFailure(ConnectTracker.ErrorClass errorClass) {
            connectTracker.onConnectError(connectStartTime, errorClass);
        }
    }

    /**
     * Connection factory decorator that reports the start and the outcome of every connect attempt to a
     * {@link ConnectTracker}.
     */
    private static final class InstrumentedConnectionFactory<Addr, C extends LoadBalancedConnection> extends DelegatingConnectionFactory<Addr, C> {
        private final ConnectTracker connectTracker;

        /**
         * @param delegate factory that actually opens the connections.
         * @param connectTracker tracker notified about connect attempts.
         */
        InstrumentedConnectionFactory(ConnectionFactory<Addr, C> delegate, ConnectTracker connectTracker) {
            super(delegate);
            this.connectTracker = connectTracker;
        }

        @Override
        public Single<C> newConnection(Addr addr, @Nullable ContextMap context, @Nullable TransportObserver observer) {
            // Deferred so the start time is taken per subscription, not when the Single is assembled.
            return Single.defer(() -> {
                final long startedAt = connectTracker.beforeConnectStart();
                final Single<C> connecting = delegate().newConnection(addr, context, observer);
                return connecting
                        .beforeFinally(new ConnectSignalConsumer(startedAt, connectTracker))
                        .shareContextOnSubscribe();
            });
        }
    }

    /**
     * Immutable snapshot of a host's connections and lifecycle state. Every transition method returns a new
     * instance (or {@code this} when nothing changes), which makes instances suitable for compare-and-set.
     */
    private final class ConnState {
        /** Connections of the host in this snapshot; never modified after construction by this class. */
        final List<C> connections;
        /** Lifecycle state of the host in this snapshot. */
        final State state;
        /** Number of recorded connect failures; always {@code 0} outside ACTIVE/UNHEALTHY. */
        final int failedConnections;
        /** Health check of an UNHEALTHY snapshot; {@code null} in every other state. */
        @Nullable
        final HealthCheck healthCheck;

        private ConnState(List<C> connections, State state, int failedConnections, @Nullable HealthCheck healthCheck) {
            // An UNHEALTHY state must carry a health check ...
            assert state != State.UNHEALTHY || healthCheck != null;
            // ... and no other state may carry one.
            assert state == State.UNHEALTHY || healthCheck == null;
            // Only ACTIVE and UNHEALTHY states may have recorded failures.
            assert state == State.UNHEALTHY || state == State.ACTIVE || failedConnections == 0;
            this.connections = connections;
            this.state = state;
            this.failedConnections = failedConnections;
            this.healthCheck = healthCheck;
        }

        /**
         * Returns the state after one more failed connect attempt: the failure count is incremented (saturating),
         * and an ACTIVE state whose count reaches the configured threshold becomes UNHEALTHY with a new
         * {@link HealthCheck} for {@code cause}.
         */
        ConnState toNextFailedConnection(Throwable cause) {
            assert DefaultHost.this.healthCheckConfig != null;
            final int nextFailedCount = FlowControlUtils.addWithOverflowProtection(failedConnections, 1);
            final boolean thresholdReached = DefaultHost.this.healthCheckConfig.failedThreshold <= nextFailedCount;
            if (state == State.ACTIVE && thresholdReached) {
                return new ConnState(connections, State.UNHEALTHY, nextFailedCount, new HealthCheck(cause));
            }
            return new ConnState(connections, state, nextFailedCount, healthCheck);
        }

        /** Same connections, ACTIVE, with failure count and health check cleared. */
        ConnState toActiveNoFailures() {
            return new ConnState(connections, State.ACTIVE, 0, null);
        }

        /** Same connections, CLOSED. */
        ConnState toClosed() {
            return new ConnState(connections, State.CLOSED, 0, null);
        }

        /** Same connections, EXPIRED. */
        ConnState toExpired() {
            return new ConnState(connections, State.EXPIRED, 0, null);
        }

        /**
         * Returns a state without {@code connection}, or {@code this} if the connection is not present.
         * State, failure count and health check are carried over unchanged.
         */
        ConnState removeConnection(C connection) {
            final int index = connections.indexOf(connection);
            if (index < 0) {
                return this;
            }
            final List<C> remaining;
            if (connections.size() == 1) {
                remaining = Collections.emptyList();
            } else {
                // Copy-on-write: the list of this snapshot may still be read by other threads.
                remaining = new ArrayList<>(connections);
                remaining.remove(index);
            }
            return new ConnState(remaining, state, failedConnections, healthCheck);
        }

        /**
         * Returns an ACTIVE state (failures and health check cleared) that additionally contains
         * {@code connection}, or {@code this} if the connection is already present.
         */
        ConnState addNewConnection(C connection) {
            if (connections.contains(connection)) {
                return this;
            }
            final List<C> extended = new ArrayList<>(connections.size() + 1);
            extended.addAll(connections);
            extended.add(connection);
            return new ConnState(extended, State.ACTIVE, 0, null);
        }

        boolean isActive() {
            return state == State.ACTIVE;
        }

        boolean isUnhealthy() {
            return state == State.UNHEALTHY;
        }

        @Override
        public String toString() {
            return "ConnState{state=" + state + ", failedConnections=" + failedConnections + ", healthCheck=" + healthCheck + ", #connections=" + connections.size() + '}';
        }
    }

    private final class HealthCheck
    extends DelayedCancellable {
        private final Throwable lastError;

        private HealthCheck(Throwable lastError) {
            this.lastError = lastError;
        }

        public void schedule(Throwable originalCause) {
            assert (DefaultHost.this.healthCheckConfig != null);
            this.delayedCancellable(RetryStrategies.retryWithConstantBackoffDeltaJitter(cause -> true, ((DefaultHost)DefaultHost.this).healthCheckConfig.healthCheckInterval, ((DefaultHost)DefaultHost.this).healthCheckConfig.jitter, ((DefaultHost)DefaultHost.this).healthCheckConfig.executor).apply(0, originalCause).beforeOnSubscribe(__ -> AsyncContext.clear()).concat(DefaultHost.this.newConnection(cxn -> true, false, null).retryWhen(RetryStrategies.retryWithConstantBackoffDeltaJitter(cause -> {
                LOGGER.debug("{}: health check failed for {}.", DefaultHost.this.lbDescription, DefaultHost.this, cause);
                return true;
            }, ((DefaultHost)DefaultHost.this).healthCheckConfig.healthCheckInterval, ((DefaultHost)DefaultHost.this).healthCheckConfig.jitter, ((DefaultHost)DefaultHost.this).healthCheckConfig.executor))).flatMapCompletable(newCnx -> {
                LOGGER.info("{}: health check passed for {}, marked this host as ACTIVE for the selection algorithm.", (Object)DefaultHost.this.lbDescription, (Object)DefaultHost.this);
                return Completable.completed();
            }).onErrorComplete(t -> {
                LOGGER.error("{}: health check terminated with an unexpected error for {}. Marking this host as ACTIVE as a fallback to allow connection attempts.", DefaultHost.this.lbDescription, DefaultHost.this, t);
                DefaultHost.this.markHealthy(this);
                return true;
            }).subscribe());
        }

        public String toString() {
            return "UNHEALTHY(" + this.lastError + ')';
        }
    }

    private static enum State {
        ACTIVE,
        UNHEALTHY,
        EXPIRED,
        CLOSED;

    }
}

