/*
 * Decompiled with CFR 0.152.
 */
package io.aiven.kafka.tieredstorage.fetch.manifest;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import com.github.benmanes.caffeine.cache.RemovalListener;
import com.github.benmanes.caffeine.cache.Scheduler;
import com.github.benmanes.caffeine.cache.Weigher;
import io.aiven.kafka.tieredstorage.config.CacheConfig;
import io.aiven.kafka.tieredstorage.fetch.manifest.SegmentManifestCache;
import io.aiven.kafka.tieredstorage.manifest.SegmentManifest;
import io.aiven.kafka.tieredstorage.metrics.CaffeineStatsCounter;
import io.aiven.kafka.tieredstorage.metrics.ThreadPoolMonitor;
import io.aiven.kafka.tieredstorage.storage.ObjectFetcher;
import io.aiven.kafka.tieredstorage.storage.ObjectKey;
import io.aiven.kafka.tieredstorage.storage.StorageBackendException;
import java.io.IOException;
import java.io.InputStream;
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.kafka.common.config.ConfigDef;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * In-memory {@link SegmentManifestCache} backed by a Caffeine {@link AsyncLoadingCache}.
 *
 * <p>On a cache miss the manifest is fetched through the supplied {@link ObjectFetcher} and
 * deserialized with the supplied {@link ObjectMapper}. The cache itself is only created in
 * {@link #configure(Map)}, so that method must be called before {@link #get(ObjectKey)}.
 */
public class MemorySegmentManifestCache implements SegmentManifestCache {
    private static final Logger log = LoggerFactory.getLogger(MemorySegmentManifestCache.class);

    public static final String METRIC_GROUP = "segment-manifest-cache-metrics";
    public static final String THREAD_POOL_METRIC_GROUP = "segment-manifest-cache-thread-pool-metrics";

    private static final long DEFAULT_MAX_SIZE = 1000L;
    private static final long DEFAULT_RETENTION_MS = 3600000L;

    /** Created by {@link #configure(Map)}; {@code null} until then. */
    private AsyncLoadingCache<ObjectKey, SegmentManifest> cache;

    final CaffeineStatsCounter statsCounter = new CaffeineStatsCounter(METRIC_GROUP);
    final ObjectFetcher fileFetcher;
    final ObjectMapper mapper;

    /** Upper bound on how long {@link #get(ObjectKey)} waits for a load; set in {@link #buildCache(CacheConfig)}. */
    Duration getTimeout;

    /**
     * @param fileFetcher used to open the manifest object when it is not cached
     * @param mapper      used to deserialize the fetched stream into a {@link SegmentManifest}
     */
    public MemorySegmentManifestCache(ObjectFetcher fileFetcher, ObjectMapper mapper) {
        this.fileFetcher = fileFetcher;
        this.mapper = mapper;
    }

    /**
     * Returns the manifest stored under {@code manifestKey}, loading it through the cache if needed
     * and waiting at most {@link #getTimeout} for the load to finish.
     *
     * @param manifestKey key of the manifest object
     * @return the cached or freshly loaded manifest
     * @throws StorageBackendException if the load failed with this exception
     * @throws IOException             if the load failed with this exception
     * @throws RuntimeException        wrapping any other load failure, a timeout, or an interruption
     *                                 (the interrupt flag of the calling thread is restored first)
     */
    @Override
    public SegmentManifest get(ObjectKey manifestKey) throws StorageBackendException, IOException {
        try {
            return this.cache.get(manifestKey).get(this.getTimeout.toMillis(), TimeUnit.MILLISECONDS);
        } catch (ExecutionException e) {
            // Re-throw the checked exceptions declared by this method as themselves;
            // everything else (including a missing cause) is wrapped.
            Throwable cause = e.getCause();
            if (cause instanceof StorageBackendException) {
                throw (StorageBackendException) cause;
            }
            if (cause instanceof IOException) {
                throw (IOException) cause;
            }
            throw new RuntimeException(e);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can still observe it.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (TimeoutException e) {
            throw new RuntimeException(e);
        }
    }

    /** Listener that logs, at debug level, every entry handed to it together with the removal cause. */
    RemovalListener<ObjectKey, SegmentManifest> removalListener() {
        return (key, content, cause) ->
            log.debug("Deleted cached value for key {} from cache. The reason of the deletion is {}", key, cause);
    }

    /** Every entry weighs 1, so the configured maximum weight acts as a maximum entry count. */
    private static Weigher<ObjectKey, SegmentManifest> weigher() {
        return (key, value) -> 1;
    }

    /**
     * Builds the Caffeine cache from {@code config} and records {@code config.getTimeout()} in
     * {@link #getTimeout}.
     *
     * <p>Loads run on a dedicated {@link ForkJoinPool}, which is also handed to a
     * {@link ThreadPoolMonitor} under {@link #THREAD_POOL_METRIC_GROUP}.
     *
     * @param config cache settings (thread pool size, size limit, retention, get timeout)
     * @return the newly built cache
     */
    protected AsyncLoadingCache<ObjectKey, SegmentManifest> buildCache(CacheConfig config) {
        // orElseGet: only create the default pool when no explicit size is configured.
        ForkJoinPool executor = config.threadPoolSize().map(ForkJoinPool::new).orElseGet(ForkJoinPool::new);
        new ThreadPoolMonitor(THREAD_POOL_METRIC_GROUP, executor);
        this.getTimeout = config.getTimeout();

        Caffeine<Object, Object> cacheBuilder = Caffeine.newBuilder();
        // Size limit and retention are applied only when present in the config.
        config.cacheSize().ifPresent(maximumWeight ->
            cacheBuilder.maximumWeight(maximumWeight).weigher(MemorySegmentManifestCache.weigher()));
        config.cacheRetention().ifPresent(cacheBuilder::expireAfterAccess);

        AsyncLoadingCache<ObjectKey, SegmentManifest> cache = cacheBuilder
            .evictionListener(this.removalListener())
            .scheduler(Scheduler.systemScheduler())
            .executor(executor)
            .recordStats(() -> this.statsCounter)
            .buildAsync(key -> {
                // The fetched stream is closed as soon as the manifest has been parsed.
                try (InputStream is = this.fileFetcher.fetch(key)) {
                    return this.mapper.readValue(is, SegmentManifest.class);
                }
            });
        this.statsCounter.registerSizeMetric(cache.synchronous()::estimatedSize);
        return cache;
    }

    /**
     * Parses {@code configs} against {@link #configDef()} and (re)creates the cache from the result.
     *
     * @param configs raw configuration entries
     */
    @Override
    public void configure(Map<String, ?> configs) {
        CacheConfig config = new CacheConfig(MemorySegmentManifestCache.configDef(), configs);
        this.cache = this.buildCache(config);
    }

    /**
     * Configuration definition for this cache, with a default size of {@code 1000} entries and a
     * default retention of {@code 3600000} ms.
     */
    public static ConfigDef configDef() {
        return CacheConfig.defBuilder()
            .withDefaultSize(DEFAULT_MAX_SIZE)
            .withDefaultRetentionMs(DEFAULT_RETENTION_MS)
            .withSizeDoc("The maximum number of entries in the cache, where `-1` represents an unbounded cache.")
            .build();
    }
}

