package reactor.aeron.subscriber;

import java.util.concurrent.atomic.AtomicBoolean;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.aeron.Context;
import reactor.aeron.utils.AeronInfra;
import reactor.aeron.utils.AeronUtils;
import reactor.core.Loopback;
import reactor.core.Receiver;
import reactor.core.Trackable;
import reactor.core.publisher.TopicProcessor;
import reactor.core.scheduler.Schedulers;
import reactor.core.scheduler.TimedScheduler;
import reactor.ipc.buffer.Buffer;
import reactor.util.Logger;
import reactor.util.Loggers;

/* loaded from: input_file:reactor/aeron/subscriber/AeronSubscriber.class */
/**
 * {@link Subscriber} of {@link Buffer} signals that relays every signal it receives to an
 * internal {@link TopicProcessor}.
 *
 * <p>On construction it also creates and starts a service message handler (multicast or
 * unicast, as decided by {@link AeronUtils#isMulticastCommunication(Context)}) and a
 * {@link ServiceMessagePoller} bound to that handler.
 *
 * <p>{@link #shutdown()} may be called any number of times; only the first call has an effect.
 */
public final class AeronSubscriber implements Subscriber<Buffer>, Trackable, Receiver, Loopback {
    private static final Logger logger = Loggers.getLogger(AeronSubscriber.class);

    /** Termination callback used when the caller does not supply one. */
    private static final Runnable NO_OP = () -> { };

    /** {@code true} until {@link #shutdown()} has been requested for the first time. */
    private final AtomicBoolean alive = new AtomicBoolean(true);

    /** Set once the asynchronous shutdown sequence has shut down the Aeron infrastructure. */
    private volatile boolean terminated;

    /** Invoked as the very last step of the shutdown sequence; never {@code null}. */
    private final Runnable onTerminateTask;
    private final AeronInfra aeronInfra;
    private final ServiceMessageHandler serviceMessageHandler;
    private final ServiceMessagePoller serviceMessagePoller;
    private final TopicProcessor<Buffer> processor;

    /**
     * Creates a subscriber whose processor is built with {@code TopicProcessor.create}.
     *
     * @param context configuration of the subscriber
     * @return a started subscriber
     */
    public static AeronSubscriber create(Context context) {
        return new AeronSubscriber(context, false);
    }

    /**
     * Creates a subscriber whose processor is built with {@code TopicProcessor.share}.
     *
     * @param context configuration of the subscriber
     * @return a started subscriber
     */
    public static AeronSubscriber share(Context context) {
        return new AeronSubscriber(context, true);
    }

    /**
     * Validates the context, builds the processor, the service message handler and the poller,
     * then starts the handler and the poller.
     *
     * @param context   configuration; {@code validate()} is called on it before anything else
     * @param z         {@code true} to build the processor with {@code TopicProcessor.share},
     *                  {@code false} to build it with {@code TopicProcessor.create}
     * @param runnable  task handed to the service message handler; when {@code null} a task
     *                  that calls {@link #shutdown()} is used instead
     *                  (NOTE(review): presumably the handler runs it to request shutdown;
     *                  confirm against the handler implementations)
     * @param runnable2 task run once the shutdown sequence has finished; when {@code null}
     *                  nothing is run
     */
    public AeronSubscriber(Context context, boolean z, Runnable runnable, Runnable runnable2) {
        context.validate();
        // A null callback used to surface only as a NullPointerException on the timer thread at
        // the very end of shutdown(); treat it as "nothing to do", like the other callback.
        this.onTerminateTask = runnable2 != null ? runnable2 : NO_OP;
        final Runnable shutdownTask = runnable != null ? runnable : this::shutdown;

        this.aeronInfra = context.aeronInfra();
        this.processor = createTopicProcessor(context, z);

        final boolean multicast = AeronUtils.isMulticastCommunication(context);
        if (multicast) {
            this.serviceMessageHandler =
                    new MulticastServiceMessageHandler(this.processor, this.aeronInfra, context, shutdownTask);
        } else {
            this.serviceMessageHandler =
                    new UnicastServiceMessageHandler(this.processor, this.aeronInfra, context, shutdownTask);
        }
        this.serviceMessagePoller = createServiceMessagePoller(context, this.aeronInfra, this.serviceMessageHandler);

        // The handler is started before the poller that feeds it.
        this.serviceMessageHandler.start();
        this.serviceMessagePoller.start();

        logger.info("subscriber initialized in {} mode, service request channel/streamId: {}",
                multicast ? "multicast" : "unicast",
                context.senderChannel() + "/" + context.serviceRequestStreamId());
    }

    /**
     * Same as the four-argument constructor with the default shutdown task and no termination
     * callback.
     *
     * @param context configuration of the subscriber
     * @param z       {@code true} to build the processor with {@code TopicProcessor.share},
     *                {@code false} to build it with {@code TopicProcessor.create}
     */
    public AeronSubscriber(Context context, boolean z) {
        this(context, z, null, NO_OP);
    }

    /** Forwards the subscription to the internal processor. */
    public void onSubscribe(Subscription subscription) {
        this.processor.onSubscribe(subscription);
    }

    /** Forwards the buffer to the internal processor. */
    public void onNext(Buffer buffer) {
        this.processor.onNext(buffer);
    }

    /** Forwards the error to the internal processor. */
    public void onError(Throwable th) {
        this.processor.onError(th);
    }

    /** Forwards the completion signal to the internal processor. */
    public void onComplete() {
        this.processor.onComplete();
    }

    /** Builds the poller that is bound to the given service message handler. */
    private ServiceMessagePoller createServiceMessagePoller(Context context, AeronInfra aeronInfra, ServiceMessageHandler serviceMessageHandler) {
        return new ServiceMessagePoller(context, aeronInfra, serviceMessageHandler);
    }

    /**
     * Builds the processor that receives every signal passed to this subscriber.
     *
     * @param context supplies the name used for the thread name, the ring buffer size and the
     *                auto-cancel flag
     * @param z       selects {@code TopicProcessor.share} ({@code true}) or
     *                {@code TopicProcessor.create} ({@code false})
     */
    private TopicProcessor<Buffer> createTopicProcessor(Context context, boolean z) {
        final String threadName = AeronUtils.makeThreadName(context.name(), "subscriber", "signal-sender");
        if (z) {
            return TopicProcessor.share(threadName, context.ringBufferSize(), context.autoCancel());
        }
        return TopicProcessor.create(threadName, context.ringBufferSize(), context.autoCancel());
    }

    /**
     * Requests an asynchronous shutdown; calls after the first one are ignored.
     *
     * <p>The work is scheduled on {@link Schedulers#timer()}: the processor, the poller and the
     * handler are shut down, then a task keeps rescheduling itself until the poller reports
     * that it has terminated. Only then is the Aeron infrastructure shut down, the
     * {@link #isTerminated() terminated} flag set and the termination callback run.
     */
    public void shutdown() {
        if (!this.alive.compareAndSet(true, false)) {
            return;
        }
        final TimedScheduler timer = Schedulers.timer();
        timer.schedule(() -> {
            this.processor.shutdown();
            this.serviceMessagePoller.shutdown();
            this.serviceMessageHandler.shutdown();
            // An anonymous class rather than a lambda: the task has to resubmit itself.
            timer.schedule(new Runnable() {
                @Override
                public void run() {
                    if (!serviceMessagePoller.isTerminated()) {
                        timer.schedule(this);
                        return;
                    }
                    aeronInfra.shutdown();
                    logger.info("subscriber shutdown");
                    terminated = true;
                    onTerminateTask.run();
                }
            });
        });
    }

    /** Returns {@code true} once the shutdown sequence has shut down the Aeron infrastructure. */
    public boolean isTerminated() {
        return this.terminated;
    }

    /** Returns {@code true} as long as {@link #shutdown()} has not been called. */
    public boolean isStarted() {
        return this.alive.get();
    }

    /** Returns the service message handler. */
    public Object connectedInput() {
        return this.serviceMessageHandler;
    }

    /** Returns the internal processor. */
    public Object upstream() {
        return this.processor;
    }

    /** Returns the service message poller. */
    public Object connectedOutput() {
        return this.serviceMessagePoller;
    }
}
