/*
 * Decompiled with CFR 0.152.
 */
package io.aiven.kafka.tieredstorage;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
import io.aiven.kafka.tieredstorage.ClosableInputStreamHolder;
import io.aiven.kafka.tieredstorage.InvalidRecordBatchException;
import io.aiven.kafka.tieredstorage.ObjectKeyFactory;
import io.aiven.kafka.tieredstorage.SegmentCompressionChecker;
import io.aiven.kafka.tieredstorage.config.RemoteStorageManagerConfig;
import io.aiven.kafka.tieredstorage.fetch.ChunkManager;
import io.aiven.kafka.tieredstorage.fetch.ChunkManagerFactory;
import io.aiven.kafka.tieredstorage.fetch.FetchChunkEnumeration;
import io.aiven.kafka.tieredstorage.fetch.KeyNotFoundRuntimeException;
import io.aiven.kafka.tieredstorage.fetch.index.MemorySegmentIndexesCache;
import io.aiven.kafka.tieredstorage.fetch.index.SegmentIndexesCache;
import io.aiven.kafka.tieredstorage.fetch.manifest.MemorySegmentManifestCache;
import io.aiven.kafka.tieredstorage.fetch.manifest.SegmentManifestCache;
import io.aiven.kafka.tieredstorage.manifest.SegmentEncryptionMetadata;
import io.aiven.kafka.tieredstorage.manifest.SegmentEncryptionMetadataV1;
import io.aiven.kafka.tieredstorage.manifest.SegmentIndex;
import io.aiven.kafka.tieredstorage.manifest.SegmentIndexesV1;
import io.aiven.kafka.tieredstorage.manifest.SegmentIndexesV1Builder;
import io.aiven.kafka.tieredstorage.manifest.SegmentManifest;
import io.aiven.kafka.tieredstorage.manifest.SegmentManifestV1;
import io.aiven.kafka.tieredstorage.manifest.index.ChunkIndex;
import io.aiven.kafka.tieredstorage.manifest.serde.EncryptionSerdeModule;
import io.aiven.kafka.tieredstorage.manifest.serde.KafkaTypeSerdeModule;
import io.aiven.kafka.tieredstorage.metadata.SegmentCustomMetadataBuilder;
import io.aiven.kafka.tieredstorage.metadata.SegmentCustomMetadataField;
import io.aiven.kafka.tieredstorage.metadata.SegmentCustomMetadataSerde;
import io.aiven.kafka.tieredstorage.metrics.Metrics;
import io.aiven.kafka.tieredstorage.security.AesEncryptionProvider;
import io.aiven.kafka.tieredstorage.security.DataKeyAndAAD;
import io.aiven.kafka.tieredstorage.security.RsaEncryptionProvider;
import io.aiven.kafka.tieredstorage.security.RsaKeyReader;
import io.aiven.kafka.tieredstorage.storage.BytesRange;
import io.aiven.kafka.tieredstorage.storage.KeyNotFoundException;
import io.aiven.kafka.tieredstorage.storage.ObjectDeleter;
import io.aiven.kafka.tieredstorage.storage.ObjectFetcher;
import io.aiven.kafka.tieredstorage.storage.ObjectKey;
import io.aiven.kafka.tieredstorage.storage.ObjectUploader;
import io.aiven.kafka.tieredstorage.storage.StorageBackend;
import io.aiven.kafka.tieredstorage.storage.StorageBackendException;
import io.aiven.kafka.tieredstorage.transform.BaseDetransformChunkEnumeration;
import io.aiven.kafka.tieredstorage.transform.BaseTransformChunkEnumeration;
import io.aiven.kafka.tieredstorage.transform.CompressionChunkEnumeration;
import io.aiven.kafka.tieredstorage.transform.DecryptionChunkEnumeration;
import io.aiven.kafka.tieredstorage.transform.DetransformChunkEnumeration;
import io.aiven.kafka.tieredstorage.transform.DetransformFinisher;
import io.aiven.kafka.tieredstorage.transform.EncryptionChunkEnumeration;
import io.aiven.kafka.tieredstorage.transform.RateLimitedInputStream;
import io.aiven.kafka.tieredstorage.transform.TransformChunkEnumeration;
import io.aiven.kafka.tieredstorage.transform.TransformFinisher;
import io.github.bucket4j.Bucket;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.SequenceInputStream;
import java.nio.channels.ClosedByInterruptException;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.security.KeyPair;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.ByteBufferInputStream;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.log.remote.storage.LogSegmentData;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteResourceNotFoundException;
import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RemoteStorageManager
implements org.apache.kafka.server.log.remote.storage.RemoteStorageManager {
    private static final Logger log = LoggerFactory.getLogger(RemoteStorageManager.class);
    // Clock supplied at construction; used for duration measurements and handed to the metrics registry.
    private final Time time;
    // None of the fields below are set by the constructors; configure(...) assigns them
    // (some can also be replaced through the package-private setters).
    private Metrics metrics;
    // setStorage(...) points all three of these at the same StorageBackend instance.
    private ObjectFetcher fetcher;
    private ObjectUploader uploader;
    private ObjectDeleter deleter;
    private boolean compressionEnabled;
    // When true, requiresCompression(...) inspects the segment file before deciding to compress.
    private boolean compressionHeuristic;
    private boolean encryptionEnabled;
    // Chunk size passed to BaseTransformChunkEnumeration when transforming the segment log.
    private int chunkSize;
    // Both encryption providers are only created by configure(...) when encryption is enabled.
    private RsaEncryptionProvider rsaEncryptionProvider;
    private AesEncryptionProvider aesEncryptionProvider;
    // JSON mapper used to serialize the segment manifest and shared with the manifest cache.
    private ObjectMapper mapper;
    private ChunkManager chunkManager;
    private ObjectKeyFactory objectKeyFactory;
    private SegmentCustomMetadataSerde customMetadataSerde;
    private Set<SegmentCustomMetadataField> customMetadataFields;
    private SegmentManifestCache segmentManifestCache;
    private SegmentIndexesCache segmentIndexesCache;
    // Only assigned by configure(...) when an upload rate limit is configured; otherwise stays null.
    private Bucket rateLimitingBucket;

    /**
     * Creates a manager that uses {@link Time#SYSTEM} as its clock.
     *
     * <p>The instance is not usable until {@link #configure(Map)} has been called.
     */
    public RemoteStorageManager() {
        this(Time.SYSTEM);
    }

    /**
     * Creates a manager with an explicit clock (package-private).
     *
     * @param time clock used for duration measurements and passed to the metrics registry
     */
    RemoteStorageManager(Time time) {
        this.time = time;
    }

    /**
     * Initializes this manager from the plugin configuration.
     *
     * <p>In order, this sets up: the metrics registry, the storage backend, the object key factory,
     * the optional RSA/AES encryption providers, the chunk manager, chunking and compression
     * settings, the JSON mapper, the manifest and indexes caches, the custom metadata settings and
     * the optional upload rate limiter. The order matters: the chunk manager needs the fetcher and
     * the AES provider, and the JSON mapper needs the encryption settings.
     *
     * @param configs raw configuration map; must not be {@code null}
     * @throws NullPointerException if {@code configs} is {@code null}
     */
    @Override
    public void configure(Map<String, ?> configs) {
        Objects.requireNonNull(configs, "configs must not be null");
        final RemoteStorageManagerConfig config = new RemoteStorageManagerConfig(configs);

        // Metrics registry, driven by the "metrics.*" settings.
        final MetricConfig metricConfig = new MetricConfig();
        metricConfig.samples(config.getInt("metrics.num.samples"));
        metricConfig.timeWindow(config.getLong("metrics.sample.window.ms"), TimeUnit.MILLISECONDS);
        metricConfig.recordLevel(Sensor.RecordingLevel.forName(config.getString("metrics.recording.level")));
        metrics = new Metrics(time, metricConfig);

        setStorage(config.storage());
        objectKeyFactory = new ObjectKeyFactory(config.keyPrefix(), config.keyPrefixMask());

        // Encryption providers exist only when encryption is switched on.
        encryptionEnabled = config.encryptionEnabled();
        if (encryptionEnabled) {
            final Map<String, KeyPair> keyRing = new HashMap<>();
            config.encryptionKeyRing().forEach((id, paths) ->
                keyRing.put(id, RsaKeyReader.read(paths.publicKey, paths.privateKey)));
            rsaEncryptionProvider = new RsaEncryptionProvider(config.encryptionKeyPairId(), keyRing);
            aesEncryptionProvider = new AesEncryptionProvider();
        }

        final ChunkManagerFactory chunkManagerFactory = new ChunkManagerFactory();
        chunkManagerFactory.configure(configs);
        chunkManager = chunkManagerFactory.initChunkManager(fetcher, aesEncryptionProvider);

        chunkSize = config.chunkSize();
        compressionEnabled = config.compressionEnabled();
        compressionHeuristic = config.compressionHeuristicEnabled();

        // The mapper is built after the encryption settings because it conditionally registers
        // the encryption serde module.
        mapper = getObjectMapper();

        segmentManifestCache = new MemorySegmentManifestCache(fetcher, mapper);
        segmentManifestCache.configure(config.segmentManifestCacheConfigs());

        segmentIndexesCache = new MemorySegmentIndexesCache();
        segmentIndexesCache.configure(config.fetchIndexesCacheConfigs());

        customMetadataSerde = new SegmentCustomMetadataSerde();
        customMetadataFields = config.customMetadataKeysIncluded();

        // Without a configured limit the bucket stays null.
        config.uploadRateLimit().ifPresent(limit -> {
            rateLimitingBucket = RateLimitedInputStream.rateLimitBucket(limit);
        });
    }

    /**
     * Uses the given backend for fetching, uploading and deleting objects.
     *
     * @param storage backend assigned to the fetcher, uploader and deleter fields
     */
    void setStorage(StorageBackend storage) {
        fetcher = storage;
        uploader = storage;
        deleter = storage;
    }

    /**
     * Replaces the segment manifest cache (package-private).
     *
     * @param segmentManifestCache cache used by subsequent manifest lookups
     */
    void setSegmentManifestCache(MemorySegmentManifestCache segmentManifestCache) {
        this.segmentManifestCache = segmentManifestCache;
    }

    /**
     * Replaces the chunk manager (package-private).
     *
     * @param chunkManager chunk manager used by subsequent log segment fetches
     */
    void setChunkManager(ChunkManager chunkManager) {
        this.chunkManager = chunkManager;
    }

    /**
     * Creates the JSON mapper used for segment manifests.
     *
     * <p>The JDK8 and Kafka type modules are always registered; the encryption serde module is
     * added only when encryption is enabled, because it needs the RSA provider.
     *
     * @return a freshly configured mapper
     */
    private ObjectMapper getObjectMapper() {
        final ObjectMapper configured = new ObjectMapper()
            .registerModule(new Jdk8Module())
            .registerModule(KafkaTypeSerdeModule.create());
        if (encryptionEnabled) {
            configured.registerModule(EncryptionSerdeModule.create(rsaEncryptionProvider));
        }
        return configured;
    }

    /**
     * Uploads a log segment to remote storage as three objects: the (optionally compressed and
     * encrypted) log, the concatenated indexes, and the manifest describing both.
     *
     * <p>If any step fails, deletion of every object that may have been written for this segment
     * is attempted on a best-effort basis before the failure is rethrown; a failure of that
     * cleanup is only logged.
     *
     * @param remoteLogSegmentMetadata metadata identifying the segment; must not be {@code null}
     * @param logSegmentData local files and buffers of the segment; must not be {@code null}
     * @return serialized custom metadata, or empty when no custom fields were produced
     * @throws NullPointerException if either argument is {@code null}
     * @throws RemoteStorageException wrapping any failure raised while uploading
     */
    @Override
    public Optional<RemoteLogSegmentMetadata.CustomMetadata> copyLogSegmentData(RemoteLogSegmentMetadata remoteLogSegmentMetadata, LogSegmentData logSegmentData) throws RemoteStorageException {
        // The message names the argument that is actually checked (it previously said "remoteLogSegmentId").
        Objects.requireNonNull(remoteLogSegmentMetadata, "remoteLogSegmentMetadata must not be null");
        Objects.requireNonNull(logSegmentData, "logSegmentData must not be null");
        log.info("Copying log segment data, metadata: {}", remoteLogSegmentMetadata);

        final SegmentCustomMetadataBuilder customMetadataBuilder =
            new SegmentCustomMetadataBuilder(this.customMetadataFields, this.objectKeyFactory, remoteLogSegmentMetadata);
        final long startedMs = this.time.milliseconds();
        try {
            final boolean requiresCompression = this.requiresCompression(logSegmentData);
            // A data key is generated per segment only when encryption is enabled; null otherwise.
            final DataKeyAndAAD maybeEncryptionKey =
                this.encryptionEnabled ? this.aesEncryptionProvider.createDataKeyAndAAD() : null;
            final ChunkIndex chunkIndex = this.uploadSegmentLog(
                remoteLogSegmentMetadata, logSegmentData, requiresCompression, maybeEncryptionKey, customMetadataBuilder);
            final SegmentIndexesV1 segmentIndexes = this.uploadIndexes(
                remoteLogSegmentMetadata, logSegmentData, maybeEncryptionKey, customMetadataBuilder);
            // The manifest is uploaded last because it references the chunk index and index layout.
            this.uploadManifest(
                remoteLogSegmentMetadata, chunkIndex, segmentIndexes, requiresCompression, maybeEncryptionKey,
                customMetadataBuilder);
        } catch (Exception e) {
            try {
                // Best effort: remove whatever was uploaded before the failure.
                this.deleteSegmentObjects(remoteLogSegmentMetadata);
            } catch (Exception ex) {
                log.warn("Removing orphan files failed", ex);
            }
            throw new RemoteStorageException(e);
        }
        this.metrics.recordSegmentCopyTime(
            remoteLogSegmentMetadata.remoteLogSegmentId().topicIdPartition().topicPartition(),
            startedMs,
            this.time.milliseconds());
        final Optional<RemoteLogSegmentMetadata.CustomMetadata> customMetadata =
            this.buildCustomMetadata(customMetadataBuilder);
        log.info("Copying log segment data completed successfully, metadata: {}", remoteLogSegmentMetadata);
        return customMetadata;
    }

    /**
     * Deletes every object that can belong to the given segment, one key per
     * {@link ObjectKeyFactory.Suffix} value, in a single call to the deleter.
     *
     * @param metadata metadata of the segment whose objects are removed
     * @throws StorageBackendException if the storage backend reports a failure
     */
    private void deleteSegmentObjects(RemoteLogSegmentMetadata metadata) throws StorageBackendException {
        final Set<ObjectKey> segmentKeys = Arrays.stream(ObjectKeyFactory.Suffix.values())
            .map(suffix -> objectKeyFactory.key(metadata, suffix))
            .collect(Collectors.toSet());
        deleter.delete(segmentKeys);
    }

    /**
     * Transforms all indexes of a segment and uploads them concatenated as a single object.
     *
     * <p>Indexes are appended in a fixed order: offset, timestamp, producer snapshot, leader epoch
     * and, when present, transaction. The size of each transformed index is recorded in the
     * returned {@link SegmentIndexesV1}.
     *
     * @param remoteLogSegmentMetadata metadata identifying the segment
     * @param segmentData local index files and the leader epoch buffer
     * @param maybeEncryptionKey data key used when encryption is enabled, otherwise {@code null}
     * @param customMetadataBuilder receives the uploaded object size
     * @return the index layout to be stored in the manifest
     * @throws IOException if a local index cannot be opened or read
     * @throws RemoteStorageException if the size of an index file cannot be determined
     * @throws StorageBackendException if the upload fails
     */
    SegmentIndexesV1 uploadIndexes(RemoteLogSegmentMetadata remoteLogSegmentMetadata, LogSegmentData segmentData, DataKeyAndAAD maybeEncryptionKey, SegmentCustomMetadataBuilder customMetadataBuilder) throws IOException, RemoteStorageException, StorageBackendException {
        final ArrayList<InputStream> transformed = new ArrayList<>(RemoteStorageManager.IndexType.values().length);
        final SegmentIndexesV1Builder layoutBuilder = new SegmentIndexesV1Builder();
        // The holder closes every source stream registered with it when this block exits.
        try (ClosableInputStreamHolder sources = new ClosableInputStreamHolder()) {
            final Path offsetIndexPath = segmentData.offsetIndex();
            transformed.add(transformIndex(
                RemoteStorageManager.IndexType.OFFSET,
                sources.add(Files.newInputStream(offsetIndexPath)),
                indexSize(offsetIndexPath),
                maybeEncryptionKey,
                layoutBuilder));

            final Path timeIndexPath = segmentData.timeIndex();
            transformed.add(transformIndex(
                RemoteStorageManager.IndexType.TIMESTAMP,
                sources.add(Files.newInputStream(timeIndexPath)),
                indexSize(timeIndexPath),
                maybeEncryptionKey,
                layoutBuilder));

            final Path producerSnapshotPath = segmentData.producerSnapshotIndex();
            transformed.add(transformIndex(
                RemoteStorageManager.IndexType.PRODUCER_SNAPSHOT,
                sources.add(Files.newInputStream(producerSnapshotPath)),
                indexSize(producerSnapshotPath),
                maybeEncryptionKey,
                layoutBuilder));

            // The leader epoch index is an in-memory buffer rather than a file.
            transformed.add(transformIndex(
                RemoteStorageManager.IndexType.LEADER_EPOCH,
                sources.add(new ByteBufferInputStream(segmentData.leaderEpochIndex())),
                segmentData.leaderEpochIndex().remaining(),
                maybeEncryptionKey,
                layoutBuilder));

            final Optional<Path> maybeTransactionIndex = segmentData.transactionIndex();
            if (maybeTransactionIndex.isPresent()) {
                final Path transactionIndexPath = maybeTransactionIndex.get();
                transformed.add(transformIndex(
                    RemoteStorageManager.IndexType.TRANSACTION,
                    sources.add(Files.newInputStream(transactionIndexPath)),
                    indexSize(transactionIndexPath),
                    maybeEncryptionKey,
                    layoutBuilder));
            }

            final ObjectKeyFactory.Suffix suffix = ObjectKeyFactory.Suffix.INDEXES;
            final ObjectKey indexesKey = objectKeyFactory.key(remoteLogSegmentMetadata, suffix);
            try (SequenceInputStream combined = new SequenceInputStream(Collections.enumeration(transformed))) {
                final long uploadedBytes = uploader.upload(combined, indexesKey);
                metrics.recordObjectUpload(
                    remoteLogSegmentMetadata.remoteLogSegmentId().topicIdPartition().topicPartition(),
                    suffix,
                    uploadedBytes);
                customMetadataBuilder.addUploadResult(suffix, uploadedBytes);
                log.debug("Uploaded indexes file for {}, size: {}", remoteLogSegmentMetadata, uploadedBytes);
            }
        }
        return layoutBuilder.build();
    }

    /**
     * Returns the size of an index file as an {@code int}.
     *
     * @param indexPath path of the index file
     * @return file size in bytes
     * @throws IllegalStateException if the file is larger than {@link Integer#MAX_VALUE} bytes
     * @throws RemoteStorageException if the size cannot be read
     */
    static int indexSize(Path indexPath) throws RemoteStorageException {
        final long sizeInBytes;
        try {
            sizeInBytes = Files.size(indexPath);
        } catch (IOException e) {
            throw new RemoteStorageException("Error while getting index path size", e);
        }
        if (sizeInBytes > Integer.MAX_VALUE) {
            throw new IllegalStateException("Index at path " + indexPath + " has size larger than Integer.MAX_VALUE");
        }
        return (int) sizeInBytes;
    }

    /**
     * Serializes the collected custom metadata fields.
     *
     * @param customMetadataBuilder builder holding the fields gathered during the upload
     * @return the serialized fields, or empty when the builder produced no fields
     */
    private Optional<RemoteLogSegmentMetadata.CustomMetadata> buildCustomMetadata(SegmentCustomMetadataBuilder customMetadataBuilder) {
        final NavigableMap<Integer, Object> fields = customMetadataBuilder.build();
        if (fields.isEmpty()) {
            return Optional.empty();
        }
        final byte[] serialized = customMetadataSerde.serialize(fields);
        return Optional.of(new RemoteLogSegmentMetadata.CustomMetadata(serialized));
    }

    /**
     * Decides whether the segment should be compressed before upload.
     *
     * <p>With compression disabled the answer is always {@code false}. With compression enabled
     * and the heuristic disabled it is always {@code true}. With the heuristic enabled the segment
     * file is inspected and compression is requested only when the check reports that it is not
     * already compressed; if the check fails with an invalid record batch, a warning is logged and
     * compression is skipped.
     *
     * @param logSegmentData segment whose log file may be inspected
     * @return {@code true} when the segment should be compressed
     */
    boolean requiresCompression(LogSegmentData logSegmentData) {
        if (!compressionEnabled) {
            return false;
        }
        if (!compressionHeuristic) {
            return true;
        }
        try {
            final File segmentFile = logSegmentData.logSegment().toFile();
            return !SegmentCompressionChecker.check(segmentFile);
        } catch (InvalidRecordBatchException e) {
            log.warn("Failed to check compression on log segment: {}", logSegmentData.logSegment(), e);
            return false;
        }
    }

    /**
     * Transforms the segment log file and uploads it under the {@code LOG} key.
     *
     * @param remoteLogSegmentMetadata metadata identifying the segment
     * @param logSegmentData provides the path of the local log file
     * @param requiresCompression whether chunks are compressed
     * @param maybeEncryptionKey data key used when encryption is enabled, otherwise {@code null}
     * @param customMetadataBuilder receives the uploaded object size
     * @return the chunk index reported by the transform finisher after the upload stream is closed
     * @throws IOException if the local file cannot be opened or read
     * @throws StorageBackendException if the upload fails
     */
    ChunkIndex uploadSegmentLog(RemoteLogSegmentMetadata remoteLogSegmentMetadata, LogSegmentData logSegmentData, boolean requiresCompression, DataKeyAndAAD maybeEncryptionKey, SegmentCustomMetadataBuilder customMetadataBuilder) throws IOException, StorageBackendException {
        final ObjectKeyFactory.Suffix suffix = ObjectKeyFactory.Suffix.LOG;
        final ObjectKey logKey = objectKeyFactory.key(remoteLogSegmentMetadata, suffix);
        try (InputStream segmentSource = Files.newInputStream(logSegmentData.logSegment())) {
            final TransformChunkEnumeration chunks =
                transformation(segmentSource, requiresCompression, maybeEncryptionKey);
            final TransformFinisher finisher = TransformFinisher
                .newBuilder(chunks, remoteLogSegmentMetadata.segmentSizeInBytes())
                .withRateLimitingBucket(rateLimitingBucket)
                .withOriginalFilePath(logSegmentData.logSegment())
                .build();
            try (InputStream transformedSegment = finisher.toInputStream()) {
                final long uploadedBytes = uploader.upload(transformedSegment, logKey);
                metrics.recordObjectUpload(
                    remoteLogSegmentMetadata.remoteLogSegmentId().topicIdPartition().topicPartition(),
                    suffix,
                    uploadedBytes);
                customMetadataBuilder.addUploadResult(suffix, uploadedBytes);
                log.debug("Uploaded segment log for {}, size: {}", remoteLogSegmentMetadata, uploadedBytes);
            }
            // Read the chunk index only after the transformed stream has been consumed and closed.
            return finisher.chunkIndex();
        }
    }

    /**
     * Builds the chunk transformation chain for a segment log: chunking, then optional
     * compression, then optional encryption.
     *
     * @param logSegmentInputStream source of the raw segment bytes
     * @param requiresCompression whether a compression stage is added
     * @param maybeEncryptionKey data key used by the encryption stage when encryption is enabled
     * @return the outermost stage of the chain
     */
    private TransformChunkEnumeration transformation(InputStream logSegmentInputStream, boolean requiresCompression, DataKeyAndAAD maybeEncryptionKey) {
        final TransformChunkEnumeration chunked =
            new BaseTransformChunkEnumeration(logSegmentInputStream, chunkSize);
        final TransformChunkEnumeration maybeCompressed =
            requiresCompression ? new CompressionChunkEnumeration(chunked) : chunked;
        if (!encryptionEnabled) {
            return maybeCompressed;
        }
        return new EncryptionChunkEnumeration(
            maybeCompressed,
            () -> aesEncryptionProvider.encryptionCipher(maybeEncryptionKey));
    }

    /**
     * Transforms a single index into an upload-ready stream and records its transformed size.
     *
     * <p>An index of size zero (or less) is recorded with size {@code 0} and yields an empty
     * stream. Otherwise the whole index is processed as one chunk (encrypted when encryption is
     * enabled) and the size of that chunk is recorded.
     *
     * @param indexType type of the index being transformed
     * @param index source stream of the raw index bytes
     * @param size size of the raw index in bytes
     * @param maybeEncryptionKey data key used when encryption is enabled, otherwise {@code null}
     * @param segmentIndexBuilder receives the index type together with its transformed size
     * @return stream of the transformed index
     * @throws IllegalStateException if the transformation does not produce exactly one chunk
     */
    InputStream transformIndex(RemoteStorageManager.IndexType indexType, InputStream index, int size, DataKeyAndAAD maybeEncryptionKey, SegmentIndexesV1Builder segmentIndexBuilder) {
        log.debug("Transforming index {} with size {}", indexType, size);
        if (size <= 0) {
            segmentIndexBuilder.add(indexType, 0);
            return InputStream.nullInputStream();
        }

        // The whole index is used as the chunk size, so it is processed as one chunk.
        TransformChunkEnumeration stages = new BaseTransformChunkEnumeration(index, size);
        if (encryptionEnabled) {
            stages = new EncryptionChunkEnumeration(
                stages,
                () -> aesEncryptionProvider.encryptionCipher(maybeEncryptionKey));
        }
        final TransformFinisher finisher = TransformFinisher.newBuilder(stages, size)
            .withChunkingDisabled()
            .withRateLimitingBucket(rateLimitingBucket)
            .build();
        final InputStream transformed = finisher.nextElement();
        final ChunkIndex singleChunkIndex = finisher.chunkIndex();
        if (singleChunkIndex == null) {
            throw new IllegalStateException("Chunking disabled when single chunk is expected");
        }
        if (singleChunkIndex.chunks().size() != 1) {
            throw new IllegalStateException("Number of chunks different than 1, single chunk is expected");
        }
        segmentIndexBuilder.add(indexType, singleChunkIndex.chunks().get(0).range().size());
        return transformed;
    }

    /**
     * Serializes the segment manifest to JSON and uploads it under the {@code MANIFEST} key.
     *
     * <p>The manifest is serialized straight to bytes by the mapper, which always encodes as
     * UTF-8, so the uploaded content does not depend on the JVM's default charset.
     *
     * @param remoteLogSegmentMetadata metadata identifying the segment
     * @param chunkIndex chunk index of the uploaded segment log
     * @param segmentIndexes layout of the uploaded indexes object
     * @param requiresCompression whether the segment log was compressed
     * @param maybeEncryptionKey data key used for the segment, or {@code null} without encryption
     * @param customMetadataBuilder receives the uploaded object size
     * @throws StorageBackendException if the upload fails
     * @throws IOException if the manifest cannot be serialized
     */
    void uploadManifest(RemoteLogSegmentMetadata remoteLogSegmentMetadata, ChunkIndex chunkIndex, SegmentIndexesV1 segmentIndexes, boolean requiresCompression, DataKeyAndAAD maybeEncryptionKey, SegmentCustomMetadataBuilder customMetadataBuilder) throws StorageBackendException, IOException {
        // Encryption metadata is stored in the manifest only when a data key was used.
        final SegmentEncryptionMetadataV1 maybeEncryptionMetadata =
            maybeEncryptionKey != null ? new SegmentEncryptionMetadataV1(maybeEncryptionKey) : null;
        final SegmentManifestV1 segmentManifest = new SegmentManifestV1(
            chunkIndex, segmentIndexes, requiresCompression, maybeEncryptionMetadata, remoteLogSegmentMetadata);
        // writeValueAsBytes encodes as UTF-8; String.getBytes() would use the platform charset.
        final byte[] manifestBytes = this.mapper.writeValueAsBytes(segmentManifest);
        final ObjectKeyFactory.Suffix suffix = ObjectKeyFactory.Suffix.MANIFEST;
        final ObjectKey manifestObjectKey = this.objectKeyFactory.key(remoteLogSegmentMetadata, suffix);
        try (ByteArrayInputStream manifestContent = new ByteArrayInputStream(manifestBytes)) {
            final long bytes = this.uploader.upload(manifestContent, manifestObjectKey);
            this.metrics.recordObjectUpload(
                remoteLogSegmentMetadata.remoteLogSegmentId().topicIdPartition().topicPartition(),
                suffix,
                bytes);
            customMetadataBuilder.addUploadResult(suffix, bytes);
            log.debug("Uploaded segment manifest for {}, size: {}", remoteLogSegmentMetadata, bytes);
        }
    }

    /**
     * Fetches a log segment from {@code startPosition} through its last byte.
     *
     * @param remoteLogSegmentMetadata metadata identifying the segment
     * @param startPosition first byte position to read
     * @return stream over the requested bytes
     * @throws RemoteStorageException if the fetch fails
     */
    @Override
    public InputStream fetchLogSegment(RemoteLogSegmentMetadata remoteLogSegmentMetadata, int startPosition) throws RemoteStorageException {
        final int lastPosition = remoteLogSegmentMetadata.segmentSizeInBytes() - 1;
        return fetchLogSegment(remoteLogSegmentMetadata, startPosition, lastPosition);
    }

    /**
     * Fetches the bytes of a log segment between two positions (both inclusive).
     *
     * <p>{@code endPosition} is capped at the last byte of the segment. If the fetch is
     * interrupted, an empty stream is returned instead of an error.
     *
     * @param remoteLogSegmentMetadata metadata identifying the segment
     * @param startPosition first byte position to read
     * @param endPosition last byte position to read
     * @return stream over the requested bytes, or an empty stream when interrupted
     * @throws RemoteResourceNotFoundException if a required object key does not exist
     * @throws RemoteStorageException for any other failure
     */
    @Override
    public InputStream fetchLogSegment(RemoteLogSegmentMetadata remoteLogSegmentMetadata, int startPosition, int endPosition) throws RemoteStorageException {
        final int lastPosition = remoteLogSegmentMetadata.segmentSizeInBytes() - 1;
        final BytesRange range = BytesRange.of(startPosition, Math.min(endPosition, lastPosition));
        try {
            log.trace("Fetching log segment {} with range: {}", remoteLogSegmentMetadata, range);
            metrics.recordSegmentFetch(
                remoteLogSegmentMetadata.remoteLogSegmentId().topicIdPartition().topicPartition(),
                range.size());
            final SegmentManifest manifest = fetchSegmentManifest(remoteLogSegmentMetadata);
            final ObjectKey logKey = objectKey(remoteLogSegmentMetadata, ObjectKeyFactory.Suffix.LOG);
            final FetchChunkEnumeration chunks = new FetchChunkEnumeration(chunkManager, logKey, manifest, range);
            return chunks.toInputStream();
        } catch (KeyNotFoundRuntimeException | KeyNotFoundException e) {
            throw new RemoteResourceNotFoundException(e);
        } catch (ClosedByInterruptException e) {
            log.debug("Fetching log segment {} with range {} was interrupted", remoteLogSegmentMetadata, range);
            return InputStream.nullInputStream();
        } catch (StorageBackendException | RuntimeException e) {
            // An interruption may be hidden inside the cause chain of these exceptions.
            final InputStream emptyIfInterrupted =
                maybeReturnNullInputStreamIfInterrupted(e, remoteLogSegmentMetadata, range);
            if (emptyIfInterrupted == null) {
                throw new RemoteStorageException(e);
            }
            return emptyIfInterrupted;
        } catch (Exception e) {
            throw new RemoteStorageException(e);
        }
    }

    /**
     * Checks whether a failure was caused by an interruption and, if so, returns an empty stream.
     *
     * <p>The cause chain is followed for as long as each cause is a {@link StorageBackendException};
     * an {@link InterruptedException} or {@link ClosedByInterruptException} cause counts as an
     * interruption.
     *
     * @param exception failure to inspect (only its causes are examined, not the exception itself)
     * @param remoteLogSegmentMetadata segment being fetched, used for logging
     * @param range requested byte range, used for logging
     * @return an empty stream when interrupted, otherwise {@code null}
     */
    private static InputStream maybeReturnNullInputStreamIfInterrupted(Throwable exception, RemoteLogSegmentMetadata remoteLogSegmentMetadata, BytesRange range) {
        Throwable current = exception;
        while (true) {
            final Throwable cause = current.getCause();
            if (cause instanceof InterruptedException || cause instanceof ClosedByInterruptException) {
                log.debug("Fetching log segment {} with range {} was interrupted", remoteLogSegmentMetadata, range);
                return InputStream.nullInputStream();
            }
            if (!(cause instanceof StorageBackendException)) {
                return null;
            }
            // Descend one level: storage backend exceptions may wrap the interruption.
            current = cause;
        }
    }

    /**
     * Fetches one index of a segment, going through the indexes cache.
     *
     * <p>The position of the index inside the combined indexes object is taken from the segment
     * manifest. An index with an empty range yields an empty stream without contacting storage.
     *
     * @param remoteLogSegmentMetadata metadata identifying the segment
     * @param indexType type of the index to fetch
     * @return stream over the index bytes
     * @throws RemoteResourceNotFoundException if the manifest has no entry for the index type or
     *     a required object key does not exist
     * @throws RemoteStorageException for any other failure
     */
    @Override
    public InputStream fetchIndex(RemoteLogSegmentMetadata remoteLogSegmentMetadata, RemoteStorageManager.IndexType indexType) throws RemoteStorageException {
        try {
            log.trace("Fetching index {} for {}", indexType, remoteLogSegmentMetadata);
            final SegmentManifest manifest = fetchSegmentManifest(remoteLogSegmentMetadata);
            final ObjectKey indexesKey = objectKey(remoteLogSegmentMetadata, ObjectKeyFactory.Suffix.INDEXES);
            final SegmentIndex indexEntry = manifest.segmentIndexes().segmentIndex(indexType);
            if (indexEntry == null) {
                throw new RemoteResourceNotFoundException("Index " + indexType + " not found on " + indexesKey);
            }
            return indexEntry.range().isEmpty()
                ? InputStream.nullInputStream()
                : segmentIndexesCache.get(
                    indexesKey, indexType, () -> fetchIndexBytes(indexesKey, indexEntry, manifest));
        } catch (RemoteResourceNotFoundException e) {
            // Already the right type; rethrow unchanged so it is not re-wrapped below.
            throw e;
        } catch (KeyNotFoundException e) {
            throw new RemoteResourceNotFoundException(e);
        } catch (Exception e) {
            throw new RemoteStorageException(e);
        }
    }

    /**
     * Downloads the byte range of one index from the combined indexes object and reverses the
     * upload transformation (decryption when the manifest carries encryption metadata).
     *
     * <p>Failures are reported as unchecked exceptions because the method declares no checked
     * exceptions; it is invoked from a lambda passed to the indexes cache.
     *
     * @param key key of the combined indexes object
     * @param segmentIndex manifest entry providing the byte range of the index
     * @param segmentManifest manifest of the segment, consulted for encryption metadata
     * @return the de-transformed index bytes
     * @throws RuntimeException wrapping a {@link StorageBackendException} from the fetch, or an
     *     {@link IOException} raised while reading or closing the de-transformed stream
     */
    private byte[] fetchIndexBytes(ObjectKey key, SegmentIndex segmentIndex, SegmentManifest segmentManifest) {
        final InputStream in;
        try {
            in = this.fetcher.fetch(key, segmentIndex.range());
        } catch (StorageBackendException e) {
            throw new RuntimeException("Error fetching index from remote storage", e);
        }

        DetransformChunkEnumeration detransformEnum = new BaseDetransformChunkEnumeration(in);
        final Optional<SegmentEncryptionMetadata> encryptionMetadata = segmentManifest.encryption();
        if (encryptionMetadata.isPresent()) {
            final SegmentEncryptionMetadata encryption = encryptionMetadata.get();
            detransformEnum = new DecryptionChunkEnumeration(
                detransformEnum,
                encryption.ivSize(),
                encryptedChunk -> this.aesEncryptionProvider.decryptionCipher(encryptedChunk, encryption));
        }

        final DetransformFinisher detransformFinisher = new DetransformFinisher(detransformEnum);
        // try-with-resources closes the stream on every path and routes an IOException thrown by
        // close() through the same handler as read failures.
        try (InputStream is = detransformFinisher.toInputStream()) {
            return is.readAllBytes();
        } catch (IOException e) {
            throw new RuntimeException("Error reading de-transformed index bytes", e);
        }
    }

    /**
     * Resolves the object key of a segment object for fetching.
     *
     * <p>When the segment metadata carries custom metadata, its deserialized fields are passed to
     * the key factory; otherwise the key is derived from the segment metadata alone.
     *
     * @param remoteLogSegmentMetadata metadata identifying the segment
     * @param suffix which object of the segment is addressed
     * @return the resolved key
     */
    private ObjectKey objectKey(RemoteLogSegmentMetadata remoteLogSegmentMetadata, ObjectKeyFactory.Suffix suffix) {
        final Optional<RemoteLogSegmentMetadata.CustomMetadata> maybeCustomMetadata =
            remoteLogSegmentMetadata.customMetadata();
        if (!maybeCustomMetadata.isPresent()) {
            return objectKeyFactory.key(remoteLogSegmentMetadata, suffix);
        }
        final NavigableMap<Integer, Object> fields =
            customMetadataSerde.deserialize(maybeCustomMetadata.get().value());
        return objectKeyFactory.key(fields, remoteLogSegmentMetadata, suffix);
    }

    /**
     * Looks up the manifest of a segment through the manifest cache.
     *
     * <p>The manifest key is always derived from the segment metadata alone.
     *
     * @param remoteLogSegmentMetadata metadata identifying the segment
     * @return the segment manifest
     * @throws StorageBackendException if the storage backend reports a failure
     * @throws IOException if the cache lookup raises an I/O error
     */
    private SegmentManifest fetchSegmentManifest(RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws StorageBackendException, IOException {
        return segmentManifestCache.get(
            objectKeyFactory.key(remoteLogSegmentMetadata, ObjectKeyFactory.Suffix.MANIFEST));
    }

    /**
     * Deletes all remote objects of a segment and records deletion metrics.
     *
     * <p>The delete request is recorded before the attempt, the error counter on failure, and the
     * elapsed time only on success.
     *
     * @param remoteLogSegmentMetadata metadata identifying the segment to delete
     * @throws RemoteStorageException wrapping any failure raised while deleting
     */
    @Override
    public void deleteLogSegmentData(RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws RemoteStorageException {
        log.info("Deleting log segment data for {}", remoteLogSegmentMetadata);
        final var topicPartition =
            remoteLogSegmentMetadata.remoteLogSegmentId().topicIdPartition().topicPartition();
        metrics.recordSegmentDelete(topicPartition, remoteLogSegmentMetadata.segmentSizeInBytes());

        final long startedMs = time.milliseconds();
        try {
            deleteSegmentObjects(remoteLogSegmentMetadata);
        } catch (Exception e) {
            metrics.recordSegmentDeleteError(
                remoteLogSegmentMetadata.remoteLogSegmentId().topicIdPartition().topicPartition());
            throw new RemoteStorageException(e);
        }
        metrics.recordSegmentDeleteTime(
            remoteLogSegmentMetadata.remoteLogSegmentId().topicIdPartition().topicPartition(),
            startedMs,
            time.milliseconds());
        log.info("Deleting log segment data for completed successfully {}", remoteLogSegmentMetadata);
    }

    /**
     * Releases the metrics registry.
     *
     * <p>Safe to call on an instance whose {@link #configure(Map)} was never invoked or failed
     * before the metrics registry was created; in that case there is nothing to release.
     */
    @Override
    public void close() {
        // metrics is only assigned in configure(...); guard against closing an unconfigured instance.
        if (this.metrics != null) {
            this.metrics.close();
        }
    }
}

