/*
 * Decompiled with CFR 0.152.
 */
package io.servicetalk.http.netty;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoop;
import io.netty.channel.socket.DuplexChannel;
import io.netty.handler.codec.http2.DefaultHttp2GoAwayFrame;
import io.netty.handler.codec.http2.DefaultHttp2PingFrame;
import io.netty.handler.codec.http2.Http2Error;
import io.netty.handler.codec.http2.Http2PingFrame;
import io.netty.handler.codec.http2.Http2StreamChannel;
import io.netty.handler.ssl.SslHandler;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import io.servicetalk.http.netty.H2KeepAlivePolicies;
import io.servicetalk.http.netty.H2ProtocolConfig;
import io.servicetalk.transport.netty.internal.ChannelCloseUtils;
import io.servicetalk.utils.internal.ThrowableUtils;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.function.Predicate;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

final class KeepAliveManager {
    private static final Logger LOGGER = LoggerFactory.getLogger(KeepAliveManager.class);
    // Atomic updater used to increment/decrement the volatile activeStreams counter.
    private static final AtomicIntegerFieldUpdater<KeepAliveManager> activeStreamsUpdater = AtomicIntegerFieldUpdater.newUpdater(KeepAliveManager.class, "activeStreams");
    // Random PING payloads. The lowest bit is forced to 1 for graceful-close PINGs and cleared to 0 for keep-alive
    // PINGs, so the two values can never be equal and a PING(ACK) can be attributed to the right PING.
    private static final long GRACEFUL_CLOSE_PING_CONTENT = ThreadLocalRandom.current().nextLong() | 1L;
    private static final long KEEP_ALIVE_PING_CONTENT = ThreadLocalRandom.current().nextLong() & 0xFFFFFFFFFFFFFFFEL;
    // ASCII payloads passed as content of the GO_AWAY frames written by this class; each names the reason the frame
    // was written (first GO_AWAY initiated locally/remotely, second GO_AWAY, graceful-close timeout, keep-alive timeout).
    static final ByteBuf LOCAL_GO_AWAY_CONTENT = KeepAliveManager.staticByteBufFromAscii("0.local");
    static final ByteBuf REMOTE_GO_AWAY_CONTENT = KeepAliveManager.staticByteBufFromAscii("1.remote");
    static final ByteBuf SECOND_GO_AWAY_CONTENT = KeepAliveManager.staticByteBufFromAscii("2.second");
    static final ByteBuf GC_TIMEOUT_GO_AWAY_CONTENT = KeepAliveManager.staticByteBufFromAscii("3.graceful-close-timeout");
    static final ByteBuf KA_TIMEOUT_GO_AWAY_CONTENT = KeepAliveManager.staticByteBufFromAscii("4.keep-alive-timeout");
    // Number of tracked stream channels whose close future has not completed yet (see trackActiveStream).
    private volatile int activeStreams;
    private final Channel channel;
    // Delay used for every timeout scheduled by this class: keep-alive PING(ACK), graceful-close PING(ACK) and the
    // wait for input shutdown after output shutdown.
    private final long pingAckTimeoutNanos;
    // When true, channelIdle() does not send a keep-alive PING while activeStreams is 0.
    private final boolean disallowKeepAliveWithoutActiveStreams;
    private final Scheduler scheduler;
    // Holds null (graceful close not started), a State constant, or the scheduled Future of the graceful-close
    // PING(ACK) timeout task.
    @Nullable
    private Object gracefulCloseState;
    // Holds null (no keep-alive PING outstanding), a State constant, or the scheduled Future of the keep-alive
    // PING(ACK) timeout task.
    @Nullable
    private Object keepAliveState;
    // Scheduled timeout that closes the channel if input shutdown is not observed after output shutdown.
    @Nullable
    private Future<?> inputShutdownTimeoutFuture;
    // Listener attached to keep-alive PING writes; null when there is no keep-alive policy or its idle duration is
    // not positive.
    @Nullable
    private final GenericFutureListener<Future<? super Void>> pingWriteCompletionListener;

    /**
     * Creates a manager that schedules its timeouts on the channel's event loop and detects idleness through an
     * {@link IdleStateHandler} appended to the channel pipeline.
     *
     * @param channel the channel to manage
     * @param keepAlivePolicy the keep-alive settings, or {@code null} when keep-alive is not configured
     */
    KeepAliveManager(Channel channel, @Nullable H2ProtocolConfig.KeepAlivePolicy keepAlivePolicy) {
        this(channel,
                keepAlivePolicy,
                (command, duration, timeUnit) -> channel.eventLoop().schedule(command, duration, timeUnit),
                (target, thresholdNanos, idleCallback) -> target.pipeline().addLast(
                        // Only the third (all-idle) timeout is armed; reader and writer idle timeouts are 0.
                        new IdleStateHandler(0L, 0L, thresholdNanos, TimeUnit.NANOSECONDS) {

                            @Override
                            protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) {
                                idleCallback.run();
                            }
                        }));
    }

    /**
     * Creates a manager with explicit scheduling and idleness-detection strategies.
     *
     * <p>For a {@link DuplexChannel} half-closure is enabled and auto-close is disabled, so this class decides when
     * the channel is actually closed.
     *
     * @param channel the channel to manage
     * @param keepAlivePolicy the keep-alive settings, or {@code null} when keep-alive is not configured; in that case
     * no idleness detection is installed and {@link H2KeepAlivePolicies#DEFAULT_ACK_TIMEOUT} is used for timeouts
     * @param scheduler used to schedule every timeout task
     * @param idlenessDetector asked to invoke {@link #channelIdle()} when the policy's idle duration is positive
     */
    KeepAliveManager(Channel channel, @Nullable H2ProtocolConfig.KeepAlivePolicy keepAlivePolicy, Scheduler scheduler, IdlenessDetector idlenessDetector) {
        if (channel instanceof DuplexChannel) {
            channel.config().setOption(ChannelOption.ALLOW_HALF_CLOSURE, Boolean.TRUE);
            channel.config().setAutoClose(false);
        }
        this.channel = channel;
        this.scheduler = scheduler;
        if (keepAlivePolicy != null) {
            this.disallowKeepAliveWithoutActiveStreams = !keepAlivePolicy.withoutActiveStreams();
            this.pingAckTimeoutNanos = keepAlivePolicy.ackTimeout().toNanos();
            long idlenessThresholdNanos = keepAlivePolicy.idleDuration().toNanos();
            if (idlenessThresholdNanos > 0L) {
                // The listener is only needed when idleness detection (and therefore keep-alive PINGs) is enabled.
                this.pingWriteCompletionListener = this::onKeepAlivePingWritten;
                idlenessDetector.configure(channel, idlenessThresholdNanos, this::channelIdle);
            } else {
                this.pingWriteCompletionListener = null;
            }
        } else {
            this.disallowKeepAliveWithoutActiveStreams = false;
            this.pingAckTimeoutNanos = H2KeepAlivePolicies.DEFAULT_ACK_TIMEOUT.toNanos();
            this.pingWriteCompletionListener = null;
        }
        LOGGER.debug("{} Configured for {}duplex channel with policy={}", channel, channel instanceof DuplexChannel ? "" : "non-", keepAlivePolicy);
    }

    /**
     * Completion callback of a keep-alive PING write: closes the channel if the write failed, otherwise schedules
     * the PING(ACK) timeout unless the state already changed (for example because the ACK was already received).
     */
    private void onKeepAlivePingWritten(Future<? super Void> future) {
        assert channel.eventLoop().inEventLoop();
        if (!future.isSuccess()) {
            LOGGER.debug("{} Failed to write a PING frame after idleness is detected, closing the channel",
                    channel, future.cause());
            close0(future.cause());
        } else if (keepAliveState == State.KEEP_ALIVE_ACK_PENDING) {
            // Replace the PENDING marker with the timeout task so that it can be cancelled when the ACK arrives.
            keepAliveState = scheduler.afterDuration(this::onKeepAlivePingAckTimeout, pingAckTimeoutNanos,
                    TimeUnit.NANOSECONDS);
        }
    }

    /**
     * Timeout task for the keep-alive PING(ACK): writes a GO_AWAY frame and closes the channel with a timeout cause.
     */
    private void onKeepAlivePingAckTimeout() {
        if (keepAliveState == null) {
            // pingReceived() resets the state to null once the ACK is observed; nothing to do.
            return;
        }
        keepAliveState = State.KEEP_ALIVE_ACK_TIMEDOUT;
        long timeoutMillis = TimeUnit.NANOSECONDS.toMillis(pingAckTimeoutNanos);
        LOGGER.debug("{} Timeout after {}ms waiting for keep-alive PING(ACK), writing GO_AWAY frame and closing the channel with activeStreams={}", channel, timeoutMillis, activeStreams);
        StacklessTimeoutException cause = StacklessTimeoutException.newInstance(
                "Timeout after " + timeoutMillis + "ms waiting for keep-alive PING(ACK)",
                KeepAliveManager.class, "keepAlivePingAckTimeout()");
        channel.writeAndFlush(KeepAliveManager.newGoAwayFrame(Http2Error.NO_ERROR, KA_TIMEOUT_GO_AWAY_CONTENT))
                .addListener(f -> {
                    Throwable closeCause = cause;
                    if (!f.isSuccess()) {
                        // Keep the timeout visible as a suppressed exception of the write failure.
                        closeCause = ThrowableUtils.addSuppressed(f.cause(), cause);
                        LOGGER.debug("{} Failed to write the GO_AWAY frame after keep-alive PING(ACK) timeout, closing the channel",
                                channel, closeCause);
                    }
                    close0(closeCause);
                });
    }

    /**
     * Handles an inbound PING frame. Must be called on the channel's event loop.
     *
     * <p>A PING without the ACK flag is answered with a PING(ACK) carrying the same content. A PING(ACK) is matched
     * against the two payloads this class sends: the graceful-close payload triggers the second GO_AWAY frame, the
     * keep-alive payload cancels the pending ACK timeout. Any other ACK content is ignored.
     *
     * @param pingFrame the received PING frame
     */
    void pingReceived(Http2PingFrame pingFrame) {
        assert channel.eventLoop().inEventLoop();
        if (!pingFrame.ack()) {
            channel.writeAndFlush(new DefaultHttp2PingFrame(pingFrame.content(), true));
            return;
        }
        long pingAckContent = pingFrame.content();
        if (pingAckContent == GRACEFUL_CLOSE_PING_CONTENT) {
            LOGGER.debug("{} Graceful close PING(ACK) received, writing the second GO_AWAY frame, activeStreams={}", channel, activeStreams);
            cancelIfStateIsAFuture(gracefulCloseState);
            gracefulCloseWriteSecondGoAway(null);
        } else if (pingAckContent == KEEP_ALIVE_PING_CONTENT) {
            LOGGER.trace("{} PING(ACK) received, activeStreams={}", channel, activeStreams);
            cancelIfStateIsAFuture(keepAliveState);
            // CLOSED is terminal: a late ACK must not reset it, otherwise channelIdle() could send another PING on a
            // channel that is already being closed and close0() would no longer recognize the closed state.
            if (keepAliveState != State.CLOSED) {
                keepAliveState = null;
            }
        }
    }

    /**
     * Counts the given stream as active until its close future completes. When the last active stream closes after
     * the second GO_AWAY frame was sent, the channel close sequence is started.
     *
     * @param streamChannel the stream channel to track
     */
    void trackActiveStream(Http2StreamChannel streamChannel) {
        activeStreamsUpdater.incrementAndGet(this);
        streamChannel.closeFuture().addListener(closed -> {
            int remaining = activeStreamsUpdater.decrementAndGet(this);
            if (remaining != 0) {
                return;
            }
            if (gracefulCloseState == State.GRACEFUL_CLOSE_SECOND_GO_AWAY_SENT) {
                close0(null);
            }
        });
    }

    /**
     * Notifies the manager that the channel is closed: cancels any scheduled timeout still referenced by the state
     * fields and marks both state machines as {@code CLOSED}. Must be called on the channel's event loop.
     */
    void channelClosed() {
        assert channel.eventLoop().inEventLoop();
        // Log before the states are overwritten so the message shows what was in progress.
        LOGGER.debug("{} Channel closed with activeStreams={}, gracefulCloseState={}, keepAliveState={}", channel, activeStreams, gracefulCloseState, keepAliveState);

        cancelIfStateIsAFuture(gracefulCloseState);
        cancelIfStateIsAFuture(keepAliveState);
        cancelIfStateIsAFuture(inputShutdownTimeoutFuture);

        gracefulCloseState = State.CLOSED;
        keepAliveState = State.CLOSED;
        inputShutdownTimeoutFuture = null;
    }

    /**
     * Starts graceful closure of the channel. May be called from any thread; the work is executed on the channel's
     * event loop.
     *
     * @param whenInitiated run once, on the event loop, if graceful closure was not started before
     * @param local selects the content of the first GO_AWAY frame ({@code LOCAL_GO_AWAY_CONTENT} when {@code true},
     * {@code REMOTE_GO_AWAY_CONTENT} otherwise)
     */
    void initiateGracefulClose(Runnable whenInitiated, boolean local) {
        EventLoop loop = channel.eventLoop();
        if (!loop.inEventLoop()) {
            loop.execute(() -> doCloseAsyncGracefully0(whenInitiated, local));
            return;
        }
        doCloseAsyncGracefully0(whenInitiated, local);
    }

    /**
     * Idleness callback (registered with the {@code IdlenessDetector} in the constructor). Writes a keep-alive PING
     * unless one is already outstanding, the keep-alive state is otherwise non-null, or the policy forbids
     * keep-alive while no streams are active. Must be called on the channel's event loop.
     */
    void channelIdle() {
        assert channel.eventLoop().inEventLoop();
        assert pingWriteCompletionListener != null;

        if (keepAliveState != null) {
            return;
        }
        if (disallowKeepAliveWithoutActiveStreams && activeStreams == 0) {
            return;
        }

        LOGGER.debug("{} Idleness detected with activeStreams={}", channel, activeStreams);
        // Mark the PING as pending before writing; the write listener and pingReceived() both inspect this state.
        keepAliveState = State.KEEP_ALIVE_ACK_PENDING;
        DefaultHttp2PingFrame keepAlivePing = new DefaultHttp2PingFrame(KEEP_ALIVE_PING_CONTENT, false);
        channel.writeAndFlush(keepAlivePing).addListener(pingWriteCompletionListener);
    }

    /**
     * Notifies the manager that the output side of the channel was shut down. Must be called on the channel's event
     * loop.
     */
    void channelOutputShutdown() {
        assert channel.eventLoop().inEventLoop();
        channelHalfShutdown("output", duplex -> duplex.isInputShutdown());
    }

    /**
     * Notifies the manager that the input side of the channel was shut down. Cancels the pending input-shutdown
     * timeout, if any. Must be called on the channel's event loop.
     */
    void channelInputShutdown() {
        assert channel.eventLoop().inEventLoop();
        // The event the timeout was waiting for has happened.
        cancelIfStateIsAFuture(inputShutdownTimeoutFuture);
        inputShutdownTimeoutFuture = null;
        channelHalfShutdown("input", duplex -> duplex.isOutputShutdown());
    }

    /**
     * Common handling for a shutdown of one side of the channel.
     *
     * <ul>
     * <li>Non-duplex channel: closed immediately.</li>
     * <li>Duplex channel whose other side is shut down too: closed.</li>
     * <li>Duplex channel, other side still open: force-closed with an {@link IllegalStateException} unless the second
     * GO_AWAY frame was already sent or the manager is already closed.</li>
     * </ul>
     *
     * @param side name of the side that was shut down, used in log and exception messages
     * @param otherSideShutdown tests whether the opposite side of the duplex channel is shut down
     */
    private void channelHalfShutdown(String side, Predicate<DuplexChannel> otherSideShutdown) {
        if (!(channel instanceof DuplexChannel)) {
            LOGGER.debug("{} Observed {} shutdown, closing non-duplex channel with activeStreams={}, gracefulCloseState={}, keepAliveState={}", channel, side, activeStreams, gracefulCloseState, keepAliveState);
            channel.close();
            return;
        }

        DuplexChannel duplex = (DuplexChannel) channel;
        if (otherSideShutdown.test(duplex)) {
            LOGGER.debug("{} Observed {} shutdown, other side is shutdown too, closing the channel with activeStreams={}, gracefulCloseState={}, keepAliveState={}", channel, side, activeStreams, gracefulCloseState, keepAliveState);
            channel.close();
            return;
        }

        boolean closureAlreadyAdvanced = gracefulCloseState == State.GRACEFUL_CLOSE_SECOND_GO_AWAY_SENT
                || gracefulCloseState == State.CLOSED;
        if (closureAlreadyAdvanced) {
            return;
        }

        String state;
        if (gracefulCloseState == null) {
            state = "not started";
        } else {
            state = "in progress";
        }
        IllegalStateException cause = new IllegalStateException("Observed " + side + " shutdown while graceful closure is " + state);
        LOGGER.debug("{} Observed {} shutdown while graceful closure is {}, must force channel closure with activeStreams={}, gracefulCloseState={}, keepAliveState={}", channel, side, state, activeStreams, gracefulCloseState, keepAliveState, cause);
        ChannelCloseUtils.close(channel, (Throwable) cause);
    }

    /**
     * Starts graceful closure on the event loop: runs {@code whenInitiated}, writes the first GO_AWAY frame followed
     * by a graceful-close PING, and schedules a timeout for the PING(ACK). Does nothing if graceful closure was
     * already started or the manager is closed (state is non-null).
     *
     * @param whenInitiated callback run before any frame is written
     * @param local selects the content of the first GO_AWAY frame
     */
    private void doCloseAsyncGracefully0(Runnable whenInitiated, boolean local) {
        assert channel.eventLoop().inEventLoop();
        if (gracefulCloseState != null) {
            return;
        }
        LOGGER.debug("{} Close gracefully with activeStreams={}, keepAliveState={}", channel, activeStreams, keepAliveState);
        whenInitiated.run();

        // The state is set before the writes; the write listener and pingReceived() both consult it.
        gracefulCloseState = State.GRACEFUL_CLOSE_START;

        ByteBuf debugData = local ? LOCAL_GO_AWAY_CONTENT : REMOTE_GO_AWAY_CONTENT;
        DefaultHttp2GoAwayFrame firstGoAway = KeepAliveManager.newGoAwayFrame(Http2Error.NO_ERROR, debugData);
        // NOTE(review): presumably makes the first GO_AWAY advertise the maximum last-stream-id so that streams
        // already in flight are still accepted -- confirm against the Netty Http2GoAwayFrame documentation.
        firstGoAway.setExtraStreamIds(Integer.MAX_VALUE);
        channel.write(firstGoAway);

        DefaultHttp2PingFrame gracefulClosePing = new DefaultHttp2PingFrame(GRACEFUL_CLOSE_PING_CONTENT);
        channel.writeAndFlush(gracefulClosePing).addListener(written -> {
            assert channel.eventLoop().inEventLoop();
            if (!written.isSuccess()) {
                LOGGER.debug("{} Failed to write the first GO_AWAY and PING frames, closing the channel", channel, written.cause());
                close0(written.cause());
                return;
            }
            if (gracefulCloseState != State.GRACEFUL_CLOSE_START) {
                // The state already moved on (for example the PING(ACK) was received); no timeout is needed.
                return;
            }
            gracefulCloseState = scheduler.afterDuration(() -> {
                long timeoutMillis = TimeUnit.NANOSECONDS.toMillis(pingAckTimeoutNanos);
                LOGGER.debug("{} Timeout after {}ms waiting for graceful close PING(ACK), writing the second GO_AWAY frame and closing the channel with activeStreams={}", channel, timeoutMillis, activeStreams);
                StacklessTimeoutException timeout = StacklessTimeoutException.newInstance("Timeout after " + timeoutMillis + "ms waiting for graceful close PING(ACK)", KeepAliveManager.class, "gracefulClosePingAckTimeout()");
                gracefulCloseWriteSecondGoAway(timeout);
            }, pingAckTimeoutNanos, TimeUnit.NANOSECONDS);
        });
    }

    /**
     * Writes the second GO_AWAY frame of the graceful-close sequence and, once written, closes the channel when
     * there is a timeout cause or no stream is active. With active streams and no cause, closure is deferred until
     * the last tracked stream closes (see {@code trackActiveStream}).
     *
     * <p>This is a no-op when the second GO_AWAY frame was already sent or the manager is already {@code CLOSED}.
     *
     * @param cause the graceful-close PING(ACK) timeout, or {@code null} when the PING(ACK) was received
     */
    private void gracefulCloseWriteSecondGoAway(@Nullable Throwable cause) {
        assert channel.eventLoop().inEventLoop();
        // CLOSED is terminal: a late PING(ACK) or a timeout task that was not cancelled must not move the state back
        // to SECOND_GO_AWAY_SENT and write another frame on a channel that is already being closed.
        if (gracefulCloseState == State.GRACEFUL_CLOSE_SECOND_GO_AWAY_SENT || gracefulCloseState == State.CLOSED) {
            return;
        }
        gracefulCloseState = State.GRACEFUL_CLOSE_SECOND_GO_AWAY_SENT;
        ByteBuf debugData = cause == null ? SECOND_GO_AWAY_CONTENT : GC_TIMEOUT_GO_AWAY_CONTENT;
        channel.writeAndFlush(KeepAliveManager.newGoAwayFrame(Http2Error.NO_ERROR, debugData)).addListener(future -> {
            if (!future.isSuccess()) {
                // Report the write failure; keep the timeout (if any) attached as a suppressed exception.
                Throwable closeCause = cause == null ? future.cause() : ThrowableUtils.addSuppressed(future.cause(), cause);
                LOGGER.debug("{} Failed to write the second GO_AWAY frame{}, closing the channel", channel, cause == null ? "" : " after graceful close PING(ACK) timeout", closeCause);
                close0(closeCause);
            } else if (cause != null || activeStreams == 0) {
                close0(cause);
            }
        });
    }

    /**
     * Marks both state machines as {@code CLOSED} and closes the channel. Must be called on the channel's event loop.
     *
     * <p>With a non-null {@code cause} the channel is closed immediately with that cause. Otherwise an empty buffer
     * is flushed first and the TLS close-notify / output shutdown sequence is started when that write completes.
     * Calling this method again after both states are {@code CLOSED} is a no-op.
     *
     * @param cause the reason for an abrupt closure, or {@code null} for an orderly one
     */
    private void close0(@Nullable Throwable cause) {
        assert channel.eventLoop().inEventLoop();
        if (gracefulCloseState == State.CLOSED && keepAliveState == State.CLOSED) {
            return;
        }
        LOGGER.debug("{} Marking all states as CLOSED with activeStreams={}, gracefulCloseState={}, keepAliveState={}", channel, activeStreams, gracefulCloseState, keepAliveState);
        // Either state may currently hold a scheduled timeout Future. Cancel it before the reference is replaced by
        // CLOSED; afterwards nothing (including channelClosed()) can reach the Future and the task would still fire.
        cancelIfStateIsAFuture(gracefulCloseState);
        cancelIfStateIsAFuture(keepAliveState);
        gracefulCloseState = State.CLOSED;
        keepAliveState = State.CLOSED;
        if (cause != null) {
            ChannelCloseUtils.close(channel, cause);
            return;
        }
        // Flush whatever is still pending before starting the output shutdown sequence.
        channel.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(f -> closeNotifyAndShutdownOutput());
    }

    /**
     * Closes a non-duplex channel outright. For a duplex channel, closes the outbound side of the
     * {@link SslHandler} first when one is present in the pipeline, then shuts down the channel output.
     */
    private void closeNotifyAndShutdownOutput() {
        if (!(channel instanceof DuplexChannel)) {
            channel.close();
            return;
        }
        SslHandler ssl = channel.pipeline().get(SslHandler.class);
        if (ssl == null) {
            doShutdownOutput();
            return;
        }
        // Shut down the output only after the SslHandler's outbound close has completed.
        ssl.closeOutbound().addListener(done -> doShutdownOutput());
    }

    /**
     * Shuts down the output of the duplex channel. When the shutdown completes, the channel is closed if the input
     * is shut down too; otherwise a timeout is scheduled that force-closes the channel if input shutdown is still not
     * observed when it fires.
     */
    private void doShutdownOutput() {
        DuplexChannel duplex = (DuplexChannel) channel;
        duplex.shutdownOutput().addListener(done -> {
            if (duplex.isInputShutdown()) {
                LOGGER.debug("{} Input and output shutdown, closing the channel", channel);
                channel.close();
                return;
            }
            Runnable onTimeout = () -> {
                inputShutdownTimeoutFuture = null;
                if (duplex.isInputShutdown()) {
                    return;
                }
                long timeoutMillis = TimeUnit.NANOSECONDS.toMillis(pingAckTimeoutNanos);
                LOGGER.debug("{} Timeout after {}ms waiting for InputShutdown, closing the channel", channel, timeoutMillis);
                StacklessTimeoutException timeout = StacklessTimeoutException.newInstance("Timeout after " + timeoutMillis + "ms waiting for InputShutdown", KeepAliveManager.class, "doShutdownOutput()");
                ChannelCloseUtils.close(channel, (Throwable) timeout);
            };
            inputShutdownTimeoutFuture = scheduler.afterDuration(onTimeout, pingAckTimeoutNanos, TimeUnit.NANOSECONDS);
        });
    }

    /**
     * Cancels {@code state} if it is a {@link Future}; any other value (including {@code null}) is ignored.
     * A failure to cancel is logged and never propagated.
     *
     * @param state the current value of one of the state fields
     */
    private void cancelIfStateIsAFuture(@Nullable Object state) {
        if (!(state instanceof Future)) {
            return;
        }
        try {
            ((Future<?>) state).cancel(true);
        } catch (Throwable t) {
            // Identify which timeout failed to cancel by comparing against the fields that can hold it.
            String label;
            if (state == keepAliveState) {
                label = "keep-alive";
            } else if (state == gracefulCloseState) {
                label = "graceful close";
            } else {
                label = "input shutdown";
            }
            LOGGER.debug("{} Failed to cancel {} scheduled future", channel, label, t);
        }
    }

    /**
     * Builds a GO_AWAY frame with the given error code, using a duplicate of {@code content} so that the shared
     * static buffer's indices are not touched by the write.
     *
     * @param error the HTTP/2 error code of the frame
     * @param content the buffer to duplicate as frame content
     * @return a new GO_AWAY frame
     */
    private static DefaultHttp2GoAwayFrame newGoAwayFrame(Http2Error error, ByteBuf content) {
        ByteBuf frameContent = content.duplicate();
        return new DefaultHttp2GoAwayFrame(error, frameContent);
    }

    /**
     * Encodes {@code str} as ASCII into an unpooled direct buffer and returns it wrapped as a read-only,
     * unreleasable buffer, suitable for a shared static constant.
     *
     * @param str the ASCII text to encode
     * @return a read-only, unreleasable buffer holding the encoded text
     */
    private static ByteBuf staticByteBufFromAscii(String str) {
        ByteBuf direct = UnpooledByteBufAllocator.DEFAULT.directBuffer(str.length());
        ByteBufUtil.writeAscii(direct, str);
        ByteBuf readOnly = direct.asReadOnly();
        return Unpooled.unreleasableBuffer(readOnly);
    }

    /**
     * A {@link TimeoutException} that does not capture a stack trace: {@link #fillInStackTrace()} is a no-op.
     * Instances are created through {@link #newInstance(String, Class, String)}.
     */
    private static final class StacklessTimeoutException extends TimeoutException {
        private static final long serialVersionUID = -8647261218787418981L;

        private StacklessTimeoutException(String message) {
            super(message);
        }

        /**
         * Skips stack trace capture.
         *
         * @return this exception
         */
        @Override
        public Throwable fillInStackTrace() {
            return this;
        }

        /**
         * Creates an instance and passes it through {@code ThrowableUtils.unknownStackTrace} together with the given
         * class and method name.
         *
         * @param message the exception message
         * @param clazz the class to report as the origin
         * @param method the method name to report as the origin
         * @return the new exception
         */
        static StacklessTimeoutException newInstance(String message, Class<?> clazz, String method) {
            StacklessTimeoutException exception = new StacklessTimeoutException(message);
            return io.servicetalk.concurrent.internal.ThrowableUtils.unknownStackTrace(exception, clazz, method);
        }
    }

    @FunctionalInterface
    static interface IdlenessDetector {
        public void configure(Channel var1, long var2, Runnable var4);
    }

    @FunctionalInterface
    static interface Scheduler {
        public Future<?> afterDuration(Runnable var1, long var2, TimeUnit var4);
    }

    private static enum State {
        GRACEFUL_CLOSE_START,
        GRACEFUL_CLOSE_SECOND_GO_AWAY_SENT,
        KEEP_ALIVE_ACK_PENDING,
        KEEP_ALIVE_ACK_TIMEDOUT,
        CLOSED;

    }
}

