/*
 * Decompiled with CFR 0.152.
 */
package monix.connect.elasticsearch;

import com.sksamuel.elastic4s.CommonRequestOptions$;
import com.sksamuel.elastic4s.ElasticClient;
import com.sksamuel.elastic4s.ElasticDsl$;
import com.sksamuel.elastic4s.ElasticError;
import com.sksamuel.elastic4s.Handler;
import com.sksamuel.elastic4s.RequestFailure;
import com.sksamuel.elastic4s.RequestSuccess;
import com.sksamuel.elastic4s.Response;
import com.sksamuel.elastic4s.requests.searches.SearchHit;
import com.sksamuel.elastic4s.requests.searches.SearchRequest;
import com.sksamuel.elastic4s.requests.searches.SearchResponse;
import java.io.Serializable;
import monix.connect.elasticsearch.ElasticsearchSource$;
import monix.connect.elasticsearch.package$;
import monix.eval.Task;
import monix.eval.Task$;
import monix.execution.Ack;
import monix.execution.Cancelable;
import monix.execution.internal.InternalApi;
import monix.reactive.Observable;
import monix.reactive.observers.Subscriber;
import scala.Function0;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Some;
import scala.collection.TraversableOnce;
import scala.collection.mutable.ArrayOps;
import scala.collection.mutable.Queue;
import scala.collection.mutable.Queue$;
import scala.reflect.ManifestFactory$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;

@ScalaSignature(bytes="\u0006\u0001\u0005\rd!\u0002\n\u0014\u0001MI\u0002\u0002\u0003\u0018\u0001\u0005\u0003\u0005\u000b\u0011\u0002\u0019\t\u0011M\u0002!\u0011!Q\u0001\fQBQ\u0001\u000f\u0001\u0005\u0002eBqa\u0010\u0001A\u0002\u0013%\u0001\tC\u0004S\u0001\u0001\u0007I\u0011B*\t\re\u0003\u0001\u0015)\u0003B\u0011\u001dQ\u0006A1A\u0005\nmCa\u0001\u0018\u0001!\u0002\u00139\u0005\"B/\u0001\t\u0003r\u0006\"B7\u0001\t\u0013q\u0007bBA\u0002\u0001\u0011%\u0011Q\u0001\u0005\b\u00037\u0001A\u0011BA\u000f\u000f\u001d\t\td\u0005E\u0001\u0003g1aAE\n\t\u0002\u0005U\u0002B\u0002\u001d\u000f\t\u0003\t\u0019\u0005C\u0004\u0002F9!\t!a\u0012\t\u0013\u0005=c\"!A\u0005\n\u0005E#aE#mCN$\u0018nY:fCJ\u001c\u0007nU8ve\u000e,'B\u0001\u000b\u0016\u00035)G.Y:uS\u000e\u001cX-\u0019:dQ*\u0011acF\u0001\bG>tg.Z2u\u0015\u0005A\u0012!B7p]&D8C\u0001\u0001\u001b!\rYb\u0004I\u0007\u00029)\u0011QdF\u0001\te\u0016\f7\r^5wK&\u0011q\u0004\b\u0002\u000b\u001f\n\u001cXM\u001d<bE2,\u0007CA\u0011-\u001b\u0005\u0011#BA\u0012%\u0003!\u0019X-\u0019:dQ\u0016\u001c(BA\u0013'\u0003!\u0011X-];fgR\u001c(BA\u0014)\u0003%)G.Y:uS\u000e$4O\u0003\u0002*U\u0005A1o[:b[V,GNC\u0001,\u0003\r\u0019w.\\\u0005\u0003[\t\u0012\u0011bU3be\u000eD\u0007*\u001b;\u0002\u000fI,\u0017/^3ti\u000e\u0001\u0001CA\u00112\u0013\t\u0011$EA\u0007TK\u0006\u00148\r\u001b*fcV,7\u000f^\u0001\u0007G2LWM\u001c;\u0011\u0005U2T\"\u0001\u0014\n\u0005]2#!D#mCN$\u0018nY\"mS\u0016tG/\u0001\u0004=S:LGO\u0010\u000b\u0003uy\"\"aO\u001f\u0011\u0005q\u0002Q\"A\n\t\u000bM\u001a\u00019\u0001\u001b\t\u000b9\u001a\u0001\u0019\u0001\u0019\u0002\u0011M\u001c'o\u001c7m\u0013\u0012,\u0012!\u0011\t\u0004\u0005\u0016;U\"A\"\u000b\u0003\u0011\u000bQa]2bY\u0006L!AR\"\u0003\r=\u0003H/[8o!\tAuJ\u0004\u0002J\u001bB\u0011!jQ\u0007\u0002\u0017*\u0011AjL\u0001\u0007yI|w\u000e\u001e \n\u00059\u001b\u0015A\u0002)sK\u0012,g-\u0003\u0002Q#\n11\u000b\u001e:j]\u001eT!AT\"\u0002\u0019M\u001c'o\u001c7m\u0013\u0012|F%Z9\u0015\u0005Q;\u0006C\u0001\"V\u0013\t16I\u0001\u0003V]&$\bb\u0002-\u0006\u0003\u0003\u0005\r!Q\u0001\u0004q\u0012\n\u0014!C:de>dG.\u00133!\u0003%YW-\u001a9BY&4X-F\u0001H\u0003)YW-\u001a9BY&4X\rI\u0001\u0012k:\u001c\u0018MZ3Tk\n\u001c8M]5cK\u001asGCA0f!\t\u00017-D\u0001b\u0015\t\u0011w#A\u0005fq\u0016\u001cW\u000f^5p]&\u0011A-\u0019\u0002\u000b\u0007\u0006t7-\u001a7bE2,\u0007\"\u00024\n\u0001\u00049\u0017AC:vEN\u001c'/\u001b2feB\u0019\u0001n\u001b\u0011\u000e\u0003%T!A\u001b\u000f\u0002\u0013=\u00147/\u001a:wKJ\u001c\u0018B\u00017j\u0005)\u0019VOY:de&\u0014WM]\u0001\tM\u0006\u001cH\u000fT8paR\u0019q.^@\u0011\u0007A\u001cH+D\u0001r\u0015\t\u0011x#\u0001\u0003fm\u0006d\u0017B\u0001;r\u0005\u0011!\u0016m]6\t\u000bYT\u0001\u0019A<\u0002\r\t,hMZ3s!\rAX\u0010I\u0007\u0002s*\u0011!p_\u0001\b[V$\u0018M\u00197f\u0015\ta8)\u0001\u0006d_2dWm\u0019;j_:L!A`=\u0003\u000bE+X-^3\t\r\u0005\u0005!\u00021\u0001h\u0003\r\u0019XOY\u0001\u0010a>\u0004X\u000f\\1uK\"\u000bg\u000e\u001a7feR9A+a\u0002\u0002\u0018\u0005e\u0001bBA\u0005\u0017\u0001\u0007\u00111B\u0001\te\u0016\u001c\bo\u001c8tKB)Q'!\u0004\u0002\u0012%\u0019\u0011q\u0002\u0014\u0003\u0011I+7\u000f]8og\u0016\u00042!IA\n\u0013\r\t)B\t\u0002\u000f'\u0016\f'o\u00195SKN\u0004xN\\:f\u0011\u001518\u00021\u0001x\u0011\u0019\t\ta\u0003a\u0001O\u0006)a-\u001a;dQR)q.a\b\u0002\"!)a\u000f\u0004a\u0001o\"1\u0011\u0011\u0001\u0007A\u0002\u001dD3\u0001AA\u0013!\u0011\t9#!\f\u000e\u0005\u0005%\"bAA\u0016C\u0006A\u0011N\u001c;fe:\fG.\u0003\u0003\u00020\u0005%\"aC%oi\u0016\u0014h.\u00197Ba&\f1#\u00127bgRL7m]3be\u000eD7k\\;sG\u0016\u0004\"\u0001\u0010\b\u0014\u000b9\t9$!\u0010\u0011\u0007\t\u000bI$C\u0002\u0002<\r\u0013a!\u00118z%\u00164\u0007c\u0001\"\u0002@%\u0019\u0011\u0011I\"\u0003\u0019M+'/[1mSj\f'\r\\3\u0015\u0005\u0005M\u0012AB:fCJ\u001c\u0007\u000e\u0006\u0003\u0002J\u00055CcA\u001e\u0002L!)1\u0007\u0005a\u0002i!)a\u0006\u0005a\u0001a\u0005Y!/Z1e%\u0016\u001cx\u000e\u001c<f)\t\t\u0019\u0006\u0005\u0003\u0002V\u0005}SBAA,\u0015\u0011\tI&a\u0017\u0002\t1\fgn\u001a\u0006\u0003\u0003;\nAA[1wC&!\u0011\u0011MA,\u0005\u0019y%M[3di\u0002")
@InternalApi
public class ElasticsearchSource
extends Observable<SearchHit> {
    private final SearchRequest request;
    private final ElasticClient client;
    private Option<String> scrollId;
    private final String keepAlive;

    public static ElasticsearchSource search(SearchRequest searchRequest, ElasticClient elasticClient) {
        return ElasticsearchSource$.MODULE$.search(searchRequest, elasticClient);
    }

    private Option<String> scrollId() {
        return this.scrollId;
    }

    private void scrollId_$eq(Option<String> x$1) {
        this.scrollId = x$1;
    }

    private String keepAlive() {
        return this.keepAlive;
    }

    public Cancelable unsafeSubscribeFn(Subscriber<SearchHit> subscriber) {
        return this.fastLoop((Queue<SearchHit>)((Queue)Queue$.MODULE$.empty()), subscriber).runToFuture(subscriber.scheduler());
    }

    private Task<BoxedUnit> fastLoop(Queue<SearchHit> buffer, Subscriber<SearchHit> sub) {
        return this.fetch(buffer, sub).flatMap((Function1 & Serializable & scala.Serializable)x$1 -> {
            if (buffer.nonEmpty()) {
                return Task$.MODULE$.deferFuture((Function0 & Serializable & scala.Serializable)() -> sub.onNext(buffer.dequeue()));
            }
            return Task$.MODULE$.now((Object)Ack.Stop$.MODULE$);
        }).flatMap((Function1 & Serializable & scala.Serializable)x0$1 -> {
            Ack ack = x0$1;
            if (Ack.Continue$.MODULE$.equals(ack)) {
                return this.fastLoop(buffer, sub);
            }
            if (Ack.Stop$.MODULE$.equals(ack)) {
                sub.onComplete();
                return Task$.MODULE$.unit();
            }
            throw new MatchError((Object)ack);
        });
    }

    private void populateHandler(Response<SearchResponse> response, Queue<SearchHit> buffer, Subscriber<SearchHit> sub) {
        Response<SearchResponse> response2 = response;
        if (response2 instanceof RequestFailure) {
            RequestFailure requestFailure = (RequestFailure)response2;
            ElasticError error = requestFailure.error();
            sub.onError((Throwable)error.asException());
            return;
        }
        if (response2 instanceof RequestSuccess) {
            RequestSuccess requestSuccess = (RequestSuccess)response2;
            SearchResponse result = (SearchResponse)requestSuccess.result();
            Option option = result.scrollId();
            if (None$.MODULE$.equals(option)) {
                sub.onError((Throwable)new RuntimeException("Search response did not include a scroll id"));
            } else if (option instanceof Some) {
                Some some = (Some)option;
                String id = (String)some.value();
                this.scrollId_$eq((Option<String>)new Some((Object)id));
                if (result.hits().hits().length == 0 && buffer.isEmpty()) {
                    sub.onComplete();
                } else {
                    buffer.$plus$plus$eq((TraversableOnce)new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[])result.hits().hits())));
                }
            } else {
                throw new MatchError((Object)option);
            }
            return;
        }
        throw new MatchError(response2);
    }

    private Task<BoxedUnit> fetch(Queue<SearchHit> buffer, Subscriber<SearchHit> sub) {
        if (buffer.isEmpty()) {
            Option<String> option = this.scrollId();
            if (option instanceof Some) {
                Some some = (Some)option;
                String id = (String)some.value();
                return ((Task)this.client.execute((Object)ElasticDsl$.MODULE$.searchScroll(id).keepAlive(this.keepAlive()), package$.MODULE$.taskExecutor(), package$.MODULE$.taskFunctor(), (Handler)ElasticDsl$.MODULE$.SearchScrollHandler(), ManifestFactory$.MODULE$.classType(SearchResponse.class), CommonRequestOptions$.MODULE$.defaults())).map((Function1 & Serializable & scala.Serializable)x$2 -> {
                    this.populateHandler((Response<SearchResponse>)x$2, (Queue<SearchHit>)buffer, (Subscriber<SearchHit>)sub);
                    return BoxedUnit.UNIT;
                });
            }
            if (None$.MODULE$.equals(option)) {
                return ((Task)this.client.execute((Object)this.request.keepAlive(this.keepAlive()), package$.MODULE$.taskExecutor(), package$.MODULE$.taskFunctor(), (Handler)ElasticDsl$.MODULE$.SearchHandler(), ManifestFactory$.MODULE$.classType(SearchResponse.class), CommonRequestOptions$.MODULE$.defaults())).map((Function1 & Serializable & scala.Serializable)x$3 -> {
                    this.populateHandler((Response<SearchResponse>)x$3, (Queue<SearchHit>)buffer, (Subscriber<SearchHit>)sub);
                    return BoxedUnit.UNIT;
                });
            }
            throw new MatchError(option);
        }
        return Task$.MODULE$.unit();
    }

    public ElasticsearchSource(SearchRequest request, ElasticClient client) {
        this.request = request;
        this.client = client;
        this.scrollId = None$.MODULE$;
        this.keepAlive = (String)request.keepAlive().getOrElse((Function0 & Serializable & scala.Serializable)() -> "1m");
    }
}

