package nstream.adapter.confluent;

import java.time.Duration;
import nstream.adapter.common.schedule.DeferrableException;
import nstream.adapter.kafka.KafkaIngestingAgent;

/* loaded from: input_file:nstream/adapter/confluent/ConfluentIngestingAgent.class */
public abstract class ConfluentIngestingAgent<K, V> extends KafkaIngestingAgent<K, V> {
    protected void stageReception() {
        loadSettings("confluentIngressConf");
        this.kafkaConsumer = ConfluentAdapterUtils.createConsumer(this.ingressSettings);
        this.kafkaConsumer.subscribe(this.ingressSettings.topics());
        this.pollTimer = schedule(() -> {
            return this.pollTimer();
        }, this.ingressSettings.firstFetchDelayMillis(), () -> {
            while (true) {
                try {
                    ingest(this.kafkaConsumer.poll(Duration.ofMillis(this.ingressSettings.pollTimeoutMillis())));
                } catch (RuntimeException e) {
                    didFail(new RuntimeException(nodeUri() + ": non-deferrable ExecutorAgent failure; canceled associated timer", e));
                    return;
                } catch (DeferrableException e2) {
                    handleDeferrableException(e2);
                }
            }
        });
    }
}
