package reactor.aeron.publisher;

import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import org.reactivestreams.Subscriber;
import reactor.aeron.Context;
import reactor.aeron.utils.AeronInfra;
import reactor.aeron.utils.AeronUtils;
import reactor.aeron.utils.ServiceMessagePublicationFailedException;
import reactor.aeron.utils.ServiceMessageType;
import reactor.core.Exceptions;
import reactor.core.Producer;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
import reactor.core.scheduler.TimedScheduler;
import reactor.ipc.buffer.Buffer;
import reactor.util.Logger;
import reactor.util.Loggers;
import uk.co.real_logic.aeron.Publication;

/* loaded from: input_file:reactor/aeron/publisher/AeronFlux.class */
/**
 * A {@link Flux} of {@link Buffer}s backed by a {@link SignalPoller} that is run on a dedicated
 * executor thread, together with a {@link HeartbeatSender} and a service-request
 * {@link Publication}.
 *
 * <p>Only one subscriber is supported at a time: a second call to
 * {@link #subscribe(Subscriber)} while a subscription is active throws
 * {@link IllegalStateException}.
 *
 * <p>{@link #shutdown()} is asynchronous; use {@link #alive()} and {@link #isTerminated()} to
 * observe its progress.
 */
public final class AeronFlux extends Flux<Buffer> implements Producer {
    private static final Logger logger = Loggers.getLogger(AeronFlux.class);

    final AeronInfra aeronInfra;
    private final Context context;
    private final ExecutorService executor;
    private final Publication serviceRequestPub;
    private final HeartbeatSender heartbeatSender;
    private final ServiceMessageSender serviceMessageSender;
    private final String sessionId;

    /** Poller of the current subscription; {@code null} when nobody is subscribed. */
    private volatile SignalPoller signalPoller;

    /** {@code true} until {@link #shutdown()} has been requested. */
    private final AtomicBoolean alive = new AtomicBoolean(true);

    /** Set to {@code true} by the last step of the asynchronous shutdown sequence. */
    private volatile boolean terminated = false;

    /** Guards the single-subscriber restriction. */
    private final AtomicBoolean subscribed = new AtomicBoolean(false);

    /**
     * Creates a new {@link AeronFlux} for the given context.
     *
     * @param context configuration; its {@code receiverChannel()} must be non-null
     * @return the newly created flux
     */
    public static Flux<Buffer> listenOn(Context context) {
        return new AeronFlux(context);
    }

    /**
     * Validates the context and creates the executor, the service-request publication, the
     * session id, the service message sender and the heartbeat sender.
     *
     * @param context configuration; its {@code receiverChannel()} must be non-null
     * @throws NullPointerException if {@code context.receiverChannel()} is {@code null}
     */
    public AeronFlux(Context context) {
        Objects.requireNonNull(context.receiverChannel(), "'receiverChannel' should be provided");
        context.validate();
        this.context = context;
        this.aeronInfra = context.aeronInfra();
        this.executor = Executors.newCachedThreadPool(task ->
                new Thread(task, AeronUtils.makeThreadName(context.name(), "publisher", "signal-poller")));
        this.serviceRequestPub = createServiceRequestPub(context, this.aeronInfra);
        this.sessionId = getSessionId(context);
        this.serviceMessageSender = new ServiceMessageSender(this, this.serviceRequestPub, this.sessionId);

        // Whatever error the heartbeat sender hands to this consumer, the reaction is to shut down.
        Consumer<Throwable> onHeartbeatFailure = error -> shutdown();
        // The heartbeat sender gets its own ServiceMessageSender instance rather than sharing
        // this.serviceMessageSender.
        this.heartbeatSender = new HeartbeatSender(
                context,
                new ServiceMessageSender(this, this.serviceRequestPub, this.sessionId),
                onHeartbeatFailure);
        logger.info("publisher initialized, sessionId: {}", this.sessionId);
    }

    /**
     * Computes the session id: a freshly created UUID string for multicast communication,
     * otherwise {@code receiverChannel + "/" + streamId}.
     *
     * @param context configuration to derive the id from
     * @return the session id
     */
    protected String getSessionId(Context context) {
        if (AeronUtils.isMulticastCommunication(context)) {
            return UUIDUtils.create().toString();
        }
        return context.receiverChannel() + "/" + context.streamId();
    }

    /** Adds a publication on the sender channel using the service-request stream id. */
    private Publication createServiceRequestPub(Context context, AeronInfra aeronInfra) {
        return aeronInfra.addPublication(context.senderChannel(), context.serviceRequestStreamId());
    }

    /**
     * Creates a signal poller for the subscriber, hands it to the executor and starts the
     * heartbeat sender. If either step throws, the subscription state is reset and the failure is
     * reported through {@code subscriber.onError}.
     *
     * @param subscriber the subscriber to deliver signals to
     * @throws IllegalStateException if another subscriber is already attached
     */
    public void subscribe(Subscriber<? super Buffer> subscriber) {
        if (subscriber == null) {
            throw Exceptions.argumentIsNullException();
        }
        if (!this.subscribed.compareAndSet(false, true)) {
            throw new IllegalStateException("Only single subscriber is supported");
        }
        SignalPoller poller = createSignalsPoller(subscriber);
        this.signalPoller = poller;
        try {
            this.executor.execute(poller);
            this.heartbeatSender.start();
        } catch (Throwable failure) {
            // NOTE(review): if start() throws after execute() succeeded, the poller has already
            // been handed to the executor and only the reference to it is dropped here — confirm
            // whether it should also be stopped.
            this.signalPoller = null;
            this.subscribed.set(false);
            subscriber.onError(new RuntimeException("Failed to schedule poller for signals", failure));
        }
    }

    /**
     * Builds the poller for a subscriber. The callback passed to the poller stops the heartbeat
     * sender, sends a cancel message, clears the subscription state and shuts this flux down.
     */
    private SignalPoller createSignalsPoller(Subscriber<? super Buffer> subscriber) {
        Runnable onPollerFinished = () -> {
            this.heartbeatSender.shutdown();
            terminateSession();
            this.signalPoller = null;
            this.subscribed.set(false);
            shutdown();
        };
        return new SignalPoller(this.context, this.serviceMessageSender, subscriber, this.aeronInfra,
                onPollerFinished);
    }

    /**
     * Sends a cancel service message; a failure is forwarded to the context's error consumer
     * instead of being thrown.
     */
    private void terminateSession() {
        try {
            this.serviceMessageSender.sendCancel();
        } catch (Exception e) {
            this.context.errorConsumer().accept(
                    new ServiceMessagePublicationFailedException(ServiceMessageType.Cancel, e));
        }
    }

    /**
     * Requests an asynchronous shutdown. Only the first call has any effect. The work is scheduled
     * on {@link Schedulers#timer()}: the current poller (if any) and the executor are shut down,
     * then a task reschedules itself until the executor has terminated, after which the
     * service-request publication is closed, the Aeron infrastructure is shut down and
     * {@link #isTerminated()} becomes {@code true}.
     */
    public void shutdown() {
        if (!this.alive.compareAndSet(true, false)) {
            return;
        }
        TimedScheduler timer = Schedulers.timer();
        timer.schedule(() -> {
            // Read the volatile field exactly once: the poller's completion callback sets it to
            // null from another thread, so a separate null check and call could hit an NPE.
            SignalPoller poller = this.signalPoller;
            if (poller != null) {
                poller.shutdown();
            }
            this.executor.shutdown();
            // An anonymous class (not a lambda) is required because the task reschedules itself
            // via `this`.
            timer.schedule(new Runnable() {
                @Override
                public void run() {
                    if (!executor.isTerminated()) {
                        timer.schedule(this);
                        return;
                    }
                    aeronInfra.close(serviceRequestPub);
                    aeronInfra.shutdown();
                    logger.info("publisher shutdown, sessionId: {}", sessionId);
                    terminated = true;
                }
            });
        });
    }

    /**
     * @return the current signal poller, or {@code null} when there is no active subscription
     */
    public Object downstream() {
        return this.signalPoller;
    }

    /**
     * @return {@code true} until {@link #shutdown()} has been requested
     */
    public boolean alive() {
        return this.alive.get();
    }

    /**
     * @return {@code true} once the asynchronous shutdown sequence has finished
     */
    public boolean isTerminated() {
        return this.terminated;
    }
}
