/*
 * Decompiled with CFR 0.152.
 */
package io.kareldb.kafka;

import com.google.common.collect.ImmutableMap;
import io.kareldb.avro.AvroKeyComparator;
import io.kareldb.avro.AvroUtils;
import io.kareldb.kafka.KafkaSchema;
import io.kareldb.kafka.KafkaSchemaValue;
import io.kareldb.kafka.serialization.KafkaKeySerde;
import io.kareldb.kafka.serialization.KafkaValueSerde;
import io.kareldb.schema.FilterableTable;
import io.kareldb.schema.RelDef;
import io.kareldb.version.VersionedCache;
import io.kareldb.version.VersionedValue;
import io.kcache.Cache;
import io.kcache.KafkaCache;
import io.kcache.KafkaCacheConfig;
import io.kcache.utils.InMemoryCache;
import io.kcache.utils.TransformedRawCache;
import io.kcache.utils.rocksdb.RocksDBCache;
import java.io.IOException;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.NavigableMap;
import org.apache.avro.Conversion;
import org.apache.avro.Conversions;
import org.apache.avro.JsonProperties;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.calcite.util.Pair;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;

/**
 * A {@link FilterableTable} whose rows are held in a {@link KafkaCache} of raw
 * {@code byte[]} keys and values.
 *
 * <p>Lifecycle, as implemented here: {@link #configure(Map)} builds the backing
 * cache, {@link #init()} and {@link #sync()} delegate to it, and {@link #close()}
 * releases it. {@link #getRows()} exposes the raw cache through key/value serdes
 * built from the latest schema registered for this table.
 */
public class KafkaTable extends FilterableTable {
    /** Raw backing cache; {@code null} until {@link #configure(Map)} has completed. */
    private KafkaCache<byte[], byte[]> rows;

    /** Shared Avro data model with the decimal logical-type conversion registered. */
    public static final GenericData GENERIC = new GenericData();

    /**
     * Creates the table; all arguments are passed straight to the superclass.
     *
     * @param schema  the owning schema
     * @param name    the table name
     * @param rowType the row definition
     */
    public KafkaTable(io.kareldb.schema.Schema schema, String name, RelDef rowType) {
        super(schema, name, rowType);
    }

    /**
     * Builds a versioned view over the raw byte cache.
     *
     * <p>The latest schema value for this table is looked up on the owning
     * {@link KafkaSchema}, split into key and value schemas via
     * {@link #getKeyValueSchemas(Schema)}, and used to configure a fresh pair of
     * serdes on every call.
     *
     * @return a new {@link VersionedCache} named after this table
     */
    @Override
    public VersionedCache getRows() {
        KafkaSchema kafkaSchema = (KafkaSchema) this.getSchema();
        KafkaSchemaValue schemaValue = kafkaSchema.getLatestSchemaValue(this.getName());
        Schema avroSchema = AvroUtils.parseSchema(schemaValue.getSchema());
        Pair<Schema, Schema> schemas = KafkaTable.getKeyValueSchemas(avroSchema);

        KafkaKeySerde keySerde = new KafkaKeySerde();
        KafkaValueSerde valueSerde = new KafkaValueSerde();
        // Second argument is the Serde "isKey" flag: true for the key serde, false for the value serde.
        keySerde.configure(
                ImmutableMap.<String, Object>of(
                        "table", this,
                        "avroSchema", schemas.left,
                        "version", schemaValue.getVersion()),
                true);
        valueSerde.configure(
                ImmutableMap.<String, Object>of(
                        "table", this,
                        "avroSchema", schemas.right,
                        "version", schemaValue.getVersion()),
                false);

        Cache<Comparable[], NavigableMap<Long, VersionedValue>> transformedCache =
                new TransformedRawCache<>(keySerde, valueSerde, this.rows);
        return new VersionedCache(this.getName(), transformedCache);
    }

    /**
     * Configures the table and creates its backing {@link KafkaCache}.
     *
     * <p>Entries read from {@code operand}:
     * <ul>
     *   <li>{@code "epoch"} (Integer, required) - appended to the table name to form the topic
     *       {@code <name>_<epoch>};</li>
     *   <li>{@code "avroSchema"} (Avro {@link Schema}, required) - used to derive the key schema
     *       for the key comparator;</li>
     *   <li>{@code "kafkacache.group.id"} (String, default {@code "kareldb-1"});</li>
     *   <li>{@code "rocksdb.enable"} (String, default {@code "true"}) - selects a
     *       {@link RocksDBCache} when it parses as {@code true}, otherwise an
     *       {@link InMemoryCache};</li>
     *   <li>{@code "rocksdb.root.dir"} (String, default {@code "/tmp"}).</li>
     * </ul>
     * The keys {@code "kafkacache.topic"}, {@code "kafkacache.group.id"} and
     * {@code "kafkacache.client.id"} are (over)written in a copy of {@code operand} before it is
     * handed to {@link KafkaCacheConfig}; {@code operand} itself is not modified.
     *
     * @param operand the configuration entries
     * @throws IllegalStateException if this table has no row type
     */
    @Override
    public void configure(Map<String, ?> operand) {
        super.configure(operand);
        if (this.getRowType() == null) {
            throw new IllegalStateException("Custom tables not yet supported for Kafka");
        }

        Map<String, Object> configs = new HashMap<>(operand);
        String groupId = (String) configs.getOrDefault("kafkacache.group.id", "kareldb-1");
        int epoch = (Integer) configs.get("epoch");
        Schema avroSchema = (Schema) configs.get("avroSchema");
        Pair<Schema, Schema> schemas = KafkaTable.getKeyValueSchemas(avroSchema);

        String topic = this.getName() + "_" + epoch;
        configs.put("kafkacache.topic", topic);
        configs.put("kafkacache.group.id", groupId);
        configs.put("kafkacache.client.id", groupId + "-" + topic);

        String enableRocksDbStr = (String) configs.getOrDefault("rocksdb.enable", "true");
        boolean enableRocksDb = Boolean.parseBoolean(enableRocksDbStr);
        String rootDir = (String) configs.getOrDefault("rocksdb.root.dir", "/tmp");

        Comparator<byte[]> cmp = new AvroKeyComparator(schemas.left);
        Cache<byte[], byte[]> localCache = enableRocksDb
                ? new RocksDBCache<>(topic, "rocksdb", rootDir, Serdes.ByteArray(), Serdes.ByteArray(), cmp)
                : new InMemoryCache<>(cmp);
        this.rows = new KafkaCache<>(
                new KafkaCacheConfig(configs), Serdes.ByteArray(), Serdes.ByteArray(), null, localCache);
    }

    /** Initializes the backing cache; {@link #configure(Map)} must have been called first. */
    @Override
    public void init() {
        this.rows.init();
    }

    /** Delegates to {@code sync()} on the backing cache. */
    @Override
    public void sync() {
        this.rows.sync();
    }

    /**
     * Closes the backing cache. Does nothing if the table was never successfully configured,
     * so cleanup code can call this unconditionally.
     *
     * @throws IOException if the backing cache fails to close
     */
    @Override
    public void close() throws IOException {
        if (this.rows != null) {
            this.rows.close();
        }
    }

    /**
     * Splits a table's Avro record schema into a key record schema and a value schema.
     *
     * <p>Fields carrying an Integer {@code "sql.key.index"} property are placed into the key at
     * that position; all other fields become value fields in declaration order. Key positions are
     * consumed from index 0 up to (not including) the first unfilled position. If no key field is
     * found at position 0, the first value field is used as the key instead and is left out of the
     * value record.
     *
     * <p>The value record always starts with {@code _version} (long), {@code _commit} (long) and
     * {@code _deleted} (boolean), followed by the value fields with their defaults carried over.
     * The returned value schema is an Avro array of that record.
     *
     * @param schema the table's record schema; it must have at least one field when no key field
     *               is marked, otherwise the fallback lookup fails with an
     *               {@link ArrayIndexOutOfBoundsException}
     * @return a pair of (key record schema, array-of-value-record schema)
     */
    public static Pair<Schema, Schema> getKeyValueSchemas(Schema schema) {
        SchemaBuilder.FieldAssembler<Schema> keySchemaBuilder =
                SchemaBuilder.record(schema.getName() + "_key").fields();
        SchemaBuilder.FieldAssembler<Schema> valueSchemaBuilder =
                SchemaBuilder.record(schema.getName() + "_value").fields();

        // Both arrays are sized for the worst case; unused trailing slots stay null.
        int size = schema.getFields().size();
        Schema.Field[] keyFields = new Schema.Field[size];
        Schema.Field[] valueFields = new Schema.Field[size];
        int valueIndex = 0;
        for (Schema.Field field : schema.getFields()) {
            Integer keyIndex = (Integer) field.getObjectProp("sql.key.index");
            if (keyIndex != null) {
                keyFields[keyIndex] = field;
            } else {
                valueFields[valueIndex++] = field;
            }
        }

        int keyCount = 0;
        for (Schema.Field keyField : keyFields) {
            if (keyField == null) {
                // First unfilled key position ends the key.
                break;
            }
            keySchemaBuilder = keySchemaBuilder.name(keyField.name()).type(keyField.schema()).noDefault();
            ++keyCount;
        }

        valueIndex = 0;
        if (keyCount == 0) {
            // No explicit key: promote the first value field to be the key.
            Schema.Field first = valueFields[valueIndex++];
            keySchemaBuilder = keySchemaBuilder.name(first.name()).type(first.schema()).noDefault();
        }

        valueSchemaBuilder = valueSchemaBuilder
                .name("_version").type().longType().noDefault()
                .name("_commit").type().longType().noDefault()
                .name("_deleted").type().booleanType().noDefault();
        for (; valueIndex < valueFields.length; ++valueIndex) {
            Schema.Field valueField = valueFields[valueIndex];
            if (valueField == null) {
                break;
            }
            SchemaBuilder.GenericDefault<Schema> builder =
                    valueSchemaBuilder.name(valueField.name()).type(valueField.schema());
            if (valueField.hasDefaultValue()) {
                Object defaultVal = valueField.defaultVal();
                // Avro reports an explicit null default as the NULL_VALUE sentinel; the builder wants a Java null.
                valueSchemaBuilder = builder.withDefault(defaultVal == JsonProperties.NULL_VALUE ? null : defaultVal);
            } else {
                valueSchemaBuilder = builder.noDefault();
            }
        }

        Schema keySchema = keySchemaBuilder.endRecord();
        Schema valueSchema = SchemaBuilder.array().items(valueSchemaBuilder.endRecord());
        return Pair.of(keySchema, valueSchema);
    }

    static {
        GENERIC.addLogicalTypeConversion(new Conversions.DecimalConversion());
    }
}

