package org.eclipse.dataspaceconnector.events.azure;

import com.azure.core.util.BinaryData;
import com.azure.messaging.eventgrid.EventGridEvent;
import com.azure.messaging.eventgrid.EventGridPublisherAsyncClient;
import org.eclipse.dataspaceconnector.events.azure.TransferProcessDto;
import org.eclipse.dataspaceconnector.spi.monitor.Monitor;
import org.eclipse.dataspaceconnector.spi.transfer.observe.TransferProcessListener;
import org.eclipse.dataspaceconnector.spi.types.domain.transfer.TransferProcess;
import org.eclipse.dataspaceconnector.spi.types.domain.transfer.TransferProcessStates;
import org.jetbrains.annotations.NotNull;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Mono;

/* loaded from: input_file:org/eclipse/dataspaceconnector/events/azure/AzureEventGridPublisher.class */
class AzureEventGridPublisher implements TransferProcessListener {
    private final Monitor monitor;
    private final EventGridPublisherAsyncClient<EventGridEvent> client;
    private final String eventTypeTransferprocess = "dataspaceconnector/transfer/transferprocess";
    private final String eventTypeMetadata = "dataspaceconnector/metadata/store";
    private final String connectorId;

    /* loaded from: input_file:org/eclipse/dataspaceconnector/events/azure/AzureEventGridPublisher$LoggingSubscriber.class */
    private class LoggingSubscriber<T> extends BaseSubscriber<T> {
        private final String message;

        LoggingSubscriber(String str) {
            this.message = str;
        }

        protected void hookOnComplete() {
            AzureEventGridPublisher.this.monitor.debug("AzureEventGrid: " + this.message, new Throwable[0]);
        }

        protected void hookOnError(@NotNull Throwable th) {
            AzureEventGridPublisher.this.monitor.severe("Error during event publishing", new Throwable[]{th});
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public AzureEventGridPublisher(String str, Monitor monitor, EventGridPublisherAsyncClient<EventGridEvent> eventGridPublisherAsyncClient) {
        this.connectorId = str;
        this.monitor = monitor;
        this.client = eventGridPublisherAsyncClient;
    }

    public void preCreated(TransferProcess transferProcess) {
        TransferProcessDto createTransferProcessDto = createTransferProcessDto(transferProcess);
        if (transferProcess.getType() == TransferProcess.Type.CONSUMER) {
            sendEvent("createdConsumer", "dataspaceconnector/transfer/transferprocess", createTransferProcessDto).subscribe(new LoggingSubscriber("Transfer process created"));
        } else {
            sendEvent("createdProvider", "dataspaceconnector/transfer/transferprocess", createTransferProcessDto).subscribe(new LoggingSubscriber("Transfer process created"));
        }
    }

    public void preCompleted(TransferProcess transferProcess) {
        sendEvent("completed", "dataspaceconnector/transfer/transferprocess", createTransferProcessDto(transferProcess)).subscribe(new LoggingSubscriber("Transfer process completed"));
    }

    public void preDeprovisioned(TransferProcess transferProcess) {
        sendEvent("deprovisioned", "dataspaceconnector/transfer/transferprocess", createTransferProcessDto(transferProcess)).subscribe(new LoggingSubscriber("Transfer process resources deprovisioned"));
    }

    public void preEnded(TransferProcess transferProcess) {
        sendEvent("ended", "dataspaceconnector/transfer/transferprocess", createTransferProcessDto(transferProcess)).subscribe(new LoggingSubscriber("Transfer process ended"));
    }

    public void preError(TransferProcess transferProcess) {
        sendEvent("error", "dataspaceconnector/transfer/transferprocess", createTransferProcessDto(transferProcess)).subscribe(new LoggingSubscriber("Transfer process errored!"));
    }

    private Mono<Void> sendEvent(String str, String str2, Object obj) {
        return this.client.sendEvent(new EventGridEvent(str, str2, BinaryData.fromObject(obj), "0.1"));
    }

    @NotNull
    private TransferProcessDto createTransferProcessDto(TransferProcess transferProcess) {
        return TransferProcessDto.Builder.newInstance().connector(this.connectorId).state(TransferProcessStates.from(transferProcess.getState())).requestId(transferProcess.getDataRequest().getId()).type(transferProcess.getType()).build();
    }
}
