/*
 * Decompiled with CFR 0.152.
 */
package io.datarouter.filesystem.node.queue;

import io.datarouter.filesystem.node.queue.BaseDirectoryQueueNode;
import io.datarouter.filesystem.raw.queue.DirectoryQueue;
import io.datarouter.filesystem.raw.queue.DirectoryQueueMessage;
import io.datarouter.model.databean.Databean;
import io.datarouter.model.key.primary.PrimaryKey;
import io.datarouter.model.serialize.StringDatabeanCodec;
import io.datarouter.model.serialize.fielder.DatabeanFielder;
import io.datarouter.scanner.Scanner;
import io.datarouter.storage.config.Config;
import io.datarouter.storage.node.NodeParams;
import io.datarouter.storage.node.op.raw.GroupQueueStorage;
import io.datarouter.storage.queue.BaseQueueMessage;
import io.datarouter.storage.queue.GroupQueueMessage;
import io.datarouter.storage.queue.QueueMessageKey;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Supplier;

public class DirectoryGroupQueueNode<PK extends PrimaryKey<PK>, D extends Databean<PK, D>, F extends DatabeanFielder<PK, D>>
extends BaseDirectoryQueueNode<PK, D, F>
implements GroupQueueStorage.PhysicalGroupQueueStorageNode<PK, D, F> {
    private static final int MAX_MESSAGE_BYTES = 0x100000;

    public DirectoryGroupQueueNode(DirectoryQueue directoryQueue, NodeParams<PK, D, F> params) {
        super(directoryQueue, params);
    }

    public GroupQueueMessage<PK, D> peek(Config config) {
        Config limitedConfig = config.clone().setLimit(Integer.valueOf(1));
        return this.peekMulti(limitedConfig).stream().findFirst().orElse(null);
    }

    public List<GroupQueueMessage<PK, D>> peekMulti(Config config) {
        int limit = config.findLimit().orElse(1);
        DatabeanFielder fielder = this.getFieldInfo().getSampleFielder();
        StringDatabeanCodec codec = this.getFieldInfo().getSampleFielder().getStringDatabeanCodec();
        Supplier databeanSupplier = this.getFieldInfo().getDatabeanSupplier();
        return Scanner.generate(this.directoryQueue::peek).limit((long)limit).advanceWhile(Optional::isPresent).map(Optional::get).map(arg_0 -> DirectoryGroupQueueNode.lambda$3(codec, fielder, (Supplier)databeanSupplier, arg_0)).list();
    }

    public Scanner<GroupQueueMessage<PK, D>> peekUntilEmpty(Config config) {
        return Scanner.generate(() -> this.peek(config)).advanceUntil(Objects::isNull);
    }

    public void put(D databean, Config config) {
        this.putMulti(List.of(databean), config);
    }

    public void putMulti(Collection<D> databeans, Config config) {
        DatabeanFielder fielder = this.getFieldInfo().getSampleFielder();
        StringDatabeanCodec codec = this.getFieldInfo().getSampleFielder().getStringDatabeanCodec();
        List databeansAsBytes = Scanner.of(databeans).map(databean -> codec.toBytes(databean, fielder)).list();
        List groups = codec.makeGroups(databeansAsBytes, 0x100000);
        Scanner.of((Iterable)groups).map(arg_0 -> ((StringDatabeanCodec)codec).concatGroup(arg_0)).forEach(this.directoryQueue::putMessage);
    }

    public List<D> pollMulti(Config config) {
        List<GroupQueueMessage<PK, D>> results = this.peekMulti(config);
        Scanner.of(results).map(BaseQueueMessage::getKey).flush(keys -> this.ackMulti((Collection<QueueMessageKey>)keys, config));
        return Scanner.of(results).map(GroupQueueMessage::getDatabeans).concat(Scanner::of).list();
    }

    private static /* synthetic */ GroupQueueMessage lambda$3(StringDatabeanCodec stringDatabeanCodec, DatabeanFielder databeanFielder, Supplier supplier, DirectoryQueueMessage directoryQueueMessage) {
        byte[] id = directoryQueueMessage.getBytesId();
        List databeans = stringDatabeanCodec.fromStringMulti(directoryQueueMessage.content, databeanFielder, supplier);
        return new GroupQueueMessage(id, databeans);
    }
}

