package nstream.adapter.pulsar;

import java.util.Map;
import java.util.concurrent.Executor;
import nstream.adapter.common.AdapterSettings;
import nstream.adapter.common.egress.PublisherAgent;
import nstream.adapter.common.schedule.DeferrableException;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import swim.structure.Value;

/* loaded from: input_file:nstream/adapter/pulsar/PulsarPublishingAgent.class */
/**
 * Base publisher agent that sends values of type {@code V} to Apache Pulsar
 * through a single {@link Producer}.
 *
 * <p>Subclasses supply the {@link Schema} via {@link #pulsarSchema()} and are
 * responsible for assigning a {@link PulsarClient} and a {@link Producer}
 * (see the {@code assignClient} / {@code assignProducer} methods) before
 * {@link #publish(Object)} is invoked.
 *
 * @param <V> the type of message payload handed to the Pulsar producer
 */
public abstract class PulsarPublishingAgent<V> extends PublisherAgent<AdapterSettings, V> {
    /** Client used to build producers; {@code null} until {@link #assignClient} is called. */
    protected PulsarClient pulsarClient;

    /**
     * Producer used by {@link #publish(Object)}; {@code null} until one of the
     * {@code assignProducer} overloads has completed. Declared {@code volatile}
     * because it is written from the task submitted in
     * {@link #assignProducer(Map, Runnable)} and read from {@link #publish(Object)}.
     */
    protected volatile Producer<V> pulsarProducer;

    /**
     * Returns {@code null}: this base class does not derive any settings from
     * {@code value}.
     *
     * @param value the configuration value (ignored here)
     * @return always {@code null} in this implementation
     */
    protected AdapterSettings parseEgressSettings(Value value) {
        return null;
    }

    /**
     * Supplies the schema passed to {@code PulsarClient.newProducer} when a
     * producer is created by {@link #assignProducer(Map, Runnable)}.
     *
     * @return the Pulsar schema for {@code V}
     */
    protected abstract Schema<V> pulsarSchema();

    /**
     * @return the currently assigned client, or {@code null} if none was assigned
     */
    protected PulsarClient pulsarClient() {
        return this.pulsarClient;
    }

    /**
     * Stores {@code pulsarClient} for later producer creation. Any previously
     * stored client reference is overwritten without being closed.
     *
     * @param pulsarClient the client to use
     */
    protected void assignClient(PulsarClient pulsarClient) {
        this.pulsarClient = pulsarClient;
    }

    /**
     * @return the currently assigned producer, or {@code null} if none was assigned
     */
    protected Producer<V> pulsarProducer() {
        return this.pulsarProducer;
    }

    /**
     * Assigns an already-constructed producer.
     *
     * @param producer the producer that {@link #publish(Object)} should use
     * @throws RuntimeException if a producer has already been assigned
     */
    protected void assignProducer(Producer<V> producer) {
        if (this.pulsarProducer != null) {
            throw new RuntimeException(nodeUri() + ": producer already assigned");
        }
        this.pulsarProducer = producer;
    }

    /**
     * Creates a producer from the assigned client using {@link #pulsarSchema()}
     * and the configuration in {@code map}, stores it as this agent's producer,
     * and then runs {@code runnable}.
     *
     * <p>The creation itself happens inside a task handed to {@code execute(...)},
     * so this method may return before the producer is available; {@code runnable}
     * is the signal that assignment has completed.
     *
     * @param map      producer configuration passed to {@code ProducerBuilder.loadConf}
     * @param runnable callback run after the producer has been created and stored
     * @throws RuntimeException if no client has been assigned or a producer is
     *                          already assigned at the time of the call; the
     *                          submitted task also throws {@code RuntimeException}
     *                          (wrapping the cause) if creation or the callback fails
     */
    protected void assignProducer(Map<String, Object> map, Runnable runnable) {
        if (this.pulsarClient == null) {
            throw new RuntimeException(nodeUri() + ": can't create producer with unassigned client");
        }
        if (this.pulsarProducer != null) {
            throw new RuntimeException(nodeUri() + ": producer already assigned");
        }
        execute(() -> {
            try {
                // Keep the created producer: without this assignment the producer
                // is leaked and publish() can never succeed.
                this.pulsarProducer = this.pulsarClient.newProducer(pulsarSchema())
                        .loadConf(map)
                        .create();
                runnable.run();
            } catch (Exception e) {
                throw new RuntimeException(nodeUri() + ": failed to assign producer", e);
            }
        });
    }

    /**
     * Sends {@code v} asynchronously through the assigned producer.
     *
     * <p>On failure, {@code didFail} is invoked with a wrapping
     * {@code RuntimeException}; on success, the resulting message id is traced
     * on the executor returned by {@code asyncStage()}.
     *
     * @param v the value to publish
     * @throws DeferrableException declared by the signature; not thrown directly
     *                             by this implementation
     * @throws RuntimeException    if no producer has been assigned yet
     */
    public void publish(V v) throws DeferrableException {
        // Read the volatile field once so the null check and the send use the same instance.
        final Producer<V> producer = this.pulsarProducer;
        if (producer == null) {
            throw new RuntimeException(nodeUri() + ": can't publish before assigning producer");
        }
        producer.sendAsync(v).exceptionally(th -> {
            didFail(new RuntimeException(nodeUri() + ": failed to produce message", th));
            // A null message id marks the failed path for the success stage below.
            return null;
        }).thenAcceptAsync(messageId -> {
            if (messageId != null) {
                trace(nodeUri() + ": successfully published " + messageId);
            }
        }, (Executor) asyncStage());
    }

    /**
     * Logs the start of this agent and then calls {@code stagePublication()}.
     */
    public void didStart() {
        info(nodeUri() + ": didStart");
        stagePublication();
    }
}
