/*
 * Decompiled with CFR 0.152.
 */
package io.datarouter.client.hbase.balancer;

import io.datarouter.client.hbase.HBaseClientManager;
import io.datarouter.client.hbase.balancer.BaseHBaseRegionBalancer;
import io.datarouter.client.hbase.balancer.HBaseBalancerFactory;
import io.datarouter.client.hbase.balancer.HBaseRegionMovement;
import io.datarouter.client.hbase.cluster.DrRegionInfo;
import io.datarouter.client.hbase.cluster.DrRegionListFactory;
import io.datarouter.client.hbase.cluster.DrServerInfo;
import io.datarouter.client.hbase.cluster.DrServerList;
import io.datarouter.client.hbase.compaction.HBaseCompactionInfo;
import io.datarouter.client.hbase.config.DatarouterHBaseSettingRoot;
import io.datarouter.instrumentation.task.TaskTracker;
import io.datarouter.job.BaseJob;
import io.datarouter.storage.client.ClientId;
import io.datarouter.storage.node.DatarouterNodes;
import io.datarouter.storage.node.type.physical.PhysicalNode;
import io.datarouter.util.collection.CollectionTool;
import io.datarouter.util.concurrent.ThreadTool;
import io.datarouter.util.timer.PhaseTimer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import javax.inject.Inject;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.Admin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HBaseRegionBalancerJob
extends BaseJob {
    private static final Logger logger = LoggerFactory.getLogger(HBaseRegionBalancerJob.class);
    private static final long RECALCULATE_AFTER_MS = TimeUnit.MINUTES.toMillis(3L);
    @Inject
    private DatarouterHBaseSettingRoot hbaseSettings;
    @Inject
    private DatarouterNodes nodes;
    @Inject
    private HBaseBalancerFactory balancerFactory;
    @Inject
    private HBaseCompactionInfo compactionInfo;
    @Inject
    private DrRegionListFactory drRegionListFactory;
    @Inject
    private HBaseClientManager hBaseClientManager;

    public void run(TaskTracker tracker) {
        logger.warn("starting Balancer " + System.identityHashCode((Object)this));
        for (ClientId clientId : this.compactionInfo.getManagedClientIds()) {
            while (!this.balanceClient(tracker, clientId)) {
                if (!tracker.shouldStop()) continue;
                return;
            }
        }
    }

    private boolean balanceClient(TaskTracker tracker, ClientId clientId) {
        List<HBaseRegionMovement> movementsForServer;
        logger.warn("calculating region movements for client {}", (Object)clientId.getName());
        Admin admin = this.hBaseClientManager.getAdmin(clientId);
        ArrayList<HBaseRegionMovement> movements = new ArrayList<HBaseRegionMovement>();
        DrServerList serverList = new DrServerList(admin);
        List tableNames = this.nodes.getTableNamesForClient(clientId.getName());
        Collections.sort(tableNames);
        int tableCounter = 0;
        for (String tableName : tableNames) {
            DrRegionListFactory.DrRegionList regionList;
            if (tracker.shouldStop()) {
                return false;
            }
            PhaseTimer timer = new PhaseTimer("generating movements for table " + tableName + " #" + ++tableCounter + "/" + tableNames.size());
            ArrayList<HBaseRegionMovement> tableMovements = new ArrayList<HBaseRegionMovement>();
            PhysicalNode physicalNodeForTable = this.nodes.getPhysicalNodeForClientAndTable(clientId.getName(), tableName);
            BaseHBaseRegionBalancer balancer = this.balancerFactory.getBalancerForTable(clientId, tableName);
            try {
                regionList = this.drRegionListFactory.make(clientId, serverList, tableName, physicalNodeForTable, balancer, this.compactionInfo);
            }
            catch (Exception e) {
                logger.error("skipping table " + tableName, (Throwable)e);
                continue;
            }
            for (DrRegionInfo<?> region : regionList.getRegions()) {
                if (region.isNotOnAnyServer()) {
                    logger.warn("region {} is not currently hosted, so not attempting to move it", (Object)region.getRegion().getRegionNameAsString());
                    continue;
                }
                if (region.isOnCorrectServer()) continue;
                HBaseRegionMovement movement = new HBaseRegionMovement(tableName, region.getRegion().getEncodedName(), region.getHBaseServerName(), region.getBalancerDestinationHBaseServerName());
                tableMovements.add(movement);
            }
            timer.add("generated " + tableMovements.size() + " movements");
            logger.warn(timer.toString());
            movements.addAll(tableMovements);
        }
        long iterationStartTimeMs = System.currentTimeMillis();
        Map<ServerName, List<HBaseRegionMovement>> movementsByCurrentServer = HBaseRegionMovement.getByCurrentServer(movements);
        int clusterMovementCounter = 0;
        logger.warn("processing {} total movements", (Object)movements.size());
        for (DrServerInfo serverInfo : serverList.getServersSortedByDescendingLoad()) {
            movementsForServer = movementsByCurrentServer.get(serverInfo.getServerName());
            logger.warn("expecting {} movements for server {}", (Object)CollectionTool.nullSafeSize(movementsForServer), (Object)serverInfo.getServerName());
        }
        for (DrServerInfo serverInfo : serverList.getServersSortedByDescendingLoad()) {
            movementsForServer = movementsByCurrentServer.getOrDefault(serverInfo.getServerName(), List.of());
            logger.warn("processing {} movements for server {}", (Object)CollectionTool.nullSafeSize(movementsForServer), (Object)serverInfo.getServerName());
            int serverMovementCounter = 0;
            for (HBaseRegionMovement movement : movementsForServer) {
                logger.warn("moving region {}/{} ({}/{}): {}", new Object[]{++serverMovementCounter, movementsForServer.size(), ++clusterMovementCounter, movements.size(), movement});
                try {
                    admin.move(movement.getRegionNameBytes(), movement.getDestinationServerNameBytes());
                }
                catch (Exception ex) {
                    logger.error("exception moving region, skipping", (Throwable)ex);
                }
                ThreadTool.sleepUnchecked((long)this.hbaseSettings.getSleepBetweenRegionMovementMs());
                if (tracker.increment().shouldStop()) {
                    return false;
                }
                if (System.currentTimeMillis() - iterationStartTimeMs <= RECALCULATE_AFTER_MS) continue;
                logger.warn("suspending to check for new servers", (Object)RECALCULATE_AFTER_MS);
                return false;
            }
            logger.warn("processed {} movements for server {}", (Object)serverMovementCounter, (Object)serverInfo.getServerName());
        }
        return true;
    }
}

