/*
 * Decompiled with CFR 0.152.
 */
package io.aiven.kafka.tieredstorage.fetch.cache;

import com.github.benmanes.caffeine.cache.AsyncCache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.RemovalListener;
import com.github.benmanes.caffeine.cache.Scheduler;
import com.github.benmanes.caffeine.cache.Weigher;
import io.aiven.kafka.tieredstorage.Chunk;
import io.aiven.kafka.tieredstorage.config.ChunkCacheConfig;
import io.aiven.kafka.tieredstorage.fetch.ChunkKey;
import io.aiven.kafka.tieredstorage.fetch.ChunkManager;
import io.aiven.kafka.tieredstorage.manifest.SegmentManifest;
import io.aiven.kafka.tieredstorage.metrics.CaffeineStatsCounter;
import io.aiven.kafka.tieredstorage.metrics.ThreadPoolMonitor;
import io.aiven.kafka.tieredstorage.storage.BytesRange;
import io.aiven.kafka.tieredstorage.storage.ObjectKey;
import io.aiven.kafka.tieredstorage.storage.StorageBackendException;
import java.io.IOException;
import java.io.InputStream;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.common.Configurable;

/**
 * A {@link ChunkManager} decorator that keeps fetched chunks in a Caffeine {@link AsyncCache}
 * keyed by {@link ChunkKey}.
 *
 * <p>Subclasses decide how a chunk is stored ({@link #cacheChunk}), how a stored chunk is turned
 * back into a stream ({@link #cachedChunkToInputStream}), how entries are weighed and what
 * happens when an entry is evicted.
 *
 * <p>{@link #buildCache(ChunkCacheConfig)} must be called before {@link #getChunk} is used: it
 * initializes the executor, the get-timeout and the prefetching size that {@code getChunk}
 * reads. It returns the cache but does not assign {@link #cache} itself.
 * NOTE(review): presumably subclasses assign the returned cache from their {@code configure}
 * implementation; confirm against the subclasses.
 *
 * @param <T> the representation of a cached chunk
 */
public abstract class ChunkCache<T> implements ChunkManager, Configurable {
    public static final String METRIC_GROUP = "chunk-cache-metrics";
    public static final String THREAD_POOL_METRIC_GROUP = "chunk-cache-thread-pool-metrics";

    /** The delegate that actually fetches chunks on a cache miss. */
    private final ChunkManager chunkManager;
    /** Runs chunk loading; created in {@link #buildCache(ChunkCacheConfig)}. */
    private ExecutorService executor;
    final CaffeineStatsCounter statsCounter;
    protected AsyncCache<ChunkKey, T> cache;
    /** Number of bytes after the requested chunk to prefetch; values {@code <= 0} disable prefetching. */
    private int prefetchingSize;
    /** Upper bound on how long {@link #getChunk} waits for a chunk to become available. */
    private Duration getTimeout;

    /**
     * Creates a cache in front of the given chunk manager.
     *
     * @param chunkManager the delegate used to fetch chunks that are not cached
     */
    protected ChunkCache(ChunkManager chunkManager) {
        this.chunkManager = chunkManager;
        this.statsCounter = new CaffeineStatsCounter(METRIC_GROUP);
    }

    /**
     * Returns the requested chunk, loading it through the delegate and caching it on a miss.
     *
     * <p>Before looking up the requested chunk, prefetching of the chunks that follow it is
     * started (if enabled). The call then blocks until the chunk is available or the configured
     * timeout elapses.
     *
     * @param objectKey the key of the object the chunk belongs to
     * @param manifest  the manifest of the segment; used to locate the chunk and its successors
     * @param chunkId   the index of the chunk within the manifest's chunk index
     * @return a stream over the chunk content
     * @throws StorageBackendException if loading the chunk failed with this exception
     * @throws IOException             if loading or caching the chunk failed with this exception
     * @throws RuntimeException        if loading failed for another reason, the wait timed out,
     *                                 or the calling thread was interrupted (the interrupt
     *                                 status is restored in that case)
     */
    @Override
    public InputStream getChunk(ObjectKey objectKey, SegmentManifest manifest, int chunkId) throws StorageBackendException, IOException {
        final Chunk currentChunk = manifest.chunkIndex().chunks().get(chunkId);
        startPrefetching(objectKey, manifest, currentChunk.originalPosition + currentChunk.originalSize);

        final ChunkKey chunkKey = new ChunkKey(objectKey.value(), chunkId);
        // Carries the stream produced inside the loading task out to the dependent stage below.
        final AtomicReference<InputStream> result = new AtomicReference<>();
        try {
            return cache.asMap()
                .compute(chunkKey, (key, val) -> CompletableFuture.supplyAsync(() -> {
                    if (val == null) {
                        statsCounter.recordMiss();
                        try {
                            final InputStream chunk = chunkManager.getChunk(objectKey, manifest, chunkId);
                            final T cached = cacheChunk(chunkKey, chunk);
                            result.set(cachedChunkToInputStream(cached));
                            return cached;
                        } catch (final StorageBackendException | IOException e) {
                            throw new CompletionException(e);
                        }
                    }
                    statsCounter.recordHit();
                    try {
                        // The entry may still be loading (e.g. started by prefetching); wait for it.
                        final T cached = val.get();
                        result.set(cachedChunkToInputStream(cached));
                        return cached;
                    } catch (final InterruptedException e) {
                        Thread.currentThread().interrupt();
                        throw new CompletionException(e);
                    } catch (final ExecutionException e) {
                        // Propagate the underlying failure instead of the ExecutionException wrapper,
                        // so that the unwrapping below still recognizes StorageBackendException
                        // and IOException thrown by the previously cached future.
                        final Throwable cause = e.getCause();
                        throw new CompletionException(cause != null ? cause : e);
                    }
                }, executor))
                .thenApplyAsync(t -> result.get())
                .get(getTimeout.toMillis(), TimeUnit.MILLISECONDS);
        } catch (final ExecutionException e) {
            // Unwrap the declared exception types; anything else (including a missing cause)
            // is reported as an unchecked exception.
            final Throwable cause = e.getCause();
            if (cause instanceof StorageBackendException) {
                throw (StorageBackendException) cause;
            }
            if (cause instanceof IOException) {
                throw (IOException) cause;
            }
            throw new RuntimeException(e);
        } catch (final InterruptedException e) {
            // Restore the interrupt status so that callers further up can observe it.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (final TimeoutException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Converts a cached chunk into a stream over its content.
     *
     * @param var1 the cached chunk
     * @return a stream over the chunk content
     */
    public abstract InputStream cachedChunkToInputStream(T var1);

    /**
     * Stores the content of a freshly fetched chunk and returns its cached representation.
     *
     * @param var1 the key of the chunk
     * @param var2 the stream with the chunk content, as returned by the delegate chunk manager
     * @return the cached representation of the chunk
     * @throws IOException if the chunk could not be cached
     */
    public abstract T cacheChunk(ChunkKey var1, InputStream var2) throws IOException;

    /**
     * Returns the listener that {@link #buildCache(ChunkCacheConfig)} registers as the cache's
     * eviction listener.
     */
    public abstract RemovalListener<ChunkKey, T> removalListener();

    /**
     * Returns the weigher used for cache entries; only applied when a cache size is configured.
     */
    public abstract Weigher<ChunkKey, T> weigher();

    /**
     * Builds the chunk cache from the configuration and initializes the executor, the
     * get-timeout and the prefetching size of this instance.
     *
     * <p>The returned cache is not assigned to {@link #cache} by this method.
     *
     * @param config the cache configuration
     * @return the newly built cache
     */
    protected AsyncCache<ChunkKey, T> buildCache(ChunkCacheConfig config) {
        // orElseGet: only create the default-sized pool when no explicit size is configured.
        this.executor = config.threadPoolSize().map(ForkJoinPool::new).orElseGet(ForkJoinPool::new);
        // The instance is not retained; it is constructed for its side effects only
        // (presumably registering thread pool metrics; confirm in ThreadPoolMonitor).
        new ThreadPoolMonitor(THREAD_POOL_METRIC_GROUP, this.executor);
        this.getTimeout = config.getTimeout();
        this.prefetchingSize = config.cachePrefetchingSize();

        final Caffeine<Object, Object> cacheBuilder = Caffeine.newBuilder();
        config.cacheSize().ifPresent(maximumWeight -> cacheBuilder.maximumWeight((long) maximumWeight).weigher(weigher()));
        config.cacheRetention().ifPresent(cacheBuilder::expireAfterAccess);
        final AsyncCache<ChunkKey, T> builtCache = cacheBuilder.evictionListener(removalListener())
            .scheduler(Scheduler.systemScheduler())
            .executor(this.executor)
            .recordStats(() -> this.statsCounter)
            .buildAsync();
        this.statsCounter.registerSizeMetric(builtCache.synchronous()::estimatedSize);
        return builtCache;
    }

    /**
     * Starts asynchronous loading of the chunks that cover the prefetching range beginning at
     * {@code startPosition}, skipping chunks that already have a cache entry.
     *
     * <p>Does nothing when the prefetching size is not positive.
     *
     * @param segmentKey      the key of the object the chunks belong to
     * @param segmentManifest the manifest used to resolve the chunks of the range
     * @param startPosition   the first position of the range to prefetch
     */
    private void startPrefetching(ObjectKey segmentKey, SegmentManifest segmentManifest, int startPosition) {
        if (this.prefetchingSize <= 0) {
            return;
        }
        // Clamp the range end to Integer.MAX_VALUE so that position + size cannot overflow.
        final BytesRange prefetchingRange = Integer.MAX_VALUE - startPosition < this.prefetchingSize
            ? BytesRange.of(startPosition, Integer.MAX_VALUE)
            : BytesRange.ofFromPositionAndSize(startPosition, this.prefetchingSize);
        final List<Chunk> chunks = segmentManifest.chunkIndex().chunksForRange(prefetchingRange);
        for (final Chunk chunk : chunks) {
            final ChunkKey chunkKey = new ChunkKey(segmentKey.value(), chunk.id);
            this.cache.asMap().computeIfAbsent(chunkKey, key -> CompletableFuture.supplyAsync(() -> {
                try {
                    final InputStream chunkStream = this.chunkManager.getChunk(segmentKey, segmentManifest, chunk.id);
                    return cacheChunk(chunkKey, chunkStream);
                } catch (final StorageBackendException | IOException e) {
                    throw new CompletionException(e);
                }
            }, this.executor));
        }
    }
}

