/*
 * Decompiled with CFR 0.152.
 */
package io.servicetalk.dns.discovery.netty;

import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.resolver.ResolvedAddressTypes;
import io.netty.resolver.dns.DefaultDnsCache;
import io.netty.resolver.dns.DnsCache;
import io.netty.resolver.dns.DnsNameResolver;
import io.netty.resolver.dns.DnsNameResolverBuilder;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.GenericFutureListener;
import io.servicetalk.client.api.DefaultServiceDiscovererEvent;
import io.servicetalk.client.api.ServiceDiscoverer;
import io.servicetalk.client.api.ServiceDiscovererEvent;
import io.servicetalk.client.api.internal.ServiceDiscovererUtils;
import io.servicetalk.concurrent.Cancellable;
import io.servicetalk.concurrent.CompletableSource;
import io.servicetalk.concurrent.PublisherSource;
import io.servicetalk.concurrent.api.Completable;
import io.servicetalk.concurrent.api.Processors;
import io.servicetalk.concurrent.api.Publisher;
import io.servicetalk.concurrent.api.SourceAdapters;
import io.servicetalk.concurrent.api.internal.SubscribableCompletable;
import io.servicetalk.concurrent.api.internal.SubscribablePublisher;
import io.servicetalk.concurrent.internal.DuplicateSubscribeException;
import io.servicetalk.concurrent.internal.EmptySubscription;
import io.servicetalk.concurrent.internal.FlowControlUtils;
import io.servicetalk.concurrent.internal.RejectedSubscribeError;
import io.servicetalk.concurrent.internal.SubscriberUtils;
import io.servicetalk.dns.discovery.netty.DnsResolverAddressTypes;
import io.servicetalk.dns.discovery.netty.DnsServerAddressStream;
import io.servicetalk.dns.discovery.netty.DnsServerAddressStreamProvider;
import io.servicetalk.dns.discovery.netty.MinTtlCache;
import io.servicetalk.transport.api.IoExecutor;
import io.servicetalk.transport.netty.internal.BuilderUtils;
import io.servicetalk.transport.netty.internal.EventLoopAwareNettyIoExecutor;
import io.servicetalk.transport.netty.internal.EventLoopAwareNettyIoExecutors;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.RandomAccess;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Predicate;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

final class DefaultDnsServiceDiscoverer
implements ServiceDiscoverer<String, InetAddress, ServiceDiscovererEvent<InetAddress>> {
    private static final Logger LOGGER = LoggerFactory.getLogger(DefaultDnsServiceDiscoverer.class);
    private static final Comparator<InetAddress> INET_ADDRESS_COMPARATOR = Comparator.comparing(o -> ByteBuffer.wrap(o.getAddress()));
    private static final Cancellable TERMINATED = () -> {};
    private final CompletableSource.Processor closeCompletable = Processors.newCompletableProcessor();
    private final Map<String, List<DiscoverEntry>> registerMap = new HashMap<String, List<DiscoverEntry>>(8);
    private final EventLoopAwareNettyIoExecutor nettyIoExecutor;
    private final DnsNameResolver resolver;
    private final MinTtlCache ttlCache;
    private final Predicate<Throwable> invalidateHostsOnDnsFailure;
    private boolean closed;

    DefaultDnsServiceDiscoverer(IoExecutor ioExecutor, int minTTL, @Nullable Integer ndots, Predicate<Throwable> invalidateHostsOnDnsFailure, @Nullable Boolean optResourceEnabled, @Nullable Duration queryTimeout, @Nullable DnsResolverAddressTypes dnsResolverAddressTypes, @Nullable DnsServerAddressStreamProvider dnsServerAddressStreamProvider) {
        this.nettyIoExecutor = EventLoopAwareNettyIoExecutors.toEventLoopAwareNettyIoExecutor((IoExecutor)ioExecutor).next();
        this.ttlCache = new MinTtlCache((DnsCache)new DefaultDnsCache(minTTL, Integer.MAX_VALUE, minTTL), minTTL);
        this.invalidateHostsOnDnsFailure = invalidateHostsOnDnsFailure;
        EventLoop eventLoop = this.nettyIoExecutor.eventLoopGroup().next();
        Class socketChannelClass = BuilderUtils.socketChannel((EventLoopGroup)eventLoop, InetSocketAddress.class);
        DnsNameResolverBuilder builder = new DnsNameResolverBuilder(eventLoop).resolveCache((DnsCache)this.ttlCache).channelType(BuilderUtils.datagramChannel((EventLoopGroup)eventLoop)).socketChannelType(socketChannelClass).completeOncePreferredResolved(true);
        if (queryTimeout != null) {
            builder.queryTimeoutMillis(queryTimeout.toMillis());
        }
        if (ndots != null) {
            builder.ndots(ndots.intValue());
        }
        if (optResourceEnabled != null) {
            builder.optResourceEnabled(optResourceEnabled.booleanValue());
        }
        if (dnsServerAddressStreamProvider != null) {
            builder.nameServerProvider(DefaultDnsServiceDiscoverer.toNettyType(dnsServerAddressStreamProvider));
        }
        if (dnsResolverAddressTypes != null) {
            builder.resolvedAddressTypes(DefaultDnsServiceDiscoverer.toNettyType(dnsResolverAddressTypes));
        }
        this.resolver = builder.build();
        LOGGER.debug("Created a new DNS discoverer {} with minimum TTL (seconds): {}, ndots: {}, optResourceEnabled {}, dnsResolverAddressTypes {}, dnsServerAddressStreamProvider {}.", new Object[]{this, minTTL, ndots, optResourceEnabled, dnsResolverAddressTypes, dnsServerAddressStreamProvider});
    }

    public Publisher<ServiceDiscovererEvent<InetAddress>> discover(String address) {
        DiscoverEntry entry;
        if (this.nettyIoExecutor.isCurrentThreadEventLoop()) {
            if (this.closed) {
                return Publisher.failed((Throwable)new IllegalStateException(DefaultDnsServiceDiscoverer.class.getSimpleName() + " closed!"));
            }
            entry = new DiscoverEntry(address);
            this.addEntry0(entry);
        } else {
            entry = new DiscoverEntry(address);
            this.nettyIoExecutor.asExecutor().execute(() -> {
                if (this.closed) {
                    entry.close0();
                } else {
                    this.addEntry0(entry);
                }
            });
        }
        return entry.publisher;
    }

    private void addEntry0(DiscoverEntry entry) {
        this.assertInEventloop();
        this.registerMap.computeIfAbsent(entry.inetHost, k -> new ArrayList(2)).add(entry);
    }

    private void removeEntry0(DiscoverEntry entry) {
        this.assertInEventloop();
        LOGGER.debug("DNS discoverer {}, cancelled DNS resolution for {}.", (Object)this, (Object)entry.inetHost);
        List<DiscoverEntry> entries = this.registerMap.get(entry.inetHost);
        if (entries == null) {
            return;
        }
        entries.remove(entry);
        if (entries.isEmpty()) {
            this.registerMap.remove(entry.inetHost);
        }
    }

    public Completable onClose() {
        return SourceAdapters.fromSource((CompletableSource)this.closeCompletable);
    }

    public Completable closeAsync() {
        return new SubscribableCompletable(){

            protected void handleSubscribe(CompletableSource.Subscriber subscriber) {
                DefaultDnsServiceDiscoverer.this.closeCompletable.subscribe(subscriber);
                if (DefaultDnsServiceDiscoverer.this.nettyIoExecutor.isCurrentThreadEventLoop()) {
                    DefaultDnsServiceDiscoverer.this.closeAsync0();
                } else {
                    DefaultDnsServiceDiscoverer.this.nettyIoExecutor.asExecutor().execute(() -> DefaultDnsServiceDiscoverer.this.closeAsync0());
                }
            }
        };
    }

    private void closeAsync0() {
        this.assertInEventloop();
        if (this.closed) {
            return;
        }
        this.closed = true;
        this.resolver.close();
        Throwable aggregateCause = null;
        for (Map.Entry<String, List<DiscoverEntry>> mapEntry : this.registerMap.entrySet()) {
            for (DiscoverEntry entry : mapEntry.getValue()) {
                try {
                    entry.close0();
                }
                catch (Throwable cause) {
                    if (aggregateCause == null) {
                        aggregateCause = new RuntimeException("Unexpected exception completing " + entry + " when closing " + this, cause);
                        continue;
                    }
                    aggregateCause.addSuppressed(cause);
                }
            }
        }
        this.registerMap.clear();
        if (aggregateCause != null) {
            LOGGER.debug("Closed with error", aggregateCause);
            this.closeCompletable.onError(aggregateCause);
        } else {
            LOGGER.debug("Successfully closed");
            this.closeCompletable.onComplete();
        }
    }

    private void assertInEventloop() {
        assert (this.nettyIoExecutor.isCurrentThreadEventLoop()) : "Must be called from the associated eventloop.";
    }

    private static ResolvedAddressTypes toNettyType(DnsResolverAddressTypes dnsResolverAddressTypes) {
        switch (dnsResolverAddressTypes) {
            case IPV4_ONLY: {
                return ResolvedAddressTypes.IPV4_ONLY;
            }
            case IPV6_ONLY: {
                return ResolvedAddressTypes.IPV6_ONLY;
            }
            case IPV6_PREFERRED: {
                return ResolvedAddressTypes.IPV6_PREFERRED;
            }
            case IPV4_PREFERRED: {
                return ResolvedAddressTypes.IPV4_PREFERRED;
            }
        }
        throw new Error();
    }

    private static io.netty.resolver.dns.DnsServerAddressStreamProvider toNettyType(DnsServerAddressStreamProvider provider) {
        return hostname -> new ServiceTalkToNettyDnsServerAddressStream(provider.nameServerAddressStream(hostname));
    }

    private static final class ClosedServiceDiscovererException
    extends RuntimeException
    implements RejectedSubscribeError {
        ClosedServiceDiscovererException(String message) {
            super(message);
        }
    }

    private static final class ServiceTalkToNettyDnsServerAddressStream
    implements io.netty.resolver.dns.DnsServerAddressStream {
        private final DnsServerAddressStream stream;

        ServiceTalkToNettyDnsServerAddressStream(DnsServerAddressStream stream) {
            this.stream = stream;
        }

        public InetSocketAddress next() {
            return this.stream.next();
        }

        public int size() {
            return this.stream.size();
        }

        public io.netty.resolver.dns.DnsServerAddressStream duplicate() {
            return new ServiceTalkToNettyDnsServerAddressStream(this.stream.duplicate());
        }
    }

    private final class DiscoverEntry {
        private final String inetHost;
        private final EntriesPublisher entriesPublisher = new EntriesPublisher();
        private final Publisher<ServiceDiscovererEvent<InetAddress>> publisher;

        DiscoverEntry(String inetHost) {
            this.inetHost = inetHost;
            this.publisher = new EntriesPublisher().flatMapConcatIterable(Function.identity());
        }

        void close0() {
            this.entriesPublisher.close0();
        }

        private final class EntriesPublisher
        extends SubscribablePublisher<Iterable<ServiceDiscovererEvent<InetAddress>>> {
            @Nullable
            private PublisherSource.Subscriber<? super Iterable<ServiceDiscovererEvent<InetAddress>>> discoverySubscriber;
            @Nullable
            private EntriesPublisherSubscription subscription;

            private EntriesPublisher() {
            }

            protected void handleSubscribe(PublisherSource.Subscriber<? super Iterable<ServiceDiscovererEvent<InetAddress>>> subscriber) {
                if (DefaultDnsServiceDiscoverer.this.nettyIoExecutor.isCurrentThreadEventLoop()) {
                    this.handleSubscribe0(subscriber);
                } else {
                    DefaultDnsServiceDiscoverer.this.nettyIoExecutor.asExecutor().execute(() -> this.handleSubscribe0(subscriber));
                }
            }

            private void handleSubscribe0(PublisherSource.Subscriber<? super Iterable<ServiceDiscovererEvent<InetAddress>>> subscriber) {
                DefaultDnsServiceDiscoverer.this.assertInEventloop();
                if (this.discoverySubscriber != null) {
                    subscriber.onSubscribe((PublisherSource.Subscription)EmptySubscription.EMPTY_SUBSCRIPTION);
                    subscriber.onError((Throwable)new DuplicateSubscribeException(this.discoverySubscriber, subscriber));
                } else if (DefaultDnsServiceDiscoverer.this.closed) {
                    subscriber.onSubscribe((PublisherSource.Subscription)EmptySubscription.EMPTY_SUBSCRIPTION);
                    subscriber.onError((Throwable)new ClosedServiceDiscovererException(DefaultDnsServiceDiscoverer.this + " has been closed!"));
                } else {
                    this.subscription = new EntriesPublisherSubscription(subscriber);
                    this.discoverySubscriber = subscriber;
                    LOGGER.debug("DNS discoverer {}, starting DNS resolution for {}.", (Object)DefaultDnsServiceDiscoverer.this, (Object)DiscoverEntry.this.inetHost);
                    subscriber.onSubscribe((PublisherSource.Subscription)this.subscription);
                }
            }

            void close0() {
                DefaultDnsServiceDiscoverer.this.assertInEventloop();
                PublisherSource.Subscriber<? super Iterable<ServiceDiscovererEvent<InetAddress>>> oldSubscriber = this.discoverySubscriber;
                this.discoverySubscriber = null;
                if (oldSubscriber != null) {
                    assert (this.subscription != null);
                    this.subscription.cancelWithoutRemove0();
                    oldSubscriber.onError((Throwable)new ClosedServiceDiscovererException(DefaultDnsServiceDiscoverer.this + " has been closed!"));
                }
            }

            private final class EntriesPublisherSubscription
            implements PublisherSource.Subscription {
                private final PublisherSource.Subscriber<? super Iterable<ServiceDiscovererEvent<InetAddress>>> subscriber;
                private long pendingRequests;
                private List<InetAddress> activeAddresses;
                private long resolveDoneNoScheduleTime;
                @Nullable
                private Cancellable cancellableForQuery;
                private long ttlNanos;

                EntriesPublisherSubscription(PublisherSource.Subscriber<? super Iterable<ServiceDiscovererEvent<InetAddress>>> subscriber) {
                    this.subscriber = subscriber;
                    this.activeAddresses = Collections.emptyList();
                    this.ttlNanos = -1L;
                }

                public void request(long n) {
                    if (DefaultDnsServiceDiscoverer.this.nettyIoExecutor.isCurrentThreadEventLoop()) {
                        this.request0(n);
                    } else {
                        DefaultDnsServiceDiscoverer.this.nettyIoExecutor.asExecutor().execute(() -> this.request0(n));
                    }
                }

                public void cancel() {
                    if (DefaultDnsServiceDiscoverer.this.nettyIoExecutor.isCurrentThreadEventLoop()) {
                        this.cancel0();
                    } else {
                        DefaultDnsServiceDiscoverer.this.nettyIoExecutor.asExecutor().execute(this::cancel0);
                    }
                }

                private void request0(long n) {
                    DefaultDnsServiceDiscoverer.this.assertInEventloop();
                    if (!SubscriberUtils.isRequestNValid((long)n)) {
                        this.handleError0(SubscriberUtils.newExceptionForInvalidRequestN((long)n), __ -> false);
                        return;
                    }
                    this.pendingRequests = FlowControlUtils.addWithOverflowProtectionIfNotNegative((long)this.pendingRequests, (long)n);
                    if (this.cancellableForQuery == null) {
                        if (this.ttlNanos < 0L) {
                            this.doQuery0();
                        } else {
                            long durationNs = System.nanoTime() - this.resolveDoneNoScheduleTime;
                            if (durationNs > this.ttlNanos) {
                                this.doQuery0();
                            } else {
                                this.scheduleQuery0(this.ttlNanos - durationNs);
                            }
                        }
                    }
                }

                private void doQuery0() {
                    DefaultDnsServiceDiscoverer.this.assertInEventloop();
                    LOGGER.trace("DNS discoverer {}, querying DNS for {}.", (Object)DefaultDnsServiceDiscoverer.this, (Object)DiscoverEntry.this.inetHost);
                    DefaultDnsServiceDiscoverer.this.ttlCache.prepareForResolution(DiscoverEntry.this.inetHost);
                    Future addressFuture = DefaultDnsServiceDiscoverer.this.resolver.resolveAll(DiscoverEntry.this.inetHost);
                    this.cancellableForQuery = () -> addressFuture.cancel(true);
                    if (addressFuture.isDone()) {
                        this.handleResolveDone0((Future<List<InetAddress>>)addressFuture);
                    } else {
                        addressFuture.addListener((GenericFutureListener)((FutureListener)this::handleResolveDone0));
                    }
                }

                private void cancel0() {
                    DefaultDnsServiceDiscoverer.this.assertInEventloop();
                    DefaultDnsServiceDiscoverer.this.removeEntry0(DiscoverEntry.this);
                    this.cancelWithoutRemove0();
                }

                private void cancelWithoutRemove0() {
                    if (this.cancellableForQuery != null) {
                        this.cancellableForQuery = TERMINATED;
                        EntriesPublisher.this.discoverySubscriber = null;
                        this.pendingRequests = -1L;
                        this.cancellableForQuery.cancel();
                    }
                }

                private void scheduleQuery0(long nanos) {
                    DefaultDnsServiceDiscoverer.this.assertInEventloop();
                    LOGGER.trace("DNS discoverer {}, scheduling DNS query for {} after {} nanos.", new Object[]{DefaultDnsServiceDiscoverer.this, DiscoverEntry.this.inetHost, nanos});
                    this.cancellableForQuery = DefaultDnsServiceDiscoverer.this.nettyIoExecutor.asExecutor().schedule(this::doQuery0, nanos, TimeUnit.NANOSECONDS);
                }

                private void handleResolveDone0(Future<List<InetAddress>> addressFuture) {
                    DefaultDnsServiceDiscoverer.this.assertInEventloop();
                    if (EntriesPublisher.this.discoverySubscriber != null) {
                        Throwable cause = addressFuture.cause();
                        if (cause != null) {
                            this.handleError0(cause, DefaultDnsServiceDiscoverer.this.invalidateHostsOnDnsFailure);
                        } else {
                            List addresses = (List)addressFuture.getNow();
                            List events = ServiceDiscovererUtils.calculateDifference(this.activeAddresses, (List)addresses, (Comparator)INET_ADDRESS_COMPARATOR);
                            this.ttlNanos = TimeUnit.SECONDS.toNanos(DefaultDnsServiceDiscoverer.this.ttlCache.minTtl(DiscoverEntry.this.inetHost));
                            if (events != null) {
                                --this.pendingRequests;
                                if (this.pendingRequests > 0L) {
                                    this.scheduleQuery0(this.ttlNanos);
                                } else {
                                    this.resolveDoneNoScheduleTime = System.nanoTime();
                                    this.cancellableForQuery = null;
                                }
                                this.activeAddresses = addresses;
                                try {
                                    LOGGER.debug("DNS discoverer {}, sending events for address {}: (size {}) {}.", new Object[]{DefaultDnsServiceDiscoverer.this, DiscoverEntry.this.inetHost, events.size(), events});
                                    this.subscriber.onNext((Object)events);
                                }
                                catch (Throwable error) {
                                    this.handleError0(error, __ -> false);
                                }
                            } else {
                                LOGGER.trace("DNS discoverer {}, resolution done but no changes observed for {}. Resolution result: (size {}) {}", new Object[]{DefaultDnsServiceDiscoverer.this, DiscoverEntry.this.inetHost, addresses.size(), addresses});
                                this.scheduleQuery0(this.ttlNanos);
                            }
                        }
                    }
                }

                private void handleError0(Throwable cause, Predicate<Throwable> invalidateHostsOnDnsFailure) {
                    DefaultDnsServiceDiscoverer.this.assertInEventloop();
                    LOGGER.debug("DNS discoverer {}, DNS lookup failed for {}.", new Object[]{DefaultDnsServiceDiscoverer.this, DiscoverEntry.this.inetHost, cause});
                    boolean wasAlreadyTerminated = EntriesPublisher.this.discoverySubscriber == null;
                    EntriesPublisher.this.discoverySubscriber = null;
                    this.cancel0();
                    if (wasAlreadyTerminated) {
                        return;
                    }
                    if (invalidateHostsOnDnsFailure.test(cause)) {
                        List<InetAddress> addresses = this.activeAddresses;
                        ArrayList<DefaultServiceDiscovererEvent> events = new ArrayList<DefaultServiceDiscovererEvent>(addresses.size());
                        if (addresses instanceof RandomAccess) {
                            for (int i = 0; i < addresses.size(); ++i) {
                                events.add(new DefaultServiceDiscovererEvent((Object)addresses.get(i), false));
                            }
                        } else {
                            for (InetAddress address : addresses) {
                                events.add(new DefaultServiceDiscovererEvent((Object)address, false));
                            }
                        }
                        try {
                            this.subscriber.onNext(events);
                        }
                        catch (Throwable e) {
                            LOGGER.warn("Exception from subscriber while handling error", e);
                        }
                    }
                    this.subscriber.onError(cause);
                }
            }
        }
    }
}

