/*
 * Decompiled with CFR 0.152.
 */
package io.servicetalk.http.utils;

import io.servicetalk.client.api.RequestRejectedException;
import io.servicetalk.concurrent.Cancellable;
import io.servicetalk.concurrent.Executor;
import io.servicetalk.concurrent.api.Single;
import io.servicetalk.concurrent.internal.FlowControlUtils;
import io.servicetalk.http.api.FilterableStreamingHttpConnection;
import io.servicetalk.http.api.HttpExecutionStrategies;
import io.servicetalk.http.api.HttpExecutionStrategy;
import io.servicetalk.http.api.StreamingHttpConnectionFilter;
import io.servicetalk.http.api.StreamingHttpConnectionFilterFactory;
import io.servicetalk.http.api.StreamingHttpRequest;
import io.servicetalk.http.api.StreamingHttpResponse;
import io.servicetalk.http.utils.BeforeFinallyHttpOperator;
import io.servicetalk.transport.api.ExecutionContext;
import io.servicetalk.transport.api.RetryableException;
import java.nio.channels.ClosedChannelException;
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class IdleTimeoutConnectionFilter
implements StreamingHttpConnectionFilterFactory {
    private static final Logger LOGGER = LoggerFactory.getLogger(IdleTimeoutConnectionFilter.class);
    private static final Cancellable CANCELLED = () -> {};
    private final long timeoutNs;
    @Nullable
    private final Executor timeoutExecutor;

    public IdleTimeoutConnectionFilter(Duration timeout) {
        this.timeoutNs = IdleTimeoutConnectionFilter.ensureNotNegative(timeout).toNanos();
        this.timeoutExecutor = null;
    }

    public IdleTimeoutConnectionFilter(Duration timeout, Executor timeoutExecutor) {
        this.timeoutNs = IdleTimeoutConnectionFilter.ensureNotNegative(timeout).toNanos();
        this.timeoutExecutor = Objects.requireNonNull(timeoutExecutor);
    }

    /**
     * Verifies that the given timeout is zero or positive.
     *
     * @param timeout the duration to validate.
     * @return the same {@code timeout} instance when it is not negative.
     * @throws NullPointerException if {@code timeout} is {@code null}.
     * @throws IllegalArgumentException if {@code timeout} is negative.
     */
    private static Duration ensureNotNegative(Duration timeout) {
        Objects.requireNonNull(timeout, "timeout");
        if (!timeout.isNegative()) {
            return timeout;
        }
        // Duration.toNanos() throws ArithmeticException when the value does not fit into a long. That must not
        // replace the IllegalArgumentException callers expect for a negative timeout, so fall back to the
        // Duration's own text in that case.
        String rendered;
        try {
            rendered = timeout.toNanos() + " ns";
        } catch (ArithmeticException ignored) {
            // Only the nanosecond rendering is unavailable; the value is still reported via toString().
            rendered = timeout.toString();
        }
        throw new IllegalArgumentException("Negative timeout: " + rendered + " (expected: >=0)");
    }

    /**
     * Selects the executor of an execution context that is used for timeout tasks when the factory was
     * created without a dedicated one.
     *
     * @param context the execution context to take the executor from.
     * @return {@code context.executor()} when the context's execution strategy reports offloads, otherwise
     * {@code context.ioExecutor()}.
     */
    private static Executor contextExecutor(ExecutionContext<HttpExecutionStrategy> context) {
        final boolean offloading = context.executionStrategy().hasOffloads();
        if (offloading) {
            return context.executor();
        }
        return context.ioExecutor();
    }

    /**
     * Wraps the given connection with the idle timeout logic.
     *
     * @param connection the connection to decorate.
     * @return a filter without any timeout logic when the configured timeout is zero; otherwise a filter that
     * schedules its timeout tasks on the configured executor or, when none was configured, on an executor
     * taken from the connection's execution context.
     */
    @Override
    public StreamingHttpConnectionFilter create(FilterableStreamingHttpConnection connection) {
        if (this.timeoutNs == 0L) {
            return new DisabledIdleTimeoutConnectionFilter(connection);
        }
        final Executor executor;
        if (this.timeoutExecutor == null) {
            executor = IdleTimeoutConnectionFilter.contextExecutor(connection.executionContext());
        } else {
            executor = this.timeoutExecutor;
        }
        return new IdleTimeoutConnectionFilterImpl(connection, this.timeoutNs, executor);
    }

    /**
     * Reports the offloading this filter asks for.
     *
     * @return the strategy produced by {@link HttpExecutionStrategies#offloadNone()}.
     */
    @Override
    public HttpExecutionStrategy requiredOffloads() {
        final HttpExecutionStrategy noOffloads = HttpExecutionStrategies.offloadNone();
        return noOffloads;
    }

    /**
     * Renders the fully-qualified class name followed by the configured timeout (in nanoseconds) and the
     * configured timeout executor.
     *
     * @return a textual description of this factory.
     */
    @Override
    public String toString() {
        final StringBuilder text = new StringBuilder(this.getClass().getName());
        text.append("{timeoutNs=").append(this.timeoutNs);
        text.append(", timeoutExecutor=").append(this.timeoutExecutor);
        text.append('}');
        return text.toString();
    }

    /**
     * {@link ClosedChannelException} that is also marked as {@link RetryableException}; it is used to fail
     * requests that arrive after the idle timeout has already closed the connection.
     * <p>
     * {@link ClosedChannelException} has no constructor that accepts a message, so the text is kept in a
     * field and exposed by overriding {@link #getMessage()}.
     */
    private static final class RetryableClosedChannelException extends ClosedChannelException
            implements RetryableException {
        private static final long serialVersionUID = 5678979395131901139L;
        private final String message;

        /**
         * @param connection the connection that was closed; its string form becomes part of the message.
         * @param timeoutNs the idle timeout in nanoseconds; reported in milliseconds.
         */
        RetryableClosedChannelException(FilterableStreamingHttpConnection connection, long timeoutNs) {
            final long idleMillis = TimeUnit.NANOSECONDS.toMillis(timeoutNs);
            this.message = "Connection " + connection + " was closed due to " + idleMillis + " ms of inactivity";
        }

        /**
         * @return the message assembled in the constructor.
         */
        @Override
        public String getMessage() {
            return this.message;
        }
    }

    /**
     * Filter returned when the configured timeout is zero: it adds no behavior of its own and only overrides
     * {@link #toString()} so that it is recognizable in logs.
     */
    private static final class DisabledIdleTimeoutConnectionFilter extends StreamingHttpConnectionFilter {
        /**
         * @param delegate the connection all calls are forwarded to.
         */
        DisabledIdleTimeoutConnectionFilter(FilterableStreamingHttpConnection delegate) {
            super(delegate);
        }

        /**
         * @return the simple class name followed by the delegate in parentheses.
         */
        @Override
        public String toString() {
            final String simpleName = this.getClass().getSimpleName();
            return simpleName + '(' + this.delegate() + ')';
        }
    }

    /**
     * Filter that closes the wrapped connection once no request has been in flight for {@code timeoutNs}.
     * <p>
     * The filter itself is the {@link Runnable} that is scheduled on the timeout executor. Each run either
     * re-schedules itself (requests in flight, or the idle period has not elapsed yet) or closes the
     * connection.
     */
    private static final class IdleTimeoutConnectionFilterImpl extends StreamingHttpConnectionFilter
            implements Runnable {
        private static final AtomicIntegerFieldUpdater<IdleTimeoutConnectionFilterImpl> requestsUpdater = AtomicIntegerFieldUpdater.newUpdater(IdleTimeoutConnectionFilterImpl.class, "requests");
        private static final AtomicReferenceFieldUpdater<IdleTimeoutConnectionFilterImpl, Cancellable> timeoutTaskUpdater = AtomicReferenceFieldUpdater.newUpdater(IdleTimeoutConnectionFilterImpl.class, Cancellable.class, "timeoutTask");
        // Number of requests currently in flight. run() swaps 0 for Integer.MIN_VALUE right before it closes
        // the connection; request() treats any negative value as "closed by the idle timeout".
        private volatile int requests;
        // The pending timeout task. run() resets it to null when it starts; cancelTask() stores CANCELLED.
        @Nullable
        private volatile Cancellable timeoutTask;
        private final long timeoutNs;
        private final Executor timeoutExecutor;
        // Timestamp (timeoutExecutor clock, nanoseconds) taken at construction and whenever the number of
        // in-flight requests drops to zero.
        private volatile long lastResponseTime;

        /**
         * Subscribes to the connection's close signal and schedules the first timeout task.
         *
         * @param connection the connection to watch and delegate to.
         * @param timeoutNs the idle timeout in nanoseconds.
         * @param timeoutExecutor the executor that schedules the timeout tasks and provides the clock.
         */
        IdleTimeoutConnectionFilterImpl(FilterableStreamingHttpConnection connection, long timeoutNs, Executor timeoutExecutor) {
            super(connection);
            this.timeoutNs = timeoutNs;
            this.timeoutExecutor = timeoutExecutor;
            connection.onClose().whenFinally(this::cancelTask).subscribe();
            this.lastResponseTime = this.nanoTime();
            final Cancellable firstTask = this.timeoutExecutor.schedule(this, timeoutNs, TimeUnit.NANOSECONDS);
            // Publish the first task with the same CAS-from-null protocol that updateIdleTimeout uses. By
            // this point cancelTask() and run() are both reachable from other code (the close subscription
            // and the scheduled task), so a plain write could overwrite CANCELLED or a task that run() has
            // already re-scheduled, leaving a timer that is never cancelled.
            // NOTE(review): whether the close callback can fire synchronously during subscribe() is not
            // visible here; the CAS is correct either way.
            if (!timeoutTaskUpdater.compareAndSet(this, null, firstTask)) {
                firstTask.cancel();
            }
        }

        /**
         * @return the current time of the timeout executor in nanoseconds.
         */
        private long nanoTime() {
            return this.timeoutExecutor.currentTime(TimeUnit.NANOSECONDS);
        }

        /**
         * Marks the timeout task slot as {@code CANCELLED} and cancels the task that was stored there, if any.
         * Registered as the {@code whenFinally} callback of the connection's {@code onClose()}.
         */
        private void cancelTask() {
            final Cancellable pending = timeoutTaskUpdater.getAndSet(this, CANCELLED);
            if (pending == null) {
                return;
            }
            pending.cancel();
        }

        /**
         * Counts the request as in flight, forwards it to the delegate, and un-counts it again through a
         * {@link BeforeFinallyHttpOperator}.
         *
         * @param request the request to send.
         * @return the delegate's response; or a failed {@link Single} with a retryable closed-channel
         * exception when the counter was already negative, or with a {@link RequestRejectedException} when
         * the counter was already at {@link Integer#MAX_VALUE}.
         */
        @Override
        public Single<StreamingHttpResponse> request(StreamingHttpRequest request) {
            return Single.defer(() -> {
                // presumably the accumulator leaves negative values untouched and saturates at
                // Integer.MAX_VALUE (inferred from its name) - verify against FlowControlUtils.
                final int inFlightBefore = requestsUpdater.getAndAccumulate(this, 1, FlowControlUtils::addWithOverflowProtectionIfNotNegative);
                if (inFlightBefore < 0) {
                    return Single.failed(new RetryableClosedChannelException(this.delegate(), this.timeoutNs));
                }
                if (inFlightBefore == Integer.MAX_VALUE) {
                    return Single.failed(new RequestRejectedException("Connection " + this.delegate() + " already processes Integer.MAX_VALUE other requests, it can not process more."));
                }
                return this.delegate().request(request).liftSync(new BeforeFinallyHttpOperator(() -> {
                    final int stillInFlight = requestsUpdater.decrementAndGet(this);
                    assert (stillInFlight >= 0) : "Unexpected remaining requests value: " + stillInFlight;
                    if (stillInFlight == 0) {
                        // The connection just became idle: restart the idle period from now.
                        this.lastResponseTime = this.nanoTime();
                    }
                })).shareContextOnSubscribe();
            });
        }

        /**
         * Schedules the next run of this filter and stores the resulting task.
         *
         * @param delayNs delay in nanoseconds until the next run.
         */
        private void updateIdleTimeout(long delayNs) {
            final Cancellable scheduled = this.timeoutExecutor.schedule(this, delayNs, TimeUnit.NANOSECONDS);
            if (timeoutTaskUpdater.compareAndSet(this, null, scheduled)) {
                return;
            }
            // The slot is expected to be non-null here only when cancelTask() stored CANCELLED meanwhile.
            assert (this.timeoutTask == CANCELLED) : "Unexpected timeoutTask: " + this.timeoutTask;
            scheduled.cancel();
        }

        /**
         * Timeout task body: re-schedules itself while the connection is in use or the idle period has not
         * elapsed, otherwise marks the connection as closed by the timeout and closes it.
         */
        @Override
        public void run() {
            final Cancellable previousTask = timeoutTaskUpdater.getAndSet(this, null);
            if (previousTask == CANCELLED) {
                return;
            }
            // Retry loop: the only way back to the top is a lost CAS on the request counter.
            for (;;) {
                final int inFlight = this.requests;
                if (inFlight > 0) {
                    // Connection is busy: check again after a full timeout period.
                    this.updateIdleTimeout(this.timeoutNs);
                    return;
                }
                if (inFlight < 0) {
                    LOGGER.warn("{} Unexpected concurrent requests value {}", this.delegate(), inFlight);
                    return;
                }
                final long nextDelayNs = this.timeoutNs - (this.nanoTime() - this.lastResponseTime);
                if (nextDelayNs > 0L) {
                    // Idle, but not for long enough yet: wake up when the remaining time has passed.
                    this.updateIdleTimeout(nextDelayNs);
                    return;
                }
                // Claim the connection by moving the counter from 0 to MIN_VALUE; a concurrent request()
                // that incremented the counter first makes the CAS fail and the loop re-evaluate.
                if (requestsUpdater.compareAndSet(this, 0, Integer.MIN_VALUE)) {
                    final FilterableStreamingHttpConnection connection = this.delegate();
                    LOGGER.debug("Closing connection {} after {} ms of inactivity", connection, TimeUnit.NANOSECONDS.toMillis(this.timeoutNs));
                    connection.closeAsync().subscribe();
                    return;
                }
            }
        }

        /**
         * @return the simple class name, the timeout in milliseconds in square brackets, and the delegate in
         * parentheses.
         */
        @Override
        public String toString() {
            final long timeoutMillis = TimeUnit.NANOSECONDS.toMillis(this.timeoutNs);
            return this.getClass().getSimpleName() + '[' + timeoutMillis + " ms](" + this.delegate() + ')';
        }
    }
}

