package net.dreamlu.mica.nats.core;

import io.nats.client.Connection;
import io.nats.client.Dispatcher;
import io.nats.client.JetStream;
import io.nats.client.JetStreamApiException;
import io.nats.client.MessageHandler;
import io.nats.client.PushSubscribeOptions;
import io.nats.client.api.ConsumerConfiguration;
import java.io.IOException;
import net.dreamlu.mica.core.utils.Exceptions;
import net.dreamlu.mica.core.utils.StringUtil;
import net.dreamlu.mica.nats.annotation.NatsStreamListener;
import net.dreamlu.mica.nats.config.NatsStreamCustomizer;
import net.dreamlu.mica.nats.config.NatsStreamProperties;
import net.dreamlu.mica.nats.utils.StreamConfigurationUtil;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.core.annotation.AnnotationUtils;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
import org.springframework.util.ReflectionUtils;
import org.springframework.util.StringUtils;

/* loaded from: input_file:net/dreamlu/mica/nats/core/NatsStreamListenerDetector.class */
public class NatsStreamListenerDetector implements BeanPostProcessor {
    private final NatsStreamProperties properties;
    private final ObjectProvider<NatsStreamCustomizer> natsStreamCustomizerObjectProvider;
    private final Connection natsConnection;
    private final JetStream jetStream;

    public Object postProcessAfterInitialization(Object obj, String str) throws BeansException {
        ReflectionUtils.doWithMethods(ClassUtils.getUserClass(obj), method -> {
            NatsStreamListener natsStreamListener = (NatsStreamListener) AnnotationUtils.findAnnotation(method, NatsStreamListener.class);
            if (natsStreamListener != null) {
                Assert.hasText(natsStreamListener.value(), "@NatsStreamListener value(subject) must not be empty.");
                try {
                    jetStreamSubscribe(natsStreamListener, new DefaultMessageHandler(obj, method));
                } catch (JetStreamApiException | IOException e) {
                    throw Exceptions.unchecked(e);
                }
            }
        }, ReflectionUtils.USER_DECLARED_METHODS);
        return obj;
    }

    private void jetStreamSubscribe(NatsStreamListener natsStreamListener, MessageHandler messageHandler) throws JetStreamApiException, IOException {
        String value = natsStreamListener.value();
        Dispatcher createDispatcher = this.natsConnection.createDispatcher(messageHandler);
        long pendingMessageLimit = natsStreamListener.pendingMessageLimit();
        long pendingByteLimit = natsStreamListener.pendingByteLimit();
        createDispatcher.setPendingLimits(pendingMessageLimit, pendingByteLimit);
        PushSubscribeOptions.Builder ordered = PushSubscribeOptions.builder().pendingMessageLimit(pendingMessageLimit).pendingByteLimit(pendingByteLimit).ordered(natsStreamListener.ordered());
        String stream = natsStreamListener.stream();
        String name = this.properties.getName();
        if (StringUtils.hasText(stream) && !name.equals(stream)) {
            ordered.stream(stream);
            this.natsConnection.jetStreamManagement().addStream(StreamConfigurationUtil.from(stream, value, this.properties, this.natsStreamCustomizerObjectProvider));
        }
        ordered.configuration(ConsumerConfiguration.builder().durable(this.properties.getConsumerName() + '-' + StringUtil.getUUID()).deliverGroup(this.properties.getConsumerGroup()).deliverPolicy(this.properties.getConsumerPolicy()).build());
        String queue = natsStreamListener.queue();
        boolean autoAck = natsStreamListener.autoAck();
        if (StringUtils.hasText(queue)) {
            this.jetStream.subscribe(value, queue, createDispatcher, messageHandler, autoAck, ordered.build());
        } else {
            this.jetStream.subscribe(value, createDispatcher, messageHandler, autoAck, ordered.build());
        }
    }

    public NatsStreamListenerDetector(NatsStreamProperties natsStreamProperties, ObjectProvider<NatsStreamCustomizer> objectProvider, Connection connection, JetStream jetStream) {
        this.properties = natsStreamProperties;
        this.natsStreamCustomizerObjectProvider = objectProvider;
        this.natsConnection = connection;
        this.jetStream = jetStream;
    }
}
