package reactor.kafka.receiver.internals;

import java.util.Iterator;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Predicate;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.RetriableCommitFailedException;
import org.apache.kafka.common.TopicPartition;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverOptions;
import reactor.kafka.receiver.ReceiverRecord;
import reactor.kafka.sender.TransactionManager;

/* loaded from: input_file:reactor/kafka/receiver/internals/DefaultKafkaReceiver.class */
public class DefaultKafkaReceiver<K, V> implements KafkaReceiver<K, V> {
    private final ConsumerFactory consumerFactory;
    private final ReceiverOptions<K, V> receiverOptions;
    Predicate<Throwable> isRetriableException;
    ConsumerHandler<K, V> consumerHandler;

    public DefaultKafkaReceiver(ConsumerFactory consumerFactory, ReceiverOptions<K, V> receiverOptions) {
        Class<RetriableCommitFailedException> cls = RetriableCommitFailedException.class;
        RetriableCommitFailedException.class.getClass();
        this.isRetriableException = (v1) -> {
            return r1.isInstance(v1);
        };
        this.consumerFactory = consumerFactory;
        this.receiverOptions = receiverOptions;
    }

    @Override // reactor.kafka.receiver.KafkaReceiver
    public Flux<ReceiverRecord<K, V>> receive() {
        return (Flux<ReceiverRecord<K, V>>) withHandler(AckMode.MANUAL_ACK, (scheduler, consumerHandler) -> {
            return consumerHandler.receive().publishOn(scheduler).flatMapIterable(consumerRecords -> {
                return consumerRecords;
            }).map(consumerRecord -> {
                return new ReceiverRecord(consumerRecord, consumerHandler.toCommittableOffset(consumerRecord));
            });
        });
    }

    @Override // reactor.kafka.receiver.KafkaReceiver
    public Flux<Flux<ConsumerRecord<K, V>>> receiveAutoAck() {
        return (Flux<Flux<ConsumerRecord<K, V>>>) withHandler(AckMode.AUTO_ACK, (scheduler, consumerHandler) -> {
            return consumerHandler.receive().filter(consumerRecords -> {
                return !consumerRecords.isEmpty();
            }).publishOn(scheduler).map(consumerRecords2 -> {
                return Flux.fromIterable(consumerRecords2).doAfterTerminate(() -> {
                    Iterator it = consumerRecords2.iterator();
                    while (it.hasNext()) {
                        consumerHandler.acknowledge((ConsumerRecord) it.next());
                    }
                });
            });
        });
    }

    @Override // reactor.kafka.receiver.KafkaReceiver
    public Flux<ConsumerRecord<K, V>> receiveAtmostOnce() {
        return (Flux<ConsumerRecord<K, V>>) withHandler(AckMode.ATMOST_ONCE, (scheduler, consumerHandler) -> {
            return consumerHandler.receive().concatMap(consumerRecords -> {
                return Flux.fromIterable(consumerRecords).concatMap(consumerRecord -> {
                    return consumerHandler.commit(consumerRecord).thenReturn(consumerRecord);
                }).publishOn(scheduler);
            });
        });
    }

    @Override // reactor.kafka.receiver.KafkaReceiver
    public Flux<Flux<ConsumerRecord<K, V>>> receiveExactlyOnce(TransactionManager transactionManager) {
        return (Flux<Flux<ConsumerRecord<K, V>>>) withHandler(AckMode.EXACTLY_ONCE, (scheduler, consumerHandler) -> {
            return consumerHandler.receive().filter(consumerRecords -> {
                return !consumerRecords.isEmpty();
            }).map(consumerRecords2 -> {
                CommittableBatch committableBatch = new CommittableBatch();
                Iterator it = consumerRecords2.iterator();
                while (it.hasNext()) {
                    ConsumerRecord consumerRecord = (ConsumerRecord) it.next();
                    committableBatch.updateOffset(new TopicPartition(consumerRecord.topic(), consumerRecord.partition()), consumerRecord.offset());
                }
                return transactionManager.begin().thenMany(Flux.defer(() -> {
                    consumerHandler.awaitingTransaction.getAndSet(true);
                    return Flux.fromIterable(consumerRecords2);
                })).concatWith(transactionManager.sendOffsets(committableBatch.getAndClearOffsets().offsets(), this.receiverOptions.groupId())).doAfterTerminate(() -> {
                    consumerHandler.awaitingTransaction.set(false);
                });
            }).publishOn(transactionManager.scheduler());
        });
    }

    @Override // reactor.kafka.receiver.KafkaReceiver
    public <T> Mono<T> doOnConsumer(Function<Consumer<K, V>, ? extends T> function) {
        return this.consumerHandler == null ? Mono.error(new IllegalStateException("You must call one of receive*() methods before using doOnConsumer")) : this.consumerHandler.doOnConsumer(function);
    }

    private <T> Flux<T> withHandler(AckMode ackMode, BiFunction<Scheduler, ConsumerHandler<K, V>, Flux<T>> biFunction) {
        return Flux.usingWhen(Mono.fromCallable(() -> {
            ConsumerHandler<K, V> consumerHandler = new ConsumerHandler<>(this.receiverOptions, this.consumerFactory.createConsumer(this.receiverOptions), th -> {
                return this.isRetriableException.test(th);
            }, ackMode);
            this.consumerHandler = consumerHandler;
            return consumerHandler;
        }), consumerHandler -> {
            return Flux.using(() -> {
                return Schedulers.single(this.receiverOptions.schedulerSupplier().get());
            }, scheduler -> {
                return (Flux) biFunction.apply(scheduler, consumerHandler);
            }, (v0) -> {
                v0.dispose();
            });
        }, consumerHandler2 -> {
            return consumerHandler2.close().doFinally(signalType -> {
                this.consumerHandler = null;
            });
        });
    }
}
