/*
 * Decompiled with CFR 0.152.
 */
package io.aiven.kafka.tieredstorage.transform;

import io.github.bucket4j.Bucket;
import io.github.bucket4j.local.SynchronizationStrategy;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.time.Duration;

public class RateLimitedInputStream
extends FilterInputStream {
    static final int MIN_RATE = Runtime.version().feature() >= 21 ? 16384 : 8192;
    final Bucket bucket;

    public RateLimitedInputStream(InputStream delegated, Bucket bucket) {
        super(delegated);
        this.bucket = bucket;
    }

    public static Bucket rateLimitBucket(int uploadRate) {
        int rate = Math.max(uploadRate, MIN_RATE);
        return Bucket.builder().withSynchronizationStrategy(SynchronizationStrategy.LOCK_FREE).addLimit(limit -> limit.capacity(rate).refillGreedy(rate, Duration.ofSeconds(1L))).build();
    }

    @Override
    public int read(byte[] b, int off, int len) throws IOException {
        int read;
        if (len > 0) {
            try {
                this.bucket.asBlocking().consume(len);
            }
            catch (InterruptedException e) {
                throw new RuntimeException("Rate limited consumption of input stream interrupted", e);
            }
        }
        if ((read = super.read(b, off, len)) > -1) {
            if (len > read) {
                this.bucket.forceAddTokens(len - read);
            }
        } else if (len > 0) {
            this.bucket.forceAddTokens(len);
        }
        return read;
    }
}

