/*
 * Decompiled with CFR 0.152.
 */
package io.servicetalk.transport.netty.internal;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufHolder;
import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPromise;
import io.netty.channel.epoll.Epoll;
import io.netty.channel.kqueue.KQueue;
import io.servicetalk.transport.api.ConnectionInfo;
import io.servicetalk.transport.api.ConnectionObserver;
import io.servicetalk.transport.api.ExecutionContext;
import io.servicetalk.transport.api.SslConfig;
import io.servicetalk.transport.netty.internal.ChannelCloseUtils;
import io.servicetalk.transport.netty.internal.ChannelInitializer;
import io.servicetalk.transport.netty.internal.SocketOptionUtils;
import java.net.SocketAddress;
import java.net.SocketOption;
import java.util.Objects;
import java.util.function.Function;
import javax.annotation.Nullable;
import javax.net.ssl.SSLSession;

/**
 * A {@link ChannelInitializer} that connects a {@link ConnectionObserver} to a {@link Channel}.
 * <p>
 * On {@link #init(Channel)} it registers a listener on the channel's close future that reports the
 * connection as closed (with or without an error), and appends a {@link ConnectionObserverHandler}
 * to the end of the pipeline which forwards transport events to the observer.
 */
public final class ConnectionObserverInitializer implements ChannelInitializer {
    private final ConnectionObserver observer;
    private final Function<Channel, ConnectionInfo> connectionInfoFactory;
    private final boolean handshakeOnActive;
    private final boolean client;

    /**
     * Creates a new instance that builds a {@link PartialConnectionInfo} for each channel.
     *
     * @param observer the observer that receives connection events
     * @param handshakeOnActive whether the security handshake is reported as started once the
     * channel is active
     * @param client whether the channel is a client-side channel
     * @deprecated use
     * {@link #ConnectionObserverInitializer(ConnectionObserver, Function, boolean, boolean)}, to
     * which this constructor delegates
     */
    @Deprecated
    public ConnectionObserverInitializer(final ConnectionObserver observer,
                                         final boolean handshakeOnActive,
                                         final boolean client) {
        this(observer, PartialConnectionInfo::new, handshakeOnActive, client);
    }

    /**
     * Creates a new instance.
     *
     * @param observer the observer that receives connection events; must not be {@code null}
     * @param connectionInfoFactory produces the {@link ConnectionInfo} handed to the observer when
     * the transport handshake completes; must not be {@code null}
     * @param handshakeOnActive whether the security handshake is reported as started once the
     * channel is active
     * @param client whether the channel is a client-side channel
     * @throws NullPointerException if {@code observer} or {@code connectionInfoFactory} is
     * {@code null}
     */
    public ConnectionObserverInitializer(final ConnectionObserver observer,
                                         final Function<Channel, ConnectionInfo> connectionInfoFactory,
                                         final boolean handshakeOnActive,
                                         final boolean client) {
        this.observer = Objects.requireNonNull(observer);
        this.connectionInfoFactory = Objects.requireNonNull(connectionInfoFactory);
        this.handshakeOnActive = handshakeOnActive;
        this.client = client;
    }

    /**
     * Registers the close listener and installs the observing handler on {@code channel}.
     *
     * @param channel the channel to initialize
     */
    @Override
    public void init(final Channel channel) {
        // The close listener is registered before the handler is created, so it is in place
        // before any handshake callback fired from the handler's constructor.
        channel.closeFuture().addListener(closed -> {
            final Throwable cause = ChannelCloseUtils.channelError(channel);
            if (cause != null) {
                observer.connectionClosed(cause);
                return;
            }
            observer.connectionClosed();
        });
        channel.pipeline().addLast(
                new ConnectionObserverHandler(observer, connectionInfoFactory, handshakeOnActive,
                        isFastOpen(channel)));
    }

    /**
     * Tells whether TCP Fast Open applies to {@code channel}.
     *
     * @param channel the channel whose configuration is inspected
     * @return {@code true} only for a client channel that handshakes on activation, has
     * {@link ChannelOption#TCP_FASTOPEN_CONNECT} set to {@code true}, and runs where either Epoll
     * or KQueue reports client-side TCP Fast Open as available
     */
    private boolean isFastOpen(final Channel channel) {
        if (!client || !handshakeOnActive) {
            return false;
        }
        final Boolean fastOpenConnect = channel.config().getOption(ChannelOption.TCP_FASTOPEN_CONNECT);
        if (!Boolean.TRUE.equals(fastOpenConnect)) {
            return false;
        }
        return Epoll.isTcpFastOpenClientSideAvailable() || KQueue.isTcpFastOpenClientSideAvailable();
    }

    /**
     * A minimal {@link ConnectionInfo} backed only by a {@link Channel}. It exposes addresses,
     * socket options and a fixed {@code "TCP"} protocol; the execution context, SSL config and
     * SSL session are all reported as {@code null}.
     */
    private static final class PartialConnectionInfo implements ConnectionInfo {
        private static final ConnectionInfo.Protocol TCP_PROTOCOL = () -> "TCP";
        private final Channel channel;

        PartialConnectionInfo(final Channel channel) {
            this.channel = channel;
        }

        @Override
        public SocketAddress localAddress() {
            return channel.localAddress();
        }

        @Override
        public SocketAddress remoteAddress() {
            return channel.remoteAddress();
        }

        @Override
        public ExecutionContext<?> executionContext() {
            // This partial view carries no execution context.
            return null;
        }

        @Override
        @Nullable
        public SslConfig sslConfig() {
            return null;
        }

        @Override
        @Nullable
        public SSLSession sslSession() {
            return null;
        }

        @Override
        @Nullable
        public <T> T socketOption(final SocketOption<T> option) {
            // NOTE(review): the trailing 0L is presumably an idle-timeout value passed through to
            // SocketOptionUtils - confirm against that helper's signature.
            return SocketOptionUtils.getOption(option, channel.config(), 0L);
        }

        @Override
        public ConnectionInfo.Protocol protocol() {
            return TCP_PROTOCOL;
        }

        @Override
        public String toString() {
            return channel.toString();
        }
    }

    /**
     * Pipeline handler that relays channel events to a {@link ConnectionObserver}: transport
     * handshake completion, start of the security handshake, bytes read and written, flushes and
     * writability changes. Every intercepted event is passed on through the pipeline after it has
     * been reported.
     */
    static final class ConnectionObserverHandler extends ChannelDuplexHandler {
        private final ConnectionObserver observer;
        private final Function<Channel, ConnectionInfo> connectionInfoFactory;
        private final boolean handshakeOnActive;
        // Set once the transport handshake has been reported, so it is reported at most once.
        private boolean tcpHandshakeComplete;
        @Nullable
        private ConnectionObserver.SecurityHandshakeObserver handshakeObserver;

        /**
         * Creates a new handler.
         *
         * @param observer the observer that receives the events
         * @param connectionInfoFactory produces the {@link ConnectionInfo} reported on transport
         * handshake completion
         * @param handshakeOnActive whether the security handshake is reported as started when the
         * channel is active
         * @param fastOpen when {@code true}, the security handshake is reported as started
         * immediately, during construction
         */
        ConnectionObserverHandler(final ConnectionObserver observer,
                                  final Function<Channel, ConnectionInfo> connectionInfoFactory,
                                  final boolean handshakeOnActive,
                                  final boolean fastOpen) {
            this.observer = observer;
            this.connectionInfoFactory = connectionInfoFactory;
            this.handshakeOnActive = handshakeOnActive;
            if (fastOpen) {
                reportSecurityHandshakeStarting();
            }
        }

        /**
         * Reports the handshake events right away if the channel is already active at the time
         * this handler is added.
         */
        @Override
        public void handlerAdded(final ChannelHandlerContext ctx) {
            final Channel channel = ctx.channel();
            if (!channel.isActive()) {
                return;
            }
            reportActive(channel);
        }

        /**
         * Reports the handshake events and then propagates the channel-active event.
         */
        @Override
        public void channelActive(final ChannelHandlerContext ctx) {
            reportActive(ctx.channel());
            ctx.fireChannelActive();
        }

        /**
         * Reports transport handshake completion and, when configured to handshake on activation,
         * the start of the security handshake.
         */
        private void reportActive(final Channel channel) {
            reportTcpHandshakeComplete(channel);
            if (handshakeOnActive) {
                reportSecurityHandshakeStarting();
            }
        }

        /**
         * Notifies the observer that the transport handshake completed; later calls do nothing.
         */
        private void reportTcpHandshakeComplete(final Channel channel) {
            if (tcpHandshakeComplete) {
                return;
            }
            // Flag is flipped before the callback, matching the original ordering.
            tcpHandshakeComplete = true;
            final ConnectionInfo info = connectionInfoFactory.apply(channel);
            observer.onTransportHandshakeComplete(info);
        }

        /**
         * Notifies the observer that the security handshake is starting and keeps the returned
         * handshake observer. Does nothing if a handshake observer has already been obtained.
         */
        void reportSecurityHandshakeStarting() {
            if (handshakeObserver != null) {
                return;
            }
            handshakeObserver = observer.onSecurityHandshake();
        }

        /**
         * @return the handshake observer obtained from
         * {@link #reportSecurityHandshakeStarting()}, or {@code null} if none has been obtained
         */
        @Nullable
        ConnectionObserver.SecurityHandshakeObserver handshakeObserver() {
            return handshakeObserver;
        }

        /**
         * Reports the readable byte count of {@link ByteBuf} and {@link ByteBufHolder} messages,
         * then propagates the message. Other message types are propagated without a report.
         */
        @Override
        public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
            if (msg instanceof ByteBuf) {
                final ByteBuf buffer = (ByteBuf) msg;
                observer.onDataRead(buffer.readableBytes());
            } else if (msg instanceof ByteBufHolder) {
                final ByteBuf content = ((ByteBufHolder) msg).content();
                observer.onDataRead(content.readableBytes());
            }
            ctx.fireChannelRead(msg);
        }

        /**
         * Reports the readable byte count of {@link ByteBuf} and {@link ByteBufHolder} messages,
         * then passes the write on. Other message types are passed on without a report.
         */
        @Override
        public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise) {
            if (msg instanceof ByteBuf) {
                final ByteBuf buffer = (ByteBuf) msg;
                observer.onDataWrite(buffer.readableBytes());
            } else if (msg instanceof ByteBufHolder) {
                final ByteBuf content = ((ByteBufHolder) msg).content();
                observer.onDataWrite(content.readableBytes());
            }
            ctx.write(msg, promise);
        }

        /**
         * Reports the flush to the observer, then passes it on.
         */
        @Override
        public void flush(final ChannelHandlerContext ctx) {
            observer.onFlush();
            ctx.flush();
        }

        /**
         * Reports the channel's current writability to the observer, then propagates the event.
         */
        @Override
        public void channelWritabilityChanged(final ChannelHandlerContext ctx) {
            final boolean writable = ctx.channel().isWritable();
            observer.connectionWritabilityChanged(writable);
            ctx.fireChannelWritabilityChanged();
        }
    }
}

