package reactor.kafka.sender.internals;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxOperator;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SignalType;
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.kafka.sender.KafkaOutbound;
import reactor.kafka.sender.KafkaSender;
import reactor.kafka.sender.SenderOptions;
import reactor.kafka.sender.SenderRecord;
import reactor.kafka.sender.SenderResult;
import reactor.kafka.sender.TransactionManager;

/* loaded from: input_file:reactor/kafka/sender/internals/DefaultKafkaSender.class */
/**
 * Reactive {@link KafkaSender} implementation that wraps a lazily created, cached Kafka
 * {@link Producer}.
 *
 * <p>The producer is created on first subscription to an internal cached {@link Mono}; records are
 * handed to the producer on a dedicated single-threaded scheduler named after the producer id, and
 * results are published on the scheduler configured in {@link SenderOptions}.
 *
 * <p>The class also acts as the {@link Sinks.EmitFailureHandler} for the transaction boundary sink
 * used by {@link #sendTransactionally(Publisher)}.
 *
 * @param <K> record key type
 * @param <V> record value type
 */
public class DefaultKafkaSender<K, V> implements KafkaSender<K, V>, Sinks.EmitFailureHandler {
    static final Logger log = LoggerFactory.getLogger(DefaultKafkaSender.class.getName());

    /** Names of the only {@link Producer} methods reachable through {@link #doOnProducer(Function)}. */
    private static final Set<String> DELEGATE_METHODS =
            new HashSet<>(Arrays.asList("sendOffsetsToTransaction", "partitionsFor", "metrics", "flush"));

    private final Mono<Producer<K, V>> producerMono;
    final SenderOptions<K, V> senderOptions;
    private final TransactionManager transactionManager;
    private final String producerId;
    /** Lazily created restricted view of the producer; guarded by the monitor of {@code this}. */
    private Producer<K, V> producerProxy;
    private final Scheduler scheduler;
    /** Set once the producer has been created; cleared by {@link #close()}. */
    private final AtomicBoolean hasProducer = new AtomicBoolean();

    /**
     * Creates a sender. The Kafka producer itself is not created here unless the options are
     * transactional (see the note at the end of the constructor).
     *
     * @param producerFactory factory used to create the underlying producer
     * @param senderOptions   sender configuration; for transactional options its scheduler is
     *                        replaced by a new single-threaded scheduler named after the
     *                        transactional id
     */
    public DefaultKafkaSender(ProducerFactory producerFactory, SenderOptions<K, V> senderOptions) {
        // Use the configured client id when it is non-empty, otherwise a generated identifier.
        this.producerId = Optional.ofNullable(senderOptions.clientId())
                .filter(clientId -> !clientId.isEmpty())
                .orElse("reactor-kafka-sender-" + System.identityHashCode(this));

        // Created here rather than in a field initializer: the thread factory reads producerId,
        // which must already be assigned.
        this.scheduler = Schedulers.newSingle(runnable -> {
            Thread thread = new Thread(runnable);
            thread.setName(this.producerId);
            return thread;
        });

        this.senderOptions = senderOptions.scheduler(senderOptions.isTransactional()
                ? Schedulers.newSingle(senderOptions.transactionalId())
                : senderOptions.scheduler());
        boolean transactional = this.senderOptions.isTransactional();

        Mono<Producer<K, V>> cachedProducer = Mono
                .fromCallable(() -> {
                    Producer<K, V> producer = producerFactory.createProducer(this.senderOptions);
                    SenderOptions.ProducerListener listener = this.senderOptions.producerListener();
                    if (listener != null) {
                        listener.producerAdded(this.producerId, producer);
                    }
                    if (transactional) {
                        log.info("Initializing transactions for producer {}", this.senderOptions.transactionalId());
                        producer.initTransactions();
                    }
                    this.hasProducer.set(true);
                    return producer;
                })
                .publishOn(transactional ? this.scheduler : this.senderOptions.scheduler())
                .cache();
        // Transactional senders re-publish the cached producer on the sender's own scheduler.
        this.producerMono = transactional ? cachedProducer.publishOn(this.scheduler) : cachedProducer;

        if (transactional) {
            // NOTE(review): subscribes once and immediately disposes that subscription; presumably
            // this triggers producer creation eagerly because the Mono is cached - confirm.
            this.producerMono.subscribe().dispose();
        }
        this.transactionManager = transactional
                ? new DefaultTransactionManager<>(this.producerMono, this.senderOptions)
                : null;
    }

    /**
     * Sends the given records and emits one result per record.
     *
     * @param publisher records to send
     * @param <T>       correlation metadata type
     * @return results of the send operations
     */
    @Override // reactor.kafka.sender.KafkaSender
    public <T> Flux<SenderResult<T>> send(Publisher<? extends SenderRecord<K, V, T>> publisher) {
        return doSend(publisher);
    }

    /**
     * Sends plain producer records: waits for the cached producer, moves the record stream onto
     * the sender's own scheduler and subscribes a {@code SendSubscriber} that talks to the producer.
     * Errors are logged at trace level, and results are published on the options' scheduler with a
     * prefetch of {@code maxInFlight}.
     *
     * @param publisher records to send
     * @param <T>       correlation metadata type
     * @return results of the send operations
     */
    public <T> Flux<SenderResult<T>> doSend(Publisher<? extends ProducerRecord<K, V>> publisher) {
        return this.producerMono
                .flatMapMany(producer -> Flux.from(publisher)
                        .publishOn(this.scheduler)
                        .as(flux -> new FluxOperator<ProducerRecord<K, V>, SenderResult<T>>(flux) {
                            @Override
                            public void subscribe(CoreSubscriber<? super SenderResult<T>> actual) {
                                this.source.subscribe(new SendSubscriber<>(
                                        DefaultKafkaSender.this.senderOptions,
                                        producer,
                                        DefaultKafkaSender.this.producerId,
                                        actual));
                            }
                        }))
                .doOnError(error -> log.trace("Send failed with exception", error))
                .publishOn(this.senderOptions.scheduler(), this.senderOptions.maxInFlight());
    }

    /**
     * Creates an outbound bound to this sender.
     *
     * @return a new {@link KafkaOutbound}
     */
    @Override // reactor.kafka.sender.KafkaSender
    public KafkaOutbound<K, V> createOutbound() {
        return new DefaultKafkaOutbound<>(this);
    }

    /**
     * Sends each inner publisher inside its own transaction, one transaction at a time, and
     * groups the results into one window per transaction.
     *
     * @param publisher stream of record batches, each batch forming one transaction
     * @param <T>       correlation metadata type
     * @return a flux of result windows
     * @throws IllegalStateException when a batch is processed and transactions are not enabled
     */
    @Override // reactor.kafka.sender.KafkaSender
    public <T> Flux<Flux<SenderResult<T>>> sendTransactionally(Publisher<? extends Publisher<? extends SenderRecord<K, V, T>>> publisher) {
        // Receives one signal per committed transaction; used as the window boundary below.
        Sinks.Many<Object> transactionBoundary = Sinks.many().unicast().onBackpressureBuffer();
        return Flux.from(publisher)
                .publishOn(this.senderOptions.scheduler(), false, 1)
                .concatMapDelayError(records -> transaction(records, transactionBoundary), false, 1)
                .window(transactionBoundary.asFlux())
                .doOnTerminate(() -> transactionBoundary.emitComplete(Sinks.EmitFailureHandler.FAIL_FAST))
                .doOnCancel(() -> transactionBoundary.emitComplete(Sinks.EmitFailureHandler.FAIL_FAST));
    }

    /**
     * Returns the transaction manager of this sender.
     *
     * @return the transaction manager
     * @throws IllegalStateException if the sender options are not transactional
     */
    @Override // reactor.kafka.sender.KafkaSender
    public TransactionManager transactionManager() {
        if (this.transactionManager == null) {
            throw new IllegalStateException("Transactions are not enabled");
        }
        return this.transactionManager;
    }

    /**
     * Applies a function to a restricted view of the producer. Only the methods named in
     * {@code DELEGATE_METHODS} may be invoked on that view; any other call throws
     * {@link UnsupportedOperationException}.
     *
     * @param function function to apply to the restricted producer
     * @param <T>      result type
     * @return a mono emitting the function's result
     */
    @Override // reactor.kafka.sender.KafkaSender
    public <T> Mono<T> doOnProducer(Function<Producer<K, V>, ? extends T> function) {
        return this.producerMono.map(producer -> function.apply(producerProxy(producer)));
    }

    /**
     * Closes the producer, notifies the producer listener (if any) and disposes the schedulers
     * owned by this sender. Does nothing when no producer is currently marked as created, which
     * also makes repeated calls no-ops.
     */
    @Override // reactor.kafka.sender.KafkaSender
    public void close() {
        if (!this.hasProducer.getAndSet(false)) {
            return;
        }
        try {
            this.producerMono
                    .doOnNext(producer -> {
                        producer.close(this.senderOptions.closeTimeout());
                        SenderOptions.ProducerListener listener = this.senderOptions.producerListener();
                        if (listener != null) {
                            listener.producerRemoved(this.producerId, producer);
                        }
                    })
                    .block();
        } finally {
            // Dispose even when closing fails: hasProducer is already cleared, so a later close()
            // would not get another chance to release these schedulers.
            if (this.senderOptions.isTransactional()) {
                // For transactional senders this scheduler was created by the constructor.
                this.senderOptions.scheduler().dispose();
            }
            this.scheduler.dispose();
        }
    }

    /**
     * Runs a single transaction: begin, send, commit, then signal the window boundary. On any
     * error the transaction is aborted and the original error is re-emitted.
     *
     * @param publisher records belonging to the transaction
     * @param many      sink that receives one signal after a successful commit
     * @param <T>       correlation metadata type
     * @return results of the records sent within the transaction
     */
    private <T> Flux<SenderResult<T>> transaction(Publisher<? extends SenderRecord<K, V, T>> publisher, Sinks.Many<Object> many) {
        return transactionManager()
                .begin()
                .thenMany(send(publisher))
                .concatWith(transactionManager().commit())
                // The emitted value is only a marker; this sender doubles as the emit failure handler.
                .concatWith(Mono.fromRunnable(() -> many.emitNext(this, this)))
                .onErrorResume(error -> transactionManager().abort().then(Mono.error(error)))
                .publishOn(this.senderOptions.scheduler());
    }

    /**
     * Returns the restricted producer view, creating it on first use. The proxy is created once
     * and keeps delegating to the producer passed on that first call.
     *
     * @param producer producer to delegate permitted calls to
     * @return proxy exposing only the methods named in {@code DELEGATE_METHODS}
     */
    @SuppressWarnings("unchecked") // The proxy implements Producer; K and V are erased at runtime.
    private synchronized Producer<K, V> producerProxy(Producer<K, V> producer) {
        if (this.producerProxy == null) {
            this.producerProxy = (Producer<K, V>) Proxy.newProxyInstance(
                    Producer.class.getClassLoader(),
                    new Class<?>[]{Producer.class},
                    (proxy, method, args) -> {
                        if (!DELEGATE_METHODS.contains(method.getName())) {
                            throw new UnsupportedOperationException("Method is not supported: " + method);
                        }
                        try {
                            return method.invoke(producer, args);
                        } catch (InvocationTargetException e) {
                            // Surface the producer's own exception instead of the reflective wrapper.
                            throw e.getCause();
                        }
                    });
        }
        return this.producerProxy;
    }

    /**
     * Decides whether a failed sink emission is retried.
     *
     * @param signalType signal whose emission failed
     * @param emitResult reason for the failure
     * @return {@code true} (retry) while a producer is marked as created, {@code false} otherwise
     */
    @Override // reactor.core.publisher.Sinks.EmitFailureHandler
    public boolean onEmitFailure(SignalType signalType, Sinks.EmitResult emitResult) {
        return this.hasProducer.get();
    }
}
