package reactor.kafka.receiver.internals;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.function.Predicate;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;
import reactor.core.publisher.Operators;
import reactor.core.publisher.SignalType;
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.kafka.receiver.ReceiverOptions;
import reactor.kafka.receiver.ReceiverPartition;
import reactor.kafka.receiver.internals.CommittableBatch;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:reactor/kafka/receiver/internals/ConsumerEventLoop.class */
public class ConsumerEventLoop<K, V> implements Sinks.EmitFailureHandler {
    // Consulted by CloseEvent (undoCommitAhead) and CommitEvent (onCommit) when ackMode is ATMOST_ONCE.
    final AtmostOnceOffsets atmostOnceOffsets;
    // Selects the commit strategy used by CommitEvent and CloseEvent.
    final AckMode ackMode;
    // Source of assignment, listeners, close timeout, commit interval and max commit attempts.
    final ReceiverOptions<K, V> receiverOptions;
    // Scheduler on which the subscribe, poll, commit and close events are run.
    final Scheduler eventScheduler;
    // Tested by CommitEvent.handleFailure to decide whether a failed commit may be retried.
    final Predicate<Throwable> isRetriableException;
    // Handle of the periodic commit task created in the constructor; disposed by stop().
    private final Disposable periodicCommitDisposable;
    // Not final: CloseEvent sets this to null once the consumer has been closed.
    Consumer<K, V> consumer;
    // Receives every non-empty poll result and any error raised by the events.
    final Sinks.Many<ConsumerRecords<K, V>> sink;
    // While true, PollEvent keeps the assigned partitions paused.
    final AtomicBoolean awaitingTransaction;
    // Outstanding demand; only modified through REQUESTED (see onRequest and PollEvent.run).
    volatile long requested;
    private static final Logger log = LoggerFactory.getLogger(ConsumerEventLoop.class.getName());
    // Field updater backing the atomic updates of "requested".
    static final AtomicLongFieldUpdater<ConsumerEventLoop> REQUESTED = AtomicLongFieldUpdater.newUpdater(ConsumerEventLoop.class, "requested");
    // Flipped to false exactly once by stop(); events check it before doing work.
    final AtomicBoolean isActive = new AtomicBoolean(true);
    final ConsumerEventLoop<K, V>.CommitEvent commitEvent = new CommitEvent();
    // NOTE(review): field initializers run before the constructor body, so at this point the
    // constructor-assigned fields above (receiverOptions, consumer, ...) are still null. The
    // PollEvent constructor must not dereference them.
    final ConsumerEventLoop<K, V>.PollEvent pollEvent = new PollEvent();

    /* loaded from: input_file:reactor/kafka/receiver/internals/ConsumerEventLoop$CloseEvent.class */
    private class CloseEvent implements Runnable {
        private final long closeEndTimeMillis;

        CloseEvent(Duration duration) {
            this.closeEndTimeMillis = System.currentTimeMillis() + duration.toMillis();
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                if (ConsumerEventLoop.this.consumer != null) {
                    Collection<TopicPartition> assignment = ConsumerEventLoop.this.receiverOptions.assignment();
                    if (assignment != null && !assignment.isEmpty()) {
                        ConsumerEventLoop.this.onPartitionsRevoked(assignment);
                    }
                    for (int i = 0; i < 3; i++) {
                        try {
                            boolean undoCommitAhead = ConsumerEventLoop.this.ackMode == AckMode.ATMOST_ONCE ? ConsumerEventLoop.this.atmostOnceOffsets.undoCommitAhead(ConsumerEventLoop.this.commitEvent.commitBatch) : true;
                            if (ConsumerEventLoop.this.ackMode != AckMode.EXACTLY_ONCE) {
                                ConsumerEventLoop.this.commitEvent.runIfRequired(undoCommitAhead);
                                ConsumerEventLoop.this.commitEvent.waitFor(this.closeEndTimeMillis);
                            }
                            long currentTimeMillis = this.closeEndTimeMillis - System.currentTimeMillis();
                            if (currentTimeMillis < 0) {
                                currentTimeMillis = 0;
                            }
                            ConsumerEventLoop.this.consumer.close(Duration.ofMillis(currentTimeMillis));
                            ConsumerEventLoop.this.consumer = null;
                            break;
                        } catch (WakeupException e) {
                            if (i == 3 - 1) {
                                throw e;
                            }
                        }
                    }
                }
            } catch (Exception e2) {
                ConsumerEventLoop.log.error("Unexpected exception during close", e2);
                ConsumerEventLoop.this.sink.emitError(e2, ConsumerEventLoop.this);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:reactor/kafka/receiver/internals/ConsumerEventLoop$CommitEvent.class */
    /**
     * Commits the offsets accumulated in {@link #commitBatch} using the strategy selected by the
     * enclosing loop's ack mode, and completes or fails the commit callbacks attached to the batch.
     */
    public class CommitEvent implements Runnable {
        /** Offsets and commit callbacks collected since the last commit attempt. */
        final CommittableBatch commitBatch = new CommittableBatch();
        /** True while a commit has been requested but not yet picked up by {@link #run()}. */
        private final AtomicBoolean isPending = new AtomicBoolean();
        /** Number of asynchronous commits whose completion callback has not run yet. */
        private final AtomicInteger inProgress = new AtomicInteger();
        /** Retriable failures counted since the last successful non-empty commit. */
        private final AtomicInteger consecutiveCommitFailures = new AtomicInteger();

        CommitEvent() {
        }

        /**
         * Performs one commit if one is pending. AUTO_ACK and MANUAL_ACK commit asynchronously and
         * schedule a poll afterwards; ATMOST_ONCE commits synchronously; other modes do nothing
         * here. An empty offset map is treated as an immediate success. Exceptions are logged and
         * routed to {@link #handleFailure}.
         */
        @Override
        public void run() {
            if (!this.isPending.compareAndSet(true, false)) {
                return;
            }
            CommittableBatch.CommitArgs commitArgs = this.commitBatch.getAndClearOffsets();
            if (commitArgs == null) {
                return;
            }
            try {
                if (commitArgs.offsets().isEmpty()) {
                    handleSuccess(commitArgs, commitArgs.offsets());
                    return;
                }
                switch (ConsumerEventLoop.this.ackMode) {
                    case AUTO_ACK:
                    case MANUAL_ACK:
                        startAsyncCommit(commitArgs);
                        // Deliberately outside the try/catch in startAsyncCommit: once commitAsync has
                        // accepted the callback, the callback owns the matching decrement of
                        // inProgress. A failure to schedule must not decrement it a second time.
                        ConsumerEventLoop.this.pollEvent.schedule();
                        break;
                    case ATMOST_ONCE:
                        ConsumerEventLoop.this.consumer.commitSync(commitArgs.offsets());
                        handleSuccess(commitArgs, commitArgs.offsets());
                        ConsumerEventLoop.this.atmostOnceOffsets.onCommit(commitArgs.offsets());
                        break;
                    default:
                        break;
                }
            } catch (Exception e) {
                ConsumerEventLoop.log.error("Unexpected exception", e);
                handleFailure(commitArgs, e);
            }
        }

        /**
         * Issues an asynchronous commit and tracks it in {@link #inProgress}. The counter is rolled
         * back here only when {@code commitAsync} itself throws; otherwise the callback does it.
         */
        private void startAsyncCommit(CommittableBatch.CommitArgs commitArgs) {
            this.inProgress.incrementAndGet();
            try {
                ConsumerEventLoop.this.consumer.commitAsync(commitArgs.offsets(), (offsets, exception) -> {
                    this.inProgress.decrementAndGet();
                    if (exception == null) {
                        handleSuccess(commitArgs, offsets);
                    } else {
                        handleFailure(commitArgs, exception);
                    }
                });
            } catch (Throwable t) {
                this.inProgress.decrementAndGet();
                throw t;
            }
        }

        /**
         * Runs a commit on the calling thread if one is pending.
         *
         * @param z when true, a commit is marked pending first, forcing {@link #run()} to execute
         */
        void runIfRequired(boolean z) {
            if (z) {
                this.isPending.set(true);
            }
            if (this.isPending.get()) {
                run();
            }
        }

        /**
         * Resets the failure counter (only when something was actually committed) and completes
         * every callback attached to the batch.
         */
        private void handleSuccess(CommittableBatch.CommitArgs commitArgs, Map<TopicPartition, OffsetAndMetadata> map) {
            if (!map.isEmpty()) {
                this.consecutiveCommitFailures.set(0);
            }
            List<MonoSink<Void>> callbackEmitters = commitArgs.callbackEmitters();
            if (callbackEmitters != null) {
                for (MonoSink<Void> emitter : callbackEmitters) {
                    emitter.success();
                }
            }
        }

        /**
         * Handles a failed commit. When the exception is retriable, the consumer is still present
         * and the attempt budget is not exhausted, the offsets are restored and a new attempt is
         * queued behind the next poll. Otherwise the error is delivered to the batch callbacks, or
         * to the sink when the batch carries no callbacks.
         */
        private void handleFailure(CommittableBatch.CommitArgs commitArgs, Exception exc) {
            ConsumerEventLoop.log.warn("Commit failed", exc);
            // Order matters: the failure counter is only incremented for retriable errors while
            // the consumer is still open.
            boolean mayRetry = ConsumerEventLoop.this.isRetriableException.test(exc)
                    && ConsumerEventLoop.this.consumer != null
                    && this.consecutiveCommitFailures.incrementAndGet() < ConsumerEventLoop.this.receiverOptions.maxCommitAttempts();
            if (mayRetry) {
                this.commitBatch.restoreOffsets(commitArgs, true);
                ConsumerEventLoop.log.warn("Commit failed with exception" + exc + ", retries remaining " + (ConsumerEventLoop.this.receiverOptions.maxCommitAttempts() - this.consecutiveCommitFailures.get()));
                this.isPending.set(true);
                ConsumerEventLoop.this.pollEvent.schedule();
                return;
            }
            List<MonoSink<Void>> callbackEmitters = commitArgs.callbackEmitters();
            if (callbackEmitters == null || callbackEmitters.isEmpty()) {
                ConsumerEventLoop.this.sink.emitError(exc, ConsumerEventLoop.this);
                return;
            }
            this.isPending.set(false);
            this.commitBatch.restoreOffsets(commitArgs, false);
            for (MonoSink<Void> emitter : callbackEmitters) {
                emitter.error(exc);
            }
        }

        /**
         * Queues this event on the event scheduler, unless the loop has been stopped or a commit
         * is already pending.
         */
        public void scheduleIfRequired() {
            if (ConsumerEventLoop.this.isActive.get() && this.isPending.compareAndSet(false, true)) {
                ConsumerEventLoop.this.eventScheduler.schedule(this);
            }
        }

        /**
         * Polls the consumer in 1 ms steps until no asynchronous commit is in flight or the
         * deadline is reached.
         *
         * @param j deadline as epoch milliseconds
         */
        public void waitFor(long j) {
            while (this.inProgress.get() > 0 && j - System.currentTimeMillis() > 0) {
                ConsumerEventLoop.this.consumer.poll(Duration.ofMillis(1L));
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:reactor/kafka/receiver/internals/ConsumerEventLoop$PollEvent.class */
    /**
     * Recurring event that runs any pending commit, pauses or resumes the assigned partitions
     * according to downstream demand and transaction state, polls the consumer and emits
     * non-empty results on the sink.
     */
    public class PollEvent implements Runnable {
        /** True while the assigned partitions are paused by this event rather than by user code. */
        private final AtomicBoolean pausedByUs = new AtomicBoolean();
        /** Partitions that were already paused when this event paused everything; never resumed by us. */
        private final Set<TopicPartition> pausedByUser = new HashSet<>();
        /** True while a run of this event is queued on the event scheduler. */
        private final AtomicBoolean scheduled = new AtomicBoolean();

        PollEvent() {
            // Must not touch the enclosing instance here: this object is created from a field
            // initializer of ConsumerEventLoop, which runs before the constructor body assigns
            // receiverOptions. The poll timeout is therefore read when polling instead.
        }

        /**
         * Executes one poll cycle. Does nothing once the loop is inactive. Exceptions are logged
         * and emitted on the sink only while the loop is still active.
         */
        @Override
        public void run() {
            try {
                this.scheduled.set(false);
                if (!ConsumerEventLoop.this.isActive.get()) {
                    return;
                }
                // Run a pending commit first so it is not delayed behind the poll.
                ConsumerEventLoop.this.commitEvent.runIfRequired(false);

                long demand = ConsumerEventLoop.this.requested;
                if (demand <= 0) {
                    pauseAssigned("Paused - back pressure");
                } else if (ConsumerEventLoop.this.awaitingTransaction.get()) {
                    pauseAssigned("Paused - awaiting transaction");
                } else {
                    resumeAssigned();
                }

                ConsumerRecords<K, V> records = ConsumerEventLoop.this.consumer.poll(ConsumerEventLoop.this.receiverOptions.pollTimeout());
                if (ConsumerEventLoop.this.isActive.get()) {
                    schedule();
                }
                if (!records.isEmpty()) {
                    // One batch of records satisfies one unit of demand.
                    Operators.produced(ConsumerEventLoop.REQUESTED, ConsumerEventLoop.this, 1L);
                    ConsumerEventLoop.log.debug("Emitting {} records, requested now {}", records.count(), demand);
                    ConsumerEventLoop.this.sink.emitNext(records, ConsumerEventLoop.this);
                }
            } catch (Exception e) {
                if (ConsumerEventLoop.this.isActive.get()) {
                    ConsumerEventLoop.log.error("Unexpected exception", e);
                    ConsumerEventLoop.this.sink.emitError(e, ConsumerEventLoop.this);
                }
            }
        }

        /**
         * Pauses every assigned partition unless this event has already done so, remembering the
         * partitions that were paused beforehand.
         */
        private void pauseAssigned(String logMessage) {
            if (!this.pausedByUs.getAndSet(true)) {
                this.pausedByUser.addAll(ConsumerEventLoop.this.consumer.paused());
                ConsumerEventLoop.this.consumer.pause(ConsumerEventLoop.this.consumer.assignment());
                ConsumerEventLoop.log.debug(logMessage);
            }
        }

        /** Undoes {@link #pauseAssigned}: resumes all assigned partitions except those paused beforehand. */
        private void resumeAssigned() {
            if (this.pausedByUs.getAndSet(false)) {
                Set<TopicPartition> toResume = new HashSet<>(ConsumerEventLoop.this.consumer.assignment());
                toResume.removeAll(this.pausedByUser);
                this.pausedByUser.clear();
                ConsumerEventLoop.this.consumer.resume(toResume);
                ConsumerEventLoop.log.debug("Resumed");
            }
        }

        /** Queues this event on the event scheduler unless a run is already queued. */
        void schedule() {
            if (this.scheduled.getAndSet(true)) {
                return;
            }
            ConsumerEventLoop.this.eventScheduler.schedule(this);
        }
    }

    /* loaded from: input_file:reactor/kafka/receiver/internals/ConsumerEventLoop$SubscribeEvent.class */
    class SubscribeEvent implements Runnable {
        SubscribeEvent() {
        }

        @Override // java.lang.Runnable
        public void run() {
            ConsumerEventLoop.log.info("SubscribeEvent");
            try {
                ConsumerEventLoop.this.receiverOptions.subscriber(new ConsumerRebalanceListener() { // from class: reactor.kafka.receiver.internals.ConsumerEventLoop.SubscribeEvent.1
                    public void onPartitionsAssigned(Collection<TopicPartition> collection) {
                        ConsumerEventLoop.log.debug("onPartitionsAssigned {}", collection);
                        if (collection.isEmpty()) {
                            return;
                        }
                        Iterator<java.util.function.Consumer<Collection<ReceiverPartition>>> it = ConsumerEventLoop.this.receiverOptions.assignListeners().iterator();
                        while (it.hasNext()) {
                            it.next().accept(ConsumerEventLoop.this.toSeekable(collection));
                        }
                    }

                    public void onPartitionsRevoked(Collection<TopicPartition> collection) {
                        ConsumerEventLoop.this.onPartitionsRevoked(collection);
                    }
                }).accept(ConsumerEventLoop.this.consumer);
            } catch (Exception e) {
                if (ConsumerEventLoop.this.isActive.get()) {
                    ConsumerEventLoop.log.error("Unexpected exception", e);
                    ConsumerEventLoop.this.sink.emitError(e, ConsumerEventLoop.this);
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates the loop, schedules the initial subscription on the given scheduler and, for
     * AUTO_ACK and MANUAL_ACK with a non-zero commit interval, starts a periodic commit trigger on
     * the parallel scheduler.
     *
     * @param ackMode           commit strategy
     * @param atmostOnceOffsets offset bookkeeping used in ATMOST_ONCE mode
     * @param receiverOptions   receiver configuration
     * @param scheduler         scheduler that runs the loop's events
     * @param consumer          Kafka consumer driven by this loop
     * @param predicate         decides whether a commit failure may be retried
     * @param many              sink receiving polled records and errors
     * @param atomicBoolean     flag that is true while a transaction is awaited
     */
    public ConsumerEventLoop(AckMode ackMode, AtmostOnceOffsets atmostOnceOffsets, ReceiverOptions<K, V> receiverOptions, Scheduler scheduler, Consumer<K, V> consumer, Predicate<Throwable> predicate, Sinks.Many<ConsumerRecords<K, V>> many, AtomicBoolean atomicBoolean) {
        this.ackMode = ackMode;
        this.atmostOnceOffsets = atmostOnceOffsets;
        this.receiverOptions = receiverOptions;
        this.eventScheduler = scheduler;
        this.consumer = consumer;
        this.isRetriableException = predicate;
        this.sink = many;
        this.awaitingTransaction = atomicBoolean;
        scheduler.schedule(new SubscribeEvent());
        this.periodicCommitDisposable = startPeriodicCommits(receiverOptions.commitInterval());
    }

    /**
     * Starts the periodic commit trigger when the interval is non-zero and the ack mode is
     * AUTO_ACK or MANUAL_ACK; otherwise returns an already-disposed handle.
     */
    private Disposable startPeriodicCommits(Duration commitInterval) {
        if (commitInterval.isZero()) {
            return Disposables.disposed();
        }
        switch (this.ackMode) {
            case AUTO_ACK:
            case MANUAL_ACK:
                return Schedulers.parallel().schedulePeriodically(this.commitEvent::scheduleIfRequired, commitInterval.toMillis(), commitInterval.toMillis(), TimeUnit.MILLISECONDS);
            default:
                return Disposables.disposed();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Adds downstream demand to {@code requested} via {@code Operators.addCap} and makes sure a
     * poll is scheduled.
     *
     * @param j number of additional items requested
     */
    public void onRequest(long j) {
        if (log.isDebugEnabled()) {
            log.debug("onRequest.toAdd {}", j);
        }
        Operators.addCap(REQUESTED, this, j);
        this.pollEvent.schedule();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Reacts to partitions being taken away: forces a commit (except in ATMOST_ONCE mode) and
     * then notifies every configured revoke listener. An empty collection is ignored.
     *
     * @param collection partitions being revoked
     */
    public void onPartitionsRevoked(Collection<TopicPartition> collection) {
        log.debug("onPartitionsRevoked {}", collection);
        if (!collection.isEmpty()) {
            if (this.ackMode != AckMode.ATMOST_ONCE) {
                this.commitEvent.runIfRequired(true);
            }
            // Each listener receives its own freshly built collection.
            for (java.util.function.Consumer<Collection<ReceiverPartition>> onRevoke : this.receiverOptions.revokeListeners()) {
                onRevoke.accept(toSeekable(collection));
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Wraps each topic partition in a {@link SeekablePartition} bound to this loop's consumer.
     *
     * @param collection partitions to wrap
     * @return a new list with one entry per input partition, in iteration order
     */
    public Collection<ReceiverPartition> toSeekable(Collection<TopicPartition> collection) {
        // Typed list instead of a raw ArrayList so the element type is checked by the compiler.
        List<ReceiverPartition> seekablePartitions = new ArrayList<>(collection.size());
        for (TopicPartition partition : collection) {
            seekablePartitions.add(new SeekablePartition(this.consumer, partition));
        }
        return seekablePartitions;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Returns a deferred {@code Mono} that, on subscription, deactivates the loop, cancels the
     * periodic commit trigger, wakes the consumer up and runs a {@link CloseEvent} on the event
     * scheduler. Subscribing again after the loop is inactive, or when the consumer is already
     * gone, completes immediately. Errors are logged and swallowed.
     *
     * @return a {@code Mono} completing when the close sequence has run
     */
    public Mono<Void> stop() {
        return Mono.<Void>defer(() -> {
            log.debug("dispose {}", this.isActive);
            boolean wasActive = this.isActive.compareAndSet(true, false);
            if (!wasActive) {
                return Mono.empty();
            }
            this.periodicCommitDisposable.dispose();
            if (this.consumer == null) {
                return Mono.empty();
            }
            // Interrupt a poll that may currently be blocking the event scheduler.
            this.consumer.wakeup();
            Runnable closeEvent = new CloseEvent(this.receiverOptions.closeTimeout());
            return Mono.<Void>fromRunnable(closeEvent).subscribeOn(this.eventScheduler);
        }).onErrorResume(error -> {
            log.warn("Cancel exception: " + error);
            return Mono.empty();
        });
    }

    /**
     * Decides whether a failed sink emission is reported as retryable: only while the loop is
     * active and only when the failure is {@code FAIL_NON_SERIALIZED}.
     *
     * @param signalType kind of signal that failed to emit (not inspected)
     * @param emitResult reason the emission failed
     * @return true when both conditions above hold
     */
    @Override
    public boolean onEmitFailure(SignalType signalType, Sinks.EmitResult emitResult) {
        if (!this.isActive.get()) {
            return false;
        }
        return emitResult == Sinks.EmitResult.FAIL_NON_SERIALIZED;
    }
}
