package nstream.adapter.confluent;

import java.time.Duration;
import java.util.Iterator;
import nstream.adapter.common.AdapterUtils;
import nstream.adapter.common.content.ContentMolder;
import nstream.adapter.common.schedule.DeferrableException;
import nstream.adapter.kafka.KafkaAdapterUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import swim.structure.Value;

/* loaded from: input_file:nstream/adapter/confluent/ConfluentIngestingPatch.class */
public class ConfluentIngestingPatch<K, V> extends ConfluentIngestingAgent<K, V> {
    protected ContentMolder keyMolder;
    protected ContentMolder valueMolder;

    @Override // nstream.adapter.confluent.ConfluentIngestingAgent
    protected void stageReception() {
        loadSettings("confluentIngressConf");
        this.kafkaConsumer = ConfluentAdapterUtils.createConsumer(this.ingressSettings);
        this.kafkaConsumer.subscribe(this.ingressSettings.topics());
        this.keyMolder = ContentMolder.cast(this.ingressSettings.keyMolder());
        this.valueMolder = ContentMolder.cast(this.ingressSettings.valueMolder());
        this.pollTimer = schedule(() -> {
            return this.pollTimer();
        }, this.ingressSettings.firstFetchDelayMillis(), () -> {
            while (true) {
                try {
                    ingest((ConsumerRecords) poll());
                } catch (RuntimeException e) {
                    didFail(new RuntimeException(nodeUri() + ": non-deferrable ExecutorAgent failure; canceled associated timer", e));
                    return;
                } catch (DeferrableException e2) {
                    handleDeferrableException(e2);
                }
            }
        });
    }

    protected ConsumerRecords<K, V> poll() {
        return this.kafkaConsumer.poll(Duration.ofMillis(this.ingressSettings.pollTimeoutMillis()));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void ingest(ConsumerRecords<K, V> consumerRecords) throws DeferrableException {
        Iterator it = consumerRecords.iterator();
        while (it.hasNext()) {
            ingestConsumerRecordStructure(KafkaAdapterUtils.assembleConsumerRecord((ConsumerRecord) it.next(), this.ingressSettings, this.keyMolder, this.valueMolder));
        }
    }

    protected void ingestConsumerRecordStructure(Value value) throws DeferrableException {
        AdapterUtils.ingressDslRelay(this.ingressSettings.relaySchema(), agentContext(), value);
    }
}
