package org.kie.kogito.addon.quarkus.messaging.common;

import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.operators.multi.AbstractMultiOperator;
import io.smallrye.mutiny.operators.multi.MultiOperatorProcessor;
import io.smallrye.mutiny.subscription.MultiSubscriber;
import io.smallrye.reactive.messaging.providers.PublisherDecorator;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link PublisherDecorator} that plugs a demand-gating operator into every stream it decorates.
 *
 * <p>Downstream {@code request(n)} calls are forwarded upstream only while the injected
 * {@link BackpressureKogitoEmitter} reports the channel as enabled; otherwise the request is
 * logged and dropped. For each subscription a handler is registered with the emitter under the
 * channel name; when the emitter runs that handler, exactly one element is requested from upstream.
 */
@ApplicationScoped
public class BackpressurePublisherDecorator implements PublisherDecorator {
    private static final Logger logger = LoggerFactory.getLogger(BackpressurePublisherDecorator.class);

    @Inject
    BackpressureKogitoEmitter emitter;

    /**
     * Operator that subscribes to its upstream through a {@link BackpressureProcessor} bound to
     * a single channel.
     */
    private class BackpressureOperator extends AbstractMultiOperator<Message<?>, Message<?>> {
        private final String channelName;

        /**
         * @param source stream to wrap
         * @param channel name of the channel the stream belongs to
         */
        public BackpressureOperator(Multi<? extends Message<?>> source, String channel) {
            super(source);
            this.channelName = channel;
        }

        /**
         * Subscribes to the upstream with a new {@link BackpressureProcessor} wrapping the given
         * downstream subscriber.
         *
         * @param multiSubscriber downstream subscriber to deliver items to
         */
        public void subscribe(MultiSubscriber<? super Message<?>> multiSubscriber) {
            BackpressureProcessor gate = new BackpressureProcessor(multiSubscriber, this.channelName);
            this.upstream.subscribe().withSubscriber(gate);
        }
    }

    /**
     * Processor that forwards downstream demand only while the emitter reports the channel as
     * enabled.
     */
    private class BackpressureProcessor extends MultiOperatorProcessor<Message<?>, Message<?>> {
        private final String channelName;

        /**
         * @param downstreamSubscriber subscriber receiving the items
         * @param channel name of the channel used for emitter look-ups and handler registration
         */
        public BackpressureProcessor(MultiSubscriber<? super Message<?>> downstreamSubscriber, String channel) {
            super(downstreamSubscriber);
            this.channelName = channel;
            // Registered last so the processor's own state is set before the handler can run.
            // The handler calls super.request directly, bypassing the isEnabled check in request(long).
            // NOTE(review): presumably the emitter runs this when the channel is re-enabled - confirm
            // against BackpressureKogitoEmitter.
            BackpressurePublisherDecorator.this.emitter.registerHandler(channel, () -> super.request(1L));
        }

        /**
         * Forwards the demand upstream if the channel is enabled; otherwise only logs it, so the
         * blocked demand is not propagated.
         *
         * @param j number of elements requested by downstream
         */
        public void request(long j) {
            if (BackpressurePublisherDecorator.this.emitter.isEnabled(this.channelName)) {
                logger.trace("Requesting {} elements", j);
                super.request(j);
            } else {
                logger.trace("Blocking {} elements", j);
            }
        }
    }

    /**
     * Wraps the given stream with a {@link BackpressureOperator} for the given channel.
     *
     * @param multi stream to decorate
     * @param str name of the channel the stream belongs to
     * @return the decorated stream
     */
    public Multi<? extends Message<?>> decorate(Multi<? extends Message<?>> multi, String str) {
        // plug applies the function to the stream it is invoked on, so upstream is the same instance as multi.
        return multi.plug(upstream -> new BackpressureOperator(upstream, str));
    }
}
