/*
 * Decompiled with CFR 0.152.
 */
package io.reactivex.netty.channel;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.AttributeKey;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.internal.EmptyArrays;
import io.reactivex.netty.channel.BackpressureManagingHandler;
import io.reactivex.netty.channel.BytesInspector;
import io.reactivex.netty.channel.ChannelSubscriberEvent;
import io.reactivex.netty.channel.ConnectionCreationFailedEvent;
import io.reactivex.netty.channel.ConnectionInputSubscriberEvent;
import io.reactivex.netty.channel.ConnectionInputSubscriberReplaceEvent;
import io.reactivex.netty.channel.ConnectionInputSubscriberResetEvent;
import io.reactivex.netty.channel.EmitConnectionEvent;
import io.reactivex.netty.channel.events.ConnectionEventListener;
import io.reactivex.netty.events.EventPublisher;
import java.nio.channels.ClosedChannelException;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Producer;
import rx.Subscriber;
import rx.exceptions.MissingBackpressureException;

public abstract class AbstractConnectionToChannelBridge<R, W>
extends BackpressureManagingHandler {
    private static final Logger logger = LoggerFactory.getLogger(AbstractConnectionToChannelBridge.class);
    private static final IllegalStateException ONLY_ONE_CONN_SUB_ALLOWED = new IllegalStateException("Only one subscriber allowed for connection observable.");
    private static final IllegalStateException ONLY_ONE_CONN_INPUT_SUB_ALLOWED = new IllegalStateException("Only one subscriber allowed for connection input.");
    private static final IllegalStateException LAZY_CONN_INPUT_SUB = new IllegalStateException("Channel is set to auto-read but the subscription was lazy.");
    private static final ClosedChannelException CLOSED_CHANNEL_EXCEPTION = new ClosedChannelException();
    private final AttributeKey<ConnectionEventListener> eventListenerAttributeKey;
    private final AttributeKey<EventPublisher> eventPublisherAttributeKey;
    protected ConnectionEventListener eventListener;
    protected EventPublisher eventPublisher;
    private Subscriber<? super Channel> newChannelSub;
    private ReadProducer<R> readProducer;
    private boolean raiseErrorOnInputSubscription;
    private boolean connectionEmitted;

    protected AbstractConnectionToChannelBridge(String thisHandlerName, ConnectionEventListener eventListener, EventPublisher eventPublisher) {
        super(thisHandlerName);
        if (null == eventListener) {
            throw new IllegalArgumentException("Event listener can not be null.");
        }
        if (null == eventPublisher) {
            throw new IllegalArgumentException("Event publisher can not be null.");
        }
        this.eventListener = eventListener;
        this.eventPublisher = eventPublisher;
        this.eventListenerAttributeKey = null;
        this.eventPublisherAttributeKey = null;
    }

    protected AbstractConnectionToChannelBridge(String thisHandlerName, AttributeKey<ConnectionEventListener> eventListenerAttributeKey, AttributeKey<EventPublisher> eventPublisherAttributeKey) {
        super(thisHandlerName);
        this.eventListenerAttributeKey = eventListenerAttributeKey;
        this.eventPublisherAttributeKey = eventPublisherAttributeKey;
    }

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        if (null == this.eventListener && null == this.eventPublisher) {
            this.eventListener = ctx.channel().attr(this.eventListenerAttributeKey).get();
            this.eventPublisher = ctx.channel().attr(this.eventPublisherAttributeKey).get();
        }
        if (null == this.eventPublisher) {
            logger.error("No Event publisher bound to the channel, closing channel.");
            ctx.channel().close();
            return;
        }
        if (this.eventPublisher.publishingEnabled() && null == this.eventListener) {
            logger.error("No Event listener bound to the channel and publising is enabled, closing channel.");
            ctx.channel().close();
            return;
        }
        ctx.pipeline().addFirst(new BytesInspector(this.eventPublisher, this.eventListener));
        super.handlerAdded(ctx);
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        if (!this.connectionEmitted && AbstractConnectionToChannelBridge.isValidToEmit(this.newChannelSub)) {
            this.emitNewConnection(ctx.channel());
            this.connectionEmitted = true;
        }
        super.channelInactive(ctx);
    }

    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        if (AbstractConnectionToChannelBridge.isValidToEmitToReadSubscriber(this.readProducer)) {
            this.readProducer.sendOnError(CLOSED_CHANNEL_EXCEPTION);
        }
        super.channelUnregistered(ctx);
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof EmitConnectionEvent) {
            if (!this.connectionEmitted) {
                this.emitNewConnection(ctx.channel());
                this.connectionEmitted = true;
            }
        } else if (evt instanceof ConnectionCreationFailedEvent) {
            if (AbstractConnectionToChannelBridge.isValidToEmit(this.newChannelSub)) {
                this.newChannelSub.onError(((ConnectionCreationFailedEvent)evt).getThrowable());
            }
        } else if (evt instanceof ChannelSubscriberEvent) {
            ChannelSubscriberEvent channelSubscriberEvent = (ChannelSubscriberEvent)evt;
            this.newConnectionSubscriber(channelSubscriberEvent);
        } else if (evt instanceof ConnectionInputSubscriberEvent) {
            ConnectionInputSubscriberEvent event = (ConnectionInputSubscriberEvent)evt;
            this.newConnectionInputSubscriber(ctx.channel(), event.getSubscriber(), false);
        } else if (evt instanceof ConnectionInputSubscriberResetEvent) {
            this.resetConnectionInputSubscriber();
        } else if (evt instanceof ConnectionInputSubscriberReplaceEvent) {
            ConnectionInputSubscriberReplaceEvent event = (ConnectionInputSubscriberReplaceEvent)evt;
            this.replaceConnectionInputSubscriber(ctx.channel(), event);
        }
        super.userEventTriggered(ctx, evt);
    }

    @Override
    public void newMessage(ChannelHandlerContext ctx, Object msg) {
        if (AbstractConnectionToChannelBridge.isValidToEmitToReadSubscriber(this.readProducer)) {
            try {
                this.readProducer.sendOnNext(msg);
            }
            catch (ClassCastException e) {
                ReferenceCountUtil.release(msg);
                this.readProducer.sendOnError(e);
            }
        } else {
            if (logger.isWarnEnabled()) {
                logger.warn("Data received on channel, but no subscriber registered. Discarding data. Message class: " + msg.getClass().getName() + ", channel: " + ctx.channel());
            }
            ReferenceCountUtil.release(msg);
        }
    }

    @Override
    public boolean shouldReadMore(ChannelHandlerContext ctx) {
        return null != this.readProducer && this.readProducer.shouldReadMore(ctx);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        if (!this.connectionEmitted && AbstractConnectionToChannelBridge.isValidToEmit(this.newChannelSub)) {
            this.newChannelSub.onError(cause);
        } else if (AbstractConnectionToChannelBridge.isValidToEmitToReadSubscriber(this.readProducer)) {
            this.readProducer.sendOnError(cause);
        } else {
            logger.info("Exception in the pipeline and none of the subscribers are active.", cause);
        }
    }

    protected static boolean isValidToEmit(Subscriber<?> subscriber) {
        return null != subscriber && !subscriber.isUnsubscribed();
    }

    private static boolean isValidToEmitToReadSubscriber(ReadProducer<?> readProducer) {
        return null != readProducer && !((ReadProducer)readProducer).subscriber.isUnsubscribed();
    }

    protected boolean connectionInputSubscriberExists(Channel channel) {
        assert (channel.eventLoop().inEventLoop());
        return null != this.readProducer && null != ((ReadProducer)this.readProducer).subscriber && !((ReadProducer)this.readProducer).subscriber.isUnsubscribed();
    }

    protected void onNewReadSubscriber(Subscriber<? super R> subscriber) {
    }

    protected final void checkEagerSubscriptionIfConfigured(Channel channel) {
        if (channel.config().isAutoRead() && null == this.readProducer) {
            this.raiseErrorOnInputSubscription = true;
            Subscriber discardAll = ConnectionInputSubscriberEvent.discardAllInput().getSubscriber();
            ReadProducer producer = new ReadProducer(discardAll, channel);
            discardAll.setProducer(producer);
            this.readProducer = producer;
        }
    }

    protected final Subscriber<? super Channel> getNewChannelSub() {
        return this.newChannelSub;
    }

    private void emitNewConnection(Channel channel) {
        if (AbstractConnectionToChannelBridge.isValidToEmit(this.newChannelSub)) {
            try {
                this.newChannelSub.onNext(channel);
                this.connectionEmitted = true;
                this.checkEagerSubscriptionIfConfigured(channel);
                this.newChannelSub.onCompleted();
            }
            catch (Exception e) {
                logger.error("Error emitting a new connection. Closing this channel.", e);
                channel.close();
            }
        } else {
            channel.close();
        }
    }

    private void resetConnectionInputSubscriber() {
        Subscriber connInputSub;
        Subscriber subscriber = connInputSub = null == this.readProducer ? null : ((ReadProducer)this.readProducer).subscriber;
        if (AbstractConnectionToChannelBridge.isValidToEmit(connInputSub)) {
            connInputSub.onCompleted();
        }
        this.raiseErrorOnInputSubscription = false;
        this.readProducer = null;
    }

    private void newConnectionInputSubscriber(Channel channel, Subscriber<? super R> subscriber, boolean replace) {
        Subscriber connInputSub;
        Subscriber subscriber2 = connInputSub = null == this.readProducer ? null : ((ReadProducer)this.readProducer).subscriber;
        if (AbstractConnectionToChannelBridge.isValidToEmit(connInputSub)) {
            if (!replace) {
                subscriber.onError(ONLY_ONE_CONN_INPUT_SUB_ALLOWED);
            } else {
                this.setNewReadProducer(channel, subscriber);
                connInputSub.onCompleted();
            }
        } else if (this.raiseErrorOnInputSubscription) {
            subscriber.onError(LAZY_CONN_INPUT_SUB);
        } else {
            this.setNewReadProducer(channel, subscriber);
        }
    }

    private void setNewReadProducer(Channel channel, Subscriber<? super R> subscriber) {
        ReadProducer<? super R> producer = new ReadProducer<R>(subscriber, channel);
        subscriber.setProducer(producer);
        this.onNewReadSubscriber(subscriber);
        this.readProducer = producer;
    }

    private void replaceConnectionInputSubscriber(Channel channel, ConnectionInputSubscriberReplaceEvent<R, W> event) {
        ConnectionInputSubscriberEvent<R, W> newSubEvent = event.getNewSubEvent();
        this.newConnectionInputSubscriber(channel, newSubEvent.getSubscriber(), true);
    }

    private void newConnectionSubscriber(ChannelSubscriberEvent<R, W> event) {
        if (null == this.newChannelSub) {
            this.newChannelSub = event.getSubscriber();
        } else {
            event.getSubscriber().onError(ONLY_ONE_CONN_SUB_ALLOWED);
        }
    }

    static {
        ONLY_ONE_CONN_INPUT_SUB_ALLOWED.setStackTrace(EmptyArrays.EMPTY_STACK_TRACE);
        ONLY_ONE_CONN_SUB_ALLOWED.setStackTrace(EmptyArrays.EMPTY_STACK_TRACE);
        LAZY_CONN_INPUT_SUB.setStackTrace(EmptyArrays.EMPTY_STACK_TRACE);
        CLOSED_CHANNEL_EXCEPTION.setStackTrace(EmptyArrays.EMPTY_STACK_TRACE);
    }

    static final class ReadProducer<T>
    extends BackpressureManagingHandler.RequestReadIfRequiredEvent
    implements Producer {
        private static final AtomicLongFieldUpdater<ReadProducer> REQUEST_UPDATER = AtomicLongFieldUpdater.newUpdater(ReadProducer.class, "requested");
        private volatile long requested;
        private final Subscriber<? super T> subscriber;
        private final Channel channel;

        ReadProducer(Subscriber<? super T> subscriber, Channel channel) {
            this.subscriber = subscriber;
            this.channel = channel;
        }

        @Override
        public void request(long n) {
            if (Long.MAX_VALUE != this.requested) {
                if (Long.MAX_VALUE == n) {
                    REQUEST_UPDATER.set(this, Long.MAX_VALUE);
                } else {
                    long next;
                    long current;
                    do {
                        if ((next = (current = this.requested) + n) >= 0L) continue;
                        next = Long.MAX_VALUE;
                    } while (!REQUEST_UPDATER.compareAndSet(this, current, next));
                }
            }
            if (!this.channel.config().isAutoRead()) {
                this.channel.pipeline().fireUserEventTriggered(this);
            }
        }

        public void sendOnError(Throwable throwable) {
            this.subscriber.onError(throwable);
        }

        public void sendOnComplete() {
            this.subscriber.onCompleted();
        }

        public void sendOnNext(T nextItem) {
            if (this.requested > 0L) {
                if (REQUEST_UPDATER.get(this) != Long.MAX_VALUE) {
                    REQUEST_UPDATER.decrementAndGet(this);
                }
                this.subscriber.onNext(nextItem);
            } else {
                this.subscriber.onError(new MissingBackpressureException("Received more data on the channel than demanded by the subscriber."));
            }
        }

        @Override
        protected boolean shouldReadMore(ChannelHandlerContext ctx) {
            return !this.subscriber.isUnsubscribed() && REQUEST_UPDATER.get(this) > 0L;
        }

        long getRequested() {
            return this.requested;
        }

        public String toString() {
            return "ReadProducer{requested=" + this.requested + '}';
        }
    }
}

