/*
 * Decompiled with CFR 0.152.
 */
package io.atleon.rabbitmq;

import io.atleon.core.Alo;
import io.atleon.core.AloFlux;
import io.atleon.rabbitmq.BodySerializer;
import io.atleon.rabbitmq.RabbitMQConfig;
import io.atleon.rabbitmq.RabbitMQConfigSource;
import io.atleon.rabbitmq.RabbitMQMessage;
import io.atleon.rabbitmq.RabbitMQMessageCreator;
import io.atleon.rabbitmq.RabbitMQMessageSendInterceptor;
import io.atleon.rabbitmq.RabbitMQSenderResult;
import java.util.List;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.rabbitmq.CorrelableOutboundMessage;
import reactor.rabbitmq.SendOptions;
import reactor.rabbitmq.Sender;
import reactor.rabbitmq.SenderOptions;

/**
 * Reactive sender of RabbitMQ messages built on top of reactor-rabbitmq's {@link Sender}.
 *
 * <p>Every send goes through {@link Sender#sendWithTypedPublishConfirms}, and each emitted
 * {@link RabbitMQSenderResult} is derived from the publish-confirm result of one outbound message.
 * The "Alo" variants accept items wrapped in {@link Alo}; if sending one of those fails, the
 * configured exception handler negatively acknowledges the originating {@link Alo}.
 *
 * <p>The underlying {@link Sender}, the send interceptors and the body serializer are built from
 * the supplied {@link RabbitMQConfigSource} and cached, so all send pipelines created from one
 * instance share the same resources.
 *
 * <p>NOTE(review): nothing in this class closes the created {@link Sender}; confirm whether the
 * owner of this instance is expected to manage that lifecycle some other way.
 *
 * @param <T> type of the message bodies that are serialized and sent
 */
public class AloRabbitMQSender<T> {

    /** Prefix shared by the sender-specific configuration keys below. */
    public static final String CONFIG_PREFIX = "rabbitmq-sender-";

    /** Configuration key under which the list of {@link RabbitMQMessageSendInterceptor}s is loaded. */
    public static final String INTERCEPTORS_CONFIG = CONFIG_PREFIX + "send-interceptors";

    /** Configuration key under which the (mandatory) {@link BodySerializer} is loaded. */
    public static final String BODY_SERIALIZER_CONFIG = CONFIG_PREFIX + "body-serializer";

    /** Options for plain (non-Alo) sends: library defaults. */
    private static final SendOptions SEND_OPTIONS = new SendOptions();

    /** Options for Alo sends: a failed send nacknowledges the Alo correlated with the message. */
    private static final SendOptions ALO_SEND_OPTIONS = new SendOptions()
        .exceptionHandler(AloRabbitMQSender::handleAloSendException);

    /** Lazily resolved, cached resources shared by every pipeline created from this sender. */
    private final Mono<SendResources<T>> futureResources;

    private AloRabbitMQSender(RabbitMQConfigSource configSource) {
        // cache() replays the first outcome to later subscribers, so the Sender is built once.
        // Per Reactor's Mono#cache contract an error outcome is replayed as well.
        this.futureResources = configSource.create()
            .map(SendResources::<T>fromConfig)
            .cache();
    }

    /**
     * Creates a sender whose resources are built from the given configuration source.
     *
     * @param configSource source of the {@link RabbitMQConfig} used to build the sender resources
     * @param <T>          type of the message bodies to send
     * @return a new sender
     */
    public static <T> AloRabbitMQSender<T> from(RabbitMQConfigSource configSource) {
        return new AloRabbitMQSender<>(configSource);
    }

    /**
     * Returns a function form of {@link #sendBodies(Publisher, RabbitMQMessageCreator)}, suitable
     * for use as a Publisher transformer.
     *
     * @param messageCreator converts each body into the {@link RabbitMQMessage} to send
     * @return function that sends the bodies of the Publisher it is applied to
     */
    public Function<Publisher<T>, Flux<RabbitMQSenderResult<T>>> sendBodies(
        RabbitMQMessageCreator<T> messageCreator
    ) {
        return bodies -> sendBodies(bodies, messageCreator);
    }

    /**
     * Sends each body as a RabbitMQ message with publish confirms.
     *
     * @param bodies         bodies to send
     * @param messageCreator converts each body into the {@link RabbitMQMessage} to send
     * @return results of the sends, each correlated with the body it was created from
     */
    public Flux<RabbitMQSenderResult<T>> sendBodies(
        Publisher<T> bodies,
        RabbitMQMessageCreator<T> messageCreator
    ) {
        return futureResources.flatMapMany(resources -> sendBodies(resources, bodies, messageCreator));
    }

    /**
     * Sends each already-built message with publish confirms.
     *
     * @param messages messages to send
     * @return results of the sends, each correlated with the message that was passed in
     */
    public Flux<RabbitMQSenderResult<RabbitMQMessage<T>>> sendMessages(Publisher<RabbitMQMessage<T>> messages) {
        return futureResources.flatMapMany(resources -> sendMessages(resources, messages));
    }

    /**
     * Returns a function form of {@link #sendAloBodies(Publisher, RabbitMQMessageCreator)},
     * suitable for use as a Publisher transformer.
     *
     * @param messageCreator converts each unwrapped body into the {@link RabbitMQMessage} to send
     * @return function that sends the Alo-wrapped bodies of the Publisher it is applied to
     */
    public Function<Publisher<Alo<T>>, AloFlux<RabbitMQSenderResult<T>>> sendAloBodies(
        RabbitMQMessageCreator<T> messageCreator
    ) {
        return aloBodies -> sendAloBodies(aloBodies, messageCreator);
    }

    /**
     * Sends the body carried by each {@link Alo} with publish confirms. If a send raises an
     * exception, the originating {@link Alo} is negatively acknowledged with that exception.
     *
     * @param aloBodies      Alo-wrapped bodies to send
     * @param messageCreator converts each unwrapped body into the {@link RabbitMQMessage} to send
     * @return Alo-wrapped results of the sends
     */
    public AloFlux<RabbitMQSenderResult<T>> sendAloBodies(
        Publisher<Alo<T>> aloBodies,
        RabbitMQMessageCreator<T> messageCreator
    ) {
        return futureResources
            .flatMapMany(resources -> sendAloBodies(resources, aloBodies, messageCreator))
            .as(AloFlux::wrap);
    }

    /**
     * Sends the message carried by each {@link Alo} with publish confirms. If a send raises an
     * exception, the originating {@link Alo} is negatively acknowledged with that exception.
     *
     * @param aloMessages Alo-wrapped messages to send
     * @return Alo-wrapped results of the sends
     */
    public AloFlux<RabbitMQSenderResult<RabbitMQMessage<T>>> sendAloMessages(
        Publisher<Alo<RabbitMQMessage<T>>> aloMessages
    ) {
        return futureResources
            .flatMapMany(resources -> sendAloMessages(resources, aloMessages))
            .as(AloFlux::wrap);
    }

    // The four private pipelines below share one shape:
    //   1. build an outbound message, attaching the original item as correlation metadata,
    //   2. hand the stream to the Sender with typed publish confirms,
    //   3. convert each confirm result into a RabbitMQSenderResult.

    private Flux<RabbitMQSenderResult<T>> sendBodies(
        SendResources<T> resources,
        Publisher<T> bodies,
        RabbitMQMessageCreator<T> messageCreator
    ) {
        return Flux.from(bodies)
            .map(body -> resources.createOutboundMessage(messageCreator.apply(body), body))
            .transform(outbound -> resources.getSender().sendWithTypedPublishConfirms(outbound, SEND_OPTIONS))
            .map(RabbitMQSenderResult::fromMessageResult);
    }

    private Flux<RabbitMQSenderResult<RabbitMQMessage<T>>> sendMessages(
        SendResources<T> resources,
        Publisher<RabbitMQMessage<T>> messages
    ) {
        return Flux.from(messages)
            .map(message -> resources.createOutboundMessage(message, message))
            .transform(outbound -> resources.getSender().sendWithTypedPublishConfirms(outbound, SEND_OPTIONS))
            .map(RabbitMQSenderResult::fromMessageResult);
    }

    private Flux<Alo<RabbitMQSenderResult<T>>> sendAloBodies(
        SendResources<T> resources,
        Publisher<Alo<T>> aloBodies,
        RabbitMQMessageCreator<T> messageCreator
    ) {
        // The Alo itself is the correlation metadata so the exception handler can nacknowledge it.
        return AloFlux.toFlux(aloBodies)
            .map(aloBody -> resources.createOutboundMessage(messageCreator.apply(aloBody.get()), aloBody))
            .transform(outbound -> resources.getSender().sendWithTypedPublishConfirms(outbound, ALO_SEND_OPTIONS))
            .map(RabbitMQSenderResult::fromMessageResultOfAlo);
    }

    private Flux<Alo<RabbitMQSenderResult<RabbitMQMessage<T>>>> sendAloMessages(
        SendResources<T> resources,
        Publisher<Alo<RabbitMQMessage<T>>> aloMessages
    ) {
        // The Alo itself is the correlation metadata so the exception handler can nacknowledge it.
        return AloFlux.toFlux(aloMessages)
            .map(aloMessage -> resources.createOutboundMessage(aloMessage.get(), aloMessage))
            .transform(outbound -> resources.getSender().sendWithTypedPublishConfirms(outbound, ALO_SEND_OPTIONS))
            .map(RabbitMQSenderResult::fromMessageResultOfAlo);
    }

    /**
     * Exception handler for Alo sends: negatively acknowledges the {@link Alo} that was attached
     * to the failed message as correlation metadata.
     *
     * <p>The {@code sendContext} parameter type is left exactly as in the handler signature this
     * method is bound to via {@link SendOptions#exceptionHandler}.
     *
     * @param sendContext context of the failed send; its message must be a
     *                    {@link CorrelableOutboundMessage} whose correlation metadata is an {@link Alo}
     * @param error       the exception raised while sending
     * @throws ClassCastException if the message or its correlation metadata has an unexpected type
     */
    private static void handleAloSendException(Sender.SendContext sendContext, Exception error) {
        CorrelableOutboundMessage<?> message = (CorrelableOutboundMessage<?>) sendContext.getMessage();
        Alo<?> alo = (Alo<?>) message.getCorrelationMetadata();
        Alo.nacknowledge(alo, error);
    }

    /**
     * Immutable bundle of everything needed to turn a {@link RabbitMQMessage} into an outbound
     * message and send it.
     *
     * @param <T> type of the message bodies handled by the serializer and interceptors
     */
    private static final class SendResources<T> {

        private final Sender sender;

        private final List<RabbitMQMessageSendInterceptor<T>> interceptors;

        private final BodySerializer<T> bodySerializer;

        public SendResources(
            Sender sender,
            List<RabbitMQMessageSendInterceptor<T>> interceptors,
            BodySerializer<T> bodySerializer
        ) {
            this.sender = sender;
            this.interceptors = interceptors;
            this.bodySerializer = bodySerializer;
        }

        /**
         * Builds the resources from configuration: a {@link Sender} using the configured
         * connection factory, the configured interceptors, and the configured body serializer.
         *
         * @param config configuration to load from
         * @param <T>    type of the message bodies
         * @return the resources described by {@code config}
         */
        public static <T> SendResources<T> fromConfig(RabbitMQConfig config) {
            // Order is kept as: Sender first, then interceptors, then the serializer.
            SenderOptions senderOptions = new SenderOptions().connectionFactory(config.getConnectionFactory());
            Sender sender = new Sender(senderOptions);
            List<RabbitMQMessageSendInterceptor<T>> interceptors = config.loadListOfConfigured(INTERCEPTORS_CONFIG);
            BodySerializer<T> bodySerializer = config.loadConfiguredOrThrow(BODY_SERIALIZER_CONFIG);
            return new SendResources<>(sender, interceptors, bodySerializer);
        }

        /**
         * Applies every interceptor, in list order, to the message and converts the result into
         * an outbound message with a serialized body.
         *
         * @param message             message to send
         * @param correlationMetadata object attached to the outbound message for correlation
         * @param <C>                 type of the correlation metadata
         * @return the outbound message built from the intercepted message
         */
        public <C> CorrelableOutboundMessage<C> createOutboundMessage(
            RabbitMQMessage<T> message,
            C correlationMetadata
        ) {
            RabbitMQMessage<T> intercepted = message;
            for (RabbitMQMessageSendInterceptor<T> interceptor : interceptors) {
                intercepted = interceptor.onSend(intercepted);
            }
            return new CorrelableOutboundMessage<>(
                intercepted.getExchange(),
                intercepted.getRoutingKey(),
                intercepted.getProperties(),
                bodySerializer.serialize(intercepted.getBody()),
                correlationMetadata
            );
        }

        public Sender getSender() {
            return sender;
        }
    }
}

