/*
 * Decompiled with CFR 0.152.
 */
package com.linkedin.entitystream;

import com.linkedin.entitystream.AbortedException;
import com.linkedin.entitystream.EntityStream;
import com.linkedin.entitystream.Observer;
import com.linkedin.entitystream.ReadHandle;
import com.linkedin.entitystream.Reader;
import com.linkedin.entitystream.WriteHandle;
import com.linkedin.entitystream.Writer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class EntityStreamImpl<T>
implements EntityStream<T> {
    private static final Logger LOG = LoggerFactory.getLogger(EntityStreamImpl.class);
    private final Writer<? extends T> _writer;
    private final Object _lock;
    private List<Observer<? super T>> _observers;
    private Reader<? super T> _reader;
    private int _remaining;
    private boolean _notifyWritePossible;
    private State _state;

    EntityStreamImpl(Writer<? extends T> writer) {
        this._writer = writer;
        this._lock = new Object();
        this._observers = new ArrayList<Observer<? super T>>();
        this._remaining = 0;
        this._notifyWritePossible = true;
        this._state = State.UNINITIALIZED;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void addObserver(Observer<? super T> o) {
        Object object = this._lock;
        synchronized (object) {
            this.checkInit();
            this._observers.add(o);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void setReader(Reader<? super T> r) {
        block17: {
            ReadHandle rh;
            Object object = this._lock;
            synchronized (object) {
                this.checkInit();
                this._state = State.ACTIVE;
                this._reader = r;
                this._observers = Collections.unmodifiableList(this._observers);
            }
            WriteHandleImpl wh = new WriteHandleImpl();
            Throwable writerInitEx = null;
            try {
                this._writer.onInit(wh);
            }
            catch (Throwable ex) {
                LOG.warn("Writer throws exception at onInit", ex);
                Object object2 = this._lock;
                synchronized (object2) {
                    this._state = State.ABORTED;
                }
                this.safeAbortWriter(ex);
                writerInitEx = ex;
            }
            final AtomicBoolean notified = new AtomicBoolean(false);
            if (writerInitEx == null) {
                rh = new ReadHandleImpl();
            } else {
                final Throwable cause = writerInitEx;
                rh = new ReadHandle(){

                    @Override
                    public void request(int n) {
                        this.notifyError();
                    }

                    @Override
                    public void cancel() {
                        this.notifyError();
                    }

                    void notifyError() {
                        if (notified.compareAndSet(false, true)) {
                            EntityStreamImpl.this.safeNotifyErrorToObservers(cause);
                            EntityStreamImpl.this.safeNotifyErrorToReader(cause);
                        }
                    }
                };
            }
            try {
                this._reader.onInit(rh);
            }
            catch (RuntimeException ex) {
                LOG.warn("Reader throws exception at onInit", ex);
                Object object3 = this._lock;
                synchronized (object3) {
                    if (this._state != State.ACTIVE && this._state != State.ABORT_REQUESTED && writerInitEx == null) {
                        return;
                    }
                    this._state = State.ABORTED;
                }
                if (writerInitEx == null) {
                    this.doCancel(ex, true);
                }
                if (!notified.compareAndSet(false, true)) break block17;
                this.safeNotifyErrorToObservers(ex);
                this.safeNotifyErrorToReader(ex);
            }
        }
    }

    private void checkInit() {
        if (this._state != State.UNINITIALIZED) {
            throw new IllegalStateException("EntityStream had already been initialized and can no longer accept Observers or Reader");
        }
    }

    private void safeAbortWriter(Throwable throwable) {
        try {
            this._writer.onAbort(throwable);
        }
        catch (Throwable ex) {
            LOG.warn("Writer throws exception at onAbort", ex);
        }
    }

    private void safeNotifyErrorToObservers(Throwable throwable) {
        for (Observer<T> observer : this._observers) {
            try {
                observer.onError(throwable);
            }
            catch (Throwable ex) {
                LOG.warn("Observer throws exception at onError, ignored.", ex);
            }
        }
    }

    private void safeNotifyErrorToReader(Throwable throwable) {
        try {
            this._reader.onError(throwable);
        }
        catch (Throwable ex) {
            LOG.error("Reader throws exception at onError", ex);
        }
    }

    private void doCancel(Throwable e, boolean notifyReader) {
        this.safeAbortWriter(e);
        this.safeNotifyErrorToObservers(e);
        if (notifyReader) {
            this.safeNotifyErrorToReader(e);
        }
    }

    private static Exception getAbortedException() {
        return new AbortedException("Reader aborted");
    }

    private class ReadHandleImpl
    implements ReadHandle {
        private ReadHandleImpl() {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void request(int chunkNum) {
            if (chunkNum <= 0) {
                throw new IllegalArgumentException("cannot request non-positive number of data chunks: " + chunkNum);
            }
            boolean needNotify = false;
            Object object = EntityStreamImpl.this._lock;
            synchronized (object) {
                if (EntityStreamImpl.this._state != State.ACTIVE) {
                    return;
                }
                EntityStreamImpl.this._remaining += chunkNum;
                if (EntityStreamImpl.this._remaining < 0) {
                    LOG.warn("chunkNum overflow, setting to Integer.MAX_VALUE");
                    EntityStreamImpl.this._remaining = Integer.MAX_VALUE;
                }
                if (EntityStreamImpl.this._notifyWritePossible) {
                    needNotify = true;
                    EntityStreamImpl.this._notifyWritePossible = false;
                }
            }
            if (needNotify) {
                try {
                    EntityStreamImpl.this._writer.onWritePossible();
                }
                catch (Throwable ex) {
                    LOG.warn("Writer throws at onWritePossible", ex);
                    Object object2 = EntityStreamImpl.this._lock;
                    synchronized (object2) {
                        EntityStreamImpl.this._state = State.ABORTED;
                    }
                    EntityStreamImpl.this.doCancel(ex, true);
                }
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void cancel() {
            boolean doCancelNow;
            Object object = EntityStreamImpl.this._lock;
            synchronized (object) {
                boolean bl = doCancelNow = EntityStreamImpl.this._notifyWritePossible && EntityStreamImpl.this._state == State.ACTIVE;
                if (doCancelNow) {
                    EntityStreamImpl.this._state = State.ABORTED;
                } else if (EntityStreamImpl.this._state == State.ACTIVE) {
                    EntityStreamImpl.this._state = State.ABORT_REQUESTED;
                }
            }
            if (doCancelNow) {
                EntityStreamImpl.this.doCancel(EntityStreamImpl.getAbortedException(), false);
            }
        }
    }

    private class WriteHandleImpl
    implements WriteHandle<T> {
        private WriteHandleImpl() {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void write(T data) {
            boolean doCancelNow = false;
            Iterator iterator = EntityStreamImpl.this._lock;
            synchronized (iterator) {
                if (EntityStreamImpl.this._state == State.FINISHED) {
                    throw new IllegalStateException("Attempting to write after done or error of WriteHandle is invoked");
                }
                if (EntityStreamImpl.this._state == State.ABORTED) {
                    return;
                }
                EntityStreamImpl.this._remaining--;
                if (EntityStreamImpl.this._remaining < 0) {
                    throw new IllegalStateException("Attempt to write when remaining is 0");
                }
                if (EntityStreamImpl.this._state == State.ABORT_REQUESTED) {
                    doCancelNow = true;
                    EntityStreamImpl.this._state = State.ABORTED;
                }
            }
            if (doCancelNow) {
                EntityStreamImpl.this.doCancel(EntityStreamImpl.getAbortedException(), false);
                return;
            }
            for (Observer observer : EntityStreamImpl.this._observers) {
                try {
                    observer.onDataAvailable(data);
                }
                catch (Throwable ex) {
                    LOG.warn("Observer throws exception at onDataAvailable", ex);
                }
            }
            try {
                EntityStreamImpl.this._reader.onDataAvailable(data);
            }
            catch (Throwable ex) {
                LOG.warn("Reader throws exception at onDataAvailable", ex);
                Object object = EntityStreamImpl.this._lock;
                synchronized (object) {
                    EntityStreamImpl.this._state = State.ABORTED;
                }
                EntityStreamImpl.this.doCancel(ex, true);
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void done() {
            boolean doCancelNow = false;
            Iterator iterator = EntityStreamImpl.this._lock;
            synchronized (iterator) {
                if (EntityStreamImpl.this._state != State.ACTIVE && EntityStreamImpl.this._state != State.ABORT_REQUESTED) {
                    return;
                }
                if (EntityStreamImpl.this._state == State.ABORT_REQUESTED) {
                    doCancelNow = true;
                    EntityStreamImpl.this._state = State.ABORTED;
                } else {
                    EntityStreamImpl.this._state = State.FINISHED;
                }
            }
            if (doCancelNow) {
                EntityStreamImpl.this.doCancel(EntityStreamImpl.getAbortedException(), false);
                return;
            }
            for (Observer observer : EntityStreamImpl.this._observers) {
                try {
                    observer.onDone();
                }
                catch (Throwable ex) {
                    LOG.warn("Observer throws exception at onDone, ignored.", ex);
                }
            }
            try {
                EntityStreamImpl.this._reader.onDone();
            }
            catch (Throwable ex) {
                LOG.warn("Reader throws exception at onDone; notifying writer", ex);
                EntityStreamImpl.this.safeAbortWriter(ex);
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void error(Throwable e) {
            boolean doCancelNow = false;
            Object object = EntityStreamImpl.this._lock;
            synchronized (object) {
                if (EntityStreamImpl.this._state != State.ACTIVE && EntityStreamImpl.this._state != State.ABORT_REQUESTED) {
                    return;
                }
                if (EntityStreamImpl.this._state == State.ABORT_REQUESTED) {
                    doCancelNow = true;
                    EntityStreamImpl.this._state = State.ABORTED;
                } else {
                    EntityStreamImpl.this._state = State.FINISHED;
                }
            }
            if (doCancelNow) {
                EntityStreamImpl.this.doCancel(EntityStreamImpl.getAbortedException(), false);
                return;
            }
            EntityStreamImpl.this.safeNotifyErrorToObservers(e);
            try {
                EntityStreamImpl.this._reader.onError(e);
            }
            catch (Throwable ex) {
                LOG.warn("Reader throws exception at onError; notifying writer", ex);
                EntityStreamImpl.this.safeAbortWriter(ex);
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public int remaining() {
            int result;
            boolean doCancelNow = false;
            Object object = EntityStreamImpl.this._lock;
            synchronized (object) {
                if (EntityStreamImpl.this._state != State.ACTIVE && EntityStreamImpl.this._state != State.ABORT_REQUESTED) {
                    return 0;
                }
                if (EntityStreamImpl.this._state == State.ABORT_REQUESTED) {
                    doCancelNow = true;
                    EntityStreamImpl.this._state = State.ABORTED;
                    result = 0;
                } else {
                    if (EntityStreamImpl.this._remaining == 0) {
                        EntityStreamImpl.this._notifyWritePossible = true;
                    }
                    result = EntityStreamImpl.this._remaining;
                }
            }
            if (doCancelNow) {
                EntityStreamImpl.this.doCancel(EntityStreamImpl.getAbortedException(), false);
            }
            return result;
        }
    }

    private static enum State {
        UNINITIALIZED,
        ACTIVE,
        FINISHED,
        ABORTED,
        ABORT_REQUESTED;

    }
}

