package trace4cats;

import cats.Applicative$;
import cats.effect.kernel.GenTemporal;
import cats.effect.kernel.Resource;
import cats.effect.kernel.Resource$;
import cats.effect.kernel.syntax.GenSpawnOps$;
import cats.effect.kernel.syntax.MonadCancelOps_$;
import cats.effect.kernel.syntax.package$monadCancel$;
import cats.effect.kernel.syntax.package$spawn$;
import cats.effect.std.Queue;
import cats.effect.std.Queue$;
import cats.syntax.ApplicativeErrorOps$;
import cats.syntax.package$applicativeError$;
import cats.syntax.package$flatMap$;
import cats.syntax.package$functor$;
import fs2.Chunk;
import fs2.Compiler$;
import fs2.Compiler$Target$;
import fs2.RaiseThrowable$;
import fs2.Stream;
import fs2.Stream$;
import fs2.concurrent.Channel;
import fs2.concurrent.Channel$;
import org.typelevel.log4cats.Logger;
import org.typelevel.log4cats.Logger$;
import scala.MatchError;
import scala.Tuple2;
import scala.runtime.BoxesRunTime;
import scala.util.Either;
import scala.util.Left;
import scala.util.Right;
import trace4cats.kernel.SpanCompleter;
import trace4cats.kernel.SpanExporter;
import trace4cats.model.CompletedSpan;
import trace4cats.model.TraceProcess;

/* compiled from: QueuedSpanCompleter.scala */
/* loaded from: input_file:trace4cats/QueuedSpanCompleter$.class */
/**
 * Singleton module (exposed through {@link #MODULE$}) that builds a queued
 * {@code SpanCompleter}.
 *
 * <p>The completer produced by {@link #apply} does not export spans itself: it
 * tries to send each built span into a bounded channel, while a background task
 * groups the channel's stream into batches and hands them to a
 * {@code SpanExporter} with retries.
 *
 * <p>NOTE(review): this file is decompiler output of Scala code; constructs such
 * as {@code .void()} and the constructor arguments on the anonymous class are
 * artifacts of that process and are kept as they were produced.
 */
public final class QueuedSpanCompleter$ {
    /** The single instance; assigned by the private constructor during static initialisation. */
    public static QueuedSpanCompleter$ MODULE$;

    static {
        new QueuedSpanCompleter$();
    }

    /**
     * Builds a resource that yields a channel-backed {@code SpanCompleter}.
     *
     * <p>Steps, in the order the resource is assembled:
     * <ol>
     *   <li>a bounded channel is created with capacity
     *       {@code max(bufferSize, batchSize * 5)};</li>
     *   <li>a bounded queue of capacity 1 is created to carry the results of
     *       {@code trySend} calls;</li>
     *   <li>a background task drains that queue through {@code evalScan}, starting
     *       from {@code false} and stepping with {@link #$anonfun$apply$5};</li>
     *   <li>a second background task runs {@code exportBatches$1} on the channel's
     *       stream, wrapped as uncancelable; its finalizer logs
     *       "Shut down queued span completer";</li>
     *   <li>a finalizer that closes the channel is registered;</li>
     *   <li>the completer itself is returned.</li>
     * </ol>
     *
     * @param traceProcess    process information passed to {@code builder.build} for every span
     * @param spanExporter    exporter that receives each batch
     * @param completerConfig source of buffer size, batch size, batch timeout and retry settings
     * @param genTemporal     effect-type evidence threaded through every combinator
     * @param logger          logger used for the warnings and the shutdown message
     * @return a resource wrapping the completer
     */
    public <F> Resource<F, SpanCompleter<F>> apply(TraceProcess traceProcess, SpanExporter<F, Chunk> spanExporter, CompleterConfig completerConfig, GenTemporal<F, Throwable> genTemporal, Logger<F> logger) {
        // The channel always has room for at least five batches, even when the configured buffer is smaller.
        int effectiveBufferSize = Math.max(completerConfig.bufferSize(), completerConfig.batchSize() * 5);
        return Resource$.MODULE$.eval(Channel$.MODULE$.bounded(effectiveBufferSize, genTemporal)).flatMap(channel -> {
            return Resource$.MODULE$.eval(Queue$.MODULE$.bounded(1, genTemporal)).flatMap(queue -> {
                // Background task 1: consume trySend results and log enqueue failures.
                return GenSpawnOps$.MODULE$.background$extension(package$spawn$.MODULE$.genSpawnOps(Stream$.MODULE$.fromQueueUnterminated(queue, Stream$.MODULE$.fromQueueUnterminated$default$2(), genTemporal).evalScan(BoxesRunTime.boxToBoolean(false), (obj, either) -> {
                    return $anonfun$apply$5(logger, effectiveBufferSize, genTemporal, BoxesRunTime.unboxToBoolean(obj), either);
                }).compile(Compiler$.MODULE$.target(Compiler$Target$.MODULE$.forConcurrent(genTemporal))).drain(), genTemporal), genTemporal).flatMap(errorLogOutcome -> {
                    // Background task 2: batch and export everything that arrives on the channel.
                    return GenSpawnOps$.MODULE$.background$extension(package$spawn$.MODULE$.genSpawnOps(MonadCancelOps_$.MODULE$.uncancelable$extension(package$monadCancel$.MODULE$.monadCancelOps_(exportBatches$1(channel.stream(), completerConfig, genTemporal, spanExporter, logger)), genTemporal), genTemporal), genTemporal).onFinalize(Logger$.MODULE$.apply(logger).info(() -> {
                        return "Shut down queued span completer";
                    }), genTemporal).flatMap(exportOutcome -> {
                        // Closing the channel is registered as a finalizer of the returned resource.
                        return Resource$.MODULE$.onFinalize(package$functor$.MODULE$.toFunctorOps(channel.close(), genTemporal).void(), genTemporal).map(boxedUnit -> {
                            return new SpanCompleter<F>(channel, traceProcess, genTemporal, queue) { // from class: trace4cats.QueuedSpanCompleter$$anon$1
                                private final Channel channel$1;
                                private final TraceProcess process$1;
                                private final GenTemporal evidence$1$1;
                                private final Queue errorQueue$1;

                                /**
                                 * Builds the span with the captured process and tries to send it to the
                                 * channel without waiting. The send result is offered to the error queue
                                 * from a separately started fiber, and both results are discarded.
                                 */
                                public F complete(CompletedSpan.Builder builder) {
                                    return (F) package$functor$.MODULE$.toFunctorOps(package$flatMap$.MODULE$.toFlatMapOps(this.channel$1.trySend(builder.build(this.process$1)), this.evidence$1$1).flatMap(sendResult -> {
                                        return package$functor$.MODULE$.toFunctorOps(GenSpawnOps$.MODULE$.start$extension(package$spawn$.MODULE$.genSpawnOps(this.errorQueue$1.tryOffer(sendResult), this.evidence$1$1), this.evidence$1$1), this.evidence$1$1).void();
                                    }), this.evidence$1$1).void();
                                }

                                {
                                    this.channel$1 = channel;
                                    this.process$1 = traceProcess;
                                    this.evidence$1$1 = genTemporal;
                                    this.errorQueue$1 = queue;
                                }
                            };
                        });
                    });
                });
            });
        });
    }

    /**
     * Groups the stream into chunks bounded by the configured batch size and batch
     * timeout, and exports each chunk as a batch.
     *
     * <p>Each export is retried using the delay, next-delay function and maximum
     * attempts from {@code completerConfig.retryConfig()}. An error handler built
     * from {@code logger} is attached via {@code onError}, and the whole per-chunk
     * effect is marked uncancelable.
     *
     * @return the effect obtained by draining the resulting stream
     */
    private static final Object exportBatches$1(Stream stream, CompleterConfig completerConfig, GenTemporal genTemporal, SpanExporter spanExporter, Logger logger) {
        return stream.groupWithin(completerConfig.batchSize(), completerConfig.batchTimeout(), genTemporal).evalMap(chunk -> {
            return MonadCancelOps_$.MODULE$.uncancelable$extension(package$monadCancel$.MODULE$.monadCancelOps_(ApplicativeErrorOps$.MODULE$.onError$extension(package$applicativeError$.MODULE$.catsSyntaxApplicativeError(Stream$.MODULE$.retry(spanExporter.exportBatch(package$.MODULE$.Batch().apply(chunk)), completerConfig.retryConfig().delay(), finiteDuration -> {
                return completerConfig.retryConfig().nextDelay().calc(finiteDuration);
            }, completerConfig.retryConfig().maxAttempts(), Stream$.MODULE$.retry$default$5(), genTemporal, RaiseThrowable$.MODULE$.fromApplicativeError(genTemporal)).compile(Compiler$.MODULE$.target(Compiler$Target$.MODULE$.forConcurrent(genTemporal))).drain(), genTemporal), new QueuedSpanCompleter$$anonfun$$nestedInanonfun$apply$1$1(logger), genTemporal)), genTemporal);
        }).compile(Compiler$.MODULE$.target(Compiler$Target$.MODULE$.forConcurrent(genTemporal))).drain();
    }

    /**
     * Step function of the {@code evalScan} that consumes {@code trySend} results.
     *
     * <p>Cases, checked in this order:
     * <ul>
     *   <li>{@code z == false} and {@code Right(false)}: warn that the buffer is
     *       full (message includes {@code i}) and yield {@code true};</li>
     *   <li>{@code z == false} and {@code Left}: warn that the channel is closed
     *       and yield {@code true};</li>
     *   <li>{@code z == true}: yield {@code true} without logging, whatever
     *       {@code either} is;</li>
     *   <li>{@code Right(true)}: yield {@code false}.</li>
     * </ul>
     * Because the third case precedes the fourth, this function never turns a
     * {@code true} state back into {@code false}.
     *
     * @param logger      logger used for the two warnings
     * @param i           buffer capacity reported in the "buffer is full" warning
     * @param genTemporal effect-type evidence
     * @param z           state carried by the scan
     * @param either      the {@code trySend} result being processed
     * @return an effect producing the next boxed boolean state
     * @throws MatchError if no case applies (for example {@code z == false} with a
     *                    {@code null} {@code either})
     */
    public static final /* synthetic */ Object $anonfun$apply$5(Logger logger, int i, GenTemporal genTemporal, boolean z, Either either) {
        if (!z && (either instanceof Right) && !BoxesRunTime.unboxToBoolean(((Right) either).value())) {
            return package$functor$.MODULE$.toFunctorOps(Logger$.MODULE$.apply(logger).warn(() -> {
                return "Failed to enqueue new span, buffer is full of " + i;
            }), genTemporal).as(BoxesRunTime.boxToBoolean(true));
        }
        if (!z && (either instanceof Left)) {
            return package$functor$.MODULE$.toFunctorOps(Logger$.MODULE$.apply(logger).warn(() -> {
                return "Failed to enqueue new span, channel is closed";
            }), genTemporal).as(BoxesRunTime.boxToBoolean(true));
        }
        if (z) {
            return Applicative$.MODULE$.apply(genTemporal).pure(BoxesRunTime.boxToBoolean(true));
        }
        if ((either instanceof Right) && BoxesRunTime.unboxToBoolean(((Right) either).value())) {
            return Applicative$.MODULE$.apply(genTemporal).pure(BoxesRunTime.boxToBoolean(false));
        }
        // The pair is only materialised here, to report the unmatched input.
        throw new MatchError(new Tuple2(BoxesRunTime.boxToBoolean(z), either));
    }

    /** Private so the only instance is the one created by the static initialiser. */
    private QueuedSpanCompleter$() {
        MODULE$ = this;
    }
}
