package reactor.kafka.sender.internals;

import io.micrometer.observation.Observation;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.Operators;
import reactor.kafka.sender.SenderOptions;
import reactor.kafka.sender.SenderRecord;
import reactor.kafka.sender.SenderResult;
import reactor.kafka.sender.observation.KafkaRecordSenderContext;
import reactor.kafka.sender.observation.KafkaSenderObservation;
import reactor.util.context.Context;

/* loaded from: input_file:reactor/kafka/sender/internals/SendSubscriber.class */
/**
 * Subscriber that hands every incoming {@link ProducerRecord} to a Kafka {@link Producer} and
 * emits one {@link SenderResult} downstream for each send that completes.
 *
 * <p>Lifecycle, as tracked by {@link State}:
 * <ul>
 *   <li>{@code INIT} -> {@code ACTIVE} in {@link #onSubscribe(Subscription)};</li>
 *   <li>{@code ACTIVE} -> {@code INBOUND_DONE} in {@link #onComplete()};</li>
 *   <li>{@code INBOUND_DONE} -> {@code COMPLETE} once no send is outstanding;</li>
 *   <li>any state -> {@code COMPLETE} in {@link #onError(Throwable)}.</li>
 * </ul>
 *
 * <p>State, the in-flight counter and the first recorded failure are all held in atomics because
 * send callbacks are invoked by the producer rather than by the upstream publisher.
 * NOTE(review): which thread the producer uses for callbacks is not visible here - confirm
 * against the Kafka producer documentation.
 *
 * @param <K> record key type
 * @param <V> record value type
 * @param <C> type of the correlation metadata carried by {@link SenderRecord}s
 */
class SendSubscriber<K, V, C> implements CoreSubscriber<ProducerRecord<K, V>> {
    /** Downstream subscriber that receives the per-record results and the terminal signal. */
    private final CoreSubscriber<? super SenderResult<C>> actual;
    /** Producer used to perform the sends. */
    private final Producer<K, V> producer;
    /** Number of records accepted by {@link #onNext} whose result has not been emitted yet. */
    private final AtomicInteger inflight = new AtomicInteger();
    /** First send failure seen; reported through {@code onError} when the stream finishes. */
    private final AtomicReference<Throwable> firstException = new AtomicReference<>();
    /** Current lifecycle state. */
    private final AtomicReference<State> state = new AtomicReference<>(State.INIT);
    private final SenderOptions<K, V> senderOptions;
    /** Producer identifier passed to the observation context of every send. */
    private final String producerId;

    /**
     * Lifecycle states of a {@link SendSubscriber}.
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    public enum State {
        /** Created, not subscribed yet. */
        INIT,
        /** Subscribed; records may still arrive from upstream. */
        ACTIVE,
        /** Upstream completed; sends may still be outstanding. */
        INBOUND_DONE,
        /** Terminated; late records, errors and send results are dropped or ignored. */
        COMPLETE
    }

    /**
     * Creates a subscriber bound to the given producer and downstream.
     *
     * @param senderOptions  options consulted for transactional logging, error handling and
     *                       observation settings
     * @param producer       producer the records are sent with
     * @param str            producer id reported in the observation context
     * @param coreSubscriber downstream subscriber receiving the results
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    public SendSubscriber(SenderOptions<K, V> senderOptions, Producer<K, V> producer, String str, CoreSubscriber<? super SenderResult<C>> coreSubscriber) {
        this.senderOptions = senderOptions;
        this.producer = producer;
        this.producerId = str;
        this.actual = coreSubscriber;
    }

    /**
     * Returns the downstream subscriber's context.
     */
    public Context currentContext() {
        return this.actual.currentContext();
    }

    /**
     * Marks this subscriber {@code ACTIVE} and passes the upstream subscription straight through
     * to the downstream subscriber.
     */
    public void onSubscribe(Subscription subscription) {
        this.state.set(State.ACTIVE);
        this.actual.onSubscribe(subscription);
    }

    /**
     * Sends one record through the producer inside a Micrometer observation.
     *
     * <p>Records arriving after termination are handed to
     * {@link Operators#onNextDropped(Object, Context)}. An exception thrown while starting the
     * send is routed through the same completion path as an asynchronous send failure.
     *
     * @param producerRecord record to send; if it is a {@link SenderRecord}, its correlation
     *                       metadata is attached to the emitted result, otherwise {@code null} is
     */
    public void onNext(ProducerRecord<K, V> producerRecord) {
        if (this.state.get() == State.COMPLETE) {
            Operators.onNextDropped(producerRecord, currentContext());
            return;
        }
        this.inflight.incrementAndGet();
        if (this.senderOptions.isTransactional()) {
            DefaultKafkaSender.log.trace("Transactional send initiated for producer {} in state {} inflight {}: {}",
                    this.senderOptions.transactionalId(), this.state, this.inflight, producerRecord);
        }
        // The correlation type of a SenderRecord cannot be verified at runtime; the class-level
        // type parameter C is the contract callers agree to.
        @SuppressWarnings("unchecked")
        C correlationMetadata = producerRecord instanceof SenderRecord<?, ?, ?>
                ? ((SenderRecord<K, V, C>) producerRecord).correlationMetadata()
                : null;
        Callback callback = (recordMetadata, exc) ->
                onSendCompleted(producerRecord, correlationMetadata, recordMetadata, exc);
        try {
            KafkaSenderObservation.SENDER_OBSERVATION
                    .observation(
                            this.senderOptions.observationConvention(),
                            KafkaSenderObservation.DefaultKafkaSenderObservationConvention.INSTANCE,
                            () -> new KafkaRecordSenderContext(producerRecord, this.producerId, this.senderOptions.bootstrapServers()),
                            this.senderOptions.observationRegistry())
                    .parentObservation((Observation) currentContext().getOrDefault("micrometer.observation", null))
                    .observe(() -> {
                        // Block lambda with a return keeps this bound to the value-returning observe overload.
                        return this.producer.send(producerRecord, callback);
                    });
        } catch (Exception e) {
            // Starting the send failed: report it exactly like a failed send.
            callback.onCompletion(null, e);
        }
    }

    /**
     * Handles the outcome of a single send.
     *
     * <p>Nothing is emitted once the subscriber is {@code COMPLETE}. A failure is remembered as
     * the first exception (if none was recorded before); when the options ask to stop on error,
     * or classify the exception as fatal, the stream is terminated with it immediately. Otherwise
     * a result - successful or failed - is emitted, and the stream is completed if this was the
     * last outstanding send.
     *
     * @param producerRecord      record the outcome belongs to (used for trace logging only)
     * @param correlationMetadata correlation metadata to attach to the result, may be {@code null}
     * @param recordMetadata      metadata reported by the producer, {@code null} when the send
     *                            could not be started
     * @param exc                 send failure, or {@code null} on success
     */
    private void onSendCompleted(ProducerRecord<K, V> producerRecord, C correlationMetadata, RecordMetadata recordMetadata, Exception exc) {
        if (this.senderOptions.isTransactional()) {
            DefaultKafkaSender.log.trace("Transactional send completed for producer {} in state {} inflight {}: {}",
                    this.senderOptions.transactionalId(), this.state, this.inflight, producerRecord);
        }
        if (this.state.get() == State.COMPLETE) {
            return;
        }
        if (exc != null) {
            DefaultKafkaSender.log.trace("Sender failed: ", exc);
            this.firstException.compareAndSet(null, exc);
            if (this.senderOptions.stopOnError() || this.senderOptions.fatalException(exc)) {
                onError(exc);
                return;
            }
        }
        this.actual.onNext(new Response<>(recordMetadata, exc, correlationMetadata));
        if (this.inflight.decrementAndGet() == 0) {
            maybeComplete();
        }
    }

    /**
     * Terminates the stream with the given error.
     *
     * <p>The state is switched to {@code COMPLETE} unconditionally; only the call that performs
     * the switch forwards the error downstream, any later error goes to
     * {@link Operators#onErrorDropped(Throwable, Context)}.
     */
    public void onError(Throwable th) {
        DefaultKafkaSender.log.trace("Sender failed with exception", th);
        if (this.state.getAndSet(State.COMPLETE) == State.COMPLETE) {
            Operators.onErrorDropped(th, currentContext());
        } else {
            this.actual.onError(th);
        }
    }

    /**
     * Records that upstream has finished. If no send is outstanding the stream is terminated
     * right away; otherwise termination is left to the callback of the last outstanding send.
     * Has no effect unless the subscriber is currently {@code ACTIVE}.
     */
    public void onComplete() {
        if (this.state.compareAndSet(State.ACTIVE, State.INBOUND_DONE) && this.inflight.get() == 0) {
            maybeComplete();
        }
    }

    /**
     * Emits the terminal signal if, and only if, this call moves the state from
     * {@code INBOUND_DONE} to {@code COMPLETE}: {@code onError} with the first recorded send
     * failure when there is one, {@code onComplete} otherwise.
     */
    private void maybeComplete() {
        if (this.state.compareAndSet(State.INBOUND_DONE, State.COMPLETE)) {
            Throwable th = this.firstException.get();
            if (th != null) {
                this.actual.onError(th);
            } else {
                this.actual.onComplete();
            }
        }
    }
}
