package nstream.adapter.pulsar;

import java.util.Map;
import nstream.adapter.common.AdapterUtils;
import nstream.adapter.common.ingress.ValueAssembler;
import nstream.adapter.common.provision.ProvisionLoader;
import nstream.adapter.common.schedule.DeferrableException;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import swim.structure.Value;

/* loaded from: input_file:nstream/adapter/pulsar/PulsarIngestingPatch.class */
/**
 * Pulsar ingesting agent that turns each received {@link Message} into a
 * {@link Value} by way of a {@link ValueAssembler} and then relays that value
 * through {@code AdapterUtils.ingressDslRelay}.
 *
 * @param <V> type parameter of the consumed {@link Message} instances
 */
public class PulsarIngestingPatch<V> extends PulsarIngestingAgent<V> {

    /** Assembler used by {@link #assembleResponse}; created in {@link #stageReception()}. */
    protected ValueAssembler<V> valueAssembler;

    /**
     * Assembles {@code message} into a structure, ingests that structure, and
     * then calls {@code acknowledge(message)}.
     *
     * <p>If any of those steps throws, {@code acknowledge(message)} is called
     * from the failure path as well, and the failure is rethrown wrapped in a
     * {@link DeferrableException} that carries the node URI and the message ID.
     *
     * <p>NOTE(review): the decompiler reported that this method's access
     * modifier was changed from {@code protected}; confirm against the
     * original sources before narrowing it.
     *
     * @param message the message to ingest
     * @throws DeferrableException if assembling, ingesting, or acknowledging fails
     */
    public void ingest(Message<V> message) throws DeferrableException {
        try {
            final Value structure = assembleResponse(message);
            ingestStructure(structure);
            acknowledge(message);
        } catch (Exception cause) {
            // Acknowledge on the failure path too, before reporting the error.
            acknowledge(message);
            final String detail = nodeUri()
                    + ": failed to assemble message with ID="
                    + message.getMessageId();
            throw new DeferrableException(detail, cause);
        }
    }

    /**
     * Delegates to {@code PulsarAdapterUtils.assembleConsumerMessage}, passing
     * this agent's {@link #valueAssembler} and the content type override read
     * from the ingress settings.
     *
     * @param message the message to convert
     * @return the value produced by {@code assembleConsumerMessage}
     */
    protected Value assembleResponse(Message<V> message) {
        return PulsarAdapterUtils.assembleConsumerMessage(
                message,
                valueAssembler,
                ingressSettings.contentTypeOverride());
    }

    /**
     * Relays {@code value} via {@code AdapterUtils.ingressDslRelay}, using the
     * relay schema from the ingress settings and this agent's context.
     *
     * @param value the structure to relay
     * @throws DeferrableException declared for failures raised while relaying
     */
    protected void ingestStructure(Value value) throws DeferrableException {
        AdapterUtils.ingressDslRelay(
                ingressSettings.relaySchema(),
                agentContext(),
                value);
    }

    /**
     * Prepares this agent for reception: loads the settings registered under
     * {@code "pulsarIngressConf"}, builds the {@link #valueAssembler}, and then
     * assigns the client and the consumer from their named provisions.
     */
    @Override // nstream.adapter.pulsar.PulsarIngestingAgent
    protected void stageReception() {
        loadSettings("pulsarIngressConf");
        valueAssembler = ValueAssembler.create(ingressSettings.valueMolder());

        // The client is resolved and assigned before the consumer configuration is looked up.
        final PulsarClient client = (PulsarClient) ProvisionLoader
                .getProvision(ingressSettings.clientProvisionName())
                .value();
        assignClient(client);

        // Raw Map retained on purpose: the parameter type expected by
        // assignConsumer is not visible from this file.
        final Map consumerConf = (Map) ProvisionLoader
                .getProvision(ingressSettings.consumerConfProvisionName())
                .value();
        assignConsumer(consumerConf, () -> {
            info(nodeUri() + ": successfully staged consumer for reception");
        });
    }
}
