package reactor.kafka.sender.internals;

import java.util.Map;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.TopicPartition;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.kafka.sender.SenderOptions;
import reactor.kafka.sender.TransactionManager;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:reactor/kafka/sender/internals/DefaultTransactionManager.class */
/**
 * {@link TransactionManager} backed by a lazily resolved Kafka {@link Producer}.
 *
 * <p>Each transactional operation resolves the producer from the {@code Mono} supplied at
 * construction time, wraps a single producer call in {@code Mono.fromRunnable}, and then logs
 * through {@code DefaultKafkaSender.log} using the transactional id taken from the
 * {@link SenderOptions}. Nothing is invoked on the producer when these methods are called;
 * the work is described by the returned {@code Mono}.
 *
 * @param <K> key type of the underlying producer
 * @param <V> value type of the underlying producer
 */
public class DefaultTransactionManager<K, V> implements TransactionManager {
    /** Source of the producer on which the transactional calls are made. */
    private final Mono<Producer<K, V>> producerMono;

    /** Options consulted for the transactional id (log messages) and the scheduler. */
    private final SenderOptions<K, V> senderOptions;

    /**
     * Creates a transaction manager.
     *
     * @param mono          publisher that yields the producer to operate on
     * @param senderOptions sender configuration providing the transactional id and scheduler
     */
    public DefaultTransactionManager(Mono<Producer<K, V>> mono, SenderOptions<K, V> senderOptions) {
        this.producerMono = mono;
        this.senderOptions = senderOptions;
    }

    /**
     * Describes a call to {@code beginTransaction()} on the producer, followed by a debug log
     * entry.
     *
     * @param <T> element type of the returned {@code Mono}, chosen by the caller
     * @return a {@code Mono} wrapping the begin-transaction action
     */
    @Override
    public <T> Mono<T> begin() {
        return onProducer(kafkaProducer -> {
            kafkaProducer.beginTransaction();
            DefaultKafkaSender.log.debug("Begin a new transaction for producer {}", senderOptions.transactionalId());
        });
    }

    /**
     * Describes a call to {@code sendOffsetsToTransaction(map, str)} on the producer, followed
     * by a trace log entry. When {@code map} is empty the action does nothing: the producer is
     * not called and nothing is logged.
     *
     * @param map offsets to hand to the producer, keyed by topic partition
     * @param str second argument forwarded unchanged to {@code sendOffsetsToTransaction}
     *            (presumably the consumer group id; confirm against the interface contract)
     * @param <T> element type of the returned {@code Mono}, chosen by the caller
     * @return a {@code Mono} wrapping the send-offsets action
     */
    @Override
    public <T> Mono<T> sendOffsets(Map<TopicPartition, OffsetAndMetadata> map, String str) {
        return onProducer(kafkaProducer -> {
            if (!map.isEmpty()) {
                kafkaProducer.sendOffsetsToTransaction(map, str);
                DefaultKafkaSender.log.trace("Sent offsets to transaction for producer {}, offsets: {}", senderOptions.transactionalId(), map);
            }
        });
    }

    /**
     * Describes a call to {@code commitTransaction()} on the producer, followed by a debug log
     * entry.
     *
     * @param <T> element type of the returned {@code Mono}, chosen by the caller
     * @return a {@code Mono} wrapping the commit action
     */
    @Override
    public <T> Mono<T> commit() {
        return onProducer(kafkaProducer -> {
            kafkaProducer.commitTransaction();
            DefaultKafkaSender.log.debug("Commit current transaction for producer {}", senderOptions.transactionalId());
        });
    }

    /**
     * Describes a call to {@code abortTransaction()} on the producer, followed by a debug log
     * entry.
     *
     * @param <T> element type of the returned {@code Mono}, chosen by the caller
     * @return a {@code Mono} wrapping the abort action
     */
    @Override
    public <T> Mono<T> abort() {
        return onProducer(kafkaProducer -> {
            kafkaProducer.abortTransaction();
            DefaultKafkaSender.log.debug("Abort current transaction for producer {}", senderOptions.transactionalId());
        });
    }

    /**
     * Returns the scheduler configured in the sender options.
     *
     * @return {@code senderOptions.scheduler()}
     */
    @Override
    public Scheduler scheduler() {
        return senderOptions.scheduler();
    }

    /**
     * Shared plumbing for the transactional operations: resolves the producer from
     * {@link #producerMono} and wraps {@code action} in {@code Mono.fromRunnable} so that it is
     * applied to that producer.
     *
     * @param action the work to perform with the resolved producer
     * @param <T>    element type of the returned {@code Mono}
     * @return a {@code Mono} wrapping the action
     */
    private <T> Mono<T> onProducer(java.util.function.Consumer<Producer<K, V>> action) {
        return producerMono.flatMap(kafkaProducer -> Mono.fromRunnable(() -> action.accept(kafkaProducer)));
    }
}
