package no.nextgentel.oss.akkatools.persistence.jdbcjournal;

import akka.actor.Actor;
import akka.actor.ActorContext;
import akka.actor.ActorLogging;
import akka.actor.ActorRef;
import akka.actor.Cancellable;
import akka.actor.SupervisorStrategy;
import akka.cluster.pubsub.DistributedPubSub$;
import akka.cluster.pubsub.DistributedPubSubMediator$Subscribe$;
import akka.event.LoggingAdapter;
import akka.persistence.PersistentRepr;
import akka.persistence.query.EventEnvelope;
import akka.serialization.SerializationExtension$;
import akka.serialization.Serializer;
import akka.stream.actor.ActorPublisher;
import akka.stream.actor.ActorPublisherState;
import org.reactivestreams.Subscriber;
import scala.MatchError;
import scala.Option;
import scala.PartialFunction;
import scala.Predef$;
import scala.StringContext;
import scala.Tuple2;
import scala.collection.TraversableOnce;
import scala.collection.immutable.List$;
import scala.collection.immutable.Vector;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
import scala.package$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;

/* compiled from: JdbcReadJournalProvider.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005\u0005d\u0001B\u0001\u0003\u0001=\u0011aD\u00133cG\u00163XM\u001c;t\u0005f\u0004VM]:jgR,gnY3JI\u0006\u001bGo\u001c:\u000b\u0005\r!\u0011a\u00036eE\u000eTw.\u001e:oC2T!!\u0002\u0004\u0002\u0017A,'o]5ti\u0016t7-\u001a\u0006\u0003\u000f!\t\u0011\"Y6lCR|w\u000e\\:\u000b\u0005%Q\u0011aA8tg*\u00111\u0002D\u0001\u000b]\u0016DHoZ3oi\u0016d'\"A\u0007\u0002\u00059|7\u0001A\n\u0005\u0001A1r\u0005\u0005\u0002\u0012)5\t!CC\u0001\u0014\u0003\u0015\u00198-\u00197b\u0013\t)\"C\u0001\u0004B]f\u0014VM\u001a\t\u0004/y\u0001S\"\u0001\r\u000b\u0005eQ\u0012!B1di>\u0014(BA\u000e\u001d\u0003\u0019\u0019HO]3b[*\tQ$\u0001\u0003bW.\f\u0017BA\u0010\u0019\u00059\t5\r^8s!V\u0014G.[:iKJ\u0004\"!I\u0013\u000e\u0003\tR!a\t\u0013\u0002\u000bE,XM]=\u000b\u0005\u0015a\u0012B\u0001\u0014#\u00055)e/\u001a8u\u000b:4X\r\\8qKB\u0011\u0001FK\u0007\u0002S)\u0011\u0011\u0004H\u0005\u0003W%\u0012A\"Q2u_JdunZ4j]\u001eD\u0001\"\f\u0001\u0003\u0002\u0003\u0006IAL\u0001'U\u0012\u00147MS8ve:\fGNU;oi&lW\rR1uC\u001a\u000b7\r^8ss\u000ec\u0017m]:OC6,\u0007CA\u00183\u001d\t\t\u0002'\u0003\u00022%\u00051\u0001K]3eK\u001aL!a\r\u001b\u0003\rM#(/\u001b8h\u0015\t\t$\u0003\u0003\u00057\u0001\t\u0005\t\u0015!\u00038\u0003-\u0011XO\u001c;j[\u0016$\u0015\r^1\u0011\u0005aJT\"\u0001\u0002\n\u0005i\u0012!A\u0006&eE\u000eTu.\u001e:oC2\u0014VO\u001c;j[\u0016$\u0015\r^1\t\u0011q\u0002!\u0011!Q\u0001\nu\nA\u0001\\5wKB\u0011\u0011CP\u0005\u0003\u007fI\u0011qAQ8pY\u0016\fg\u000e\u0003\u0005B\u0001\t\u0005\t\u0015!\u0003C\u0003=\u0011XM\u001a:fg\"Le\u000e^3sm\u0006d\u0007CA\"I\u001b\u0005!%BA#G\u0003!!WO]1uS>t'BA$\u0013\u0003)\u0019wN\\2veJ,g\u000e^\u0005\u0003\u0013\u0012\u0013aBR5oSR,G)\u001e:bi&|g\u000e\u0003\u0005L\u0001\t\u0005\t\u0015!\u0003/\u00035\u0001XM]:jgR,gnY3JI\"AQ\n\u0001B\u0001B\u0003%a*\u0001\bge>l7+Z9vK:\u001cWM\u0014:\u0011\u0005Ey\u0015B\u0001)\u0013\u0005\u0011auN\\4\t\u0011I\u0003!\u0011!Q\u0001\n9\u000bA\u0002^8TKF,XM\\2f\u001dJDQ\u0001\u0016\u0001\u0005\u0002U\u000ba\u0001P5oSRtD\u0003\u0003,X1fS6\fX/\u0011\u0005a\u0002\u0001\"B\u0017T\u0001\u0004q\u0003\"\u0002\u001cT\u0001\u00049\u0004\"\u0002\u001fT\u0001\u0004i\u0004\"B!T\u0001\u0004\u0011\u0005\"B&T\u0001\u0004q\u0003\"B'T\u0001\u0004q\u0005\"\u0002*T\u0001\u0004q\u0005bB0\u0001\u0005\u0004%\t\u0001Y\u0001\u000bg\u0016\u0014\u0018.\u00197ju\u0016\u0014X#A1\u0011\u0005\t,W\"A2\u000b\u0005\u0011d\u0012!D:fe&\fG.\u001b>bi&|g.\u0003\u0002gG\nQ1+\u001a:jC2L'0\u001a:\t\r!\u0004\u0001\u0015!\u0003b\u0003-\u0019XM]5bY&TXM\u001d\u0011\t\u000f)\u0004!\u0019!C\u0001W\u0006\u0019\u0002/\u001a:tSN$XM\\2f\u0013\u0012|%M[3diV\tA\u000e\u0005\u00029[&\u0011aN\u0001\u0002\u000e!\u0016\u00148/[:uK:\u001cW-\u00133\t\rA\u0004\u0001\u0015!\u0003m\u0003Q\u0001XM]:jgR,gnY3JI>\u0013'.Z2uA!9!\u000f\u0001a\u0001\n\u0013\u0019\u0018A\u00058fqR4%o\\7TKF,XM\\2f\u001dJ,\u0012A\u0014\u0005\bk\u0002\u0001\r\u0011\"\u0003w\u0003YqW\r\u001f;Ge>l7+Z9vK:\u001cWM\u0014:`I\u0015\fHCA<{!\t\t\u00020\u0003\u0002z%\t!QK\\5u\u0011\u001dYH/!AA\u00029\u000b1\u0001\u001f\u00132\u0011\u0019i\b\u0001)Q\u0005\u001d\u0006\u0019b.\u001a=u\rJ|WnU3rk\u0016t7-\u001a(sA!Aq\u0010\u0001a\u0001\n\u0003\t\t!A\u0002ck\u001a,\"!a\u0001\u0011\u000b\u0005\u0015\u0011q\u0002\u0011\u000e\u0005\u0005\u001d!\u0002BA\u0005\u0003\u0017\t\u0011\"[7nkR\f'\r\\3\u000b\u0007\u00055!#\u0001\u0006d_2dWm\u0019;j_:LA!!\u0005\u0002\b\t1a+Z2u_JD\u0011\"!\u0006\u0001\u0001\u0004%\t!a\u0006\u0002\u000f\t,hm\u0018\u0013fcR\u0019q/!\u0007\t\u0013m\f\u0019\"!AA\u0002\u0005\r\u0001\u0002CA\u000f\u0001\u0001\u0006K!a\u0001\u0002\t\t,h\r\t\u0005\n\u0003C\u0001!\u0019!C\u0001\u0003G\tAbY8oi&tW/\u001a+bg.,\"!!\n\u0011\u0007!\n9#C\u0002\u0002*%\u00121bQ1oG\u0016dG.\u00192mK\"A\u0011Q\u0006\u0001!\u0002\u0013\t)#A\u0007d_:$\u0018N\\;f)\u0006\u001c8\u000e\t\u0005\n\u0003c\u0001!\u0019!C\u0001\u0003g\ta\u0002];cgV\u0014W*\u001a3jCR|'/\u0006\u0002\u00026A\u0019\u0001&a\u000e\n\u0007\u0005e\u0012F\u0001\u0005BGR|'OU3g\u0011!\ti\u0004\u0001Q\u0001\n\u0005U\u0012a\u00049vEN,(-T3eS\u0006$xN\u001d\u0011\t\u000f\u0005\u0005\u0003\u0001\"\u0011\u0002D\u0005A\u0001o\\:u'R|\u0007\u000fF\u0001x\u0011\u001d\t9\u0005\u0001C\u0001\u0003\u0013\nqA]3dK&4X-\u0006\u0002\u0002LA1\u0011#!\u0014\u0002R]L1!a\u0014\u0013\u0005=\u0001\u0016M\u001d;jC24UO\\2uS>t\u0007cA\t\u0002T%\u0019\u0011Q\u000b\n\u0003\u0007\u0005s\u0017\u0010C\u0004\u0002Z\u0001!\t!a\u0011\u0002\r\u0011|wk\u001c:l\u0011\u0019\u0019\u0003\u0001\"\u0001\u0002D!9\u0011q\f\u0001\u0005\u0006\u0005\r\u0013A\u00033fY&4XM\u001d\"vM\u0002")
/* loaded from: input_file:no/nextgentel/oss/akkatools/persistence/jdbcjournal/JdbcEventsByPersistenceIdActor.class */
public class JdbcEventsByPersistenceIdActor implements ActorPublisher<EventEnvelope>, ActorLogging {
    private final JdbcJournalRuntimeData runtimeData;
    private final boolean live;
    private final String persistenceId;
    private final long toSequenceNr;
    private final Serializer serializer;
    private final PersistenceId persistenceIdObject;
    private long no$nextgentel$oss$akkatools$persistence$jdbcjournal$JdbcEventsByPersistenceIdActor$$nextFromSequenceNr;
    private Vector<EventEnvelope> buf;
    private final Cancellable continueTask;
    private final ActorRef pubsubMediator;
    private LoggingAdapter akka$actor$ActorLogging$$_log;
    private final ActorPublisherState akka$stream$actor$ActorPublisher$$state;
    private Subscriber<Object> akka$stream$actor$ActorPublisher$$subscriber;
    private long akka$stream$actor$ActorPublisher$$demand;
    private ActorPublisher.Internal.LifecycleState akka$stream$actor$ActorPublisher$$lifecycleState;
    private Cancellable akka$stream$actor$ActorPublisher$$scheduledSubscriptionTimeout;
    private final ActorContext context;
    private final ActorRef self;

    public LoggingAdapter akka$actor$ActorLogging$$_log() {
        return this.akka$actor$ActorLogging$$_log;
    }

    public void akka$actor$ActorLogging$$_log_$eq(LoggingAdapter loggingAdapter) {
        this.akka$actor$ActorLogging$$_log = loggingAdapter;
    }

    public LoggingAdapter log() {
        return ActorLogging.class.log(this);
    }

    public ActorPublisherState akka$stream$actor$ActorPublisher$$state() {
        return this.akka$stream$actor$ActorPublisher$$state;
    }

    public Subscriber<Object> akka$stream$actor$ActorPublisher$$subscriber() {
        return this.akka$stream$actor$ActorPublisher$$subscriber;
    }

    public void akka$stream$actor$ActorPublisher$$subscriber_$eq(Subscriber<Object> subscriber) {
        this.akka$stream$actor$ActorPublisher$$subscriber = subscriber;
    }

    public long akka$stream$actor$ActorPublisher$$demand() {
        return this.akka$stream$actor$ActorPublisher$$demand;
    }

    public void akka$stream$actor$ActorPublisher$$demand_$eq(long j) {
        this.akka$stream$actor$ActorPublisher$$demand = j;
    }

    public ActorPublisher.Internal.LifecycleState akka$stream$actor$ActorPublisher$$lifecycleState() {
        return this.akka$stream$actor$ActorPublisher$$lifecycleState;
    }

    public void akka$stream$actor$ActorPublisher$$lifecycleState_$eq(ActorPublisher.Internal.LifecycleState lifecycleState) {
        this.akka$stream$actor$ActorPublisher$$lifecycleState = lifecycleState;
    }

    public Cancellable akka$stream$actor$ActorPublisher$$scheduledSubscriptionTimeout() {
        return this.akka$stream$actor$ActorPublisher$$scheduledSubscriptionTimeout;
    }

    public void akka$stream$actor$ActorPublisher$$scheduledSubscriptionTimeout_$eq(Cancellable cancellable) {
        this.akka$stream$actor$ActorPublisher$$scheduledSubscriptionTimeout = cancellable;
    }

    public /* synthetic */ void akka$stream$actor$ActorPublisher$$super$aroundReceive(PartialFunction partialFunction, Object obj) {
        Actor.class.aroundReceive(this, partialFunction, obj);
    }

    public /* synthetic */ void akka$stream$actor$ActorPublisher$$super$aroundPreStart() {
        Actor.class.aroundPreStart(this);
    }

    public /* synthetic */ void akka$stream$actor$ActorPublisher$$super$aroundPreRestart(Throwable th, Option option) {
        Actor.class.aroundPreRestart(this, th, option);
    }

    public /* synthetic */ void akka$stream$actor$ActorPublisher$$super$aroundPostRestart(Throwable th) {
        Actor.class.aroundPostRestart(this, th);
    }

    public /* synthetic */ void akka$stream$actor$ActorPublisher$$super$aroundPostStop() {
        Actor.class.aroundPostStop(this);
    }

    public void akka$stream$actor$ActorPublisher$_setter_$akka$stream$actor$ActorPublisher$$state_$eq(ActorPublisherState actorPublisherState) {
        this.akka$stream$actor$ActorPublisher$$state = actorPublisherState;
    }

    public Duration subscriptionTimeout() {
        return ActorPublisher.class.subscriptionTimeout(this);
    }

    public final boolean isActive() {
        return ActorPublisher.class.isActive(this);
    }

    public final long totalDemand() {
        return ActorPublisher.class.totalDemand(this);
    }

    public final boolean isCompleted() {
        return ActorPublisher.class.isCompleted(this);
    }

    public final boolean isErrorEmitted() {
        return ActorPublisher.class.isErrorEmitted(this);
    }

    public final boolean isCanceled() {
        return ActorPublisher.class.isCanceled(this);
    }

    public void onNext(Object obj) {
        ActorPublisher.class.onNext(this, obj);
    }

    public void onComplete() {
        ActorPublisher.class.onComplete(this);
    }

    public void onCompleteThenStop() {
        ActorPublisher.class.onCompleteThenStop(this);
    }

    public void onError(Throwable th) {
        ActorPublisher.class.onError(this, th);
    }

    public void onErrorThenStop(Throwable th) {
        ActorPublisher.class.onErrorThenStop(this, th);
    }

    public void aroundReceive(PartialFunction<Object, BoxedUnit> partialFunction, Object obj) {
        ActorPublisher.class.aroundReceive(this, partialFunction, obj);
    }

    public void aroundPreStart() {
        ActorPublisher.class.aroundPreStart(this);
    }

    public void aroundPreRestart(Throwable th, Option<Object> option) {
        ActorPublisher.class.aroundPreRestart(this, th, option);
    }

    public void aroundPostRestart(Throwable th) {
        ActorPublisher.class.aroundPostRestart(this, th);
    }

    public void aroundPostStop() {
        ActorPublisher.class.aroundPostStop(this);
    }

    public ActorContext context() {
        return this.context;
    }

    public final ActorRef self() {
        return this.self;
    }

    public void akka$actor$Actor$_setter_$context_$eq(ActorContext actorContext) {
        this.context = actorContext;
    }

    public final void akka$actor$Actor$_setter_$self_$eq(ActorRef actorRef) {
        this.self = actorRef;
    }

    public final ActorRef sender() {
        return Actor.class.sender(this);
    }

    public SupervisorStrategy supervisorStrategy() {
        return Actor.class.supervisorStrategy(this);
    }

    public void preStart() throws Exception {
        Actor.class.preStart(this);
    }

    public void preRestart(Throwable th, Option<Object> option) throws Exception {
        Actor.class.preRestart(this, th, option);
    }

    public void postRestart(Throwable th) throws Exception {
        Actor.class.postRestart(this, th);
    }

    public void unhandled(Object obj) {
        Actor.class.unhandled(this, obj);
    }

    public Serializer serializer() {
        return this.serializer;
    }

    public PersistenceId persistenceIdObject() {
        return this.persistenceIdObject;
    }

    private long no$nextgentel$oss$akkatools$persistence$jdbcjournal$JdbcEventsByPersistenceIdActor$$nextFromSequenceNr() {
        return this.no$nextgentel$oss$akkatools$persistence$jdbcjournal$JdbcEventsByPersistenceIdActor$$nextFromSequenceNr;
    }

    public void no$nextgentel$oss$akkatools$persistence$jdbcjournal$JdbcEventsByPersistenceIdActor$$nextFromSequenceNr_$eq(long j) {
        this.no$nextgentel$oss$akkatools$persistence$jdbcjournal$JdbcEventsByPersistenceIdActor$$nextFromSequenceNr = j;
    }

    public Vector<EventEnvelope> buf() {
        return this.buf;
    }

    public void buf_$eq(Vector<EventEnvelope> vector) {
        this.buf = vector;
    }

    public Cancellable continueTask() {
        return this.continueTask;
    }

    public ActorRef pubsubMediator() {
        return this.pubsubMediator;
    }

    public void postStop() {
        continueTask().cancel();
    }

    public PartialFunction<Object, BoxedUnit> receive() {
        return new JdbcEventsByPersistenceIdActor$$anonfun$receive$1(this);
    }

    public void doWork() {
        query();
        deliverBuf();
    }

    public void query() {
        if (buf().isEmpty()) {
            try {
                log().debug(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"Reading entries for persistenceId=", " - nextFromSequenceNr=", ", toSequenceNr=", ""})).s(Predef$.MODULE$.genericWrapArray(new Object[]{this.persistenceId, BoxesRunTime.boxToLong(no$nextgentel$oss$akkatools$persistence$jdbcjournal$JdbcEventsByPersistenceIdActor$$nextFromSequenceNr()), BoxesRunTime.boxToLong(this.toSequenceNr)})));
                buf_$eq(((TraversableOnce) this.runtimeData.repo().loadJournalEntries(persistenceIdObject(), no$nextgentel$oss$akkatools$persistence$jdbcjournal$JdbcEventsByPersistenceIdActor$$nextFromSequenceNr(), this.toSequenceNr, this.runtimeData.maxRowsPrRead()).map(new JdbcEventsByPersistenceIdActor$$anonfun$query$1(this), List$.MODULE$.canBuildFrom())).toVector());
                if (this.live || !buf().isEmpty()) {
                    return;
                }
                log().debug(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"Stopping none-live stream for persistenceId=", ""})).s(Predef$.MODULE$.genericWrapArray(new Object[]{this.persistenceId})));
                onCompleteThenStop();
            } catch (Exception e) {
                onErrorThenStop(e);
            }
        }
    }

    public final void deliverBuf() {
        if (totalDemand() <= 0 || !buf().nonEmpty()) {
            return;
        }
        if (totalDemand() > 2147483647L) {
            buf().foreach(new JdbcEventsByPersistenceIdActor$$anonfun$deliverBuf$2(this));
            buf_$eq(package$.MODULE$.Vector().empty());
            return;
        }
        Tuple2 splitAt = buf().splitAt((int) totalDemand());
        if (splitAt == null) {
            throw new MatchError(splitAt);
        }
        Tuple2 tuple2 = new Tuple2((Vector) splitAt._1(), (Vector) splitAt._2());
        Vector vector = (Vector) tuple2._1();
        buf_$eq((Vector) tuple2._2());
        vector.foreach(new JdbcEventsByPersistenceIdActor$$anonfun$deliverBuf$1(this));
    }

    public JdbcEventsByPersistenceIdActor(String str, JdbcJournalRuntimeData jdbcJournalRuntimeData, boolean z, FiniteDuration finiteDuration, String str2, long j, long j2) {
        this.runtimeData = jdbcJournalRuntimeData;
        this.live = z;
        this.persistenceId = str2;
        this.toSequenceNr = j2;
        Actor.class.$init$(this);
        ActorPublisher.class.$init$(this);
        ActorLogging.class.$init$(this);
        this.serializer = SerializationExtension$.MODULE$.get(context().system()).serializerFor(PersistentRepr.class);
        this.persistenceIdObject = jdbcJournalRuntimeData.persistenceIdSplitter().split(str2);
        this.no$nextgentel$oss$akkatools$persistence$jdbcjournal$JdbcEventsByPersistenceIdActor$$nextFromSequenceNr = j;
        this.buf = package$.MODULE$.Vector().empty();
        this.continueTask = context().system().scheduler().schedule(finiteDuration, finiteDuration, self(), Continue$.MODULE$, context().dispatcher(), self());
        this.pubsubMediator = DistributedPubSub$.MODULE$.apply(context().system()).mediator();
        if (!z || persistenceIdObject().isFull()) {
            return;
        }
        akka.actor.package$.MODULE$.actorRef2Scala(pubsubMediator()).$bang(DistributedPubSubMediator$Subscribe$.MODULE$.apply(EntryWrittenToTag$.MODULE$.topic(str, persistenceIdObject().typePath()), self()), self());
    }
}
