package io.trino.spooling.filesystem;

import io.airlift.units.DataSize;
import io.azam.ulidj.ULID;
import io.opentelemetry.api.OpenTelemetry;
import io.trino.filesystem.s3.S3FileSystemConfig;
import io.trino.filesystem.s3.S3FileSystemFactory;
import io.trino.filesystem.s3.S3FileSystemStats;
import io.trino.spi.QueryId;
import io.trino.spi.protocol.SpooledSegmentHandle;
import io.trino.spi.protocol.SpoolingContext;
import io.trino.spi.protocol.SpoolingManager;
import io.trino.spooling.filesystem.encryption.EncryptionUtils;
import io.trino.testing.containers.Minio;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
import java.util.UUID;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;

@TestInstance(TestInstance.Lifecycle.PER_CLASS)
/* loaded from: input_file:io/trino/spooling/filesystem/TestFileSystemSpoolingManager.class */
public class TestFileSystemSpoolingManager {
    private static final String BUCKET_NAME = "spooling" + UUID.randomUUID().toString().replace("-", "");
    private Minio minio;

    @BeforeAll
    public void setup() {
        this.minio = Minio.builder().build();
        this.minio.start();
        this.minio.createBucket(BUCKET_NAME);
    }

    @AfterAll
    public void teardown() {
        this.minio.stop();
    }

    @Test
    public void testRetrieveSpooledSegment() throws Exception {
        SpoolingManager spoolingManager = getSpoolingManager();
        SpooledSegmentHandle create = spoolingManager.create(new SpoolingContext("json", QueryId.valueOf("a"), 0L, 0L));
        OutputStream createOutputStream = spoolingManager.createOutputStream(create);
        try {
            createOutputStream.write("data".getBytes(StandardCharsets.UTF_8));
            if (createOutputStream != null) {
                createOutputStream.close();
            }
            InputStream openInputStream = spoolingManager.openInputStream(create);
            try {
                byte[] bArr = new byte[4];
                Assertions.assertThat(openInputStream.read(bArr)).isEqualTo(bArr.length);
                Assertions.assertThat(bArr).isEqualTo("data".getBytes(StandardCharsets.UTF_8));
                if (openInputStream != null) {
                    openInputStream.close();
                }
            } catch (Throwable th) {
                if (openInputStream != null) {
                    try {
                        openInputStream.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
                throw th;
            }
        } catch (Throwable th3) {
            if (createOutputStream != null) {
                try {
                    createOutputStream.close();
                } catch (Throwable th4) {
                    th3.addSuppressed(th4);
                }
            }
            throw th3;
        }
    }

    @Test
    public void testAcknowledgedSegmentCantBeRetrievedAgain() throws Exception {
        SpoolingManager spoolingManager = getSpoolingManager();
        SpooledSegmentHandle create = spoolingManager.create(new SpoolingContext("json", QueryId.valueOf("a"), 0L, 0L));
        OutputStream createOutputStream = spoolingManager.createOutputStream(create);
        try {
            createOutputStream.write("data".getBytes(StandardCharsets.UTF_8));
            if (createOutputStream != null) {
                createOutputStream.close();
            }
            InputStream openInputStream = spoolingManager.openInputStream(create);
            try {
                byte[] bArr = new byte[4];
                Assertions.assertThat(openInputStream.read(bArr)).isEqualTo(bArr.length);
                Assertions.assertThat(bArr).isEqualTo("data".getBytes(StandardCharsets.UTF_8));
                if (openInputStream != null) {
                    openInputStream.close();
                }
                spoolingManager.acknowledge(create);
                Assertions.assertThatThrownBy(() -> {
                    spoolingManager.openInputStream(create).read();
                }).isInstanceOf(IOException.class).hasMessage("Segment not found or expired");
            } catch (Throwable th) {
                if (openInputStream != null) {
                    try {
                        openInputStream.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
                throw th;
            }
        } catch (Throwable th3) {
            if (createOutputStream != null) {
                try {
                    createOutputStream.close();
                } catch (Throwable th4) {
                    th3.addSuppressed(th4);
                }
            }
            throw th3;
        }
    }

    @Test
    public void testHandleRoundTrip() {
        FileSystemSpooledSegmentHandle fileSystemSpooledSegmentHandle = new FileSystemSpooledSegmentHandle("json", QueryId.valueOf("a"), ULID.randomBinary(), Optional.of(EncryptionUtils.generateRandomKey()));
        FileSystemSpooledSegmentHandle handle = getSpoolingManager().handle(getSpoolingManager().location(fileSystemSpooledSegmentHandle));
        Assertions.assertThat(fileSystemSpooledSegmentHandle.queryId()).isEqualTo(handle.queryId());
        Assertions.assertThat(fileSystemSpooledSegmentHandle.storageObjectName()).isEqualTo(handle.storageObjectName());
        Assertions.assertThat(fileSystemSpooledSegmentHandle.uuid()).isEqualTo(handle.uuid());
        Assertions.assertThat(fileSystemSpooledSegmentHandle.expirationTime()).isEqualTo(handle.expirationTime());
        Assertions.assertThat(fileSystemSpooledSegmentHandle.encryptionKey()).isEqualTo(handle.encryptionKey());
    }

    private SpoolingManager getSpoolingManager() {
        FileSystemSpoolingConfig fileSystemSpoolingConfig = new FileSystemSpoolingConfig();
        fileSystemSpoolingConfig.setS3Enabled(true);
        fileSystemSpoolingConfig.setLocation("s3://%s/".formatted(BUCKET_NAME));
        return new FileSystemSpoolingManager(fileSystemSpoolingConfig, new S3FileSystemFactory(OpenTelemetry.noop(), new S3FileSystemConfig().setEndpoint(this.minio.getMinioAddress()).setRegion("us-east-1").setPathStyleAccess(true).setAwsAccessKey("accesskey").setAwsSecretKey("secretkey").setStreamingPartSize(DataSize.valueOf("5.5MB")), new S3FileSystemStats()));
    }
}
