/*
 * Decompiled with CFR 0.152.
 */
package io.reactivex.netty.channel;

import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.internal.RecyclableArrayList;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Scheduler;
import rx.Subscriber;
import rx.functions.Action0;
import rx.schedulers.Schedulers;
import rx.subscriptions.Subscriptions;

public abstract class BackpressureManagingHandler
extends ChannelDuplexHandler {
    private static final Logger logger = LoggerFactory.getLogger(BackpressureManagingHandler.class);
    private RecyclableArrayList buffer;
    private int currentBufferIndex;
    private State currentState = State.Buffering;
    private boolean continueDraining;
    private final BytesWriteInterceptor bytesWriteInterceptor;

    protected BackpressureManagingHandler(String thisHandlerName) {
        this.bytesWriteInterceptor = new BytesWriteInterceptor(thisHandlerName);
    }

    @Override
    public final void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (State.Stopped != this.currentState && !this.shouldReadMore(ctx)) {
            this.currentState = State.Buffering;
        }
        switch (this.currentState) {
            case ReadRequested: {
                this.currentState = State.Reading;
            }
            case Reading: {
                this.newMessage(ctx, msg);
                break;
            }
            case Buffering: 
            case DrainingBuffer: {
                if (null == this.buffer) {
                    this.buffer = RecyclableArrayList.newInstance();
                }
                this.buffer.add(msg);
                break;
            }
            case Stopped: {
                logger.warn("Message read after handler removed, discarding the same. Message class: " + msg.getClass().getName());
                ReferenceCountUtil.release(msg);
            }
        }
    }

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        ctx.pipeline().addFirst(this.bytesWriteInterceptor);
        this.currentState = State.Buffering;
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        this.currentState = State.Stopped;
        if (null != this.buffer) {
            if (!this.buffer.isEmpty()) {
                for (Object item : this.buffer) {
                    ReferenceCountUtil.release(item);
                }
            }
            this.buffer.recycle();
            this.buffer = null;
        }
    }

    @Override
    public final void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        switch (this.currentState) {
            case ReadRequested: {
                break;
            }
            case Reading: {
                this.currentState = State.Buffering;
                break;
            }
            case Buffering: {
                break;
            }
            case DrainingBuffer: {
                break;
            }
        }
        ctx.fireChannelReadComplete();
        if (!ctx.channel().config().isAutoRead() && this.shouldReadMore(ctx)) {
            this.read(ctx);
        }
    }

    @Override
    public final void read(ChannelHandlerContext ctx) throws Exception {
        switch (this.currentState) {
            case ReadRequested: {
                ctx.read();
                break;
            }
            case Reading: {
                break;
            }
            case Buffering: {
                this.currentState = State.DrainingBuffer;
                this.continueDraining = true;
                while (this.continueDraining && null != this.buffer && this.currentBufferIndex < this.buffer.size()) {
                    Object nextItem = this.buffer.get(this.currentBufferIndex++);
                    this.newMessage(ctx, nextItem);
                    this.continueDraining = false;
                    this.channelReadComplete(ctx);
                }
                if (this.continueDraining) {
                    if (null != this.buffer) {
                        this.recycleBuffer();
                    }
                    this.currentState = State.ReadRequested;
                    ctx.read();
                    break;
                }
                this.currentState = State.Buffering;
                if (null == this.buffer || this.currentBufferIndex < this.buffer.size()) break;
                this.recycleBuffer();
                break;
            }
            case DrainingBuffer: {
                this.continueDraining = true;
                break;
            }
            case Stopped: {
                ctx.read();
            }
        }
    }

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        if (msg instanceof Observable) {
            Observable observable = (Observable)msg;
            WriteStreamSubscriber subscriber = new WriteStreamSubscriber(ctx, promise);
            this.bytesWriteInterceptor.addSubscriber(subscriber);
            subscriber.subscribeTo(observable);
        } else {
            ctx.write(msg, promise);
        }
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        RequestReadIfRequiredEvent requestReadIfRequiredEvent;
        if (evt instanceof RequestReadIfRequiredEvent && (requestReadIfRequiredEvent = (RequestReadIfRequiredEvent)evt).shouldReadMore(ctx)) {
            this.read(ctx);
        }
        super.userEventTriggered(ctx, evt);
    }

    protected abstract void newMessage(ChannelHandlerContext var1, Object var2);

    protected abstract boolean shouldReadMore(ChannelHandlerContext var1);

    RecyclableArrayList getBuffer() {
        return this.buffer;
    }

    int getCurrentBufferIndex() {
        return this.currentBufferIndex;
    }

    State getCurrentState() {
        return this.currentState;
    }

    private void recycleBuffer() {
        this.buffer.recycle();
        this.currentBufferIndex = 0;
        this.buffer = null;
    }

    static class WriteStreamSubscriber
    extends Subscriber<Object> {
        private final ChannelHandlerContext ctx;
        private final ChannelPromise overarchingWritePromise;
        private final Object guard = new Object();
        private boolean isDone;
        private Scheduler.Worker writeWorker;
        private boolean atleastOneWriteEnqueued;
        private boolean isPromiseCompletedOnWriteComplete;
        private int listeningTo;

        WriteStreamSubscriber(ChannelHandlerContext ctx, ChannelPromise promise) {
            this.ctx = ctx;
            this.overarchingWritePromise = promise;
            promise.addListener(new ChannelFutureListener(){

                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (future.isCancelled()) {
                        WriteStreamSubscriber.this.unsubscribe();
                    }
                }
            });
        }

        @Override
        public void onStart() {
            this.request(1L);
        }

        @Override
        public void onCompleted() {
            this.onTermination(null);
        }

        @Override
        public void onError(Throwable e) {
            this.onTermination(e);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void onNext(Object nextItem) {
            boolean enqueue;
            boolean inEL = this.ctx.channel().eventLoop().inEventLoop();
            Object object = this.guard;
            synchronized (object) {
                if (null == this.writeWorker) {
                    if (!inEL) {
                        this.atleastOneWriteEnqueued = true;
                    } else if (this.atleastOneWriteEnqueued) {
                        this.writeWorker = Schedulers.computation().createWorker();
                    }
                }
                enqueue = null != this.writeWorker && inEL;
            }
            ChannelFuture channelFuture = enqueue ? this.enqueueWrite(nextItem) : this.ctx.write(nextItem);
            Object object2 = this.guard;
            synchronized (object2) {
                ++this.listeningTo;
            }
            channelFuture.addListener(new ChannelFutureListener(){

                /*
                 * WARNING - Removed try catching itself - possible behaviour change.
                 */
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    boolean _isPromiseCompletedOnWriteComplete;
                    if (WriteStreamSubscriber.this.overarchingWritePromise.isDone()) {
                        return;
                    }
                    Object object = WriteStreamSubscriber.this.guard;
                    synchronized (object) {
                        WriteStreamSubscriber.this.listeningTo--;
                        if (0 == WriteStreamSubscriber.this.listeningTo && WriteStreamSubscriber.this.isDone) {
                            WriteStreamSubscriber.this.isPromiseCompletedOnWriteComplete = true;
                        }
                        _isPromiseCompletedOnWriteComplete = WriteStreamSubscriber.this.isPromiseCompletedOnWriteComplete;
                    }
                    if (!future.isSuccess()) {
                        WriteStreamSubscriber.this.overarchingWritePromise.tryFailure(future.cause());
                        WriteStreamSubscriber.this.unsubscribe();
                    } else if (_isPromiseCompletedOnWriteComplete) {
                        WriteStreamSubscriber.this.overarchingWritePromise.trySuccess();
                    }
                }
            });
        }

        private ChannelFuture enqueueWrite(final Object nextItem) {
            final ChannelPromise toReturn = this.ctx.channel().newPromise();
            this.writeWorker.schedule(new Action0(){

                @Override
                public void call() {
                    WriteStreamSubscriber.this.ctx.write(nextItem, toReturn);
                }
            });
            return toReturn;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void onTermination(Throwable throwableIfAny) {
            boolean _shouldCompletePromise;
            boolean flush;
            Object object = this.guard;
            synchronized (object) {
                flush = this.atleastOneWriteEnqueued;
                this.isDone = true;
                int _listeningTo = this.listeningTo;
                _shouldCompletePromise = 0 == _listeningTo && !this.isPromiseCompletedOnWriteComplete;
            }
            if (flush) {
                this.writeWorker.schedule(new Action0(){

                    @Override
                    public void call() {
                        WriteStreamSubscriber.this.ctx.flush();
                    }
                });
            }
            if (null != throwableIfAny) {
                this.overarchingWritePromise.tryFailure(throwableIfAny);
            } else if (_shouldCompletePromise) {
                this.overarchingWritePromise.trySuccess();
            }
        }

        private void requestMore(long more) {
            this.request(more);
        }

        public void subscribeTo(Observable observable) {
            observable.subscribe(this);
        }
    }

    static final class BytesWriteInterceptor
    extends ChannelDuplexHandler {
        static final String WRITE_INSPECTOR_HANDLER_NAME = "write-inspector";
        private final ConcurrentLinkedQueue<WriteStreamSubscriber> subscribers = new ConcurrentLinkedQueue();
        private final String parentHandlerName;
        private boolean messageReceived;

        BytesWriteInterceptor(String parentHandlerName) {
            this.parentHandlerName = parentHandlerName;
        }

        @Override
        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
            ctx.write(msg, promise);
            this.messageReceived = true;
            this.requestMoreIfWritable(ctx.channel());
        }

        @Override
        public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
            WriteInspector writeInspector = new WriteInspector(this);
            ChannelHandler parent = ctx.pipeline().get(this.parentHandlerName);
            if (null != parent) {
                ctx.pipeline().addBefore(this.parentHandlerName, WRITE_INSPECTOR_HANDLER_NAME, writeInspector);
            }
        }

        @Override
        public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
            if (ctx.channel().isWritable()) {
                this.requestMoreIfWritable(ctx.channel());
            }
            super.channelWritabilityChanged(ctx);
        }

        public void addSubscriber(final WriteStreamSubscriber streamSubscriber) {
            streamSubscriber.add(Subscriptions.create(new Action0(){

                @Override
                public void call() {
                    BytesWriteInterceptor.this.subscribers.remove(streamSubscriber);
                }
            }));
            this.subscribers.add(streamSubscriber);
        }

        List<WriteStreamSubscriber> getSubscribers() {
            return Collections.unmodifiableList(new ArrayList<WriteStreamSubscriber>(this.subscribers));
        }

        private void requestMoreIfWritable(Channel channel) {
            for (WriteStreamSubscriber subscriber : this.subscribers) {
                if (subscriber.isUnsubscribed() || !channel.isWritable()) continue;
                subscriber.requestMore(1L);
            }
        }
    }

    static final class WriteInspector
    extends ChannelDuplexHandler {
        private final BytesWriteInterceptor bytesWriteInterceptor;

        WriteInspector(BytesWriteInterceptor bytesWriteInterceptor) {
            this.bytesWriteInterceptor = bytesWriteInterceptor;
        }

        @Override
        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
            this.bytesWriteInterceptor.messageReceived = false;
            ctx.write(msg, promise);
            if (!this.bytesWriteInterceptor.messageReceived) {
                this.bytesWriteInterceptor.requestMoreIfWritable(ctx.channel());
            }
        }
    }

    protected static abstract class RequestReadIfRequiredEvent {
        protected RequestReadIfRequiredEvent() {
        }

        protected abstract boolean shouldReadMore(ChannelHandlerContext var1);
    }

    static enum State {
        ReadRequested,
        Reading,
        Buffering,
        DrainingBuffer,
        Stopped;

    }
}

