package nstream.adapter.pulsar;

import java.lang.invoke.SerializedLambda;
import java.util.Map;
import nstream.adapter.common.ext.PulsarIngressSettings;
import nstream.adapter.common.ingress.IngestorMetricsAgent;
import nstream.adapter.common.provision.ProvisionLoader;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import swim.structure.Value;

/* loaded from: input_file:nstream/adapter/pulsar/PulsarIngestingAgent.class */
public abstract class PulsarIngestingAgent<V> extends IngestorMetricsAgent<PulsarIngressSettings, Message<V>> {
    protected PulsarClient pulsarClient;
    protected volatile Consumer<V> pulsarConsumer;

    /* JADX INFO: Access modifiers changed from: protected */
    public void assignClient(PulsarClient pulsarClient) {
        this.pulsarClient = pulsarClient;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void assignConsumer(Map<String, Object> map, Runnable runnable) {
        if (this.pulsarClient == null) {
            throw new RuntimeException(nodeUri() + ": can't assign consumer without assigned client");
        }
        if (this.pulsarConsumer != null) {
            throw new RuntimeException(nodeUri() + ": consumer already assigned");
        }
        try {
            this.pulsarConsumer = this.pulsarClient.newConsumer(pulsarSchema()).loadConf(map).messageListener((consumer, message) -> {
                executeNonblocking(() -> {
                    ingestOrCancel(message);
                });
            }).subscribe();
            runnable.run();
        } catch (Exception e) {
            throw new RuntimeException(nodeUri() + ": failed to assign consumer", e);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void acknowledge(Message<V> message) {
        execute(() -> {
            try {
                this.pulsarConsumer.acknowledge(message);
            } catch (Exception e) {
                throw new RuntimeException(nodeUri() + ": failed to acknowledge message", e);
            }
        });
    }

    protected void cancel() {
        if (this.pulsarConsumer == null) {
            throw new RuntimeException(nodeUri() + ": already unassigned");
        }
        try {
            try {
                this.pulsarConsumer.unsubscribe();
            } catch (Exception e) {
                throw new RuntimeException(nodeUri() + ": failed to unsubscribe consumer", e);
            }
        } finally {
            try {
                this.pulsarConsumer.close();
            } catch (Exception e2) {
                didFail(new RuntimeException(nodeUri() + ": exception observed in closing pulsarConsumer (resources should be released regardless)", e2));
            }
            this.pulsarConsumer = null;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void didFailIngest(Message<V> message, Exception exc) {
        didFail(new RuntimeException(nodeUri() + ": message with ID=" + message.getMessageId() + " triggered fatal exception; stopping consumer", exc));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* renamed from: parseIngressSettings, reason: merged with bridge method [inline-methods] */
    public PulsarIngressSettings m5parseIngressSettings(Value value) {
        PulsarIngressSettings pulsarIngressSettings = (PulsarIngressSettings) PulsarIngressSettings.form().cast(value);
        return pulsarIngressSettings == null ? PulsarIngressSettings.defaultSettings() : pulsarIngressSettings;
    }

    protected Schema<V> pulsarSchema() {
        return PulsarAdapterUtils.schemaForName(this.ingressSettings.schemaType());
    }

    protected void stageReception() {
        loadSettings("pulsarIngressConf");
        assignClient((PulsarClient) ProvisionLoader.getProvision(this.ingressSettings.clientProvisionName()).value());
        assignConsumer((Map) ProvisionLoader.getProvision(this.ingressSettings.consumerConfProvisionName()).value(), () -> {
            info(nodeUri() + ": successfully staged consumer for reception");
        });
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case 399692886:
                if (implMethodName.equals("lambda$assignConsumer$c77fff59$1")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/pulsar/client/api/MessageListener") && serializedLambda.getFunctionalInterfaceMethodName().equals("received") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Lorg/apache/pulsar/client/api/Consumer;Lorg/apache/pulsar/client/api/Message;)V") && serializedLambda.getImplClass().equals("nstream/adapter/pulsar/PulsarIngestingAgent") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/pulsar/client/api/Consumer;Lorg/apache/pulsar/client/api/Message;)V")) {
                    PulsarIngestingAgent pulsarIngestingAgent = (PulsarIngestingAgent) serializedLambda.getCapturedArg(0);
                    return (consumer, message) -> {
                        executeNonblocking(() -> {
                            ingestOrCancel(message);
                        });
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
