package io.janstenpickle.trace4cats.avro;

import cats.Applicative$;
import cats.Traverse;
import cats.effect.Concurrent;
import cats.effect.ContextShift;
import cats.effect.Resource;
import cats.effect.Resource$;
import cats.effect.Sync;
import cats.effect.Sync$;
import cats.effect.Timer;
import cats.effect.Timer$;
import cats.effect.concurrent.Semaphore;
import cats.effect.concurrent.Semaphore$;
import cats.effect.syntax.BracketOps$;
import cats.effect.syntax.package$bracket$;
import cats.effect.syntax.package$concurrent$;
import cats.syntax.EitherOps$;
import cats.syntax.FlatMapOps$;
import cats.syntax.MonadOps$;
import cats.syntax.package$either$;
import cats.syntax.package$flatMap$;
import cats.syntax.package$functor$;
import cats.syntax.package$monad$;
import cats.syntax.package$traverse$;
import fs2.Chunk$;
import fs2.RaiseThrowable$;
import fs2.Stream;
import fs2.Stream$;
import fs2.Stream$Compiler$;
import fs2.concurrent.InspectableQueue;
import fs2.concurrent.InspectableQueue$;
import fs2.concurrent.Queue;
import fs2.internal.FreeC;
import fs2.io.tcp.SocketGroup;
import fs2.io.udp.Packet;
import fs2.io.udp.Socket;
import fs2.io.udp.SocketGroup$;
import io.chrisdavenport.log4cats.Logger;
import io.chrisdavenport.log4cats.Logger$;
import io.janstenpickle.trace4cats.kernel.SpanExporter;
import io.janstenpickle.trace4cats.model.Batch;
import io.janstenpickle.trace4cats.model.CompletedSpan;
import java.io.ByteArrayOutputStream;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;
import scala.MatchError;
import scala.Predef$;
import scala.Tuple3;
import scala.collection.ArrayOps$;
import scala.concurrent.ExecutionContext;
import scala.concurrent.duration.package;
import scala.concurrent.duration.package$;
import scala.reflect.ClassTag$;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;

/* compiled from: AvroSpanExporter.scala */
/* loaded from: input_file:io/janstenpickle/trace4cats/avro/AvroSpanExporter$.class */
public final class AvroSpanExporter$ {
    public static final AvroSpanExporter$ MODULE$ = new AvroSpanExporter$();

    private <F> F encode(Schema schema, CompletedSpan completedSpan, Sync<F> sync) {
        return (F) package$flatMap$.MODULE$.toFlatMapOps(Sync$.MODULE$.apply(sync).fromEither(EitherOps$.MODULE$.leftMap$extension(package$either$.MODULE$.catsSyntaxEither(AvroInstances$.MODULE$.completedSpanCodec().encode(completedSpan)), avroError -> {
            return avroError.throwable();
        })), sync).flatMap(obj -> {
            return Resource$.MODULE$.make(Sync$.MODULE$.apply(sync).delay(() -> {
                GenericDatumWriter genericDatumWriter = new GenericDatumWriter(schema);
                ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
                return new Tuple3(genericDatumWriter, byteArrayOutputStream, EncoderFactory.get().binaryEncoder(byteArrayOutputStream, (BinaryEncoder) null));
            }), tuple3 -> {
                if (tuple3 == null) {
                    throw new MatchError(tuple3);
                }
                ByteArrayOutputStream byteArrayOutputStream = (ByteArrayOutputStream) tuple3._2();
                return Sync$.MODULE$.apply(sync).delay(() -> {
                    byteArrayOutputStream.close();
                });
            }, sync).use(tuple32 -> {
                if (tuple32 == null) {
                    throw new MatchError(tuple32);
                }
                GenericDatumWriter genericDatumWriter = (GenericDatumWriter) tuple32._1();
                ByteArrayOutputStream byteArrayOutputStream = (ByteArrayOutputStream) tuple32._2();
                BinaryEncoder binaryEncoder = (BinaryEncoder) tuple32._3();
                return Sync$.MODULE$.apply(sync).delay(() -> {
                    genericDatumWriter.write(obj, binaryEncoder);
                    binaryEncoder.flush();
                    return byteArrayOutputStream.toByteArray();
                });
            }, sync);
        });
    }

    public <F, G> Resource<F, SpanExporter<F, G>> udp(ExecutionContext executionContext, String str, int i, Concurrent<F> concurrent, ContextShift<F> contextShift, Timer<F> timer, Logger<F> logger, Traverse<G> traverse) {
        return Resource$.MODULE$.liftF(AvroInstances$.MODULE$.completedSpanSchema(concurrent), concurrent).flatMap(schema -> {
            return Resource$.MODULE$.liftF(Sync$.MODULE$.apply(concurrent).delay(() -> {
                return new InetSocketAddress(str, i);
            }), concurrent).flatMap(inetSocketAddress -> {
                return Resource$.MODULE$.liftF(InspectableQueue$.MODULE$.bounded(1, concurrent), concurrent).flatMap(inspectableQueue -> {
                    return Resource$.MODULE$.liftF(Semaphore$.MODULE$.apply(Long.MAX_VALUE, concurrent), concurrent).flatMap(semaphore -> {
                        return SocketGroup$.MODULE$.apply(executionContext, concurrent, contextShift).flatMap(socketGroup -> {
                            return socketGroup.open(socketGroup.open$default$1(), socketGroup.open$default$2(), socketGroup.open$default$3(), socketGroup.open$default$4(), socketGroup.open$default$5(), socketGroup.open$default$6(), socketGroup.open$default$7(), socketGroup.open$default$8(), socketGroup.open$default$9(), concurrent, contextShift).flatMap(socket -> {
                                return Resource$.MODULE$.make(package$concurrent$.MODULE$.toConcurrentOps(Stream$.MODULE$.compile$extension(Stream$.MODULE$.retry(write$1(schema, inetSocketAddress, semaphore, inspectableQueue, socket, concurrent, traverse), new package.DurationInt(package$.MODULE$.DurationInt(5)).seconds(), finiteDuration -> {
                                    return finiteDuration.$plus(new package.DurationInt(package$.MODULE$.DurationInt(1)).second());
                                }, Integer.MAX_VALUE, Stream$.MODULE$.retry$default$5(), timer, RaiseThrowable$.MODULE$.fromApplicativeError(concurrent)), Stream$Compiler$.MODULE$.syncInstance(concurrent)).drain(), concurrent).start(), fiber -> {
                                    return FlatMapOps$.MODULE$.$greater$greater$extension(package$flatMap$.MODULE$.catsSyntaxFlatMapOps(MonadOps$.MODULE$.whileM_$extension(package$monad$.MODULE$.catsSyntaxMonad(Applicative$.MODULE$.apply(concurrent).unit()), package$flatMap$.MODULE$.toFlatMapOps(package$functor$.MODULE$.toFunctorOps(inspectableQueue.getSize(), concurrent).map(i2 -> {
                                        return i2 != 0;
                                    }), concurrent).flatMap(obj -> {
                                        return $anonfun$udp$16(semaphore, concurrent, timer, BoxesRunTime.unboxToBoolean(obj));
                                    }), concurrent), concurrent), () -> {
                                        return fiber.cancel();
                                    }, concurrent);
                                }, concurrent).map(fiber2 -> {
                                    return new SpanExporter<F, G>(inspectableQueue) { // from class: io.janstenpickle.trace4cats.avro.AvroSpanExporter$$anon$1
                                        private final InspectableQueue queue$1;

                                        public F exportBatch(G g) {
                                            return (F) this.queue$1.enqueue1(new Batch(g));
                                        }

                                        {
                                            this.queue$1 = inspectableQueue;
                                        }
                                    };
                                }, concurrent);
                            });
                        });
                    });
                });
            });
        });
    }

    public <F, G> String udp$default$2() {
        return package$.MODULE$.agentHostname();
    }

    public <F, G> int udp$default$3() {
        return package$.MODULE$.agentPort();
    }

    public <F, G> Resource<F, SpanExporter<F, G>> tcp(ExecutionContext executionContext, String str, int i, Concurrent<F> concurrent, ContextShift<F> contextShift, Timer<F> timer, Logger<F> logger, Traverse<G> traverse) {
        return Resource$.MODULE$.liftF(AvroInstances$.MODULE$.completedSpanSchema(concurrent), concurrent).flatMap(schema -> {
            return Resource$.MODULE$.liftF(Sync$.MODULE$.apply(concurrent).delay(() -> {
                return new InetSocketAddress(str, i);
            }), concurrent).flatMap(inetSocketAddress -> {
                return Resource$.MODULE$.liftF(InspectableQueue$.MODULE$.bounded(1, concurrent), concurrent).flatMap(inspectableQueue -> {
                    return Resource$.MODULE$.liftF(Semaphore$.MODULE$.apply(Long.MAX_VALUE, concurrent), concurrent).flatMap(semaphore -> {
                        return fs2.io.tcp.SocketGroup$.MODULE$.apply(executionContext, fs2.io.tcp.SocketGroup$.MODULE$.apply$default$2(), fs2.io.tcp.SocketGroup$.MODULE$.apply$default$3(), concurrent, contextShift).flatMap(socketGroup -> {
                            return Resource$.MODULE$.make(package$concurrent$.MODULE$.toConcurrentOps(Stream$.MODULE$.compile$extension(Stream$.MODULE$.retry(write$2(schema, inetSocketAddress, semaphore, inspectableQueue, socketGroup, concurrent, traverse, contextShift, logger, str, i, timer), new package.DurationInt(package$.MODULE$.DurationInt(5)).seconds(), finiteDuration -> {
                                return finiteDuration.$plus(new package.DurationInt(package$.MODULE$.DurationInt(1)).second());
                            }, Integer.MAX_VALUE, Stream$.MODULE$.retry$default$5(), timer, RaiseThrowable$.MODULE$.fromApplicativeError(concurrent)), Stream$Compiler$.MODULE$.syncInstance(concurrent)).drain(), concurrent).start(), fiber -> {
                                return FlatMapOps$.MODULE$.$greater$greater$extension(package$flatMap$.MODULE$.catsSyntaxFlatMapOps(MonadOps$.MODULE$.whileM_$extension(package$monad$.MODULE$.catsSyntaxMonad(Timer$.MODULE$.apply(timer).sleep(new package.DurationInt(package$.MODULE$.DurationInt(50)).millis())), package$flatMap$.MODULE$.toFlatMapOps(package$functor$.MODULE$.toFunctorOps(inspectableQueue.getSize(), concurrent).map(i2 -> {
                                    return i2 != 0;
                                }), concurrent).flatMap(obj -> {
                                    return $anonfun$tcp$19(semaphore, concurrent, BoxesRunTime.unboxToBoolean(obj));
                                }), concurrent), concurrent), () -> {
                                    return fiber.cancel();
                                }, concurrent);
                            }, concurrent).map(fiber2 -> {
                                return new SpanExporter<F, G>(inspectableQueue) { // from class: io.janstenpickle.trace4cats.avro.AvroSpanExporter$$anon$2
                                    private final InspectableQueue queue$3;

                                    public F exportBatch(G g) {
                                        return (F) this.queue$3.enqueue1(new Batch(g));
                                    }

                                    {
                                        this.queue$3 = inspectableQueue;
                                    }
                                };
                            }, concurrent);
                        });
                    });
                });
            });
        });
    }

    public <F, G> String tcp$default$2() {
        return package$.MODULE$.agentHostname();
    }

    public <F, G> int tcp$default$3() {
        return package$.MODULE$.agentPort();
    }

    public static final /* synthetic */ Object $anonfun$udp$1(Semaphore semaphore, Concurrent concurrent, Traverse traverse, Schema schema, Socket socket, InetSocketAddress inetSocketAddress, Object obj) {
        return package$flatMap$.MODULE$.toFlatMapOps(semaphore.acquire(), concurrent).flatMap(boxedUnit -> {
            return package$functor$.MODULE$.toFunctorOps(package$traverse$.MODULE$.toTraverseOps(obj, traverse).traverse(completedSpan -> {
                return package$flatMap$.MODULE$.toFlatMapOps(MODULE$.encode(schema, completedSpan, concurrent), concurrent).flatMap(bArr -> {
                    return socket.write(new Packet(inetSocketAddress, Chunk$.MODULE$.bytes(bArr)), socket.write$default$2());
                });
            }, concurrent), concurrent).map(obj2 -> {
                BoxedUnit.UNIT;
                return BoxedUnit.UNIT;
            });
        });
    }

    private static final Object write$1(Schema schema, InetSocketAddress inetSocketAddress, Semaphore semaphore, Queue queue, Socket socket, Concurrent concurrent, Traverse traverse) {
        return Stream$.MODULE$.compile$extension(Stream$.MODULE$.repeatEval(BracketOps$.MODULE$.guarantee$extension(package$bracket$.MODULE$.catsEffectSyntaxBracket(package$flatMap$.MODULE$.toFlatMapOps(queue.dequeue1(), concurrent).flatMap(obj -> {
            return $anonfun$udp$1(semaphore, concurrent, traverse, schema, socket, inetSocketAddress, ((Batch) obj).spans());
        }), concurrent), semaphore.release(), concurrent)), Stream$Compiler$.MODULE$.syncInstance(concurrent)).drain();
    }

    public static final /* synthetic */ boolean $anonfun$udp$19(boolean z, boolean z2, BoxedUnit boxedUnit) {
        return z || z2;
    }

    public static final /* synthetic */ Object $anonfun$udp$18(Timer timer, Concurrent concurrent, boolean z, boolean z2) {
        return package$functor$.MODULE$.toFunctorOps(Timer$.MODULE$.apply(timer).sleep(new package.DurationInt(package$.MODULE$.DurationInt(50)).millis()), concurrent).map(boxedUnit -> {
            return BoxesRunTime.boxToBoolean($anonfun$udp$19(z, z2, boxedUnit));
        });
    }

    public static final /* synthetic */ Object $anonfun$udp$16(Semaphore semaphore, Concurrent concurrent, Timer timer, boolean z) {
        return package$flatMap$.MODULE$.toFlatMapOps(package$functor$.MODULE$.toFunctorOps(semaphore.count(), concurrent).map(j -> {
            return j == 0;
        }), concurrent).flatMap(obj -> {
            return $anonfun$udp$18(timer, concurrent, z, BoxesRunTime.unboxToBoolean(obj));
        });
    }

    public static final /* synthetic */ FreeC $anonfun$tcp$3(SocketGroup socketGroup, InetSocketAddress inetSocketAddress, Timer timer, Concurrent concurrent, ContextShift contextShift, Logger logger, String str, int i) {
        return Stream$.MODULE$.delayBy$extension(connect$1(socketGroup, inetSocketAddress, concurrent, contextShift, logger, str, i, timer), new package.DurationInt(package$.MODULE$.DurationInt(5)).seconds(), timer);
    }

    public static final /* synthetic */ FreeC $anonfun$tcp$1(Logger logger, String str, int i, SocketGroup socketGroup, InetSocketAddress inetSocketAddress, Timer timer, Concurrent concurrent, ContextShift contextShift, Throwable th) {
        if (th instanceof ConnectException) {
            return Stream$.MODULE$.$greater$greater$extension(Stream$.MODULE$.eval(Logger$.MODULE$.apply(logger).warn(() -> {
                return new StringBuilder(44).append("Failed to connect to tcp://").append(str).append(":").append(i).append(", retrying in 5s").toString();
            })), () -> {
                return new Stream($anonfun$tcp$3(socketGroup, inetSocketAddress, timer, concurrent, contextShift, logger, str, i));
            });
        }
        throw new MatchError(th);
    }

    private static final FreeC connect$1(SocketGroup socketGroup, InetSocketAddress inetSocketAddress, Concurrent concurrent, ContextShift contextShift, Logger logger, String str, int i, Timer timer) {
        return Stream$.MODULE$.handleErrorWith$extension(Stream$.MODULE$.resource(socketGroup.client(inetSocketAddress, socketGroup.client$default$2(), socketGroup.client$default$3(), socketGroup.client$default$4(), socketGroup.client$default$5(), socketGroup.client$default$6(), socketGroup.client$default$7(), concurrent, contextShift)), th -> {
            return new Stream($anonfun$tcp$1(logger, str, i, socketGroup, inetSocketAddress, timer, concurrent, contextShift, th));
        });
    }

    public static final /* synthetic */ Object $anonfun$tcp$5(Semaphore semaphore, Concurrent concurrent, Traverse traverse, Schema schema, fs2.io.tcp.Socket socket, Object obj) {
        return package$flatMap$.MODULE$.toFlatMapOps(semaphore.acquire(), concurrent).flatMap(boxedUnit -> {
            return package$functor$.MODULE$.toFunctorOps(package$traverse$.MODULE$.toTraverseOps(obj, traverse).traverse(completedSpan -> {
                return package$flatMap$.MODULE$.toFlatMapOps(MODULE$.encode(schema, completedSpan, concurrent), concurrent).flatMap(bArr -> {
                    return socket.write(Chunk$.MODULE$.bytes((byte[]) ArrayOps$.MODULE$.$plus$plus$extension(Predef$.MODULE$.byteArrayOps(bArr), new byte[]{Predef$.MODULE$.int2Integer(196).byteValue(), Predef$.MODULE$.int2Integer(2).byteValue()}, ClassTag$.MODULE$.Byte())), socket.write$default$2());
                });
            }, concurrent), concurrent).map(obj2 -> {
                BoxedUnit.UNIT;
                return BoxedUnit.UNIT;
            });
        });
    }

    public static final /* synthetic */ FreeC $anonfun$tcp$4(Queue queue, Concurrent concurrent, Semaphore semaphore, Traverse traverse, Schema schema, fs2.io.tcp.Socket socket) {
        return Stream$.MODULE$.repeatEval(BracketOps$.MODULE$.guarantee$extension(package$bracket$.MODULE$.catsEffectSyntaxBracket(package$flatMap$.MODULE$.toFlatMapOps(queue.dequeue1(), concurrent).flatMap(obj -> {
            return $anonfun$tcp$5(semaphore, concurrent, traverse, schema, socket, ((Batch) obj).spans());
        }), concurrent), semaphore.release(), concurrent));
    }

    private static final Object write$2(Schema schema, InetSocketAddress inetSocketAddress, Semaphore semaphore, Queue queue, SocketGroup socketGroup, Concurrent concurrent, Traverse traverse, ContextShift contextShift, Logger logger, String str, int i, Timer timer) {
        return Stream$.MODULE$.compile$extension(Stream$.MODULE$.flatMap$extension(connect$1(socketGroup, inetSocketAddress, concurrent, contextShift, logger, str, i, timer), socket -> {
            return new Stream($anonfun$tcp$4(queue, concurrent, semaphore, traverse, schema, socket));
        }), Stream$Compiler$.MODULE$.syncInstance(concurrent)).drain();
    }

    public static final /* synthetic */ boolean $anonfun$tcp$21(boolean z, boolean z2) {
        return z || z2;
    }

    public static final /* synthetic */ Object $anonfun$tcp$19(Semaphore semaphore, Concurrent concurrent, boolean z) {
        return package$functor$.MODULE$.toFunctorOps(package$functor$.MODULE$.toFunctorOps(semaphore.count(), concurrent).map(j -> {
            return j == 0;
        }), concurrent).map(obj -> {
            return BoxesRunTime.boxToBoolean($anonfun$tcp$21(z, BoxesRunTime.unboxToBoolean(obj)));
        });
    }

    private AvroSpanExporter$() {
    }
}
