package org.apache.hadoop.mapreduce.v2.app;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import junit.framework.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttemptStateInternal;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobUpdatedNodesEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptKillEvent;
import org.apache.hadoop.mapreduce.v2.app.job.impl.TaskAttemptImpl;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerFailedEvent;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerRequestEvent;
import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator;
import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.yarn.Clock;
import org.apache.hadoop.yarn.ClusterInfo;
import org.apache.hadoop.yarn.SystemClock;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.AMRMProtocol;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.junit.After;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.class */
public class TestRMContainerAllocator {
    /** Logger shared by the tests and by the nested helper classes of this test class. */
    static final Log LOG = LogFactory.getLog(TestRMContainerAllocator.class);
    /** Record factory obtained from the provider with a {@code null} configuration. */
    static final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory((Configuration) null);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator$MyContainerAllocator.class */
    /**
     * Test double for {@link RMContainerAllocator}.
     *
     * <p>It talks to an in-process {@link MyResourceManager} instead of a remote
     * scheduler, never starts the background allocator thread, and lets a test
     * drive one allocation round at a time through {@link #schedule()}.
     *
     * <p>NOTE: the three event lists below are {@code static}, so they are shared by
     * every instance of this class created in the same JVM.
     */
    public static class MyContainerAllocator extends RMContainerAllocator {
        /** Container-assigned events captured by the mocked event handler. */
        static final List<TaskAttemptContainerAssignedEvent> events =
                new ArrayList<TaskAttemptContainerAssignedEvent>();
        /** Task-attempt kill events captured by the mocked event handler. */
        static final List<TaskAttemptKillEvent> taskAttemptKillEvents =
                new ArrayList<TaskAttemptKillEvent>();
        /** Updated-nodes events captured by the mocked event handler. */
        static final List<JobUpdatedNodesEvent> jobUpdatedNodeEvents =
                new ArrayList<JobUpdatedNodesEvent>();
        /** Resource manager whose application-master service acts as the scheduler proxy. */
        private MyResourceManager rm;

        /**
         * Builds a mocked {@link AppContext} that always returns {@code job} and whose
         * event handler files the events of interest into the static lists above.
         * Events of any other type are ignored.
         */
        private static AppContext createAppContext(ApplicationAttemptId applicationAttemptId, Job job) {
            AppContext context = (AppContext) Mockito.mock(AppContext.class);
            Mockito.when(context.getApplicationID()).thenReturn(applicationAttemptId.getApplicationId());
            Mockito.when(context.getApplicationAttemptId()).thenReturn(applicationAttemptId);
            Mockito.when(context.getJob((JobId) Matchers.isA(JobId.class))).thenReturn(job);
            Mockito.when(context.getClusterInfo()).thenReturn(
                    new ClusterInfo(BuilderUtils.newResource(1024, 1), BuilderUtils.newResource(10240, 1)));
            Mockito.when(context.getEventHandler()).thenReturn(new EventHandler() {
                public void handle(Event event) {
                    if (event instanceof TaskAttemptContainerAssignedEvent) {
                        MyContainerAllocator.events.add((TaskAttemptContainerAssignedEvent) event);
                    } else if (event instanceof TaskAttemptKillEvent) {
                        MyContainerAllocator.taskAttemptKillEvents.add((TaskAttemptKillEvent) event);
                    } else if (event instanceof JobUpdatedNodesEvent) {
                        MyContainerAllocator.jobUpdatedNodeEvents.add((JobUpdatedNodesEvent) event);
                    }
                }
            });
            return context;
        }

        /** Same as the two-argument overload, but the mocked context also returns {@code clock}. */
        private static AppContext createAppContext(ApplicationAttemptId applicationAttemptId, Job job, Clock clock) {
            AppContext context = createAppContext(applicationAttemptId, job);
            Mockito.when(context.getClock()).thenReturn(clock);
            return context;
        }

        /** Builds a mocked {@link ClientService} bound to localhost:4567 with HTTP port 890. */
        private static ClientService createMockClientService() {
            ClientService service = (ClientService) Mockito.mock(ClientService.class);
            Mockito.when(service.getBindAddress()).thenReturn(NetUtils.createSocketAddr("localhost:4567"));
            Mockito.when(service.getHttpPort()).thenReturn(890);
            return service;
        }

        /**
         * Wraps an externally supplied {@link AppContext}. Unlike the public
         * constructors, this one does not call {@code init} or {@code start}.
         */
        MyContainerAllocator(MyResourceManager myResourceManager, ApplicationAttemptId applicationAttemptId, AppContext appContext) {
            super(createMockClientService(), appContext);
            this.rm = myResourceManager;
        }

        /** Creates the allocator over a mocked context, then initialises and starts it. */
        public MyContainerAllocator(MyResourceManager myResourceManager, Configuration configuration, ApplicationAttemptId applicationAttemptId, Job job) {
            super(createMockClientService(), createAppContext(applicationAttemptId, job));
            this.rm = myResourceManager;
            super.init(configuration);
            super.start();
        }

        /** Like the four-argument constructor, with a caller-provided {@link Clock} in the mocked context. */
        public MyContainerAllocator(MyResourceManager myResourceManager, Configuration configuration, ApplicationAttemptId applicationAttemptId, Job job, Clock clock) {
            super(createMockClientService(), createAppContext(applicationAttemptId, job, clock));
            this.rm = myResourceManager;
            super.init(configuration);
            super.start();
        }

        /** Uses the in-process resource manager's application-master service as the scheduler proxy. */
        protected AMRMProtocol createSchedulerProxy() {
            return this.rm.getApplicationMasterService();
        }

        /** Delegates to the parent implementation. */
        protected void register() {
            super.register();
        }

        /** Intentionally a no-op in this test double. */
        protected void unregister() {
        }

        /** Fixed minimum capability, the same values used for the mocked cluster info. */
        protected Resource getMinContainerCapability() {
            return BuilderUtils.newResource(1024, 1);
        }

        /** Fixed maximum capability, the same values used for the mocked cluster info. */
        protected Resource getMaxContainerCapability() {
            return BuilderUtils.newResource(10240, 1);
        }

        /** Feeds a single container request to the allocator. */
        public void sendRequest(ContainerRequestEvent containerRequestEvent) {
            sendRequests(Arrays.asList(containerRequestEvent));
        }

        /** Feeds each request, in list order, to the parent's event handling. */
        public void sendRequests(List<ContainerRequestEvent> list) {
            for (ContainerRequestEvent request : list) {
                super.handleEvent(request);
            }
        }

        /** Feeds a container-failed event to the parent's event handling. */
        public void sendFailure(ContainerFailedEvent containerFailedEvent) {
            super.handleEvent(containerFailedEvent);
        }

        /**
         * Runs one heartbeat and returns the assignment events captured so far,
         * clearing the shared list so the next call only sees newer events.
         *
         * @return a snapshot copy of the captured assignment events
         * @throws YarnException wrapping any exception raised during the heartbeat
         */
        public List<TaskAttemptContainerAssignedEvent> schedule() {
            try {
                super.heartbeat();
                List<TaskAttemptContainerAssignedEvent> snapshot =
                        new ArrayList<TaskAttemptContainerAssignedEvent>(events);
                events.clear();
                return snapshot;
            } catch (Exception e) {
                TestRMContainerAllocator.LOG.error("error in heartbeat ", e);
                throw new YarnException(e);
            }
        }

        /** Returns the live (not copied) list of captured kill events. */
        List<TaskAttemptKillEvent> getTaskAttemptKillEvents() {
            return taskAttemptKillEvents;
        }

        /** Returns the live (not copied) list of captured updated-nodes events. */
        List<JobUpdatedNodesEvent> getJobUpdatedNodeEvents() {
            return jobUpdatedNodeEvents;
        }

        /** Intentionally a no-op: tests trigger heartbeats explicitly via {@link #schedule()}. */
        protected void startAllocatorThread() {
        }
    }

    /* loaded from: input_file:org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator$MyFifoScheduler.class */
    /**
     * {@link FifoScheduler} variant used by {@link MyResourceManager}. It
     * initialises itself in the constructor and hands the parent scheduler copies
     * of the incoming resource requests rather than the caller's own objects.
     */
    private static class MyFifoScheduler extends FifoScheduler {

        /**
         * Initialises the scheduler with a fresh {@link Configuration}.
         * An {@link IOException} is logged; when assertions are enabled it also
         * fails the test with the exception attached as the cause.
         */
        public MyFifoScheduler(RMContext rMContext) {
            try {
                reinitialize(new Configuration(), rMContext);
            } catch (IOException e) {
                TestRMContainerAllocator.LOG.info("add application failed with ", e);
                // Real assert statement instead of a hand-written $assertionsDisabled flag;
                // passing e makes it the cause of the AssertionError.
                assert false : e;
            }
        }

        /**
         * Copies every request in {@code list} into a new {@link ResourceRequest}
         * before delegating to the parent.
         * NOTE(review): presumably this keeps the parent scheduler from mutating
         * the request objects still held by the allocator under test; confirm.
         */
        public synchronized Allocation allocate(ApplicationAttemptId applicationAttemptId, List<ResourceRequest> list, List<ContainerId> list2) {
            List<ResourceRequest> copies = new ArrayList<ResourceRequest>();
            for (ResourceRequest request : list) {
                copies.add(BuilderUtils.newResourceRequest(request.getPriority(), request.getHostName(),
                        request.getCapability(), request.getNumContainers()));
            }
            return super.allocate(applicationAttemptId, copies, list2);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator$MyResourceManager.class */
    /**
     * {@link MockRM} wired for these tests: it uses a {@link DrainDispatcher},
     * passes scheduler events straight to the scheduler on the calling thread,
     * and uses {@link MyFifoScheduler} as its scheduler.
     */
    public static class MyResourceManager extends MockRM {
        public MyResourceManager(Configuration configuration) {
            super(configuration);
        }

        /** Supplies a {@link DrainDispatcher}. */
        protected Dispatcher createDispatcher() {
            return new DrainDispatcher();
        }

        /** Supplies a handler that forwards each scheduler event directly to the scheduler. */
        protected EventHandler<SchedulerEvent> createSchedulerEventDispatcher() {
            EventHandler<SchedulerEvent> directHandler = new EventHandler<SchedulerEvent>() {
                public void handle(SchedulerEvent schedulerEvent) {
                    MyResourceManager.this.scheduler.handle(schedulerEvent);
                }
            };
            return directHandler;
        }

        /** Supplies a {@link MyFifoScheduler} built over this resource manager's context. */
        protected ResourceScheduler createScheduler() {
            ResourceScheduler fifo = new MyFifoScheduler(getRMContext());
            return fifo;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator$RecalculateContainerAllocator.class */
    /**
     * Allocator whose {@code scheduleReduces} does nothing except record that it
     * was invoked, so a test can check whether a recalculation was triggered.
     */
    public static class RecalculateContainerAllocator extends MyContainerAllocator {
        /** Set to {@code true} each time {@code scheduleReduces} is called; starts out {@code false}. */
        public boolean recalculatedReduceSchedule = false;

        public RecalculateContainerAllocator(MyResourceManager myResourceManager, Configuration configuration, ApplicationAttemptId applicationAttemptId, Job job) {
            super(myResourceManager, configuration, applicationAttemptId, job);
        }

        /** Ignores every argument and only raises the {@link #recalculatedReduceSchedule} flag. */
        public void scheduleReduces(int i, int i2, int i3, int i4, int i5, int i6, int i7, int i8, int i9, float f, float f2) {
            recalculatedReduceSchedule = true;
        }
    }

    /** Shuts down the default metrics system after every test. */
    @After
    public void tearDown() {
        DefaultMetricsSystem.shutdown();
    }

    /**
     * Sends three 1024-sized requests for hosts h1, h2 and h3. Checks that nothing
     * is assigned before the node managers heartbeat, and that all three requests
     * are assigned afterwards.
     */
    @Test
    public void testSimple() throws Exception {
        LOG.info("Running testSimple");
        Configuration conf = new Configuration();
        MyResourceManager rm = new MyResourceManager(conf);
        rm.start();
        // getDispatcher() is declared to return Dispatcher; MyResourceManager supplies a DrainDispatcher.
        DrainDispatcher rmDispatcher = (DrainDispatcher) rm.getRMContext().getDispatcher();

        // Submit the application and bring its attempt to the "AM launched" stage.
        RMApp app = rm.submitApp(1024);
        rmDispatcher.await();
        MockNM amNodeManager = rm.registerNode("amNM:1234", 2048);
        amNodeManager.nodeHeartbeat(true);
        rmDispatcher.await();
        ApplicationAttemptId attemptId = app.getCurrentAppAttempt().getAppAttemptId();
        rm.sendAMLaunched(attemptId);
        rmDispatcher.await();

        JobId jobId = MRBuilderUtils.newJobId(attemptId.getApplicationId(), 0);
        Job mockJob = (Job) Mockito.mock(Job.class);
        Mockito.when(mockJob.getReport()).thenReturn(MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0L, 0L, 0L, 0.0f, 0.0f, 0.0f, 0.0f, "jobfile", (List) null, false, ""));
        MyContainerAllocator allocator = new MyContainerAllocator(rm, conf, attemptId, mockJob);

        MockNM nodeManager1 = rm.registerNode("h1:1234", 10240);
        MockNM nodeManager2 = rm.registerNode("h2:1234", 10240);
        MockNM nodeManager3 = rm.registerNode("h3:1234", 10240);
        rmDispatcher.await();

        // First two requests: no node has heartbeated yet, so nothing can be assigned.
        ContainerRequestEvent request1 = createReq(jobId, 1, 1024, new String[]{"h1"});
        allocator.sendRequest(request1);
        ContainerRequestEvent request2 = createReq(jobId, 2, 1024, new String[]{"h2"});
        allocator.sendRequest(request2);
        List<TaskAttemptContainerAssignedEvent> assigned = allocator.schedule();
        rmDispatcher.await();
        Assert.assertEquals("No of assignments must be 0", 0, assigned.size());

        // Third request: still no heartbeat, still no assignment.
        ContainerRequestEvent request3 = createReq(jobId, 3, 1024, new String[]{"h3"});
        allocator.sendRequest(request3);
        assigned = allocator.schedule();
        rmDispatcher.await();
        Assert.assertEquals("No of assignments must be 0", 0, assigned.size());

        // After all three nodes heartbeat, the next round should carry the assignments.
        nodeManager1.nodeHeartbeat(true);
        nodeManager2.nodeHeartbeat(true);
        nodeManager3.nodeHeartbeat(true);
        rmDispatcher.await();
        assigned = allocator.schedule();
        rmDispatcher.await();
        checkAssignments(new ContainerRequestEvent[]{request1, request2, request3}, assigned, false);
    }

    /**
     * Requests two containers on h1 and one on h2, then heartbeats only h3 and h1
     * (h2 never heartbeats). Asserts that the h2 request ends up on h3 and then
     * checks the two h1 requests with the strict flag of {@code checkAssignments}.
     */
    @Test
    public void testMapNodeLocality() throws Exception {
        LOG.info("Running testMapNodeLocality");
        Configuration conf = new Configuration();
        MyResourceManager rm = new MyResourceManager(conf);
        rm.start();
        // getDispatcher() is declared to return Dispatcher; MyResourceManager supplies a DrainDispatcher.
        DrainDispatcher rmDispatcher = (DrainDispatcher) rm.getRMContext().getDispatcher();

        // Submit the application and bring its attempt to the "AM launched" stage.
        RMApp app = rm.submitApp(1024);
        rmDispatcher.await();
        MockNM amNodeManager = rm.registerNode("amNM:1234", 2048);
        amNodeManager.nodeHeartbeat(true);
        rmDispatcher.await();
        ApplicationAttemptId attemptId = app.getCurrentAppAttempt().getAppAttemptId();
        rm.sendAMLaunched(attemptId);
        rmDispatcher.await();

        JobId jobId = MRBuilderUtils.newJobId(attemptId.getApplicationId(), 0);
        Job mockJob = (Job) Mockito.mock(Job.class);
        Mockito.when(mockJob.getReport()).thenReturn(MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0L, 0L, 0L, 0.0f, 0.0f, 0.0f, 0.0f, "jobfile", (List) null, false, ""));
        MyContainerAllocator allocator = new MyContainerAllocator(rm, conf, attemptId, mockJob);

        // h2 is registered but deliberately never heartbeats in this test.
        MockNM nodeManager1 = rm.registerNode("h1:1234", 3072);
        rm.registerNode("h2:1234", 10240);
        MockNM nodeManager3 = rm.registerNode("h3:1234", 1536);
        rmDispatcher.await();

        ContainerRequestEvent request1 = createReq(jobId, 1, 1024, new String[]{"h1"});
        allocator.sendRequest(request1);
        ContainerRequestEvent request2 = createReq(jobId, 2, 1024, new String[]{"h1"});
        allocator.sendRequest(request2);
        ContainerRequestEvent request3 = createReq(jobId, 3, 1024, new String[]{"h2"});
        allocator.sendRequest(request3);
        List<TaskAttemptContainerAssignedEvent> assigned = allocator.schedule();
        rmDispatcher.await();
        Assert.assertEquals("No of assignments must be 0", 0, assigned.size());

        // h3 heartbeats before h1.
        nodeManager3.nodeHeartbeat(true);
        nodeManager1.nodeHeartbeat(true);
        rmDispatcher.await();
        assigned = allocator.schedule();
        rmDispatcher.await();
        checkAssignments(new ContainerRequestEvent[]{request1, request2, request3}, assigned, false);

        // Pull the assignment for request3 out of the list and verify where it landed.
        Iterator<TaskAttemptContainerAssignedEvent> assignedIt = assigned.iterator();
        while (assignedIt.hasNext()) {
            TaskAttemptContainerAssignedEvent event = assignedIt.next();
            if (event.getTaskAttemptID().equals(request3.getAttemptID())) {
                // Remove through the iterator rather than mutating the list behind its back.
                assignedIt.remove();
                Assert.assertEquals("h3", event.getContainer().getNodeId().getHost());
                break;
            }
        }
        checkAssignments(new ContainerRequestEvent[]{request1, request2}, assigned, true);
    }

    /**
     * Sends a 1024-sized request for h1 and a 2048-sized request for h2. Checks
     * that nothing is assigned before the node managers heartbeat and that both
     * requests are assigned afterwards.
     */
    @Test
    public void testResource() throws Exception {
        LOG.info("Running testResource");
        Configuration conf = new Configuration();
        MyResourceManager rm = new MyResourceManager(conf);
        rm.start();
        // getDispatcher() is declared to return Dispatcher; MyResourceManager supplies a DrainDispatcher.
        DrainDispatcher rmDispatcher = (DrainDispatcher) rm.getRMContext().getDispatcher();

        // Submit the application and bring its attempt to the "AM launched" stage.
        RMApp app = rm.submitApp(1024);
        rmDispatcher.await();
        MockNM amNodeManager = rm.registerNode("amNM:1234", 2048);
        amNodeManager.nodeHeartbeat(true);
        rmDispatcher.await();
        ApplicationAttemptId attemptId = app.getCurrentAppAttempt().getAppAttemptId();
        rm.sendAMLaunched(attemptId);
        rmDispatcher.await();

        JobId jobId = MRBuilderUtils.newJobId(attemptId.getApplicationId(), 0);
        Job mockJob = (Job) Mockito.mock(Job.class);
        Mockito.when(mockJob.getReport()).thenReturn(MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0L, 0L, 0L, 0.0f, 0.0f, 0.0f, 0.0f, "jobfile", (List) null, false, ""));
        MyContainerAllocator allocator = new MyContainerAllocator(rm, conf, attemptId, mockJob);

        MockNM nodeManager1 = rm.registerNode("h1:1234", 10240);
        MockNM nodeManager2 = rm.registerNode("h2:1234", 10240);
        MockNM nodeManager3 = rm.registerNode("h3:1234", 10240);
        rmDispatcher.await();

        // Two requests of different sizes; no node has heartbeated yet.
        ContainerRequestEvent smallRequest = createReq(jobId, 1, 1024, new String[]{"h1"});
        allocator.sendRequest(smallRequest);
        ContainerRequestEvent largeRequest = createReq(jobId, 2, 2048, new String[]{"h2"});
        allocator.sendRequest(largeRequest);
        List<TaskAttemptContainerAssignedEvent> assigned = allocator.schedule();
        rmDispatcher.await();
        Assert.assertEquals("No of assignments must be 0", 0, assigned.size());

        // After the nodes heartbeat, the next round should carry both assignments.
        nodeManager1.nodeHeartbeat(true);
        nodeManager2.nodeHeartbeat(true);
        nodeManager3.nodeHeartbeat(true);
        rmDispatcher.await();
        assigned = allocator.schedule();
        rmDispatcher.await();
        checkAssignments(new ContainerRequestEvent[]{smallRequest, largeRequest}, assigned, false);
    }

    /**
     * Sends three requests built with the six-argument {@code createReq} overload
     * and checks that the first and third are assigned once the nodes heartbeat,
     * with none of the assigned containers on host h1.
     * NOTE(review): the two trailing booleans of {@code createReq} presumably mark
     * an "earlier failed" attempt and a reduce request; confirm against that helper.
     */
    @Test
    public void testMapReduceScheduling() throws Exception {
        LOG.info("Running testMapReduceScheduling");
        Configuration conf = new Configuration();
        MyResourceManager rm = new MyResourceManager(conf);
        rm.start();
        // getDispatcher() is declared to return Dispatcher; MyResourceManager supplies a DrainDispatcher.
        DrainDispatcher rmDispatcher = (DrainDispatcher) rm.getRMContext().getDispatcher();

        // Submit the application and bring its attempt to the "AM launched" stage.
        RMApp app = rm.submitApp(1024);
        rmDispatcher.await();
        MockNM amNodeManager = rm.registerNode("amNM:1234", 2048);
        amNodeManager.nodeHeartbeat(true);
        rmDispatcher.await();
        ApplicationAttemptId attemptId = app.getCurrentAppAttempt().getAppAttemptId();
        rm.sendAMLaunched(attemptId);
        rmDispatcher.await();

        JobId jobId = MRBuilderUtils.newJobId(attemptId.getApplicationId(), 0);
        Job mockJob = (Job) Mockito.mock(Job.class);
        Mockito.when(mockJob.getReport()).thenReturn(MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0L, 0L, 0L, 0.0f, 0.0f, 0.0f, 0.0f, "jobfile", (List) null, false, ""));
        MyContainerAllocator allocator = new MyContainerAllocator(rm, conf, attemptId, mockJob);

        // h1 is registered with 1024, less than any request below asks for.
        MockNM nodeManager1 = rm.registerNode("h1:1234", 1024);
        MockNM nodeManager2 = rm.registerNode("h2:1234", 10240);
        MockNM nodeManager3 = rm.registerNode("h3:1234", 10240);
        rmDispatcher.await();

        ContainerRequestEvent request1 = createReq(jobId, 1, 2048, new String[]{"h1", "h2"}, true, false);
        allocator.sendRequest(request1);
        // The second request is sent but is not among the expected assignments below.
        allocator.sendRequest(createReq(jobId, 2, 3000, new String[]{"h1"}, false, true));
        ContainerRequestEvent request3 = createReq(jobId, 3, 2048, new String[]{"h3"}, false, false);
        allocator.sendRequest(request3);
        List<TaskAttemptContainerAssignedEvent> assigned = allocator.schedule();
        rmDispatcher.await();
        Assert.assertEquals("No of assignments must be 0", 0, assigned.size());

        nodeManager1.nodeHeartbeat(true);
        nodeManager2.nodeHeartbeat(true);
        nodeManager3.nodeHeartbeat(true);
        rmDispatcher.await();
        assigned = allocator.schedule();
        rmDispatcher.await();
        checkAssignments(new ContainerRequestEvent[]{request1, request3}, assigned, false);

        // No assigned container may be on h1.
        for (TaskAttemptContainerAssignedEvent event : assigned) {
            Assert.assertFalse("Assigned count not correct", "h1".equals(event.getContainer().getNodeId().getHost()));
        }
    }

    /**
     * Runs an {@link MRApp} constructed with the arguments (10, 10), presumably ten
     * maps and ten reduces, against the in-process resource manager. Asserts that
     * the job's progress and the progress reported on the {@link RMApp} agree at
     * each checkpoint as tasks are finished in iteration order.
     */
    @Test
    public void testReportedAppProgress() throws Exception {
        LOG.info("Running testReportedAppProgress");
        Configuration conf = new Configuration();
        final MyResourceManager rm = new MyResourceManager(conf);
        rm.start();
        DrainDispatcher rmDispatcher = (DrainDispatcher) rm.getRMContext().getDispatcher();

        // Submit the application and bring its attempt to the "AM launched" stage.
        RMApp rmApp = rm.submitApp(1024);
        rmDispatcher.await();
        MockNM amNodeManager = rm.registerNode("amNM:1234", 21504);
        amNodeManager.nodeHeartbeat(true);
        rmDispatcher.await();
        final ApplicationAttemptId appAttemptId = rmApp.getCurrentAppAttempt().getAppAttemptId();
        rm.sendAMLaunched(appAttemptId);
        rmDispatcher.await();

        MRApp mrApp = new MRApp(appAttemptId, BuilderUtils.newContainerId(appAttemptId, 0), 10, 10, false, getClass().getName(), true, 1) {
            protected Dispatcher createDispatcher() {
                return new DrainDispatcher();
            }

            @Override // org.apache.hadoop.mapreduce.v2.app.MRApp
            protected ContainerAllocator createContainerAllocator(ClientService clientService, AppContext appContext) {
                return new MyContainerAllocator(rm, appAttemptId, appContext);
            }
        };

        // Nothing has run yet.
        Assert.assertEquals(0.0d, rmApp.getProgress(), 0.0d);
        mrApp.submit(conf);
        Job job = (Job) ((Map.Entry) mrApp.getContext().getAllJobs().entrySet().iterator().next()).getValue();
        // Both factories are overridden above, so these downcasts match what is actually created.
        DrainDispatcher amDispatcher = (DrainDispatcher) mrApp.getDispatcher();
        MyContainerAllocator allocator = (MyContainerAllocator) mrApp.getContainerAllocator();
        mrApp.waitForState(job, JobState.RUNNING);
        amDispatcher.await();

        // Wait until every map attempt is waiting for a container.
        for (Task task : job.getTasks().values()) {
            if (task.getType() == TaskType.MAP) {
                mrApp.waitForInternalState((TaskAttemptImpl) task.getAttempts().values().iterator().next(), TaskAttemptStateInternal.UNASSIGNED);
            }
        }
        amDispatcher.await();

        // Request containers, let the node offer them, then pick up the assignments.
        allocator.schedule();
        rmDispatcher.await();
        amNodeManager.nodeHeartbeat(true);
        rmDispatcher.await();
        allocator.schedule();
        rmDispatcher.await();
        for (Task task : job.getTasks().values()) {
            if (task.getType() == TaskType.MAP) {
                mrApp.waitForState(task, TaskState.RUNNING);
            }
        }
        allocator.schedule();
        rmDispatcher.await();
        Assert.assertEquals(0.05f, job.getProgress(), 0.001f);
        Assert.assertEquals(0.05f, rmApp.getProgress(), 0.001f);

        // One iterator is shared by every finishNextNTasks call so each call continues where the last stopped.
        Iterator<Task> taskIt = job.getTasks().values().iterator();
        finishNextNTasks(rmDispatcher, amNodeManager, mrApp, taskIt, 1);
        allocator.schedule();
        rmDispatcher.await();
        Assert.assertEquals(0.095f, job.getProgress(), 0.001f);
        Assert.assertEquals(0.095f, rmApp.getProgress(), 0.001f);

        finishNextNTasks(rmDispatcher, amNodeManager, mrApp, taskIt, 7);
        allocator.schedule();
        rmDispatcher.await();
        Assert.assertEquals(0.41f, job.getProgress(), 0.001f);
        Assert.assertEquals(0.41f, rmApp.getProgress(), 0.001f);

        finishNextNTasks(rmDispatcher, amNodeManager, mrApp, taskIt, 2);
        allocator.schedule();
        rmDispatcher.await();
        amNodeManager.nodeHeartbeat(true);
        rmDispatcher.await();
        allocator.schedule();
        rmDispatcher.await();

        // Wait for the reduces to be running before finishing further tasks.
        for (Task task : job.getTasks().values()) {
            if (task.getType() == TaskType.REDUCE) {
                mrApp.waitForState(task, TaskState.RUNNING);
            }
        }

        finishNextNTasks(rmDispatcher, amNodeManager, mrApp, taskIt, 2);
        allocator.schedule();
        rmDispatcher.await();
        Assert.assertEquals(0.59f, job.getProgress(), 0.001f);
        Assert.assertEquals(0.59f, rmApp.getProgress(), 0.001f);

        finishNextNTasks(rmDispatcher, amNodeManager, mrApp, taskIt, 8);
        allocator.schedule();
        rmDispatcher.await();
        Assert.assertEquals(0.95f, job.getProgress(), 0.001f);
        Assert.assertEquals(0.95f, rmApp.getProgress(), 0.001f);
    }

    /**
     * Finishes the next {@code i} tasks produced by {@code it}, one
     * {@code finishTask} call per task, in iteration order. A non-positive
     * {@code i} finishes nothing.
     */
    private void finishNextNTasks(DrainDispatcher drainDispatcher, MockNM mockNM, MRApp mRApp, Iterator<Task> it, int i) throws Exception {
        int remaining = i;
        while (remaining > 0) {
            Task nextTask = it.next();
            finishTask(drainDispatcher, mockNM, mRApp, nextTask);
            remaining--;
        }
    }

    /**
     * Drives the first attempt of {@code task} to completion: reports its
     * container as COMPLETE through a node heartbeat, sends {@code TA_DONE} for
     * the attempt, then waits for the task to reach SUCCEEDED.
     */
    private void finishTask(DrainDispatcher drainDispatcher, MockNM mockNM, MRApp mRApp, Task task) throws Exception {
        TaskAttempt firstAttempt = (TaskAttempt) task.getAttempts().values().iterator().next();

        // Heartbeat payload: application id -> list holding the single completed container status.
        ArrayList completedStatuses = new ArrayList(1);
        completedStatuses.add(BuilderUtils.newContainerStatus(firstAttempt.getAssignedContainerID(), ContainerState.COMPLETE, "", 0));
        HashMap statusesByApp = new HashMap(1);
        statusesByApp.put(mRApp.getAppID(), completedStatuses);
        mockNM.nodeHeartbeat(statusesByApp, true);
        drainDispatcher.await();

        TaskAttemptEvent doneEvent = new TaskAttemptEvent(firstAttempt.getID(), TaskAttemptEventType.TA_DONE);
        mRApp.getContext().getEventHandler().handle(doneEvent);
        mRApp.waitForState(task, TaskState.SUCCEEDED);
    }

    /**
     * Same progress check as {@code testReportedAppProgress}, for an {@link MRApp}
     * constructed with the arguments (10, 0), presumably ten maps and no reduces.
     * Asserts that job progress and {@link RMApp} progress agree at each checkpoint.
     */
    @Test
    public void testReportedAppProgressWithOnlyMaps() throws Exception {
        LOG.info("Running testReportedAppProgressWithOnlyMaps");
        Configuration conf = new Configuration();
        final MyResourceManager rm = new MyResourceManager(conf);
        rm.start();
        DrainDispatcher rmDispatcher = (DrainDispatcher) rm.getRMContext().getDispatcher();

        // Submit the application and bring its attempt to the "AM launched" stage.
        RMApp rmApp = rm.submitApp(1024);
        rmDispatcher.await();
        MockNM amNodeManager = rm.registerNode("amNM:1234", 11264);
        amNodeManager.nodeHeartbeat(true);
        rmDispatcher.await();
        final ApplicationAttemptId appAttemptId = rmApp.getCurrentAppAttempt().getAppAttemptId();
        rm.sendAMLaunched(appAttemptId);
        rmDispatcher.await();

        MRApp mrApp = new MRApp(appAttemptId, BuilderUtils.newContainerId(appAttemptId, 0), 10, 0, false, getClass().getName(), true, 1) {
            protected Dispatcher createDispatcher() {
                return new DrainDispatcher();
            }

            @Override // org.apache.hadoop.mapreduce.v2.app.MRApp
            protected ContainerAllocator createContainerAllocator(ClientService clientService, AppContext appContext) {
                return new MyContainerAllocator(rm, appAttemptId, appContext);
            }
        };

        // Nothing has run yet.
        Assert.assertEquals(0.0d, rmApp.getProgress(), 0.0d);
        mrApp.submit(conf);
        Job job = (Job) ((Map.Entry) mrApp.getContext().getAllJobs().entrySet().iterator().next()).getValue();
        // Both factories are overridden above, so these downcasts match what is actually created.
        DrainDispatcher amDispatcher = (DrainDispatcher) mrApp.getDispatcher();
        MyContainerAllocator allocator = (MyContainerAllocator) mrApp.getContainerAllocator();
        mrApp.waitForState(job, JobState.RUNNING);
        amDispatcher.await();

        // Wait until every task's first attempt is waiting for a container.
        for (Task task : job.getTasks().values()) {
            mrApp.waitForInternalState((TaskAttemptImpl) task.getAttempts().values().iterator().next(), TaskAttemptStateInternal.UNASSIGNED);
        }
        amDispatcher.await();

        // Request containers, let the node offer them, then pick up the assignments.
        allocator.schedule();
        rmDispatcher.await();
        amNodeManager.nodeHeartbeat(true);
        rmDispatcher.await();
        allocator.schedule();
        rmDispatcher.await();
        for (Task task : job.getTasks().values()) {
            mrApp.waitForState(task, TaskState.RUNNING);
        }
        allocator.schedule();
        rmDispatcher.await();
        Assert.assertEquals(0.05f, job.getProgress(), 0.001f);
        Assert.assertEquals(0.05f, rmApp.getProgress(), 0.001f);

        // One iterator is shared by every finishNextNTasks call so each call continues where the last stopped.
        Iterator<Task> taskIt = job.getTasks().values().iterator();
        finishNextNTasks(rmDispatcher, amNodeManager, mrApp, taskIt, 1);
        allocator.schedule();
        rmDispatcher.await();
        Assert.assertEquals(0.14f, job.getProgress(), 0.001f);
        Assert.assertEquals(0.14f, rmApp.getProgress(), 0.001f);

        finishNextNTasks(rmDispatcher, amNodeManager, mrApp, taskIt, 5);
        allocator.schedule();
        rmDispatcher.await();
        Assert.assertEquals(0.59f, job.getProgress(), 0.001f);
        Assert.assertEquals(0.59f, rmApp.getProgress(), 0.001f);

        finishNextNTasks(rmDispatcher, amNodeManager, mrApp, taskIt, 4);
        allocator.schedule();
        rmDispatcher.await();
        Assert.assertEquals(0.95f, job.getProgress(), 0.001f);
        Assert.assertEquals(0.95f, rmApp.getProgress(), 0.001f);
    }

    /**
     * Checks how the allocator reacts to node updates. Once an attempt holds a container on h1
     * and both h1 and h2 heartbeat with {@code false}, the next scheduling pass must yield no
     * assignment, exactly one updated-nodes event that lists two nodes, and exactly one kill
     * event for that attempt. A further pass must produce nothing new.
     *
     * @throws Exception if any step of the mocked RM / allocator interaction fails
     */
    @Test
    public void testUpdatedNodes() throws Exception {
        Configuration conf = new Configuration();
        MyResourceManager rm = new MyResourceManager(conf);
        rm.start();
        // The test needs DrainDispatcher#await(), so downcast what the RM context hands back.
        DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext().getDispatcher();

        // Submit the application, then register and heartbeat the "amNM" node.
        RMApp app = rm.submitApp(1024);
        dispatcher.await();
        MockNM amNodeManager = rm.registerNode("amNM:1234", 2048);
        amNodeManager.nodeHeartbeat(true);
        dispatcher.await();

        ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt().getAppAttemptId();
        rm.sendAMLaunched(appAttemptId);
        dispatcher.await();

        JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
        Job mockJob = Mockito.mock(Job.class);
        MyContainerAllocator allocator = new MyContainerAllocator(rm, conf, appAttemptId, mockJob);

        MockNM nm1 = rm.registerNode("h1:1234", 10240);
        MockNM nm2 = rm.registerNode("h2:1234", 10240);
        dispatcher.await();

        // Ask for a single container on h1.
        ContainerRequestEvent request = createReq(jobId, 1, 1024, new String[] {"h1"});
        allocator.sendRequest(request);
        TaskAttemptId attemptId = request.getAttemptID();

        // Stub job -> task -> attempt so that the requested attempt reports h1 as its node.
        TaskAttempt mockAttempt = Mockito.mock(TaskAttempt.class);
        Mockito.when(mockAttempt.getNodeId()).thenReturn(nm1.getNodeId());
        Task mockTask = Mockito.mock(Task.class);
        Mockito.when(mockTask.getAttempt(attemptId)).thenReturn(mockAttempt);
        Mockito.when(mockJob.getTask(attemptId.getTaskId())).thenReturn(mockTask);

        // First pass sends the request; h1 heartbeats; second pass picks up the assignment.
        allocator.schedule();
        dispatcher.await();
        nm1.nodeHeartbeat(true);
        dispatcher.await();
        List<TaskAttemptContainerAssignedEvent> assigned = allocator.schedule();
        dispatcher.await();
        Assert.assertEquals(1, assigned.size());
        Assert.assertEquals(nm1.getNodeId(), assigned.get(0).getContainer().getNodeId());
        Assert.assertTrue(allocator.getJobUpdatedNodeEvents().isEmpty());
        Assert.assertTrue(allocator.getTaskAttemptKillEvents().isEmpty());

        // Both nodes now heartbeat with 'false' (presumably "unhealthy" - confirm against MockNM).
        nm1.nodeHeartbeat(false);
        nm2.nodeHeartbeat(false);
        dispatcher.await();
        assigned = allocator.schedule();
        dispatcher.await();
        Assert.assertEquals(0, assigned.size());
        Assert.assertEquals(1, allocator.getJobUpdatedNodeEvents().size());
        Assert.assertEquals(1, allocator.getTaskAttemptKillEvents().size());
        Assert.assertEquals(2, allocator.getJobUpdatedNodeEvents().get(0).getUpdatedNodes().size());
        Assert.assertEquals(attemptId, allocator.getTaskAttemptKillEvents().get(0).getTaskAttemptID());

        // Clear the recorded events; one more pass must not produce any new ones.
        allocator.getJobUpdatedNodeEvents().clear();
        allocator.getTaskAttemptKillEvents().clear();
        assigned = allocator.schedule();
        dispatcher.await();
        Assert.assertEquals(0, assigned.size());
        Assert.assertTrue(allocator.getJobUpdatedNodeEvents().isEmpty());
        Assert.assertTrue(allocator.getTaskAttemptKillEvents().isEmpty());
    }

    /**
     * With node blacklisting enabled, one allowed failure per tracker and the ignore threshold
     * set to -1, reports a container failure on h1 and on h2 and then asserts that no container
     * is assigned after heartbeats from those two nodes, while all three outstanding requests are
     * assigned on h3 once it heartbeats.
     *
     * @throws Exception if any step of the mocked RM / allocator interaction fails
     */
    @Test
    public void testBlackListedNodes() throws Exception {
        LOG.info("Running testBlackListedNodes");

        Configuration conf = new Configuration();
        conf.setBoolean("yarn.app.mapreduce.am.job.node-blacklisting.enable", true);
        conf.setInt("mapreduce.job.maxtaskfailures.per.tracker", 1);
        conf.setInt("yarn.app.mapreduce.am.job.node-blacklisting.ignore-threshold-node-percent", -1);

        MyResourceManager rm = new MyResourceManager(conf);
        rm.start();
        // The test needs DrainDispatcher#await(), so downcast what the RM context hands back.
        DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext().getDispatcher();

        RMApp app = rm.submitApp(1024);
        dispatcher.await();
        MockNM amNodeManager = rm.registerNode("amNM:1234", 2048);
        amNodeManager.nodeHeartbeat(true);
        dispatcher.await();

        ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt().getAppAttemptId();
        rm.sendAMLaunched(appAttemptId);
        dispatcher.await();

        JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
        Job mockJob = Mockito.mock(Job.class);
        Mockito.when(mockJob.getReport()).thenReturn(MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0L, 0L, 0L, 0.0f, 0.0f, 0.0f, 0.0f, "jobfile", (List) null, false, ""));
        MyContainerAllocator allocator = new MyContainerAllocator(rm, conf, appAttemptId, mockJob);

        MockNM nm1 = rm.registerNode("h1:1234", 10240);
        MockNM nm2 = rm.registerNode("h2:1234", 10240);
        MockNM nm3 = rm.registerNode("h3:1234", 10240);
        dispatcher.await();

        // One request per host; nothing can be assigned before any node has heartbeated.
        allocator.sendRequest(createReq(jobId, 1, 1024, new String[] {"h1"}));
        allocator.sendRequest(createReq(jobId, 2, 1024, new String[] {"h2"}));
        allocator.sendRequest(createReq(jobId, 3, 1024, new String[] {"h3"}));
        List<TaskAttemptContainerAssignedEvent> assigned = allocator.schedule();
        dispatcher.await();
        Assert.assertEquals("No of assignments must be 0", 0, assigned.size());

        // Report a failed container on h1 and on h2, then let both nodes heartbeat.
        allocator.sendFailure(createFailEvent(jobId, 1, "h1", false));
        allocator.sendFailure(createFailEvent(jobId, 1, "h2", false));
        nm1.nodeHeartbeat(true);
        nm2.nodeHeartbeat(true);
        dispatcher.await();
        assigned = allocator.schedule();
        dispatcher.await();
        Assert.assertEquals("No of assignments must be 0", 0, assigned.size());

        // Heartbeats with 'false' from h1 and h2 must not lead to assignments either.
        nm1.nodeHeartbeat(false);
        nm2.nodeHeartbeat(false);
        dispatcher.await();
        assigned = allocator.schedule();
        dispatcher.await();
        Assert.assertEquals("No of assignments must be 0", 0, assigned.size());

        // After h3 heartbeats, all three requests are expected to be assigned there.
        nm3.nodeHeartbeat(true);
        dispatcher.await();
        assigned = allocator.schedule();
        dispatcher.await();
        // assertEquals (rather than assertTrue on a comparison) reports the actual value on failure.
        Assert.assertEquals("No of assignments must be 3", 3, assigned.size());
        for (TaskAttemptContainerAssignedEvent assignment : assigned) {
            Assert.assertEquals("Assigned container host not correct", "h3", assignment.getContainer().getNodeId().getHost());
        }
    }

    /**
     * Runs a scripted sequence of container requests, container failures and node registrations
     * with blacklisting enabled and the ignore threshold set to 33, asserting after every step
     * how many containers the allocator hands out.
     *
     * @throws Exception if any step of the mocked RM / allocator interaction fails
     */
    @Test
    public void testIgnoreBlacklisting() throws Exception {
        LOG.info("Running testIgnoreBlacklisting");

        Configuration conf = new Configuration();
        conf.setBoolean("yarn.app.mapreduce.am.job.node-blacklisting.enable", true);
        conf.setInt("mapreduce.job.maxtaskfailures.per.tracker", 1);
        conf.setInt("yarn.app.mapreduce.am.job.node-blacklisting.ignore-threshold-node-percent", 33);

        MyResourceManager rm = new MyResourceManager(conf);
        rm.start();
        DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext().getDispatcher();
        RMApp app = rm.submitApp(1024);
        dispatcher.await();

        // nmNum is the index of the next node manager to register (host name is "h" + (nmNum + 1)).
        MockNM[] nodeManagers = new MockNM[10];
        int nmNum = 0;
        nodeManagers[nmNum] = registerNodeManager(nmNum, rm, dispatcher);
        nmNum++;
        nodeManagers[0].nodeHeartbeat(true);
        dispatcher.await();

        ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt().getAppAttemptId();
        rm.sendAMLaunched(appAttemptId);
        dispatcher.await();

        JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
        Job mockJob = (Job) Mockito.mock(Job.class);
        Mockito.when(mockJob.getReport()).thenReturn(MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0L, 0L, 0L, 0.0f, 0.0f, 0.0f, 0.0f, "jobfile", (List) null, false, ""));
        MyContainerAllocator allocator = new MyContainerAllocator(rm, conf, appAttemptId, mockJob);

        List<TaskAttemptContainerAssignedEvent> assigned;

        // Only h1 is registered.
        assigned = getContainerOnHost(jobId, 1, 1024, new String[] {"h1"}, nodeManagers[0], dispatcher, allocator);
        Assert.assertEquals("No of assignments must be 1", 1, assigned.size());

        LOG.info("Failing container _1 on H1 (Node should be blacklisted and ignore blacklisting enabled");
        allocator.sendFailure(createFailEvent(jobId, 1, "h1", false));

        assigned = getContainerOnHost(jobId, 2, 1024, new String[] {"h1"}, nodeManagers[0], dispatcher, allocator);
        Assert.assertEquals("No of assignments must be 1", 1, assigned.size());

        // Register h2.
        nodeManagers[nmNum] = registerNodeManager(nmNum, rm, dispatcher);
        nmNum++;
        assigned = getContainerOnHost(jobId, 3, 1024, new String[] {"h2"}, nodeManagers[1], dispatcher, allocator);
        Assert.assertEquals("No of assignments must be 1", 1, assigned.size());

        // Register h3.
        nodeManagers[nmNum] = registerNodeManager(nmNum, rm, dispatcher);
        nmNum++;
        assigned = getContainerOnHost(jobId, 4, 1024, new String[] {"h3"}, nodeManagers[2], dispatcher, allocator);
        Assert.assertEquals("No of assignments must be 1", 1, assigned.size());

        assigned = getContainerOnHost(jobId, 5, 1024, new String[] {"h1"}, nodeManagers[0], dispatcher, allocator);
        Assert.assertEquals("No of assignments must be 1", 1, assigned.size());

        // Register h4; from here on a request for h1 is expected to stay unassigned.
        nodeManagers[nmNum] = registerNodeManager(nmNum, rm, dispatcher);
        nmNum++;
        assigned = getContainerOnHost(jobId, 6, 1024, new String[] {"h4"}, nodeManagers[3], dispatcher, allocator);
        Assert.assertEquals("No of assignments must be 1", 1, assigned.size());

        assigned = getContainerOnHost(jobId, 7, 1024, new String[] {"h1"}, nodeManagers[0], dispatcher, allocator);
        Assert.assertEquals("No of assignments must be 0", 0, assigned.size());

        // A failure on h2 as well: the pending h1 request and the new one are both expected now.
        allocator.sendFailure(createFailEvent(jobId, 3, "h2", false));

        assigned = getContainerOnHost(jobId, 8, 1024, new String[] {"h1"}, nodeManagers[0], dispatcher, allocator);
        Assert.assertEquals("No of assignments must be 2", 2, assigned.size());

        assigned = getContainerOnHost(jobId, 9, 1024, new String[] {"h2"}, nodeManagers[1], dispatcher, allocator);
        Assert.assertEquals("No of assignments must be 1", 1, assigned.size());

        // A failure on h3, followed by registering h5.
        allocator.sendFailure(createFailEvent(jobId, 4, "h3", false));
        nodeManagers[nmNum] = registerNodeManager(nmNum, rm, dispatcher);
        nmNum++;
        assigned = getContainerOnHost(jobId, 10, 1024, new String[] {"h3"}, nodeManagers[2], dispatcher, allocator);
        Assert.assertEquals("No of assignments must be 1", 1, assigned.size());

        // Register five more nodes (h6..h10), requesting one container per step.
        // NOTE(review): the requested host is the bare number (no "h" prefix) and the heartbeat
        // comes from nodeManagers[4 + step], i.e. the node registered one step earlier. Kept
        // exactly as found; confirm this is intended.
        for (int step = 0; step < 5; step++) {
            nodeManagers[nmNum] = registerNodeManager(nmNum, rm, dispatcher);
            nmNum++;
            assigned = getContainerOnHost(jobId, 11 + step, 1024, new String[] {String.valueOf(5 + step)}, nodeManagers[4 + step], dispatcher, allocator);
            Assert.assertEquals("No of assignments must be 1", 1, assigned.size());
        }

        // With ten nodes registered, a request for h3 is expected to stay unassigned.
        assigned = getContainerOnHost(jobId, 20, 1024, new String[] {"h3"}, nodeManagers[2], dispatcher, allocator);
        Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
    }

    /**
     * Registers a node manager named {@code "h" + (i + 1)} on port 1234 with 10240 as the second
     * argument to {@code registerNode}, then waits for the dispatcher to drain.
     *
     * @param i zero-based node index; the host name uses {@code i + 1}
     * @param myResourceManager resource manager to register the node with
     * @param drainDispatcher dispatcher to await after registration
     * @return the node manager returned by the resource manager
     * @throws Exception if registration or draining fails
     */
    private MockNM registerNodeManager(int i, MyResourceManager myResourceManager, DrainDispatcher drainDispatcher) throws Exception {
        String nodeAddress = "h" + (i + 1) + ":1234";
        MockNM nodeManager = myResourceManager.registerNode(nodeAddress, 10240);
        drainDispatcher.await();
        return nodeManager;
    }

    /**
     * Sends one container request, asserts that the first scheduling pass assigns nothing,
     * heartbeats the given node and returns whatever the second scheduling pass assigns.
     *
     * @param jobId job the request belongs to
     * @param i forwarded to {@code createReq} as its second argument (used there as the attempt number)
     * @param i2 forwarded to {@code createReq} as its third argument (used there to build the resource)
     * @param strArr requested hosts
     * @param mockNM node manager that heartbeats between the two scheduling passes
     * @param drainDispatcher dispatcher awaited after every step
     * @param myContainerAllocator allocator under test
     * @return the assignments produced by the second scheduling pass
     * @throws Exception if any step fails
     */
    private List<TaskAttemptContainerAssignedEvent> getContainerOnHost(JobId jobId, int i, int i2, String[] strArr, MockNM mockNM, DrainDispatcher drainDispatcher, MyContainerAllocator myContainerAllocator) throws Exception {
        ContainerRequestEvent request = createReq(jobId, i, i2, strArr);
        myContainerAllocator.sendRequest(request);

        // First pass: the request only gets sent, so nothing can be assigned yet.
        List<TaskAttemptContainerAssignedEvent> beforeHeartbeat = myContainerAllocator.schedule();
        drainDispatcher.await();
        Assert.assertEquals("No of assignments must be 0", 0, beforeHeartbeat.size());

        // Node heartbeat, then a second pass to collect the result.
        mockNM.nodeHeartbeat(true);
        drainDispatcher.await();
        List<TaskAttemptContainerAssignedEvent> afterHeartbeat = myContainerAllocator.schedule();
        drainDispatcher.await();
        return afterHeartbeat;
    }

    /**
     * After a container failure on h1, re-requests a container for the failed attempt and sends
     * a further request naming both h1 and h3. Asserts that nothing is assigned while only h1
     * heartbeats, and that both outstanding requests are assigned on h3 once h3 heartbeats.
     *
     * @throws Exception if any step of the mocked RM / allocator interaction fails
     */
    @Test
    public void testBlackListedNodesWithSchedulingToThatNode() throws Exception {
        LOG.info("Running testBlackListedNodesWithSchedulingToThatNode");

        Configuration conf = new Configuration();
        conf.setBoolean("yarn.app.mapreduce.am.job.node-blacklisting.enable", true);
        conf.setInt("mapreduce.job.maxtaskfailures.per.tracker", 1);
        conf.setInt("yarn.app.mapreduce.am.job.node-blacklisting.ignore-threshold-node-percent", -1);

        MyResourceManager rm = new MyResourceManager(conf);
        rm.start();
        // The test needs DrainDispatcher#await(), so downcast what the RM context hands back.
        DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext().getDispatcher();

        RMApp app = rm.submitApp(1024);
        dispatcher.await();
        MockNM amNodeManager = rm.registerNode("amNM:1234", 2048);
        amNodeManager.nodeHeartbeat(true);
        dispatcher.await();

        ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt().getAppAttemptId();
        rm.sendAMLaunched(appAttemptId);
        dispatcher.await();

        JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
        Job mockJob = Mockito.mock(Job.class);
        Mockito.when(mockJob.getReport()).thenReturn(MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0L, 0L, 0L, 0.0f, 0.0f, 0.0f, 0.0f, "jobfile", (List) null, false, ""));
        MyContainerAllocator allocator = new MyContainerAllocator(rm, conf, appAttemptId, mockJob);

        MockNM nm1 = rm.registerNode("h1:1234", 10240);
        MockNM nm3 = rm.registerNode("h3:1234", 10240);
        dispatcher.await();

        LOG.info("Requesting 1 Containers _1 on H1");
        allocator.sendRequest(createReq(jobId, 1, 1024, new String[] {"h1"}));

        LOG.info("RM Heartbeat (to send the container requests)");
        List<TaskAttemptContainerAssignedEvent> assigned = allocator.schedule();
        dispatcher.await();
        Assert.assertEquals("No of assignments must be 0", 0, assigned.size());

        LOG.info("h1 Heartbeat (To actually schedule the containers)");
        nm1.nodeHeartbeat(true);
        dispatcher.await();

        LOG.info("RM Heartbeat (To process the scheduled containers)");
        assigned = allocator.schedule();
        dispatcher.await();
        Assert.assertEquals("No of assignments must be 1", 1, assigned.size());

        LOG.info("Failing container _1 on H1 (should blacklist the node)");
        allocator.sendFailure(createFailEvent(jobId, 1, "h1", false));

        // Re-request a container for the failed attempt (createReq's "failed container" variant).
        allocator.sendRequest(createReq(jobId, 1, 1024, new String[] {"h1"}, true, false));
        assigned = allocator.schedule();
        dispatcher.await();
        Assert.assertEquals("No of assignments must be 0", 0, assigned.size());

        // A further request that names both h1 and h3.
        allocator.sendRequest(createReq(jobId, 3, 1024, new String[] {"h1", "h3"}));

        LOG.info("h1 Heartbeat (To actually schedule the containers)");
        nm1.nodeHeartbeat(true);
        dispatcher.await();

        LOG.info("RM Heartbeat (To process the scheduled containers)");
        assigned = allocator.schedule();
        dispatcher.await();
        Assert.assertEquals("No of assignments must be 0", 0, assigned.size());

        LOG.info("RM Heartbeat (To process the re-scheduled containers)");
        assigned = allocator.schedule();
        dispatcher.await();
        Assert.assertEquals("No of assignments must be 0", 0, assigned.size());

        LOG.info("h3 Heartbeat (To re-schedule the containers)");
        nm3.nodeHeartbeat(true);
        dispatcher.await();

        LOG.info("RM Heartbeat (To process the re-scheduled containers for H3)");
        assigned = allocator.schedule();
        dispatcher.await();

        // Log every assignment before asserting, so a failing run shows what was handed out.
        for (TaskAttemptContainerAssignedEvent assignment : assigned) {
            LOG.info(assignment.getTaskAttemptID() + " assgined to " + assignment.getContainer().getId() + " with priority " + assignment.getContainer().getPriority());
        }
        Assert.assertEquals("No of assignments must be 2", 2, assigned.size());
        for (TaskAttemptContainerAssignedEvent assignment : assigned) {
            Assert.assertEquals("Assigned container " + assignment.getContainer().getId() + " host not correct", "h3", assignment.getContainer().getNodeId().getHost());
        }
    }

    /**
     * Convenience overload of the six-argument {@code createReq} that passes {@code false} for
     * both boolean flags.
     *
     * @param jobId job the request belongs to
     * @param i forwarded unchanged as the second argument
     * @param i2 forwarded unchanged as the third argument
     * @param strArr requested hosts
     * @return the request event built by the six-argument overload
     */
    private ContainerRequestEvent createReq(JobId jobId, int i, int i2, String[] strArr) {
        final boolean forFailedContainer = false;
        final boolean useReduceTask = false;
        return createReq(jobId, i, i2, strArr, forFailedContainer, useReduceTask);
    }

    /**
     * Builds a container request for attempt {@code i} of task 0 of the given job.
     *
     * @param jobId job the request belongs to
     * @param i attempt number passed to {@code newTaskAttemptId}
     * @param i2 first argument to {@code BuilderUtils.newResource} (the second is fixed at 1)
     * @param strArr requested hosts; only used when {@code z} is {@code false}
     * @param z when {@code true}, the event is created through
     *          {@code createContainerRequestEventForFailedContainer} and the hosts are not passed
     * @param z2 when {@code true} the task is a REDUCE task, otherwise a MAP task
     * @return the container request event
     */
    private ContainerRequestEvent createReq(JobId jobId, int i, int i2, String[] strArr, boolean z, boolean z2) {
        TaskType taskType = z2 ? TaskType.REDUCE : TaskType.MAP;
        TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(MRBuilderUtils.newTaskId(jobId, 0, taskType), i);
        Resource capability = BuilderUtils.newResource(i2, 1);
        if (z) {
            return ContainerRequestEvent.createContainerRequestEventForFailedContainer(attemptId, capability);
        }
        String[] racks = new String[] {"/default-rack"};
        return new ContainerRequestEvent(attemptId, capability, strArr, racks);
    }

    /**
     * Builds a container-failed event for attempt {@code i} of task 0 of the given job.
     *
     * @param jobId job the failed attempt belongs to
     * @param i attempt number passed to {@code newTaskAttemptId}
     * @param str second constructor argument of {@code ContainerFailedEvent} (callers pass a host name)
     * @param z when {@code true} the task is a REDUCE task, otherwise a MAP task
     * @return the container-failed event
     */
    private ContainerFailedEvent createFailEvent(JobId jobId, int i, String str, boolean z) {
        TaskType taskType = z ? TaskType.REDUCE : TaskType.MAP;
        TaskAttemptId failedAttemptId = MRBuilderUtils.newTaskAttemptId(MRBuilderUtils.newTaskId(jobId, 0, taskType), i);
        return new ContainerFailedEvent(failedAttemptId, str);
    }

    /**
     * Asserts that every request received exactly one assignment and that all assigned
     * containers are distinct, then delegates the per-request checks to {@code checkAssignment}.
     *
     * @param containerRequestEventArr the requests that were sent
     * @param list the assignments that came back; must not be {@code null}
     * @param z forwarded to {@code checkAssignment}; when {@code true} the assigned host must be
     *          one of the requested hosts
     */
    private void checkAssignments(ContainerRequestEvent[] containerRequestEventArr, List<TaskAttemptContainerAssignedEvent> list, boolean z) {
        Assert.assertNotNull("Container not assigned", list);
        Assert.assertEquals("Assigned count not correct", containerRequestEventArr.length, list.size());

        // Collecting the container ids into a set exposes duplicates through a size mismatch.
        HashSet<Object> containerIds = new HashSet<Object>();
        for (TaskAttemptContainerAssignedEvent assignment : list) {
            containerIds.add(assignment.getContainer().getId());
        }
        Assert.assertEquals("Assigned containers must be different", list.size(), containerIds.size());

        for (ContainerRequestEvent request : containerRequestEventArr) {
            // Bounded search: when no assignment matches, 'match' stays null and checkAssignment
            // fails with a clear message. The previous open-ended loop never terminated in that case.
            TaskAttemptContainerAssignedEvent match = null;
            for (TaskAttemptContainerAssignedEvent candidate : list) {
                if (candidate.getTaskAttemptID().equals(request.getAttemptID())) {
                    match = candidate;
                    break;
                }
            }
            checkAssignment(request, match, z);
        }
    }

    /**
     * Asserts that a request received an assignment for the same attempt and, optionally, that
     * the assigned container sits on one of the requested hosts.
     *
     * @param containerRequestEvent the request that was sent
     * @param taskAttemptContainerAssignedEvent the assignment found for it; {@code null} fails the check
     * @param z whether to also verify the assigned host against the requested hosts
     */
    private void checkAssignment(ContainerRequestEvent containerRequestEvent, TaskAttemptContainerAssignedEvent taskAttemptContainerAssignedEvent, boolean z) {
        String missingMessage = "Nothing assigned to attempt " + containerRequestEvent.getAttemptID();
        Assert.assertNotNull(missingMessage, taskAttemptContainerAssignedEvent);
        Assert.assertEquals("assigned to wrong attempt", containerRequestEvent.getAttemptID(), taskAttemptContainerAssignedEvent.getTaskAttemptID());
        if (!z) {
            return;
        }
        List<String> requestedHosts = Arrays.asList(containerRequestEvent.getHosts());
        String assignedHost = taskAttemptContainerAssignedEvent.getContainer().getNodeId().getHost();
        Assert.assertTrue("Not assigned to requested host", requestedHosts.contains(assignedHost));
    }

    /**
     * Drives the real {@code scheduleReduces} on an otherwise mocked allocator with several
     * argument combinations and verifies which collaborating methods
     * ({@code setIsReduceStarted}, {@code scheduleAllReduces}, {@code rampUpReduces},
     * {@code rampDownReduces}) were or were not invoked.
     *
     * @throws Exception if stubbing or verification fails
     */
    @Test
    public void testReduceScheduling() throws Exception {
        RMContainerAllocator allocator = Mockito.mock(RMContainerAllocator.class);
        // Only scheduleReduces runs its real implementation; every other method stays mocked.
        Mockito.doCallRealMethod().when(allocator).scheduleReduces(
                Matchers.anyInt(), Matchers.anyInt(), Matchers.anyInt(), Matchers.anyInt(),
                Matchers.anyInt(), Matchers.anyInt(), Matchers.anyInt(), Matchers.anyInt(),
                Matchers.anyInt(), Matchers.anyFloat(), Matchers.anyFloat());

        // Second argument 1: reduces must not have been started.
        allocator.scheduleReduces(10, 1, 10, 0, 2, 0, 1024, 2048, 4, 0.5f, 0.2f);
        Mockito.verify(allocator, Mockito.never()).setIsReduceStarted(true);

        // Third argument 0: still not started, and scheduleAllReduces must not have been called.
        allocator.scheduleReduces(10, 1, 0, 0, 10 - 1, 0, 1024, 2048, 4, 0.5f, 0.2f);
        Mockito.verify(allocator, Mockito.never()).setIsReduceStarted(true);
        Mockito.verify(allocator, Mockito.never()).scheduleAllReduces();

        // Second argument 3: reduces are started exactly once.
        allocator.scheduleReduces(10, 3, 10, 0, 2, 0, 1024, 2048, 4, 0.5f, 0.2f);
        Mockito.verify(allocator, Mockito.times(1)).setIsReduceStarted(true);

        // Memory limit stubbed to 102400: expect a ramp-up and no ramp-down.
        Mockito.doReturn(102400).when(allocator).getMemLimit();
        allocator.scheduleReduces(10, 3, 10, 0, 2, 0, 1024, 2048, 4, 0.5f, 0.2f);
        Mockito.verify(allocator).rampUpReduces(Matchers.anyInt());
        Mockito.verify(allocator, Mockito.never()).rampDownReduces(Matchers.anyInt());

        // Memory limit stubbed to 10240 and fourth argument 3: expect a ramp-down.
        Mockito.doReturn(10240).when(allocator).getMemLimit();
        allocator.scheduleReduces(10, 3, 10, 3, 2, 0, 1024, 2048, 4, 0.5f, 0.2f);
        Mockito.verify(allocator).rampDownReduces(Matchers.anyInt());
    }

    /**
     * Verifies that a scheduling pass sets {@code recalculatedReduceSchedule} only after the
     * job's completed-map count has changed: with the count still at 0 the flag must stay
     * {@code false}, and after the count is stubbed to 1 the flag must become {@code true}.
     *
     * @throws Exception if any step of the mocked RM / allocator interaction fails
     */
    @Test
    public void testCompletedTasksRecalculateSchedule() throws Exception {
        LOG.info("Running testCompletedTasksRecalculateSchedule");

        Configuration conf = new Configuration();
        MyResourceManager rm = new MyResourceManager(conf);
        rm.start();
        // The test needs DrainDispatcher#await(), so downcast what the RM context hands back.
        DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext().getDispatcher();

        RMApp app = rm.submitApp(1024);
        dispatcher.await();

        ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt().getAppAttemptId();
        JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);

        // Mocked job: 10 maps, 10 reduces, no map completed yet.
        Job mockJob = Mockito.mock(Job.class);
        Mockito.when(mockJob.getReport()).thenReturn(MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0L, 0L, 0L, 0.0f, 0.0f, 0.0f, 0.0f, "jobfile", (List) null, false, ""));
        Mockito.doReturn(10).when(mockJob).getTotalMaps();
        Mockito.doReturn(10).when(mockJob).getTotalReduces();
        Mockito.doReturn(0).when(mockJob).getCompletedMaps();

        RecalculateContainerAllocator allocator = new RecalculateContainerAllocator(rm, conf, appAttemptId, mockJob);

        // Run one pass, reset the flag, then run another pass with an unchanged completed-map count.
        allocator.schedule();
        allocator.recalculatedReduceSchedule = false;
        allocator.schedule();
        Assert.assertFalse("Unexpected recalculate of reduce schedule", allocator.recalculatedReduceSchedule);

        // One map completes: the next pass has to recalculate.
        Mockito.doReturn(1).when(mockJob).getCompletedMaps();
        allocator.schedule();
        Assert.assertTrue("Expected recalculate of reduce schedule", allocator.recalculatedReduceSchedule);
    }

    /**
     * Starts an allocator whose register, scheduler-proxy and heartbeat hooks are stubbed out,
     * advances a controlled clock and checks that {@code getLastHeartbeatTime()} follows it.
     * Also checks that a callback queued with {@code runOnNextHeartbeat} has run by the time the
     * next clock value is observed.
     *
     * @throws Exception if the allocator fails or the polling sleep is interrupted
     */
    @Test
    public void testHeartbeatHandler() throws Exception {
        LOG.info("Running testHeartbeatHandler");

        Configuration conf = new Configuration();
        conf.setInt("yarn.app.mapreduce.am.scheduler.heartbeat.interval-ms", 1);

        ControlledClock clock = new ControlledClock(new SystemClock());
        AppContext context = Mockito.mock(AppContext.class);
        Mockito.when(context.getClock()).thenReturn(clock);
        Mockito.when(context.getApplicationID()).thenReturn(BuilderUtils.newApplicationId(1L, 1));

        RMContainerAllocator allocator = new RMContainerAllocator(Mockito.mock(ClientService.class), context) {
            protected void register() {
                // intentionally empty in this test
            }

            protected AMRMProtocol createSchedulerProxy() {
                return Mockito.mock(AMRMProtocol.class);
            }

            protected synchronized void heartbeat() throws Exception {
                // intentionally empty in this test
            }
        };
        allocator.init(conf);
        allocator.start();

        // Each wait polls every 10 ms and gives up after a 5000 ms budget.
        clock.setTime(5L);
        int budgetMs = 5000;
        while (allocator.getLastHeartbeatTime() != 5 && budgetMs > 0) {
            Thread.sleep(10L);
            budgetMs -= 10;
        }
        Assert.assertEquals(5L, allocator.getLastHeartbeatTime());

        clock.setTime(7L);
        budgetMs = 5000;
        while (allocator.getLastHeartbeatTime() != 7 && budgetMs > 0) {
            Thread.sleep(10L);
            budgetMs -= 10;
        }
        Assert.assertEquals(7L, allocator.getLastHeartbeatTime());

        // Queue a callback, then wait until the next clock value has been observed.
        final AtomicBoolean callbackCalled = new AtomicBoolean(false);
        allocator.runOnNextHeartbeat(new Runnable() {
            @Override
            public void run() {
                callbackCalled.set(true);
            }
        });
        clock.setTime(8L);
        budgetMs = 5000;
        while (allocator.getLastHeartbeatTime() != 8 && budgetMs > 0) {
            Thread.sleep(10L);
            budgetMs -= 10;
        }
        Assert.assertEquals(8L, allocator.getLastHeartbeatTime());
        Assert.assertTrue(callbackCalled.get());
    }

    /**
     * Runs a fixed selection of this class's test methods one after another on a single
     * instance, without a test runner. The first exception thrown aborts the run.
     *
     * @param strArr command-line arguments; not used
     * @throws Exception whatever the invoked test methods throw
     */
    public static void main(String[] strArr) throws Exception {
        TestRMContainerAllocator tests = new TestRMContainerAllocator();
        tests.testSimple();
        tests.testResource();
        tests.testMapReduceScheduling();
        tests.testReportedAppProgress();
        tests.testReportedAppProgressWithOnlyMaps();
        tests.testBlackListedNodes();
        tests.testCompletedTasksRecalculateSchedule();
    }
}
