/*
 * Decompiled with CFR 0.152.
 */
package io.servicetalk.loadbalancer;

import io.servicetalk.client.api.ConnectionFactory;
import io.servicetalk.client.api.LoadBalancedConnection;
import io.servicetalk.client.api.LoadBalancerReadyEvent;
import io.servicetalk.client.api.ServiceDiscovererEvent;
import io.servicetalk.concurrent.PublisherSource;
import io.servicetalk.concurrent.api.AsyncCloseables;
import io.servicetalk.concurrent.api.Completable;
import io.servicetalk.concurrent.api.CompositeCloseable;
import io.servicetalk.concurrent.api.ListenableAsyncCloseable;
import io.servicetalk.concurrent.api.Processors;
import io.servicetalk.concurrent.api.Publisher;
import io.servicetalk.concurrent.api.Single;
import io.servicetalk.concurrent.api.SourceAdapters;
import io.servicetalk.concurrent.internal.SequentialCancellable;
import io.servicetalk.context.api.ContextMap;
import io.servicetalk.loadbalancer.DefaultHost;
import io.servicetalk.loadbalancer.Exceptions;
import io.servicetalk.loadbalancer.HealthCheckConfig;
import io.servicetalk.loadbalancer.Host;
import io.servicetalk.loadbalancer.HostSelector;
import io.servicetalk.loadbalancer.NormalizedTimeSourceExecutor;
import io.servicetalk.loadbalancer.RoundRobinSelector;
import io.servicetalk.loadbalancer.TestableLoadBalancer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.Objects;
import java.util.Spliterator;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

final class NewRoundRobinLoadBalancer<ResolvedAddress, C extends LoadBalancedConnection>
implements TestableLoadBalancer<ResolvedAddress, C> {
    private static final Logger LOGGER = LoggerFactory.getLogger(NewRoundRobinLoadBalancer.class);
    private static final AtomicReferenceFieldUpdater<NewRoundRobinLoadBalancer, List> usedHostsUpdater = AtomicReferenceFieldUpdater.newUpdater(NewRoundRobinLoadBalancer.class, List.class, "usedHosts");
    private static final AtomicLongFieldUpdater<NewRoundRobinLoadBalancer> nextResubscribeTimeUpdater = AtomicLongFieldUpdater.newUpdater(NewRoundRobinLoadBalancer.class, "nextResubscribeTime");
    private static final long RESUBSCRIBING = -1L;
    private volatile long nextResubscribeTime = -1L;
    private volatile List<Host<ResolvedAddress, C>> usedHosts = Collections.emptyList();
    private final String targetResource;
    private final String lbDescription;
    private final HostSelector<ResolvedAddress, C> hostSelector;
    private final Publisher<? extends Collection<? extends ServiceDiscovererEvent<ResolvedAddress>>> eventPublisher;
    private final PublisherSource.Processor<Object, Object> eventStreamProcessor = Processors.newPublisherProcessorDropHeadOnOverflow(32);
    private final Publisher<Object> eventStream;
    private final SequentialCancellable discoveryCancellable = new SequentialCancellable();
    private final ConnectionFactory<ResolvedAddress, ? extends C> connectionFactory;
    private final int linearSearchSpace;
    @Nullable
    private final HealthCheckConfig healthCheckConfig;
    private final ListenableAsyncCloseable asyncCloseable;

    NewRoundRobinLoadBalancer(String id, String targetResourceName, Publisher<? extends Collection<? extends ServiceDiscovererEvent<ResolvedAddress>>> eventPublisher, ConnectionFactory<ResolvedAddress, ? extends C> connectionFactory, int linearSearchSpace, @Nullable HealthCheckConfig healthCheckConfig) {
        this.targetResource = Objects.requireNonNull(targetResourceName);
        this.lbDescription = this.makeDescription(id, this.targetResource);
        this.hostSelector = new RoundRobinSelector(this.targetResource);
        this.eventPublisher = Objects.requireNonNull(eventPublisher);
        this.eventStream = SourceAdapters.fromSource(this.eventStreamProcessor).replay(1);
        this.connectionFactory = Objects.requireNonNull(connectionFactory);
        this.linearSearchSpace = linearSearchSpace;
        this.healthCheckConfig = healthCheckConfig;
        this.asyncCloseable = AsyncCloseables.toAsyncCloseable(graceful -> {
            List<Host<ResolvedAddress, C>> currentList;
            this.discoveryCancellable.cancel();
            this.eventStreamProcessor.onComplete();
            while (!NewRoundRobinLoadBalancer.isClosedList(currentList = this.usedHosts) && !usedHostsUpdater.compareAndSet(this, currentList, new ClosedList(currentList))) {
            }
            CompositeCloseable compositeCloseable = AsyncCloseables.newCompositeCloseable().appendAll(currentList).appendAll(connectionFactory);
            LOGGER.debug("{} is closing {}gracefully. Last seen addresses (size={}): {}.", this, graceful ? "" : "non", currentList.size(), currentList);
            return (graceful ? compositeCloseable.closeAsyncGracefully() : compositeCloseable.closeAsync()).beforeOnError(t -> {
                if (!graceful) {
                    this.usedHosts = new ClosedList<Host<ResolvedAddress, C>>(Collections.emptyList());
                }
            }).beforeOnComplete(() -> {
                this.usedHosts = new ClosedList<Host<ResolvedAddress, C>>(Collections.emptyList());
            });
        });
        this.eventStream.ignoreElements().subscribe();
        this.subscribeToEvents(false);
    }

    private void subscribeToEvents(boolean resubscribe) {
        assert (this.nextResubscribeTime == -1L);
        if (resubscribe) {
            LOGGER.debug("{}: resubscribing to the ServiceDiscoverer event publisher.", (Object)this);
            this.discoveryCancellable.cancelCurrent();
        }
        SourceAdapters.toSource(this.eventPublisher).subscribe(new EventSubscriber(resubscribe));
        if (this.healthCheckConfig != null) {
            assert (this.healthCheckConfig.executor instanceof NormalizedTimeSourceExecutor);
            this.nextResubscribeTime = NewRoundRobinLoadBalancer.nextResubscribeTime(this.healthCheckConfig, this);
        }
    }

    private static <R, C extends LoadBalancedConnection> long nextResubscribeTime(HealthCheckConfig config, NewRoundRobinLoadBalancer<R, C> lb) {
        long lower = config.healthCheckResubscribeLowerBound;
        long upper = config.healthCheckResubscribeUpperBound;
        long currentTime = config.executor.currentTime(TimeUnit.NANOSECONDS);
        long result = currentTime + (lower == upper ? lower : ThreadLocalRandom.current().nextLong(lower, upper));
        LOGGER.debug("{}: current time {}, next resubscribe attempt can be performed at {}.", lb, currentTime, result);
        return result;
    }

    private static <ResolvedAddress, C extends LoadBalancedConnection> boolean allUnhealthy(List<Host<ResolvedAddress, C>> usedHosts) {
        boolean allUnhealthy = !usedHosts.isEmpty();
        for (Host<ResolvedAddress, C> host : usedHosts) {
            if (host.isUnhealthy()) continue;
            allUnhealthy = false;
            break;
        }
        return allUnhealthy;
    }

    private static <ResolvedAddress> boolean onlyAvailable(Collection<? extends ServiceDiscovererEvent<ResolvedAddress>> events) {
        boolean onlyAvailable = !events.isEmpty();
        for (ServiceDiscovererEvent<ResolvedAddress> event : events) {
            if (ServiceDiscovererEvent.Status.AVAILABLE.equals(event.status())) continue;
            onlyAvailable = false;
            break;
        }
        return onlyAvailable;
    }

    private static <ResolvedAddress, C extends LoadBalancedConnection> boolean notAvailable(Host<ResolvedAddress, C> host, Collection<? extends ServiceDiscovererEvent<ResolvedAddress>> events) {
        boolean available = false;
        for (ServiceDiscovererEvent<ResolvedAddress> event : events) {
            if (!host.address().equals(event.address())) continue;
            available = true;
            break;
        }
        return !available;
    }

    private static <T> Single<T> failedLBClosed(String targetResource) {
        return Single.failed(new IllegalStateException("LoadBalancer for " + targetResource + " has closed"));
    }

    @Override
    public Single<C> selectConnection(Predicate<C> selector, @Nullable ContextMap context) {
        return Single.defer(() -> this.selectConnection0(selector, context, false).shareContextOnSubscribe());
    }

    @Override
    public Single<C> newConnection(@Nullable ContextMap context) {
        return Single.defer(() -> this.selectConnection0(c -> true, context, true).shareContextOnSubscribe());
    }

    private Single<C> selectConnection0(Predicate<C> selector, @Nullable ContextMap context, boolean forceNewConnectionAndReserve) {
        List<Host<ResolvedAddress, C>> currentHosts = this.usedHosts;
        if (currentHosts.isEmpty()) {
            return NewRoundRobinLoadBalancer.isClosedList(currentHosts) ? NewRoundRobinLoadBalancer.failedLBClosed(this.targetResource) : Single.failed(Exceptions.StacklessNoAvailableHostException.newInstance("No hosts are available to connect for " + this.targetResource + ".", this.getClass(), "selectConnection0(...)"));
        }
        Single<C> result = this.hostSelector.selectConnection(currentHosts, selector, context, forceNewConnectionAndReserve);
        if (this.healthCheckConfig != null) {
            result = result.beforeOnError(exn -> {
                long currNextResubscribeTime;
                if (exn instanceof Exceptions.StacklessNoActiveHostException && NewRoundRobinLoadBalancer.allUnhealthy(currentHosts) && (currNextResubscribeTime = this.nextResubscribeTime) >= 0L && this.healthCheckConfig.executor.currentTime(TimeUnit.NANOSECONDS) >= currNextResubscribeTime && nextResubscribeTimeUpdater.compareAndSet(this, currNextResubscribeTime, -1L)) {
                    this.subscribeToEvents(true);
                }
            });
        }
        return result;
    }

    @Override
    public Publisher<Object> eventStream() {
        return this.eventStream;
    }

    public String toString() {
        return this.lbDescription;
    }

    @Override
    public Completable onClose() {
        return this.asyncCloseable.onClose();
    }

    @Override
    public Completable onClosing() {
        return this.asyncCloseable.onClosing();
    }

    @Override
    public Completable closeAsync() {
        return this.asyncCloseable.closeAsync();
    }

    @Override
    public Completable closeAsyncGracefully() {
        return this.asyncCloseable.closeAsyncGracefully();
    }

    @Override
    public List<Map.Entry<ResolvedAddress, List<C>>> usedAddresses() {
        return this.usedHosts.stream().map(host -> ((DefaultHost)host).asEntry()).collect(Collectors.toList());
    }

    private static boolean isClosedList(List<?> list) {
        return list.getClass().equals(ClosedList.class);
    }

    private String makeDescription(String id, String targetResource) {
        return "NewRoundRobinLoadBalancer{id=" + id + '@' + Integer.toHexString(System.identityHashCode(this)) + ", targetResource=" + targetResource + '}';
    }

    private static final class ClosedList<T>
    implements List<T> {
        private final List<T> delegate;

        private ClosedList(List<T> delegate) {
            this.delegate = Objects.requireNonNull(delegate);
        }

        @Override
        public int size() {
            return this.delegate.size();
        }

        @Override
        public boolean isEmpty() {
            return this.delegate.isEmpty();
        }

        @Override
        public boolean contains(Object o) {
            return this.delegate.contains(o);
        }

        @Override
        public Iterator<T> iterator() {
            return this.delegate.iterator();
        }

        @Override
        public void forEach(Consumer<? super T> action) {
            this.delegate.forEach(action);
        }

        @Override
        public Object[] toArray() {
            return this.delegate.toArray();
        }

        @Override
        public <T1> T1[] toArray(T1[] a) {
            return this.delegate.toArray(a);
        }

        @Override
        public boolean add(T t) {
            return this.delegate.add(t);
        }

        @Override
        public boolean remove(Object o) {
            return this.delegate.remove(o);
        }

        @Override
        public boolean containsAll(Collection<?> c) {
            return this.delegate.containsAll(c);
        }

        @Override
        public boolean addAll(Collection<? extends T> c) {
            return this.delegate.addAll(c);
        }

        @Override
        public boolean addAll(int index, Collection<? extends T> c) {
            return this.delegate.addAll(c);
        }

        @Override
        public boolean removeAll(Collection<?> c) {
            return this.delegate.removeAll(c);
        }

        @Override
        public boolean removeIf(Predicate<? super T> filter) {
            return this.delegate.removeIf(filter);
        }

        @Override
        public boolean retainAll(Collection<?> c) {
            return this.delegate.retainAll(c);
        }

        @Override
        public void replaceAll(UnaryOperator<T> operator) {
            this.delegate.replaceAll(operator);
        }

        @Override
        public void sort(Comparator<? super T> c) {
            this.delegate.sort(c);
        }

        @Override
        public void clear() {
            this.delegate.clear();
        }

        @Override
        public T get(int index) {
            return this.delegate.get(index);
        }

        @Override
        public T set(int index, T element) {
            return this.delegate.set(index, element);
        }

        @Override
        public void add(int index, T element) {
            this.delegate.add(index, element);
        }

        @Override
        public T remove(int index) {
            return this.delegate.remove(index);
        }

        @Override
        public int indexOf(Object o) {
            return this.delegate.indexOf(o);
        }

        @Override
        public int lastIndexOf(Object o) {
            return this.delegate.lastIndexOf(o);
        }

        @Override
        public ListIterator<T> listIterator() {
            return this.delegate.listIterator();
        }

        @Override
        public ListIterator<T> listIterator(int index) {
            return this.delegate.listIterator(index);
        }

        @Override
        public List<T> subList(int fromIndex, int toIndex) {
            return new ClosedList<T>(this.delegate.subList(fromIndex, toIndex));
        }

        @Override
        public Spliterator<T> spliterator() {
            return this.delegate.spliterator();
        }

        @Override
        public Stream<T> stream() {
            return this.delegate.stream();
        }

        @Override
        public Stream<T> parallelStream() {
            return this.delegate.parallelStream();
        }
    }

    private final class EventSubscriber
    implements PublisherSource.Subscriber<Collection<? extends ServiceDiscovererEvent<ResolvedAddress>>> {
        private boolean firstEventsAfterResubscribe;

        EventSubscriber(boolean resubscribe) {
            this.firstEventsAfterResubscribe = resubscribe;
        }

        @Override
        public void onSubscribe(PublisherSource.Subscription s) {
            s.request(Long.MAX_VALUE);
            NewRoundRobinLoadBalancer.this.discoveryCancellable.nextCancellable(s);
        }

        @Override
        public void onNext(@Nullable Collection<? extends ServiceDiscovererEvent<ResolvedAddress>> events) {
            if (events == null) {
                LOGGER.debug("{}: unexpectedly received null instead of events.", (Object)NewRoundRobinLoadBalancer.this);
                return;
            }
            for (ServiceDiscovererEvent event : events) {
                ServiceDiscovererEvent.Status eventStatus = event.status();
                LOGGER.debug("{}: received new ServiceDiscoverer event {}. Inferred status: {}.", NewRoundRobinLoadBalancer.this, event, eventStatus);
                List usedAddresses = (List)usedHostsUpdater.updateAndGet(NewRoundRobinLoadBalancer.this, oldHosts -> {
                    if (NewRoundRobinLoadBalancer.isClosedList(oldHosts)) {
                        return oldHosts;
                    }
                    Object addr = Objects.requireNonNull(event.address());
                    List oldHostsTyped = oldHosts;
                    if (ServiceDiscovererEvent.Status.AVAILABLE.equals(eventStatus)) {
                        return this.addHostToList(oldHostsTyped, addr);
                    }
                    if (ServiceDiscovererEvent.Status.EXPIRED.equals(eventStatus)) {
                        if (oldHostsTyped.isEmpty()) {
                            return Collections.emptyList();
                        }
                        return this.markHostAsExpired(oldHostsTyped, addr);
                    }
                    if (ServiceDiscovererEvent.Status.UNAVAILABLE.equals(eventStatus)) {
                        return this.listWithHostRemoved(oldHostsTyped, host -> {
                            boolean match = host.address().equals(addr);
                            if (match) {
                                host.markClosed();
                            }
                            return match;
                        });
                    }
                    LOGGER.error("{}: Unexpected Status in event: {} (mapped to {}). Leaving usedHosts unchanged: {}", NewRoundRobinLoadBalancer.this, event, eventStatus, oldHosts);
                    return oldHosts;
                });
                LOGGER.debug("{}: now using addresses (size={}): {}.", NewRoundRobinLoadBalancer.this, usedAddresses.size(), usedAddresses);
                if (ServiceDiscovererEvent.Status.AVAILABLE.equals(eventStatus)) {
                    if (usedAddresses.size() != 1) continue;
                    NewRoundRobinLoadBalancer.this.eventStreamProcessor.onNext(LoadBalancerReadyEvent.LOAD_BALANCER_READY_EVENT);
                    continue;
                }
                if (!usedAddresses.isEmpty()) continue;
                NewRoundRobinLoadBalancer.this.eventStreamProcessor.onNext(LoadBalancerReadyEvent.LOAD_BALANCER_NOT_READY_EVENT);
            }
            List currentHosts = NewRoundRobinLoadBalancer.this.usedHosts;
            if (this.firstEventsAfterResubscribe) {
                if (events.isEmpty()) {
                    return;
                }
                this.firstEventsAfterResubscribe = false;
                if (!NewRoundRobinLoadBalancer.onlyAvailable(events)) {
                    return;
                }
                for (Host host : currentHosts) {
                    if (!NewRoundRobinLoadBalancer.notAvailable(host, events)) continue;
                    host.closeAsyncGracefully().subscribe();
                }
            }
        }

        private List<Host<ResolvedAddress, C>> markHostAsExpired(List<Host<ResolvedAddress, C>> oldHostsTyped, ResolvedAddress addr) {
            for (Host host : oldHostsTyped) {
                if (!host.address().equals(addr)) continue;
                host.markExpired();
                break;
            }
            return oldHostsTyped;
        }

        private Host<ResolvedAddress, C> createHost(ResolvedAddress addr) {
            DefaultHost host = new DefaultHost(NewRoundRobinLoadBalancer.this.toString(), addr, NewRoundRobinLoadBalancer.this.connectionFactory, NewRoundRobinLoadBalancer.this.linearSearchSpace, NewRoundRobinLoadBalancer.this.healthCheckConfig);
            host.onClose().afterFinally(() -> {
                List cfr_ignored_0 = (List)usedHostsUpdater.updateAndGet(NewRoundRobinLoadBalancer.this, previousHosts -> {
                    List previousHostsTyped = previousHosts;
                    return this.listWithHostRemoved(previousHostsTyped, current -> current == host);
                });
            }).subscribe();
            return host;
        }

        private List<Host<ResolvedAddress, C>> addHostToList(List<Host<ResolvedAddress, C>> oldHostsTyped, ResolvedAddress addr) {
            if (oldHostsTyped.isEmpty()) {
                return Collections.singletonList(this.createHost(addr));
            }
            for (Host host : oldHostsTyped) {
                if (!host.address().equals(addr)) continue;
                if (!host.markActiveIfNotClosed()) break;
                return oldHostsTyped;
            }
            ArrayList newHosts = new ArrayList(oldHostsTyped.size() + 1);
            newHosts.addAll(oldHostsTyped);
            newHosts.add(this.createHost(addr));
            return newHosts;
        }

        private List<Host<ResolvedAddress, C>> listWithHostRemoved(List<Host<ResolvedAddress, C>> oldHostsTyped, Predicate<Host<ResolvedAddress, C>> hostPredicate) {
            if (oldHostsTyped.isEmpty()) {
                return oldHostsTyped;
            }
            ArrayList newHosts = new ArrayList(oldHostsTyped.size() - 1);
            for (int i = 0; i < oldHostsTyped.size(); ++i) {
                Host current = oldHostsTyped.get(i);
                if (hostPredicate.test(current)) {
                    for (int x = i + 1; x < oldHostsTyped.size(); ++x) {
                        newHosts.add(oldHostsTyped.get(x));
                    }
                    return newHosts.isEmpty() ? Collections.emptyList() : newHosts;
                }
                newHosts.add(current);
            }
            return newHosts;
        }

        @Override
        public void onError(Throwable t) {
            List hosts = NewRoundRobinLoadBalancer.this.usedHosts;
            if (NewRoundRobinLoadBalancer.this.healthCheckConfig == null) {
                NewRoundRobinLoadBalancer.this.eventStreamProcessor.onError(t);
            }
            LOGGER.error("{}: service discoverer {} emitted an error. Last seen addresses (size={}): {}.", NewRoundRobinLoadBalancer.this, NewRoundRobinLoadBalancer.this.eventPublisher, hosts.size(), hosts, t);
        }

        @Override
        public void onComplete() {
            List hosts = NewRoundRobinLoadBalancer.this.usedHosts;
            if (NewRoundRobinLoadBalancer.this.healthCheckConfig == null) {
                NewRoundRobinLoadBalancer.this.eventStreamProcessor.onComplete();
            }
            LOGGER.error("{}: service discoverer completed. Last seen addresses (size={}): {}.", NewRoundRobinLoadBalancer.this, hosts.size(), hosts);
        }
    }
}

