package weco.messaging.sqs;

import grizzled.slf4j.Logger;
import grizzled.slf4j.Logging;
import io.circe.Decoder;
import org.apache.pekko.Done;
import org.apache.pekko.Done$;
import org.apache.pekko.NotUsed;
import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.stream.ActorAttributes$;
import org.apache.pekko.stream.Materializer$;
import org.apache.pekko.stream.Supervision;
import org.apache.pekko.stream.Supervision$;
import org.apache.pekko.stream.Supervision$Resume$;
import org.apache.pekko.stream.Supervision$Stop$;
import org.apache.pekko.stream.connectors.sqs.MessageAction;
import org.apache.pekko.stream.connectors.sqs.MessageAction$Delete$;
import org.apache.pekko.stream.connectors.sqs.scaladsl.SqsAckSink$;
import org.apache.pekko.stream.connectors.sqs.scaladsl.SqsSource$;
import org.apache.pekko.stream.scaladsl.Flow$;
import org.apache.pekko.stream.scaladsl.Keep$;
import org.apache.pekko.stream.scaladsl.RunnableGraph;
import org.apache.pekko.stream.scaladsl.Sink;
import org.apache.pekko.stream.scaladsl.Source;
import org.slf4j.Marker;
import scala.Function0;
import scala.Function1;
import scala.Function2;
import scala.MatchError;
import scala.Predef$;
import scala.Tuple2;
import scala.concurrent.ExecutionContextExecutor;
import scala.concurrent.Future;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import software.amazon.awssdk.services.sqs.SqsAsyncClient;
import software.amazon.awssdk.services.sqs.model.Message;
import weco.json.JsonUtil$;
import weco.json.exceptions.JsonDecodingError;
import weco.monitoring.Metrics;

/* compiled from: SQSStream.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005\u0005h\u0001\u0002\n\u0014\u0001iA\u0001B\u000b\u0001\u0003\u0002\u0003\u0006Ia\u000b\u0005\tq\u0001\u0011\t\u0011)A\u0005s!AQ\b\u0001B\u0001B\u0003%a\b\u0003\u0005K\u0001\t\u0015\r\u0011b\u0001L\u0011!A\u0006A!A!\u0002\u0013a\u0005\"B-\u0001\t\u0003Q\u0006b\u00027\u0001\u0005\u0004%\u0019!\u001c\u0005\u0007c\u0002\u0001\u000b\u0011\u00028\t\u000fI\u0004!\u0019!C\u0005g\"9\u0011Q\u0002\u0001!\u0002\u0013!\b\"CA\b\u0001\t\u0007I\u0011BA\t\u0011!\ty\u0003\u0001Q\u0001\n\u0005M\u0001bBA\u0019\u0001\u0011\u0005\u00111\u0007\u0005\b\u0003o\u0002A\u0011AA=\u0011\u001d\ty\n\u0001C\u0001\u0003CCq!a,\u0001\t\u0013\t\t\fC\u0004\u0002H\u0002!I!!3\u0003\u0013M\u000b6k\u0015;sK\u0006l'B\u0001\u000b\u0016\u0003\r\u0019\u0018o\u001d\u0006\u0003-]\t\u0011\"\\3tg\u0006<\u0017N\\4\u000b\u0003a\tAa^3d_\u000e\u0001QCA\u000e`'\r\u0001AD\t\t\u0003;\u0001j\u0011A\b\u0006\u0002?\u0005)1oY1mC&\u0011\u0011E\b\u0002\u0007\u0003:L(+\u001a4\u0011\u0005\rBS\"\u0001\u0013\u000b\u0005\u00152\u0013!B:mMRR'\"A\u0014\u0002\u0011\u001d\u0014\u0018N\u001f>mK\u0012L!!\u000b\u0013\u0003\u000f1{wmZ5oO\u0006I1/]:DY&,g\u000e\u001e\t\u0003YYj\u0011!\f\u0006\u0003)9R!a\f\u0019\u0002\u0011M,'O^5dKNT!!\r\u001a\u0002\r\u0005<8o\u001d3l\u0015\t\u0019D'\u0001\u0004b[\u0006TxN\u001c\u0006\u0002k\u0005A1o\u001c4uo\u0006\u0014X-\u0003\u00028[\tq1+]:Bgft7m\u00117jK:$\u0018!C:rg\u000e{gNZ5h!\tQ4(D\u0001\u0014\u0013\ta4CA\u0005T#N\u001buN\u001c4jO\u0006iQ.\u001a;sS\u000e\u001c8+\u001a8eKJ\u00042a\u0010\"E\u001b\u0005\u0001%BA!\u0018\u0003)iwN\\5u_JLgnZ\u0005\u0003\u0007\u0002\u0013q!T3ue&\u001c7\u000f\u0005\u0002F\u00116\taI\u0003\u0002H=\u0005Q1m\u001c8dkJ\u0014XM\u001c;\n\u0005%3%A\u0002$viV\u0014X-A\u0006bGR|'oU=ti\u0016lW#\u0001'\u0011\u000553V\"\u0001(\u000b\u0005=\u0003\u0016!B1di>\u0014(BA)S\u0003\u0015\u0001Xm[6p\u0015\t\u0019F+\u0001\u0004ba\u0006\u001c\u0007.\u001a\u0006\u0002+\u0006\u0019qN]4\n\u0005]s%aC!di>\u00148+_:uK6\fA\"Y2u_J\u001c\u0016p\u001d;f[\u0002\na\u00
01P5oSRtD\u0003B.jU.$\"\u0001\u00185\u0011\u0007i\u0002Q\f\u0005\u0002_?2\u0001A!\u00021\u0001\u0005\u0004\t'!\u0001+\u0012\u0005\t,\u0007CA\u000fd\u0013\t!gDA\u0004O_RD\u0017N\\4\u0011\u0005u1\u0017BA4\u001f\u0005\r\te.\u001f\u0005\u0006\u0015\u001a\u0001\u001d\u0001\u0014\u0005\u0006U\u0019\u0001\ra\u000b\u0005\u0006q\u0019\u0001\r!\u000f\u0005\u0006{\u0019\u0001\rAP\u0001\u000bI&\u001c\b/\u0019;dQ\u0016\u0014X#\u00018\u0011\u0005\u0015{\u0017B\u00019G\u0005a)\u00050Z2vi&|gnQ8oi\u0016DH/\u0012=fGV$xN]\u0001\fI&\u001c\b/\u0019;dQ\u0016\u0014\b%\u0001\u0004t_V\u00148-Z\u000b\u0002iB)QO\u001f?\u0002\u00065\taO\u0003\u0002xq\u0006A1oY1mC\u0012\u001cHN\u0003\u0002z!\u000611\u000f\u001e:fC6L!a\u001f<\u0003\rM{WO]2f!\ri\u0018\u0011A\u0007\u0002}*\u0011q0L\u0001\u0006[>$W\r\\\u0005\u0004\u0003\u0007q(aB'fgN\fw-\u001a\t\u0005\u0003\u000f\tI!D\u0001Q\u0013\r\tY\u0001\u0015\u0002\b\u001d>$Xk]3e\u0003\u001d\u0019x.\u001e:dK\u0002\nAa]5oWV\u0011\u00111\u0003\t\bk\u0006U\u0011\u0011DA\u0014\u0013\r\t9B\u001e\u0002\u0005'&t7\u000e\u0005\u0003\u0002\u001c\u0005\rRBAA\u000f\u0015\r!\u0012q\u0004\u0006\u0004\u0003CA\u0018AC2p]:,7\r^8sg&!\u0011QEA\u000f\u00055iUm]:bO\u0016\f5\r^5p]B!Q\tSA\u0015!\u0011\t9!a\u000b\n\u0007\u00055\u0002K\u0001\u0003E_:,\u0017!B:j].\u0004\u0013a\u00024pe\u0016\f7\r\u001b\u000b\u0007\u0003k\tY%!\u001a\u0015\t\u0005\u001d\u0012q\u0007\u0005\b\u0003si\u00019AA\u001e\u0003!!WmY8eKJ$\u0006#BA\u001f\u0003\u000fjVBAA 
\u0015\u0011\t\t%a\u0011\u0002\u000b\rL'oY3\u000b\u0005\u0005\u0015\u0013AA5p\u0013\u0011\tI%a\u0010\u0003\u000f\u0011+7m\u001c3fe\"9\u0011QJ\u0007A\u0002\u0005=\u0013AC:ue\u0016\fWNT1nKB!\u0011\u0011KA0\u001d\u0011\t\u0019&a\u0017\u0011\u0007\u0005Uc$\u0004\u0002\u0002X)\u0019\u0011\u0011L\r\u0002\rq\u0012xn\u001c;?\u0013\r\tiFH\u0001\u0007!J,G-\u001a4\n\t\u0005\u0005\u00141\r\u0002\u0007'R\u0014\u0018N\\4\u000b\u0007\u0005uc\u0004C\u0004\u0002h5\u0001\r!!\u001b\u0002\u000fA\u0014xnY3tgB1Q$a\u001b^\u0003_J1!!\u001c\u001f\u0005%1UO\\2uS>t\u0017\u0007\u0005\u0003F\u0011\u0006E\u0004cA\u000f\u0002t%\u0019\u0011Q\u000f\u0010\u0003\tUs\u0017\u000e^\u0001\teVtwI]1qQR!\u00111PAO)\u0011\ti(a!\u0015\t\u0005\u001d\u0012q\u0010\u0005\b\u0003\u0003s\u00019AA\u001e\u0003\u001d!WmY8eKJDq!!\"\u000f\u0001\u0004\t9)\u0001\u0007he\u0006\u0004\bNQ3uo\u0016,g\u000eE\u0005\u001e\u0003\u0013\u000bi)!&\u0002\u0018&\u0019\u00111\u0012\u0010\u0003\u0013\u0019+hn\u0019;j_:\u0014\u0004CB;{\u0003\u001f\u000b)\u0001E\u0003\u001e\u0003#cX,C\u0002\u0002\u0014z\u0011a\u0001V;qY\u0016\u0014\u0004CB;\u0002\u0016q\f9\u0003E\u0003v\u00033\u000b9#C\u0002\u0002\u001cZ\u0014QBU;o]\u0006\u0014G.Z$sCBD\u0007bBA'\u001d\u0001\u0007\u0011qJ\u0001\neVt7\u000b\u001e:fC6$b!a)\u0002(\u0006%F\u0003BA\u0014\u0003KCq!!!\u0010\u0001\b\tY\u0004C\u0004\u0002N=\u0001\r!a\u0014\t\u000f\u0005-v\u00021\u0001\u0002.\u0006aQn\u001c3jMf\u001cv.\u001e:dKB1Q$a\u001b\u0002\u000eR\fq\u0001Z3dS\u0012,'\u000f\u0006\u0003\u00024\u0006\r\u0007\u0003BA[\u0003{sA!a.\u0002:6\t\u00010C\u0002\u0002<b\f1bU;qKJ4\u0018n]5p]&!\u0011qXAa\u0005\u001d!UmY5eKJT1!a/y\u0011\u001d\t)\r\u0005a\u0001\u0003\u001f\n!\"\\3ue&\u001cg*Y7f\u00031awnZ#yG\u0016\u0004H/[8o)\u0011\t\t(a3\t\u000f\u00055\u0017\u00031\u0001\u0002P\u0006IQ\r_2faRLwN\u001c\t\u0005\u0003#\fYN\u0004\u0003\u0002T\u0006]g\u0002BA+\u0003+L\u0011aH\u0005\u0004\u00033t\u0012a\u00029bG.\fw-Z\u0005\u0005\u0003;\fyNA\u0005UQJ|w/\u00192mK*\u0019\u0011\u0011\u001c\u0010")
/* loaded from: input_file:weco/messaging/sqs/SQSStream.class */
public class SQSStream<T> implements Logging {
    // Queue settings; this class reads queueUrl() and parallelism() from it.
    private final SQSConfig sqsConfig;
    // Metrics client; incrementCount(...) is called with the success/failure metric names.
    private final Metrics<Future> metricsSender;
    // Actor system passed to the constructor; supplies the dispatcher and the materializer used to run graphs.
    private final ActorSystem actorSystem;
    // Assigned from actorSystem.dispatcher() in the constructor; used for Future callbacks in this class.
    private final ExecutionContextExecutor dispatcher;
    // Message source created in the constructor via SqsSource for sqsConfig.queueUrl().
    private final Source<Message, NotUsed> source;
    // Acknowledgement sink created in the constructor via SqsAckSink.grouped for the same queue URL.
    private final Sink<MessageAction, Future<Done>> sink;
    // Backing field for the lazily initialised logger (see grizzled$slf4j$Logging$$_logger()).
    private transient Logger grizzled$slf4j$Logging$$_logger;
    // Set to true once the logger backing field above has been initialised.
    private volatile transient boolean bitmap$trans$0;

    /** Returns the logger by delegating to the {@code Logging} trait's static forwarder {@code logger$}. */
    public Logger logger() {
        return Logging.logger$(this);
    }

    /** Returns the logger name by delegating to the {@code Logging} trait's static forwarder {@code loggerName$}. */
    public String loggerName() {
        return Logging.loggerName$(this);
    }

    /** Delegates to {@code Logging.isTraceEnabled$}; presumably reports whether TRACE logging is active. */
    public boolean isTraceEnabled() {
        return Logging.isTraceEnabled$(this);
    }

    /**
     * Delegates to {@code Logging.trace$}.
     *
     * @param function0 deferred value handed to the trait (presumably the lazily evaluated log message)
     */
    public void trace(Function0<Object> function0) {
        Logging.trace$(this, function0);
    }

    /**
     * Delegates to the two-argument {@code Logging.trace$}.
     *
     * @param function0  deferred value handed to the trait (presumably the lazily evaluated log message)
     * @param function02 deferred throwable handed to the trait (presumably the error to log alongside it)
     */
    public void trace(Function0<Object> function0, Function0<Throwable> function02) {
        Logging.trace$(this, function0, function02);
    }

    /**
     * Delegates to the marker-accepting {@code Logging.trace$}.
     *
     * @param marker     SLF4J marker passed through unchanged
     * @param function0  deferred value handed to the trait (presumably the lazily evaluated log message)
     * @param function02 deferred throwable handed to the trait (presumably the error to log alongside it)
     */
    public void trace(Marker marker, Function0<Object> function0, Function0<Throwable> function02) {
        Logging.trace$(this, marker, function0, function02);
    }

    /** Delegates to {@code Logging.isDebugEnabled$}; presumably reports whether DEBUG logging is active. */
    public boolean isDebugEnabled() {
        return Logging.isDebugEnabled$(this);
    }

    /**
     * Delegates to {@code Logging.debug$}.
     *
     * @param function0 deferred value handed to the trait (presumably the lazily evaluated log message)
     */
    public void debug(Function0<Object> function0) {
        Logging.debug$(this, function0);
    }

    /**
     * Delegates to the two-argument {@code Logging.debug$}.
     *
     * @param function0  deferred value handed to the trait (presumably the lazily evaluated log message)
     * @param function02 deferred throwable handed to the trait (presumably the error to log alongside it)
     */
    public void debug(Function0<Object> function0, Function0<Throwable> function02) {
        Logging.debug$(this, function0, function02);
    }

    /**
     * Delegates to the marker-accepting {@code Logging.debug$}.
     *
     * @param marker     SLF4J marker passed through unchanged
     * @param function0  deferred value handed to the trait (presumably the lazily evaluated log message)
     * @param function02 deferred throwable handed to the trait (presumably the error to log alongside it)
     */
    public void debug(Marker marker, Function0<Object> function0, Function0<Throwable> function02) {
        Logging.debug$(this, marker, function0, function02);
    }

    /** Delegates to {@code Logging.isErrorEnabled$}; presumably reports whether ERROR logging is active. */
    public boolean isErrorEnabled() {
        return Logging.isErrorEnabled$(this);
    }

    /**
     * Delegates to {@code Logging.error$}.
     *
     * @param function0 deferred value handed to the trait (presumably the lazily evaluated log message)
     */
    public void error(Function0<Object> function0) {
        Logging.error$(this, function0);
    }

    /**
     * Delegates to the two-argument {@code Logging.error$}.
     *
     * @param function0  deferred value handed to the trait (presumably the lazily evaluated log message)
     * @param function02 deferred throwable handed to the trait (presumably the error to log alongside it)
     */
    public void error(Function0<Object> function0, Function0<Throwable> function02) {
        Logging.error$(this, function0, function02);
    }

    /**
     * Delegates to the marker-accepting {@code Logging.error$}.
     *
     * @param marker     SLF4J marker passed through unchanged
     * @param function0  deferred value handed to the trait (presumably the lazily evaluated log message)
     * @param function02 deferred throwable handed to the trait (presumably the error to log alongside it)
     */
    public void error(Marker marker, Function0<Object> function0, Function0<Throwable> function02) {
        Logging.error$(this, marker, function0, function02);
    }

    /** Delegates to {@code Logging.isInfoEnabled$}; presumably reports whether INFO logging is active. */
    public boolean isInfoEnabled() {
        return Logging.isInfoEnabled$(this);
    }

    /**
     * Delegates to {@code Logging.info$}.
     *
     * @param function0 deferred value handed to the trait (presumably the lazily evaluated log message)
     */
    public void info(Function0<Object> function0) {
        Logging.info$(this, function0);
    }

    /**
     * Delegates to the two-argument {@code Logging.info$}.
     *
     * @param function0  deferred value handed to the trait (presumably the lazily evaluated log message)
     * @param function02 deferred throwable handed to the trait (presumably the error to log alongside it)
     */
    public void info(Function0<Object> function0, Function0<Throwable> function02) {
        Logging.info$(this, function0, function02);
    }

    /**
     * Delegates to the marker-accepting {@code Logging.info$}.
     *
     * @param marker     SLF4J marker passed through unchanged
     * @param function0  deferred value handed to the trait (presumably the lazily evaluated log message)
     * @param function02 deferred throwable handed to the trait (presumably the error to log alongside it)
     */
    public void info(Marker marker, Function0<Object> function0, Function0<Throwable> function02) {
        Logging.info$(this, marker, function0, function02);
    }

    /** Delegates to {@code Logging.isWarnEnabled$}; presumably reports whether WARN logging is active. */
    public boolean isWarnEnabled() {
        return Logging.isWarnEnabled$(this);
    }

    /**
     * Delegates to {@code Logging.warn$}.
     *
     * @param function0 deferred value handed to the trait (presumably the lazily evaluated log message)
     */
    public void warn(Function0<Object> function0) {
        Logging.warn$(this, function0);
    }

    /**
     * Delegates to the two-argument {@code Logging.warn$}.
     *
     * @param function0  deferred value handed to the trait (presumably the lazily evaluated log message)
     * @param function02 deferred throwable handed to the trait (presumably the error to log alongside it)
     */
    public void warn(Function0<Object> function0, Function0<Throwable> function02) {
        Logging.warn$(this, function0, function02);
    }

    /**
     * Delegates to the marker-accepting {@code Logging.warn$}.
     *
     * @param marker     SLF4J marker passed through unchanged
     * @param function0  deferred value handed to the trait (presumably the lazily evaluated log message)
     * @param function02 deferred throwable handed to the trait (presumably the error to log alongside it)
     */
    public void warn(Marker marker, Function0<Object> function0, Function0<Throwable> function02) {
        Logging.warn$(this, marker, function0, function02);
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v8, types: [weco.messaging.sqs.SQSStream] */
    private Logger grizzled$slf4j$Logging$$_logger$lzycompute() {
        ?? r0 = this;
        synchronized (r0) {
            if (!this.bitmap$trans$0) {
                this.grizzled$slf4j$Logging$$_logger = Logging.grizzled$slf4j$Logging$$_logger$(this);
                r0 = this;
                r0.bitmap$trans$0 = true;
            }
        }
        return this.grizzled$slf4j$Logging$$_logger;
    }

    /**
     * Returns the logger backing the {@code Logging} trait, initialising it on first use.
     *
     * @return the cached logger when the initialisation flag is already set, otherwise the
     *         result of the synchronized initialiser
     */
    public Logger grizzled$slf4j$Logging$$_logger() {
        // Fast path: the flag is set, so the field can be read without taking the lock.
        if (this.bitmap$trans$0) {
            return this.grizzled$slf4j$Logging$$_logger;
        }
        return grizzled$slf4j$Logging$$_logger$lzycompute();
    }

    /** Returns the actor system that was passed to the constructor. */
    public ActorSystem actorSystem() {
        return actorSystem;
    }

    /** Returns the execution context obtained from the actor system's dispatcher at construction time. */
    public ExecutionContextExecutor dispatcher() {
        return dispatcher;
    }

    /** Returns the SQS message source built in the constructor. */
    private Source<Message, NotUsed> source() {
        return source;
    }

    /** Returns the SQS acknowledgement sink built in the constructor. */
    private Sink<MessageAction, Future<Done>> sink() {
        return sink;
    }

    /**
     * Runs {@code function1} on the decoded payload of every message on the stream.
     *
     * <p>Builds the stream through {@link #runStream}: each (message, payload) pair is passed
     * to {@code mapAsyncUnordered} with the parallelism read from the SQS config. Once the
     * future returned by {@code function1} completes, the original message is emitted
     * downstream.
     *
     * @param str       stream name, forwarded to {@code runStream}
     * @param function1 asynchronous handler applied to each decoded payload
     * @param decoder   decoder for the payload type, forwarded to {@code runStream}
     * @return the future produced by {@code runStream}
     */
    public Future<Done> foreach(String str, Function1<T, Future<BoxedUnit>> function1, Decoder<T> decoder) {
        return runStream(str, decoded -> decoded.mapAsyncUnordered(
                Predef$.MODULE$.Integer2int(this.sqsConfig.parallelism()),
                pair -> {
                    // Mirrors the Scala pattern match on the tuple: a null element is a match failure.
                    if (pair == null) {
                        throw new MatchError(pair);
                    }
                    Message sqsMessage = (Message) pair._1();
                    Object payload = pair._2();
                    this.debug(() -> "Processing message " + sqsMessage.messageId());
                    // Discard the handler's unit result and pass the SQS message on.
                    return ((Future) function1.apply(payload)).map(ignored -> sqsMessage, this.dispatcher());
                }), decoder);
    }

    /**
     * Builds and runs a graph between the decoding source and the acknowledging sink.
     *
     * <p>{@code function2} receives two pieces: a source that pairs each SQS message with its
     * body decoded via {@code JsonUtil.fromJson(...).get()}, and a sink that, per message,
     * increments the {@code <str>_ProcessMessage_success} metric, logs the deletion at debug
     * level and hands a {@code Delete} action to the SQS ack sink. The returned graph is run
     * with the supervision strategy from {@link #decider} and a materializer taken from the
     * actor system.
     *
     * @param str       stream name; {@code "_ProcessMessage"} is appended to form the metric prefix
     * @param function2 wires the decoded source to the message sink and returns the runnable graph
     * @param decoder   decoder used to parse each message body
     * @return a future that yields {@code Done} after logging completion; failures are passed
     *         through the compiler-generated recover handler {@code SQSStream$$anonfun$runGraph$6}
     *         (not visible in this file)
     */
    public Future<Done> runGraph(String str, Function2<Source<Tuple2<Message, T>, NotUsed>, Sink<Message, Future<Done>>, RunnableGraph<Future<Done>>> function2, Decoder<T> decoder) {
        final String metricName = str + "_ProcessMessage";

        // Stage 1: let the caller connect the decoded source to the delete-on-success sink.
        RunnableGraph graph = (RunnableGraph) function2.apply(
                source().map(rawMessage ->
                        new Tuple2(rawMessage, JsonUtil$.MODULE$.fromJson(rawMessage.body(), decoder).get())),
                Flow$.MODULE$.apply().map(processed -> {
                    this.metricsSender.incrementCount(metricName + "_success");
                    this.debug(() -> "Deleting message " + processed.messageId());
                    return MessageAction$Delete$.MODULE$.apply(processed);
                }).toMat(sink(), Keep$.MODULE$.right()));

        // Stage 2: attach the supervision strategy and materialise the graph.
        Future completion = (Future) graph
                .withAttributes(ActorAttributes$.MODULE$.supervisionStrategy(decider(metricName)))
                .run(Materializer$.MODULE$.matFromSystem(actorSystem()));

        // Stage 3: log normal completion and install the recover handler.
        return completion.map(done -> {
            this.logger().info(() -> "SQSStream finished processing messages.");
            return Done$.MODULE$;
        }, dispatcher()).recover(new SQSStream$$anonfun$runGraph$6(this), dispatcher());
    }

    /**
     * Runs a stream whose processing is expressed as a transformation of the decoded source.
     *
     * <p>Delegates to {@link #runGraph}: the source produced by {@code function1} is connected
     * to the supplied sink, keeping the sink's materialized value.
     *
     * @param str       stream name, forwarded to {@code runGraph}
     * @param function1 turns the source of (message, payload) pairs into a source of messages
     * @param decoder   decoder for the payload type, forwarded to {@code runGraph}
     * @return the future produced by {@code runGraph}
     */
    public Future<Done> runStream(String str, Function1<Source<Tuple2<Message, T>, NotUsed>, Source<Message, NotUsed>> function1, Decoder<T> decoder) {
        return runGraph(
                str,
                (decoded, ackSink) -> ((Source) function1.apply(decoded)).toMat(ackSink, Keep$.MODULE$.right()),
                decoder);
    }

    /**
     * Creates the supervision decider used when running the graph.
     *
     * <ul>
     *   <li>{@code JsonDecodingError}: logged, {@code <str>_jsonDecodingFailure} incremented, resume.</li>
     *   <li>Any other {@code Exception}: logged, {@code <str>_failure} incremented, resume.</li>
     *   <li>Anything else (a non-{@code Exception} throwable): a warning is logged and the
     *       directive is stop.</li>
     * </ul>
     *
     * @param str metric name prefix
     * @return a function mapping a stream failure to a supervision directive
     */
    private Function1<Throwable, Supervision.Directive> decider(String str) {
        // Each branch returns through the lambda's Supervision.Directive result type; the
        // decompiled local was typed as the Resume singleton and could not hold the Stop directive.
        return th -> {
            if (th instanceof JsonDecodingError) {
                this.logException(th);
                this.metricsSender.incrementCount(str + "_jsonDecodingFailure");
                return Supervision$.MODULE$.resume();
            }
            if (th instanceof Exception) {
                this.logException(th);
                this.metricsSender.incrementCount(str + "_failure");
                return Supervision$Resume$.MODULE$;
            }
            this.logger().warn(() -> "Received throwable: " + th + ". Shutting down.");
            return Supervision$Stop$.MODULE$;
        };
    }

    /**
     * Logs a stream failure at a level chosen by its type.
     *
     * <p>A {@code JsonDecodingError} is logged as a warning with its message only; any other
     * {@code Exception} is logged as an error together with the exception itself.
     *
     * @param th the failure to log
     * @throws MatchError if {@code th} is neither a {@code JsonDecodingError} nor an {@code Exception}
     */
    private void logException(Throwable th) {
        if (th instanceof JsonDecodingError) {
            JsonDecodingError decodingFailure = (JsonDecodingError) th;
            logger().warn(() -> "JSON decoding error: " + decodingFailure.getMessage());
            return;
        }
        if (th instanceof Exception) {
            Exception failure = (Exception) th;
            logger().error(() -> "Unrecognised failure while: " + failure.getMessage(), () -> failure);
            return;
        }
        // No case of the original Scala match applies.
        throw new MatchError(th);
    }

    /**
     * Creates a stream bound to the queue named by {@code sQSConfig.queueUrl()}.
     *
     * @param sqsAsyncClient SQS client handed to both the message source and the ack sink
     * @param sQSConfig      queue settings; {@code queueUrl()} is read here, {@code parallelism()} later
     * @param metrics        metrics client stored for later {@code incrementCount} calls
     * @param actorSystem    actor system; its dispatcher becomes this stream's execution context
     */
    public SQSStream(SqsAsyncClient sqsAsyncClient, SQSConfig sQSConfig, Metrics<Future> metrics, ActorSystem actorSystem) {
        this.sqsConfig = sQSConfig;
        this.metricsSender = metrics;
        this.actorSystem = actorSystem;
        // Trait initialiser for Logging; invoked after the constructor-parameter fields are stored.
        Logging.$init$(this);
        this.dispatcher = actorSystem.dispatcher();
        // apply$default$2() supplies the Scala default for the second argument (presumably the source settings).
        this.source = SqsSource$.MODULE$.apply(sQSConfig.queueUrl(), SqsSource$.MODULE$.apply$default$2(), sqsAsyncClient);
        // grouped$default$2() supplies the Scala default for the second argument (presumably the grouped ack settings).
        this.sink = SqsAckSink$.MODULE$.grouped(sQSConfig.queueUrl(), SqsAckSink$.MODULE$.grouped$default$2(), sqsAsyncClient);
    }
}
