package reactor.aeron.publisher;

import java.util.Arrays;
import java.util.Iterator;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.aeron.Context;
import reactor.aeron.utils.AeronInfra;
import reactor.aeron.utils.AeronUtils;
import reactor.aeron.utils.DemandTracker;
import reactor.aeron.utils.Serializer;
import reactor.aeron.utils.ServiceMessagePublicationFailedException;
import reactor.aeron.utils.ServiceMessageType;
import reactor.aeron.utils.SignalType;
import reactor.core.Exceptions;
import reactor.core.MultiProducer;
import reactor.core.Producer;
import reactor.core.Trackable;
import reactor.core.publisher.Operators;
import reactor.ipc.buffer.Buffer;
import reactor.util.Logger;
import reactor.util.Loggers;
import uk.co.real_logic.aeron.ControlledFragmentAssembler;
import uk.co.real_logic.aeron.logbuffer.ControlledFragmentHandler;
import uk.co.real_logic.aeron.logbuffer.Header;
import uk.co.real_logic.agrona.DirectBuffer;
import uk.co.real_logic.agrona.concurrent.BackoffIdleStrategy;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:reactor/aeron/publisher/SignalPoller.class */
public class SignalPoller implements Subscription, Runnable, Producer, Trackable, MultiProducer {
    private static final Logger logger = Loggers.getLogger(SignalPoller.class);
    private final AeronInfra aeronInfra;
    private final Runnable shutdownTask;
    private final Context context;
    private volatile boolean running;
    private final ServiceMessageSender serviceMessageSender;
    private final Serializer<Throwable> exceptionSerializer;
    private uk.co.real_logic.aeron.Subscription signalSub;
    protected final Subscriber<? super Buffer> subscriber;
    private boolean isLastSignalAborted = false;
    private final ControlledFragmentHandler fragmentAssembler = new ControlledFragmentAssembler(new ControlledFragmentHandler() { // from class: reactor.aeron.publisher.SignalPoller.1
        public ControlledFragmentHandler.Action onFragment(DirectBuffer directBuffer, int i, int i2, Header header) {
            byte[] bArr = new byte[i2 - 1];
            directBuffer.getBytes(i + 1, bArr);
            byte b = directBuffer.getByte(i);
            Throwable th = null;
            try {
                if (b == SignalType.Next.getCode()) {
                    if (SignalPoller.this.demand <= 0) {
                        SignalPoller.this.isLastSignalAborted = true;
                        return ControlledFragmentHandler.Action.ABORT;
                    }
                    SignalPoller.this.demand--;
                    SignalPoller.this.isLastSignalAborted = false;
                    SignalPoller.this.subscriber.onNext(Buffer.wrap(bArr));
                } else if (b == SignalType.Complete.getCode()) {
                    SignalPoller.this.running = false;
                    SignalPoller.this.subscriber.onComplete();
                } else {
                    th = b == SignalType.Error.getCode() ? (Throwable) SignalPoller.this.exceptionSerializer.deserialize(bArr) : Exceptions.propagate(new IllegalStateException(String.format("Received message with unknown signal type code of %d and length of %d", Byte.valueOf(b), Integer.valueOf(bArr.length))));
                }
            } catch (Throwable th2) {
                Exceptions.throwIfFatal(th2);
                th = th2;
            }
            if (th != null) {
                SignalPoller.this.running = false;
                SignalPoller.this.subscriber.onError(th);
            }
            return ControlledFragmentHandler.Action.COMMIT;
        }
    });
    long demand = 0;
    private final DemandTracker demandTracker = new DemandTracker();

    public SignalPoller(Context context, ServiceMessageSender serviceMessageSender, Subscriber<? super Buffer> subscriber, AeronInfra aeronInfra, Runnable runnable) {
        this.context = context;
        this.serviceMessageSender = serviceMessageSender;
        this.subscriber = subscriber;
        this.aeronInfra = aeronInfra;
        this.shutdownTask = runnable;
        this.exceptionSerializer = context.exceptionSerializer();
    }

    @Override // java.lang.Runnable
    public void run() {
        this.running = true;
        logger.debug("Signal poller started, sessionId: {}", new Object[]{this.serviceMessageSender.getSessionId()});
        this.signalSub = this.aeronInfra.addSubscription(this.context.receiverChannel(), this.context.streamId());
        setSubscriberSubscription();
        BackoffIdleStrategy newBackoffIdleStrategy = AeronUtils.newBackoffIdleStrategy();
        while (this.running) {
            try {
                if (this.demand == 0) {
                    this.demand = this.demandTracker.getAndReset();
                }
                int min = (int) Math.min(this.demand, this.context.signalPollerFragmentLimit());
                if (min == 0 && !this.isLastSignalAborted) {
                    min = 1;
                }
                newBackoffIdleStrategy.idle(this.signalSub.controlledPoll(this.fragmentAssembler, min));
            } finally {
                this.aeronInfra.close(this.signalSub);
                logger.trace("about to execute shutdownTask");
                this.shutdownTask.run();
            }
        }
        logger.debug("Signal poller shutdown, sessionId: {}", new Object[]{this.serviceMessageSender.getSessionId()});
    }

    private void setSubscriberSubscription() {
        try {
            if (this.running) {
                this.subscriber.onSubscribe(this);
            }
        } catch (Throwable th) {
            Exceptions.throwIfFatal(th);
            this.subscriber.onError(th);
        }
    }

    public void shutdown() {
        this.running = false;
    }

    public Iterator<?> downstreams() {
        return Arrays.asList(this.signalSub != null ? this.signalSub.channel() + "/" + this.signalSub.streamId() : this.context.receiverChannel() + "/" + this.context.streamId(), this.serviceMessageSender).iterator();
    }

    public long downstreamCount() {
        return this.running ? 2L : 0L;
    }

    public boolean isCancelled() {
        return !this.running;
    }

    public boolean isStarted() {
        return this.running;
    }

    public boolean isTerminated() {
        return !this.running;
    }

    public Object downstream() {
        return this.subscriber;
    }

    public long requestedFromDownstream() {
        return this.demandTracker.current();
    }

    public void request(long j) {
        if (this.running && Operators.checkRequest(j, this.subscriber)) {
            try {
                this.serviceMessageSender.sendRequest(j);
                this.demandTracker.request(j);
            } catch (Exception e) {
                this.subscriber.onError(new ServiceMessagePublicationFailedException(ServiceMessageType.Request, e));
            }
        }
    }

    public void cancel() {
        this.running = false;
    }
}
