/*
 * Decompiled with CFR 0.152.
 */
package io.kareldb;

import io.kareldb.KarelDbConfig;
import io.kareldb.kafka.KafkaSchema;
import io.kareldb.schema.Schema;
import io.kareldb.transaction.KarelDbCommitTable;
import io.kareldb.transaction.KarelDbTimestampStorage;
import io.kareldb.transaction.client.KarelDbTransactionManager;
import io.kcache.Cache;
import io.kcache.KafkaCache;
import io.kcache.KafkaCacheConfig;
import io.kcache.utils.Caches;
import io.kcache.utils.InMemoryCache;
import java.io.Closeable;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Utils;
import org.apache.omid.transaction.RollbackException;
import org.apache.omid.transaction.Transaction;
import org.apache.omid.transaction.TransactionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KarelDbEngine
implements Configurable,
Closeable {
    private static final Logger LOG = LoggerFactory.getLogger(KarelDbEngine.class);
    private KarelDbConfig config;
    private Cache<Long, Long> commits;
    private Cache<Long, Long> timestamps;
    private KarelDbTransactionManager transactionManager;
    private Schema schema;
    private final AtomicBoolean initialized = new AtomicBoolean();
    private static KarelDbEngine INSTANCE;

    public static synchronized KarelDbEngine getInstance() {
        if (INSTANCE == null) {
            INSTANCE = new KarelDbEngine();
        }
        return INSTANCE;
    }

    public static synchronized void closeInstance() {
        if (INSTANCE != null) {
            try {
                INSTANCE.close();
            }
            catch (IOException e) {
                LOG.warn("Could not close engine", (Throwable)e);
            }
            INSTANCE = null;
        }
    }

    private KarelDbEngine() {
    }

    public void configure(Map<String, ?> configs) {
        this.configure(new KarelDbConfig(configs));
    }

    public void configure(KarelDbConfig config) {
        this.config = config;
    }

    public void init() {
        String topic;
        Map configs = this.config.originals();
        String bootstrapServers = (String)configs.get("kafkacache.bootstrap.servers");
        String groupId = configs.getOrDefault("kafkacache.group.id", "kareldb-1");
        if (bootstrapServers != null) {
            topic = "_commits";
            configs.put("kafkacache.topic", topic);
            configs.put("kafkacache.group.id", groupId);
            configs.put("kafkacache.client.id", groupId + "-" + topic);
            this.commits = new KafkaCache(new KafkaCacheConfig(configs), Serdes.Long(), Serdes.Long(), null, (Cache)new InMemoryCache());
        } else {
            this.commits = new InMemoryCache();
        }
        this.commits = Caches.concurrentCache(this.commits);
        this.commits.init();
        if (bootstrapServers != null) {
            topic = "_timestamps";
            configs.put("kafkacache.topic", topic);
            configs.put("kafkacache.group.id", groupId);
            configs.put("kafkacache.client.id", groupId + "-" + topic);
            this.timestamps = new KafkaCache(new KafkaCacheConfig(configs), Serdes.Long(), Serdes.Long(), null, (Cache)new InMemoryCache());
        } else {
            this.timestamps = new InMemoryCache();
        }
        this.timestamps = Caches.concurrentCache(this.timestamps);
        this.timestamps.init();
        KarelDbCommitTable commitTable = new KarelDbCommitTable(this.commits);
        KarelDbTimestampStorage timestampStorage = new KarelDbTimestampStorage(this.timestamps);
        this.transactionManager = KarelDbTransactionManager.newInstance(commitTable, timestampStorage);
        String schemaClass = configs.getOrDefault("kind", KafkaSchema.class.getName());
        this.schema = (Schema)KarelDbEngine.getConfiguredInstance(schemaClass, configs);
        this.schema.init();
        boolean isInitialized = this.initialized.compareAndSet(false, true);
        if (!isInitialized) {
            throw new IllegalStateException("Illegal state while initializing engine. Engine was already initialized");
        }
    }

    public boolean isInitialized() {
        return this.initialized.get();
    }

    public void sync() {
        this.commits.sync();
        this.timestamps.sync();
        this.schema.sync();
    }

    public Schema getSchema() {
        return this.schema;
    }

    public KarelDbTransactionManager getTxManager() {
        return this.transactionManager;
    }

    public Transaction beginTx() throws TransactionException {
        return this.transactionManager.begin();
    }

    public void commitTx(Transaction tx) throws RollbackException, TransactionException {
        this.transactionManager.commit(tx);
    }

    public void rollbackTx(Transaction tx) throws TransactionException {
        this.transactionManager.rollback(tx);
    }

    @Override
    public void close() throws IOException {
        this.transactionManager.close();
        this.schema.close();
        this.timestamps.close();
        this.commits.close();
    }

    public static <T> T getConfiguredInstance(String className, Map<String, ?> configs) {
        try {
            Class<?> cls = Class.forName(className);
            if (cls == null) {
                return null;
            }
            Object o = Utils.newInstance(cls);
            if (o instanceof Configurable) {
                ((Configurable)o).configure(configs);
            }
            return (T)cls.cast(o);
        }
        catch (ClassNotFoundException e) {
            throw new RuntimeException(e);
        }
    }
}

