package reactor.kafka.sender.internals;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.kafka.sender.KafkaOutbound;
import reactor.kafka.sender.TransactionManager;

/* loaded from: input_file:reactor/kafka/sender/internals/DefaultKafkaOutbound.class */
class DefaultKafkaOutbound<K, V> implements KafkaOutbound<K, V> {
    final DefaultKafkaSender<K, V> sender;

    /* JADX INFO: Access modifiers changed from: package-private */
    public DefaultKafkaOutbound(DefaultKafkaSender<K, V> defaultKafkaSender) {
        this.sender = defaultKafkaSender;
    }

    @Override // reactor.kafka.sender.KafkaOutbound
    public KafkaOutbound<K, V> send(Publisher<? extends ProducerRecord<K, V>> publisher) {
        return then(this.sender.doSend(publisher).then());
    }

    @Override // reactor.kafka.sender.KafkaOutbound
    public KafkaOutbound<K, V> sendTransactionally(Publisher<? extends Publisher<? extends ProducerRecord<K, V>>> publisher) {
        return then(Flux.from(publisher).publishOn(this.sender.senderOptions.scheduler()).concatMapDelayError(this::transaction, false, 1));
    }

    private Mono<Void> transaction(Publisher<? extends ProducerRecord<K, V>> publisher) {
        TransactionManager transactionManager = this.sender.transactionManager();
        return transactionManager.begin().thenMany(this.sender.doSend(publisher)).concatWith(transactionManager.commit()).onErrorResume(th -> {
            return transactionManager.abort().then(Mono.error(th));
        }).publishOn(this.sender.senderOptions.scheduler()).then();
    }

    @Override // reactor.kafka.sender.KafkaOutbound
    public KafkaOutbound<K, V> then(Publisher<Void> publisher) {
        return new KafkaOutboundThen(this.sender, this, publisher);
    }

    @Override // reactor.kafka.sender.KafkaOutbound
    public Mono<Void> then() {
        return Mono.empty();
    }
}
