/*
 * Decompiled with CFR 0.152.
 */
package monix.kafka;

import java.io.Serializable;
import monix.eval.Task;
import monix.eval.Task$;
import monix.execution.Ack;
import monix.execution.Ack$;
import monix.execution.Callback;
import monix.execution.Callback$;
import monix.execution.Cancelable;
import monix.execution.Scheduler;
import monix.execution.Scheduler$;
import monix.execution.UncaughtExceptionReporter;
import monix.execution.cancelables.BooleanCancelable;
import monix.execution.cancelables.BooleanCancelable$;
import monix.kafka.KafkaConsumerConfig;
import monix.kafka.KafkaConsumerObservable;
import monix.kafka.config.ObservableCommitType;
import monix.kafka.config.ObservableCommitType$Async$;
import monix.kafka.config.ObservableCommitType$Sync$;
import monix.reactive.Observable;
import monix.reactive.Observer;
import monix.reactive.Observer$;
import monix.reactive.observers.Subscriber;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import scala.Function0;
import scala.Function1;
import scala.Function2;
import scala.MatchError;
import scala.Option;
import scala.collection.Iterable;
import scala.collection.JavaConverters$;
import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;
import scala.concurrent.Future$;
import scala.concurrent.package$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.java8.JFunction0;
import scala.util.Failure;
import scala.util.Success;
import scala.util.Try;
import scala.util.control.NonFatal$;

@ScalaSignature(bytes="\u0006\u0001i4AAD\b\u0003)!Aa\b\u0001BC\u0002\u0013Es\b\u0003\u0005D\u0001\t\u0005\t\u0015!\u0003A\u0011!y\u0002A!b\u0001\n#\"\u0005\u0002\u0003(\u0001\u0005\u0003\u0005\u000b\u0011B#\t\r=\u0003A\u0011A\bQ\u0011\u0015!\u0006\u0001\"\u0003V\u0011\u001dQ\u0006A1A\u0005\nmCaa\u0018\u0001!\u0002\u0013a\u0006b\u00021\u0001\u0005\u0004%I!\u0019\u0005\u0007K\u0002\u0001\u000b\u0011\u00022\t\u000f\u0019\u0004!\u0019!C\u0005C\"1q\r\u0001Q\u0001\n\tDQ\u0001\u001b\u0001\u0005R%\u0014\u0011eS1gW\u0006\u001cuN\\:v[\u0016\u0014xJY:feZ\f'\r\\3BkR|7i\\7nSRT!\u0001E\t\u0002\u000b-\fgm[1\u000b\u0003I\tQ!\\8oSb\u001c\u0001!F\u0002\u0016Wa\u001a2\u0001\u0001\f;!\r9\"\u0004H\u0007\u00021)\u0011\u0011$E\u0001\te\u0016\f7\r^5wK&\u00111\u0004\u0007\u0002\u000b\u001f\n\u001cXM\u001d<bE2,\u0007\u0003B\u000f(S]j\u0011A\b\u0006\u0003?\u0001\n\u0001bY8ogVlWM\u001d\u0006\u0003C\t\nqa\u00197jK:$8O\u0003\u0002\u0011G)\u0011A%J\u0001\u0007CB\f7\r[3\u000b\u0003\u0019\n1a\u001c:h\u0013\tAcD\u0001\bD_:\u001cX/\\3s%\u0016\u001cwN\u001d3\u0011\u0005)ZC\u0002\u0001\u0003\u0006Y\u0001\u0011\r!\f\u0002\u0002\u0017F\u0011a\u0006\u000e\t\u0003_Ij\u0011\u0001\r\u0006\u0002c\u0005)1oY1mC&\u00111\u0007\r\u0002\b\u001d>$\b.\u001b8h!\tyS'\u0003\u00027a\t\u0019\u0011I\\=\u0011\u0005)BD!B\u001d\u0001\u0005\u0004i#!\u0001,\u0011\u000bmb\u0014f\u000e\u000f\u000e\u0003=I!!P\b\u0003/-\u000bgm[1D_:\u001cX/\\3s\u001f\n\u001cXM\u001d<bE2,\u0017AB2p]\u001aLw-F\u0001A!\tY\u0014)\u0003\u0002C\u001f\t\u00192*\u00194lC\u000e{gn];nKJ\u001cuN\u001c4jO\u000691m\u001c8gS\u001e\u0004S#A#\u0011\u0007\u0019K5*D\u0001H\u0015\tA\u0015#\u0001\u0003fm\u0006d\u0017B\u0001&H\u0005\u0011!\u0016m]6\u0011\tua\u0015fN\u0005\u0003\u001bz\u0011QbS1gW\u0006\u001cuN\\:v[\u0016\u0014\u0018!C2p]N,X.\u001a:!\u0003\u0019a\u0014N\\5u}Q\u0019\u0011KU*\u0011\tm\u0002\u0011f\u000e\u0005\u0006}\u0015\u0001\r\u0001\u0011\u0005\u0006?\u0015\u0001\r!R\u0001\u000fG>t7/^7fe\u000e{W.\\5u)\t1\u0016\f\u0005\u00020/&\u0011\u0001\f\r\u0002\u0005+:LG\u000fC\u0003 \r\u0001\u00071*A\tq_2dG+[7f_V$X*\u001b7mSN,\u0012\u0001\u0018\t\u0003_uK!A\u0018\u0019\u0003\t1{gnZ\u0001\u0013a>dG\u000eV5nK>,H/T5mY&\u001c\b%\u0001\ntQ>,H\u000eZ\"p[6LGOQ3g_J,W#\u00012\u0011\u0005=\u001a\u0017B\u000131\u0005\u001d\u0011un\u001c7fC:\f1c\u001d5pk2$7i\\7nSR\u0014UMZ8sK\u0002\n\u0011c\u001d5pk2$7i\\7nSR\fe\r^3s\u0003I\u0019\bn\\;mI\u000e{W.\\5u\u0003\u001a$XM\u001d\u0011\u0002\u000f\u0005\u001c7\u000eV1tWR\u0019!.\u001d:\u0011\u0007\u0019K5\u000e\u0005\u0002m_6\tQN\u0003\u0002o#\u0005IQ\r_3dkRLwN\\\u0005\u0003a6\u00141!Q2l\u0011\u0015yR\u00021\u0001L\u0011\u0015\u0019X\u00021\u0001u\u0003\ryW\u000f\u001e\t\u0004kbdR\"\u0001<\u000b\u0005]D\u0012!C8cg\u0016\u0014h/\u001a:t\u0013\tIhO\u0001\u0006Tk\n\u001c8M]5cKJ\u0004")
public final class KafkaConsumerObservableAutoCommit<K, V>
extends Observable<ConsumerRecord<K, V>>
implements KafkaConsumerObservable<K, V, ConsumerRecord<K, V>> {
    private final KafkaConsumerConfig config;
    private final Task<KafkaConsumer<K, V>> consumer;
    private final long pollTimeoutMillis;
    private final boolean shouldCommitBefore;
    private final boolean shouldCommitAfter;

    @Override
    public final Cancelable unsafeSubscribeFn(Subscriber<ConsumerRecord<K, V>> out) {
        return KafkaConsumerObservable.unsafeSubscribeFn$(this, out);
    }

    @Override
    public KafkaConsumerConfig config() {
        return this.config;
    }

    @Override
    public Task<KafkaConsumer<K, V>> consumer() {
        return this.consumer;
    }

    private void consumerCommit(KafkaConsumer<K, V> consumer) {
        ObservableCommitType observableCommitType = this.config().observableCommitType();
        if (ObservableCommitType$Sync$.MODULE$.equals(observableCommitType)) {
            BoxedUnit boxedUnit = (BoxedUnit)package$.MODULE$.blocking((Function0)(JFunction0.mcV.sp & Serializable & scala.Serializable)() -> consumer.commitSync());
        } else if (ObservableCommitType$Async$.MODULE$.equals(observableCommitType)) {
            BoxedUnit boxedUnit = (BoxedUnit)package$.MODULE$.blocking((Function0)(JFunction0.mcV.sp & Serializable & scala.Serializable)() -> consumer.commitAsync());
        } else {
            throw new MatchError((Object)observableCommitType);
        }
    }

    private long pollTimeoutMillis() {
        return this.pollTimeoutMillis;
    }

    private boolean shouldCommitBefore() {
        return this.shouldCommitBefore;
    }

    private boolean shouldCommitAfter() {
        return this.shouldCommitAfter;
    }

    @Override
    public Task<Ack> ackTask(KafkaConsumer<K, V> consumer, Subscriber<ConsumerRecord<K, V>> out) {
        return Task.AsyncBuilder$.CreatePartiallyApplied$.MODULE$.apply$extension(Task$.MODULE$.create(), (Function2 & Serializable & scala.Serializable)(scheduler, cb) -> {
            Scheduler s = scheduler;
            Callback asyncCb = Callback$.MODULE$.forked(cb, (ExecutionContext)s);
            BooleanCancelable cancelable = BooleanCancelable$.MODULE$.apply();
            new Scheduler.Extensions(Scheduler$.MODULE$.Extensions(s)).executeAsync(() -> {
                Ack.Stop$ stop$;
                try {
                    KafkaConsumer kafkaConsumer = consumer;
                    synchronized (kafkaConsumer) {
                        Ack.Stop$ stop$2;
                        if (cancelable.isCanceled()) {
                            stop$2 = Ack.Stop$.MODULE$;
                        } else {
                            ConsumerRecords next = (ConsumerRecords)package$.MODULE$.blocking((Function0 & Serializable & scala.Serializable)() -> consumer.poll(this.pollTimeoutMillis()));
                            if (this.shouldCommitBefore()) {
                                this.consumerCommit(consumer);
                            }
                            stop$2 = Observer$.MODULE$.feed((Observer)out, (Iterable)JavaConverters$.MODULE$.iterableAsScalaIterableConverter((java.lang.Iterable)next).asScala(), out.scheduler());
                        }
                        Ack.Stop$ stop$3 = stop$2;
                        // MONITOREXIT @DISABLED, blocks:[0, 1, 2] lbl13 : MonitorExitStatement: MONITOREXIT : var8_6
                        stop$ = stop$3;
                    }
                }
                catch (Throwable throwable) {
                    Throwable throwable2 = throwable;
                    Option option = NonFatal$.MODULE$.unapply(throwable2);
                    if (option.isEmpty()) {
                        throw throwable;
                    }
                    Throwable ex = (Throwable)option.get();
                    Future future = Future$.MODULE$.failed(ex);
                    stop$ = future;
                }
                Ack.Stop$ ackFuture = stop$;
                Ack.AckExtensions$.MODULE$.syncOnComplete$extension(Ack$.MODULE$.AckExtensions((Future)ackFuture), (Function1 & Serializable & scala.Serializable)x0$1 -> {
                    KafkaConsumerObservableAutoCommit.$anonfun$ackTask$4(this, consumer, cancelable, asyncCb, s, x0$1);
                    return BoxedUnit.UNIT;
                }, (UncaughtExceptionReporter)s);
            });
            return cancelable;
        }, Task.AsyncBuilder$.MODULE$.forCancelable());
    }

    public static final /* synthetic */ void $anonfun$ackTask$4(KafkaConsumerObservableAutoCommit $this, KafkaConsumer consumer$2, BooleanCancelable cancelable$1, Callback asyncCb$1, Scheduler s$1, Try x0$1) {
        Try try_ = x0$1;
        if (try_ instanceof Success) {
            BoxedUnit boxedUnit;
            Success success = (Success)try_;
            Ack ack = (Ack)success.value();
            boolean streamErrors = true;
            try {
                KafkaConsumer kafkaConsumer = consumer$2;
                synchronized (kafkaConsumer) {
                    if (cancelable$1.isCanceled()) {
                        streamErrors = false;
                        asyncCb$1.onSuccess((Object)Ack.Stop$.MODULE$);
                    } else {
                        if ($this.shouldCommitAfter()) {
                            $this.consumerCommit(consumer$2);
                        }
                        streamErrors = false;
                        asyncCb$1.onSuccess((Object)ack);
                    }
                }
                boxedUnit = BoxedUnit.UNIT;
            }
            catch (Throwable throwable) {
                BoxedUnit boxedUnit2;
                Throwable throwable2 = throwable;
                Option option = NonFatal$.MODULE$.unapply(throwable2);
                if (!option.isEmpty()) {
                    Throwable ex = (Throwable)option.get();
                    if (streamErrors) {
                        asyncCb$1.onError((Object)ex);
                        boxedUnit2 = BoxedUnit.UNIT;
                    } else {
                        s$1.reportFailure(ex);
                        boxedUnit2 = BoxedUnit.UNIT;
                    }
                } else {
                    throw throwable;
                }
                BoxedUnit boxedUnit3 = boxedUnit2;
                boxedUnit = BoxedUnit.UNIT;
            }
            BoxedUnit boxedUnit4 = boxedUnit;
        } else if (try_ instanceof Failure) {
            Failure failure = (Failure)try_;
            Throwable ex = failure.exception();
            asyncCb$1.onError((Object)ex);
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
        } else {
            throw new MatchError((Object)try_);
        }
    }

    public KafkaConsumerObservableAutoCommit(KafkaConsumerConfig config, Task<KafkaConsumer<K, V>> consumer) {
        this.config = config;
        this.consumer = consumer;
        KafkaConsumerObservable.$init$(this);
        this.pollTimeoutMillis = config.fetchMaxWaitTime().toMillis();
        this.shouldCommitBefore = !config.enableAutoCommit() && config.observableCommitOrder().isBefore();
        this.shouldCommitAfter = !config.enableAutoCommit() && config.observableCommitOrder().isAfter();
    }
}

