/*
 * Decompiled with CFR 0.152.
 */
package io.delta.storage;

import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.regions.Region;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient;
import com.amazonaws.services.dynamodbv2.model.AttributeDefinition;
import com.amazonaws.services.dynamodbv2.model.AttributeValue;
import com.amazonaws.services.dynamodbv2.model.ComparisonOperator;
import com.amazonaws.services.dynamodbv2.model.Condition;
import com.amazonaws.services.dynamodbv2.model.ConditionalCheckFailedException;
import com.amazonaws.services.dynamodbv2.model.DescribeTableResult;
import com.amazonaws.services.dynamodbv2.model.ExpectedAttributeValue;
import com.amazonaws.services.dynamodbv2.model.GetItemRequest;
import com.amazonaws.services.dynamodbv2.model.KeySchemaElement;
import com.amazonaws.services.dynamodbv2.model.KeyType;
import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughput;
import com.amazonaws.services.dynamodbv2.model.PutItemRequest;
import com.amazonaws.services.dynamodbv2.model.QueryRequest;
import com.amazonaws.services.dynamodbv2.model.ResourceInUseException;
import com.amazonaws.services.dynamodbv2.model.ResourceNotFoundException;
import com.amazonaws.services.dynamodbv2.model.ScalarAttributeType;
import com.amazonaws.services.dynamodbv2.model.TableDescription;
import io.delta.storage.BaseExternalLogStore;
import io.delta.storage.ExternalCommitEntry;
import io.delta.storage.utils.ReflectionUtils;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.nio.file.FileAlreadyExistsException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class S3DynamoDBLogStore
extends BaseExternalLogStore {
    private static final Logger LOG = LoggerFactory.getLogger(S3DynamoDBLogStore.class);
    public static final String SPARK_CONF_PREFIX = "spark.io.delta.storage.S3DynamoDBLogStore";
    public static final String BASE_CONF_PREFIX = "io.delta.storage.S3DynamoDBLogStore";
    public static final String DDB_CLIENT_TABLE = "ddb.tableName";
    public static final String DDB_CLIENT_REGION = "ddb.region";
    public static final String DDB_CLIENT_CREDENTIALS_PROVIDER = "credentials.provider";
    public static final String DDB_CREATE_TABLE_RCU = "provisionedThroughput.rcu";
    public static final String DDB_CREATE_TABLE_WCU = "provisionedThroughput.wcu";
    private static final String ATTR_TABLE_PATH = "tablePath";
    private static final String ATTR_FILE_NAME = "fileName";
    private static final String ATTR_TEMP_PATH = "tempPath";
    private static final String ATTR_COMPLETE = "complete";
    private static final String ATTR_COMMIT_TIME = "commitTime";
    private final AmazonDynamoDBClient client;
    private final String tableName;
    private final String credentialsProviderName;
    private final String regionName;

    public S3DynamoDBLogStore(Configuration configuration) throws IOException {
        super(configuration);
        this.tableName = S3DynamoDBLogStore.getParam(configuration, DDB_CLIENT_TABLE, "delta_log");
        this.credentialsProviderName = S3DynamoDBLogStore.getParam(configuration, DDB_CLIENT_CREDENTIALS_PROVIDER, "com.amazonaws.auth.DefaultAWSCredentialsProviderChain");
        this.regionName = S3DynamoDBLogStore.getParam(configuration, DDB_CLIENT_REGION, "us-east-1");
        LOG.info("using tableName {}", (Object)this.tableName);
        LOG.info("using credentialsProviderName {}", (Object)this.credentialsProviderName);
        LOG.info("using regionName {}", (Object)this.regionName);
        this.client = this.getClient();
        this.tryEnsureTableExists(configuration);
    }

    @Override
    protected void putExternalEntry(ExternalCommitEntry externalCommitEntry, boolean bl) throws IOException {
        try {
            LOG.debug(String.format("putItem %s, overwrite: %s", externalCommitEntry, bl));
            this.client.putItem(this.createPutItemRequest(externalCommitEntry, bl));
        }
        catch (ConditionalCheckFailedException conditionalCheckFailedException) {
            LOG.debug(conditionalCheckFailedException.toString());
            throw new FileAlreadyExistsException(externalCommitEntry.absoluteFilePath().toString());
        }
    }

    @Override
    protected Optional<ExternalCommitEntry> getExternalEntry(String string, String string2) {
        ConcurrentHashMap<String, AttributeValue> concurrentHashMap = new ConcurrentHashMap<String, AttributeValue>();
        concurrentHashMap.put(ATTR_TABLE_PATH, new AttributeValue(string));
        concurrentHashMap.put(ATTR_FILE_NAME, new AttributeValue(string2));
        Map map = this.client.getItem(new GetItemRequest(this.tableName, concurrentHashMap).withConsistentRead(Boolean.valueOf(true))).getItem();
        return map != null ? Optional.of(this.dbResultToCommitEntry(map)) : Optional.empty();
    }

    @Override
    protected Optional<ExternalCommitEntry> getLatestExternalEntry(Path path) {
        ConcurrentHashMap<String, Condition> concurrentHashMap = new ConcurrentHashMap<String, Condition>();
        concurrentHashMap.put(ATTR_TABLE_PATH, new Condition().withComparisonOperator(ComparisonOperator.EQ).withAttributeValueList(new AttributeValue[]{new AttributeValue(path.toString())}));
        List list = this.client.query(new QueryRequest(this.tableName).withConsistentRead(Boolean.valueOf(true)).withScanIndexForward(Boolean.valueOf(false)).withLimit(Integer.valueOf(1)).withKeyConditions(concurrentHashMap)).getItems();
        if (list.isEmpty()) {
            return Optional.empty();
        }
        return Optional.of(this.dbResultToCommitEntry((Map)list.get(0)));
    }

    private ExternalCommitEntry dbResultToCommitEntry(Map<String, AttributeValue> map) {
        AttributeValue attributeValue = map.get(ATTR_COMMIT_TIME);
        return new ExternalCommitEntry(new Path(map.get(ATTR_TABLE_PATH).getS()), map.get(ATTR_FILE_NAME).getS(), map.get(ATTR_TEMP_PATH).getS(), map.get(ATTR_COMPLETE).getS().equals("true"), attributeValue != null ? Long.valueOf(Long.parseLong(attributeValue.getN())) : null);
    }

    private PutItemRequest createPutItemRequest(ExternalCommitEntry externalCommitEntry, boolean bl) {
        ConcurrentHashMap<String, AttributeValue> concurrentHashMap = new ConcurrentHashMap<String, AttributeValue>();
        concurrentHashMap.put(ATTR_TABLE_PATH, new AttributeValue(externalCommitEntry.tablePath.toString()));
        concurrentHashMap.put(ATTR_FILE_NAME, new AttributeValue(externalCommitEntry.fileName));
        concurrentHashMap.put(ATTR_TEMP_PATH, new AttributeValue(externalCommitEntry.tempPath));
        concurrentHashMap.put(ATTR_COMPLETE, new AttributeValue().withS(Boolean.toString(externalCommitEntry.complete)));
        if (externalCommitEntry.complete) {
            concurrentHashMap.put(ATTR_COMMIT_TIME, new AttributeValue().withN(externalCommitEntry.commitTime.toString()));
        }
        PutItemRequest putItemRequest = new PutItemRequest(this.tableName, concurrentHashMap);
        if (!bl) {
            ConcurrentHashMap<String, ExpectedAttributeValue> concurrentHashMap2 = new ConcurrentHashMap<String, ExpectedAttributeValue>();
            concurrentHashMap2.put(ATTR_FILE_NAME, new ExpectedAttributeValue(Boolean.valueOf(false)));
            putItemRequest.withExpected(concurrentHashMap2);
        }
        return putItemRequest;
    }

    private void tryEnsureTableExists(Configuration configuration) throws IOException {
        boolean bl = false;
        for (int i = 0; i < 20; ++i) {
            String string = "CREATING";
            try {
                DescribeTableResult describeTableResult = this.client.describeTable(this.tableName);
                TableDescription tableDescription = describeTableResult.getTable();
                string = tableDescription.getTableStatus();
            }
            catch (ResourceNotFoundException resourceNotFoundException) {
                long l = Long.parseLong(S3DynamoDBLogStore.getParam(configuration, DDB_CREATE_TABLE_RCU, "5"));
                long l2 = Long.parseLong(S3DynamoDBLogStore.getParam(configuration, DDB_CREATE_TABLE_WCU, "5"));
                LOG.info("DynamoDB table `{}` in region `{}` does not exist. Creating it now with provisioned throughput of {} RCUs and {} WCUs.", new Object[]{this.tableName, this.regionName, l, l2});
                try {
                    this.client.createTable(Arrays.asList(new AttributeDefinition(ATTR_TABLE_PATH, ScalarAttributeType.S), new AttributeDefinition(ATTR_FILE_NAME, ScalarAttributeType.S)), this.tableName, Arrays.asList(new KeySchemaElement(ATTR_TABLE_PATH, KeyType.HASH), new KeySchemaElement(ATTR_FILE_NAME, KeyType.RANGE)), new ProvisionedThroughput(Long.valueOf(l), Long.valueOf(l2)));
                    bl = true;
                }
                catch (ResourceInUseException resourceInUseException) {
                    // empty catch block
                }
            }
            if (string.equals("ACTIVE")) {
                if (bl) {
                    LOG.info("Successfully created DynamoDB table `{}`", (Object)this.tableName);
                    break;
                }
                LOG.info("Table `{}` already exists", (Object)this.tableName);
                break;
            }
            if (string.equals("CREATING")) {
                LOG.info("Waiting for `{}` table creation", (Object)this.tableName);
                try {
                    Thread.sleep(1000L);
                    continue;
                }
                catch (InterruptedException interruptedException) {
                    throw new InterruptedIOException(interruptedException.getMessage());
                }
            }
            LOG.error("table `{}` status: {}", (Object)this.tableName, (Object)string);
            break;
        }
    }

    private AmazonDynamoDBClient getClient() throws IOException {
        try {
            AWSCredentialsProvider aWSCredentialsProvider = ReflectionUtils.createAwsCredentialsProvider(this.credentialsProviderName, this.initHadoopConf());
            AmazonDynamoDBClient amazonDynamoDBClient = new AmazonDynamoDBClient(aWSCredentialsProvider);
            amazonDynamoDBClient.setRegion(Region.getRegion((Regions)Regions.fromName((String)this.regionName)));
            return amazonDynamoDBClient;
        }
        catch (ReflectiveOperationException reflectiveOperationException) {
            throw new IOException(reflectiveOperationException);
        }
    }

    protected static String getParam(Configuration configuration, String string, String string2) {
        String string3 = String.format("%s.%s", SPARK_CONF_PREFIX, string);
        String string4 = String.format("%s.%s", BASE_CONF_PREFIX, string);
        String string5 = configuration.get(string3);
        String string6 = configuration.get(string4);
        if (string5 != null && string6 != null && !string5.equals(string6)) {
            throw new IllegalArgumentException(String.format("Configuration properties `%s=%s` and `%s=%s` have different values. Please set only one.", string3, string5, string4, string6));
        }
        if (string5 != null) {
            return string5;
        }
        if (string6 != null) {
            return string6;
        }
        return string2;
    }
}

