/*
 * Decompiled with CFR 0.152.
 */
package io.aiven.kafka.tieredstorage.fetch.index;

import com.github.benmanes.caffeine.cache.AsyncCache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.RemovalListener;
import com.github.benmanes.caffeine.cache.Scheduler;
import com.github.benmanes.caffeine.cache.Weigher;
import io.aiven.kafka.tieredstorage.config.CacheConfig;
import io.aiven.kafka.tieredstorage.fetch.index.SegmentIndexKey;
import io.aiven.kafka.tieredstorage.fetch.index.SegmentIndexesCache;
import io.aiven.kafka.tieredstorage.metrics.CaffeineStatsCounter;
import io.aiven.kafka.tieredstorage.metrics.ThreadPoolMonitor;
import io.aiven.kafka.tieredstorage.storage.ObjectKey;
import io.aiven.kafka.tieredstorage.storage.StorageBackendException;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MemorySegmentIndexesCache
implements SegmentIndexesCache {
    private static final Logger log = LoggerFactory.getLogger(MemorySegmentIndexesCache.class);
    public static final String METRIC_GROUP = "segment-indexes-cache-metrics";
    public static final String THREAD_POOL_METRIC_GROUP = "segment-indexes-cache-thread-pool-metrics";
    private static final long DEFAULT_MAX_SIZE_BYTES = 0xA00000L;
    private final CaffeineStatsCounter statsCounter = new CaffeineStatsCounter("segment-indexes-cache-metrics");
    private ExecutorService executor;
    protected AsyncCache<SegmentIndexKey, byte[]> cache;
    private Duration getTimeout;

    RemovalListener<SegmentIndexKey, byte[]> removalListener() {
        return (key, content, cause) -> log.debug("Deleted cached value for key {} from cache. The reason of the deletion is {}", key, (Object)cause);
    }

    private static Weigher<SegmentIndexKey, byte[]> weigher() {
        return (key, value) -> ((byte[])value).length;
    }

    protected AsyncCache<SegmentIndexKey, byte[]> buildCache(CacheConfig config) {
        this.executor = config.threadPoolSize().map(ForkJoinPool::new).orElse(new ForkJoinPool());
        new ThreadPoolMonitor(THREAD_POOL_METRIC_GROUP, this.executor);
        this.getTimeout = config.getTimeout();
        Caffeine<Object, Object> cacheBuilder = Caffeine.newBuilder();
        config.cacheSize().ifPresent(maximumWeight -> cacheBuilder.maximumWeight((long)maximumWeight).weigher(MemorySegmentIndexesCache.weigher()));
        config.cacheRetention().ifPresent(cacheBuilder::expireAfterAccess);
        AsyncCache<SegmentIndexKey, byte[]> cache = cacheBuilder.evictionListener(this.removalListener()).scheduler(Scheduler.systemScheduler()).executor(this.executor).recordStats(() -> this.statsCounter).buildAsync();
        this.statsCounter.registerSizeMetric(cache.synchronous()::estimatedSize);
        return cache;
    }

    @Override
    public InputStream get(ObjectKey objectKey, RemoteStorageManager.IndexType indexType, Supplier<byte[]> indexSupplier) throws StorageBackendException, IOException {
        try {
            return (InputStream)((CompletableFuture)this.cache.asMap().compute(new SegmentIndexKey(objectKey, indexType), (key, val) -> {
                if (val == null) {
                    this.statsCounter.recordMiss();
                    return CompletableFuture.supplyAsync(indexSupplier, this.executor);
                }
                this.statsCounter.recordHit();
                return val;
            }).thenApplyAsync(ByteArrayInputStream::new, (Executor)this.executor)).get(this.getTimeout.toMillis(), TimeUnit.MILLISECONDS);
        }
        catch (ExecutionException e) {
            Throwable cause = e.getCause();
            if (cause instanceof RuntimeException) {
                cause = cause.getCause();
            }
            if (cause == null) {
                throw new RuntimeException(e);
            }
            if (cause instanceof StorageBackendException) {
                throw (StorageBackendException)cause;
            }
            if (cause instanceof IOException) {
                throw (IOException)cause;
            }
            throw new RuntimeException(e);
        }
        catch (InterruptedException | TimeoutException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void configure(Map<String, ?> configs) {
        CacheConfig config = new CacheConfig(MemorySegmentIndexesCache.configDef(), configs);
        this.cache = this.buildCache(config);
    }

    public static ConfigDef configDef() {
        return CacheConfig.defBuilder().withDefaultSize(0xA00000L).build();
    }
}

