/*
 * Decompiled with CFR 0.152.
 */
package io.servicetalk.tcp.netty.internal;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ConnectTimeoutException;
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.resolver.AbstractAddressResolver;
import io.netty.resolver.AddressResolver;
import io.netty.resolver.AddressResolverGroup;
import io.netty.resolver.NoopAddressResolver;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.ImmediateEventExecutor;
import io.servicetalk.client.api.RetryableConnectException;
import io.servicetalk.concurrent.Cancellable;
import io.servicetalk.concurrent.SingleSource;
import io.servicetalk.concurrent.api.ListenableAsyncCloseable;
import io.servicetalk.concurrent.api.Single;
import io.servicetalk.concurrent.api.SourceAdapters;
import io.servicetalk.concurrent.api.internal.SubscribableSingle;
import io.servicetalk.concurrent.internal.DelayedCancellable;
import io.servicetalk.tcp.netty.internal.ReadOnlyTcpClientConfig;
import io.servicetalk.transport.api.ConnectionObserver;
import io.servicetalk.transport.api.ExecutionContext;
import io.servicetalk.transport.api.FileDescriptorSocketAddress;
import io.servicetalk.transport.api.TransportObserver;
import io.servicetalk.transport.netty.internal.BuilderUtils;
import io.servicetalk.transport.netty.internal.CopyByteBufHandlerChannelInitializer;
import io.servicetalk.transport.netty.internal.EventLoopAwareNettyIoExecutors;
import java.net.ConnectException;
import java.net.SocketAddress;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class TcpConnector {
    /**
     * Private constructor: every member visible in this class is static, so no instance is needed.
     */
    private TcpConnector() {
    }

    /**
     * Creates a {@link Single} that, on every subscribe, starts a new connect (or file-descriptor registration)
     * attempt and hands the resulting {@link Channel} to {@code connectionFactory}.
     * <p>
     * Failures reported by the connect future are translated before they reach the subscriber:
     * Netty's {@link ConnectTimeoutException} becomes {@link io.servicetalk.client.api.ConnectTimeoutException}
     * and a {@link ConnectException} is wrapped into a {@link RetryableConnectException}. Anything thrown
     * synchronously while initiating the connect is forwarded to the subscriber as well.
     *
     * @param localAddress the local address to bind to, or {@code null} if none was requested.
     * @param resolvedRemoteAddress the already resolved remote address (or a {@link FileDescriptorSocketAddress}).
     * @param config the TCP client configuration whose channel options are applied.
     * @param autoRead value used for {@link ChannelOption#AUTO_READ}.
     * @param executionContext supplies the I/O executor from which the event loop is taken.
     * @param connectionFactory turns the new {@link Channel} into the connection object.
     * @param observer notified via {@code onNewConnection} for each subscribe.
     * @param <C> type of the created connection.
     * @return a {@link Single} that completes with the created connection or fails with the connect error.
     * @throws NullPointerException if any argument other than {@code localAddress} is {@code null}.
     */
    public static <C extends ListenableAsyncCloseable> Single<C> connect(
            @Nullable final SocketAddress localAddress,
            final Object resolvedRemoteAddress,
            final ReadOnlyTcpClientConfig config,
            final boolean autoRead,
            final ExecutionContext executionContext,
            final BiFunction<Channel, ConnectionObserver, Single<? extends C>> connectionFactory,
            final TransportObserver observer) {
        Objects.requireNonNull(resolvedRemoteAddress, "resolvedRemoteAddress");
        Objects.requireNonNull(config, "config");
        Objects.requireNonNull(executionContext, "executionContext");
        Objects.requireNonNull(connectionFactory, "connectionFactory");
        Objects.requireNonNull(observer, "observer");
        return new SubscribableSingle<C>() {

            @Override
            protected void handleSubscribe(final SingleSource.Subscriber<? super C> subscriber) {
                // Parameterized (not raw) so the compiler checks subscriber/factory compatibility.
                final ConnectHandler<C> connectHandler = new ConnectHandler<C>(subscriber, connectionFactory,
                        observer.onNewConnection(localAddress, resolvedRemoteAddress));
                try {
                    final Future<?> connectFuture = TcpConnector.connect0(localAddress, resolvedRemoteAddress,
                            config, autoRead, executionContext, connectHandler);
                    // Lets a cancel from the subscriber cancel the in-flight connect attempt.
                    connectHandler.connectFuture(connectFuture);
                    connectFuture.addListener(f -> {
                        final Throwable cause = f.cause();
                        if (cause != null) {
                            connectHandler.connectFailed(
                                    translateConnectFailure(cause, localAddress, resolvedRemoteAddress));
                        }
                    });
                } catch (Throwable t) {
                    connectHandler.unexpectedFailure(t);
                }
            }
        };
    }

    /**
     * Maps a failure of the connect future to the exception type reported to the subscriber.
     *
     * @param cause the non-null failure cause of the connect future.
     * @param localAddress the requested local address, used only for the timeout message.
     * @param resolvedRemoteAddress the remote address, used only for the timeout message.
     * @return the translated exception, or {@code cause} itself when no translation applies.
     */
    private static Throwable translateConnectFailure(final Throwable cause,
                                                     @Nullable final SocketAddress localAddress,
                                                     final Object resolvedRemoteAddress) {
        if (cause instanceof ConnectTimeoutException) {
            final String msg = resolvedRemoteAddress instanceof FileDescriptorSocketAddress
                    ? "Failed to register: " + resolvedRemoteAddress
                    : "Failed to connect: " + resolvedRemoteAddress + " (localAddress: " + localAddress + ")";
            return new io.servicetalk.client.api.ConnectTimeoutException(msg, cause);
        }
        if (cause instanceof ConnectException) {
            return new RetryableConnectException((ConnectException) cause);
        }
        return cause;
    }

    /**
     * Starts the connect attempt on the next event loop of the {@code executionContext}'s I/O executor.
     * <p>
     * A {@link FileDescriptorSocketAddress} is handled by wrapping the descriptor in a channel and registering it;
     * every other address type goes through a {@link Bootstrap}.
     *
     * @param localAddress the local address to bind to, or {@code null}; must be {@code null} for
     * {@link FileDescriptorSocketAddress}.
     * @param resolvedRemoteAddress the remote address to connect to.
     * @param config source of the channel options.
     * @param autoRead value used for {@link ChannelOption#AUTO_READ}.
     * @param executionContext supplies the I/O executor.
     * @param subscriber receives the {@link Channel} when it is initialized.
     * @return the future of the connect/registration, already failed for unsupported argument combinations.
     */
    private static Future<?> connect0(@Nullable SocketAddress localAddress, Object resolvedRemoteAddress, ReadOnlyTcpClientConfig config, boolean autoRead, ExecutionContext executionContext, final Consumer<? super Channel> subscriber) {
        final EventLoop eventLoop = EventLoopAwareNettyIoExecutors
                .toEventLoopAwareNettyIoExecutor(executionContext.ioExecutor())
                .eventLoopGroup()
                .next();
        final ChannelInitializer<Channel> initializer = new ChannelInitializer<Channel>() {

            @Override
            protected void initChannel(Channel ch) {
                subscriber.accept(ch);
            }
        };

        if (resolvedRemoteAddress instanceof FileDescriptorSocketAddress) {
            if (localAddress != null) {
                return eventLoop.newFailedFuture(new IllegalArgumentException("local address cannot be specified when " + FileDescriptorSocketAddress.class.getSimpleName() + " is used"));
            }
            final Channel fdChannel =
                    BuilderUtils.socketChannel(eventLoop, (FileDescriptorSocketAddress) resolvedRemoteAddress);
            if (fdChannel == null) {
                return eventLoop.newFailedFuture(new IllegalArgumentException(FileDescriptorSocketAddress.class.getSimpleName() + " not supported"));
            }
            return TcpConnector.initFileDescriptorBasedChannel(config, autoRead, eventLoop, fdChannel, initializer);
        }
        return TcpConnector.connectWithBootstrap(localAddress, resolvedRemoteAddress, config, autoRead, eventLoop,
                initializer);
    }

    /**
     * Connects to {@code resolvedRemoteAddress} using a freshly configured {@link Bootstrap}.
     * <p>
     * Options from {@code config} are applied first; {@link ChannelOption#AUTO_READ} and
     * {@link ChannelOption#ALLOCATOR} are set afterwards and therefore take precedence over any value for the
     * same option in {@code config}.
     *
     * @param localAddress the local address to bind to, or {@code null}.
     * @param resolvedRemoteAddress the remote address, converted with {@code BuilderUtils.toNettyAddress}.
     * @param config source of the channel options.
     * @param autoRead value used for {@link ChannelOption#AUTO_READ}.
     * @param loop the event loop the channel is bound to.
     * @param handler the handler installed on the new channel.
     * @return the future of the connect operation.
     */
    @SuppressWarnings("unchecked")
    private static ChannelFuture connectWithBootstrap(@Nullable SocketAddress localAddress, Object resolvedRemoteAddress, ReadOnlyTcpClientConfig config, boolean autoRead, EventLoop loop, ChannelHandler handler) {
        final SocketAddress remote = BuilderUtils.toNettyAddress(resolvedRemoteAddress);
        final Bootstrap bootstrap = new Bootstrap()
                // The address is already resolved, so Netty must not attempt to resolve it again.
                .resolver(NoopNettyAddressResolverGroup.INSTANCE)
                .group(loop)
                .channel(BuilderUtils.socketChannel(loop, remote.getClass()))
                .handler(handler);

        config.options().forEach((option, value) -> bootstrap.option(option, value));

        bootstrap.option(ChannelOption.AUTO_READ, autoRead);
        bootstrap.option(ChannelOption.ALLOCATOR, CopyByteBufHandlerChannelInitializer.POOLED_ALLOCATOR);
        return bootstrap.connect(remote, localAddress);
    }

    /**
     * Configures an already created (file-descriptor based) {@link Channel} and registers it with {@code loop}.
     * <p>
     * Options from {@code config} are applied first, then auto-read and the allocator are set, then
     * {@code handler} is appended to the pipeline.
     *
     * @param config source of the channel options.
     * @param autoRead value used for {@link ChannelOption#AUTO_READ}.
     * @param loop the event loop the channel is registered with.
     * @param channel the channel to configure and register.
     * @param handler the handler appended to the channel's pipeline.
     * @return the future of the registration.
     */
    @SuppressWarnings("unchecked")
    private static ChannelFuture initFileDescriptorBasedChannel(ReadOnlyTcpClientConfig config, boolean autoRead, EventLoop loop, Channel channel, ChannelHandler handler) {
        config.options().forEach((option, value) -> channel.config().setOption(option, value));

        channel.config().setOption(ChannelOption.AUTO_READ, autoRead);
        channel.config().setAllocator(CopyByteBufHandlerChannelInitializer.POOLED_ALLOCATOR);

        channel.pipeline().addLast(handler);
        return loop.register(channel);
    }

    /**
     * Bridges a connect attempt to the {@link SingleSource.Subscriber} that requested it.
     * <p>
     * It receives the {@link Channel} when it is initialized (via {@link #accept(Channel)}), subscribes to the
     * {@link Single} produced by the connection factory, and guarantees through the {@code terminated} flag that
     * {@code target} observes at most one terminal signal, whether it originates from the connection factory,
     * from a failed connect future, or from an unexpected synchronous failure.
     *
     * @param <C> type of the created connection.
     */
    private static final class ConnectHandler<C extends ListenableAsyncCloseable>
    implements Consumer<Channel> {
        private static final Logger LOGGER = LoggerFactory.getLogger(ConnectHandler.class);
        // Raw type is unavoidable here: a class literal cannot carry the type argument.
        @SuppressWarnings("rawtypes")
        private static final AtomicIntegerFieldUpdater<ConnectHandler> terminatedUpdater =
                AtomicIntegerFieldUpdater.newUpdater(ConnectHandler.class, "terminated");

        /** Receives the cancellable for the connect future once {@link #connectFuture(Future)} is called. */
        private final DelayedCancellable futureCancellable = new DelayedCancellable();
        /** Receives the cancellable of the connection factory's {@link Single} once it is subscribed. */
        private final DelayedCancellable flatMapCancellable = new DelayedCancellable();
        private final SingleSource.Subscriber<? super C> target;
        private final BiFunction<Channel, ConnectionObserver, Single<? extends C>> connectionFactory;
        private final ConnectionObserver connectionObserver;
        /** {@code 0} until a terminal signal was delivered to {@link #target}, {@code 1} afterwards. */
        private volatile int terminated;

        /**
         * Creates the handler and immediately calls {@code target.onSubscribe} with a cancellable that cancels
         * both the connect future and the connection factory's {@link Single}.
         *
         * @param target the subscriber to deliver the connection or the failure to.
         * @param connectionFactory turns the initialized {@link Channel} into the connection object.
         * @param connectionObserver passed through to {@code connectionFactory}.
         */
        ConnectHandler(SingleSource.Subscriber<? super C> target, BiFunction<Channel, ConnectionObserver, Single<? extends C>> connectionFactory, ConnectionObserver connectionObserver) {
            this.target = target;
            this.connectionFactory = connectionFactory;
            // Assign every field before handing a lambda that captures `this` to foreign code.
            this.connectionObserver = connectionObserver;
            target.onSubscribe(() -> {
                try {
                    this.futureCancellable.cancel();
                } finally {
                    this.flatMapCancellable.cancel();
                }
            });
        }

        /**
         * Invoked with the initialized channel; subscribes to the connection produced by the factory and relays
         * its result to {@link #target} unless a terminal signal was already delivered.
         *
         * @param channel the newly initialized channel.
         */
        @Override
        public void accept(final Channel channel) {
            SourceAdapters.toSource(this.connectionFactory.apply(channel, this.connectionObserver).shareContextOnSubscribe()).subscribe(new SingleSource.Subscriber<C>() {

                @Override
                public void onSubscribe(Cancellable cancellable) {
                    flatMapCancellable.delayedCancellable(cancellable);
                }

                @Override
                public void onSuccess(@Nullable C connection) {
                    // The updater operates on the enclosing ConnectHandler, not on this anonymous subscriber.
                    if (terminatedUpdater.compareAndSet(ConnectHandler.this, 0, 1)) {
                        target.onSuccess(connection);
                    } else {
                        LOGGER.debug("Connection {} created for a channel: {} but connect failed previously. Closing connection.", connection, channel);
                        if (connection != null) {
                            // Nobody will ever see this connection, so release it.
                            connection.closeAsync().subscribe();
                        }
                    }
                }

                @Override
                public void onError(Throwable t) {
                    if (terminatedUpdater.compareAndSet(ConnectHandler.this, 0, 1)) {
                        target.onError(t);
                    } else {
                        LOGGER.debug("Ignored duplicate connect failure for channel: {}.", channel, t);
                    }
                }
            });
        }

        /**
         * Registers the connect future so that a cancel from {@link #target} cancels it
         * (without interrupting the running thread).
         *
         * @param connectFuture the future of the connect/registration operation.
         */
        void connectFuture(Future<?> connectFuture) {
            this.futureCancellable.delayedCancellable(() -> connectFuture.cancel(false));
        }

        /**
         * Reports a failure of the connect future to {@link #target} if no terminal signal was delivered yet.
         *
         * @param cause the (already translated) failure.
         */
        void connectFailed(Throwable cause) {
            terminateWithError(cause);
        }

        /**
         * Reports a failure thrown while initiating the connect to {@link #target} if no terminal signal was
         * delivered yet.
         *
         * @param cause the unexpected failure.
         */
        void unexpectedFailure(Throwable cause) {
            terminateWithError(cause);
        }

        /** Delivers {@code cause} to {@link #target} only if this call wins the race to terminate. */
        private void terminateWithError(Throwable cause) {
            if (terminatedUpdater.compareAndSet(this, 0, 1)) {
                this.target.onError(cause);
            }
        }
    }

    /**
     * An {@link AddressResolverGroup} that always hands out one shared {@link NoopAddressResolver}, regardless
     * of the {@link EventExecutor} asked for.
     */
    private static final class NoopNettyAddressResolverGroup
    extends AddressResolverGroup<SocketAddress> {
        /** The single resolver returned for every executor; it is created with {@link ImmediateEventExecutor}. */
        private static final AbstractAddressResolver<SocketAddress> SHARED_RESOLVER =
                new NoopAddressResolver(ImmediateEventExecutor.INSTANCE);

        /** Singleton instance of this group. */
        static final AddressResolverGroup<SocketAddress> INSTANCE = new NoopNettyAddressResolverGroup();

        private NoopNettyAddressResolverGroup() {
            // Singleton, use INSTANCE.
        }

        /**
         * @param executor ignored.
         * @return the shared no-op resolver.
         */
        @Override
        public AddressResolver<SocketAddress> getResolver(EventExecutor executor) {
            return SHARED_RESOLVER;
        }

        /**
         * @param executor ignored.
         * @return the shared no-op resolver.
         */
        @Override
        protected AddressResolver<SocketAddress> newResolver(EventExecutor executor) {
            return SHARED_RESOLVER;
        }
    }
}

