package nstream.persist.kv;

import java.time.Clock;
import java.util.Objects;
import java.util.Optional;
import java.util.ServiceLoader;
import java.util.stream.Stream;
import nstream.persist.api.PersistenceException;
import nstream.persist.api.PersistenceFactory;
import nstream.persist.api.cache.PersistenceApi;
import nstream.persist.api.kv.KvStoreApi;
import nstream.persist.api.kv.KvStoreFactory;
import nstream.persist.kv.commit.TimeoutCommit;
import nstream.persist.kv.state.StoreState;
import nstream.persist.kv.task.DbCommitTask;
import swim.concurrent.Stage;
import swim.structure.Value;

/* loaded from: input_file:nstream/persist/kv/KvAdapterFactory.class */
public final class KvAdapterFactory implements PersistenceFactory {
    private static final String IMPL_NAME = "KeyValueAdapter";

    public String getName() {
        return IMPL_NAME;
    }

    private static KvAdapterConfig loadConfig(Value value) throws PersistenceException {
        KvAdapterConfig kvAdapterConfig = (KvAdapterConfig) KvAdapterConfig.FORM.cast(value);
        if (kvAdapterConfig == null) {
            throw new PersistenceException("Invalid key-value configuration.");
        }
        return kvAdapterConfig;
    }

    private static KvStoreApi loadStore(String str, KvAdapterConfig kvAdapterConfig, Stage stage) throws PersistenceException {
        Stream map = ServiceLoader.load(KvStoreFactory.class).stream().map((v0) -> {
            return v0.get();
        });
        Optional flatMap = kvAdapterConfig.getImplName().map(str2 -> {
            return map.filter(kvStoreFactory -> {
                return Objects.equals(kvStoreFactory.getName(), str2);
            });
        }).or(() -> {
            return Optional.of(map);
        }).flatMap((v0) -> {
            return v0.findFirst();
        });
        if (flatMap.isPresent()) {
            return ((KvStoreFactory) flatMap.get()).openKvStore(str, kvAdapterConfig.getParameters().orElseGet(Value::absent), stage);
        }
        throw new PersistenceException(String.format("No suitable implementation of the %s service is available.", KvStoreFactory.class.getName()));
    }

    public PersistenceApi openPersistenceStore(String str, Value value, Stage stage) throws PersistenceException {
        KvAdapterConfig loadConfig = loadConfig(value);
        TimeoutCommit timeoutCommit = new TimeoutCommit(Clock.systemUTC(), loadConfig.getMinCommitInterval(), loadConfig.getMaxCommitInterval().orElse(-1L), loadConfig.getMinCommitSize());
        KvStoreApi loadStore = loadStore(str, loadConfig, stage);
        StoreState storeState = new StoreState(timeoutCommit);
        DbCommitTask dbCommitTask = new DbCommitTask(loadStore, storeState, loadConfig.getMaxBatchSize());
        stage.task(dbCommitTask);
        storeState.setTaskRef(dbCommitTask);
        return new KvAdapter(loadStore, storeState, loadConfig.getShutdownTimeout());
    }
}
