/*
 * Decompiled with CFR 0.152.
 */
package io.kareldb.kafka;

import io.kareldb.avro.AvroSchema;
import io.kareldb.avro.AvroUtils;
import io.kareldb.kafka.KafkaSchemaKey;
import io.kareldb.kafka.KafkaSchemaValue;
import io.kareldb.kafka.KafkaTable;
import io.kareldb.kafka.serialization.KafkaSchemaKeySerde;
import io.kareldb.kafka.serialization.KafkaSchemaValueSerde;
import io.kareldb.schema.ColumnDef;
import io.kareldb.schema.ColumnStrategy;
import io.kareldb.schema.RelDef;
import io.kcache.Cache;
import io.kcache.CacheUpdateHandler;
import io.kcache.KafkaCache;
import io.kcache.KafkaCacheConfig;
import io.kcache.utils.Caches;
import io.kcache.utils.InMemoryCache;
import io.kcache.utils.Streams;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import org.apache.avro.Schema;
import org.apache.avro.SchemaValidationException;
import org.apache.calcite.linq4j.Ord;
import org.apache.calcite.schema.Table;
import org.apache.calcite.sql.ddl.SqlAlterTableExtension;
import org.apache.calcite.util.Pair;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.Serde;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A schema whose table definitions are kept as versioned records in a {@link KafkaCache}
 * bound to the {@code _tables} topic.
 *
 * <p>Every create, alter and drop is written as a new {@link KafkaSchemaValue} stored under a
 * {@link KafkaSchemaKey} of (table name, version). The live table map returned by
 * {@link #getTableMap()} is only ever modified by {@link TableUpdateHandler}, which is handed
 * to the cache as its update handler in {@link #configure(Map)}.
 */
public class KafkaSchema extends io.kareldb.schema.Schema {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaSchema.class);

    /** Version assigned to the first definition of a table; inclusive lower bound of version scans. */
    public static final int MIN_VERSION = 1;
    /** Exclusive upper bound used when scanning all versions of a table. */
    public static final int MAX_VERSION = Integer.MAX_VALUE;

    /** Live tables by name; entries are added and removed by {@link TableUpdateHandler}. */
    private final Map<String, Table> tableMap = new ConcurrentHashMap<String, Table>();
    /** Versioned table definitions; assigned in {@link #configure(Map)}. */
    private Cache<KafkaSchemaKey, KafkaSchemaValue> schemaMap;

    /**
     * Returns the live map of table name to table.
     *
     * @return the internal (mutable, concurrent) table map
     */
    @Override
    public Map<String, Table> getTableMap() {
        return this.tableMap;
    }

    /**
     * Builds the cache that stores table definitions.
     *
     * <p>The topic is fixed to {@code _tables}; the group id is taken from
     * {@code kafkacache.group.id} (default {@code kareldb-1}) and the client id is derived
     * from the group id and the topic.
     *
     * @param operand configuration properties; copied, never modified
     */
    @Override
    public void configure(Map<String, ?> operand) {
        super.configure(operand);
        Map<String, Object> configs = new HashMap<String, Object>(operand);
        String groupId = (String) configs.getOrDefault("kafkacache.group.id", "kareldb-1");
        String topic = "_tables";
        configs.put("kafkacache.topic", topic);
        configs.put("kafkacache.group.id", groupId);
        configs.put("kafkacache.client.id", groupId + "-" + topic);
        Cache<KafkaSchemaKey, KafkaSchemaValue> kafkaCache = new KafkaCache<KafkaSchemaKey, KafkaSchemaValue>(
                new KafkaCacheConfig(configs),
                new KafkaSchemaKeySerde(),
                new KafkaSchemaValueSerde(),
                new TableUpdateHandler(),
                new InMemoryCache<KafkaSchemaKey, KafkaSchemaValue>());
        this.schemaMap = Caches.concurrentCache(kafkaCache);
    }

    /** Initializes the definition cache, then initializes every known table in parallel. */
    @Override
    public void init() {
        this.schemaMap.init();
        this.runOnAllTables(io.kareldb.schema.Table::init);
    }

    /** Syncs the definition cache, then syncs every known table in parallel. */
    @Override
    public void sync() {
        this.schemaMap.sync();
        this.runOnAllTables(io.kareldb.schema.Table::sync);
    }

    /**
     * Runs {@code action} once per table via {@link CompletableFuture#runAsync(Runnable)} and
     * blocks until all of them have completed.
     */
    private void runOnAllTables(java.util.function.Consumer<io.kareldb.schema.Table> action) {
        CompletableFuture.allOf(
                this.tableMap.values().stream()
                        .map(t -> (io.kareldb.schema.Table) t)
                        .map(t -> CompletableFuture.runAsync(() -> action.accept(t)))
                        .toArray(CompletableFuture[]::new))
                .join();
    }

    /**
     * Looks up one specific version of a table definition.
     *
     * @param name    table name
     * @param version definition version
     * @return the stored value, or {@code null} if the cache has no entry for that key
     */
    public KafkaSchemaValue getSchemaValue(String name, int version) {
        return this.schemaMap.get(new KafkaSchemaKey(name, version));
    }

    /**
     * Returns the definitions of a table from newest to oldest, restricted to those that
     * share the epoch of the newest one.
     *
     * @param name table name
     * @return the matching definitions, newest first; empty if the table has none
     */
    public List<KafkaSchemaValue> getLatestSchemaValuesDescending(String name) {
        KafkaSchemaKey lowest = new KafkaSchemaKey(name, MIN_VERSION);
        KafkaSchemaKey highest = new KafkaSchemaKey(name, MAX_VERSION);
        List<KafkaSchemaValue> schemas = Streams
                .streamOf(this.schemaMap.descendingCache().range(highest, false, lowest, true))
                .map(kv -> kv.value)
                .collect(Collectors.toList());
        if (schemas.isEmpty()) {
            return schemas;
        }
        // The list is in descending order, so the first element carries the current epoch.
        int epoch = schemas.get(0).getEpoch();
        return schemas.stream().filter(s -> s.getEpoch() == epoch).collect(Collectors.toList());
    }

    /**
     * Returns the last definition in the ascending version range of a table.
     *
     * @param name table name
     * @return the newest definition (which may be a DROP), or {@code null} if there is none
     */
    public KafkaSchemaValue getLatestSchemaValue(String name) {
        KafkaSchemaKey lowest = new KafkaSchemaKey(name, MIN_VERSION);
        KafkaSchemaKey highest = new KafkaSchemaKey(name, MAX_VERSION);
        return Streams.streamOf(this.schemaMap.range(lowest, true, highest, false))
                .reduce((earlier, later) -> later)
                .map(kv -> kv.value)
                .orElse(null);
    }

    /**
     * Records a CREATE definition for a new table and returns the initialized table.
     *
     * @param tableName name of the table to create
     * @param operand   not used by this implementation
     * @param rowType   row definition converted to the stored Avro schema
     * @return the table registered under {@code tableName}, after {@code init()}
     * @throws IllegalStateException if the table already exists, or if no table was registered
     *                               under {@code tableName} after the definition was written
     */
    @Override
    public io.kareldb.schema.Table createTable(String tableName, Map<String, Object> operand, RelDef rowType) {
        KafkaSchemaValue latest = this.getLatestSchemaValue(tableName);
        if (isLive(latest)) {
            throw new IllegalStateException("Table " + tableName + " already exists");
        }
        int version = latest != null ? latest.getVersion() + 1 : MIN_VERSION;
        Schema avroSchema = AvroSchema.toAvroSchema(tableName, rowType);
        // A (re)created table starts a new epoch equal to its creating version.
        this.writeDefinition(tableName, version, avroSchema.toString(), Action.CREATE, version);
        // The table itself is built by TableUpdateHandler when it sees the CREATE record;
        // presumably that has happened by the time put/flush return (TODO confirm for this cache).
        io.kareldb.schema.Table table = (io.kareldb.schema.Table) this.tableMap.get(tableName);
        if (table == null) {
            throw new IllegalStateException("Table " + tableName + " was not registered after creation");
        }
        table.init();
        return table;
    }

    /**
     * Applies column changes to an existing table and records the result as an ALTER definition.
     *
     * <p>Each column of {@code relDef} is paired by position with an entry of {@code actions}
     * and of {@code relDef.getStrategies()}: a DROP action removes the column, any other action
     * adds or replaces it. The resulting schema is checked for compatibility against the
     * definitions returned by {@link #getLatestSchemaValuesDescending(String)}.
     *
     * @param tableName name of the table to alter
     * @param actions   per-column actions, positionally aligned with the columns of {@code relDef}
     * @param relDef    the columns being changed; must not declare key fields
     * @throws IllegalArgumentException if key fields are supplied or the new schema is incompatible
     * @throws IllegalStateException    if the table does not exist
     */
    @Override
    public void alterTable(String tableName, List<SqlAlterTableExtension.Action> actions, RelDef relDef) {
        if (relDef.getKeyFields().size() > 0) {
            throw new IllegalArgumentException("Key fields cannot be altered");
        }
        KafkaSchemaValue latest = this.getLatestSchemaValue(tableName);
        if (!isLive(latest)) {
            throw new IllegalStateException("Table " + tableName + " does not exist");
        }
        Schema currentSchema = AvroUtils.parseSchema(latest.getSchema());
        Pair<LinkedHashMap<String, ColumnDef>, List<String>> current = AvroSchema.toColumnDefs(currentSchema);
        LinkedHashMap<String, ColumnDef> columnDefs = current.left;
        List<String> keyFields = current.right;
        LinkedHashMap<String, ColumnDef> changes = io.kareldb.schema.Schema.toColumnDefs(relDef.getRowType());
        for (Ord<Map.Entry<String, ColumnDef>> change : Ord.zip(changes.entrySet())) {
            int index = change.i;
            String columnName = change.e.getKey();
            ColumnDef requested = change.e.getValue();
            SqlAlterTableExtension.Action action = actions.get(index);
            ColumnStrategy strategy = relDef.getStrategies().get(index);
            if (action == SqlAlterTableExtension.Action.DROP) {
                columnDefs.remove(columnName);
                continue;
            }
            columnDefs.put(columnName, new ColumnDef(
                    requested.getColumnType(), strategy, requested.getPrecision(), requested.getScale()));
        }
        RelDef newRowType = io.kareldb.schema.Schema.toRowType(columnDefs, keyFields);
        Schema newSchema = AvroSchema.toAvroSchema(tableName, newRowType);
        try {
            AvroUtils.checkCompatibility(
                    newSchema,
                    this.getLatestSchemaValuesDescending(tableName).stream()
                            .map(v -> AvroUtils.parseSchema(v.getSchema()))
                            .collect(Collectors.toList()));
        } catch (SchemaValidationException e) {
            // Pass the exception as well so the log keeps the stack trace.
            LOG.error(e.getMessage(), e);
            throw new IllegalArgumentException(e);
        }
        this.writeDefinition(tableName, latest.getVersion() + 1, newSchema.toString(), Action.ALTER, latest.getEpoch());
    }

    /**
     * Records a DROP definition for an existing table.
     *
     * @param tableName name of the table to drop
     * @return {@code false} if the table has no definition or is already dropped; otherwise
     *         whether a table was registered under {@code tableName} before the drop was written
     */
    @Override
    public boolean dropTable(String tableName) {
        KafkaSchemaValue latest = this.getLatestSchemaValue(tableName);
        if (!isLive(latest)) {
            return false;
        }
        // Sampled before the write: the update handler removes the entry on DROP.
        boolean exists = this.tableMap.get(tableName) != null;
        this.writeDefinition(tableName, latest.getVersion() + 1, null, Action.DROP, latest.getEpoch());
        return exists;
    }

    /**
     * Closes every table and then the definition cache.
     *
     * <p>A failure to close one resource does not prevent the others from being closed.
     *
     * @throws IOException the first failure encountered, with later ones attached as suppressed
     */
    @Override
    public void close() throws IOException {
        IOException failure = null;
        for (Table table : this.tableMap.values()) {
            try {
                ((io.kareldb.schema.Table) table).close();
            } catch (IOException e) {
                failure = chain(failure, e);
            }
        }
        // schemaMap is only assigned in configure(); tolerate close() on an unconfigured schema.
        if (this.schemaMap != null) {
            try {
                this.schemaMap.close();
            } catch (IOException e) {
                failure = chain(failure, e);
            }
        }
        if (failure != null) {
            throw failure;
        }
    }

    /** Returns whether {@code value} describes a table that exists, i.e. is present and not a DROP. */
    private static boolean isLive(KafkaSchemaValue value) {
        return value != null && value.getAction() != Action.DROP;
    }

    /** Returns {@code first} with {@code next} added as suppressed, or {@code next} if there is no first. */
    private static IOException chain(IOException first, IOException next) {
        if (first == null) {
            return next;
        }
        first.addSuppressed(next);
        return first;
    }

    /** Stores one definition under (tableName, version) and flushes the cache. */
    private void writeDefinition(String tableName, int version, String schema, Action action, int epoch) {
        this.schemaMap.put(
                new KafkaSchemaKey(tableName, version),
                new KafkaSchemaValue(tableName, version, schema, action, epoch));
        this.schemaMap.flush();
    }

    /** Kind of change a stored table definition represents. */
    public enum Action {
        CREATE,
        ALTER,
        DROP;

    }

    /**
     * Keeps the table map of the enclosing schema in step with the definitions written to
     * the cache.
     */
    private class TableUpdateHandler
    implements CacheUpdateHandler<KafkaSchemaKey, KafkaSchemaValue> {
        private TableUpdateHandler() {
        }

        /**
         * Applies one definition record to the table map.
         *
         * <ul>
         *   <li>CREATE: builds and configures a new {@link KafkaTable} and registers it.</li>
         *   <li>ALTER: replaces the row definition of the registered table.</li>
         *   <li>DROP: closes the registered table and removes it.</li>
         * </ul>
         *
         * @param schemaKey      key of the record; supplies the table name
         * @param schemaValue    the definition being applied
         * @param oldSchemaValue unused
         * @param tp             unused
         * @param offset         unused
         * @param timestamp      unused
         */
        @Override
        public void handleUpdate(KafkaSchemaKey schemaKey, KafkaSchemaValue schemaValue, KafkaSchemaValue oldSchemaValue, TopicPartition tp, long offset, long timestamp) {
            String tableName = schemaKey.getTableName();
            switch (schemaValue.getAction()) {
                case CREATE: {
                    Schema avroSchema = AvroUtils.parseSchema(schemaValue.getSchema());
                    RelDef rowType = AvroSchema.toRowType(avroSchema);
                    HashMap<String, Object> configs = new HashMap<String, Object>(KafkaSchema.this.getConfigs());
                    configs.put("avroSchema", avroSchema);
                    configs.put("epoch", schemaValue.getEpoch());
                    KafkaTable table = new KafkaTable(KafkaSchema.this, avroSchema.getName(), rowType);
                    ((io.kareldb.schema.Table) table).configure(configs);
                    KafkaSchema.this.tableMap.put(tableName, table);
                    break;
                }
                case ALTER: {
                    io.kareldb.schema.Table table = (io.kareldb.schema.Table) KafkaSchema.this.tableMap.get(tableName);
                    if (table == null) {
                        // Same tolerance as DROP: nothing registered, so nothing to update.
                        LOG.warn("Ignoring ALTER for table {} because it is not registered", tableName);
                        break;
                    }
                    Schema avroSchema = AvroUtils.parseSchema(schemaValue.getSchema());
                    table.setRelDef(AvroSchema.toRowType(avroSchema));
                    break;
                }
                case DROP: {
                    io.kareldb.schema.Table table = (io.kareldb.schema.Table) KafkaSchema.this.tableMap.get(tableName);
                    if (table == null) {
                        break;
                    }
                    try {
                        table.close();
                    } catch (IOException e) {
                        throw new RuntimeException(e);
                    } finally {
                        // Unregister even when close() fails so a dropped table never stays visible.
                        KafkaSchema.this.tableMap.remove(tableName);
                    }
                    break;
                }
            }
        }
    }
}

