/*
 * Decompiled with CFR 0.152.
 */
package io.datarouter.client.hbase.pool;

import io.datarouter.client.hbase.client.HBaseOptions;
import io.datarouter.client.hbase.config.DatarouterHBaseSettingRoot;
import io.datarouter.client.hbase.pool.HBaseTableExecutorService;
import io.datarouter.storage.client.ClientId;
import io.datarouter.storage.client.ClientType;
import io.datarouter.storage.util.DatarouterCounters;
import io.datarouter.util.concurrent.ExecutorServiceTool;
import io.datarouter.util.concurrent.SemaphoreTool;
import io.datarouter.util.concurrent.ThreadTool;
import io.datarouter.util.mutable.MutableString;
import io.datarouter.util.number.NumberFormatter;
import java.io.IOException;
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Table;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HBaseTablePool {
    private static final Logger logger = LoggerFactory.getLogger(HBaseTablePool.class);
    private static final int DEFAULT_MIN_THREADS_PER_HTABLE = 1;
    private static final int DEFAULT_MAX_THREADS_PER_HTABLE = 1024;
    private static final boolean LOG_ACTIONS = true;
    private static final long LOG_SEMAPHORE_ACQUISITIONS_OVER_MS = 2000L;
    private static final long THROTTLE_INCONSISTENT_LOG_EVERY_X_MS = 500L;
    private final Connection connection;
    private final ClientId clientId;
    private final ClientType<?, ?> clientType;
    private final int maxHTables;
    private final int minThreadsPerHTable;
    private final int maxThreadsPerHTable;
    private final Semaphore htableSemaphore;
    private final BlockingQueue<HBaseTableExecutorService> executorServiceQueue;
    private final Map<Table, HBaseTableExecutorService> activeHTables;
    private volatile boolean shuttingDown;
    private volatile long lastLoggedWarning = 0L;

    public HBaseTablePool(HBaseOptions hbaseOptions, DatarouterHBaseSettingRoot datarouterHBaseSettingRoot, Connection connection, ClientId clientId, ClientType<?, ?> clientType) {
        this.connection = connection;
        this.clientId = clientId;
        this.clientType = clientType;
        this.maxHTables = hbaseOptions.maxHTables(clientId.getName(), datarouterHBaseSettingRoot.executorThreadCount);
        this.minThreadsPerHTable = hbaseOptions.minThreadsPerHTable(clientId.getName(), 1);
        this.maxThreadsPerHTable = hbaseOptions.maxThreadsPerHTable(clientId.getName(), 1024);
        this.htableSemaphore = new Semaphore(this.maxHTables);
        this.executorServiceQueue = new LinkedBlockingQueue<HBaseTableExecutorService>(this.maxHTables);
        this.activeHTables = new ConcurrentHashMap<Table, HBaseTableExecutorService>();
    }

    public Table checkOut(String tableName, MutableString progress) {
        if (this.shuttingDown) {
            return null;
        }
        long checkoutRequestStartMs = System.currentTimeMillis();
        this.checkConsistencyAndAcquireSempahore(tableName);
        this.setProgress(progress, "passed semaphore");
        HBaseTableExecutorService htableExecutorService = null;
        Table htable = null;
        try {
            DatarouterCounters.incClientTable(this.clientType, (String)"connection getHTable", (String)this.clientId.getName(), (String)tableName, (long)1L);
            while (true) {
                htableExecutorService = (HBaseTableExecutorService)this.executorServiceQueue.poll();
                this.setProgress(progress, "polled queue " + (htableExecutorService == null ? "null" : "success"));
                if (htableExecutorService == null) {
                    htableExecutorService = new HBaseTableExecutorService(this.minThreadsPerHTable, this.maxThreadsPerHTable);
                    this.setProgress(progress, "new HTableExecutorService()");
                    String counterName = "connection create HTable";
                    DatarouterCounters.incClientTable(this.clientType, (String)counterName, (String)this.clientId.getName(), (String)tableName, (long)1L);
                    this.logWithPoolInfo("created new HTableExecutorService", tableName);
                    break;
                }
                if (!htableExecutorService.isExpired()) {
                    DatarouterCounters.incClientTable(this.clientType, (String)"got pooled HTable executor", (String)this.clientId.getName(), (String)tableName, (long)1L);
                    break;
                }
                ExecutorServiceTool.shutdown((ExecutorService)htableExecutorService.getExec(), (Duration)Duration.ofDays(1L));
                this.logWithPoolInfo("discarded expired HTableExecutorService", tableName);
                htableExecutorService = null;
            }
            htable = this.connection.getTable(TableName.valueOf((String)tableName), (ExecutorService)htableExecutorService.getExec());
            this.setProgress(progress, "created HTable");
            this.activeHTables.put(htable, htableExecutorService);
            this.setProgress(progress, "added to activeHTables");
            this.recordSlowCheckout(System.currentTimeMillis() - checkoutRequestStartMs, tableName);
            this.logIfInconsistentCounts(true, tableName);
            return htable;
        }
        catch (Exception e) {
            if (htable != null) {
                this.activeHTables.remove(htable);
                this.setProgress(progress, "removed from activeHTables");
            }
            this.htableSemaphore.release();
            this.setProgress(progress, "released sempahore");
            throw new RuntimeException(e);
        }
    }

    public void checkIn(Table htable, boolean possiblyTarnished) {
        HBaseTableExecutorService htableExecutorService;
        String tableName = htable.getName().getNameAsString();
        try {
            htableExecutorService = this.activeHTables.remove(htable);
            if (htableExecutorService == null) {
                this.logWithPoolInfo("HTable returned to pool but HTableExecutorService not found", tableName);
                DatarouterCounters.incClientTable(this.clientType, (String)"HTable returned to pool but HTableExecutorService not found", (String)this.clientId.getName(), (String)tableName, (long)1L);
                return;
            }
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
        try {
            htableExecutorService.markLastCheckinMs();
            htableExecutorService.purge();
            if (possiblyTarnished) {
                this.logWithPoolInfo("ThreadPoolExecutor possibly tarnished, discarding", tableName);
                DatarouterCounters.incClientTable(this.clientType, (String)"HTable executor possiblyTarnished", (String)this.clientId.getName(), (String)tableName, (long)1L);
                htableExecutorService.terminateAndBlockUntilFinished();
            } else if (htableExecutorService.isDyingOrDead(tableName)) {
                this.logWithPoolInfo("ThreadPoolExecutor not reusable, discarding", tableName);
                DatarouterCounters.incClientTable(this.clientType, (String)"HTable executor isDyingOrDead", (String)this.clientId.getName(), (String)tableName, (long)1L);
                htableExecutorService.terminateAndBlockUntilFinished();
            } else if (!htableExecutorService.isTaskQueueEmpty()) {
                this.logWithPoolInfo("ThreadPoolExecutor taskQueue not empty, discarding", tableName);
                DatarouterCounters.incClientTable(this.clientType, (String)"HTable executor taskQueue not empty", (String)this.clientId.getName(), (String)tableName, (long)1L);
                htableExecutorService.terminateAndBlockUntilFinished();
            } else if (!htableExecutorService.waitForActiveThreadsToSettle(tableName)) {
                this.logWithPoolInfo("active thread count would not settle to 0", tableName);
                DatarouterCounters.incClientTable(this.clientType, (String)"HTable executor pool active threads won't quit", (String)this.clientId.getName(), (String)tableName, (long)1L);
                htableExecutorService.terminateAndBlockUntilFinished();
            } else if (this.executorServiceQueue.offer(htableExecutorService)) {
                DatarouterCounters.incClientTable(this.clientType, (String)"connection HTable returned to pool", (String)this.clientId.getName(), (String)tableName, (long)1L);
            } else {
                this.logWithPoolInfo("checkIn HTable but queue already full, so close and discard", tableName);
                DatarouterCounters.incClientTable(this.clientType, (String)"HTable executor pool overflow", (String)this.clientId.getName(), (String)tableName, (long)1L);
                htableExecutorService.terminateAndBlockUntilFinished();
            }
        }
        finally {
            this.releaseSempahoreAndCheckConsistency(tableName);
        }
    }

    public Integer getTotalPoolSize() {
        return this.executorServiceQueue.size();
    }

    public void shutdown() {
        this.shuttingDown = true;
        if (this.htableSemaphoreActivePermits() != 0) {
            int sleepMs = 5000;
            logger.warn("Still {} active tables.  Sleeping {}ms", (Object)this.htableSemaphoreActivePermits(), (Object)5000);
            ThreadTool.sleepUnchecked((long)5000L);
        }
        for (HBaseTableExecutorService executorService : this.executorServiceQueue) {
            executorService.terminateAndBlockUntilFinished();
        }
        try {
            this.connection.close();
        }
        catch (IOException e) {
            logger.warn("", (Throwable)e);
        }
    }

    private void checkConsistencyAndAcquireSempahore(String tableName) {
        this.logIfInconsistentCounts(true, tableName);
        long startAquireMs = System.currentTimeMillis();
        SemaphoreTool.acquire((Semaphore)this.htableSemaphore);
        long acquireTimeMs = System.currentTimeMillis() - startAquireMs;
        if (acquireTimeMs > 2000L) {
            logger.warn("acquiring semaphore took " + NumberFormatter.addCommas((Number)acquireTimeMs) + "ms");
        }
    }

    private synchronized void releaseSempahoreAndCheckConsistency(String tableName) {
        this.htableSemaphore.release();
        this.logIfInconsistentCounts(false, tableName);
    }

    private int htableSemaphoreActivePermits() {
        return this.maxHTables - this.htableSemaphore.availablePermits();
    }

    private void setProgress(MutableString progress, String message) {
        if (progress == null) {
            return;
        }
        progress.set(message);
    }

    private void recordSlowCheckout(long checkOutDurationMs, String tableName) {
        if (checkOutDurationMs > 1L) {
            DatarouterCounters.incClientTable(this.clientType, (String)"connection open > 1ms", (String)this.clientId.getName(), (String)tableName, (long)1L);
        }
    }

    private boolean areCountsConsistent() {
        int numActivePermits = this.htableSemaphoreActivePermits();
        if (numActivePermits > this.maxHTables) {
            return false;
        }
        int numActiveHTables = this.activeHTables.size();
        if (numActiveHTables > this.maxHTables) {
            return false;
        }
        return numActiveHTables <= numActivePermits;
    }

    public void forceLogIfInconsistentCounts(boolean checkOut, String tableName) {
        this.innerLogIfInconsistentCounts(checkOut, tableName);
    }

    private void logIfInconsistentCounts(boolean checkOut, String tableName) {
        this.innerLogIfInconsistentCounts(checkOut, tableName);
    }

    private void innerLogIfInconsistentCounts(boolean checkOut, String tableName) {
        if (!this.areCountsConsistent()) {
            long msSinceLastLog = System.currentTimeMillis() - this.lastLoggedWarning;
            if (msSinceLastLog < 500L) {
                return;
            }
            this.logWithPoolInfo("inconsistent pool counts on " + (checkOut ? "checkOut" : "checkIn"), tableName);
        }
        this.lastLoggedWarning = System.currentTimeMillis();
    }

    private void logWithPoolInfo(String message, String tableName) {
        this.innerLogWithPoolInfo(message, tableName);
    }

    private void innerLogWithPoolInfo(String message, String tableName) {
        logger.info(String.valueOf(this.getPoolInfoLogMessage(tableName)) + ", " + message);
    }

    private String getPoolInfoLogMessage(String tableName) {
        return "max=" + this.maxHTables + ", blocked=" + this.htableSemaphore.getQueueLength() + ", idle=" + this.executorServiceQueue.size() + ", permits=" + this.htableSemaphoreActivePermits() + ", HTables=" + this.activeHTables.size() + ", client=" + this.clientId.getName() + ", table=" + tableName;
    }
}

