/*
 * Decompiled with CFR 0.152.
 */
package io.datarouter.filesystem.node.queue;

import io.datarouter.filesystem.node.queue.BaseDirectoryQueueNode;
import io.datarouter.filesystem.raw.queue.DirectoryQueue;
import io.datarouter.model.databean.Databean;
import io.datarouter.model.key.primary.PrimaryKey;
import io.datarouter.model.serialize.StringDatabeanCodec;
import io.datarouter.model.serialize.fielder.DatabeanFielder;
import io.datarouter.scanner.Scanner;
import io.datarouter.storage.config.Config;
import io.datarouter.storage.node.NodeParams;
import io.datarouter.storage.node.op.raw.QueueStorage;
import io.datarouter.storage.op.scan.queue.PollUntilEmptyQueueStorageScanner;
import io.datarouter.storage.queue.BaseQueueMessage;
import io.datarouter.storage.queue.QueueMessage;
import io.datarouter.storage.queue.QueueMessageKey;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Supplier;

public class DirectoryQueueNode<PK extends PrimaryKey<PK>, D extends Databean<PK, D>, F extends DatabeanFielder<PK, D>>
extends BaseDirectoryQueueNode<PK, D, F>
implements QueueStorage.PhysicalQueueStorageNode<PK, D, F> {
    public DirectoryQueueNode(DirectoryQueue directoryQueue, NodeParams<PK, D, F> params) {
        super(directoryQueue, params);
    }

    public QueueMessage<PK, D> peek(Config config) {
        return this.directoryQueue.peek().map(directoryQueueMessage -> {
            DatabeanFielder fielder = this.getFieldInfo().getSampleFielder();
            Supplier databeanSupplier = this.getFieldInfo().getDatabeanSupplier();
            StringDatabeanCodec codec = fielder.getStringDatabeanCodec();
            Databean databean = codec.fromString(directoryQueueMessage.getContentUtf8(), fielder, databeanSupplier);
            byte[] receiptHandle = directoryQueueMessage.getIdUtf8Bytes();
            return new QueueMessage(receiptHandle, databean, Map.of());
        }).orElse(null);
    }

    public List<QueueMessage<PK, D>> peekMulti(Config config) {
        int limit = config.findLimit().orElse(10);
        return Scanner.generate(() -> this.peek(config)).limit((long)limit).list();
    }

    public Scanner<QueueMessage<PK, D>> peekUntilEmpty(Config config) {
        return Scanner.generate(() -> this.peek(config)).advanceUntil(Objects::isNull);
    }

    public void put(D databean, Config config) {
        DatabeanFielder fielder = this.getFieldInfo().getSampleFielder();
        String content = fielder.getStringDatabeanCodec().toString(databean, fielder);
        this.directoryQueue.putMessage(content);
    }

    public void putMulti(Collection<D> databeans, Config config) {
        databeans.forEach(databean -> this.put(databean, config));
    }

    public D poll(Config config) {
        QueueMessage<PK, D> message = this.peek(config);
        if (message == null) {
            return null;
        }
        this.ack(message.getKey(), config);
        return (D)message.getDatabean();
    }

    public List<D> pollMulti(Config config) {
        List<QueueMessage<PK, D>> messages = this.peekMulti(config);
        Scanner.of(messages).map(BaseQueueMessage::getKey).flush(keys -> this.ackMulti((Collection<QueueMessageKey>)keys, config));
        return Scanner.of(messages).map(QueueMessage::getDatabean).list();
    }

    public Scanner<D> pollUntilEmpty(Config config) {
        return new PollUntilEmptyQueueStorageScanner((QueueStorage)this, config);
    }
}

