package reactor.kafka.receiver.internals;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.RetriableCommitFailedException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.CoreSubscriber;
import reactor.core.Disposable;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;
import reactor.core.publisher.Operators;
import reactor.core.publisher.UnicastProcessor;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.kafka.receiver.ReceiverOffset;
import reactor.kafka.receiver.ReceiverOptions;
import reactor.kafka.receiver.ReceiverPartition;
import reactor.kafka.receiver.internals.CommittableBatch;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:reactor/kafka/receiver/internals/ConsumerFlux.class */
public class ConsumerFlux<K, V> extends Flux<ConsumerRecords<K, V>> implements Disposable {
    private static final Logger log = LoggerFactory.getLogger(ConsumerFlux.class.getName());

    /**
     * Names of the {@link Consumer} methods that the proxy handed to {@code doOnConsumer}
     * callbacks forwards to the real consumer. Any other method invoked on that proxy is
     * rejected with an {@link UnsupportedOperationException}.
     */
    private static final Set<String> DELEGATE_METHODS = new HashSet<>(Arrays.asList(
        "assignment", "subscription", "seek", "seekToBeginning", "seekToEnd", "position",
        "committed", "metrics", "partitionsFor", "listTopics", "paused", "pause", "resume",
        "offsetsForTimes", "beginningOffsets", "endOffsets"));

    // Immutable configuration, assigned in the constructor.
    final AckMode ackMode;
    final ReceiverOptions<K, V> receiverOptions;
    final ConsumerFactory consumerFactory;
    final Scheduler scheduler;

    // Kafka consumer created by InitEvent, and the lazily built restricted proxy around it.
    Consumer<K, V> consumer;
    Consumer<K, V> consumerProxy;

    // Event queue; (re)created on every start().
    UnicastProcessor<ConsumerFlux<K, V>.Event> eventEmitter;
    FluxSink<ConsumerFlux<K, V>.Event> eventSubmission;

    // Lifecycle flags.
    final AtomicBoolean isActive = new AtomicBoolean();
    final AtomicBoolean isClosed = new AtomicBoolean();

    // Resources registered by start().
    final List<Disposable> subscribeDisposables = new ArrayList<>();
    final AtomicBoolean awaitingTransaction = new AtomicBoolean();

    // Downstream record channel: polled batches are pushed into recordSubmission.
    final EmitterProcessor<ConsumerRecords<K, V>> recordEmitter = EmitterProcessor.create();
    final FluxSink<ConsumerRecords<K, V>> recordSubmission = this.recordEmitter.sink();
    final AtmostOnceOffsets atmostOnceOffsets = new AtmostOnceOffsets();

    // NOTE: field initializers run before the constructor body, so the PollEvent and
    // CommitEvent constructors must not dereference constructor-assigned fields such as
    // receiverOptions.
    final ConsumerFlux<K, V>.PollEvent pollEvent = new PollEvent();
    ConsumerFlux<K, V>.CommitEvent commitEvent = new CommitEvent();

    /* loaded from: input_file:reactor/kafka/receiver/internals/ConsumerFlux$CloseEvent.class */
    private class CloseEvent extends ConsumerFlux<K, V>.Event {
        private final long closeEndTimeNanos;
        private final CountDownLatch latch;

        CloseEvent(Duration duration) {
            super();
            this.latch = new CountDownLatch(1);
            this.closeEndTimeNanos = System.nanoTime() + duration.toNanos();
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                if (ConsumerFlux.this.consumer != null) {
                    Collection<TopicPartition> assignment = ConsumerFlux.this.receiverOptions.assignment();
                    if (assignment != null && !assignment.isEmpty()) {
                        ConsumerFlux.this.onPartitionsRevoked(assignment);
                    }
                    for (int i = 0; i < 3; i++) {
                        try {
                            boolean undoCommitAhead = ConsumerFlux.this.ackMode == AckMode.ATMOST_ONCE ? ConsumerFlux.this.atmostOnceOffsets.undoCommitAhead(ConsumerFlux.this.commitEvent.commitBatch) : true;
                            if (ConsumerFlux.this.ackMode != AckMode.EXACTLY_ONCE) {
                                ConsumerFlux.this.commitEvent.runIfRequired(undoCommitAhead);
                                ConsumerFlux.this.commitEvent.waitFor(this.closeEndTimeNanos);
                            }
                            long nanoTime = this.closeEndTimeNanos - System.nanoTime();
                            if (nanoTime < 0) {
                                nanoTime = 0;
                            }
                            ConsumerFlux.this.consumer.close(nanoTime, TimeUnit.NANOSECONDS);
                            break;
                        } catch (WakeupException e) {
                            if (i == 3 - 1) {
                                throw e;
                            }
                        }
                    }
                }
                this.latch.countDown();
            } catch (Exception e2) {
                ConsumerFlux.log.error("Unexpected exception during close", e2);
                ConsumerFlux.this.fail(e2);
            }
        }

        boolean await(long j) throws InterruptedException {
            return this.latch.await(j, TimeUnit.NANOSECONDS);
        }

        boolean await() {
            boolean z = false;
            while (!z) {
                long nanoTime = this.closeEndTimeNanos - System.nanoTime();
                if (nanoTime <= 0) {
                    break;
                }
                try {
                    z = await(nanoTime);
                } catch (InterruptedException e) {
                }
            }
            return z;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:reactor/kafka/receiver/internals/ConsumerFlux$CommitEvent.class */
    /**
     * Event that commits the offsets accumulated in {@link #commitBatch}.
     * {@code ATMOST_ONCE} commits synchronously; {@code AUTO_ACK} and {@code MANUAL_ACK}
     * commit asynchronously and schedule a poll. Retriable failures restore the offsets and
     * re-arm the event until {@code receiverOptions.maxCommitAttempts()} is reached.
     */
    public class CommitEvent extends ConsumerFlux<K, V>.Event {
        final CommittableBatch commitBatch;
        /** Set when a commit has been requested and not yet picked up by {@link #run()}. */
        private final AtomicBoolean isPending;
        /** Number of commits started by {@link #run()} whose outcome has not been recorded yet. */
        private final AtomicInteger inProgress;
        private final AtomicInteger consecutiveCommitFailures;

        CommitEvent() {
            super();
            this.commitBatch = new CommittableBatch();
            this.isPending = new AtomicBoolean();
            this.inProgress = new AtomicInteger();
            this.consecutiveCommitFailures = new AtomicInteger();
        }

        /**
         * Commits the current batch if a commit is pending. An empty batch is reported as a
         * success straight away so waiting callbacks still complete.
         */
        /* JADX WARN: Can't fix incorrect switch cases order, some code will duplicate */
        /* JADX WARN: Failed to find 'out' block for switch in B:14:0x003b. Please report as an issue. */
        @Override // java.lang.Runnable
        public void run() {
            if (this.isPending.compareAndSet(true, false)) {
                CommittableBatch.CommitArgs commitArgs = this.commitBatch.getAndClearOffsets();
                if (commitArgs != null) {
                    try {
                        if (!commitArgs.offsets().isEmpty()) {
                            this.inProgress.incrementAndGet();
                            switch (ConsumerFlux.this.ackMode) {
                                case ATMOST_ONCE:
                                    try {
                                        ConsumerFlux.this.consumer.commitSync(commitArgs.offsets());
                                        handleSuccess(commitArgs, commitArgs.offsets());
                                        ConsumerFlux.this.atmostOnceOffsets.onCommit(commitArgs.offsets());
                                    } catch (Exception e) {
                                        handleFailure(commitArgs, e);
                                    }
                                    this.inProgress.decrementAndGet();
                                    break;
                                case AUTO_ACK:
                                case MANUAL_ACK:
                                    ConsumerFlux.this.consumer.commitAsync(commitArgs.offsets(), (offsets, exc) -> {
                                        this.inProgress.decrementAndGet();
                                        if (exc == null) {
                                            handleSuccess(commitArgs, offsets);
                                        } else {
                                            handleFailure(commitArgs, exc);
                                        }
                                    });
                                    // Schedule a poll after handing the commit to the consumer.
                                    ConsumerFlux.this.pollEvent.scheduleIfRequired();
                                    break;
                            }
                        } else {
                            handleSuccess(commitArgs, commitArgs.offsets());
                        }
                    } catch (Exception e2) {
                        ConsumerFlux.log.error("Unexpected exception", e2);
                        this.inProgress.decrementAndGet();
                        handleFailure(commitArgs, e2);
                    }
                }
            }
        }

        /**
         * Runs the commit inline if one is pending.
         *
         * @param z when {@code true}, marks a commit as pending first, forcing a run
         */
        void runIfRequired(boolean z) {
            if (z) {
                this.isPending.set(true);
            }
            if (this.isPending.get()) {
                run();
            }
        }

        /** Resets the failure counter (for non-empty commits) and completes all waiting callbacks. */
        private void handleSuccess(CommittableBatch.CommitArgs commitArgs, Map<TopicPartition, OffsetAndMetadata> map) {
            if (!map.isEmpty()) {
                this.consecutiveCommitFailures.set(0);
            }
            List<MonoSink<Void>> callbackEmitters = commitArgs.callbackEmitters();
            if (callbackEmitters != null) {
                for (MonoSink<Void> emitter : callbackEmitters) {
                    emitter.success();
                }
            }
        }

        /**
         * Handles a failed commit. Retriable failures (while not closed and under the attempt
         * limit) restore the offsets and re-arm the commit. Otherwise the error goes to the
         * waiting callbacks, or fails the whole flux when nobody is waiting.
         */
        private void handleFailure(CommittableBatch.CommitArgs commitArgs, Exception exc) {
            ConsumerFlux.log.warn("Commit failed", exc);
            // The increment is part of the condition on purpose: it only counts attempts that
            // were otherwise eligible for a retry.
            if (isRetriableException(exc) && !ConsumerFlux.this.isClosed.get() && this.consecutiveCommitFailures.incrementAndGet() < ConsumerFlux.this.receiverOptions.maxCommitAttempts()) {
                this.commitBatch.restoreOffsets(commitArgs, true);
                ConsumerFlux.log.warn("Commit failed with exception {}, retries remaining {}",
                    exc, ConsumerFlux.this.receiverOptions.maxCommitAttempts() - this.consecutiveCommitFailures.get());
                this.isPending.set(true);
                ConsumerFlux.this.pollEvent.scheduleIfRequired();
                return;
            }
            List<MonoSink<Void>> callbackEmitters = commitArgs.callbackEmitters();
            if (callbackEmitters == null || callbackEmitters.isEmpty()) {
                ConsumerFlux.this.fail(exc);
                return;
            }
            this.isPending.set(false);
            this.commitBatch.restoreOffsets(commitArgs, false);
            for (MonoSink<Void> emitter : callbackEmitters) {
                emitter.error(exc);
            }
        }

        /**
         * Marks a commit as pending and returns this event, for use by the periodic commit timer.
         *
         * @return this event
         */
        ConsumerFlux<K, V>.CommitEvent periodicEvent() {
            this.isPending.set(true);
            return this;
        }

        /** Queues this event if the flux is active and no commit is already pending. */
        void scheduleIfRequired() {
            if (ConsumerFlux.this.isActive.get() && this.isPending.compareAndSet(false, true)) {
                ConsumerFlux.this.eventSubmission.next(this);
            }
        }

        /**
         * Polls the consumer in 1 ms steps until no commit is in progress or the deadline passes.
         *
         * @param j deadline as a {@link System#nanoTime()} value
         */
        public void waitFor(long j) {
            while (this.inProgress.get() > 0 && j - System.nanoTime() > 0) {
                ConsumerFlux.this.consumer.poll(Duration.ofMillis(1L));
            }
        }

        /**
         * @param exc the commit failure
         * @return {@code true} if the failure is a {@link RetriableCommitFailedException}
         */
        protected boolean isRetriableException(Exception exc) {
            return exc instanceof RetriableCommitFailedException;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:reactor/kafka/receiver/internals/ConsumerFlux$CommittableOffset.class */
    /**
     * {@link ReceiverOffset} for one partition offset. The offset is recorded in the commit
     * batch at most once, on the first {@link #commit()} or {@link #acknowledge()}.
     */
    public class CommittableOffset implements ReceiverOffset {
        private final TopicPartition topicPartition;
        private final long commitOffset;
        private final AtomicBoolean acknowledged = new AtomicBoolean(false);

        /**
         * Builds the offset from a record's topic, partition and offset.
         *
         * @param consumerFlux   not read by this constructor
         * @param consumerRecord record whose position this offset represents
         */
        public CommittableOffset(ConsumerFlux consumerFlux, ConsumerRecord<K, V> consumerRecord) {
            this(new TopicPartition(consumerRecord.topic(), consumerRecord.partition()), consumerRecord.offset());
        }

        /**
         * @param topicPartition partition the offset belongs to
         * @param j              offset value to commit
         */
        public CommittableOffset(TopicPartition topicPartition, long j) {
            this.topicPartition = topicPartition;
            this.commitOffset = j;
        }

        /**
         * Records the offset and returns a {@link Mono} that triggers a commit when subscribed,
         * or an empty {@link Mono} when the batch size reported after recording is zero.
         */
        @Override
        public Mono<Void> commit() {
            if (maybeUpdateOffset() > 0) {
                return scheduleCommit();
            }
            return Mono.empty();
        }

        /**
         * Records the offset and schedules a commit once the batch has reached the configured
         * {@code commitBatchSize}; a non-positive batch size disables size-based commits.
         */
        @Override
        public void acknowledge() {
            int batchLimit = ConsumerFlux.this.receiverOptions.commitBatchSize();
            long batched = maybeUpdateOffset();
            if (batchLimit > 0 && batched >= batchLimit) {
                ConsumerFlux.this.commitEvent.scheduleIfRequired();
            }
        }

        @Override
        public TopicPartition topicPartition() {
            return this.topicPartition;
        }

        @Override
        public long offset() {
            return this.commitOffset;
        }

        /**
         * Adds the offset to the commit batch the first time it is called; later calls only
         * report the current batch size.
         *
         * @return the value returned by the batch ({@code updateOffset} or {@code batchSize})
         */
        private int maybeUpdateOffset() {
            CommittableBatch batch = ConsumerFlux.this.commitEvent.commitBatch;
            if (this.acknowledged.compareAndSet(false, true)) {
                return batch.updateOffset(this.topicPartition, this.commitOffset);
            }
            return batch.batchSize();
        }

        /** Creates a Mono that, on subscription, registers its sink as a commit callback and schedules a commit. */
        private Mono<Void> scheduleCommit() {
            return Mono.create(sink -> {
                ConsumerFlux.this.commitEvent.commitBatch.addCallbackEmitter(sink);
                ConsumerFlux.this.commitEvent.scheduleIfRequired();
            });
        }

        public String toString() {
            return new StringBuilder()
                .append(this.topicPartition)
                .append("@")
                .append(this.commitOffset)
                .toString();
        }
    }

    /* loaded from: input_file:reactor/kafka/receiver/internals/ConsumerFlux$CustomEvent.class */
    /**
     * Event that applies a user function to a restricted view of the Kafka consumer and
     * reports the outcome through a {@link MonoSink}.
     *
     * @param <T> result type of the user function
     */
    class CustomEvent<T> extends ConsumerFlux<K, V>.Event {
        private final Function<Consumer<K, V>, ? extends T> function;
        private final MonoSink<T> monoSink;

        CustomEvent(Function<Consumer<K, V>, ? extends T> function, MonoSink<T> monoSink) {
            super();
            this.function = function;
            this.monoSink = monoSink;
        }

        /**
         * Applies the function while the flux is active and completes the sink with its
         * result, or errors the sink with whatever was thrown. Does nothing when inactive.
         */
        @Override
        public void run() {
            if (!ConsumerFlux.this.isActive.get()) {
                return;
            }
            try {
                T result = this.function.apply(consumerProxy());
                this.monoSink.success(result);
            } catch (Throwable thrown) {
                this.monoSink.error(thrown);
            }
        }

        /**
         * Returns the shared consumer proxy, creating it on first use. The proxy forwards only
         * the methods listed in {@code DELEGATE_METHODS}, rejects the rest with
         * {@link UnsupportedOperationException}, and unwraps reflective invocation failures so
         * callers see the consumer's original exception.
         */
        @SuppressWarnings("unchecked")
        private Consumer<K, V> consumerProxy() {
            if (ConsumerFlux.this.consumerProxy != null) {
                return ConsumerFlux.this.consumerProxy;
            }
            InvocationHandler handler = (proxy, method, args) -> {
                if (ConsumerFlux.DELEGATE_METHODS.contains(method.getName())) {
                    try {
                        return method.invoke(ConsumerFlux.this.consumer, args);
                    } catch (InvocationTargetException wrapped) {
                        throw wrapped.getCause();
                    }
                }
                throw new UnsupportedOperationException("Method is not supported: " + method);
            };
            Object instance = Proxy.newProxyInstance(
                Consumer.class.getClassLoader(), new Class<?>[]{Consumer.class}, handler);
            ConsumerFlux.this.consumerProxy = (Consumer<K, V>) instance;
            return ConsumerFlux.this.consumerProxy;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:reactor/kafka/receiver/internals/ConsumerFlux$Event.class */
    /**
     * Base type of every unit of work pushed through the event emitter; subclasses supply
     * the work in {@link Runnable#run()}.
     */
    public abstract class Event implements Runnable {
        Event() {
            super();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:reactor/kafka/receiver/internals/ConsumerFlux$InitEvent.class */
    public class InitEvent extends ConsumerFlux<K, V>.Event {
        InitEvent() {
            super();
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                ConsumerFlux.this.isActive.set(true);
                ConsumerFlux.this.isClosed.set(false);
                ConsumerFlux.this.consumer = ConsumerFlux.this.consumerFactory.createConsumer(ConsumerFlux.this.receiverOptions);
                ConsumerFlux.this.receiverOptions.subscriber(new ConsumerRebalanceListener() { // from class: reactor.kafka.receiver.internals.ConsumerFlux.InitEvent.1
                    public void onPartitionsAssigned(Collection<TopicPartition> collection) {
                        ConsumerFlux.log.debug("onPartitionsAssigned {}", collection);
                        if (collection.isEmpty()) {
                            return;
                        }
                        Iterator<java.util.function.Consumer<Collection<ReceiverPartition>>> it = ConsumerFlux.this.receiverOptions.assignListeners().iterator();
                        while (it.hasNext()) {
                            it.next().accept(ConsumerFlux.this.toSeekable(collection));
                        }
                    }

                    public void onPartitionsRevoked(Collection<TopicPartition> collection) {
                        ConsumerFlux.this.onPartitionsRevoked(collection);
                    }
                }).accept(ConsumerFlux.this.consumer);
            } catch (Exception e) {
                if (ConsumerFlux.this.isActive.get()) {
                    ConsumerFlux.log.error("Unexpected exception", e);
                    ConsumerFlux.this.fail(e);
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:reactor/kafka/receiver/internals/ConsumerFlux$PollEvent.class */
    /**
     * Event that polls the Kafka consumer and pushes non-empty batches downstream. It pauses
     * the assigned partitions while there is no outstanding demand (or a transaction is
     * awaited), resumes them otherwise, and re-schedules itself while demand or in-flight
     * commits remain.
     */
    public class PollEvent extends ConsumerFlux<K, V>.Event {
        /** Number of poll events currently queued; used to avoid queueing duplicates. */
        private final AtomicInteger pendingCount;
        private final AtomicBoolean partitionsPaused;
        /** Outstanding downstream demand; {@code Long.MAX_VALUE} is treated as unbounded. */
        final AtomicLong requestsPending;

        /*
         * The poll timeout is deliberately NOT cached here. This event is instantiated by a
         * field initializer of the enclosing class, which executes before the enclosing
         * constructor assigns receiverOptions; reading receiverOptions in this constructor
         * would throw a NullPointerException. The timeout is read in run() instead.
         */
        PollEvent() {
            super();
            this.pendingCount = new AtomicInteger();
            this.partitionsPaused = new AtomicBoolean();
            this.requestsPending = new AtomicLong();
        }

        /**
         * Runs one poll cycle while the flux is active. Exceptions are logged and propagated
         * through {@code fail} only while the flux is still active.
         */
        @Override
        public void run() {
            try {
                if (!ConsumerFlux.this.isActive.get()) {
                    return;
                }
                // Piggy-back any pending commit on this cycle before polling.
                ConsumerFlux.this.commitEvent.runIfRequired(false);
                this.pendingCount.decrementAndGet();

                if (this.requestsPending.get() <= 0 || ConsumerFlux.this.awaitingTransaction.get()) {
                    if (!this.partitionsPaused.getAndSet(true)) {
                        ConsumerFlux.this.consumer.pause(ConsumerFlux.this.consumer.assignment());
                    }
                } else if (this.partitionsPaused.getAndSet(false)) {
                    ConsumerFlux.this.consumer.resume(ConsumerFlux.this.consumer.assignment());
                }

                // NOTE(review): assumes pollTimeout() returns the same value on every call — confirm.
                ConsumerRecords<K, V> records = ConsumerFlux.this.consumer.poll(ConsumerFlux.this.receiverOptions.pollTimeout());
                if (records.count() > 0) {
                    ConsumerFlux.this.recordSubmission.next(records);
                }

                if (ConsumerFlux.this.isActive.get()) {
                    // AUTO_ACK and EXACTLY_ONCE count a non-empty poll as one unit of demand;
                    // the other modes count every record.
                    boolean countsAsSingleUnit = ConsumerFlux.this.ackMode == AckMode.AUTO_ACK
                        || ConsumerFlux.this.ackMode == AckMode.EXACTLY_ONCE;
                    int consumed = (countsAsSingleUnit && records.count() > 0) ? 1 : records.count();
                    // Evaluation order matters: demand is only decremented when it is bounded.
                    if (this.requestsPending.get() == Long.MAX_VALUE
                            || this.requestsPending.addAndGet(0 - consumed) > 0
                            || ConsumerFlux.this.commitEvent.inProgress.get() > 0) {
                        scheduleIfRequired();
                    }
                }
            } catch (Exception e) {
                if (ConsumerFlux.this.isActive.get()) {
                    ConsumerFlux.log.error("Unexpected exception", e);
                    ConsumerFlux.this.fail(e);
                }
            }
        }

        /** Queues this event unless a poll is already queued. */
        void scheduleIfRequired() {
            if (this.pendingCount.get() <= 0) {
                ConsumerFlux.this.eventSubmission.next(this);
                this.pendingCount.incrementAndGet();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public ConsumerFlux(AckMode ackMode, ReceiverOptions<K, V> receiverOptions, ConsumerFactory consumerFactory) {
        this.ackMode = ackMode;
        this.receiverOptions = receiverOptions;
        this.consumerFactory = consumerFactory;
        this.scheduler = Schedulers.single(receiverOptions.schedulerSupplier().get());
    }

    /**
     * Starts the event loop and subscribes {@code coreSubscriber} to the record stream,
     * published on this flux's scheduler. Every downstream request schedules a poll while
     * demand is outstanding. Any exception raised while starting or subscribing is logged
     * and delivered to the subscriber as an error.
     *
     * @param coreSubscriber subscriber receiving the polled record batches
     */
    public void subscribe(CoreSubscriber<? super ConsumerRecords<K, V>> coreSubscriber) {
        try {
            start();
            Flux<ConsumerRecords<K, V>> records = this.recordEmitter
                .publishOn(this.scheduler)
                .doOnRequest(requested -> {
                    boolean demandOutstanding = this.pollEvent.requestsPending.get() > 0;
                    if (demandOutstanding) {
                        this.pollEvent.scheduleIfRequired();
                    }
                });
            records.subscribe(coreSubscriber);
        } catch (Exception failure) {
            log.error("Subscription to event flux failed", failure);
            Operators.error(coreSubscriber, failure);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Handles revoked partitions: forces a commit (except in {@code ATMOST_ONCE} mode) and
     * then notifies every configured revoke listener. An empty collection is only logged.
     *
     * @param collection partitions being revoked
     */
    public void onPartitionsRevoked(Collection<TopicPartition> collection) {
        log.debug("onPartitionsRevoked {}", collection);
        if (!collection.isEmpty()) {
            if (this.ackMode != AckMode.ATMOST_ONCE) {
                this.commitEvent.runIfRequired(true);
            }
            for (java.util.function.Consumer<Collection<ReceiverPartition>> listener
                    : this.receiverOptions.revokeListeners()) {
                // Each listener receives its own freshly built partition collection.
                listener.accept(toSeekable(collection));
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Wraps each topic partition in a {@code SeekablePartition} bound to the current consumer.
     *
     * @param collection partitions to wrap
     * @return a new list with one entry per input partition, in iteration order
     */
    public Collection<ReceiverPartition> toSeekable(Collection<TopicPartition> collection) {
        List<ReceiverPartition> seekable = new ArrayList<>(collection.size());
        for (TopicPartition partition : collection) {
            seekable.add(new SeekablePartition(this.consumer, partition));
        }
        return seekable;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Logs the error and terminates the downstream record stream with it.
     *
     * @param th failure to report
     */
    public void fail(Throwable th) {
        final Throwable cause = th;
        log.error("Consumer flux exception", cause);
        this.recordSubmission.error(cause);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Adds {@code l} to the outstanding demand and schedules a poll if the resulting demand
     * is positive.
     *
     * @param l number of additional items requested
     */
    public void handleRequest(Long l) {
        long outstanding = OperatorUtils.safeAddAndGet(this.pollEvent.requestsPending, l.longValue());
        if (outstanding <= 0) {
            return;
        }
        this.pollEvent.scheduleIfRequired();
    }

    /**
     * Activates the flux and starts its event loop: creates the event queue, starts a
     * dedicated event scheduler, and subscribes a pipeline that runs an {@link InitEvent}
     * first and then every queued event on that scheduler. Both the scheduler and the
     * pipeline subscription are recorded in {@code subscribeDisposables}.
     *
     * @throws IllegalStateException if the flux is already active
     */
    private void start() {
        log.debug("start");
        if (!this.isActive.compareAndSet(false, true)) {
            throw new IllegalStateException("Multiple subscribers are not supported for KafkaReceiver flux");
        }
        this.awaitingTransaction.set(false);
        this.eventEmitter = UnicastProcessor.create();
        this.eventSubmission = this.eventEmitter.sink(FluxSink.OverflowStrategy.BUFFER);

        // Must be typed as Scheduler: start() and publishOn(...) are not available on Disposable.
        Scheduler eventScheduler = KafkaSchedulers.newEvent(this.receiverOptions.groupId());
        this.subscribeDisposables.add(eventScheduler);
        eventScheduler.start();

        Flux<Event> events = this.eventEmitter.as(this::withPeriodicCommits);
        Disposable eventLoop = events
            .startWith(new InitEvent())
            .publishOn(eventScheduler)
            .doOnNext(this::runEvent)
            .subscribe();
        this.subscribeDisposables.add(eventLoop);
    }

    /**
     * For {@code AUTO_ACK} and {@code MANUAL_ACK} with a non-zero commit interval, merges a
     * periodic commit event into the stream; otherwise returns the stream unchanged.
     */
    private Flux<Event> withPeriodicCommits(Flux<Event> events) {
        switch (this.ackMode) {
            case AUTO_ACK:
            case MANUAL_ACK:
                Duration commitInterval = this.receiverOptions.commitInterval();
                if (commitInterval.isZero()) {
                    return events;
                }
                return events.mergeWith(
                    Flux.interval(commitInterval)
                        .onBackpressureLatest()
                        .map(tick -> this.commitEvent.periodicEvent()));
            default:
                return events;
        }
    }

    /** Runs one event, routing any exception it throws to {@link #fail(Throwable)}. */
    private void runEvent(Event event) {
        log.trace("doEvent {}", event.getClass());
        try {
            event.run();
        } catch (Exception e) {
            fail(e);
        }
    }

    /* JADX WARN: Incorrect condition in loop: B:26:0x0093 */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    /**
     * NOTE(review): the decompiler could not recover this method's body, so the real
     * disposal logic is missing from this source. As written, every call throws
     * {@link UnsupportedOperationException}; the implementation must be restored from the
     * original sources or bytecode before this class can be used.
     */
    public void dispose() {
        /*
            Method dump skipped, instructions count: 1218
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: reactor.kafka.receiver.internals.ConsumerFlux.dispose():void");
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Returns a {@link Mono} that, when subscribed, queues a {@link CustomEvent} applying
     * {@code function} to the restricted consumer proxy; the Mono is completed or errored by
     * that event.
     *
     * @param function work to perform against the consumer
     * @param <T>      result type of the function
     * @return a Mono carrying the function's result
     */
    public <T> Mono<T> doOnConsumer(Function<Consumer<K, V>, ? extends T> function) {
        return Mono.create(sink -> {
            CustomEvent<T> event = new CustomEvent<>(function, sink);
            this.eventSubmission.next(event);
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Commits ahead of {@code consumerRecord} by {@code atmostOnceCommitAheadSize}. When the
     * record's offset has reached the committed offset, the returned {@link Mono} performs
     * the commit. Otherwise an empty {@link Mono} is returned, and a commit is additionally
     * fired in the background once the committed offset leads the record by at least half
     * the commit-ahead size.
     *
     * @param consumerRecord record about to be dispatched
     * @return a Mono that commits when required, or an empty Mono
     */
    public Mono<Void> commit(ConsumerRecord<K, V> consumerRecord) {
        long recordOffset = consumerRecord.offset();
        TopicPartition partition = new TopicPartition(consumerRecord.topic(), consumerRecord.partition());

        // Read the committed position before registering the dispatch.
        long committed = this.atmostOnceOffsets.committedOffset(partition);
        this.atmostOnceOffsets.onDispatch(partition, recordOffset);

        long commitAhead = this.receiverOptions.atmostOnceCommitAheadSize();
        CommittableOffset aheadOffset = new CommittableOffset(partition, recordOffset + commitAhead);

        if (recordOffset >= committed) {
            return aheadOffset.commit();
        }
        boolean backgroundCommitDue = committed - recordOffset >= commitAhead / 2;
        if (backgroundCommitDue) {
            aheadOffset.commit().subscribe();
        }
        return Mono.empty();
    }
}
