/*
 * Decompiled with CFR 0.152.
 */
package io.servicetalk.dns.discovery.netty;

import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.handler.codec.dns.DefaultDnsQuestion;
import io.netty.handler.codec.dns.DefaultDnsRecordDecoder;
import io.netty.handler.codec.dns.DnsRawRecord;
import io.netty.handler.codec.dns.DnsRecord;
import io.netty.handler.codec.dns.DnsRecordType;
import io.netty.resolver.ResolvedAddressTypes;
import io.netty.resolver.dns.DefaultAuthoritativeDnsServerCache;
import io.netty.resolver.dns.DefaultDnsCache;
import io.netty.resolver.dns.DefaultDnsCnameCache;
import io.netty.resolver.dns.DnsNameResolver;
import io.netty.resolver.dns.DnsNameResolverBuilder;
import io.netty.resolver.dns.NameServerComparator;
import io.netty.resolver.dns.NoopAuthoritativeDnsServerCache;
import io.netty.resolver.dns.NoopDnsCache;
import io.netty.resolver.dns.NoopDnsCnameCache;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.Promise;
import io.servicetalk.client.api.DefaultServiceDiscovererEvent;
import io.servicetalk.client.api.ServiceDiscovererEvent;
import io.servicetalk.concurrent.Cancellable;
import io.servicetalk.concurrent.PublisherSource;
import io.servicetalk.concurrent.api.AsyncCloseables;
import io.servicetalk.concurrent.api.Completable;
import io.servicetalk.concurrent.api.ListenableAsyncCloseable;
import io.servicetalk.concurrent.api.Publisher;
import io.servicetalk.concurrent.api.PublisherOperator;
import io.servicetalk.concurrent.api.RepeatStrategies;
import io.servicetalk.concurrent.api.TerminalSignalConsumer;
import io.servicetalk.concurrent.api.internal.SubscribablePublisher;
import io.servicetalk.concurrent.internal.CancelImmediatelySubscriber;
import io.servicetalk.concurrent.internal.DuplicateSubscribeException;
import io.servicetalk.concurrent.internal.FlowControlUtils;
import io.servicetalk.concurrent.internal.RejectedSubscribeError;
import io.servicetalk.concurrent.internal.SubscriberUtils;
import io.servicetalk.dns.discovery.netty.DefaultResolutionResult;
import io.servicetalk.dns.discovery.netty.DnsClient;
import io.servicetalk.dns.discovery.netty.DnsClients;
import io.servicetalk.dns.discovery.netty.DnsNameResolverBuilderUtils;
import io.servicetalk.dns.discovery.netty.DnsResolverAddressTypes;
import io.servicetalk.dns.discovery.netty.DnsServerAddressStream;
import io.servicetalk.dns.discovery.netty.DnsServerAddressStreamProvider;
import io.servicetalk.dns.discovery.netty.DnsServiceDiscovererObserver;
import io.servicetalk.dns.discovery.netty.MinTtlCache;
import io.servicetalk.dns.discovery.netty.ServiceDiscovererUtils;
import io.servicetalk.transport.api.HostAndPort;
import io.servicetalk.transport.api.IoExecutor;
import io.servicetalk.transport.netty.internal.BuilderUtils;
import io.servicetalk.transport.netty.internal.EventLoopAwareNettyIoExecutor;
import io.servicetalk.transport.netty.internal.EventLoopAwareNettyIoExecutors;
import io.servicetalk.utils.internal.ThrowableUtils;
import java.net.Inet4Address;
import java.net.Inet6Address;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.RandomAccess;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.IntFunction;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

final class DefaultDnsClient
implements DnsClient {
    /** Logger shared by this client and its nested publishers and subscriptions. */
    private static final Logger LOGGER = LoggerFactory.getLogger(DefaultDnsClient.class);
    /** Orders addresses by comparing their raw address bytes wrapped in a {@link ByteBuffer}. */
    private static final Comparator<InetAddress> INET_ADDRESS_COMPARATOR = Comparator.comparing(o -> ByteBuffer.wrap(o.getAddress()));
    /** Orders by host name first and by port second. */
    private static final Comparator<HostAndPort> HOST_AND_PORT_COMPARATOR = Comparator.comparing(HostAndPort::hostName).thenComparingInt(HostAndPort::port);
    /** Sentinel stored in a subscription's {@code cancellableForQuery} once that subscription has been cancelled. */
    private static final Cancellable TERMINATED = () -> {};
    /** The single executor (selected via {@code next()} in the constructor) on which all state of this client is accessed. */
    private final EventLoopAwareNettyIoExecutor nettyIoExecutor;
    /** Netty resolver built in the constructor; closed by {@link #closeAsync0()}. */
    private final DnsNameResolver resolver;
    /** Resolve cache handed to the resolver; cleared by {@link #closeAsync0()}. */
    private final MinTtlCache ttlCache;
    /** The {@code maxTTL} constructor argument (seconds) converted to nanoseconds. */
    private final long maxTTLNanos;
    /** Upper bound of the random extra delay added when scheduling the next query. */
    private final long ttlJitterNanos;
    /** Backs {@link #onClose()}, {@link #onClosing()}, {@link #closeAsync()} and {@link #closeAsyncGracefully()}. */
    private final ListenableAsyncCloseable asyncCloseable;
    /** Optional observer notified about new discoveries; {@code null} disables observation. */
    @Nullable
    private final DnsServiceDiscovererObserver observer;
    /** Status reported by subscriptions through {@code missingRecordStatus()}. */
    private final ServiceDiscovererEvent.Status missingRecordStatus;
    /** Produces the delay {@link Completable} used by {@code dnsSrvQuery} before re-resolving an SRV target host name. */
    private final IntFunction<? extends Completable> srvHostNameRepeater;
    /** Concurrency argument passed to {@code flatMapMerge} in {@code dnsSrvQuery}. */
    private final int srvConcurrency;
    /** When {@code true}, {@code dnsSrvQuery} reference-counts resolved socket addresses to drop duplicate events. */
    private final boolean srvFilterDuplicateEvents;
    /** When {@code true}, inactive events are emitted for known addresses before an error is propagated. */
    private final boolean inactiveEventsOnError;
    /** Address types requested from the resolver. */
    private final DnsResolverAddressTypes addressTypes;
    /** Identifier returned by {@link #toString()}: the supplied id plus {@code '@'} and the identity hash code in hex. */
    private final String id;
    /** Set to {@code true} by {@link #closeAsync0()}; read and written only by methods that assert they run on the event loop. */
    private boolean closed;

    /**
     * Creates a new client: stores the configuration, picks a single event loop from {@code ioExecutor}, wires the
     * close logic to run on that event loop and builds the underlying Netty {@link DnsNameResolver}.
     * <p>
     * Optional settings ({@code queryTimeout}, {@code maxUdpPayloadSize}, {@code ndots}, {@code optResourceEnabled},
     * {@code dnsServerAddressStreamProvider}) are applied to the resolver builder only when non-{@code null}.
     * The CNAME and authoritative-server caches are replaced by no-op caches when {@code maxCacheTTL} is {@code 0};
     * the resolve cache is a no-op only when both {@code maxCacheTTL} and {@code negativeTTLCacheSeconds} are
     * {@code 0}.
     */
    DefaultDnsClient(String id, IoExecutor ioExecutor, int consolidateCacheSize, int minTTL, int maxTTL, int minCacheTTL, int maxCacheTTL, int negativeTTLCacheSeconds, long ttlJitterNanos, int srvConcurrency, boolean inactiveEventsOnError, boolean completeOncePreferredResolved, boolean srvFilterDuplicateEvents, Duration srvHostNameRepeatInitialDelay, Duration srvHostNameRepeatJitter, @Nullable Integer maxUdpPayloadSize, @Nullable Integer ndots, @Nullable Boolean optResourceEnabled, @Nullable Duration queryTimeout, DnsResolverAddressTypes dnsResolverAddressTypes, @Nullable SocketAddress localAddress, @Nullable DnsServerAddressStreamProvider dnsServerAddressStreamProvider, @Nullable DnsServiceDiscovererObserver observer, ServiceDiscovererEvent.Status missingRecordStatus) {
        // Plain configuration values first; none of these assignments can fail.
        this.id = id + '@' + Integer.toHexString(System.identityHashCode(this));
        this.observer = observer;
        this.missingRecordStatus = missingRecordStatus;
        this.addressTypes = dnsResolverAddressTypes;
        this.inactiveEventsOnError = inactiveEventsOnError;
        this.srvFilterDuplicateEvents = srvFilterDuplicateEvents;
        this.srvConcurrency = srvConcurrency;
        this.ttlJitterNanos = ttlJitterNanos;
        this.maxTTLNanos = TimeUnit.SECONDS.toNanos(maxTTL);

        // Everything below depends on the single executor selected here.
        this.nettyIoExecutor = EventLoopAwareNettyIoExecutors.toEventLoopAwareNettyIoExecutor(ioExecutor).next();
        this.srvHostNameRepeater = RepeatStrategies.repeatWithConstantBackoffDeltaJitter(srvHostNameRepeatInitialDelay, srvHostNameRepeatJitter, this.nettyIoExecutor);
        final boolean resolveCacheDisabled = maxCacheTTL == 0 && negativeTTLCacheSeconds == 0;
        this.ttlCache = new MinTtlCache(resolveCacheDisabled ? NoopDnsCache.INSTANCE : new DefaultDnsCache(minCacheTTL, maxCacheTTL, negativeTTLCacheSeconds), minTTL, this.nettyIoExecutor);

        // Closing always happens on the event loop: inline when already there, otherwise submitted to it.
        this.asyncCloseable = AsyncCloseables.toAsyncCloseable(graceful -> {
            if (!this.nettyIoExecutor.isCurrentThreadEventLoop()) {
                return this.nettyIoExecutor.submit(this::closeAsync0);
            }
            this.closeAsync0();
            return Completable.completed();
        });

        final EventLoop eventLoop = this.nettyIoExecutor.eventLoopGroup().next();
        final Class<? extends Channel> socketChannelClass = BuilderUtils.socketChannel((EventLoopGroup)eventLoop, InetSocketAddress.class);
        final ResolvedAddressTypes resolvedAddressTypes = DnsResolverAddressTypes.toNettyType(this.addressTypes);
        final boolean secondaryCachesDisabled = maxCacheTTL == 0;
        final DnsNameResolverBuilder builder = new DnsNameResolverBuilder(eventLoop)
                .localAddress(localAddress)
                .channelType(BuilderUtils.datagramChannel(eventLoop))
                .resolvedAddressTypes(resolvedAddressTypes)
                .socketChannelType(socketChannelClass)
                .completeOncePreferredResolved(completeOncePreferredResolved)
                .resolveCache(this.ttlCache)
                .cnameCache(secondaryCachesDisabled ? NoopDnsCnameCache.INSTANCE : new DefaultDnsCnameCache(minCacheTTL, maxCacheTTL))
                .authoritativeDnsServerCache(secondaryCachesDisabled ? NoopAuthoritativeDnsServerCache.INSTANCE : new DefaultAuthoritativeDnsServerCache(minCacheTTL, maxCacheTTL, new NameServerComparator(DnsResolverAddressTypes.preferredAddressType(resolvedAddressTypes).addressType())));
        DnsNameResolverBuilderUtils.consolidateCacheSize(id, builder, consolidateCacheSize);

        // Optional knobs: leave the builder defaults untouched when the caller supplied nothing.
        if (queryTimeout != null) {
            builder.queryTimeoutMillis(queryTimeout.toMillis());
        }
        if (maxUdpPayloadSize != null) {
            builder.maxPayloadSize(maxUdpPayloadSize);
        }
        if (ndots != null) {
            builder.ndots(ndots);
        }
        if (optResourceEnabled != null) {
            builder.optResourceEnabled(optResourceEnabled);
        }
        if (dnsServerAddressStreamProvider != null) {
            builder.nameServerProvider(DefaultDnsClient.toNettyType(dnsServerAddressStreamProvider));
        }
        this.resolver = builder.build();
    }

    /**
     * Returns the identifier of this client (the id passed to the constructor suffixed with {@code '@'} and the
     * identity hash code in hex), which is what log statements in this class print for {@code this}.
     *
     * @return the identifier of this client.
     */
    @Override
    public String toString() {
        return this.id;
    }

    /**
     * Exposes the resolve cache that was handed to the resolver.
     *
     * @return the {@link MinTtlCache} created in the constructor.
     */
    MinTtlCache ttlCache() {
        return ttlCache;
    }

    /**
     * Asks the configured observer (if any) for a per-discovery observer.
     * Exceptions thrown by the observer are logged and treated as "no observer".
     *
     * @param address the name being discovered.
     * @return the observer for this discovery, or {@code null} when no observer is configured or it threw.
     */
    @Nullable
    private DnsServiceDiscovererObserver.DnsDiscoveryObserver newDiscoveryObserver(String address) {
        final DnsServiceDiscovererObserver configuredObserver = this.observer;
        if (configuredObserver != null) {
            try {
                return configuredObserver.onNewDiscovery(this.id, address);
            }
            catch (Throwable unexpected) {
                LOGGER.warn("{} unexpected exception from {} while reporting new DNS discovery for {}", this, configuredObserver, address, unexpected);
            }
        }
        return null;
    }

    /**
     * Builds a publisher of address events for {@code address}.
     * <p>
     * The work is wrapped in {@code Publisher.defer}, so the discovery observer and the {@code ARecordPublisher}
     * are created inside the deferred lambda rather than when this method is called. When
     * {@code inactiveEventsOnError} is set the publisher is wrapped by {@code recoverWithInactiveEvents}. When a
     * discovery observer exists, it is notified of failure and cancellation; exceptions it throws are logged.
     *
     * @param address the name to resolve; must not be {@code null}.
     * @return a publisher of collections of discovery events.
     */
    @Override
    public Publisher<Collection<ServiceDiscovererEvent<InetAddress>>> dnsQuery(String address) {
        Objects.requireNonNull(address);
        return Publisher.defer(() -> {
            final DnsServiceDiscovererObserver.DnsDiscoveryObserver discoveryObserver = this.newDiscoveryObserver(address);
            ARecordPublisher pub = new ARecordPublisher(address, discoveryObserver);
            ARecordPublisher events = this.inactiveEventsOnError ? DefaultDnsClient.recoverWithInactiveEvents(pub, false) : pub;
            // Only attach the terminal-signal hook when there is an observer to notify.
            return discoveryObserver == null ? events : events.beforeFinally(new TerminalSignalConsumer(){

                @Override
                public void onComplete() {
                    // Nothing is reported to the observer on completion.
                }

                @Override
                public void onError(Throwable cause) {
                    try {
                        discoveryObserver.discoveryFailed(cause);
                    }
                    catch (Throwable unexpected) {
                        // Link the two throwables via addSuppressed and log the observer failure.
                        ThrowableUtils.addSuppressed(unexpected, cause);
                        LOGGER.warn("{} Unexpected exception from observer while reporting discovery failure", (Object)DefaultDnsClient.this, (Object)unexpected);
                    }
                }

                @Override
                public void cancel() {
                    try {
                        discoveryObserver.discoveryCancelled();
                    }
                    catch (Throwable unexpected) {
                        LOGGER.warn("{} Unexpected exception from observer while reporting discovery cancellation", (Object)DefaultDnsClient.this, (Object)unexpected);
                    }
                }
            });
        });
    }

    /**
     * Builds a publisher of socket-address events for the SRV record {@code serviceName}.
     * <p>
     * Each SRV event is turned into an inner publisher and merged with concurrency {@code srvConcurrency}:
     * <ul>
     * <li>an {@code AVAILABLE} SRV event starts an {@code ARecordPublisher} for the SRV target host name and maps
     * its addresses to {@link InetSocketAddress} using the SRV port;</li>
     * <li>a {@code SrvInactiveEvent} emits its {@code aggregatedEvents} list;</li>
     * <li>any other event removes the host name's {@code ARecordPublisher} and fails it with
     * {@code SrvAddressRemovedException}.</li>
     * </ul>
     * The merged stream is finally passed through {@code SrvInactiveCombinerOperator}.
     *
     * @param serviceName the SRV name to resolve; must not be {@code null}.
     * @return a publisher of collections of discovery events.
     */
    @Override
    public Publisher<Collection<ServiceDiscovererEvent<InetSocketAddress>>> dnsSrvQuery(String serviceName) {
        Objects.requireNonNull(serviceName);
        return Publisher.defer(() -> {
            // SRV target host name -> the ARecordPublisher currently resolving it.
            HashMap aRecordMap = new HashMap(8);
            // Reference counts per resolved socket address; only populated when duplicate filtering is enabled.
            HashMap availableAddresses = this.srvFilterDuplicateEvents ? new HashMap(8) : Collections.emptyMap();
            final DnsServiceDiscovererObserver.DnsDiscoveryObserver discoveryObserver = this.newDiscoveryObserver(serviceName);
            Publisher<Collection<ServiceDiscovererEvent<InetSocketAddress>>> events = DefaultDnsClient.recoverWithInactiveEvents(new SrvRecordPublisher(serviceName, discoveryObserver), true).flatMapConcatIterable(Function.identity()).flatMapMerge(srvEvent -> {
                this.assertInEventloop();
                if (ServiceDiscovererEvent.Status.AVAILABLE.equals(srvEvent.status())) {
                    return Publisher.defer(() -> {
                        ARecordPublisher aPublisher = new ARecordPublisher(((HostAndPort)srvEvent.address()).hostName(), discoveryObserver);
                        ARecordPublisher prevAPublisher = aRecordMap.putIfAbsent(((HostAndPort)srvEvent.address()).hostName(), aPublisher);
                        if (prevAPublisher != null) {
                            // A publisher is already registered for this host name: report the duplicate as a failure.
                            return DefaultDnsClient.newDuplicateSrv(serviceName, ((HostAndPort)srvEvent.address()).hostName());
                        }
                        Publisher<Collection<ServiceDiscovererEvent<InetAddress>>> returnPub = DefaultDnsClient.recoverWithInactiveEvents(aPublisher, false);
                        return this.srvFilterDuplicateEvents ? DefaultDnsClient.srvFilterDups(returnPub, availableAddresses, ((HostAndPort)srvEvent.address()).port()) : returnPub.map(ev -> DnsClients.mapEventList(ev, inetAddress -> new InetSocketAddress((InetAddress)inetAddress, ((HostAndPort)srvEvent.address()).port())));
                    }).retryWhen(false, (i, cause) -> {
                        this.assertInEventloop();
                        // Do not retry when the SRV entry was removed, or when no publisher was registered for the
                        // host name. Note that remove(...) also unregisters the failed publisher, so a retry can
                        // register a fresh one through putIfAbsent above.
                        return cause.getClass().equals(SrvAddressRemovedException.class) || aRecordMap.remove(((HostAndPort)srvEvent.address()).hostName()) == null ? Completable.failed(cause) : this.srvHostNameRepeater.apply(i);
                    }).onErrorComplete();
                }
                if (srvEvent instanceof SrvInactiveEvent) {
                    return Publisher.from(((SrvInactiveEvent)srvEvent).aggregatedEvents);
                }
                // Neither AVAILABLE nor the aggregate marker: stop resolving this SRV target.
                ARecordPublisher aPublisher = (ARecordPublisher)aRecordMap.remove(((HostAndPort)srvEvent.address()).hostName());
                if (aPublisher != null) {
                    aPublisher.cancelAndFail0(SrvAddressRemovedException.newInstance(DefaultDnsClient.class, "dnsSrvQuery"));
                }
                return Publisher.empty();
            }, this.srvConcurrency).liftSync(this.inactiveEventsOnError ? SrvInactiveCombinerOperator.EMIT : SrvInactiveCombinerOperator.NO_EMIT);
            // Only attach the terminal-signal hook when there is an observer to notify.
            return discoveryObserver == null ? events : events.beforeFinally(new TerminalSignalConsumer(){

                @Override
                public void onComplete() {
                    // Nothing is reported to the observer on completion.
                }

                @Override
                public void onError(Throwable cause) {
                    try {
                        discoveryObserver.discoveryFailed(cause);
                    }
                    catch (Throwable unexpected) {
                        // Link the two throwables via addSuppressed and log the observer failure.
                        ThrowableUtils.addSuppressed(unexpected, cause);
                        LOGGER.warn("{} Unexpected exception from observer while reporting discovery failure", (Object)DefaultDnsClient.this, (Object)unexpected);
                    }
                }

                @Override
                public void cancel() {
                    try {
                        discoveryObserver.discoveryCancelled();
                    }
                    catch (Throwable unexpected) {
                        LOGGER.warn("{} Unexpected exception from observer while reporting discovery cancellation", (Object)DefaultDnsClient.this, (Object)unexpected);
                    }
                }
            });
        });
    }

    /**
     * Delegates to the {@code onClose()} of the closeable created in the constructor.
     *
     * @return the delegate's {@link Completable}.
     */
    @Override
    public Completable onClose() {
        return asyncCloseable.onClose();
    }

    /**
     * Delegates to the {@code onClosing()} of the closeable created in the constructor.
     *
     * @return the delegate's {@link Completable}.
     */
    @Override
    public Completable onClosing() {
        return asyncCloseable.onClosing();
    }

    /**
     * Delegates to the {@code closeAsync()} of the closeable created in the constructor.
     *
     * @return the delegate's {@link Completable}.
     */
    @Override
    public Completable closeAsync() {
        return asyncCloseable.closeAsync();
    }

    /**
     * Delegates to the {@code closeAsyncGracefully()} of the closeable created in the constructor.
     *
     * @return the delegate's {@link Completable}.
     */
    @Override
    public Completable closeAsyncGracefully() {
        return asyncCloseable.closeAsyncGracefully();
    }

    /**
     * Performs the actual close on the event loop: marks the client closed, closes the resolver and clears the
     * cache. Subsequent invocations are no-ops.
     */
    private void closeAsync0() {
        this.assertInEventloop();
        if (!this.closed) {
            this.closed = true;
            this.resolver.close();
            this.ttlCache.clear();
        }
    }

    /**
     * Asserts (only when assertions are enabled) that the calling thread is the event loop of this client.
     */
    private void assertInEventloop() {
        assert this.nettyIoExecutor.isCurrentThreadEventLoop();
    }

    /**
     * Maps address events to socket-address events for {@code port} while reference-counting every socket address
     * in {@code availableAddresses}, so that the same address reported by several sources is announced only once.
     * <p>
     * An {@code AVAILABLE} event is forwarded only when the address was not counted before; any other event is
     * forwarded only when it releases the last reference. Batches that end up empty are filtered out.
     *
     * @param returnPub source of address events.
     * @param availableAddresses mutable reference counts, shared between invocations by the caller.
     * @param port port to combine with every address.
     * @return the mapped and de-duplicated publisher.
     * @throws IllegalStateException (from the mapping function) if a non-available event arrives for an address
     * that has no recorded reference.
     */
    private static Publisher<? extends Collection<ServiceDiscovererEvent<InetSocketAddress>>> srvFilterDups(Publisher<? extends Collection<ServiceDiscovererEvent<InetAddress>>> returnPub, Map<InetSocketAddress, Integer> availableAddresses, int port) {
        return returnPub.map(events -> {
            final List<ServiceDiscovererEvent<InetSocketAddress>> forwarded = new ArrayList<>(events.size());
            for (ServiceDiscovererEvent<InetAddress> event : events) {
                final InetSocketAddress socketAddress = new InetSocketAddress(event.address(), port);
                final ServiceDiscovererEvent.Status status = event.status();
                final Integer references = availableAddresses.get(socketAddress);
                if (ServiceDiscovererEvent.Status.AVAILABLE.equals(status)) {
                    if (references == null) {
                        // First sighting of this address: announce it and start counting.
                        forwarded.add(new DefaultServiceDiscovererEvent<>(socketAddress, status));
                        availableAddresses.put(socketAddress, 1);
                    } else {
                        availableAddresses.put(socketAddress, references + 1);
                    }
                } else if (references == null) {
                    throw new IllegalStateException("null count for: " + socketAddress);
                } else if (references == 1) {
                    // Last reference released: announce the removal and forget the address.
                    forwarded.add(new DefaultServiceDiscovererEvent<>(socketAddress, status));
                    availableAddresses.remove(socketAddress);
                } else {
                    availableAddresses.put(socketAddress, references - 1);
                }
            }
            return forwarded;
        }).filter(batch -> !batch.isEmpty());
    }

    /**
     * Wraps {@code pub} so that, when it fails, the inactive events generated by its current subscription are
     * emitted before the original error is re-delivered.
     * <p>
     * If the publisher has no subscription, or the subscription generates no inactive events, the error is
     * propagated unchanged.
     *
     * @param pub the publisher to wrap.
     * @param generateAggregateEvent when {@code true}, a single-element list holding a {@code SrvInactiveEvent}
     * is emitted ahead of the inactive events.
     * @return the wrapped publisher.
     */
    private static <T, A> Publisher<? extends Collection<ServiceDiscovererEvent<T>>> recoverWithInactiveEvents(AbstractDnsPublisher<T> pub, boolean generateAggregateEvent) {
        return pub.onErrorResume(cause -> {
            final AbstractDnsPublisher.AbstractDnsSubscription subscription = pub.subscription;
            if (subscription == null) {
                return Publisher.failed(cause);
            }
            final List events = subscription.generateInactiveEvent();
            if (events.isEmpty()) {
                return Publisher.failed(cause);
            }
            return (generateAggregateEvent ? Publisher.from(Collections.singletonList(new SrvInactiveEvent(subscription.missingRecordStatus())), events) : Publisher.from(events)).concat(Publisher.failed(cause));
        });
    }

    /**
     * Creates a failed publisher signalling that an SRV lookup yielded the same target host name twice.
     *
     * @param serviceName the SRV name being resolved.
     * @param resolvedAddress the duplicated target host name.
     * @return a publisher that fails with {@link IllegalStateException}.
     */
    private static <T> Publisher<T> newDuplicateSrv(String serviceName, String resolvedAddress) {
        final String message = "Duplicate SRV entry for SRV name " + serviceName + " for address " + resolvedAddress;
        return Publisher.failed(new IllegalStateException(message));
    }

    /**
     * Adapts a ServiceTalk {@link DnsServerAddressStreamProvider} to the Netty provider interface by wrapping every
     * stream it returns in a {@code ServiceTalkToNettyDnsServerAddressStream}.
     *
     * @param provider the ServiceTalk provider to adapt.
     * @return the Netty view of {@code provider}.
     */
    private static io.netty.resolver.dns.DnsServerAddressStreamProvider toNettyType(DnsServerAddressStreamProvider provider) {
        return hostname -> {
            final DnsServerAddressStream serviceTalkStream = provider.nameServerAddressStream(hostname);
            return new ServiceTalkToNettyDnsServerAddressStream(serviceTalkStream);
        };
    }

    /**
     * Marker exception used by {@code dnsSrvQuery} to fail the A-record publisher of an SRV target that is no
     * longer part of the SRV answer. Instances are created through {@link #newInstance(Class, String)} only.
     */
    private static final class SrvAddressRemovedException extends RuntimeException {
        private static final long serialVersionUID = -4083873869084533456L;

        private SrvAddressRemovedException() {
        }

        /**
         * Creates an instance and passes it through {@code ThrowableUtils.unknownStackTrace} together with the
         * supplied origin.
         *
         * @param clazz the class to report as origin.
         * @param method the method to report as origin.
         * @return the new exception.
         */
        static SrvAddressRemovedException newInstance(Class<?> clazz, String method) {
            final SrvAddressRemovedException exception = new SrvAddressRemovedException();
            return io.servicetalk.concurrent.internal.ThrowableUtils.unknownStackTrace(exception, clazz, method);
        }
    }

    /**
     * Signals that an operation was attempted after the client had been closed. It is delivered to subscribers
     * that subscribe to, or trigger a query on, a closed client.
     */
    private static final class ClosedDnsServiceDiscovererException extends ClosedChannelException implements RejectedSubscribeError {
        private static final long serialVersionUID = -8092675984257002148L;

        private ClosedDnsServiceDiscovererException() {
            // Instantiated only from within the enclosing class.
        }
    }

    /**
     * Adapter that exposes a ServiceTalk {@link DnsServerAddressStream} through the Netty interface of the same
     * name by forwarding every call to the wrapped stream.
     */
    private static final class ServiceTalkToNettyDnsServerAddressStream implements io.netty.resolver.dns.DnsServerAddressStream {
        private final DnsServerAddressStream stream;

        ServiceTalkToNettyDnsServerAddressStream(DnsServerAddressStream wrapped) {
            stream = wrapped;
        }

        @Override
        public InetSocketAddress next() {
            return stream.next();
        }

        @Override
        public int size() {
            return stream.size();
        }

        /** Duplicates the wrapped stream and wraps the copy in a new adapter. */
        @Override
        public io.netty.resolver.dns.DnsServerAddressStream duplicate() {
            final DnsServerAddressStream copy = stream.duplicate();
            return new ServiceTalkToNettyDnsServerAddressStream(copy);
        }
    }

    /**
     * List type used purely as a marker: {@code SrvInactiveCombinerOperator} recognises it via {@code instanceof}
     * and starts aggregating subsequent events into it.
     */
    private static final class SrvAggregateList<T> extends ArrayList<T> {
        private static final long serialVersionUID = -6105010311426084245L;

        private SrvAggregateList() {
            // Instantiated only from within the enclosing class.
        }
    }

    /**
     * Synthetic event carrying an (initially empty) {@code SrvAggregateList}. It has no address of its own;
     * {@code dnsSrvQuery} detects it via {@code instanceof} and emits {@link #aggregatedEvents} downstream.
     */
    private static final class SrvInactiveEvent<T, A> implements ServiceDiscovererEvent<T> {
        private final ServiceDiscovererEvent.Status missingRecordStatus;
        private final List<ServiceDiscovererEvent<A>> aggregatedEvents;

        SrvInactiveEvent(ServiceDiscovererEvent.Status missingRecordStatus) {
            this.aggregatedEvents = new SrvAggregateList<ServiceDiscovererEvent<A>>();
            this.missingRecordStatus = missingRecordStatus;
        }

        /**
         * Always throws: this event does not represent a concrete address.
         *
         * @throws IllegalStateException always.
         */
        @Override
        public T address() {
            throw new IllegalStateException("address method should not be called when isAvailable is false!");
        }

        @Override
        public ServiceDiscovererEvent.Status status() {
            return missingRecordStatus;
        }
    }

    /**
     * Operator that buffers events once a {@code SrvAggregateList} has been observed.
     * <p>
     * Before the marker list arrives every batch is forwarded unchanged. From the marker onwards nothing is
     * forwarded: batches are appended to the marker list (only in {@link #EMIT} mode) and one more item is
     * requested upstream for each swallowed batch. On error the buffered list is emitted first (again only in
     * {@link #EMIT} mode) and then the error is propagated. Completion is passed through as is.
     */
    private static final class SrvInactiveCombinerOperator implements PublisherOperator<Collection<ServiceDiscovererEvent<InetSocketAddress>>, Collection<ServiceDiscovererEvent<InetSocketAddress>>> {
        static final SrvInactiveCombinerOperator EMIT = new SrvInactiveCombinerOperator(true);
        static final SrvInactiveCombinerOperator NO_EMIT = new SrvInactiveCombinerOperator(false);
        private final boolean emitAggregatedEvents;

        private SrvInactiveCombinerOperator(boolean emitAggregatedEvents) {
            this.emitAggregatedEvents = emitAggregatedEvents;
        }

        @Override
        public PublisherSource.Subscriber<? super Collection<ServiceDiscovererEvent<InetSocketAddress>>> apply(final PublisherSource.Subscriber<? super Collection<ServiceDiscovererEvent<InetSocketAddress>>> subscriber) {
            return new PublisherSource.Subscriber<Collection<ServiceDiscovererEvent<InetSocketAddress>>>(){
                @Nullable
                private List<ServiceDiscovererEvent<InetSocketAddress>> aggregatedEvents;
                @Nullable
                private PublisherSource.Subscription subscription;

                @Override
                public void onSubscribe(PublisherSource.Subscription s) {
                    this.subscription = s;
                    subscriber.onSubscribe(s);
                }

                @Override
                public void onNext(@Nullable Collection<ServiceDiscovererEvent<InetSocketAddress>> evts) {
                    assert this.subscription != null;
                    if (this.aggregatedEvents == null) {
                        if (evts instanceof SrvAggregateList) {
                            // Marker seen: switch to buffering and replace the swallowed item upstream.
                            this.aggregatedEvents = (List)evts;
                            this.subscription.request(1L);
                        } else {
                            subscriber.onNext(evts);
                        }
                        return;
                    }
                    // Already buffering: keep the batch only in EMIT mode, and always ask for a replacement item.
                    if (emitAggregatedEvents && evts != null) {
                        this.aggregatedEvents.addAll(evts);
                    }
                    this.subscription.request(1L);
                }

                @Override
                public void onError(Throwable t) {
                    final boolean flushBuffered = emitAggregatedEvents && this.aggregatedEvents != null;
                    try {
                        if (flushBuffered) {
                            subscriber.onNext(this.aggregatedEvents);
                        }
                    }
                    finally {
                        // The error must reach the subscriber even if delivering the buffered list throws.
                        subscriber.onError(t);
                    }
                }

                @Override
                public void onComplete() {
                    subscriber.onComplete();
                }
            };
        }
    }

    private abstract class AbstractDnsPublisher<T>
    extends SubscribablePublisher<List<ServiceDiscovererEvent<T>>> {
        /** The name this publisher resolves; also passed to the discovery observer for each new resolution. */
        protected final String name;
        /** Observer for this discovery, or {@code null} when observation is disabled. */
        @Nullable
        protected final DnsServiceDiscovererObserver.DnsDiscoveryObserver discoveryObserver;
        /**
         * The one subscription of this publisher; {@code null} until the first subscribe or
         * {@code cancelAndFail0} call. Assigned only by methods that assert they run on the event loop.
         */
        @Nullable
        AbstractDnsSubscription subscription;

        /**
         * Creates a publisher for {@code name} and logs its creation at debug level.
         *
         * @param name the name this publisher resolves.
         * @param discoveryObserver observer for this discovery, or {@code null} when observation is disabled
         * (callers pass the nullable result of {@code newDiscoveryObserver}).
         */
        AbstractDnsPublisher(String name, @Nullable DnsServiceDiscovererObserver.DnsDiscoveryObserver discoveryObserver) {
            this.name = name;
            this.discoveryObserver = discoveryObserver;
            LOGGER.debug("{} initializing a new publisher for {}.", (Object)DefaultDnsClient.this, (Object)this);
        }

        /**
         * Creates the subscription that will serve the given subscriber. Invoked at most once per publisher, either
         * on the first subscribe or from {@code cancelAndFail0} when nobody has subscribed yet.
         *
         * @param var1 the subscriber that will receive the events.
         * @return a new subscription bound to the subscriber.
         */
        protected abstract AbstractDnsSubscription newSubscription(PublisherSource.Subscriber<? super List<ServiceDiscovererEvent<T>>> var1);

        /**
         * Hops onto the event loop (unless already there) before performing the actual subscribe.
         *
         * @param subscriber the subscriber to attach.
         */
        @Override
        protected final void handleSubscribe(PublisherSource.Subscriber<? super List<ServiceDiscovererEvent<T>>> subscriber) {
            if (!DefaultDnsClient.this.nettyIoExecutor.isCurrentThreadEventLoop()) {
                DefaultDnsClient.this.nettyIoExecutor.execute(() -> this.handleSubscribe0(subscriber));
                return;
            }
            this.handleSubscribe0(subscriber);
        }

        /**
         * Event-loop part of subscribe. Rejects a second subscriber with {@link DuplicateSubscribeException},
         * rejects any subscriber once the client is closed, and otherwise creates the subscription and hands it to
         * the subscriber.
         *
         * @param subscriber the subscriber to attach.
         */
        private void handleSubscribe0(PublisherSource.Subscriber<? super List<ServiceDiscovererEvent<T>>> subscriber) {
            DefaultDnsClient.this.assertInEventloop();
            if (this.subscription != null) {
                SubscriberUtils.deliverErrorFromSource(subscriber, new DuplicateSubscribeException(this.subscription, subscriber));
                return;
            }
            if (DefaultDnsClient.this.closed) {
                SubscriberUtils.deliverErrorFromSource(subscriber, new ClosedDnsServiceDiscovererException());
                return;
            }
            this.subscription = this.newSubscription(subscriber);
            try {
                subscriber.onSubscribe(this.subscription);
            }
            catch (Throwable cause) {
                SubscriberUtils.handleExceptionFromOnSubscribe(subscriber, cause);
            }
        }

        /**
         * Cancels this publisher's subscription and terminates it with {@code cause}. When nothing has subscribed
         * yet, a subscription bound to {@code CancelImmediatelySubscriber} is installed instead, which occupies
         * the single subscription slot.
         *
         * @param cause the error to deliver to an existing subscriber.
         */
        final void cancelAndFail0(Throwable cause) {
            DefaultDnsClient.this.assertInEventloop();
            final AbstractDnsSubscription current = this.subscription;
            if (current == null) {
                this.subscription = this.newSubscription(CancelImmediatelySubscriber.INSTANCE);
                return;
            }
            current.cancelAndTerminate0(cause);
        }

        abstract class AbstractDnsSubscription
        implements PublisherSource.Subscription {
            /** Receiver of the discovery events and of the terminal signal. */
            private final PublisherSource.Subscriber<? super List<ServiceDiscovererEvent<T>>> subscriber;
            /** Outstanding demand, accumulated in {@code request0} with overflow protection. */
            private long pendingRequests;
            /** Initialised to an empty list; presumably the currently known addresses - updated outside this view, TODO confirm. */
            private List<T> activeAddresses;
            /** Timestamp (executor time, nanoseconds) that {@code request0} subtracts from "now" to measure elapsed TTL. */
            private long resolveDoneNoScheduleTime;
            /**
             * {@code null} while no query is in flight or scheduled; otherwise the handle that cancels the pending
             * query or timer, or {@code TERMINATED} after the subscription has been cancelled.
             */
            @Nullable
            private Cancellable cancellableForQuery;
            /** TTL of the last answer in nanoseconds; starts at {@code -1}, which {@code request0} treats as "query immediately". */
            private long ttlNanos;

            /**
             * Creates a subscription with no known addresses and no known TTL.
             *
             * @param subscriber the receiver of events produced by this subscription.
             */
            AbstractDnsSubscription(PublisherSource.Subscriber<? super List<ServiceDiscovererEvent<T>>> subscriber) {
                // A negative TTL marks "nothing resolved yet".
                this.ttlNanos = -1L;
                this.activeAddresses = Collections.emptyList();
                this.subscriber = subscriber;
            }

            /**
             * Starts the DNS lookup for this subscription.
             *
             * @param var1 {@code true} when the query was triggered by the TTL timer ({@code executeScheduledQuery0}),
             * {@code false} when it was triggered directly from {@code request0}.
             * @return a future that completes with the answer or fails with the lookup error.
             */
            protected abstract Future<DnsAnswer<T>> doDnsQuery(boolean var1);

            /**
             * Supplies the ordering for addresses of type {@code T}.
             * NOTE(review): presumably used to sort and diff address lists; the call site is outside this view.
             *
             * @return the comparator for resolved addresses.
             */
            protected abstract Comparator<T> comparator();

            /**
             * Returns the status configured on the enclosing client for missing records.
             *
             * @return the client's {@code missingRecordStatus}.
             */
            protected final ServiceDiscovererEvent.Status missingRecordStatus() {
                final ServiceDiscovererEvent.Status configured = DefaultDnsClient.this.missingRecordStatus;
                return configured;
            }

            /**
             * Hops onto the event loop (unless already there) before processing the demand.
             *
             * @param n the number of items requested.
             */
            @Override
            public final void request(long n) {
                if (!DefaultDnsClient.this.nettyIoExecutor.isCurrentThreadEventLoop()) {
                    DefaultDnsClient.this.nettyIoExecutor.execute(() -> this.request0(n));
                    return;
                }
                this.request0(n);
            }

            /**
             * Hops onto the event loop (unless already there) before cancelling.
             */
            @Override
            public final void cancel() {
                if (!DefaultDnsClient.this.nettyIoExecutor.isCurrentThreadEventLoop()) {
                    DefaultDnsClient.this.nettyIoExecutor.execute(this::cancel0);
                    return;
                }
                this.cancel0();
            }

            /**
             * Event-loop part of {@code request}: validates and records the demand and, when no query is pending
             * or scheduled, either queries right away (no TTL known yet, or TTL already elapsed) or schedules a
             * query for the remaining TTL.
             *
             * @param n the number of items requested; an invalid value terminates the subscription with an error.
             */
            private void request0(long n) {
                DefaultDnsClient.this.assertInEventloop();
                if (!SubscriberUtils.isRequestNValid(n)) {
                    this.handleTerminalError0(SubscriberUtils.newExceptionForInvalidRequestN(n));
                    return;
                }
                this.pendingRequests = FlowControlUtils.addWithOverflowProtection(this.pendingRequests, n);
                if (this.cancellableForQuery != null) {
                    // A query is already in flight or scheduled (or the subscription was cancelled).
                    return;
                }
                if (this.ttlNanos < 0L) {
                    this.doQuery0(false);
                    return;
                }
                final long elapsedNanos = DefaultDnsClient.this.nettyIoExecutor.currentTime(TimeUnit.NANOSECONDS) - this.resolveDoneNoScheduleTime;
                if (elapsedNanos > this.ttlNanos) {
                    this.doQuery0(false);
                } else {
                    this.scheduleQuery0(this.ttlNanos - elapsedNanos, this.ttlNanos);
                }
            }

            /**
             * Timer callback: runs a query flagged as scheduled.
             */
            private void executeScheduledQuery0() {
                final boolean scheduled = true;
                this.doQuery0(scheduled);
            }

            /**
             * Starts a DNS query unless the client is closed, in which case the subscription is terminated with
             * {@code ClosedDnsServiceDiscovererException}. The query's future becomes the pending cancellable;
             * its result is handled inline when already done, otherwise from a listener.
             *
             * @param scheduledQuery whether the query was triggered by the TTL timer.
             */
            private void doQuery0(boolean scheduledQuery) {
                DefaultDnsClient.this.assertInEventloop();
                if (DefaultDnsClient.this.closed) {
                    this.handleTerminalError0(new ClosedDnsServiceDiscovererException());
                    return;
                }
                final DnsServiceDiscovererObserver.DnsResolutionObserver resolutionObserver = this.newResolutionObserver();
                LOGGER.trace("{} querying DNS for {}.", (Object)DefaultDnsClient.this, (Object)AbstractDnsPublisher.this);
                final Future addressFuture = this.doDnsQuery(scheduledQuery);
                this.cancellableForQuery = () -> addressFuture.cancel(true);
                if (!addressFuture.isDone()) {
                    addressFuture.addListener(f -> this.handleResolveDone0(f, resolutionObserver));
                    return;
                }
                this.handleResolveDone0(addressFuture, resolutionObserver);
            }

            /**
             * Asks the publisher's discovery observer (if any) for a per-resolution observer.
             * Exceptions thrown by the observer are logged and treated as "no observer".
             *
             * @return the observer for the upcoming resolution, or {@code null} when none is available.
             */
            @Nullable
            private DnsServiceDiscovererObserver.DnsResolutionObserver newResolutionObserver() {
                final DnsServiceDiscovererObserver.DnsDiscoveryObserver publisherObserver = AbstractDnsPublisher.this.discoveryObserver;
                if (publisherObserver != null) {
                    try {
                        return publisherObserver.onNewResolution(AbstractDnsPublisher.this.name);
                    }
                    catch (Throwable unexpected) {
                        LOGGER.warn("{} unexpected exception from {} while reporting new DNS resolution for: {}", DefaultDnsClient.this, DefaultDnsClient.this.observer, AbstractDnsPublisher.this.name, unexpected);
                    }
                }
                return null;
            }

            /**
             * Event-loop part of {@code cancel}: marks the subscription as terminated and then cancels whatever
             * query or timer was pending.
             */
            private void cancel0() {
                DefaultDnsClient.this.assertInEventloop();
                LOGGER.debug("{} subscription for {} is cancelled.", (Object)DefaultDnsClient.this, (Object)AbstractDnsPublisher.this);
                final Cancellable pending = this.cancellableForQuery;
                // Publish the terminated state before cancelling, so callbacks triggered by the cancel observe it.
                this.cancellableForQuery = TERMINATED;
                if (pending == null) {
                    return;
                }
                pending.cancel();
            }

            /**
             * Cancels this subscription and then signals {@code cause} to the subscriber as an error.
             * Asserts that it runs on the event loop.
             *
             * @param cause the error passed to the subscriber.
             */
            private void cancelAndTerminate0(final Throwable cause) {
                DefaultDnsClient.this.assertInEventloop();
                LOGGER.debug("{} subscription for {} will be cancelled and terminated with an error.", DefaultDnsClient.this, AbstractDnsPublisher.this, cause);
                try {
                    cancel0();
                } finally {
                    // In a finally block so the subscriber is still notified if cancel0() throws.
                    SubscriberUtils.safeOnError(this.subscriber, cause);
                }
            }

            /**
             * Schedules the next query, using the same value as both the remaining and the original TTL.
             *
             * @param remainingTtlNanos the TTL in nanoseconds, passed for both arguments of the two-argument overload.
             */
            private void scheduleQuery0(final long remainingTtlNanos) {
                scheduleQuery0(remainingTtlNanos, remainingTtlNanos);
            }

            /**
             * Schedules {@code executeScheduledQuery0} on the I/O executor after a randomized delay and stores the
             * resulting {@link Cancellable} in {@code cancellableForQuery}. Asserts that it runs on the event loop.
             * <p>
             * The delay is picked uniformly from {@code [remainingTtlNanos, remainingTtlNanos + ttlJitterNanos)}.
             * When that range is empty (zero jitter, or the overflow-protected sum does not exceed
             * {@code remainingTtlNanos}) the delay is exactly {@code remainingTtlNanos}.
             *
             * @param remainingTtlNanos lower bound of the delay, in nanoseconds.
             * @param originalTtlNanos TTL in nanoseconds; only used in the debug log message.
             */
            private void scheduleQuery0(long remainingTtlNanos, long originalTtlNanos) {
                DefaultDnsClient.this.assertInEventloop();
                final long upperBoundNanos = FlowControlUtils.addWithOverflowProtection(remainingTtlNanos, DefaultDnsClient.this.ttlJitterNanos);
                // ThreadLocalRandom.nextLong(origin, bound) throws IllegalArgumentException unless bound > origin.
                // Without this guard the exception would escape on the event loop and no query would be scheduled.
                final long delay = upperBoundNanos > remainingTtlNanos
                        ? ThreadLocalRandom.current().nextLong(remainingTtlNanos, upperBoundNanos)
                        : remainingTtlNanos;
                LOGGER.debug("{} scheduling DNS query for {} after {}ms (TTL={}s, jitter={}ms).", DefaultDnsClient.this, AbstractDnsPublisher.this, TimeUnit.NANOSECONDS.toMillis(delay), TimeUnit.NANOSECONDS.toSeconds(originalTtlNanos), TimeUnit.NANOSECONDS.toMillis(DefaultDnsClient.this.ttlJitterNanos));
                this.cancellableForQuery = DefaultDnsClient.this.nettyIoExecutor.schedule(this::executeScheduledQuery0, delay, TimeUnit.NANOSECONDS);
            }

            /**
             * Processes a completed DNS query. Asserts that it runs on the event loop.
             * <ul>
             *   <li>If the subscription already holds the {@code TERMINATED} marker, nothing happens.</li>
             *   <li>If the query failed, the failure is reported to the resolution observer and the subscription is
             *   cancelled and terminated with that cause.</li>
             *   <li>Otherwise the TTL of the answer is capped at {@code maxTTLNanos}, the difference to the currently
             *   active addresses is calculated and, when there is one, delivered to the subscriber in shuffled
             *   order. When there is no difference the next query is scheduled.</li>
             * </ul>
             *
             * @param addressFuture the completed future of the query.
             * @param resolutionObserver observer to report the outcome to, or {@code null} when there is none.
             */
            private void handleResolveDone0(Future<DnsAnswer<T>> addressFuture, @Nullable DnsServiceDiscovererObserver.DnsResolutionObserver resolutionObserver) {
                DefaultDnsClient.this.assertInEventloop();
                assert (this.pendingRequests > 0L);
                if (this.cancellableForQuery == TERMINATED) {
                    return;
                }

                final Throwable failure = addressFuture.cause();
                if (failure != null) {
                    reportResolutionFailed(resolutionObserver, failure);
                    cancelAndTerminate0(failure);
                    return;
                }

                final DnsAnswer<T> dnsAnswer = addressFuture.getNow();
                final List<T> addresses = dnsAnswer.answer();
                this.ttlNanos = dnsAnswer.ttlNanos();
                if (this.ttlNanos > DefaultDnsClient.this.maxTTLNanos) {
                    if (LOGGER.isDebugEnabled()) {
                        LOGGER.debug("{} result for {} has TTL={} > maxTTL={}", DefaultDnsClient.this, AbstractDnsPublisher.this, TimeUnit.NANOSECONDS.toSeconds(this.ttlNanos), TimeUnit.NANOSECONDS.toSeconds(DefaultDnsClient.this.maxTTLNanos));
                    }
                    this.ttlNanos = DefaultDnsClient.this.maxTTLNanos;
                }

                // The reporting callback reads this.ttlNanos when it is invoked, i.e. the value after the cap above.
                final List<ServiceDiscovererEvent<T>> events = ServiceDiscovererUtils.calculateDifference(
                        this.activeAddresses,
                        addresses,
                        comparator(),
                        resolutionObserver == null ? null : (nAvailable, nMissing) -> reportResolutionResult(resolutionObserver, new DefaultResolutionResult(addresses.size(), (int)TimeUnit.NANOSECONDS.toSeconds(this.ttlNanos), nAvailable, nMissing)),
                        DefaultDnsClient.this.missingRecordStatus);

                if (events == null) {
                    LOGGER.trace("{} resolution is complete but no changes detected for {} based on result (size={}, TTL={}s) {}.", DefaultDnsClient.this, AbstractDnsPublisher.this, this.activeAddresses.size(), TimeUnit.NANOSECONDS.toSeconds(this.ttlNanos), this.activeAddresses);
                    scheduleQuery0(this.ttlNanos);
                    return;
                }

                this.activeAddresses = addresses;
                // One unit of demand is consumed by the onNext below; state is updated before the delivery.
                if (--this.pendingRequests > 0L) {
                    scheduleQuery0(this.ttlNanos);
                } else {
                    // No demand left: record the current time and clear the cancellable instead of scheduling.
                    this.resolveDoneNoScheduleTime = DefaultDnsClient.this.nettyIoExecutor.currentTime(TimeUnit.NANOSECONDS);
                    this.cancellableForQuery = null;
                }
                try {
                    Collections.shuffle(events);
                    LOGGER.debug("{} sending events for {} (size={}, TTL={}s): {}.", DefaultDnsClient.this, AbstractDnsPublisher.this, events.size(), TimeUnit.NANOSECONDS.toSeconds(this.ttlNanos), events);
                    this.subscriber.onNext(events);
                } catch (Throwable error) {
                    handleTerminalError0(error);
                }
            }

            /**
             * Reports a failed resolution to {@code resolutionObserver}, if there is one.
             * An exception thrown by the observer is logged and not propagated.
             *
             * @param resolutionObserver observer to notify, or {@code null} to do nothing.
             * @param cause the resolution failure.
             */
            private void reportResolutionFailed(@Nullable DnsServiceDiscovererObserver.DnsResolutionObserver resolutionObserver, Throwable cause) {
                if (resolutionObserver != null) {
                    try {
                        resolutionObserver.resolutionFailed(cause);
                    } catch (Throwable unexpected) {
                        // Attach the resolution failure to the observer's exception before logging it.
                        ThrowableUtils.addSuppressed(unexpected, cause);
                        LOGGER.warn("{} unexpected exception from {} while reporting DNS resolution failure", DefaultDnsClient.this, resolutionObserver, unexpected);
                    }
                }
            }

            /**
             * Reports a completed resolution to {@code resolutionObserver}.
             * An exception thrown by the observer is logged and not propagated.
             *
             * @param resolutionObserver observer to notify.
             * @param result the result passed to {@code resolutionCompleted}.
             */
            private void reportResolutionResult(final DnsServiceDiscovererObserver.DnsResolutionObserver resolutionObserver, final DnsServiceDiscovererObserver.ResolutionResult result) {
                try {
                    resolutionObserver.resolutionCompleted(result);
                } catch (Throwable unexpected) {
                    // Swallowed on purpose: the failure is only logged.
                    LOGGER.warn("{} unexpected exception from {} while reporting DNS resolution result {}", DefaultDnsClient.this, resolutionObserver, result, unexpected);
                }
            }

            /**
             * Cancels and terminates this subscription with {@code cause}, unless it already holds the
             * {@code TERMINATED} marker. Asserts that it runs on the event loop.
             *
             * @param cause the error to terminate with.
             */
            private void handleTerminalError0(final Throwable cause) {
                DefaultDnsClient.this.assertInEventloop();
                if (this.cancellableForQuery == TERMINATED) {
                    return;
                }
                cancelAndTerminate0(cause);
            }

            /**
             * Creates one event with {@code missingRecordStatus} for every currently active address and then resets
             * {@code activeAddresses} to an empty list.
             *
             * @return the created events, in the iteration order of the previously active addresses.
             */
            private List<ServiceDiscovererEvent<T>> generateInactiveEvent() {
                final List<T> previouslyActive = this.activeAddresses;
                final int count = previouslyActive.size();
                final List<ServiceDiscovererEvent<T>> events = new ArrayList<>(count);
                if (previouslyActive instanceof RandomAccess) {
                    // Index-based access for RandomAccess lists, iterator-based otherwise.
                    for (int i = 0; i < count; ++i) {
                        events.add(new DefaultServiceDiscovererEvent<>(previouslyActive.get(i), DefaultDnsClient.this.missingRecordStatus));
                    }
                } else {
                    for (final T address : previouslyActive) {
                        events.add(new DefaultServiceDiscovererEvent<>(address, DefaultDnsClient.this.missingRecordStatus));
                    }
                }
                this.activeAddresses = Collections.emptyList();
                return events;
            }
        }
    }

    /**
     * Holder for the list produced by a DNS query together with a TTL expressed in nanoseconds.
     *
     * @param <T> the element type of the answer list.
     */
    private static final class DnsAnswer<T> {
        private final List<T> answer;
        private final long ttlNanos;

        /**
         * @param resolved the list to expose through {@link #answer()}; stored as-is, without copying.
         * @param ttl the value to expose through {@link #ttlNanos()}.
         */
        DnsAnswer(final List<T> resolved, final long ttl) {
            answer = resolved;
            ttlNanos = ttl;
        }

        /**
         * @return the list passed to the constructor.
         */
        List<T> answer() {
            return answer;
        }

        /**
         * @return the TTL passed to the constructor, in nanoseconds.
         */
        long ttlNanos() {
            return ttlNanos;
        }
    }

    /**
     * Publisher of discovery events obtained by resolving a host name to {@link InetAddress}es.
     */
    private class ARecordPublisher
    extends AbstractDnsPublisher<InetAddress> {
        ARecordPublisher(@Nullable String inetHost, DnsServiceDiscovererObserver.DnsDiscoveryObserver discoveryObserver) {
            super(inetHost, discoveryObserver);
        }

        @Override
        public String toString() {
            return "A* records of " + this.name + " @" + Integer.toHexString(this.hashCode());
        }

        @Override
        protected AbstractDnsPublisher.AbstractDnsSubscription newSubscription(PublisherSource.Subscriber<? super List<ServiceDiscovererEvent<InetAddress>>> subscriber) {
            return new AbstractDnsPublisher.AbstractDnsSubscription(subscriber){

                /**
                 * Resolves all addresses of the host name and completes the returned future with the filtered
                 * addresses and the minimum TTL read from the TTL cache.
                 *
                 * @param scheduledQuery when {@code true}, the TTL cache is prepared for a new resolution first.
                 */
                protected Future<DnsAnswer<InetAddress>> doDnsQuery(boolean scheduledQuery) {
                    if (scheduledQuery) {
                        DefaultDnsClient.this.ttlCache.prepareForResolution(ARecordPublisher.this.name);
                    }
                    final Promise<DnsAnswer<InetAddress>> dnsAnswerPromise = DefaultDnsClient.this.nettyIoExecutor.eventLoopGroup().next().newPromise();
                    DefaultDnsClient.this.resolver.resolveAll(ARecordPublisher.this.name).addListener(completedFuture -> {
                        final Throwable failure = completedFuture.cause();
                        if (failure != null) {
                            dnsAnswerPromise.tryFailure(failure);
                            return;
                        }
                        @SuppressWarnings("unchecked")
                        final List<InetAddress> resolved = (List<InetAddress>) completedFuture.getNow();
                        final long minTTLSeconds = DefaultDnsClient.this.ttlCache.minTtl(ARecordPublisher.this.name);
                        LOGGER.trace("{} original result for {} (size={}, TTL={}s): {}.", DefaultDnsClient.this, ARecordPublisher.this, resolved.size(), minTTLSeconds, resolved);
                        DnsAnswer<InetAddress> dnsAnswer;
                        try {
                            dnsAnswer = new DnsAnswer<>(toAddresses(resolved), TimeUnit.SECONDS.toNanos(minTTLSeconds));
                        } catch (Throwable conversionFailure) {
                            dnsAnswerPromise.tryFailure(conversionFailure);
                            return;
                        }
                        dnsAnswerPromise.trySuccess(dnsAnswer);
                    });
                    return dnsAnswerPromise;
                }

                protected Comparator<InetAddress> comparator() {
                    return INET_ADDRESS_COMPARATOR;
                }

                /**
                 * Copies {@code original} into a new list. When the configured address types are
                 * {@code IPV4_PREFERRED} or {@code IPV6_PREFERRED} and the input contains both IPv4 and IPv6
                 * addresses, only the addresses of the preferred family are kept.
                 */
                private List<InetAddress> toAddresses(List<InetAddress> original) {
                    final boolean preferV4 = DefaultDnsClient.this.addressTypes == DnsResolverAddressTypes.IPV4_PREFERRED;
                    final boolean preferV6 = DefaultDnsClient.this.addressTypes == DnsResolverAddressTypes.IPV6_PREFERRED;
                    if (!preferV4 && !preferV6) {
                        return new ArrayList<>(original);
                    }

                    int v4Count = 0;
                    int v6Count = 0;
                    for (final InetAddress candidate : original) {
                        if (candidate instanceof Inet4Address) {
                            ++v4Count;
                        } else {
                            assert (candidate instanceof Inet6Address);
                            ++v6Count;
                        }
                    }
                    if (v4Count == 0 || v6Count == 0) {
                        // Only one family present: nothing to filter.
                        return new ArrayList<>(original);
                    }

                    final int expectedSize = preferV4 ? v4Count : v6Count;
                    final List<InetAddress> preferred = new ArrayList<>(expectedSize);
                    for (final InetAddress candidate : original) {
                        final boolean keep = preferV4 ? candidate instanceof Inet4Address : candidate instanceof Inet6Address;
                        if (keep) {
                            preferred.add(candidate);
                        }
                    }
                    assert (preferred.size() == expectedSize);
                    return preferred;
                }
            };
        }
    }

    /**
     * Publisher of discovery events obtained from the SRV records of a service name.
     */
    private final class SrvRecordPublisher
    extends AbstractDnsPublisher<HostAndPort> {
        private SrvRecordPublisher(@Nullable String serviceName, DnsServiceDiscovererObserver.DnsDiscoveryObserver discoveryObserver) {
            super(serviceName, discoveryObserver);
        }

        @Override
        public String toString() {
            return "SRV records of " + this.name + " @" + Integer.toHexString(this.hashCode());
        }

        @Override
        protected AbstractDnsPublisher.AbstractDnsSubscription newSubscription(PublisherSource.Subscriber<? super List<ServiceDiscovererEvent<HostAndPort>>> subscriber) {
            return new AbstractDnsPublisher.AbstractDnsSubscription(subscriber){

                /**
                 * Queries the SRV records of the service name and completes the returned future with the decoded
                 * host/port pairs and the smallest TTL among the records. All received records are released once
                 * they have been processed, whether decoding succeeded or not.
                 */
                protected Future<DnsAnswer<HostAndPort>> doDnsQuery(boolean scheduledQuery) {
                    final Promise<DnsAnswer<HostAndPort>> promise = DefaultDnsClient.this.nettyIoExecutor.eventLoopGroup().next().newPromise();
                    DefaultDnsClient.this.resolver.resolveAll(new DefaultDnsQuestion(SrvRecordPublisher.this.name, DnsRecordType.SRV)).addListener(completedFuture -> {
                        final Throwable failure = completedFuture.cause();
                        if (failure != null) {
                            promise.tryFailure(failure);
                            return;
                        }
                        DnsAnswer<HostAndPort> dnsAnswer;
                        List<DnsRecord> toRelease = null;
                        try {
                            @SuppressWarnings("unchecked")
                            final List<DnsRecord> dnsRecords = (List<DnsRecord>) completedFuture.getNow();
                            toRelease = dnsRecords;
                            long minTTLSeconds = Long.MAX_VALUE;
                            final List<HostAndPort> hostAndPorts = new ArrayList<>(dnsRecords.size());
                            for (final DnsRecord dnsRecord : dnsRecords) {
                                if (!DnsRecordType.SRV.equals(dnsRecord.type()) || !(dnsRecord instanceof DnsRawRecord)) {
                                    throw new IllegalArgumentException("Unsupported DNS record type for SRV query: " + dnsRecord);
                                }
                                final long recordTtlSeconds = dnsRecord.timeToLive();
                                if (recordTtlSeconds < minTTLSeconds) {
                                    minTTLSeconds = recordTtlSeconds;
                                }
                                final ByteBuf content = ((DnsRawRecord)dnsRecord).content();
                                // Skip the first 4 bytes of the record data, then read the port and the target name.
                                content.skipBytes(4);
                                final int port = content.readUnsignedShort();
                                hostAndPorts.add(HostAndPort.of(DefaultDnsRecordDecoder.decodeName(content), port));
                            }
                            LOGGER.trace("{} original result for {} (size={}, TTL={}s): {}.", DefaultDnsClient.this, SrvRecordPublisher.this, toRelease.size(), minTTLSeconds, toRelease);
                            dnsAnswer = new DnsAnswer<>(hostAndPorts, TimeUnit.SECONDS.toNanos(minTTLSeconds));
                        } catch (Throwable decodeFailure) {
                            promise.tryFailure(decodeFailure);
                            return;
                        } finally {
                            // Runs on both the success and the failure path, before the promise is succeeded.
                            if (toRelease != null) {
                                for (final DnsRecord dnsRecord : toRelease) {
                                    ReferenceCountUtil.release(dnsRecord);
                                }
                            }
                        }
                        promise.trySuccess(dnsAnswer);
                    });
                    return promise;
                }

                protected Comparator<HostAndPort> comparator() {
                    return HOST_AND_PORT_COMPARATOR;
                }
            };
        }
    }
}

