package org.tio.utils.queue;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.MappedByteBuffer;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/tio/utils/queue/Reader.class */
final class Reader<E> extends Mapped {
    private static final Logger log = LoggerFactory.getLogger(Reader.class);
    static final String NAME = "data.read";
    private final BlockingQueue<Path> clearQueue;
    private final Path path;
    private final long mfs;
    private final long mds;
    Writer<E> writer;
    long dataIdx;
    private OffsetFile offset;
    private long offsetIdx;
    private long offsetName;
    private DataFile data;
    private long dataName;
    private long maxDataIdx;
    private long maxOffsetIdx;
    private final ReentrantLock lock;

    public Reader(Path path, long j, long j2, Writer<E> writer) throws IOException {
        super(path.resolve(NAME), 0L, 8L);
        this.clearQueue = new LinkedBlockingQueue(12);
        this.lock = new ReentrantLock();
        this.path = path;
        this.mfs = j;
        this.mds = j2;
        this.writer = writer;
        this.dataIdx = readCurrentDataIndex();
        this.offset = initOffsetMapped();
        this.offsetIdx = readCurrentOffsetIndex();
        this.data = initDataMapped();
        this.maxDataIdx = readMaxDataIndex();
        startCleanThread();
    }

    public E take(Function<byte[], E> function) throws InterruptedException {
        while (true) {
            E poll = poll(function);
            if (poll != null) {
                return poll;
            }
            while (this.offsetIdx >= this.writer.offsetIdx) {
                this.writer.waiting();
            }
        }
    }

    public E poll(Function<byte[], E> function) {
        this.lock.lock();
        try {
            byte[] poll0 = poll0();
            if (poll0 == null) {
                return null;
            }
            this.dataIdx++;
            this.buffer.putLong(this.dataIdx);
            this.buffer.flip();
            E apply = function.apply(poll0);
            this.lock.unlock();
            return apply;
        } finally {
            this.lock.unlock();
        }
    }

    private byte[] poll0() {
        if (this.offset == null || this.offsetIdx >= this.writer.offsetIdx) {
            return null;
        }
        if (this.dataIdx >= this.maxDataIdx) {
            nextOffset();
        }
        long read = this.offset.read();
        if (read > this.maxOffsetIdx) {
            nextData();
        }
        int i = (int) (read - this.offsetIdx);
        if (i > this.mds) {
            throw new IllegalArgumentException("数据超长！ 最大长度：" + this.mds + ", 当前长度: " + i);
        }
        byte[] bArr = new byte[i];
        this.data.read(bArr, i);
        this.offsetIdx += i;
        return bArr;
    }

    private void nextData() {
        try {
            Path pathname = pathname(this.path, this.offsetIdx, ".data");
            this.data.close();
            this.maxOffsetIdx = this.offsetIdx + this.mfs;
            this.data = new DataFile(pathname, 0L, this.mfs);
            this.clearQueue.put(pathname(this.path, this.dataName, ".data"));
            this.dataName = this.offsetIdx;
            log.debug("读取下一个数据文件: {}", pathname);
        } catch (Exception e) {
            throw new IllegalStateException("创建读取日志文件映射地址异常", e);
        }
    }

    private void nextOffset() {
        try {
            Path pathname = pathname(this.path, this.dataIdx, ".offset");
            if (Files.notExists(pathname, new LinkOption[0])) {
                throw new IllegalArgumentException("文件不存在！- " + pathname);
            }
            this.offset.close();
            this.maxDataIdx += this.mfs / 8;
            this.offset = new OffsetFile(pathname, 0L, this.mfs);
            this.clearQueue.put(pathname(this.path, this.offsetName, ".offset"));
            this.offsetName = this.dataIdx;
            log.debug("读取下一个偏移量文件:{}", pathname);
        } catch (Exception e) {
            throw new IllegalStateException("创建数据偏移量文件映射地址异常", e);
        }
    }

    private DataFile initDataMapped() throws IOException {
        this.dataName = DataFile.name(this.path, this.offsetIdx, this.mfs);
        if (this.dataName < 0 || this.offsetIdx - this.dataName > this.mfs) {
            throw new IOException("文件偏移量异常, 获取的数据文件: " + this.dataName + ", 当前需要写入的偏移量: " + this.offsetIdx);
        }
        Path pathname = pathname(this.path, this.dataName, ".data");
        if (this.dataIdx != 0 && Files.notExists(pathname, new LinkOption[0])) {
            throw new FileNotFoundException("文件不存在！" + pathname);
        }
        this.maxOffsetIdx = this.dataName + this.mfs;
        return new DataFile(pathname, this.offsetIdx - this.dataName, (this.dataName + this.mfs) - this.offsetIdx);
    }

    private long readCurrentOffsetIndex() throws IOException {
        if (this.dataIdx == 0) {
            return 0L;
        }
        long j = ((this.dataIdx * 8) % this.mfs) - 8;
        if ((this.dataIdx * 8) % this.mfs == 0) {
            j = this.mfs - 8;
        }
        return this.offset.get(j, 8);
    }

    private OffsetFile initOffsetMapped() throws IOException {
        long j = 0;
        if (this.dataIdx != 0 && this.dataIdx % (this.mfs / 8) == 0) {
            j = this.dataIdx - (this.mfs / 8);
        } else if (this.dataIdx % (this.mfs / 8) != 0) {
            j = this.dataIdx - (this.dataIdx % (this.mfs / 8));
        }
        Path pathname = pathname(this.path, j, ".offset");
        if (this.dataIdx != 0 && Files.notExists(pathname, new LinkOption[0])) {
            throw new FileNotFoundException("程序有误,需要读的文件找不到,文件名:" + pathname);
        }
        this.offsetName = j;
        return (this.dataIdx == 0 || (this.dataIdx * 8) % this.mfs != 0) ? new OffsetFile(pathname, (this.dataIdx * 8) % this.mfs, this.mfs - ((this.dataIdx * 8) % this.mfs)) : new OffsetFile(pathname, this.mfs, 0L);
    }

    private long readCurrentDataIndex() {
        long j;
        if (this.newed) {
            MappedByteBuffer mappedByteBuffer = this.buffer;
            long j2 = this.writer.dataIdx;
            j = j2;
            mappedByteBuffer.putLong(j2);
            log.debug("首次读取:{}", this.path);
        } else {
            j = this.buffer.getLong();
            log.debug("继续读取:{}", this.path);
        }
        this.buffer.flip();
        return j;
    }

    private long readMaxDataIndex() {
        return ((this.dataIdx == 0 || this.dataIdx % (this.mfs / 8) != 0) ? (this.dataIdx / (this.mfs / 8)) + 1 : this.dataIdx / (this.mfs / 8)) * (this.mfs / 8);
    }

    private void startCleanThread() {
        Thread thread = new Thread(() -> {
            while (true) {
                try {
                    Path take = this.clearQueue.take();
                    Files.deleteIfExists(take);
                    log.debug("已读并删除：{}", take);
                } catch (IOException | InterruptedException e) {
                    log.error(e.getMessage(), e);
                    return;
                }
            }
        });
        thread.setDaemon(true);
        thread.setName("FileQueueClear");
        thread.start();
    }

    @Override // org.tio.utils.queue.Mapped, java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        this.lock.lock();
        try {
            super.close();
            this.data.close();
            this.data = null;
            this.offset.close();
            this.offset = null;
        } finally {
            this.lock.unlock();
        }
    }
}
