package reactor.aeron.subscriber;

import java.util.Iterator;
import java.util.concurrent.atomic.AtomicLong;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.aeron.Context;
import reactor.aeron.utils.AeronInfra;
import reactor.aeron.utils.Serializer;
import reactor.aeron.utils.SignalType;
import reactor.core.publisher.FluxProcessor;
import reactor.ipc.buffer.Buffer;
import reactor.util.Logger;
import reactor.util.Loggers;
import uk.co.real_logic.aeron.Publication;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:reactor/aeron/subscriber/MulticastServiceMessageHandler.class */
public class MulticastServiceMessageHandler implements ServiceMessageHandler {
    private static final Logger logger = Loggers.getLogger(MulticastServiceMessageHandler.class);
    private final Context context;
    private final Runnable shutdownTask;
    private final HeartbeatWatchdog heartbeatWatchdog;
    private final InnerSubscriber subscriber;
    private volatile long cursor = -1;
    private final AtomicLong requested = new AtomicLong(-1);
    private volatile long minSequence = -1;
    private final SessionTracker<MulticastSession> sessionTracker = new BasicSessionTracker();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:reactor/aeron/subscriber/MulticastServiceMessageHandler$InnerSubscriber.class */
    public class InnerSubscriber implements Subscriber<Buffer> {
        private static final String SESSION_ID = "<multicast>";
        private final Publication signalPub;
        private final Serializer<Throwable> exceptionSerializer;
        private final SignalSender signalSender;
        private volatile boolean terminal = false;
        private volatile Subscription subscription;

        InnerSubscriber(Context context, AeronInfra aeronInfra) {
            this.exceptionSerializer = context.exceptionSerializer();
            this.signalPub = aeronInfra.addPublication(context.receiverChannel(), context.streamId());
            this.signalSender = new BasicSignalSender(aeronInfra, context.errorConsumer());
        }

        public void onSubscribe(Subscription subscription) {
            this.subscription = subscription;
            MulticastServiceMessageHandler.this.requestFromUpstream();
        }

        public void onNext(Buffer buffer) {
            this.signalSender.publishSignal(SESSION_ID, this.signalPub, buffer, SignalType.Next, true);
            MulticastServiceMessageHandler.this.incrementCursor();
        }

        public void onError(Throwable th) {
            this.signalSender.publishSignal(SESSION_ID, this.signalPub, Buffer.wrap(this.exceptionSerializer.serialize(th)), SignalType.Error, true);
            this.terminal = true;
        }

        public void onComplete() {
            this.signalSender.publishSignal(SESSION_ID, this.signalPub, new Buffer(0, true), SignalType.Complete, true);
            this.terminal = true;
        }

        public boolean isTerminal() {
            return this.terminal;
        }
    }

    public MulticastServiceMessageHandler(FluxProcessor<Buffer, Buffer> fluxProcessor, AeronInfra aeronInfra, Context context, Runnable runnable) {
        this.context = context;
        this.shutdownTask = runnable;
        this.heartbeatWatchdog = new HeartbeatWatchdog(context, this, this.sessionTracker);
        this.subscriber = new InnerSubscriber(context, aeronInfra);
        fluxProcessor.subscribe(this.subscriber);
    }

    @Override // reactor.aeron.subscriber.ServiceMessageHandler
    public void handleMore(String str, long j) {
        getOrCreateSession(str).requestMore(j);
        this.minSequence = getMinSequence();
        if (this.subscriber.subscription == null) {
            return;
        }
        requestFromUpstream();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void requestFromUpstream() {
        long j = this.requested.get();
        if (this.cursor != j || j >= this.minSequence) {
            return;
        }
        long multicastUpstreamRequest = this.context.multicastUpstreamRequest();
        if (this.requested.compareAndSet(j, j + multicastUpstreamRequest)) {
            this.subscriber.subscription.request(multicastUpstreamRequest);
        }
    }

    private long getMinSequence() {
        if (this.sessionTracker.getSessionCounter() == 0) {
            return this.cursor;
        }
        long j = Long.MAX_VALUE;
        Iterator<MulticastSession> it = this.sessionTracker.getSessions().iterator();
        while (it.hasNext()) {
            j = Math.min(it.next().getDemand(), j);
        }
        return j;
    }

    public void incrementCursor() {
        this.cursor++;
        requestFromUpstream();
    }

    public MulticastSession getOrCreateSession(String str) {
        MulticastSession multicastSession = this.sessionTracker.get(str);
        if (multicastSession == null) {
            multicastSession = new MulticastSession(str, this.cursor);
            this.sessionTracker.put(str, multicastSession);
            logger.debug("New session established with Id: {}", new Object[]{str});
        }
        return multicastSession;
    }

    @Override // reactor.aeron.subscriber.ServiceMessageHandler
    public void handleHeartbeat(String str) {
        getOrCreateSession(str).setLastHeartbeatTimeNs(System.nanoTime());
    }

    @Override // reactor.aeron.subscriber.ServiceMessageHandler
    public void handleCancel(String str) {
        MulticastSession remove = this.sessionTracker.remove(str);
        if (remove == null) {
            logger.debug("Could not find a session to close with Id: {}", new Object[]{str});
            return;
        }
        this.minSequence = getMinSequence();
        if (this.sessionTracker.getSessionCounter() == 0) {
            if (this.context.autoCancel()) {
                this.subscriber.subscription.cancel();
                logger.debug("Closed session with Id: {}", new Object[]{remove.getSessionId()});
            }
            if (this.context.autoCancel() || this.subscriber.isTerminal()) {
                this.shutdownTask.run();
            }
        }
    }

    @Override // reactor.aeron.subscriber.ServiceMessageHandler
    public void start() {
        this.heartbeatWatchdog.start();
    }

    @Override // reactor.aeron.subscriber.ServiceMessageHandler
    public void shutdown() {
        this.heartbeatWatchdog.shutdown();
    }

    protected long getCursor() {
        return this.cursor;
    }
}
