package reactor.aeron.subscriber;

import java.util.Iterator;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import org.reactivestreams.Processor;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.aeron.Context;
import reactor.aeron.utils.AeronInfra;
import reactor.aeron.utils.AeronUtils;
import reactor.aeron.utils.SignalType;
import reactor.core.MultiProducer;
import reactor.core.publisher.FluxProcessor;
import reactor.ipc.buffer.Buffer;
import reactor.util.Logger;
import reactor.util.Loggers;
import uk.co.real_logic.agrona.concurrent.BackoffIdleStrategy;

/* loaded from: input_file:reactor/aeron/subscriber/UnicastServiceMessageHandler.class */
/**
 * {@link ServiceMessageHandler} that keeps one {@link UnicastSession} per session id.
 *
 * <p>A session is created the first time a "more" or heartbeat message arrives for an unknown
 * session id. Each new session gets its own Aeron publication and its own {@link InnerSubscriber}
 * subscribed to the shared processor; signals received by that subscriber are forwarded to the
 * session's publication through a {@link SignalSender}.
 */
class UnicastServiceMessageHandler implements ServiceMessageHandler, MultiProducer {
    private static final Logger logger = Loggers.getLogger(UnicastServiceMessageHandler.class);

    /**
     * Upper bound, in nanoseconds (100 ms), on how long
     * {@link #waitTillSubscriptionIsAssigned(UnicastSession)} waits for a session's subscription.
     */
    private static final long SUBSCRIPTION_TIMEOUT_NS = TimeUnit.MILLISECONDS.toNanos(100);

    private final Processor<Buffer, Buffer> processor;
    private final AeronInfra aeronInfra;
    private final Context context;

    /** Run by {@link #handleCancel(String)} once the last session is gone (see that method). */
    private final Runnable onTerminalEventTask;

    private final SessionTracker<UnicastSession> sessionTracker = new BasicSessionTracker();
    private final HeartbeatWatchdog heartbeatWatchdog;

    /**
     * Subscriber attached to the processor on behalf of a single {@link UnicastSession}; it
     * forwards every signal it receives to that session's publication.
     *
     * <p>NOTE: the decompiler output states this class was originally {@code private}; the
     * widened visibility is kept as found.
     */
    public class InnerSubscriber implements Subscriber<Buffer> {
        private final SignalSender signalSender;
        private final UnicastSession session;

        InnerSubscriber(UnicastSession unicastSession) {
            this.session = unicastSession;
            this.signalSender = UnicastServiceMessageHandler.this.createSignalSender();
        }

        /**
         * Stores the subscription on the session and immediately requests any demand that was
         * accumulated on the session before the subscription became available.
         */
        public void onSubscribe(Subscription subscription) {
            this.session.subscription = subscription;
            long pendingDemand = this.session.getAndResetDemad();
            if (pendingDemand > 0) {
                this.session.subscription.request(pendingDemand);
            }
        }

        /** Publishes the buffer to the session's publication as a {@code Next} signal. */
        public void onNext(Buffer buffer) {
            this.signalSender.publishSignal(
                    this.session.getSessionId(),
                    this.session.getPublication(),
                    buffer,
                    SignalType.Next,
                    true);
        }

        /**
         * Marks the session terminal, then publishes the error, serialized with the context's
         * exception serializer, as an {@code Error} signal.
         */
        public void onError(Throwable th) {
            this.session.setTerminal();
            Buffer serializedError =
                    Buffer.wrap(UnicastServiceMessageHandler.this.context.exceptionSerializer().serialize(th));
            this.signalSender.publishSignal(
                    this.session.getSessionId(),
                    this.session.getPublication(),
                    serializedError,
                    SignalType.Error,
                    true);
        }

        /** Marks the session terminal, then publishes a {@code Complete} signal. */
        public void onComplete() {
            this.session.setTerminal();
            this.signalSender.publishSignal(
                    this.session.getSessionId(),
                    this.session.getPublication(),
                    new Buffer(0, true),
                    SignalType.Complete,
                    true);
        }
    }

    /**
     * @param fluxProcessor processor every session's {@link InnerSubscriber} is subscribed to
     * @param aeronInfra    used to add and close publications
     * @param context       supplies the error consumer, exception serializer and auto-cancel flag
     * @param runnable      task run by {@link #handleCancel(String)} when the last session is removed
     */
    public UnicastServiceMessageHandler(FluxProcessor<Buffer, Buffer> fluxProcessor, AeronInfra aeronInfra, Context context, Runnable runnable) {
        this.processor = fluxProcessor;
        this.aeronInfra = aeronInfra;
        this.context = context;
        this.onTerminalEventTask = runnable;
        this.heartbeatWatchdog = new HeartbeatWatchdog(context, this, this.sessionTracker);
    }

    /** Creates the sender each {@link InnerSubscriber} uses to publish signals. */
    protected SignalSender createSignalSender() {
        return new BasicSignalSender(this.aeronInfra, this.context.errorConsumer());
    }

    /**
     * Adds {@code j} to the demand of the session identified by {@code str}, creating the session
     * if needed. If the session already has a subscription, the accumulated demand is requested
     * right away; otherwise it stays on the session until {@link InnerSubscriber#onSubscribe}.
     */
    @Override // reactor.aeron.subscriber.ServiceMessageHandler
    public void handleMore(String str, long j) {
        UnicastSession session = getOrCreateSession(str);
        session.requestMore(j);
        if (session.subscription != null) {
            long pendingDemand = session.getAndResetDemad();
            if (pendingDemand > 0) {
                session.subscription.request(pendingDemand);
            }
        }
    }

    /** Records the current {@link System#nanoTime()} as the session's last heartbeat time. */
    @Override // reactor.aeron.subscriber.ServiceMessageHandler
    public void handleHeartbeat(String str) {
        getOrCreateSession(str).setLastHeartbeatTimeNs(System.nanoTime());
    }

    /**
     * Removes and cancels the session identified by {@code str}. When no sessions remain and
     * either the removed session was already terminal or the context has auto-cancel enabled,
     * the terminal-event task is run. An unknown session id is only logged.
     */
    @Override // reactor.aeron.subscriber.ServiceMessageHandler
    public void handleCancel(String str) {
        UnicastSession removed = this.sessionTracker.remove(str);
        if (removed == null) {
            logger.debug("Could not find a session to close with Id: {}", str);
            return;
        }
        // Capture the terminal flag before cancelling the session.
        boolean wasTerminal = removed.isTerminal();
        cancel(removed);
        if ((wasTerminal || this.context.autoCancel()) && this.sessionTracker.getSessionCounter() == 0) {
            this.onTerminalEventTask.run();
        }
    }

    /** Starts the heartbeat watchdog. */
    @Override // reactor.aeron.subscriber.ServiceMessageHandler
    public void start() {
        this.heartbeatWatchdog.start();
    }

    /**
     * Shuts down the heartbeat watchdog, publishes {@code Complete} to every session that is not
     * yet terminal, and then cancels all tracked sessions.
     */
    @Override // reactor.aeron.subscriber.ServiceMessageHandler
    public void shutdown() {
        this.heartbeatWatchdog.shutdown();
        sendCompleteIntoNonTerminalSessions();
        closeAllSessions();
    }

    /** Marks each non-terminal session terminal and publishes a {@code Complete} signal to it. */
    private void sendCompleteIntoNonTerminalSessions() {
        SignalSender signalSender = new BasicSignalSender(this.aeronInfra, this.context.errorConsumer());
        Buffer emptyBuffer = new Buffer(0, true);
        for (UnicastSession session : this.sessionTracker.getSessions()) {
            if (!session.isTerminal()) {
                session.setTerminal();
                signalSender.publishSignal(
                        session.getSessionId(),
                        session.getPublication(),
                        emptyBuffer,
                        SignalType.Complete,
                        true);
            }
        }
    }

    /** Cancels every session currently held by the session tracker. */
    private void closeAllSessions() {
        for (UnicastSession session : this.sessionTracker.getSessions()) {
            cancel(session);
        }
    }

    /**
     * Parses {@code str} with {@link AeronUtils#UNICAST_SESSION_ID_PATTERN} and creates a session
     * backed by a new publication on the channel (group 1) and stream id (group 2) it contains.
     *
     * @throws IllegalArgumentException if the id does not match the pattern, or if the channel is
     *         not accepted by {@link AeronUtils#isUnicastChannel}; when that check itself threw,
     *         the original exception is attached as the cause
     */
    private UnicastSession createSession(String str) {
        Matcher matcher = AeronUtils.UNICAST_SESSION_ID_PATTERN.matcher(str);
        if (!matcher.matches()) {
            throw new IllegalArgumentException("Malformed unicast sessionId: " + str);
        }
        String receiverChannel = matcher.group(1);
        boolean unicastChannel = false;
        Exception checkFailure = null;
        try {
            unicastChannel = AeronUtils.isUnicastChannel(receiverChannel);
        } catch (Exception e) {
            // A channel that cannot be checked is rejected below; keep the reason for diagnostics.
            checkFailure = e;
        }
        if (!unicastChannel) {
            throw new IllegalArgumentException("Invalid unicast receiver channel: " + receiverChannel, checkFailure);
        }
        int streamId = Integer.parseInt(matcher.group(2));
        return new UnicastSession(str, this.aeronInfra.addPublication(receiverChannel, streamId));
    }

    /**
     * Cancels the session's subscription (waiting briefly for it to be assigned) and closes the
     * session's publication. If no subscription shows up within the timeout, a
     * {@link RuntimeException} is handed to the context's error consumer and the publication is
     * still closed.
     */
    private void cancel(UnicastSession unicastSession) {
        if (waitTillSubscriptionIsAssigned(unicastSession)) {
            unicastSession.subscription.cancel();
        } else {
            // The message is expressed in milliseconds, so convert the nanosecond timeout.
            this.context.errorConsumer().accept(new RuntimeException(String.format(
                    "No subscription for inner subscriber for sessionId: %s was assigned during %d millis",
                    unicastSession.getSessionId(),
                    TimeUnit.NANOSECONDS.toMillis(SUBSCRIPTION_TIMEOUT_NS))));
        }
        this.aeronInfra.close(unicastSession.getPublication());
        logger.debug("Closed session with sessionId: {}", unicastSession.getSessionId());
    }

    /**
     * Waits until the session's {@code subscription} field is non-null, idling with a backoff
     * strategy between checks.
     *
     * @return {@code true} once a subscription is present; {@code false} if more than
     *         {@link #SUBSCRIPTION_TIMEOUT_NS} nanoseconds elapsed first
     */
    public boolean waitTillSubscriptionIsAssigned(UnicastSession unicastSession) {
        long startNs = System.nanoTime();
        if (unicastSession.subscription != null) {
            // Fast path: skip creating an idle strategy when nothing needs to be waited for.
            return true;
        }
        BackoffIdleStrategy idleStrategy = AeronUtils.newBackoffIdleStrategy();
        while (unicastSession.subscription == null) {
            idleStrategy.idle(0);
            if (System.nanoTime() - startNs > SUBSCRIPTION_TIMEOUT_NS) {
                return false;
            }
        }
        return true;
    }

    /**
     * Returns the tracked session for {@code str}; if there is none, creates it, registers it
     * with the session tracker and subscribes a new {@link InnerSubscriber} to the processor.
     *
     * @throws IllegalArgumentException if a new session is needed and {@code str} is not a valid
     *         unicast session id
     */
    public UnicastSession getOrCreateSession(String str) {
        UnicastSession session = this.sessionTracker.get(str);
        if (session == null) {
            session = createSession(str);
            this.sessionTracker.put(str, session);
            this.processor.subscribe(new InnerSubscriber(session));
            logger.debug("New session established with sessionId: {}", str);
        }
        return session;
    }

    /** Returns an iterator over the sessions currently held by the session tracker. */
    public Iterator<?> downstreams() {
        return this.sessionTracker.getSessions().iterator();
    }

    /** Returns the session tracker's current session counter. */
    public long downstreamCount() {
        return this.sessionTracker.getSessionCounter();
    }
}
