/*
 * Decompiled with CFR 0.152.
 */
package io.atleon.rabbitmq;

import com.rabbitmq.client.ConnectionFactory;
import io.atleon.core.Alo;
import io.atleon.core.AloFlux;
import io.atleon.rabbitmq.AloRabbitMQMessageFactory;
import io.atleon.rabbitmq.BodyDeserializer;
import io.atleon.rabbitmq.DefaultAloRabbitMQMessageFactory;
import io.atleon.rabbitmq.RabbitMQConfig;
import io.atleon.rabbitmq.RabbitMQConfigSource;
import io.atleon.rabbitmq.RabbitMQMessage;
import java.util.function.Consumer;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.rabbitmq.AcknowledgableDelivery;
import reactor.rabbitmq.ConsumeOptions;
import reactor.rabbitmq.Receiver;
import reactor.rabbitmq.ReceiverOptions;

public class AloRabbitMQReceiver<T> {
    public static final String CONFIG_PREFIX = "rabbitmq-receiver-";
    public static final String QOS_CONFIG = "rabbitmq-receiver-qos";
    public static final String BODY_DESERIALIZER_CONFIG = "rabbitmq-receiver-body-deserializer";
    public static final String NACK_STRATEGY_CONFIG = "rabbitmq-receiver-nack-strategy";
    public static final String ALO_FACTORY_CONFIG = "rabbitmq-receiver-alo-factory";
    private static final Logger LOGGER = LoggerFactory.getLogger(AloRabbitMQReceiver.class);
    private final Mono<ReceiveResources<T>> futureResources;

    private AloRabbitMQReceiver(RabbitMQConfigSource configSource) {
        this.futureResources = ((Mono)configSource.create()).map(ReceiveResources::fromConfig).cache();
    }

    public static <T> AloRabbitMQReceiver<T> from(RabbitMQConfigSource configSource) {
        return new AloRabbitMQReceiver<T>(configSource);
    }

    public AloFlux<T> receiveAloBodies(String queue) {
        return this.receiveAloMessages(queue).filter(message -> message.getBody() != null).map(RabbitMQMessage::getBody);
    }

    public AloFlux<RabbitMQMessage<T>> receiveAloMessages(String queue) {
        return (AloFlux)this.futureResources.flatMapMany(resources -> this.receiveMessages((ReceiveResources<T>)resources, queue)).as(AloFlux::wrap);
    }

    private Flux<Alo<RabbitMQMessage<T>>> receiveMessages(ReceiveResources<T> resources, String queue) {
        Sinks.Empty sink = Sinks.empty();
        return resources.receive(queue, arg_0 -> ((Sinks.Empty)sink).tryEmitError(arg_0)).mergeWith((Publisher)sink.asMono());
    }

    private static final class ReceiveResources<T> {
        private final ConnectionFactory connectionFactory;
        private final int qos;
        private final BodyDeserializer<T> bodyDeserializer;
        private final NackStrategy nackStrategy;
        private final AloRabbitMQMessageFactory<T> messageFactory;

        private ReceiveResources(ConnectionFactory connectionFactory, int qos, BodyDeserializer<T> bodyDeserializer, NackStrategy nackStrategy, AloRabbitMQMessageFactory<T> messageFactory) {
            this.connectionFactory = connectionFactory;
            this.qos = qos;
            this.bodyDeserializer = bodyDeserializer;
            this.nackStrategy = nackStrategy;
            this.messageFactory = messageFactory;
        }

        public static <T> ReceiveResources<T> fromConfig(RabbitMQConfig config) {
            AloRabbitMQMessageFactory messageFactory = config.loadConfigured(AloRabbitMQReceiver.ALO_FACTORY_CONFIG).orElseGet(DefaultAloRabbitMQMessageFactory::new);
            return new ReceiveResources<T>(config.getConnectionFactory(), config.load(AloRabbitMQReceiver.QOS_CONFIG, Integer::parseInt).orElse(256), (BodyDeserializer)config.loadConfiguredOrThrow(AloRabbitMQReceiver.BODY_DESERIALIZER_CONFIG), config.load(AloRabbitMQReceiver.NACK_STRATEGY_CONFIG, NackStrategy::valueOf).orElse(NackStrategy.EMIT), messageFactory);
        }

        public Flux<Alo<RabbitMQMessage<T>>> receive(String queue, Consumer<? super Throwable> errorEmitter) {
            ReceiverOptions receiverOptions = new ReceiverOptions();
            receiverOptions.connectionFactory(this.connectionFactory);
            ConsumeOptions consumeOptions = new ConsumeOptions();
            consumeOptions.qos(this.qos);
            return new Receiver(receiverOptions).consumeManualAck(queue, consumeOptions).map(delivery -> this.deserialize((AcknowledgableDelivery)delivery, errorEmitter));
        }

        private Alo<RabbitMQMessage<T>> deserialize(AcknowledgableDelivery delivery, Consumer<? super Throwable> errorEmitter) {
            RabbitMQMessage<T> rabbitMessage = new RabbitMQMessage<T>(delivery.getEnvelope().getExchange(), delivery.getEnvelope().getRoutingKey(), delivery.getProperties(), this.bodyDeserializer.deserialize(delivery.getBody()));
            Runnable acknowledger = () -> this.ack(delivery, errorEmitter);
            Consumer<Throwable> nacknowledger = error -> this.nack(delivery, errorEmitter, (Throwable)error);
            return this.messageFactory.create(rabbitMessage, acknowledger, nacknowledger);
        }

        private void ack(AcknowledgableDelivery delivery, Consumer<? super Throwable> errorEmitter) {
            try {
                delivery.ack(false);
            }
            catch (Throwable error) {
                LOGGER.error("Failed to ack", error);
                errorEmitter.accept(error);
            }
        }

        private void nack(AcknowledgableDelivery delivery, Consumer<? super Throwable> errorEmitter, Throwable error) {
            if (this.nackStrategy == NackStrategy.EMIT) {
                errorEmitter.accept(error);
            } else {
                try {
                    delivery.nack(false, this.nackStrategy == NackStrategy.REQUEUE);
                }
                catch (Throwable fatalError) {
                    LOGGER.error("Failed to nack", fatalError);
                    fatalError.addSuppressed(error);
                    errorEmitter.accept(fatalError);
                }
            }
        }
    }

    public static enum NackStrategy {
        EMIT,
        REQUEUE,
        DISCARD;

    }
}

