/*
 * Decompiled with CFR 0.152.
 */
package io.servicetalk.tcp.netty.internal;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.util.ReferenceCounted;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import io.servicetalk.concurrent.SingleSource;
import io.servicetalk.concurrent.api.Completable;
import io.servicetalk.concurrent.api.Executor;
import io.servicetalk.concurrent.api.Executors;
import io.servicetalk.concurrent.api.Single;
import io.servicetalk.concurrent.api.internal.SubscribableSingle;
import io.servicetalk.tcp.netty.internal.EarlyConnectionAcceptorHandler;
import io.servicetalk.tcp.netty.internal.ReadOnlyTcpServerConfig;
import io.servicetalk.transport.api.ConnectionContext;
import io.servicetalk.transport.api.ConnectionInfo;
import io.servicetalk.transport.api.ConnectionObserver;
import io.servicetalk.transport.api.EarlyConnectionAcceptor;
import io.servicetalk.transport.api.ExecutionContext;
import io.servicetalk.transport.api.IoThreadFactory;
import io.servicetalk.transport.api.LateConnectionAcceptor;
import io.servicetalk.transport.api.ServerContext;
import io.servicetalk.transport.api.SslConfig;
import io.servicetalk.transport.netty.internal.BuilderUtils;
import io.servicetalk.transport.netty.internal.ChannelCloseUtils;
import io.servicetalk.transport.netty.internal.ChannelSet;
import io.servicetalk.transport.netty.internal.CopyByteBufHandlerChannelInitializer;
import io.servicetalk.transport.netty.internal.EventLoopAwareNettyIoExecutor;
import io.servicetalk.transport.netty.internal.EventLoopAwareNettyIoExecutors;
import io.servicetalk.transport.netty.internal.ExecutionContextUtils;
import io.servicetalk.transport.netty.internal.InfluencerConnectionAcceptor;
import io.servicetalk.transport.netty.internal.NettyServerContext;
import io.servicetalk.transport.netty.internal.SocketOptionUtils;
import java.net.SocketAddress;
import java.net.SocketOption;
import java.util.Map;
import java.util.Objects;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import javax.annotation.Nullable;
import javax.net.ssl.SSLSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class TcpServerBinder {
    private static final Logger LOGGER = LoggerFactory.getLogger(TcpServerBinder.class);

    // This class only exposes static methods; the private constructor prevents instantiation.
    private TcpServerBinder() {
    }

    /**
     * Binds a TCP server without an early or a late connection acceptor.
     * <p>
     * This overload forwards to the eight-argument {@code bind} method, supplying {@code null} for both the
     * {@link EarlyConnectionAcceptor} and the {@link LateConnectionAcceptor}.
     *
     * @param listenAddress the address to listen on.
     * @param config the server configuration.
     * @param autoRead not read by this method; the value is not forwarded to the delegate overload.
     * @param executionContext the {@link ExecutionContext} forwarded to the delegate overload.
     * @param connectionAcceptor optional acceptor forwarded to the delegate overload.
     * @param connectionFunction creates a connection {@link Single} for an accepted {@link Channel}.
     * @param connectionConsumer receives each successfully created connection.
     * @param <CC> the type of {@link ConnectionContext} produced by {@code connectionFunction}.
     * @return the {@link Single} returned by the delegate overload.
     * @deprecated Use the overload that additionally accepts an {@link EarlyConnectionAcceptor} and a
     * {@link LateConnectionAcceptor}.
     */
    @Deprecated
    public static <CC extends ConnectionContext> Single<ServerContext> bind(
            SocketAddress listenAddress,
            ReadOnlyTcpServerConfig config,
            boolean autoRead,
            ExecutionContext<?> executionContext,
            @Nullable InfluencerConnectionAcceptor connectionAcceptor,
            BiFunction<Channel, ConnectionObserver, Single<CC>> connectionFunction,
            Consumer<CC> connectionConsumer) {
        // This legacy entry point never installs early/late acceptors.
        final EarlyConnectionAcceptor noEarlyAcceptor = null;
        final LateConnectionAcceptor noLateAcceptor = null;
        return bind(listenAddress, config, executionContext, connectionAcceptor, connectionFunction,
                connectionConsumer, noEarlyAcceptor, noLateAcceptor);
    }

    /**
     * Configures a Netty {@link ServerBootstrap}, starts binding it to {@code listenAddress} and returns a
     * {@link Single} that reports the outcome of that bind.
     * <p>
     * The bind is initiated when this method is invoked. Subscribing to the returned {@link Single} attaches a
     * listener to the bind future; cancelling the subscription cancels that future.
     *
     * @param listenAddress the address to listen on; it is converted with {@code BuilderUtils.toNettyAddress}.
     * @param config the server configuration (channel options, transport observer, ...).
     * @param executionContext supplies the IO executor, the executor and the execution strategy.
     * @param connectionAcceptor optional acceptor applied to each created connection and passed to the
     * resulting server context.
     * @param connectionFunction creates a connection {@link Single} for an accepted {@link Channel}.
     * @param connectionConsumer receives each successfully created connection.
     * @param earlyConnectionAcceptor optional acceptor invoked before the connection {@link Single}.
     * @param lateConnectionAcceptor optional acceptor invoked after the connection has been created.
     * @param <CC> the type of {@link ConnectionContext} produced by {@code connectionFunction}.
     * @return a {@link Single} that succeeds with the {@link ServerContext} of the bound channel or fails with the
     * cause of the bind failure.
     * @throws NullPointerException if {@code connectionFunction} or {@code connectionConsumer} is {@code null}.
     */
    public static <CC extends ConnectionContext> Single<ServerContext> bind(
            SocketAddress listenAddress,
            final ReadOnlyTcpServerConfig config,
            final ExecutionContext<?> executionContext,
            final @Nullable InfluencerConnectionAcceptor connectionAcceptor,
            final BiFunction<Channel, ConnectionObserver, Single<CC>> connectionFunction,
            final Consumer<CC> connectionConsumer,
            final @Nullable EarlyConnectionAcceptor earlyConnectionAcceptor,
            final @Nullable LateConnectionAcceptor lateConnectionAcceptor) {
        Objects.requireNonNull(connectionFunction, "connectionFunction");
        Objects.requireNonNull(connectionConsumer, "connectionConsumer");
        final SocketAddress nettyAddress = BuilderUtils.toNettyAddress(listenAddress);
        final EventLoopAwareNettyIoExecutor nettyIoExecutor =
                EventLoopAwareNettyIoExecutors.toEventLoopAwareNettyIoExecutor(executionContext.ioExecutor());
        final ServerBootstrap bs = new ServerBootstrap();
        configure(config, bs, nettyIoExecutor.eventLoopGroup(), nettyAddress.getClass());

        // The executor handed to the ChannelSet depends on whether the strategy offloads close.
        final ChannelSet channelSet = new ChannelSet(executionContext.executionStrategy().isCloseOffloaded()
                ? executionContext.executor() : Executors.immediate());

        bs.handler(new ChannelInboundHandlerAdapter() {
            @Override
            public void channelRead(ChannelHandlerContext ctx, Object msg) {
                // A reference-counted message is not expected in the 'accept' pipeline: reject it, and
                // always release it so that it is not leaked.
                if (msg instanceof ReferenceCounted) {
                    try {
                        throw new IllegalArgumentException(
                                "Unexpected ReferenceCounted msg in 'accept' pipeline: " + msg);
                    } finally {
                        ((ReferenceCounted) msg).release();
                    }
                }
                if (msg instanceof Channel) {
                    final Channel channel = (Channel) msg;
                    if (!channel.isActive()) {
                        channel.close();
                        LOGGER.debug("Channel ({}) is accepted, but was already inactive", msg);
                        return;
                    }
                    if (!channelSet.addIfAbsent(channel)) {
                        // The channel is not tracked by the set, so it is not propagated any further.
                        LOGGER.warn("Channel ({}) not added to ChannelSet", msg);
                        return;
                    }
                }
                ctx.fireChannelRead(msg);
            }
        });

        bs.childHandler(new ChannelInitializer<Channel>() {
            @Override
            protected void initChannel(Channel channel) {
                final ConnectionObserver observer = config.transportObserver()
                        .onNewConnection(channel.localAddress(), channel.remoteAddress());
                final Single<CC> connectionSingle = wrapConnectionAcceptors(
                        connectionFunction.apply(channel, observer), channel, executionContext, config,
                        earlyConnectionAcceptor, lateConnectionAcceptor, connectionAcceptor);
                connectionSingle.beforeOnError(cause -> {
                    // Only resolve the remote address when the message will actually be logged.
                    if (LOGGER.isDebugEnabled()) {
                        LOGGER.debug("Failed to create a connection for remote address {}",
                                channel.remoteAddress(), cause);
                    }
                    ChannelCloseUtils.close(channel, cause);
                }).subscribe(connectionConsumer);
            }
        });

        final ChannelFuture future = bs.bind(nettyAddress);
        return new SubscribableSingle<ServerContext>() {
            @Override
            protected void handleSubscribe(SingleSource.Subscriber<? super ServerContext> subscriber) {
                subscriber.onSubscribe(() -> future.cancel(true));
                future.addListener((ChannelFuture f) -> {
                    final Channel channel = f.channel();
                    final Throwable cause = f.cause();
                    if (cause == null) {
                        subscriber.onSuccess(
                                NettyServerContext.wrap(channel, channelSet, connectionAcceptor, executionContext));
                    } else {
                        ChannelCloseUtils.close(channel, cause);
                        subscriber.onError(cause);
                    }
                });
            }
        };
    }

    /**
     * Decorates {@code connection} with whichever of the three optional acceptors are present.
     * <ul>
     *     <li>{@code earlyConnectionAcceptor}: an {@link EarlyConnectionAcceptorHandler} is appended to the channel
     *     pipeline, the acceptor is invoked before {@code connection}, and the handler's events are released once
     *     the combined {@link Single} succeeds.</li>
     *     <li>{@code lateConnectionAcceptor}: invoked with the created connection.</li>
     *     <li>{@code connectionAcceptor}: invoked with the created connection, after the late acceptor.</li>
     * </ul>
     * Each acceptor is subscribed on {@code executionContext.executor()} when its
     * {@code requiredOffloads().isConnectOffloaded()} is {@code true}.
     *
     * @param connection the {@link Single} that creates the connection.
     * @param channel the accepted {@link Channel}.
     * @param executionContext supplies the executor used for offloading.
     * @param config the server configuration exposed through the early acceptor's {@link ConnectionInfo}.
     * @param earlyConnectionAcceptor optional acceptor invoked before the connection is created.
     * @param lateConnectionAcceptor optional acceptor invoked after the connection is created.
     * @param connectionAcceptor optional acceptor invoked after the connection is created.
     * @param <CC> the type of {@link ConnectionContext}.
     * @return the decorated {@link Single}, or {@code connection} itself when no acceptor is present.
     */
    private static <CC extends ConnectionContext> Single<CC> wrapConnectionAcceptors(
            Single<CC> connection,
            final Channel channel,
            ExecutionContext<?> executionContext,
            final ReadOnlyTcpServerConfig config,
            @Nullable EarlyConnectionAcceptor earlyConnectionAcceptor,
            @Nullable LateConnectionAcceptor lateConnectionAcceptor,
            @Nullable InfluencerConnectionAcceptor connectionAcceptor) {
        final Executor offloadExecutor = executionContext.executor();
        Single<CC> result = connection;

        if (earlyConnectionAcceptor != null) {
            final ExecutionContext<?> channelExecutionContext =
                    ExecutionContextUtils.channelExecutionContext(channel, executionContext);
            final ConnectionInfo info = earlyConnectionInfo(channel, channelExecutionContext, config);
            final EarlyConnectionAcceptorHandler acceptorHandler = new EarlyConnectionAcceptorHandler();
            channel.pipeline().addLast(acceptorHandler);

            Completable earlyCompletable = Completable.defer(() -> earlyConnectionAcceptor.accept(info));
            if (earlyConnectionAcceptor.requiredOffloads().isConnectOffloaded()) {
                earlyCompletable = earlyCompletable.subscribeOn(offloadExecutor);
            }
            // Hop back to the channel's IO executor only when the acceptor finished off the event loop.
            // Events are released after the concat, i.e. only once the connection Single has succeeded.
            result = earlyCompletable
                    .publishOn(channelExecutionContext.ioExecutor(), () -> !channel.eventLoop().inEventLoop())
                    .concat(result)
                    .whenOnSuccess(ignored -> acceptorHandler.releaseEvents());
        }

        if (lateConnectionAcceptor != null) {
            result = result.flatMap(conn -> {
                Single<CC> deferred =
                        Single.defer(() -> lateConnectionAcceptor.accept(conn).concat(Single.succeeded(conn)));
                if (lateConnectionAcceptor.requiredOffloads().isConnectOffloaded()) {
                    deferred = deferred.subscribeOn(offloadExecutor,
                            IoThreadFactory.IoThread::currentThreadIsIoThread);
                }
                return deferred;
            });
        }

        if (connectionAcceptor != null) {
            result = result.flatMap(conn -> {
                Single<CC> deferred =
                        Single.defer(() -> connectionAcceptor.accept(conn).concat(Single.succeeded(conn)));
                // NOTE(review): unlike the late acceptor above, no IO-thread predicate is passed to
                // subscribeOn here - confirm that this difference is intended.
                if (connectionAcceptor.requiredOffloads().isConnectOffloaded()) {
                    deferred = deferred.subscribeOn(offloadExecutor);
                }
                return deferred;
            });
        }
        return result;
    }

    /**
     * Builds the {@link ConnectionInfo} view of {@code channel} that is handed to the early acceptor.
     */
    private static ConnectionInfo earlyConnectionInfo(final Channel channel,
                                                      final ExecutionContext<?> channelExecutionContext,
                                                      final ReadOnlyTcpServerConfig config) {
        return new ConnectionInfo() {
            @Override
            public SocketAddress localAddress() {
                return channel.localAddress();
            }

            @Override
            public SocketAddress remoteAddress() {
                return channel.remoteAddress();
            }

            @Override
            public ExecutionContext<?> executionContext() {
                return channelExecutionContext;
            }

            @Override
            @Nullable
            public SslConfig sslConfig() {
                return config.sslConfig();
            }

            @Override
            @Nullable
            public SSLSession sslSession() {
                // This view never exposes an SSL session.
                return null;
            }

            @Override
            @Nullable
            public <T> T socketOption(SocketOption<T> option) {
                return SocketOptionUtils.getOption(option, channel.config(), config.idleTimeoutMs());
            }

            @Override
            public ConnectionInfo.Protocol protocol() {
                return () -> "TCP";
            }
        };
    }

    /**
     * Applies the event loop group, server channel type, channel options and buffer allocator to {@code bs}.
     * <p>
     * Entries of {@code config.options()} are applied as child options and entries of
     * {@code config.listenOptions()} as options of the listening channel. {@link ChannelOption#AUTO_READ} is set
     * to {@code false} as a child option after the configured options have been applied.
     *
     * @param config the server configuration providing the channel options.
     * @param bs the bootstrap to configure.
     * @param eventLoopGroup the {@link EventLoopGroup} to run on.
     * @param bindAddressClass the class of the address the server will be bound to; used to select the server
     * channel type.
     * @throws IllegalStateException if {@code eventLoopGroup} is {@code null}.
     */
    private static void configure(ReadOnlyTcpServerConfig config, ServerBootstrap bs,
                                  @Nullable EventLoopGroup eventLoopGroup,
                                  Class<? extends SocketAddress> bindAddressClass) {
        if (eventLoopGroup == null) {
            throw new IllegalStateException("IoExecutor must be specified before building");
        }
        bs.group(eventLoopGroup);
        bs.channel(BuilderUtils.serverChannel(eventLoopGroup, bindAddressClass));

        // The config maps are keyed by the raw ChannelOption type, so each key is narrowed to
        // ChannelOption<Object> in the smallest possible scope before being handed to the bootstrap.
        for (@SuppressWarnings("rawtypes") Map.Entry<ChannelOption, Object> opt : config.options().entrySet()) {
            @SuppressWarnings("unchecked")
            final ChannelOption<Object> option = opt.getKey();
            bs.childOption(option, opt.getValue());
        }
        for (@SuppressWarnings("rawtypes") Map.Entry<ChannelOption, Object> opt
                : config.listenOptions().entrySet()) {
            @SuppressWarnings("unchecked")
            final ChannelOption<Object> option = opt.getKey();
            bs.option(option, opt.getValue());
        }

        // Set after the configured child options have been applied.
        bs.childOption(ChannelOption.AUTO_READ, false);

        // Use the same allocator for the listening channel and for every accepted channel.
        final PooledByteBufAllocator byteBufAllocator = CopyByteBufHandlerChannelInitializer.POOLED_ALLOCATOR;
        bs.option(ChannelOption.ALLOCATOR, byteBufAllocator);
        bs.childOption(ChannelOption.ALLOCATOR, byteBufAllocator);
    }
}

