/*
 * Decompiled with CFR 0.152.
 */
package monix.reactive.internal.operators;

import java.io.Serializable;
import java.util.concurrent.ConcurrentLinkedQueue;
import monix.eval.Task;
import monix.execution.Ack;
import monix.execution.Ack$;
import monix.execution.AsyncSemaphore;
import monix.execution.AsyncSemaphore$;
import monix.execution.Cancelable;
import monix.execution.Cancelable$;
import monix.execution.CancelableFuture;
import monix.execution.ChannelType;
import monix.execution.Scheduler;
import monix.execution.UncaughtExceptionReporter;
import monix.execution.cancelables.CompositeCancelable;
import monix.execution.cancelables.CompositeCancelable$;
import monix.reactive.Observable;
import monix.reactive.OverflowStrategy;
import monix.reactive.internal.operators.MapTaskObservable;
import monix.reactive.observers.BufferedSubscriber$;
import monix.reactive.observers.Subscriber;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Some;
import scala.collection.immutable.Seq;
import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;
import scala.runtime.BoxedUnit;
import scala.runtime.ScalaRunTime$;
import scala.util.Failure;
import scala.util.Success;
import scala.util.Try;
import scala.util.control.NonFatal$;

public final class MapParallelOrderedObservable<A, B>
extends Observable<B> {
    private final Observable<A> source;
    public final int monix$reactive$internal$operators$MapParallelOrderedObservable$$parallelism;
    public final Function1<A, Task<B>> monix$reactive$internal$operators$MapParallelOrderedObservable$$f;
    public final OverflowStrategy<B> monix$reactive$internal$operators$MapParallelOrderedObservable$$overflowStrategy;

    public MapParallelOrderedObservable(Observable<A> source, int parallelism, Function1<A, Task<B>> f, OverflowStrategy<B> overflowStrategy) {
        this.source = source;
        this.monix$reactive$internal$operators$MapParallelOrderedObservable$$parallelism = parallelism;
        this.monix$reactive$internal$operators$MapParallelOrderedObservable$$f = f;
        this.monix$reactive$internal$operators$MapParallelOrderedObservable$$overflowStrategy = overflowStrategy;
    }

    /*
     * WARNING - void declaration
     */
    @Override
    public Cancelable unsafeSubscribeFn(Subscriber<B> out) {
        Cancelable cancelable;
        if (this.monix$reactive$internal$operators$MapParallelOrderedObservable$$parallelism <= 0) {
            out.onError(new IllegalArgumentException("parallelism > 0"));
            cancelable = Cancelable$.MODULE$.empty();
        } else if (this.monix$reactive$internal$operators$MapParallelOrderedObservable$$parallelism == 1) {
            cancelable = new MapTaskObservable<A, B>(this.source, this.monix$reactive$internal$operators$MapParallelOrderedObservable$$f).unsafeSubscribeFn(out);
        } else {
            void var3_3;
            CompositeCancelable composite = CompositeCancelable$.MODULE$.apply((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Cancelable[0]));
            MapAsyncParallelSubscription subscription = new MapAsyncParallelSubscription(this, out, composite);
            composite.$plus$eq(this.source.unsafeSubscribeFn(subscription));
            cancelable = var3_3;
        }
        return cancelable;
    }

    private final class MapAsyncParallelSubscription
    implements Subscriber<A>,
    Cancelable {
        private final CompositeCancelable composite;
        private final Scheduler scheduler;
        private final AsyncSemaphore semaphore;
        private final Subscriber<B> buffer;
        private boolean isDone;
        private Ack lastAck;
        private final ConcurrentLinkedQueue<CancelableFuture<B>> queue;
        private final AsyncSemaphore sendDownstreamSemaphore;
        private final /* synthetic */ MapParallelOrderedObservable $outer;

        public MapAsyncParallelSubscription(MapParallelOrderedObservable $outer, Subscriber<B> out, CompositeCancelable composite) {
            this.composite = composite;
            if ($outer == null) {
                throw new NullPointerException();
            }
            this.$outer = $outer;
            this.scheduler = out.scheduler();
            this.semaphore = AsyncSemaphore$.MODULE$.apply((long)$outer.monix$reactive$internal$operators$MapParallelOrderedObservable$$parallelism, AsyncSemaphore$.MODULE$.apply$default$2());
            this.buffer = BufferedSubscriber$.MODULE$.apply(out, $outer.monix$reactive$internal$operators$MapParallelOrderedObservable$$overflowStrategy, (ChannelType.ProducerSide)ChannelType.MultiProducer$.MODULE$);
            this.isDone = false;
            this.lastAck = Ack.Continue$.MODULE$;
            this.queue = new ConcurrentLinkedQueue();
            this.sendDownstreamSemaphore = AsyncSemaphore$.MODULE$.apply(1L, AsyncSemaphore$.MODULE$.apply$default$2());
        }

        @Override
        public Scheduler scheduler() {
            return this.scheduler;
        }

        /*
         * Enabled force condition propagation
         * Lifted jumps to return sites
         */
        private boolean shouldStop() {
            if (this.isDone) return true;
            Ack ack = this.lastAck;
            Ack.Stop$ stop$ = Ack.Stop$.MODULE$;
            if (ack != null) {
                if (!ack.equals(stop$)) return false;
                return true;
            }
            if (stop$ == null) return true;
            return false;
        }

        /*
         * Enabled aggressive block sorting
         */
        private void sendDownstreamOrdered() {
            CancelableFuture permit = this.sendDownstreamSemaphore.acquire();
            this.composite.$plus$eq((Cancelable)permit);
            Option option = permit.value();
            if (option instanceof Some) {
                Try try_ = (Try)((Some)option).value();
                if (try_ instanceof Success) {
                    this.doSend$1(permit);
                    return;
                }
                if (try_ instanceof Failure) {
                    Throwable error = ((Failure)try_).exception();
                    this.lastAck = Ack.Stop$.MODULE$;
                    this.composite.$minus$eq((Cancelable)permit);
                    this.onError(error);
                    return;
                }
            }
            if (!None$.MODULE$.equals(option)) throw new MatchError((Object)option);
            permit.onComplete((Function1 & Serializable)x$1 -> {
                this.sendDownstreamOrdered$$anonfun$1(permit, (Try)x$1);
                return BoxedUnit.UNIT;
            }, (ExecutionContext)this.scheduler());
        }

        private void process(A elem) {
            boolean streamErrors = true;
            try {
                Task task = (Task)this.$outer.monix$reactive$internal$operators$MapParallelOrderedObservable$$f.apply(elem);
                streamErrors = false;
                CancelableFuture future = task.executeAsync().runToFuture(this.scheduler());
                this.composite.$plus$eq(future.cancelable());
                this.queue.offer(future);
                future.onComplete((Function1 & Serializable)x$1 -> {
                    this.process$$anonfun$1(future, (Try)x$1);
                    return BoxedUnit.UNIT;
                }, (ExecutionContext)this.scheduler());
            }
            catch (Throwable throwable) {
                Throwable throwable2;
                Throwable ex = throwable2 = throwable;
                if (NonFatal$.MODULE$.apply(ex)) {
                    if (streamErrors) {
                        this.onError(ex);
                    } else {
                        this.scheduler().reportFailure(ex);
                    }
                }
                throw throwable;
            }
        }

        @Override
        public Future<Ack> onNext(A elem) {
            Ack.Stop$ stop$;
            if (this.shouldStop()) {
                stop$ = Ack.Stop$.MODULE$;
            } else {
                CancelableFuture cancelableFuture;
                CancelableFuture permit = this.semaphore.acquire();
                this.composite.$plus$eq((Cancelable)permit);
                Option option = permit.value();
                if (None$.MODULE$.equals(option)) {
                    cancelableFuture = permit.flatMap((Function1 & Serializable)_$1 -> {
                        this.composite.$minus$eq((Cancelable)permit);
                        this.process(elem);
                        return Ack.Continue$.MODULE$;
                    }, (ExecutionContext)this.scheduler());
                } else if (option instanceof Some) {
                    this.composite.$minus$eq((Cancelable)permit);
                    this.process(elem);
                    cancelableFuture = Ack.Continue$.MODULE$;
                } else {
                    throw new MatchError((Object)option);
                }
                Future ack = (Future)cancelableFuture;
                ack.onComplete((Function1 & Serializable)x$1 -> {
                    this.onNext$$anonfun$1(permit, (Try)x$1);
                    return BoxedUnit.UNIT;
                }, (ExecutionContext)this.scheduler());
                Future future = Ack$.MODULE$.AckExtensions(ack);
                stop$ = Ack.AckExtensions$.MODULE$.syncTryFlatten$extension(future, (UncaughtExceptionReporter)this.scheduler());
            }
            return stop$;
        }

        @Override
        public void onError(Throwable ex) {
            if (!this.isDone) {
                this.isDone = true;
                this.lastAck = Ack.Stop$.MODULE$;
                this.queue.clear();
                this.buffer.onError(ex);
            }
        }

        @Override
        public void onComplete() {
            this.semaphore.awaitAvailable((long)this.$outer.monix$reactive$internal$operators$MapParallelOrderedObservable$$parallelism).foreach((Function1 & Serializable)_$2 -> {
                this.onComplete$$anonfun$1((BoxedUnit)_$2);
                return BoxedUnit.UNIT;
            }, (ExecutionContext)this.scheduler());
        }

        public void cancel() {
            this.isDone = true;
            this.composite.cancel();
        }

        public final /* synthetic */ MapParallelOrderedObservable monix$reactive$internal$operators$MapParallelOrderedObservable$MapAsyncParallelSubscription$$$outer() {
            return this.$outer;
        }

        /*
         * Enabled aggressive block sorting
         */
        private final /* synthetic */ void doSend$1$$anonfun$1(CancelableFuture head$1, Try x$1) {
            Try try_ = x$1;
            if (try_ instanceof Success) {
                Ack ack = (Ack)((Success)try_).value();
                if (Ack.Stop$.MODULE$.equals(ack)) {
                    this.lastAck = Ack.Stop$.MODULE$;
                    this.composite.cancel();
                    return;
                }
                if (Ack.Continue$.MODULE$.equals(ack)) {
                    this.semaphore.release();
                    this.composite.$minus$eq(head$1.cancelable());
                    return;
                }
            }
            if (!(try_ instanceof Failure)) throw new MatchError((Object)try_);
            Throwable ex = ((Failure)try_).exception();
            this.lastAck = Ack.Stop$.MODULE$;
            this.onError(ex);
        }

        private final void doSend$1(CancelableFuture permit$1) {
            try {
                this.composite.$minus$eq((Cancelable)permit$1);
                while (!this.shouldStop() && !this.queue.isEmpty() && ((Future)this.queue.peek()).isCompleted()) {
                    CancelableFuture head = this.queue.poll();
                    Option option = head.value();
                    if (option instanceof Some) {
                        Try try_ = (Try)((Some)option).value();
                        if (try_ instanceof Success) {
                            Object value = ((Success)try_).value();
                            Future future = Ack$.MODULE$.AckExtensions(this.buffer.onNext(value));
                            Ack.AckExtensions$.MODULE$.syncOnComplete$extension(future, (Function1 & Serializable)x$1 -> {
                                this.doSend$1$$anonfun$1(head, (Try)x$1);
                                return BoxedUnit.UNIT;
                            }, (UncaughtExceptionReporter)this.scheduler());
                            continue;
                        }
                        if (try_ instanceof Failure) {
                            Throwable error = ((Failure)try_).exception();
                            this.lastAck = Ack.Stop$.MODULE$;
                            this.composite.$minus$eq(head.cancelable());
                            this.onError(error);
                            continue;
                        }
                    }
                    if (None$.MODULE$.equals(option)) continue;
                    throw new MatchError((Object)option);
                }
            }
            finally {
                this.sendDownstreamSemaphore.release();
            }
        }

        private final /* synthetic */ void sendDownstreamOrdered$$anonfun$1(CancelableFuture permit$2, Try x$1) {
            Try try_ = x$1;
            if (try_ instanceof Success) {
                this.doSend$1(permit$2);
            } else if (try_ instanceof Failure) {
                Throwable error = ((Failure)try_).exception();
                this.lastAck = Ack.Stop$.MODULE$;
                this.composite.$minus$eq((Cancelable)permit$2);
                this.onError(error);
            } else {
                throw new MatchError((Object)try_);
            }
        }

        private final /* synthetic */ void process$$anonfun$1(CancelableFuture future$1, Try x$1) {
            Try try_ = x$1;
            if (try_ instanceof Success) {
                this.sendDownstreamOrdered();
            } else if (try_ instanceof Failure) {
                Throwable error = ((Failure)try_).exception();
                this.lastAck = Ack.Stop$.MODULE$;
                this.composite.$minus$eq(future$1.cancelable());
                this.composite.cancel();
                this.onError(error);
            } else {
                throw new MatchError((Object)try_);
            }
        }

        private final /* synthetic */ void onNext$$anonfun$1(CancelableFuture permit$4, Try x$1) {
            block0: {
                Try try_ = x$1;
                if (!(try_ instanceof Failure)) break block0;
                Throwable ex = ((Failure)try_).exception();
                this.composite.$minus$eq((Cancelable)permit$4);
                this.onError(ex);
            }
        }

        private final /* synthetic */ void onComplete$$anonfun$1(BoxedUnit _$2) {
            if (!this.isDone) {
                this.isDone = true;
                this.lastAck = Ack.Stop$.MODULE$;
                this.buffer.onComplete();
            }
        }
    }
}

