/*
 * Decompiled with CFR 0.152.
 */
package io.servicetalk.loadbalancer;

import io.servicetalk.client.api.ConnectionFactory;
import io.servicetalk.client.api.ConnectionRejectedException;
import io.servicetalk.client.api.LoadBalancedConnection;
import io.servicetalk.client.api.LoadBalancer;
import io.servicetalk.client.api.LoadBalancerFactory;
import io.servicetalk.client.api.LoadBalancerReadyEvent;
import io.servicetalk.client.api.NoAvailableHostException;
import io.servicetalk.client.api.ServiceDiscovererEvent;
import io.servicetalk.concurrent.PublisherSource;
import io.servicetalk.concurrent.api.AsyncCloseable;
import io.servicetalk.concurrent.api.AsyncCloseables;
import io.servicetalk.concurrent.api.Completable;
import io.servicetalk.concurrent.api.CompositeCloseable;
import io.servicetalk.concurrent.api.ListenableAsyncCloseable;
import io.servicetalk.concurrent.api.Processors;
import io.servicetalk.concurrent.api.Publisher;
import io.servicetalk.concurrent.api.Single;
import io.servicetalk.concurrent.api.SourceAdapters;
import io.servicetalk.concurrent.internal.SequentialCancellable;
import io.servicetalk.concurrent.internal.ThrowableUtils;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class RoundRobinLoadBalancer<ResolvedAddress, C extends LoadBalancedConnection>
implements LoadBalancer<C> {
    private static final Logger LOGGER = LoggerFactory.getLogger(RoundRobinLoadBalancer.class);

    /**
     * Sentinel list that is only ever compared by identity ({@code ==}). It is stored in {@link #activeHosts} once
     * this load balancer is closed, and in a host's connection list once that host is closed or marked inactive.
     */
    private static final List<?> CLOSED_LIST = new ArrayList<>(0);

    // Field updaters are keyed by class literals and therefore cannot carry type arguments.
    @SuppressWarnings("rawtypes")
    private static final AtomicReferenceFieldUpdater<RoundRobinLoadBalancer, List> activeHostsUpdater = AtomicReferenceFieldUpdater.newUpdater(RoundRobinLoadBalancer.class, List.class, "activeHosts");
    @SuppressWarnings("rawtypes")
    private static final AtomicIntegerFieldUpdater<RoundRobinLoadBalancer> indexUpdater = AtomicIntegerFieldUpdater.newUpdater(RoundRobinLoadBalancer.class, "index");

    /**
     * Host connection-pool size below which {@code selectConnection0} makes as many random probes as there are
     * pooled connections.
     */
    private static final int MIN_SEARCH_SPACE = 64;

    /**
     * Fraction of the pool size used as the number of random probes once the pool holds at least
     * {@link #MIN_SEARCH_SPACE} connections.
     */
    private static final float SEARCH_FACTOR = 0.75f;

    /** Cursor incremented on every selection; the host index is derived from it modulo the host count. */
    private volatile int index;

    /**
     * Current snapshot of known hosts. Updated only by swapping in a different list through
     * {@link #activeHostsUpdater}; becomes {@link #CLOSED_LIST} when the load balancer is closed.
     */
    private volatile List<Host<ResolvedAddress, C>> activeHosts = Collections.emptyList();

    /** Events published by this load balancer, returned from {@link #eventStream()}. */
    private final Publisher<Object> eventStream;

    /** Holds the service-discovery subscription so that closing the load balancer can cancel it. */
    private final SequentialCancellable discoveryCancellable = new SequentialCancellable();

    /** Creates a connection to the selected host when none of its pooled connections is accepted. */
    private final ConnectionFactory<ResolvedAddress, ? extends C> connectionFactory;

    /** Backs {@link #onClose()}, {@link #closeAsync()} and {@link #closeAsyncGracefully()}. */
    private final ListenableAsyncCloseable asyncCloseable;

    /**
     * Creates a new instance and subscribes to {@code eventPublisher} from within this constructor.
     * <p>
     * Every discovery event replaces the host list atomically: an available event appends a new host for the
     * event's address, an unavailable event removes (and marks inactive) the first host whose address equals the
     * event's address. A ready event is emitted on the event stream when the list grows to exactly one host, and a
     * not-ready event when an unavailable event leaves the list empty.
     *
     * @param eventPublisher source of service discovery events for the addresses to balance across.
     * @param connectionFactory creates connections to a selected host; it is closed together with this load
     * balancer.
     * @throws NullPointerException if {@code connectionFactory} is {@code null}.
     */
    public RoundRobinLoadBalancer(final Publisher<? extends ServiceDiscovererEvent<ResolvedAddress>> eventPublisher, ConnectionFactory<ResolvedAddress, ? extends C> connectionFactory) {
        final PublisherSource.Processor<Object, Object> eventStreamProcessor = Processors.newPublisherProcessorDropHeadOnOverflow(32);
        this.eventStream = SourceAdapters.fromSource(eventStreamProcessor);
        this.connectionFactory = Objects.requireNonNull(connectionFactory);
        SourceAdapters.toSource(eventPublisher).subscribe(new PublisherSource.Subscriber<ServiceDiscovererEvent<ResolvedAddress>>() {

            @Override
            public void onSubscribe(final PublisherSource.Subscription s) {
                // Request everything up front, before the subscription becomes reachable for cancellation
                // through discoveryCancellable; no further request(n) calls are made afterwards.
                s.request(Long.MAX_VALUE);
                RoundRobinLoadBalancer.this.discoveryCancellable.nextCancellable(s);
            }

            @Override
            public void onNext(final ServiceDiscovererEvent<ResolvedAddress> event) {
                LOGGER.debug("Load balancer {}, received new ServiceDiscoverer event {}.", RoundRobinLoadBalancer.this, event);
                // The updater is declared with raw List, hence the unchecked conversions below.
                @SuppressWarnings("unchecked")
                final List<Host<ResolvedAddress, C>> activeAddresses = activeHostsUpdater.updateAndGet(RoundRobinLoadBalancer.this, oldHosts -> {
                    if (oldHosts == CLOSED_LIST) {
                        // Already closed: keep the sentinel so that no host is ever added after close.
                        return CLOSED_LIST;
                    }
                    final ResolvedAddress addr = Objects.requireNonNull(event.address());
                    @SuppressWarnings("unchecked")
                    final List<Host<ResolvedAddress, C>> oldHostsTyped = (List<Host<ResolvedAddress, C>>) oldHosts;
                    if (event.isAvailable()) {
                        if (oldHostsTyped.isEmpty()) {
                            return Collections.singletonList(new Host<ResolvedAddress, C>(addr));
                        }
                        final List<Host<ResolvedAddress, C>> newHosts = new ArrayList<>(oldHostsTyped.size() + 1);
                        newHosts.addAll(oldHostsTyped);
                        newHosts.add(new Host<ResolvedAddress, C>(addr));
                        return newHosts;
                    }
                    if (oldHostsTyped.isEmpty()) {
                        return Collections.emptyList();
                    }
                    final List<Host<ResolvedAddress, C>> newHosts = new ArrayList<>(oldHostsTyped.size() - 1);
                    for (int i = 0; i < oldHostsTyped.size(); ++i) {
                        final Host<ResolvedAddress, C> host = oldHostsTyped.get(i);
                        if (host.address.equals(addr)) {
                            // updateAndGet may re-apply this function under contention. Repeating markInactive()
                            // is harmless: after the first call the host's list is CLOSED_LIST, which is empty.
                            host.markInactive();
                            // Only the first match is dropped; copy the remaining hosts unchanged.
                            for (int x = i + 1; x < oldHostsTyped.size(); ++x) {
                                newHosts.add(oldHostsTyped.get(x));
                            }
                            if (newHosts.isEmpty()) {
                                return Collections.emptyList();
                            }
                            return newHosts;
                        }
                        newHosts.add(host);
                    }
                    // No host matched the address: the result is a plain copy of the previous list.
                    return newHosts;
                });
                LOGGER.debug("Load balancer {} now using {} addresses: {}", RoundRobinLoadBalancer.this, activeAddresses.size(), activeAddresses);
                if (event.isAvailable()) {
                    if (activeAddresses.size() == 1) {
                        eventStreamProcessor.onNext(LoadBalancerReadyEvent.LOAD_BALANCER_READY_EVENT);
                    }
                } else if (activeAddresses.isEmpty()) {
                    eventStreamProcessor.onNext(LoadBalancerReadyEvent.LOAD_BALANCER_NOT_READY_EVENT);
                }
            }

            @Override
            public void onError(final Throwable t) {
                // Snapshot the hosts first so the log line reports the state seen when the error arrived.
                final List<Host<ResolvedAddress, C>> hosts = RoundRobinLoadBalancer.this.activeHosts;
                eventStreamProcessor.onError(t);
                LOGGER.error("Load balancer {}. Service discoverer {} emitted an error. Last seen addresses (size {}) {}", RoundRobinLoadBalancer.this, eventPublisher, hosts.size(), hosts, t);
            }

            @Override
            public void onComplete() {
                final List<Host<ResolvedAddress, C>> hosts = RoundRobinLoadBalancer.this.activeHosts;
                eventStreamProcessor.onComplete();
                LOGGER.error("Load balancer {}. Service discoverer {} completed. Last seen addresses (size {}) {}", RoundRobinLoadBalancer.this, eventPublisher, hosts.size(), hosts);
            }
        });
        this.asyncCloseable = AsyncCloseables.toAsyncCloseable(graceful -> {
            // Swap in the sentinel first so that later discovery events and selections observe the closed state.
            @SuppressWarnings("unchecked")
            final List<Host<ResolvedAddress, C>> currentList = activeHostsUpdater.getAndSet(this, CLOSED_LIST);
            this.discoveryCancellable.cancel();
            eventStreamProcessor.onComplete();
            // Hosts are appended before the connection factory, matching the original ordering.
            final CompositeCloseable cc = AsyncCloseables.newCompositeCloseable().appendAll(currentList).appendAll(connectionFactory);
            return graceful ? cc.closeAsyncGracefully() : cc.closeAsync();
        });
    }

    /**
     * Creates a new {@link RoundRobinLoadBalancerFactory}.
     *
     * @param <ResolvedAddress> the type of resolved address handled by load balancers the factory creates.
     * @param <C> the type of connection handled by load balancers the factory creates.
     * @return a new factory instance.
     */
    public static <ResolvedAddress, C extends LoadBalancedConnection> RoundRobinLoadBalancerFactory<ResolvedAddress, C> newRoundRobinFactory() {
        // Diamond instead of the raw constructor call, so the result is properly parameterized.
        return new RoundRobinLoadBalancerFactory<>();
    }

    /**
     * Selects a connection accepted by {@code selector}.
     * <p>
     * The selection itself is performed by {@code selectConnection0}; it is wrapped in {@code Single.defer} and the
     * inner {@link Single} is adapted with {@code subscribeShareContext()}.
     *
     * @param selector predicate a connection must satisfy to be returned.
     * @return a {@link Single} that produces the selected connection or fails.
     */
    @Override
    public Single<C> selectConnection(Predicate<C> selector) {
        return Single.defer(() -> {
            final Single<C> selection = selectConnection0(selector);
            return selection.subscribeShareContext();
        });
    }

    /**
     * Returns the stream of events published by this load balancer.
     *
     * @return the {@link Publisher} created from the internal event processor in the constructor.
     */
    @Override
    public Publisher<Object> eventStream() {
        return eventStream;
    }

    /**
     * Picks the next host in round-robin order, then either returns one of its pooled connections accepted by
     * {@code selector} or creates a new connection to that host.
     * <p>
     * Pooled connections are probed at random indices: once per pooled connection when the pool holds fewer than
     * {@link #MIN_SEARCH_SPACE} connections, otherwise {@code size * SEARCH_FACTOR} times. Indices may repeat, so
     * a suitable pooled connection can be missed, in which case a new connection is created.
     *
     * @param selector predicate a connection must satisfy to be returned.
     * @return a {@link Single} that succeeds with a pooled or newly created connection. It fails if the load
     * balancer is closed, if no hosts are known, or with {@link ConnectionRejectedException} if a newly created
     * connection is rejected by {@code selector} or cannot be added to its host.
     */
    private Single<C> selectConnection0(Predicate<C> selector) {
        // Read the volatile field once so every check below sees the same snapshot.
        final List<Host<ResolvedAddress, C>> activeHosts = this.activeHosts;
        if (activeHosts.isEmpty()) {
            // CLOSED_LIST is empty too, so identity distinguishes "closed" from "no hosts discovered".
            if (activeHosts == CLOSED_LIST) {
                return RoundRobinLoadBalancer.failedLBClosed();
            }
            return Single.failed(StacklessNoAvailableHostException.newInstance("No hosts are available to connect.", RoundRobinLoadBalancer.class, "selectConnection0(...)"));
        }
        // Masking with Integer.MAX_VALUE keeps the cursor non-negative after the counter overflows.
        final int cursor = (indexUpdater.getAndIncrement(this) & Integer.MAX_VALUE) % activeHosts.size();
        final Host<ResolvedAddress, C> host = activeHosts.get(cursor);
        assert (host != null) : "Host can't be null.";
        final ThreadLocalRandom rnd = ThreadLocalRandom.current();
        // Snapshot of the host's connection list; the host replaces this list instead of mutating it.
        final List<C> connections = host.connections;
        final int size = connections.size();
        final int attempts = size < MIN_SEARCH_SPACE ? size : (int) (size * SEARCH_FACTOR);
        for (int i = 0; i < attempts; ++i) {
            final C connection = connections.get(rnd.nextInt(size));
            if (selector.test(connection)) {
                return Single.succeeded(connection);
            }
        }
        // No pooled connection was accepted: create a new one for the selected host.
        return this.connectionFactory.newConnection(host.address, null).flatMap(newCnx -> {
            // The selector is consulted before the connection is added to the host's pool.
            if (!selector.test(newCnx)) {
                return newCnx.closeAsync().concat(Single.failed(new ConnectionRejectedException("Newly created connection " + newCnx + " rejected by the selection filter.")));
            }
            if (host.addConnection(newCnx)) {
                return Single.succeeded(newCnx);
            }
            // The host refused the connection (its list is CLOSED_LIST): close the connection, then fail.
            return newCnx.closeAsync().concat(this.activeHosts == CLOSED_LIST ? RoundRobinLoadBalancer.failedLBClosed() : Single.failed(new ConnectionRejectedException("Failed to add newly created connection for host: " + host.address + ", host inactive? " + host.isInactive())));
        });
    }

    /**
     * Delegates to the internal closeable created in the constructor.
     *
     * @return the {@link Completable} returned by the internal closeable's {@code onClose()}.
     */
    @Override
    public Completable onClose() {
        return asyncCloseable.onClose();
    }

    /**
     * Closes this load balancer by delegating to the internal closeable created in the constructor.
     *
     * @return the {@link Completable} returned by the internal closeable's {@code closeAsync()}.
     */
    @Override
    public Completable closeAsync() {
        return asyncCloseable.closeAsync();
    }

    /**
     * Gracefully closes this load balancer by delegating to the internal closeable created in the constructor.
     *
     * @return the {@link Completable} returned by the internal closeable's {@code closeAsyncGracefully()}.
     */
    @Override
    public Completable closeAsyncGracefully() {
        return asyncCloseable.closeAsyncGracefully();
    }

    /**
     * Builds a snapshot of the currently known hosts, in host-list order.
     *
     * @return a new list with one entry per host: the host's address mapped to a copy of its connection list.
     */
    List<Map.Entry<ResolvedAddress, List<C>>> activeAddresses() {
        // Read the volatile field once and iterate over that snapshot.
        final List<Host<ResolvedAddress, C>> hosts = activeHosts;
        final List<Map.Entry<ResolvedAddress, List<C>>> entries = new ArrayList<>();
        for (final Host<ResolvedAddress, C> host : hosts) {
            entries.add(host.asEntry());
        }
        return entries;
    }

    /**
     * Creates the failure returned when an operation finds this load balancer already closed.
     *
     * @param <T> the result type expected by the caller.
     * @return a {@link Single} failed with a new {@link IllegalStateException}.
     */
    private static <T> Single<T> failedLBClosed() {
        final IllegalStateException closed = new IllegalStateException("LoadBalancer has closed");
        return Single.failed(closed);
    }

    /**
     * {@link NoAvailableHostException} that never captures a real stack trace: {@link #fillInStackTrace()} is a
     * no-op, and instances are created through {@link #newInstance(String, Class, String)}.
     */
    private static final class StacklessNoAvailableHostException
    extends NoAvailableHostException {
        private StacklessNoAvailableHostException(String message) {
            super(message);
        }

        /**
         * Creates an instance and passes it through {@code ThrowableUtils.unknownStackTrace}.
         *
         * @param message the exception message.
         * @param clazz the class handed to {@code unknownStackTrace}.
         * @param method the method name handed to {@code unknownStackTrace}.
         * @return the exception returned by {@code unknownStackTrace}.
         */
        public static StacklessNoAvailableHostException newInstance(String message, Class<?> clazz, String method) {
            // NOTE(review): unknownStackTrace presumably installs a synthetic frame for clazz/method - confirm.
            final StacklessNoAvailableHostException exception = new StacklessNoAvailableHostException(message);
            return ThrowableUtils.unknownStackTrace(exception, clazz, method);
        }

        /**
         * Skips stack trace capture.
         *
         * @return this instance, unchanged.
         */
        @Override
        public Throwable fillInStackTrace() {
            return this;
        }
    }

    /**
     * A resolved address together with the connections currently pooled for it.
     * <p>
     * The connection list is never mutated in place: every change swaps in a new list through
     * {@code connectionsUpdater}. Once the list is the {@code CLOSED_LIST} sentinel (after
     * {@link #markInactive()} or either close method) no further connection can be added.
     *
     * @param <Addr> the type of address.
     * @param <C> the type of connection.
     */
    private static class Host<Addr, C extends ListenableAsyncCloseable>
    implements AsyncCloseable {
        // Field updaters are keyed by class literals and therefore cannot carry type arguments.
        @SuppressWarnings("rawtypes")
        private static final AtomicReferenceFieldUpdater<Host, List> connectionsUpdater = AtomicReferenceFieldUpdater.newUpdater(Host.class, List.class, "connections");
        final Addr address;
        private volatile List<C> connections = Collections.emptyList();

        /**
         * @param address the address of this host.
         * @throws NullPointerException if {@code address} is {@code null}.
         */
        Host(Addr address) {
            this.address = Objects.requireNonNull(address);
        }

        /**
         * Swaps in the closed sentinel and starts a graceful close of every connection that was pooled.
         * Calling it again is a no-op because the sentinel list is empty.
         */
        void markInactive() {
            @SuppressWarnings("unchecked")
            final List<C> toRemove = connectionsUpdater.getAndSet(this, CLOSED_LIST);
            LOGGER.debug("Closing {} connection(s) gracefully to inactive address: {}", toRemove.size(), this.address);
            for (final C conn : toRemove) {
                // The result of each close is not observed here.
                conn.closeAsyncGracefully().subscribe();
            }
        }

        /**
         * @return {@code true} if the connection list has been replaced by the closed sentinel.
         */
        boolean isInactive() {
            return this.connections == CLOSED_LIST;
        }

        /**
         * Adds {@code connection} to the pool and arranges for it to be removed again when it closes.
         *
         * @param connection the connection to add.
         * @return {@code false} if this host is closed or inactive (the connection is not added), {@code true}
         * otherwise.
         */
        boolean addConnection(C connection) {
            // Copy-on-write with a CAS retry loop.
            for (;;) {
                final List<C> existing = this.connections;
                if (existing == CLOSED_LIST) {
                    return false;
                }
                final List<C> connectionAdded = new ArrayList<>(existing.size() + 1);
                connectionAdded.addAll(existing);
                connectionAdded.add(connection);
                if (connectionsUpdater.compareAndSet(this, existing, connectionAdded)) {
                    break;
                }
            }
            connection.onClose().beforeFinally(() -> {
                // Retry until the connection is removed, is found to be absent, or the host is closed.
                for (;;) {
                    final List<C> current = this.connections;
                    if (current == CLOSED_LIST) {
                        return;
                    }
                    final List<C> connectionRemoved = new ArrayList<>(current);
                    if (!connectionRemoved.remove(connection) || connectionsUpdater.compareAndSet(this, current, connectionRemoved)) {
                        return;
                    }
                }
            }).subscribe();
            return true;
        }

        /**
         * @return an immutable entry mapping this host's address to a copy of its current connection list.
         */
        Map.Entry<Addr, List<C>> asEntry() {
            // Diamond lets the value type be inferred as List<C>, matching the declared return type.
            return new AbstractMap.SimpleImmutableEntry<>(this.address, new ArrayList<>(this.connections));
        }

        /**
         * @return a {@link Completable} that swaps in the closed sentinel and calls {@code closeAsync()} on every
         * connection that was pooled.
         */
        @Override
        public Completable closeAsync() {
            return this.doClose(AsyncCloseable::closeAsync);
        }

        /**
         * @return a {@link Completable} that swaps in the closed sentinel and calls {@code closeAsyncGracefully()}
         * on every connection that was pooled.
         */
        @Override
        public Completable closeAsyncGracefully() {
            return this.doClose(AsyncCloseable::closeAsyncGracefully);
        }

        /**
         * Shared implementation of both close methods. The swap to the closed sentinel is wrapped in
         * {@code Single.defer}; the resulting close operations are combined with {@code mergeAllDelayError}.
         *
         * @param closeFunction maps each pooled connection to the {@link Completable} that closes it.
         * @return the combined {@link Completable}.
         */
        @SuppressWarnings("unchecked") // the updater is declared with raw List
        private Completable doClose(Function<? super C, Completable> closeFunction) {
            return Single.defer(() -> Single.succeeded((List<C>) connectionsUpdater.getAndSet(this, CLOSED_LIST))).flatMapCompletable(toClose -> Completable.mergeAllDelayError(toClose.stream().map(closeFunction)::iterator));
        }

        @Override
        public String toString() {
            return "Host{address=" + this.address + ", removed=" + (this.connections == CLOSED_LIST) + '}';
        }
    }

    /**
     * {@link LoadBalancerFactory} that creates {@link RoundRobinLoadBalancer} instances.
     *
     * @param <ResolvedAddress> the type of resolved address.
     * @param <C> the type of connection.
     */
    public static final class RoundRobinLoadBalancerFactory<ResolvedAddress, C extends LoadBalancedConnection>
    implements LoadBalancerFactory<ResolvedAddress, C> {
        /**
         * Creates a new {@link RoundRobinLoadBalancer} from the given arguments.
         *
         * @param eventPublisher source of service discovery events, passed to the load balancer's constructor.
         * @param connectionFactory connection factory, passed to the load balancer's constructor.
         * @param <T> the type of connection handled by the returned load balancer.
         * @return the new load balancer.
         */
        @Override
        public <T extends C> LoadBalancer<T> newLoadBalancer(Publisher<? extends ServiceDiscovererEvent<ResolvedAddress>> eventPublisher, ConnectionFactory<ResolvedAddress, T> connectionFactory) {
            final RoundRobinLoadBalancer<ResolvedAddress, T> balancer = new RoundRobinLoadBalancer<ResolvedAddress, T>(eventPublisher, connectionFactory);
            return balancer;
        }
    }
}

