/*
 * Decompiled with CFR 0.152.
 */
package io.servicetalk.http.netty;

import io.netty.channel.Channel;
import io.servicetalk.concurrent.Cancellable;
import io.servicetalk.concurrent.PublisherSource;
import io.servicetalk.concurrent.api.Completable;
import io.servicetalk.concurrent.api.Publisher;
import io.servicetalk.concurrent.api.Single;
import io.servicetalk.concurrent.api.SourceAdapters;
import io.servicetalk.concurrent.internal.ConcurrentUtils;
import io.servicetalk.concurrent.internal.SubscriberUtils;
import io.servicetalk.transport.api.ConnectionInfo;
import io.servicetalk.transport.api.ExecutionContext;
import io.servicetalk.transport.netty.internal.FlushStrategy;
import io.servicetalk.transport.netty.internal.NettyConnection;
import io.servicetalk.transport.netty.internal.NettyConnectionContext;
import io.servicetalk.transport.netty.internal.WriteDemandEstimator;
import io.servicetalk.transport.netty.internal.WriteDemandEstimators;
import io.servicetalk.utils.internal.PlatformDependent;
import java.net.SocketAddress;
import java.net.SocketOption;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import javax.net.ssl.SSLSession;

final class NettyPipelinedConnection<Req, Resp>
implements NettyConnectionContext {
    private static final AtomicIntegerFieldUpdater<NettyPipelinedConnection> writeQueueLockUpdater = AtomicIntegerFieldUpdater.newUpdater(NettyPipelinedConnection.class, "writeQueueLock");
    private static final AtomicIntegerFieldUpdater<NettyPipelinedConnection> readQueueLockUpdater = AtomicIntegerFieldUpdater.newUpdater(NettyPipelinedConnection.class, "readQueueLock");
    private final NettyConnection<Resp, Req> connection;
    private final Queue<WriteTask> writeQueue;
    private final Queue<PublisherSource.Subscriber<? super Resp>> readQueue;
    private volatile int writeQueueLock;
    private volatile int readQueueLock;

    NettyPipelinedConnection(NettyConnection<Resp, Req> connection) {
        this.connection = Objects.requireNonNull(connection);
        this.writeQueue = PlatformDependent.newUnboundedMpscQueue();
        this.readQueue = PlatformDependent.newUnboundedMpscQueue();
    }

    Publisher<Resp> write(Publisher<Req> requestPublisher) {
        return this.write(requestPublisher, this.connection::defaultFlushStrategy, WriteDemandEstimators::newDefaultEstimator);
    }

    Publisher<Resp> write(final Publisher<Req> requestPublisher, final Supplier<FlushStrategy> flushStrategySupplier, final Supplier<WriteDemandEstimator> writeDemandEstimatorSupplier) {
        return new Publisher<Resp>(){

            @Override
            protected void handleSubscribe(PublisherSource.Subscriber<? super Resp> subscriber) {
                WriteTask nextWriteTask;
                try {
                    nextWriteTask = (WriteTask)NettyPipelinedConnection.this.addAndTryPoll(NettyPipelinedConnection.this.writeQueue, writeQueueLockUpdater, new WriteTask(subscriber, requestPublisher, flushStrategySupplier, writeDemandEstimatorSupplier));
                }
                catch (Throwable cause) {
                    NettyPipelinedConnection.this.closeConnection(subscriber, cause);
                    return;
                }
                if (nextWriteTask != null) {
                    nextWriteTask.run();
                }
            }
        };
    }

    @Override
    public SocketAddress localAddress() {
        return this.connection.localAddress();
    }

    @Override
    public SocketAddress remoteAddress() {
        return this.connection.remoteAddress();
    }

    @Override
    @Nullable
    public SSLSession sslSession() {
        return this.connection.sslSession();
    }

    @Override
    public ExecutionContext executionContext() {
        return this.connection.executionContext();
    }

    @Override
    @Nullable
    public <T> T socketOption(SocketOption<T> option) {
        return this.connection.socketOption(option);
    }

    @Override
    public ConnectionInfo.Protocol protocol() {
        return this.connection.protocol();
    }

    @Override
    public Single<Throwable> transportError() {
        return this.connection.transportError();
    }

    @Override
    public Completable onClosing() {
        return this.connection.onClosing();
    }

    @Override
    public Completable onClose() {
        return this.connection.onClose();
    }

    @Override
    public Completable closeAsync() {
        return this.connection.closeAsync();
    }

    @Override
    public Completable closeAsyncGracefully() {
        return this.connection.closeAsyncGracefully();
    }

    @Override
    public Channel nettyChannel() {
        return this.connection.nettyChannel();
    }

    public String toString() {
        return this.connection.toString();
    }

    @Override
    public Cancellable updateFlushStrategy(NettyConnectionContext.FlushStrategyProvider strategyProvider) {
        return this.connection.updateFlushStrategy(strategyProvider);
    }

    @Override
    public FlushStrategy defaultFlushStrategy() {
        return this.connection.defaultFlushStrategy();
    }

    private void closeConnection(PublisherSource.Subscriber<? super Resp> subscriber, Throwable cause) {
        SourceAdapters.toSource(this.connection.closeAsync().concat(Publisher.failed(cause))).subscribe(subscriber);
    }

    private void handleWriteSetupError(PublisherSource.Subscriber<? super Resp> subscriber, Throwable cause) {
        this.closeConnection(subscriber, cause);
        while (true) {
            WriteTask nextWriteTask;
            if ((nextWriteTask = this.writeQueue.poll()) != null) {
                SubscriberUtils.deliverErrorFromSource(nextWriteTask.subscriber, cause);
                continue;
            }
            if (ConcurrentUtils.releaseLock(writeQueueLockUpdater, this) || !ConcurrentUtils.tryAcquireLock(writeQueueLockUpdater, this)) break;
        }
    }

    private void handleReadSetupError(PublisherSource.Subscriber<? super Resp> subscriber, Throwable cause) {
        this.closeConnection(subscriber, cause);
        while (true) {
            PublisherSource.Subscriber<? super Resp> nextSubscriber;
            if ((nextSubscriber = this.readQueue.poll()) != null) {
                SubscriberUtils.deliverErrorFromSource(nextSubscriber, cause);
                continue;
            }
            if (ConcurrentUtils.releaseLock(readQueueLockUpdater, this) || !ConcurrentUtils.tryAcquireLock(readQueueLockUpdater, this)) break;
        }
    }

    @Nullable
    private <T> T addAndTryPoll(Queue<T> queue, AtomicIntegerFieldUpdater<NettyPipelinedConnection> lockUpdater, T item) {
        queue.add(item);
        while (ConcurrentUtils.tryAcquireLock(lockUpdater, this)) {
            T next = queue.poll();
            if (next != null) {
                return next;
            }
            if (!ConcurrentUtils.releaseLock(lockUpdater, this)) continue;
            return null;
        }
        return null;
    }

    @Nullable
    private <T> T pollWithLockAcquired(Queue<T> queue, AtomicIntegerFieldUpdater<NettyPipelinedConnection> lockUpdater) {
        try {
            do {
                T next;
                if ((next = queue.poll()) != null) {
                    return next;
                }
                if (!ConcurrentUtils.releaseLock(lockUpdater, this)) continue;
                return null;
            } while (ConcurrentUtils.tryAcquireLock(lockUpdater, this));
            return null;
        }
        catch (Throwable cause) {
            this.connection.closeAsync().subscribe();
            throw cause;
        }
    }

    private final class WriteTask {
        private final PublisherSource.Subscriber<? super Resp> subscriber;
        private final Publisher<Req> requestPublisher;
        private final Supplier<FlushStrategy> flushStrategySupplier;
        private final Supplier<WriteDemandEstimator> writeDemandEstimatorSupplier;

        private WriteTask(PublisherSource.Subscriber<? super Resp> subscriber, Publisher<Req> requestPublisher, Supplier<FlushStrategy> flushStrategySupplier, Supplier<WriteDemandEstimator> writeDemandEstimatorSupplier) {
            this.subscriber = subscriber;
            this.requestPublisher = requestPublisher;
            this.flushStrategySupplier = flushStrategySupplier;
            this.writeDemandEstimatorSupplier = writeDemandEstimatorSupplier;
        }

        void run() {
            PublisherSource src;
            try {
                src = SourceAdapters.toSource(NettyPipelinedConnection.this.connection.write(this.requestPublisher, this.flushStrategySupplier, this.writeDemandEstimatorSupplier).afterFinally(() -> {
                    WriteTask nextWriteTask = (WriteTask)NettyPipelinedConnection.this.pollWithLockAcquired(NettyPipelinedConnection.this.writeQueue, writeQueueLockUpdater);
                    if (nextWriteTask != null) {
                        nextWriteTask.run();
                    }
                }).mergeDelayError(new Publisher<Resp>(){

                    @Override
                    protected void handleSubscribe(PublisherSource.Subscriber<? super Resp> rSubscriber) {
                        PublisherSource.Subscriber nextReadSubscriber;
                        try {
                            nextReadSubscriber = (PublisherSource.Subscriber)NettyPipelinedConnection.this.addAndTryPoll(NettyPipelinedConnection.this.readQueue, readQueueLockUpdater, rSubscriber);
                        }
                        catch (Throwable cause) {
                            NettyPipelinedConnection.this.closeConnection(rSubscriber, cause);
                            return;
                        }
                        WriteTask.this.tryStartRead(nextReadSubscriber);
                    }
                }));
            }
            catch (Throwable cause) {
                NettyPipelinedConnection.this.handleWriteSetupError(this.subscriber, cause);
                return;
            }
            src.subscribe(this.subscriber);
        }

        private void tryStartRead(@Nullable PublisherSource.Subscriber<? super Resp> subscriber) {
            PublisherSource src;
            if (subscriber == null) {
                return;
            }
            try {
                src = SourceAdapters.toSource(NettyPipelinedConnection.this.connection.read().afterFinally(() -> this.tryStartRead((PublisherSource.Subscriber)NettyPipelinedConnection.this.pollWithLockAcquired(NettyPipelinedConnection.this.readQueue, readQueueLockUpdater))));
            }
            catch (Throwable cause) {
                NettyPipelinedConnection.this.handleReadSetupError(subscriber, cause);
                return;
            }
            src.subscribe(subscriber);
        }
    }
}

