/*
 * Decompiled with CFR 0.152.
 */
package io.scalecube.metrics.prometheus;

import com.sun.net.httpserver.HttpHandler;
import io.aeron.Aeron;
import io.aeron.ExclusivePublication;
import io.aeron.Image;
import io.aeron.Publication;
import io.aeron.archive.Archive;
import io.aeron.cluster.ConsensusModule;
import io.aeron.cluster.client.AeronCluster;
import io.aeron.cluster.client.EgressListener;
import io.aeron.cluster.codecs.AdminRequestType;
import io.aeron.cluster.codecs.AdminResponseCode;
import io.aeron.cluster.codecs.CloseReason;
import io.aeron.cluster.codecs.EventCode;
import io.aeron.cluster.service.ClientSession;
import io.aeron.cluster.service.Cluster;
import io.aeron.cluster.service.ClusteredService;
import io.aeron.cluster.service.ClusteredServiceContainer;
import io.aeron.driver.MediaDriver;
import io.aeron.logbuffer.BufferClaim;
import io.aeron.logbuffer.Header;
import io.scalecube.metrics.CountersHandler;
import io.scalecube.metrics.CountersReaderAgent;
import io.scalecube.metrics.CountersRegistry;
import io.scalecube.metrics.HistogramMetric;
import io.scalecube.metrics.MetricsHandler;
import io.scalecube.metrics.MetricsReaderAgent;
import io.scalecube.metrics.MetricsRecorder;
import io.scalecube.metrics.MetricsTransmitter;
import io.scalecube.metrics.TpsMetric;
import io.scalecube.metrics.aeron.CncCountersReaderAgent;
import io.scalecube.metrics.prometheus.CountersPrometheusAdapter;
import io.scalecube.metrics.prometheus.MetricsPrometheusAdapter;
import io.scalecube.metrics.prometheus.PrometheusMetricsHandler;
import io.scalecube.metrics.prometheus.PrometheusMetricsServer;
import io.scalecube.metrics.prometheus.PrometheusWriter;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.Map;
import org.agrona.DirectBuffer;
import org.agrona.MutableDirectBuffer;
import org.agrona.concurrent.Agent;
import org.agrona.concurrent.AgentRunner;
import org.agrona.concurrent.BackoffIdleStrategy;
import org.agrona.concurrent.CompositeAgent;
import org.agrona.concurrent.EpochClock;
import org.agrona.concurrent.IdleStrategy;
import org.agrona.concurrent.SystemEpochClock;
import org.agrona.concurrent.status.AtomicCounter;
import org.agrona.concurrent.status.CountersManager;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.utility.MountableFile;

public class PrometheusAllInOneAeron {
    /**
     * Runs an all-in-one metrics demo.
     *
     * <p>The method performs the following steps, in order:
     * <ol>
     *   <li>starts Prometheus, Loki and Grafana containers on one shared network and prints
     *       their mapped ports / URL;</li>
     *   <li>launches the metrics recorder and transmitter and registers one TPS metric,
     *       three latency histograms and two counters;</li>
     *   <li>launches an embedded media driver, archive, single-member consensus module and
     *       clustered service container, then connects a cluster client to it;</li>
     *   <li>starts a composite agent that feeds counters and metrics into the Prometheus
     *       adapters, and serves those adapters over HTTP on port 8080;</li>
     *   <li>starts an agent that polls cluster egress;</li>
     *   <li>loops forever, sending one 8-byte ping (the current {@link System#nanoTime()})
     *       to the cluster and then sleeping for one millisecond.</li>
     * </ol>
     *
     * @param args ignored
     * @throws IOException propagated from the start-up calls (declared by the signature)
     * @throws InterruptedException if the thread is interrupted while sleeping between pings
     * @throws RuntimeException ("Good bye") when the ingress publication reports it is closed
     *     or has exceeded its maximum position, or when the thread's interrupt flag is set
     */
    public static void main(String[] args) throws IOException, InterruptedException {
        Network network = Network.newNetwork();

        GenericContainer<?> prometheus = new GenericContainer<>("prom/prometheus")
            .withNetwork(network)
            .withNetworkAliases("prometheus")
            .withExposedPorts(9090)
            .withCopyFileToContainer(
                MountableFile.forClasspathResource("prometheus.yml"),
                "/etc/prometheus/prometheus.yml")
            .withCommand("--config.file=/etc/prometheus/prometheus.yml", "--log.level=debug");
        prometheus.start();

        GenericContainer<?> loki = new GenericContainer<>("grafana/loki")
            .withNetwork(network)
            .withNetworkAliases("loki")
            .withExposedPorts(3100)
            .withCommand("-config.file=/etc/loki/local-config.yaml");
        loki.start();

        GenericContainer<?> grafana = new GenericContainer<>("grafana/grafana")
            .withNetwork(network)
            .withExposedPorts(3000)
            .withEnv("GF_SECURITY_ADMIN_USER", "user")
            .withEnv("GF_SECURITY_ADMIN_PASSWORD", "password")
            .withCopyFileToContainer(
                MountableFile.forClasspathResource("prometheus.datasource.yml"),
                "/etc/grafana/provisioning/datasources/datasource.yml")
            .withCopyFileToContainer(
                MountableFile.forClasspathResource("loki.datasource.yml"),
                "/etc/grafana/provisioning/datasources/loki.yml");
        grafana.start();

        String grafanaUrl = "http://" + grafana.getHost() + ":" + grafana.getMappedPort(3000);
        System.out.println("Prometheus: " + prometheus.getMappedPort(9090));
        System.out.println("Loki: " + loki.getMappedPort(3100));
        System.out.println("Grafana: " + grafanaUrl);

        MetricsRecorder metricsRecorder = MetricsRecorder.launch();
        MetricsTransmitter metricsTransmitter = MetricsTransmitter.launch();

        // Shared histogram settings; every latency histogram below uses the same three values.
        final long highestTrackableValue = 1000000000L;
        final double conversionFactor = 0.001;
        final long resolutionMs = 1000L;

        TpsMetric tps = metricsRecorder.newTps(
            keyFlyweight -> keyFlyweight.tagsCount(1).stringValue("name", "tps"));
        HistogramMetric pingLatency = metricsRecorder.newHistogram(
            keyFlyweight -> keyFlyweight.tagsCount(1).stringValue("name", "ping_latency"),
            highestTrackableValue,
            conversionFactor,
            resolutionMs);
        HistogramMetric pongLatency = metricsRecorder.newHistogram(
            keyFlyweight -> keyFlyweight.tagsCount(1).stringValue("name", "pong_latency"),
            highestTrackableValue,
            conversionFactor,
            resolutionMs);
        HistogramMetric rttLatency = metricsRecorder.newHistogram(
            keyFlyweight -> keyFlyweight.tagsCount(1).stringValue("name", "rtt_latency"),
            highestTrackableValue,
            conversionFactor,
            resolutionMs);

        CountersRegistry countersRegistry = CountersRegistry.create();
        CountersManager countersManager = countersRegistry.countersManager();
        AtomicCounter sessionCounter = countersManager.newCounter("session_count");
        AtomicCounter servicePosition = countersManager.newCounter("service_position");

        MediaDriver mediaDriver = MediaDriver.launch(
            new MediaDriver.Context().dirDeleteOnStart(true).dirDeleteOnShutdown(true));
        Aeron aeron = Aeron.connect();
        Archive archive = Archive.launch(
            new Archive.Context()
                .recordingEventsEnabled(false)
                .controlChannel("aeron:udp?endpoint=localhost:8010")
                .replicationChannel("aeron:udp?endpoint=localhost:0"));
        ConsensusModule consensusModule = ConsensusModule.launch(
            new ConsensusModule.Context()
                .ingressChannel("aeron:udp")
                .replicationChannel("aeron:udp?endpoint=localhost:0")
                .clusterMemberId(0)
                .clusterMembers(
                    "0,localhost:8005,localhost:8006,localhost:8007,localhost:8008,localhost:8010"));
        ClusteredServiceContainer serviceContainer = ClusteredServiceContainer.launch(
            new ClusteredServiceContainer.Context()
                .clusteredService(
                    new ClusteredServiceImpl(tps, pingLatency, sessionCounter, servicePosition)));
        final AeronCluster aeronCluster = AeronCluster.connect(
            new AeronCluster.Context()
                .ingressChannel("aeron:udp")
                .ingressEndpoints("0=localhost:8005")
                .isIngressExclusive(true)
                .egressChannel("aeron:udp?endpoint=localhost:0")
                .egressListener(new EgressListenerImpl(pongLatency, rttLatency)));

        System.out.println("Started mediaDriver: " + mediaDriver);
        System.out.println("Started aeron: " + aeron);
        System.out.println("Started archive: " + archive);
        System.out.println("Started consensusModule: " + consensusModule);
        System.out.println("Started serviceContainer: " + serviceContainer);
        System.out.println("Connected aeronCluster: " + aeronCluster);

        Map<String, String> labels = Map.of("app", "hft_app");
        CountersPrometheusAdapter countersAdapter = new CountersPrometheusAdapter(labels);
        CountersPrometheusAdapter cncCountersAdapter = new CountersPrometheusAdapter(labels);
        MetricsPrometheusAdapter metricsAdapter = new MetricsPrometheusAdapter(labels);

        // One agent thread reads all three sources and pushes them into the adapters.
        CompositeAgent compositeAgent = new CompositeAgent(
            new CountersReaderAgent(
                "CountersReaderAgent",
                countersRegistry.context().countersDir(),
                true,
                SystemEpochClock.INSTANCE,
                Duration.ofSeconds(1L),
                countersAdapter),
            new CncCountersReaderAgent(
                "CncCountersReaderAgent",
                mediaDriver.aeronDirectoryName(),
                true,
                SystemEpochClock.INSTANCE,
                Duration.ofSeconds(3L),
                Duration.ofSeconds(5L),
                cncCountersAdapter),
            new MetricsReaderAgent(
                "MetricsReaderAgent",
                metricsTransmitter.context().broadcastBuffer(),
                SystemEpochClock.INSTANCE,
                Duration.ofSeconds(3L),
                metricsAdapter));
        AgentRunner.startOnThread(
            new AgentRunner(
                new BackoffIdleStrategy(), Throwable::printStackTrace, null, compositeAgent));

        PrometheusMetricsServer.launch(
            new InetSocketAddress(8080),
            new PrometheusMetricsHandler(
                new PrometheusWriter[] {countersAdapter, cncCountersAdapter, metricsAdapter}));

        // Separate agent thread that drains cluster egress so EgressListenerImpl gets called.
        AgentRunner.startOnThread(
            new AgentRunner(
                new BackoffIdleStrategy(),
                Throwable::printStackTrace,
                null,
                new Agent() {
                    @Override
                    public int doWork() {
                        return aeronCluster.pollEgress();
                    }

                    @Override
                    public String roleName() {
                        return "";
                    }
                }));

        BufferClaim bufferClaim = new BufferClaim();
        while (true) {
            long claim = aeronCluster.tryClaim(Long.BYTES, bufferClaim);
            // Only terminal results (or an interrupt) stop the loop; other non-positive
            // results just skip this round and the claim is retried after the sleep.
            if (claim == Publication.CLOSED
                || claim == Publication.MAX_POSITION_EXCEEDED
                || Thread.currentThread().isInterrupted()) {
                throw new RuntimeException("Good bye");
            }
            if (claim > 0L) {
                MutableDirectBuffer buffer = bufferClaim.buffer();
                // The claimed region starts with the cluster session header; payload follows it.
                int payloadOffset = bufferClaim.offset() + AeronCluster.SESSION_HEADER_LENGTH;
                buffer.putLong(payloadOffset, System.nanoTime());
                bufferClaim.commit();
            }
            Thread.sleep(1L);
        }
    }

    private static class ClusteredServiceImpl
    implements ClusteredService {
        private static final int LENGTH = 16;
        private final TpsMetric tps;
        private final HistogramMetric pingLatency;
        private final AtomicCounter sessionCounter;
        private final AtomicCounter servicePosition;
        private final BufferClaim bufferClaim = new BufferClaim();

        private ClusteredServiceImpl(TpsMetric tps, HistogramMetric pingLatency, AtomicCounter sessionCounter, AtomicCounter servicePosition) {
            this.tps = tps;
            this.pingLatency = pingLatency;
            this.sessionCounter = sessionCounter;
            this.servicePosition = servicePosition;
        }

        public void onStart(Cluster cluster, Image snapshotImage) {
        }

        public void onSessionOpen(ClientSession session, long timestamp) {
            System.out.println("onSessionOpen: " + String.valueOf(session));
            this.sessionCounter.increment();
        }

        public void onSessionClose(ClientSession session, long timestamp, CloseReason closeReason) {
            System.out.println("onSessionClose: " + String.valueOf(session) + ", closeReason=" + String.valueOf(closeReason));
            this.sessionCounter.decrement();
        }

        public void onSessionMessage(ClientSession session, long timestamp, DirectBuffer buffer, int offset, int length, Header header) {
            this.tps.record();
            this.servicePosition.set(header.position());
            long ping = buffer.getLong(offset);
            long pong = System.nanoTime();
            long delta = pong - ping;
            this.pingLatency.record(delta);
            if (session.tryClaim(16, this.bufferClaim) > 0L) {
                MutableDirectBuffer buf = this.bufferClaim.buffer();
                int index = this.bufferClaim.offset() + 32;
                buf.putLong(index, ping);
                buf.putLong(index + 8, pong);
                this.bufferClaim.commit();
            }
        }

        public void onTimerEvent(long correlationId, long timestamp) {
        }

        public void onTakeSnapshot(ExclusivePublication snapshotPublication) {
        }

        public void onRoleChange(Cluster.Role newRole) {
        }

        public void onTerminate(Cluster cluster) {
        }
    }

    private static class EgressListenerImpl
    implements EgressListener {
        private final HistogramMetric pongLatency;
        private final HistogramMetric rttLatency;

        private EgressListenerImpl(HistogramMetric pongLatency, HistogramMetric rttLatency) {
            this.pongLatency = pongLatency;
            this.rttLatency = rttLatency;
        }

        public void onMessage(long clusterSessionId, long timestamp, DirectBuffer buffer, int offset, int length, Header header) {
            long now = System.nanoTime();
            long ping = buffer.getLong(offset);
            long pong = buffer.getLong(offset + 8);
            this.pongLatency.record(now - pong);
            this.rttLatency.record(now - ping);
        }

        public void onSessionEvent(long correlationId, long clusterSessionId, long leadershipTermId, int leaderMemberId, EventCode code, String detail) {
            System.out.println("onSessionEvent: clusterSessionId=" + clusterSessionId + ", leadershipTermId=" + leadershipTermId + ", leaderMemberId=" + leaderMemberId + ", code=" + String.valueOf(code) + ", detail=" + detail);
        }

        public void onNewLeader(long clusterSessionId, long leadershipTermId, int leaderMemberId, String ingressEndpoints) {
            System.out.println("onNewLeader: leadershipTermId=" + leadershipTermId + ", leaderMemberId=" + leaderMemberId);
        }

        public void onAdminResponse(long clusterSessionId, long correlationId, AdminRequestType requestType, AdminResponseCode responseCode, String message, DirectBuffer payload, int payloadOffset, int payloadLength) {
            System.out.println("onAdminResponse: requestType=" + String.valueOf(requestType) + ", responseCode=" + String.valueOf(responseCode));
        }
    }
}

