/*
 * Decompiled with CFR 0.152.
 */
package io.servicetalk.transport.netty.internal;

import io.netty.channel.Channel;
import io.netty.channel.EventLoop;
import io.netty.util.ReferenceCounted;
import io.servicetalk.concurrent.PublisherSource;
import io.servicetalk.concurrent.api.internal.SubscribablePublisher;
import io.servicetalk.concurrent.internal.DuplicateSubscribeException;
import io.servicetalk.concurrent.internal.FlowControlUtils;
import io.servicetalk.concurrent.internal.SubscriberUtils;
import io.servicetalk.concurrent.internal.TerminalNotification;
import io.servicetalk.transport.netty.internal.ChannelCloseUtils;
import io.servicetalk.transport.netty.internal.CloseHandler;
import io.servicetalk.transport.netty.internal.StacklessClosedChannelException;
import java.util.ArrayDeque;
import java.util.Objects;
import java.util.Queue;
import java.util.function.Predicate;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Publisher that forwards objects read from a netty {@link Channel} to at most one active
 * {@link PublisherSource.Subscriber} at a time.
 * <p>
 * None of the mutable state is synchronized. Subscribe, {@code request(n)} and {@code cancel()} are re-dispatched
 * onto the channel's {@link EventLoop} when invoked from another thread, and the package-private callbacks
 * ({@link #channelRead(Object)}, {@link #onReadComplete()}, {@link #exceptionCaught(Throwable)},
 * {@link #channelInboundClosed(Throwable)}) assert that they are invoked on that event loop.
 * <p>
 * Items and error notifications that arrive while there is no subscriber, no outstanding demand, or while older
 * items are still queued, are buffered in {@link #pending} and drained by {@link #processPending(SubscriptionImpl)}.
 *
 * @param <T> type of items emitted to the subscriber.
 */
final class NettyChannelPublisher<T> extends SubscribablePublisher<T> {
    private static final Logger LOGGER = LoggerFactory.getLogger(NettyChannelPublisher.class);

    /** Outstanding demand of the current subscription; reset to {@code 0} whenever the subscription is reset. */
    private long requestCount;
    /** {@code true} between a call to {@link Channel#read()} and the next {@link #onReadComplete()}. */
    private boolean requested;
    /** Re-entrancy guard for {@link #processPending(SubscriptionImpl)}. */
    private boolean inProcessPending;
    /** The currently active subscription, or {@code null} if there is none. */
    @Nullable
    private SubscriptionImpl subscription;
    /** Buffered items and {@link TerminalNotification}s; lazily created and set to {@code null} on cancel. */
    @Nullable
    private Queue<Object> pending;
    /** Once non-{@code null}, newly read items are dropped and later subscribers may be failed with this cause. */
    @Nullable
    private Throwable fatalError;
    private final Channel channel;
    private final CloseHandler closeHandler;
    private final EventLoop eventLoop;
    private final Predicate<T> terminalSignalPredicate;

    /**
     * Creates a new instance.
     *
     * @param channel the channel whose event loop owns this publisher's state and on which reads are requested.
     * @param terminalSignalPredicate tested against every emitted item; when it returns {@code true} the item is
     * delivered and immediately followed by {@code onComplete()}.
     * @param closeHandler used to close the inbound side of the channel after an error or a cancel.
     * @throws NullPointerException if {@code terminalSignalPredicate} is {@code null}.
     */
    NettyChannelPublisher(Channel channel, Predicate<T> terminalSignalPredicate, CloseHandler closeHandler) {
        this.eventLoop = channel.eventLoop();
        this.channel = channel;
        this.closeHandler = closeHandler;
        this.terminalSignalPredicate = Objects.requireNonNull(terminalSignalPredicate);
    }

    /**
     * Registers {@code nextSubscriber}, hopping onto the event loop first when called from another thread.
     *
     * @param nextSubscriber the subscriber to register.
     */
    @Override
    protected void handleSubscribe(PublisherSource.Subscriber<? super T> nextSubscriber) {
        if (this.eventLoop.inEventLoop()) {
            this.subscribe0(nextSubscriber);
        } else {
            this.eventLoop.execute(() -> this.subscribe0(nextSubscriber));
        }
    }

    /**
     * Handles an object read from the channel. Must be called on the event loop.
     * <p>
     * {@link ReferenceCounted} objects are released and treated as a fatal error. Otherwise the item is dropped if
     * a fatal error was already recorded, buffered if it cannot be emitted right now, or emitted directly.
     *
     * @param data the object that was read.
     */
    void channelRead(T data) {
        this.assertInEventloop();
        if (data instanceof ReferenceCounted) {
            this.channelReadReferenceCounted((ReferenceCounted) data);
            return;
        }
        if (this.fatalError != null) {
            return;
        }
        if (this.subscription == null || this.shouldBuffer()) {
            this.addPending(data);
            if (this.subscription != null) {
                // Preserve ordering: older buffered items are drained ahead of the one just queued.
                this.processPending(this.subscription);
            }
        } else {
            this.emit(this.subscription, data);
        }
    }

    /**
     * Releases an unexpected reference counted object, discards everything buffered so far, fails the stream with
     * an {@link IllegalArgumentException} (unless a fatal error was already recorded) and closes the channel.
     *
     * @param data the reference counted object that reached this publisher.
     */
    private void channelReadReferenceCounted(ReferenceCounted data) {
        try {
            data.release();
        } finally {
            this.pending = null;
            if (this.fatalError == null) {
                this.fatalError = new IllegalArgumentException("Reference counted leaked netty's pipeline. Object: "
                        + data.getClass().getSimpleName());
                this.exceptionCaught0(this.fatalError);
            }
            ChannelCloseUtils.close(this.channel, this.fatalError);
        }
    }

    /**
     * Signals that the current read batch is finished; requests another read if demand is still outstanding.
     * Must be called on the event loop.
     */
    void onReadComplete() {
        this.assertInEventloop();
        this.requested = false;
        if (this.requestCount > 0L) {
            this.requestChannel();
        }
    }

    /**
     * Propagates {@code throwable} as a terminal error unless a fatal error was already recorded.
     * Must be called on the event loop. This method does not itself record {@code throwable} as the fatal error.
     *
     * @param throwable the error to propagate.
     */
    void exceptionCaught(Throwable throwable) {
        this.assertInEventloop();
        if (this.fatalError != null) {
            return;
        }
        this.exceptionCaught0(throwable);
    }

    /**
     * Assigns {@code throwable} as the connection error of the channel and either delivers it to the current
     * subscriber or buffers it behind the items that are still pending.
     *
     * @param throwable the error to deliver or buffer.
     */
    private void exceptionCaught0(Throwable throwable) {
        ChannelCloseUtils.assignConnectionError(this.channel, throwable);
        if (this.subscription == null || this.shouldBuffer()) {
            this.addPending(TerminalNotification.error(throwable));
            if (this.subscription != null) {
                this.processPending(this.subscription);
            }
        } else {
            this.sendErrorToTarget(this.subscription, throwable);
        }
    }

    /**
     * Records {@code cause} as the fatal error (the first recorded fatal error wins) and propagates it as a
     * terminal error. Must be called on the event loop.
     *
     * @param cause the reason the inbound side was closed.
     */
    void channelInboundClosed(Throwable cause) {
        this.assertInEventloop();
        if (this.fatalError == null) {
            this.fatalError = cause;
            this.exceptionCaught0(this.fatalError);
        }
    }

    /**
     * Adds demand for {@code forSubscription}, drains buffered items and requests a channel read when demand
     * remains and no read is in flight. An invalid {@code n} terminates the subscriber with an error and closes
     * the channel.
     *
     * @param n the requested amount.
     * @param forSubscription the subscription the request was issued on; ignored if it is no longer current.
     */
    private void requestN(long n, SubscriptionImpl forSubscription) {
        if (forSubscription != this.subscription) {
            // Stale subscription (already terminated, cancelled or replaced): its signals must not touch state.
            return;
        }
        if (SubscriberUtils.isRequestNValid(n)) {
            this.requestCount = FlowControlUtils.addWithOverflowProtection(this.requestCount, n);
            if (!this.processPending(forSubscription) && !this.requested && this.requestCount > 0L) {
                this.requestChannel();
            }
        } else {
            this.resetSubscription();
            IllegalArgumentException cause = SubscriberUtils.newExceptionForInvalidRequestN(n);
            try {
                forSubscription.associatedSub.onError(cause);
            } finally {
                // Close even if the subscriber throws from onError, mirroring sendErrorToTarget.
                ChannelCloseUtils.close(this.channel, (Throwable) cause);
            }
        }
    }

    /**
     * Drains buffered items to {@code target} while demand is available.
     * <p>
     * Re-entrant invocations (for example from a {@code request(n)} issued inside {@code onNext}) return
     * {@code false} immediately; the outer invocation keeps draining.
     *
     * @param target the subscription to deliver to. If it terminates while draining and a different subscription
     * has become current in the meantime, draining continues with that one.
     * @return {@code true} if a terminal signal was delivered while draining, {@code false} otherwise.
     */
    private boolean processPending(SubscriptionImpl target) {
        if (this.inProcessPending || this.pending == null || this.pending.isEmpty()) {
            return false;
        }
        this.inProcessPending = true;
        try {
            Object p;
            // The field is re-read and null-checked on every iteration: a subscriber that cancels from within
            // onNext causes cancel(SubscriptionImpl) to set `pending` to null underneath this loop.
            while (this.requestCount > 0L && this.pending != null && (p = this.pending.poll()) != null) {
                if (p instanceof TerminalNotification) {
                    this.sendErrorToTarget(target, (TerminalNotification) p);
                    return true;
                }
                if (!this.emit(target, p)) {
                    continue;
                }
                // The target just terminated. Stop unless a different subscriber has taken over in the meantime.
                if (this.subscription == target || this.subscription == null) {
                    return true;
                }
                target = this.subscription;
            }
            // A terminal notification needs no demand, so deliver it even when requestCount reached zero.
            Queue<Object> remaining = this.pending;
            if (remaining != null && remaining.peek() instanceof TerminalNotification) {
                this.sendErrorToTarget(target, (TerminalNotification) remaining.poll());
                return true;
            }
        } finally {
            this.inProcessPending = false;
        }
        return false;
    }

    /**
     * Delivers one item to {@code target}, consuming one unit of demand.
     *
     * @param target the subscription whose subscriber receives the item.
     * @param next the item to deliver; expected to be an instance of {@code T}.
     * @return {@code true} if {@code target} was terminated by this call (the item matched the terminal predicate,
     * or {@code onNext} threw), {@code false} otherwise.
     */
    private boolean emit(SubscriptionImpl target, Object next) {
        --this.requestCount;
        // Only values passed to channelRead(T) are queued as items, so the element is a T.
        @SuppressWarnings("unchecked")
        T t = (T) next;
        boolean isLast = this.terminalSignalPredicate.test(t);
        if (isLast) {
            // Reset before onNext so that a cancel() issued from within onNext for the last item fails the
            // identity check in cancel(SubscriptionImpl) and is ignored instead of closing the channel inbound.
            this.resetSubscription();
        }
        try {
            target.associatedSub.onNext(t);
        } catch (Throwable cause) {
            this.sendErrorToTarget(target, cause);
            return true;
        }
        if (isLast) {
            target.associatedSub.onComplete();
            return true;
        }
        return false;
    }

    /**
     * Delivers the cause carried by {@code terminal} to {@code target}.
     *
     * @param target the subscription whose subscriber receives the error.
     * @param terminal an error notification; its cause is asserted to be non-{@code null}.
     */
    private void sendErrorToTarget(SubscriptionImpl target, TerminalNotification terminal) {
        Throwable throwable = terminal.cause();
        assert (throwable != null);
        this.sendErrorToTarget(target, throwable);
    }

    /**
     * Resets the current subscription, delivers {@code throwable} to {@code target} and closes the inbound side
     * of the channel, even if {@code onError} throws.
     *
     * @param target the subscription whose subscriber receives the error.
     * @param throwable the error to deliver.
     */
    private void sendErrorToTarget(SubscriptionImpl target, Throwable throwable) {
        this.resetSubscription();
        try {
            target.associatedSub.onError(throwable);
        } finally {
            this.closeChannelInbound();
        }
    }

    /**
     * Cancels {@code forSubscription} if it is still the current one: buffered data is discarded, a fatal error
     * is recorded if none exists yet, and the inbound side of the channel is closed.
     *
     * @param forSubscription the subscription that was cancelled.
     */
    private void cancel(SubscriptionImpl forSubscription) {
        if (forSubscription != this.subscription) {
            return;
        }
        LOGGER.debug("{} Cancelling subscription", (Object) this.channel);
        this.resetSubscription();
        this.pending = null;
        if (this.fatalError == null) {
            this.fatalError = StacklessClosedChannelException.newInstance(NettyChannelPublisher.class, "cancel");
        }
        this.closeChannelInbound();
    }

    /** Asks the {@link CloseHandler} to close the inbound side of the channel. */
    private void closeChannelInbound() {
        this.closeHandler.closeChannelInbound(this.channel);
    }

    /** Clears the current subscription together with its outstanding demand. */
    private void resetSubscription() {
        this.subscription = null;
        this.requestCount = 0L;
    }

    /** Marks a read as in flight and triggers {@link Channel#read()}. */
    private void requestChannel() {
        this.requested = true;
        this.channel.read();
    }

    /**
     * Appends {@code p} to the pending queue, creating the queue on first use.
     *
     * @param p an item or a {@link TerminalNotification}.
     */
    private void addPending(Object p) {
        if (this.pending == null) {
            this.pending = new ArrayDeque<Object>(4);
        }
        this.pending.add(p);
    }

    /**
     * @return {@code true} if a new signal has to be queued instead of being delivered directly, i.e. older
     * signals are still queued or there is no outstanding demand.
     */
    private boolean shouldBuffer() {
        return this.pending != null && !this.pending.isEmpty() || this.requestCount == 0L;
    }

    /**
     * Event loop part of subscribe: rejects a second concurrent subscriber, otherwise installs a new subscription,
     * drains buffered signals and, if nothing is buffered and a fatal error is recorded, fails the subscriber.
     *
     * @param subscriber the subscriber to register.
     */
    private void subscribe0(PublisherSource.Subscriber<? super T> subscriber) {
        SubscriptionImpl subscription = this.subscription;
        if (subscription != null) {
            SubscriberUtils.deliverErrorFromSource(subscriber,
                    (Throwable) new DuplicateSubscribeException(subscription.associatedSub, subscriber));
        } else {
            assert (this.requestCount == 0L);
            this.subscription = subscription = new SubscriptionImpl(subscriber);
            subscriber.onSubscribe(subscription);
            // onSubscribe may already have terminated or cancelled the subscription, hence the identity check.
            if (subscription == this.subscription && !this.processPending(subscription) && this.fatalError != null
                    && (this.pending == null || this.pending.isEmpty())) {
                this.sendErrorToTarget(subscription, this.fatalError);
            }
        }
    }

    /** Asserts that the caller runs on this publisher's event loop. */
    private void assertInEventloop() {
        assert (this.eventLoop.inEventLoop()) : "Must be called from the associated eventloop.";
    }

    /**
     * Subscription handed to a subscriber. It is bound to that subscriber and re-dispatches {@code request} and
     * {@code cancel} onto the event loop of the enclosing publisher.
     */
    private final class SubscriptionImpl implements PublisherSource.Subscription {
        final PublisherSource.Subscriber<? super T> associatedSub;

        private SubscriptionImpl(PublisherSource.Subscriber<? super T> associatedSub) {
            this.associatedSub = associatedSub;
        }

        @Override
        public void request(long n) {
            if (NettyChannelPublisher.this.eventLoop.inEventLoop()) {
                NettyChannelPublisher.this.requestN(n, this);
            } else {
                NettyChannelPublisher.this.eventLoop.execute(() -> NettyChannelPublisher.this.requestN(n, this));
            }
        }

        @Override
        public void cancel() {
            if (NettyChannelPublisher.this.eventLoop.inEventLoop()) {
                NettyChannelPublisher.this.cancel(this);
            } else {
                NettyChannelPublisher.this.eventLoop.execute(() -> NettyChannelPublisher.this.cancel(this));
            }
        }
    }
}

