/*
 * Decompiled with CFR 0.152.
 */
package rx.internal.operators;

import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import rx.Observable;
import rx.Producer;
import rx.Subscriber;
import rx.Subscription;
import rx.exceptions.CompositeException;
import rx.exceptions.Exceptions;
import rx.exceptions.MissingBackpressureException;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.functions.Func1;
import rx.internal.operators.NotificationLite;
import rx.internal.util.RxRingBuffer;
import rx.observables.ConnectableObservable;
import rx.subscriptions.Subscriptions;

public class OperatorPublish<T>
extends ConnectableObservable<T> {
    final Observable<? extends T> source;
    private final RequestHandler<T> requestHandler;

    public static <T> ConnectableObservable<T> create(Observable<? extends T> source) {
        return new OperatorPublish<T>(source);
    }

    public static <T, R> Observable<R> create(final Observable<? extends T> source, final Func1<? super Observable<T>, ? extends Observable<R>> selector) {
        return Observable.create(new Observable.OnSubscribe<R>(){

            @Override
            public void call(final Subscriber<? super R> child) {
                OperatorPublish op = new OperatorPublish(source);
                ((Observable)selector.call(op)).unsafeSubscribe(child);
                op.connect(new Action1<Subscription>(){

                    @Override
                    public void call(Subscription sub) {
                        child.add(sub);
                    }
                });
            }
        });
    }

    private OperatorPublish(Observable<? extends T> source) {
        this(source, new Object(), new RequestHandler());
    }

    private OperatorPublish(Observable<? extends T> source, Object guard, final RequestHandler<T> requestHandler) {
        super(new Observable.OnSubscribe<T>(){

            @Override
            public void call(final Subscriber<? super T> subscriber) {
                subscriber.setProducer(new Producer(){

                    @Override
                    public void request(long n) {
                        requestHandler.requestFromChildSubscriber(subscriber, n);
                    }
                });
                subscriber.add(Subscriptions.create(new Action0(){

                    @Override
                    public void call() {
                        requestHandler.state.removeSubscriber(subscriber);
                    }
                }));
            }
        });
        this.source = source;
        this.requestHandler = requestHandler;
    }

    @Override
    public void connect(Action1<? super Subscription> connection) {
        boolean shouldSubscribe = false;
        OriginSubscriber origin = ((RequestHandler)this.requestHandler).state.getOrigin();
        if (origin == null) {
            shouldSubscribe = true;
            ((RequestHandler)this.requestHandler).state.setOrigin(new OriginSubscriber<T>(this.requestHandler));
        }
        if (shouldSubscribe) {
            connection.call(Subscriptions.create(new Action0(){

                @Override
                public void call() {
                    OriginSubscriber s = OperatorPublish.this.requestHandler.state.getOrigin();
                    OperatorPublish.this.requestHandler.state.setOrigin(null);
                    if (s != null) {
                        s.unsubscribe();
                    }
                }
            }));
            OriginSubscriber os = ((RequestHandler)this.requestHandler).state.getOrigin();
            if (os != null) {
                this.source.unsafeSubscribe(os);
            }
        }
    }

    private static class RequestHandler<T> {
        private final NotificationLite<T> notifier = NotificationLite.instance();
        private final RxRingBuffer buffer = RxRingBuffer.getSpmcInstance();
        private final State<T> state = new State();
        volatile long wip;
        static final AtomicLongFieldUpdater<RequestHandler> WIP = AtomicLongFieldUpdater.newUpdater(RequestHandler.class, "wip");

        private RequestHandler() {
        }

        public void requestFromChildSubscriber(Subscriber<? super T> subscriber, Long request) {
            this.state.requestFromSubscriber(subscriber, request);
            this.drainQueue();
        }

        public void emit(Object t) throws MissingBackpressureException {
            if (this.notifier.isCompleted(t)) {
                this.buffer.onCompleted();
            } else {
                this.buffer.onNext(this.notifier.getValue(t));
            }
            this.drainQueue();
        }

        private void requestMoreAfterEmission(int emitted) {
            long r;
            OriginSubscriber<T> origin = this.state.getOrigin();
            if (emitted > 0 && origin != null && (r = ((OriginSubscriber)origin).originOutstanding.addAndGet(-emitted)) <= ((OriginSubscriber)origin).THRESHOLD) {
                ((OriginSubscriber)origin).requestMore((long)RxRingBuffer.SIZE - ((OriginSubscriber)origin).THRESHOLD);
            }
        }

        public void drainQueue() {
            if (WIP.getAndIncrement(this) == 0L) {
                int emitted = 0;
                block0: do {
                    boolean shouldEmit;
                    WIP.set(this, 1L);
                    while (shouldEmit = this.state.canEmitWithDecrement()) {
                        Object o = this.buffer.poll();
                        if (o == null) {
                            this.state.incrementOutstandingAfterFailedEmit();
                            continue block0;
                        }
                        if (this.notifier.isCompleted(o)) {
                            for (Subscriber<T> s : this.state.getSubscribers()) {
                                this.notifier.accept(s, o);
                            }
                        } else {
                            for (Subscriber<T> s : this.state.getSubscribers()) {
                                this.notifier.accept(s, o);
                            }
                        }
                        ++emitted;
                    }
                } while (WIP.decrementAndGet(this) > 0L);
                this.requestMoreAfterEmission(emitted);
            }
        }
    }

    private static class State<T> {
        private long outstandingRequests = -1L;
        private long emittedSinceRequest = 0L;
        private OriginSubscriber<T> origin;
        private final Map<Subscriber<? super T>, AtomicLong> ss = new LinkedHashMap<Subscriber<? super T>, AtomicLong>();
        private Subscriber<? super T>[] subscribers = new Subscriber[0];

        private State() {
        }

        public synchronized OriginSubscriber<T> getOrigin() {
            return this.origin;
        }

        public synchronized void setOrigin(OriginSubscriber<T> o) {
            this.origin = o;
        }

        public synchronized boolean canEmitWithDecrement() {
            if (this.outstandingRequests > 0L) {
                --this.outstandingRequests;
                ++this.emittedSinceRequest;
                return true;
            }
            return false;
        }

        public synchronized void incrementOutstandingAfterFailedEmit() {
            ++this.outstandingRequests;
            --this.emittedSinceRequest;
        }

        public synchronized Subscriber<? super T>[] getSubscribers() {
            return this.subscribers;
        }

        public synchronized long requestFromSubscriber(Subscriber<? super T> subscriber, Long request) {
            AtomicLong r = this.ss.get(subscriber);
            if (r == null) {
                this.ss.put(subscriber, new AtomicLong(request));
            } else if (r.get() != Long.MAX_VALUE) {
                if (request == Long.MAX_VALUE) {
                    r.set(Long.MAX_VALUE);
                } else {
                    r.addAndGet(request);
                }
            }
            return this.resetAfterSubscriberUpdate();
        }

        public synchronized void removeSubscriber(Subscriber<? super T> subscriber) {
            this.ss.remove(subscriber);
            this.resetAfterSubscriberUpdate();
        }

        private long resetAfterSubscriberUpdate() {
            this.subscribers = new Subscriber[this.ss.size()];
            int i = 0;
            for (Subscriber<? super T> subscriber : this.ss.keySet()) {
                this.subscribers[i++] = subscriber;
            }
            long lowest = -1L;
            for (AtomicLong l : this.ss.values()) {
                long c = l.addAndGet(-this.emittedSinceRequest);
                if (lowest != -1L && c >= lowest) continue;
                lowest = c;
            }
            this.outstandingRequests = lowest;
            this.emittedSinceRequest = 0L;
            return this.outstandingRequests;
        }
    }

    private static class OriginSubscriber<T>
    extends Subscriber<T> {
        private final RequestHandler<T> requestHandler;
        private final AtomicLong originOutstanding = new AtomicLong();
        private final long THRESHOLD = RxRingBuffer.SIZE / 4;

        OriginSubscriber(RequestHandler<T> requestHandler) {
            this.requestHandler = requestHandler;
        }

        @Override
        public void onStart() {
            this.requestMore(RxRingBuffer.SIZE);
        }

        private void requestMore(long r) {
            this.originOutstanding.addAndGet(r);
            this.request(r);
        }

        @Override
        public void onCompleted() {
            try {
                this.requestHandler.emit(((RequestHandler)this.requestHandler).notifier.completed());
            }
            catch (MissingBackpressureException e) {
                this.onError(e);
            }
        }

        @Override
        public void onError(Throwable e) {
            ArrayList<Throwable> errors = null;
            for (Subscriber subscriber : ((RequestHandler)this.requestHandler).state.getSubscribers()) {
                try {
                    subscriber.onError(e);
                }
                catch (Throwable e2) {
                    if (errors == null) {
                        errors = new ArrayList<Throwable>();
                    }
                    errors.add(e2);
                }
            }
            if (errors != null) {
                if (errors.size() == 1) {
                    Exceptions.propagate((Throwable)errors.get(0));
                } else {
                    throw new CompositeException("Errors while emitting onError", (Collection<? extends Throwable>)errors);
                }
            }
        }

        @Override
        public void onNext(T t) {
            try {
                this.requestHandler.emit(((RequestHandler)this.requestHandler).notifier.next(t));
            }
            catch (MissingBackpressureException e) {
                this.onError(e);
            }
        }
    }
}

