package nstream.adapter.common.ingress;

import java.util.concurrent.atomic.AtomicLong;
import nstream.adapter.common.AdapterSettings;
import nstream.adapter.common.schedule.DeferrableException;
import swim.api.SwimLane;
import swim.api.lane.DemandLane;
import swim.concurrent.TimerRef;
import swim.structure.Record;
import swim.structure.Value;

/* loaded from: input_file:nstream/adapter/common/ingress/IngestorMetricsAgent.class */
/**
 * Ingestor agent that keeps running message/error counters and publishes them
 * on the {@code pulse} demand lane.
 *
 * <p>Two kinds of figures are tracked:
 * <ul>
 *   <li>lifetime totals ({@code messageCount}, {@code errorCount}) and the
 *       average {@code messageRate} since {@link #didStageReception()} ran;</li>
 *   <li>"current" rates, recomputed by a timer every
 *       {@link #MESSAGE_RATE_INTERVAL} milliseconds from the counts gathered
 *       since the previous timer tick.</li>
 * </ul>
 *
 * <p>All rates are expressed per {@link #MESSAGE_RATE_INTERVAL} milliseconds.
 *
 * @param <S> adapter settings type, passed through to {@link IngestorAgent}
 * @param <V> type of the values handed to {@link #ingestOrCancel(Object)}
 */
public abstract class IngestorMetricsAgent<S extends AdapterSettings, V> extends IngestorAgent<S, V> {
    /** Timer period in milliseconds; also the time unit that rates are scaled to. */
    private static final long MESSAGE_RATE_INTERVAL = 1000;

    /** Self-rescheduling timer that refreshes the current rates and cues {@link #pulse}. */
    private TimerRef messageRateTimer;

    /** Wall-clock millis recorded by {@link #didStageReception()}; 0 until then. */
    private volatile long startTime;

    /** Wall-clock millis at which the current rate bucket was opened. */
    private volatile long bucketStartTime;

    /** Message rate measured over the most recently closed bucket. */
    private volatile double currentMessageRate;

    /** Error rate measured over the most recently closed bucket. */
    private volatile double currentErrorRate;

    private final AtomicLong messageCount = new AtomicLong();
    private final AtomicLong errorCount = new AtomicLong();
    private final AtomicLong recentMessageCount = new AtomicLong();
    private final AtomicLong recentErrorCount = new AtomicLong();

    /**
     * Demand lane whose value is computed by {@link #pulse()} each time it is cued.
     * The explicit {@code <Value>} witness is required: without a target type on the
     * receiver expression the lane's type parameter would be inferred as {@code Object}.
     */
    @SwimLane("pulse")
    DemandLane<Value> pulse = this.<Value>demandLane().onCue(uplink -> pulse());

    protected IngestorMetricsAgent() {
    }

    /**
     * Builds the metrics snapshot served by the {@code pulse} lane.
     *
     * @return {@link Value#absent()} while no start time has been recorded or no time
     *     has elapsed since it; otherwise a record with the slots {@code startTimestamp},
     *     {@code messageCount}, {@code errorCount}, {@code messageRate},
     *     {@code currentMessageRate} and {@code currentErrorRate}
     */
    private Value pulse() {
        // Read each volatile/clock value once so the guard and the arithmetic agree.
        final long start = this.startTime;
        final long now = System.currentTimeMillis();
        if (start == 0 || now <= start) {
            // Not started yet, or zero/negative elapsed time (would divide by <= 0).
            return Value.absent();
        }
        final long totalMessages = this.messageCount.get();
        // messageRate intentionally stays an integral (long) quotient so the slot
        // keeps the numeric kind it has always been published with.
        final long averageRate = (totalMessages * MESSAGE_RATE_INTERVAL) / (now - start);
        return Record.create(6)
                .slot("startTimestamp", start)
                .slot("messageCount", totalMessages)
                .slot("errorCount", this.errorCount.get())
                .slot("messageRate", averageRate)
                .slot("currentMessageRate", this.currentMessageRate)
                .slot("currentErrorRate", this.currentErrorRate);
    }

    /**
     * Closes the current rate bucket: converts the counts gathered since
     * {@code bucketStartTime} into rates, zeroes the recent counters and opens a new
     * bucket starting now.
     */
    private void updateRecentBucket() {
        final long now = System.currentTimeMillis();
        final long elapsed = now - this.bucketStartTime;
        if (elapsed <= 0) {
            // Same-millisecond tick or the wall clock stepped backwards: a rate cannot
            // be computed (and integer division by zero would throw, which would also
            // prevent the timer from rescheduling). Re-anchor the bucket and let the
            // pending counts roll into the next one.
            this.bucketStartTime = now;
            return;
        }
        // Floating-point division: the fields are doubles, so do not truncate the rate.
        final double scale = (double) MESSAGE_RATE_INTERVAL / elapsed;
        this.currentMessageRate = this.recentMessageCount.getAndSet(0L) * scale;
        this.currentErrorRate = this.recentErrorCount.getAndSet(0L) * scale;
        this.bucketStartTime = now;
    }

    /** Counts one message in both the lifetime total and the current bucket. */
    private void incrementMessageCount() {
        this.messageCount.incrementAndGet();
        this.recentMessageCount.incrementAndGet();
    }

    /** Counts one error in both the lifetime total and the current bucket. */
    private void incrementErrorCount() {
        this.errorCount.incrementAndGet();
        this.recentErrorCount.incrementAndGet();
    }

    /**
     * Hook invoked when ingesting {@code v} failed with a non-deferrable exception.
     * The default implementation forwards the exception to the inherited
     * {@code didFail}; the value itself is not used.
     *
     * @param v the value whose ingestion failed
     * @param exc the exception thrown while ingesting
     */
    protected void didFailIngest(V v, Exception exc) {
        didFail(exc);
    }

    /**
     * Counts {@code v} as a message and passes it to the inherited {@code ingest}.
     *
     * <p>A {@link DeferrableException} is counted as an error and handed to the
     * inherited {@code handleDeferrableException}. Any other exception is counted as
     * an error, reported through {@link #didFailIngest(Object, Exception)} and
     * followed by a call to the inherited {@code cancel}.
     *
     * @param v the value to ingest
     */
    protected final void ingestOrCancel(V v) {
        try {
            incrementMessageCount();
            ingest(v);
        } catch (DeferrableException deferrable) {
            incrementErrorCount();
            handleDeferrableException(deferrable);
        } catch (Exception fatal) {
            // Fatal failures are errors too; count them so the published metrics
            // reflect the failure that led to cancellation.
            incrementErrorCount();
            didFailIngest(v, fatal);
            cancel();
        }
    }

    /**
     * Records the start time, opens the first rate bucket and starts the periodic
     * timer that refreshes the current rates and cues the {@code pulse} lane.
     */
    @Override // nstream.adapter.common.ingress.IngestorAgent
    protected void didStageReception() {
        final long now = System.currentTimeMillis();
        this.startTime = now;
        this.bucketStartTime = now;
        // If this callback runs more than once, stop the earlier timer so that two
        // self-rescheduling timers do not run side by side.
        final TimerRef previous = this.messageRateTimer;
        if (previous != null) {
            previous.cancel();
        }
        this.messageRateTimer = setTimer(MESSAGE_RATE_INTERVAL, () -> {
            updateRecentBucket();
            this.pulse.cue();
            this.messageRateTimer.reschedule(MESSAGE_RATE_INTERVAL);
        });
    }
}
