package reactor.aeron.subscriber;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import reactor.aeron.Context;
import reactor.aeron.utils.AeronInfra;
import reactor.aeron.utils.AeronUtils;
import reactor.aeron.utils.ServiceMessageType;
import reactor.core.Receiver;
import reactor.util.Logger;
import reactor.util.Loggers;
import uk.co.real_logic.aeron.FragmentAssembler;
import uk.co.real_logic.aeron.Subscription;
import uk.co.real_logic.aeron.logbuffer.FragmentHandler;
import uk.co.real_logic.aeron.logbuffer.Header;
import uk.co.real_logic.agrona.DirectBuffer;
import uk.co.real_logic.agrona.concurrent.BackoffIdleStrategy;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:reactor/aeron/subscriber/ServiceMessagePoller.class */
public class ServiceMessagePoller implements Runnable, Receiver {
    private static final Logger logger = Loggers.getLogger(ServiceMessagePoller.class);
    private final Subscription serviceRequestSub;
    private final Context context;
    private final ServiceMessageHandler serviceMessageHandler;
    private final ExecutorService executor;
    private final AeronInfra aeronInfra;
    private volatile boolean running;
    private volatile boolean terminated = false;

    /* loaded from: input_file:reactor/aeron/subscriber/ServiceMessagePoller$PollerFragmentHandler.class */
    private class PollerFragmentHandler implements FragmentHandler {
        private byte[] dst;

        private PollerFragmentHandler() {
            this.dst = new byte[255];
        }

        public void onFragment(DirectBuffer directBuffer, int i, int i2, Header header) {
            if (ServiceMessagePoller.this.running) {
                byte b = directBuffer.getByte(i);
                if (b == ServiceMessageType.Request.getCode()) {
                    handleRequest(directBuffer, i);
                    return;
                }
                if (b == ServiceMessageType.Cancel.getCode()) {
                    handleCancel(directBuffer, i);
                } else if (b == ServiceMessageType.Heartbeat.getCode()) {
                    handleHeartbeat(directBuffer, i, header);
                } else {
                    ServiceMessagePoller.logger.error("Unknown type code: {} received", new Object[]{Byte.valueOf(b)});
                }
            }
        }

        private void handleHeartbeat(DirectBuffer directBuffer, int i, Header header) {
            ServiceMessagePoller.this.serviceMessageHandler.handleHeartbeat(readSessionId(directBuffer, i + 1));
        }

        private void handleCancel(DirectBuffer directBuffer, int i) {
            String readSessionId = readSessionId(directBuffer, i + 1);
            if (ServiceMessagePoller.logger.isTraceEnabled()) {
                ServiceMessagePoller.logger.trace("Cancel request received for sessionId: {}", new Object[]{readSessionId});
            }
            ServiceMessagePoller.this.serviceMessageHandler.handleCancel(readSessionId);
        }

        private void handleRequest(DirectBuffer directBuffer, int i) {
            long j = directBuffer.getLong(i + 1);
            String readSessionId = readSessionId(directBuffer, i + 1 + 8);
            if (ServiceMessagePoller.logger.isTraceEnabled()) {
                ServiceMessagePoller.logger.trace("Requested {} signals for sessionId: {}", new Object[]{Long.valueOf(j), readSessionId});
            }
            ServiceMessagePoller.this.serviceMessageHandler.handleMore(readSessionId, j);
        }

        private String readSessionId(DirectBuffer directBuffer, int i) {
            int i2 = directBuffer.getByte(i) & 255;
            directBuffer.getBytes(i + 1, this.dst, 0, i2);
            return new String(this.dst, 0, i2, AeronUtils.UTF_8_CHARSET);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public ServiceMessagePoller(Context context, AeronInfra aeronInfra, ServiceMessageHandler serviceMessageHandler) {
        this.context = context;
        this.serviceMessageHandler = serviceMessageHandler;
        this.aeronInfra = aeronInfra;
        this.serviceRequestSub = aeronInfra.addSubscription(context.senderChannel(), context.serviceRequestStreamId());
        this.executor = Executors.newCachedThreadPool(runnable -> {
            return new Thread(runnable, AeronUtils.makeThreadName(context.name(), "subscriber", "service-poller"));
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void start() {
        this.executor.execute(this);
    }

    @Override // java.lang.Runnable
    public void run() {
        this.running = true;
        logger.debug("Service message poller started");
        BackoffIdleStrategy newBackoffIdleStrategy = AeronUtils.newBackoffIdleStrategy();
        FragmentAssembler fragmentAssembler = new FragmentAssembler(new PollerFragmentHandler());
        int serviceMessagePollerFragmentLimit = this.context.serviceMessagePollerFragmentLimit();
        while (this.running) {
            int i = 0;
            try {
                i = this.serviceRequestSub.poll(fragmentAssembler, serviceMessagePollerFragmentLimit);
            } catch (Exception e) {
                this.context.errorConsumer().accept(e);
            }
            newBackoffIdleStrategy.idle(i);
        }
        this.aeronInfra.close(this.serviceRequestSub);
        logger.debug("Service message poller shutdown");
        this.terminated = true;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void shutdown() {
        this.running = false;
        this.executor.shutdown();
    }

    public Object upstream() {
        return this.serviceRequestSub.channel() + "/" + this.serviceRequestSub.streamId();
    }

    public boolean isTerminated() {
        return this.terminated;
    }
}
