package reactor.kafka.receiver.internals;

import java.util.Iterator;
import java.util.function.Function;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverOptions;
import reactor.kafka.receiver.ReceiverRecord;
import reactor.kafka.receiver.internals.ConsumerFlux;
import reactor.kafka.sender.TransactionManager;

/* loaded from: input_file:reactor/kafka/receiver/internals/DefaultKafkaReceiver.class */
public class DefaultKafkaReceiver<K, V> implements KafkaReceiver<K, V> {
    private final ConsumerFactory consumerFactory;
    private final ReceiverOptions<K, V> receiverOptions;
    ConsumerFlux<K, V> consumerFlux;

    public DefaultKafkaReceiver(ConsumerFactory consumerFactory, ReceiverOptions<K, V> receiverOptions) {
        this.consumerFactory = consumerFactory;
        this.receiverOptions = receiverOptions.toImmutable();
    }

    @Override // reactor.kafka.receiver.KafkaReceiver
    public Flux<ReceiverRecord<K, V>> receive() {
        ConsumerFlux<K, V> createConsumerFlux = createConsumerFlux(AckMode.MANUAL_ACK);
        Flux map = createConsumerFlux.doFinally(signalType -> {
            dispose();
        }).flatMapIterable(consumerRecords -> {
            return consumerRecords;
        }).map(consumerRecord -> {
            createConsumerFlux.getClass();
            return new ReceiverRecord(consumerRecord, new ConsumerFlux.CommittableOffset(createConsumerFlux, consumerRecord));
        });
        createConsumerFlux.getClass();
        return map.doOnRequest((v1) -> {
            r1.handleRequest(v1);
        });
    }

    @Override // reactor.kafka.receiver.KafkaReceiver
    public Flux<Flux<ConsumerRecord<K, V>>> receiveAutoAck() {
        ConsumerFlux<K, V> createConsumerFlux = createConsumerFlux(AckMode.AUTO_ACK);
        Flux doFinally = createConsumerFlux.doFinally(signalType -> {
            dispose();
        });
        createConsumerFlux.getClass();
        return doFinally.doOnRequest((v1) -> {
            r1.handleRequest(v1);
        }).map(consumerRecords -> {
            return Flux.fromIterable(consumerRecords).doAfterTerminate(() -> {
                Iterator it = consumerRecords.iterator();
                while (it.hasNext()) {
                    ConsumerRecord consumerRecord = (ConsumerRecord) it.next();
                    createConsumerFlux.getClass();
                    new ConsumerFlux.CommittableOffset(createConsumerFlux, consumerRecord).acknowledge();
                }
            });
        });
    }

    @Override // reactor.kafka.receiver.KafkaReceiver
    public Flux<ConsumerRecord<K, V>> receiveAtmostOnce() {
        ConsumerFlux<K, V> createConsumerFlux = createConsumerFlux(AckMode.ATMOST_ONCE);
        Flux concatMap = createConsumerFlux.doFinally(signalType -> {
            dispose();
        }).concatMap(consumerRecords -> {
            return Flux.fromIterable(consumerRecords).concatMap(consumerRecord -> {
                return createConsumerFlux.commit(consumerRecord).publishOn(createConsumerFlux.scheduler).thenReturn(consumerRecord);
            }, Integer.MAX_VALUE);
        }, Integer.MAX_VALUE);
        createConsumerFlux.getClass();
        return concatMap.doOnRequest((v1) -> {
            r1.handleRequest(v1);
        });
    }

    @Override // reactor.kafka.receiver.KafkaReceiver
    public Flux<Flux<ConsumerRecord<K, V>>> receiveExactlyOnce(TransactionManager transactionManager) {
        ConsumerFlux<K, V> createConsumerFlux = createConsumerFlux(AckMode.EXACTLY_ONCE);
        Flux doFinally = createConsumerFlux.doFinally(signalType -> {
            dispose();
        });
        createConsumerFlux.getClass();
        return doFinally.doOnRequest((v1) -> {
            r1.handleRequest(v1);
        }).map(consumerRecords -> {
            if (consumerRecords.isEmpty()) {
                return Flux.empty();
            }
            CommittableBatch committableBatch = new CommittableBatch();
            Iterator it = consumerRecords.iterator();
            while (it.hasNext()) {
                ConsumerRecord consumerRecord = (ConsumerRecord) it.next();
                committableBatch.updateOffset(new TopicPartition(consumerRecord.topic(), consumerRecord.partition()), consumerRecord.offset());
            }
            return transactionManager.begin().thenMany(Flux.defer(() -> {
                createConsumerFlux.awaitingTransaction.getAndSet(true);
                return Flux.fromIterable(consumerRecords);
            })).concatWith(transactionManager.sendOffsets(committableBatch.getAndClearOffsets().offsets(), this.receiverOptions.groupId())).doAfterTerminate(() -> {
                createConsumerFlux.awaitingTransaction.set(false);
            });
        }).publishOn(transactionManager.scheduler());
    }

    @Override // reactor.kafka.receiver.KafkaReceiver
    public <T> Mono<T> doOnConsumer(Function<Consumer<K, V>, ? extends T> function) {
        return Mono.defer(() -> {
            return this.consumerFlux.doOnConsumer(function);
        });
    }

    private synchronized ConsumerFlux<K, V> createConsumerFlux(AckMode ackMode) {
        if (this.consumerFlux != null) {
            throw new IllegalStateException("Multiple subscribers are not supported for KafkaReceiver flux");
        }
        ConsumerFlux<K, V> consumerFlux = new ConsumerFlux<>(ackMode, this.receiverOptions, this.consumerFactory);
        this.consumerFlux = consumerFlux;
        return consumerFlux;
    }

    private synchronized void dispose() {
        if (this.consumerFlux != null) {
            this.consumerFlux.dispose();
            this.consumerFlux = null;
        }
    }
}
