package io.trino.plugin.exchange.filesystem;

import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import io.airlift.concurrent.MoreFutures;
import io.airlift.slice.SizeOf;
import io.airlift.slice.Slice;
import io.airlift.slice.SliceOutput;
import io.airlift.slice.Slices;
import io.airlift.units.DataSize;
import io.trino.spi.StandardErrorCode;
import io.trino.spi.TrinoException;
import io.trino.spi.exchange.ExchangeSink;
import java.net.URI;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;
import javax.crypto.SecretKey;
import org.openjdk.jol.info.ClassLayout;

@ThreadSafe
/* loaded from: input_file:io/trino/plugin/exchange/filesystem/FileSystemExchangeSink.class */
public class FileSystemExchangeSink implements ExchangeSink {
    public static final String COMMITTED_MARKER_FILE_NAME = "committed";
    public static final String DATA_FILE_SUFFIX = ".data";
    private static final int INSTANCE_SIZE = ClassLayout.parseClass(FileSystemExchangeSink.class).instanceSize();
    private final FileSystemExchangeStorage exchangeStorage;
    private final FileSystemExchangeStats stats;
    private final URI outputDirectory;
    private final int outputPartitionCount;
    private final Optional<SecretKey> secretKey;
    private final boolean preserveOrderWithinPartition;
    private final int maxPageStorageSizeInBytes;
    private final long maxFileSizeInBytes;
    private final BufferPool bufferPool;
    private final Map<Integer, BufferedStorageWriter> writersMap = new ConcurrentHashMap();
    private final AtomicReference<Throwable> failure = new AtomicReference<>();
    private volatile boolean closed;

    /* JADX INFO: Access modifiers changed from: private */
    @ThreadSafe
    /* loaded from: input_file:io/trino/plugin/exchange/filesystem/FileSystemExchangeSink$BufferPool.class */
    public static class BufferPool {
        private static final int INSTANCE_SIZE = ClassLayout.parseClass(BufferPool.class).instanceSize();
        private final FileSystemExchangeStats stats;
        private final int maxNumBuffers;
        private final int writeBufferSize;
        private final long bufferRetainedSize;

        @GuardedBy("this")
        private final Queue<SliceOutput> freeBuffersQueue;

        @GuardedBy("this")
        private CompletableFuture<Void> blockedFuture = new CompletableFuture<>();

        @GuardedBy("this")
        private boolean closed;

        @GuardedBy("this")
        private int numBuffersCreated;

        public BufferPool(FileSystemExchangeStats fileSystemExchangeStats, int i, int i2) {
            this.stats = (FileSystemExchangeStats) Objects.requireNonNull(fileSystemExchangeStats, "stats is null");
            Preconditions.checkArgument(i >= 1, "maxNumBuffers must be at least one");
            this.maxNumBuffers = i;
            this.writeBufferSize = i2;
            this.numBuffersCreated = 1;
            this.freeBuffersQueue = new ArrayDeque(i);
            this.freeBuffersQueue.add(Slices.allocate(i2).getOutput());
            this.bufferRetainedSize = this.freeBuffersQueue.peek().getRetainedSize();
        }

        public synchronized CompletableFuture<Void> isBlocked() {
            if (hasFreeBuffers()) {
                return ExchangeSink.NOT_BLOCKED;
            }
            if (this.blockedFuture.isDone()) {
                this.blockedFuture = new CompletableFuture<>();
                this.stats.getExchangeSinkBlocked().record(this.blockedFuture);
            }
            return this.blockedFuture;
        }

        public synchronized SliceOutput take() {
            while (!this.closed) {
                if (hasFreeBuffers()) {
                    return this.freeBuffersQueue.poll();
                }
                try {
                    wait();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                }
            }
            return null;
        }

        public void offer(SliceOutput sliceOutput) {
            sliceOutput.reset();
            synchronized (this) {
                if (this.closed) {
                    return;
                }
                CompletableFuture<Void> completableFuture = this.blockedFuture;
                this.freeBuffersQueue.add(sliceOutput);
                notify();
                completableFuture.complete(null);
            }
        }

        public synchronized long getRetainedSize() {
            return this.closed ? INSTANCE_SIZE : INSTANCE_SIZE + (this.numBuffersCreated * this.bufferRetainedSize);
        }

        public void close() {
            synchronized (this) {
                if (this.closed) {
                    return;
                }
                this.closed = true;
                notifyAll();
                CompletableFuture<Void> completableFuture = this.blockedFuture;
                this.freeBuffersQueue.clear();
                completableFuture.complete(null);
            }
        }

        private boolean hasFreeBuffers() {
            if (!this.freeBuffersQueue.isEmpty()) {
                return true;
            }
            if (this.numBuffersCreated >= this.maxNumBuffers) {
                return false;
            }
            this.freeBuffersQueue.add(Slices.allocate(this.writeBufferSize).getOutput());
            this.numBuffersCreated++;
            return true;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    @ThreadSafe
    /* loaded from: input_file:io/trino/plugin/exchange/filesystem/FileSystemExchangeSink$BufferedStorageWriter.class */
    public static class BufferedStorageWriter {
        private static final int INSTANCE_SIZE = ClassLayout.parseClass(BufferedStorageWriter.class).instanceSize();
        private final FileSystemExchangeStorage exchangeStorage;
        private final FileSystemExchangeStats stats;
        private final URI outputDirectory;
        private final Optional<SecretKey> secretKey;
        private final boolean preserveOrderWithinPartition;
        private final int partitionId;
        private final BufferPool bufferPool;
        private final AtomicReference<Throwable> failure;
        private final int maxPageStorageSizeInBytes;
        private final long maxFileSizeInBytes;

        @GuardedBy("this")
        private ExchangeStorageWriter currentWriter;

        @GuardedBy("this")
        private long currentFileSize;

        @GuardedBy("this")
        private SliceOutput currentBuffer;

        @GuardedBy("this")
        private final List<ExchangeStorageWriter> writers = new ArrayList();

        @GuardedBy("this")
        private boolean closed;

        public BufferedStorageWriter(FileSystemExchangeStorage fileSystemExchangeStorage, FileSystemExchangeStats fileSystemExchangeStats, URI uri, Optional<SecretKey> optional, boolean z, int i, BufferPool bufferPool, AtomicReference<Throwable> atomicReference, int i2, long j) {
            this.exchangeStorage = (FileSystemExchangeStorage) Objects.requireNonNull(fileSystemExchangeStorage, "exchangeStorage is null");
            this.stats = (FileSystemExchangeStats) Objects.requireNonNull(fileSystemExchangeStats, "stats is null");
            this.outputDirectory = (URI) Objects.requireNonNull(uri, "outputDirectory is null");
            this.secretKey = (Optional) Objects.requireNonNull(optional, "secretKey is null");
            this.preserveOrderWithinPartition = z;
            this.partitionId = i;
            this.bufferPool = (BufferPool) Objects.requireNonNull(bufferPool, "bufferPool is null");
            this.failure = (AtomicReference) Objects.requireNonNull(atomicReference, "failure is null");
            this.maxPageStorageSizeInBytes = i2;
            this.maxFileSizeInBytes = j;
            setupWriterForNextPart();
        }

        public synchronized void write(Slice slice) {
            if (this.closed) {
                return;
            }
            int length = 4 + slice.length();
            if (length > this.maxPageStorageSizeInBytes) {
                throw new TrinoException(StandardErrorCode.NOT_SUPPORTED, String.format("Max row size of %s exceeded: %s", DataSize.succinctBytes(this.maxPageStorageSizeInBytes), DataSize.succinctBytes(length)));
            }
            if (this.currentFileSize + length > this.maxFileSizeInBytes && !this.preserveOrderWithinPartition) {
                this.stats.getFileSizeInBytes().add(this.currentFileSize);
                flushIfNeeded(true);
                setupWriterForNextPart();
                this.currentFileSize = 0L;
                this.currentBuffer = null;
            }
            writeInternal(Slices.wrappedIntArray(new int[]{slice.length()}));
            writeInternal(slice);
            this.currentFileSize += length;
        }

        public synchronized ListenableFuture<Void> finish() {
            if (this.closed) {
                return Futures.immediateFailedFuture(new IllegalStateException("BufferedStorageWriter has already closed"));
            }
            this.stats.getFileSizeInBytes().add(this.currentFileSize);
            flushIfNeeded(true);
            return this.writers.size() == 1 ? this.currentWriter.finish() : MoreFutures.asVoid(Futures.allAsList((Iterable) this.writers.stream().map((v0) -> {
                return v0.finish();
            }).collect(ImmutableList.toImmutableList())));
        }

        public synchronized ListenableFuture<Void> abort() {
            if (this.closed) {
                return Futures.immediateVoidFuture();
            }
            this.closed = true;
            return this.writers.size() == 1 ? this.currentWriter.abort() : MoreFutures.asVoid(Futures.allAsList((Iterable) this.writers.stream().map((v0) -> {
                return v0.abort();
            }).collect(ImmutableList.toImmutableList())));
        }

        public synchronized long getRetainedSize() {
            return INSTANCE_SIZE + SizeOf.estimatedSizeOf(this.writers, (v0) -> {
                return v0.getRetainedSize();
            });
        }

        @GuardedBy("this")
        private void setupWriterForNextPart() {
            this.currentWriter = this.exchangeStorage.createExchangeStorageWriter(this.outputDirectory.resolve(this.partitionId + "_" + this.writers.size() + ".data"), this.secretKey);
            this.writers.add(this.currentWriter);
        }

        @GuardedBy("this")
        private void writeInternal(Slice slice) {
            int i = 0;
            while (i < slice.length()) {
                if (this.currentBuffer == null) {
                    this.currentBuffer = this.bufferPool.take();
                    if (this.currentBuffer == null) {
                        return;
                    }
                }
                int min = Math.min(this.currentBuffer.writableBytes(), slice.length() - i);
                this.currentBuffer.writeBytes(slice.getBytes(i, min));
                i += min;
                flushIfNeeded(false);
            }
        }

        @GuardedBy("this")
        private void flushIfNeeded(boolean z) {
            SliceOutput sliceOutput = this.currentBuffer;
            if (sliceOutput != null) {
                if (!sliceOutput.isWritable() || z) {
                    if (!sliceOutput.isWritable()) {
                        this.currentBuffer = null;
                    }
                    ListenableFuture<Void> write = this.currentWriter.write(sliceOutput.slice());
                    write.addListener(() -> {
                        this.bufferPool.offer(sliceOutput);
                    }, MoreExecutors.directExecutor());
                    MoreFutures.addExceptionCallback(write, th -> {
                        this.failure.compareAndSet(null, th);
                    });
                }
            }
        }
    }

    public FileSystemExchangeSink(FileSystemExchangeStorage fileSystemExchangeStorage, FileSystemExchangeStats fileSystemExchangeStats, URI uri, int i, Optional<SecretKey> optional, boolean z, int i2, int i3, int i4, long j) {
        Preconditions.checkArgument(((long) i2) <= j, String.format("maxPageStorageSizeInBytes %s exceeded maxFileSizeInBytes %s", DataSize.succinctBytes(i2), DataSize.succinctBytes(j)));
        this.exchangeStorage = (FileSystemExchangeStorage) Objects.requireNonNull(fileSystemExchangeStorage, "exchangeStorage is null");
        this.stats = (FileSystemExchangeStats) Objects.requireNonNull(fileSystemExchangeStats, "stats is null");
        this.outputDirectory = (URI) Objects.requireNonNull(uri, "outputDirectory is null");
        this.outputPartitionCount = i;
        this.secretKey = (Optional) Objects.requireNonNull(optional, "secretKey is null");
        this.preserveOrderWithinPartition = z;
        this.maxPageStorageSizeInBytes = i2;
        this.maxFileSizeInBytes = j;
        this.bufferPool = new BufferPool(fileSystemExchangeStats, Math.max(i * i4, i3), fileSystemExchangeStorage.getWriteBufferSize());
    }

    public CompletableFuture<Void> isBlocked() {
        return this.bufferPool.isBlocked();
    }

    public void add(int i, Slice slice) {
        throwIfFailed();
        Preconditions.checkArgument(i < this.outputPartitionCount, "partition id is expected to be less than %s: %s", this.outputPartitionCount, i);
        synchronized (this) {
            if (this.closed) {
                return;
            }
            this.writersMap.computeIfAbsent(Integer.valueOf(i), (v1) -> {
                return createWriter(v1);
            }).write(slice);
        }
    }

    private BufferedStorageWriter createWriter(int i) {
        return new BufferedStorageWriter(this.exchangeStorage, this.stats, this.outputDirectory, this.secretKey, this.preserveOrderWithinPartition, i, this.bufferPool, this.failure, this.maxPageStorageSizeInBytes, this.maxFileSizeInBytes);
    }

    public long getMemoryUsage() {
        return INSTANCE_SIZE + this.bufferPool.getRetainedSize() + SizeOf.estimatedSizeOf(this.writersMap, SizeOf::sizeOf, (v0) -> {
            return v0.getRetainedSize();
        });
    }

    public synchronized CompletableFuture<Void> finish() {
        if (this.closed) {
            return CompletableFuture.failedFuture(new IllegalStateException("Exchange sink has already closed"));
        }
        ListenableFuture asVoid = MoreFutures.asVoid(Futures.allAsList((Iterable) this.writersMap.values().stream().map((v0) -> {
            return v0.finish();
        }).collect(ImmutableList.toImmutableList())));
        MoreFutures.addSuccessCallback(asVoid, this::destroy);
        ListenableFuture transformAsync = Futures.transformAsync(asVoid, r5 -> {
            return this.exchangeStorage.createEmptyFile(this.outputDirectory.resolve(COMMITTED_MARKER_FILE_NAME));
        }, MoreExecutors.directExecutor());
        Futures.addCallback(transformAsync, new FutureCallback<Void>() { // from class: io.trino.plugin.exchange.filesystem.FileSystemExchangeSink.1
            public void onSuccess(Void r4) {
                FileSystemExchangeSink.this.closed = true;
            }

            public void onFailure(Throwable th) {
                FileSystemExchangeSink.this.abort();
            }
        }, MoreExecutors.directExecutor());
        return this.stats.getExchangeSinkFinish().record(MoreFutures.toCompletableFuture(transformAsync));
    }

    public synchronized CompletableFuture<Void> abort() {
        if (this.closed) {
            return CompletableFuture.completedFuture(null);
        }
        this.closed = true;
        ListenableFuture asVoid = MoreFutures.asVoid(Futures.allAsList((Iterable) this.writersMap.values().stream().map((v0) -> {
            return v0.abort();
        }).collect(ImmutableList.toImmutableList())));
        MoreFutures.addSuccessCallback(asVoid, this::destroy);
        return this.stats.getExchangeSinkAbort().record(MoreFutures.toCompletableFuture(Futures.transformAsync(asVoid, r4 -> {
            return this.exchangeStorage.deleteRecursively(ImmutableList.of(this.outputDirectory));
        }, MoreExecutors.directExecutor())));
    }

    private void throwIfFailed() {
        Throwable th = this.failure.get();
        if (th != null) {
            Throwables.throwIfUnchecked(th);
            throw new RuntimeException(th);
        }
    }

    private void destroy() {
        this.writersMap.clear();
        this.bufferPool.close();
    }
}
