package com.sap.cds.feature.messaging.em.mt.service;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.sap.cds.feature.messaging.em.client.EnterpriseMessagingWebhookManagementClient;
import com.sap.cds.feature.messaging.em.mt.SubdomainUtils;
import com.sap.cds.feature.messaging.em.mt.service.EnterpriseMessagingTenantStatus;
import com.sap.cds.feature.messaging.em.mt.webhook.EnterpriseMessagingWebhookAdapterFactory;
import com.sap.cds.feature.messaging.em.service.EnterpriseMessagingService;
import com.sap.cds.services.environment.CdsProperties;
import com.sap.cds.services.environment.ServiceBinding;
import com.sap.cds.services.messaging.TopicMessageEventContext;
import com.sap.cds.services.messaging.service.MessagingBrokerQueueListener;
import com.sap.cds.services.request.RequestContext;
import com.sap.cds.services.request.UserInfo;
import com.sap.cds.services.runtime.CdsRuntime;
import com.sap.cds.services.utils.CdsErrorStatuses;
import com.sap.cds.services.utils.ErrorStatusException;
import com.sap.cds.services.utils.StringUtils;
import java.io.IOException;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/sap/cds/feature/messaging/em/mt/service/EnterpriseMessagingMtService.class */
public class EnterpriseMessagingMtService extends EnterpriseMessagingService {
    private static final Logger logger = LoggerFactory.getLogger(EnterpriseMessagingMtService.class);
    private final MessagingBrokerQueueListener queueListener;
    private final String webhookUrl;
    private final EnterpriseMessagingWebhookManagementClient webhookManagementClient;

    public EnterpriseMessagingMtService(CdsProperties.Messaging.MessagingServiceConfig messagingServiceConfig, ServiceBinding serviceBinding, CdsRuntime cdsRuntime) {
        super(messagingServiceConfig, serviceBinding, null, cdsRuntime);
        this.queueListener = new MessagingBrokerQueueListener(this, toFullyQualifiedQueueName(this.queue), this.queue, cdsRuntime);
        String url = cdsRuntime.getEnvironment().getCdsProperties().getMessaging().getWebhooks().getUrl();
        url = url == null ? cdsRuntime.getEnvironment().getApplicationInfo().getUrl() : url;
        if (url == null) {
            throw new ErrorStatusException(CdsErrorStatuses.NO_WEBHOOK_URL, new Object[0]);
        }
        this.webhookUrl = "https://" + url + "/" + EnterpriseMessagingWebhookAdapterFactory.URL_PATH;
        this.webhookManagementClient = new EnterpriseMessagingWebhookManagementClient(serviceBinding, messagingServiceConfig.getConnection().getConnectionPool());
    }

    public void init(String str) {
        String checkSubdomain = checkSubdomain(str);
        try {
            try {
                logger.info("Initializing the enterprise-messaging service '{}' for tenant '{}'", getName(), str);
                if (checkTenantReadiness(str, false) && createOrUpdateQueuesAndSubscriptions()) {
                    this.webhookManagementClient.createOrUpdateWebhookRegistration(getName(), toFullyQualifiedQueueName(this.queue), this.webhookUrl, checkSubdomain);
                }
                logger.debug("Finished initializing the enterprise-messaging service '{}' for tenant '{}'", getName(), str);
            } catch (IOException e) {
                logger.error("Failed to initialize the enterprise-messaging service '{}' for tenant '{}'", new Object[]{getName(), str, e});
                logger.debug("Finished initializing the enterprise-messaging service '{}' for tenant '{}'", getName(), str);
            }
        } catch (Throwable th) {
            logger.debug("Finished initializing the enterprise-messaging service '{}' for tenant '{}'", getName(), str);
            throw th;
        }
    }

    public EnterpriseMessagingTenantStatus getTenantStatus(String str, boolean z) {
        EnterpriseMessagingTenantStatus enterpriseMessagingTenantStatus = new EnterpriseMessagingTenantStatus(str);
        EnterpriseMessagingTenantStatus.QueueStatus queueStatus = new EnterpriseMessagingTenantStatus.QueueStatus();
        String fullyQualifiedQueueName = toFullyQualifiedQueueName(this.queue);
        enterpriseMessagingTenantStatus.getServices().put(getName(), queueStatus);
        try {
            if (checkTenantReadiness(str, true)) {
                this.managementClient.getQueues().forEach(jsonNode -> {
                    JsonNode asText = jsonNode.get("name").asText();
                    if (!asText.equals(fullyQualifiedQueueName)) {
                        enterpriseMessagingTenantStatus.getUnmanagedQueues().add(z ? jsonNode : asText);
                        return;
                    }
                    queueStatus.setQueue(z ? jsonNode : asText);
                    try {
                        ArrayNode queueSubscriptions = this.managementClient.getQueueSubscriptions(asText);
                        Set set = (Set) this.queue.getTopics().stream().map(messageTopic -> {
                            return messageTopic.getBrokerName();
                        }).collect(Collectors.toSet());
                        if (queueSubscriptions != null && queueSubscriptions.size() > 0) {
                            queueSubscriptions.forEach(jsonNode -> {
                                String asText2 = jsonNode.get("topicPattern").asText();
                                queueStatus.getTopics().add(asText2);
                                if (set.remove(asText2)) {
                                    return;
                                }
                                queueStatus.getUnmanagedTopics().add(asText2);
                            });
                            if (!set.isEmpty()) {
                                queueStatus.setError("Missing topic subscriptions: " + ((String) set.stream().collect(Collectors.joining(", "))));
                            }
                            if (!queueStatus.getUnmanagedTopics().isEmpty()) {
                                queueStatus.setWarning("There are unmanaged topics subscribed. This could potentially lead to unexpected messages received by the queue.");
                            }
                        } else if (!set.isEmpty()) {
                            queueStatus.getUnsubscribedTopics().addAll(set);
                            queueStatus.setError("Missing topic subscriptions: " + ((String) set.stream().collect(Collectors.joining(", "))));
                        }
                        this.webhookManagementClient.getRegisteredWebhooks().forEach(jsonNode2 -> {
                            if (jsonNode2.get("address").asText().endsWith(asText)) {
                                queueStatus.getWebhooks().add(z ? jsonNode2 : jsonNode2.get("name").asText());
                            } else {
                                enterpriseMessagingTenantStatus.getUnmanagedWebhooks().add(z ? jsonNode2 : jsonNode2.get("name").asText());
                            }
                        });
                        if (queueStatus.getWebhooks().isEmpty() && !queueStatus.getTopics().isEmpty()) {
                            queueStatus.setError("No webhook registration available.");
                        }
                    } catch (IOException e) {
                        logger.debug("Could not retrieve status of tenant '{}' for queue '{}' of service '{}'", new Object[]{str, asText, getName(), e});
                    }
                });
                if (queueStatus.getQueue() == null) {
                    queueStatus.setInitial();
                }
            } else {
                queueStatus.setOnboarding();
            }
        } catch (IOException e) {
            queueStatus.setError("The status of the tenant could not be retrieved.");
            logger.debug("The status of the tenant '{}' on service '{}' could not be retrieved.", new Object[]{str, getName(), e});
        }
        return enterpriseMessagingTenantStatus;
    }

    private boolean checkTenantReadiness(String str, boolean z) throws IOException {
        while (!this.managementClient.checkTenantInstanceReadiness()) {
            logger.info("The messaging bus for service '{}' is not yet ready for the tenant '{}'", getName(), str);
            if (z) {
                return false;
            }
            try {
                Thread.sleep(10000L);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        logger.debug("The messaging bus for service '{}' is ready for the tenant '{}'", getName(), str);
        return true;
    }

    private String checkSubdomain(String str) {
        UserInfo userInfo = RequestContext.getCurrent(this.runtime).getUserInfo();
        if (!Objects.equals(userInfo.getTenant(), str) || StringUtils.isEmpty((String) userInfo.getAdditionalAttribute(SubdomainUtils.USER_INFO_SUBDOMAIN))) {
            throw new ErrorStatusException(CdsErrorStatuses.NO_TENANT_INFO, new Object[0]);
        }
        return (String) userInfo.getAdditionalAttribute(SubdomainUtils.USER_INFO_SUBDOMAIN);
    }

    public MessagingBrokerQueueListener getQueueListener() {
        return this.queueListener;
    }

    @Override // com.sap.cds.feature.messaging.em.service.EnterpriseMessagingService
    public void init() {
    }

    @Override // com.sap.cds.feature.messaging.em.service.EnterpriseMessagingService
    protected void registerQueueListener(String str, MessagingBrokerQueueListener messagingBrokerQueueListener) throws IOException {
    }

    @Override // com.sap.cds.feature.messaging.em.service.EnterpriseMessagingService
    protected void emitTopicMessage(String str, TopicMessageEventContext topicMessageEventContext) {
        try {
            this.webhookManagementClient.sendMessage(str, topicMessageEventContext.getData());
        } catch (IOException e) {
            throw new ErrorStatusException(CdsErrorStatuses.EVENT_EMITTING_FAILED, new Object[]{str, e});
        }
    }
}
