/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.mapred;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.ClusterStatus;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobID;
import org.apache.hadoop.mapred.JobInProgress;
import org.apache.hadoop.mapred.JobQueueTaskScheduler;
import org.apache.hadoop.mapred.JobTracker;
import org.apache.hadoop.mapred.MiniMRCluster;
import org.apache.hadoop.mapred.Task;
import org.apache.hadoop.mapred.TaskAttemptID;
import org.apache.hadoop.mapred.TaskScheduler;
import org.apache.hadoop.mapred.TaskStatus;
import org.apache.hadoop.mapred.TaskTrackerStatus;
import org.apache.hadoop.mapreduce.ClusterMetrics;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/**
 * Exercises the cluster status and cluster metrics reported by a
 * {@link JobTracker} hosted in a {@link MiniMRCluster}.
 *
 * <p>Task trackers are simulated: the tests never start real trackers but
 * instead call {@link JobTracker#heartbeat} directly with hand-built
 * {@link TaskTrackerStatus} objects. The job tracker is configured (via
 * {@code mapred.jobtracker.taskScheduler}) to use {@link FakeTaskScheduler},
 * which reserves or unreserves slots on every {@code assignTasks} call so that
 * the reserved-slot metrics can be checked.
 */
public class TestClusterStatus {
    /** Names of the simulated task trackers that heartbeat into the job tracker. */
    private static final String[] trackers = new String[]{"tracker_tracker1:1000", "tracker_tracker2:1000", "tracker_tracker3:1000"};
    /** Map slot count advertised by each simulated tracker in {@link #getTTStatus}. */
    private static final int mapSlotsPerTracker = 4;
    /** Reduce slot count advertised by each simulated tracker in {@link #getTTStatus}. */
    private static final int reduceSlotsPerTracker = 2;
    /**
     * Value the memory getters are expected to return when detailed status was
     * not requested. NOTE(review): presumably mirrors a constant declared on
     * ClusterStatus; the literal is kept here because that constant is not
     * visible from this file.
     */
    private static final long UNINITIALIZED_MEMORY_VALUE = -1L;

    /** Job registered as the owner of the slots reserved by {@link FakeTaskScheduler}. */
    private static FakeJobInProgress fakeJob;
    /** The scheduler instance; published by the {@link FakeTaskScheduler} constructor. */
    private static FakeTaskScheduler scheduler;

    private JobTracker jobTracker;
    private MiniMRCluster mr;
    private JobClient client;
    /** Response id passed with each round of simulated heartbeats. */
    private short responseId;

    /**
     * Starts the mini cluster with the fake scheduler, registers every simulated
     * tracker with an initial-contact heartbeat and opens a {@link JobClient}.
     */
    @Before
    public void setUp() throws Exception {
        this.responseId = 1;
        JobConf conf = new JobConf();
        conf.setClass("mapred.jobtracker.taskScheduler", FakeTaskScheduler.class, TaskScheduler.class);
        this.mr = new MiniMRCluster(0, 0, 0, "file:///", 1, null, null, null, conf);
        this.jobTracker = this.mr.getJobTrackerRunner().getJobTracker();
        for (String tracker : trackers) {
            establishFirstContact(this.jobTracker, tracker);
        }
        this.client = new JobClient(this.mr.createJobConf());
    }

    /**
     * Releases the client and the mini cluster and clears all shared state.
     *
     * <p>Each resource is null-checked because this method also runs when
     * {@link #setUp()} failed part-way, and the cluster is shut down in a
     * {@code finally} block so that a failing {@code client.close()} cannot
     * leak it.
     */
    @After
    public void tearDown() throws Exception {
        try {
            if (this.client != null) {
                this.client.close();
            }
        } finally {
            try {
                if (this.mr != null) {
                    this.mr.shutdown();
                }
            } finally {
                fakeJob = null;
                scheduler = null;
                this.client = null;
                this.mr = null;
                this.jobTracker = null;
            }
        }
    }

    /**
     * Sends one heartbeat to the job tracker on behalf of a simulated tracker.
     *
     * @param jt              job tracker receiving the heartbeat
     * @param status          status to report; when {@code null} a minimal status
     *                        is built from {@code tracker}
     * @param initialContact  forwarded to {@link JobTracker#heartbeat}
     * @param acceptNewTasks  forwarded to {@link JobTracker#heartbeat}
     * @param tracker         tracker name, used only when {@code status} is {@code null}
     * @param responseId      response id sent with this heartbeat
     * @return {@code responseId + 1}, narrowed to {@code short}
     * @throws IOException if the heartbeat call fails
     */
    static short sendHeartBeat(JobTracker jt, TaskTrackerStatus status, boolean initialContact, boolean acceptNewTasks, String tracker, short responseId) throws IOException {
        if (status == null) {
            status = new TaskTrackerStatus(tracker, JobInProgress.convertTrackerNameToHostName(tracker));
        }
        jt.heartbeat(status, false, initialContact, acceptNewTasks, responseId);
        return (short)(responseId + 1);
    }

    /**
     * Registers {@code tracker} with the job tracker by sending an
     * initial-contact heartbeat with response id 0 that does not accept tasks.
     */
    static void establishFirstContact(JobTracker jt, String tracker) throws IOException {
        sendHeartBeat(jt, null, true, false, tracker, (short)0);
    }

    /**
     * Builds a tracker status carrying {@code taskStatuses} and advertising
     * {@link #mapSlotsPerTracker} / {@link #reduceSlotsPerTracker} slots.
     */
    private TaskTrackerStatus getTTStatus(String trackerName, List<TaskStatus> taskStatuses) {
        return newTrackerStatus(trackerName, taskStatuses, mapSlotsPerTracker, reduceSlotsPerTracker);
    }

    /**
     * Builds a tracker status for {@code trackerName}. The last two constructor
     * arguments are the map and reduce slot counts (the capacity assertions in
     * {@link #testClusterMetrics()} rely on that); the remaining numeric
     * arguments are passed as zero, exactly as the tests always did.
     */
    private static TaskTrackerStatus newTrackerStatus(String trackerName, List<TaskStatus> taskStatuses, int mapSlots, int reduceSlots) {
        String host = JobInProgress.convertTrackerNameToHostName(trackerName);
        return new TaskTrackerStatus(trackerName, "http", host, 0, taskStatuses, 0, 0, mapSlots, reduceSlots);
    }

    /**
     * Verifies occupied slots, slot capacities and running task counts as a map
     * and a reduce attempt on the first tracker go from RUNNING to SUCCEEDED.
     */
    @Test
    public void testClusterMetrics() throws IOException, InterruptedException {
        Assert.assertEquals("tasktracker count doesn't match", trackers.length, this.client.getClusterStatus().getTaskTrackers());

        int mapSlotsPerTask = 2;
        int reduceSlotsPerTask = 1;
        int expectedMapCapacity = mapSlotsPerTracker * trackers.length;
        int expectedReduceCapacity = reduceSlotsPerTracker * trackers.length;

        // Phase 1: one running map attempt and one running reduce attempt.
        List<TaskStatus> list = new ArrayList<TaskStatus>();
        this.addMapTaskAttemptToList(list, mapSlotsPerTask, TaskStatus.State.RUNNING);
        this.addReduceTaskAttemptToList(list, reduceSlotsPerTask, TaskStatus.State.RUNNING);
        this.sendHeartbeats(list);

        ClusterMetrics metrics = this.jobTracker.getClusterMetrics();
        Assert.assertEquals("occupied map slots do not match", mapSlotsPerTask, metrics.getOccupiedMapSlots());
        Assert.assertEquals("occupied reduce slots do not match", reduceSlotsPerTask, metrics.getOccupiedReduceSlots());
        Assert.assertEquals("map slot capacities do not match", expectedMapCapacity, metrics.getMapSlotCapacity());
        Assert.assertEquals("reduce slot capacities do not match", expectedReduceCapacity, metrics.getReduceSlotCapacity());
        Assert.assertEquals("running map tasks do not match", 1, metrics.getRunningMaps());
        Assert.assertEquals("running reduce tasks do not match", 1, metrics.getRunningReduces());

        // The client-side ClusterStatus must agree with the metrics.
        ClusterStatus stat = this.client.getClusterStatus();
        Assert.assertEquals("running map tasks do not match", 1, stat.getMapTasks());
        Assert.assertEquals("running reduce tasks do not match", 1, stat.getReduceTasks());
        Assert.assertEquals("map slot capacities do not match", expectedMapCapacity, stat.getMaxMapTasks());
        Assert.assertEquals("reduce slot capacities do not match", expectedReduceCapacity, stat.getMaxReduceTasks());

        // Phase 2: the map attempt succeeds, the reduce attempt keeps running.
        list.clear();
        this.addMapTaskAttemptToList(list, mapSlotsPerTask, TaskStatus.State.SUCCEEDED);
        this.addReduceTaskAttemptToList(list, reduceSlotsPerTask, TaskStatus.State.RUNNING);
        this.sendHeartbeats(list);
        metrics = this.jobTracker.getClusterMetrics();
        Assert.assertEquals(0, metrics.getOccupiedMapSlots());
        Assert.assertEquals(reduceSlotsPerTask, metrics.getOccupiedReduceSlots());

        // Phase 3: the reduce attempt succeeds as well.
        list.clear();
        this.addReduceTaskAttemptToList(list, reduceSlotsPerTask, TaskStatus.State.SUCCEEDED);
        this.sendHeartbeats(list);
        metrics = this.jobTracker.getClusterMetrics();
        Assert.assertEquals(0, metrics.getOccupiedReduceSlots());
    }

    /**
     * Sends one heartbeat per simulated tracker, all with the current response
     * id, then advances the response id once. Only the first tracker reports
     * {@code list}; every other tracker reports no task attempts.
     */
    private void sendHeartbeats(List<TaskStatus> list) throws IOException {
        for (int i = 0; i < trackers.length; ++i) {
            List<TaskStatus> reported = i == 0 ? list : new ArrayList<TaskStatus>();
            TaskTrackerStatus status = this.getTTStatus(trackers[i], reported);
            sendHeartBeat(this.jobTracker, status, false, false, trackers[i], this.responseId);
        }
        this.responseId = (short)(this.responseId + 1);
    }

    /** Appends a reduce attempt status in the REDUCE phase, attributed to the first tracker. */
    private void addReduceTaskAttemptToList(List<TaskStatus> list, int reduceSlotsPerTask, TaskStatus.State state) {
        addTaskAttemptToList(list, false, reduceSlotsPerTask, state, TaskStatus.Phase.REDUCE);
    }

    /** Appends a map attempt status in the MAP phase, attributed to the first tracker. */
    private void addMapTaskAttemptToList(List<TaskStatus> list, int mapSlotsPerTask, TaskStatus.State state) {
        addTaskAttemptToList(list, true, mapSlotsPerTask, state, TaskStatus.Phase.MAP);
    }

    /**
     * Shared builder behind the map/reduce helpers: creates a status for an
     * attempt identified by {@code ("jt", 1, isMap, 0, 0)} with zero progress,
     * empty diagnostic and state strings and no counters, and adds it to
     * {@code list}.
     */
    private static void addTaskAttemptToList(List<TaskStatus> list, boolean isMap, int slotsPerTask, TaskStatus.State state, TaskStatus.Phase phase) {
        TaskAttemptID attemptId = new TaskAttemptID("jt", 1, isMap, 0, 0);
        TaskStatus ts = TaskStatus.createTaskStatus(isMap, attemptId, 0.0f, slotsPerTask, state, "", "", trackers[0], phase, null);
        list.add(ts);
    }

    /**
     * Verifies the reserved-slot metrics. {@link FakeTaskScheduler} reserves
     * one more map and reduce slot per tracker on every heartbeat that accepts
     * tasks, so with two trackers the totals are 2 after the first round and 4
     * after the second; once unreserving is switched on they must drop to 0.
     */
    @Test
    public void testReservedSlots() throws IOException {
        JobConf conf = this.mr.createJobConf();
        conf.setNumReduceTasks(1);
        conf.setSpeculativeExecution(false);

        TaskTracker tt1 = this.jobTracker.getTaskTracker(trackers[0]);
        TaskTracker tt2 = this.jobTracker.getTaskTracker(trackers[1]);
        TaskTrackerStatus status1 = newTrackerStatus(trackers[0], new ArrayList<TaskStatus>(), 2, 2);
        TaskTrackerStatus status2 = newTrackerStatus(trackers[1], new ArrayList<TaskStatus>(), 2, 2);
        tt1.setStatus(status1);
        tt2.setStatus(status2);

        fakeJob = new FakeJobInProgress(new JobID("jt", 1), new JobConf(conf), this.jobTracker);

        ClusterMetrics metrics = this.heartbeatFirstTwoTrackers(status1, status2);
        Assert.assertEquals("reserved map slots do not match", 2, metrics.getReservedMapSlots());
        Assert.assertEquals("reserved reduce slots do not match", 2, metrics.getReservedReduceSlots());

        metrics = this.heartbeatFirstTwoTrackers(status1, status2);
        Assert.assertEquals("reserved map slots do not match", 4, metrics.getReservedMapSlots());
        Assert.assertEquals("reserved reduce slots do not match", 4, metrics.getReservedReduceSlots());

        // Fail with a readable message rather than an NPE if the job tracker
        // never instantiated the fake scheduler.
        Assert.assertNotNull("FakeTaskScheduler was never instantiated", scheduler);
        scheduler.setUnreserveSlots(true);

        metrics = this.heartbeatFirstTwoTrackers(status1, status2);
        Assert.assertEquals("map slots should have been unreserved", 0, metrics.getReservedMapSlots());
        Assert.assertEquals("reduce slots should have been unreserved", 0, metrics.getReservedReduceSlots());
    }

    /**
     * Sends a task-accepting heartbeat for each of the first two trackers with
     * the current response id, advances the response id once and returns the
     * job tracker's metrics as they stand afterwards.
     */
    private ClusterMetrics heartbeatFirstTwoTrackers(TaskTrackerStatus status1, TaskTrackerStatus status2) throws IOException {
        sendHeartBeat(this.jobTracker, status1, false, true, trackers[0], this.responseId);
        sendHeartBeat(this.jobTracker, status2, false, true, trackers[1], this.responseId);
        this.responseId = (short)(this.responseId + 1);
        return this.jobTracker.getClusterMetrics();
    }

    /**
     * Verifies that the memory figures are {@link #UNINITIALIZED_MEMORY_VALUE}
     * for {@code getClusterStatus()} and {@code getClusterStatus(false)}, and
     * are populated for {@code getClusterStatus(true)}.
     */
    @Test
    public void testClusterStatus() throws Exception {
        assertMemoryUninitialized(this.client.getClusterStatus());
        assertMemoryUninitialized(this.client.getClusterStatus(false));

        ClusterStatus detailed = this.client.getClusterStatus(true);
        long usedMemory = detailed.getUsedMemory();
        long maxMemory = detailed.getMaxMemory();
        Assert.assertTrue("JobTracker used-memory is " + usedMemory, UNINITIALIZED_MEMORY_VALUE != usedMemory);
        Assert.assertTrue("JobTracker max-memory is " + maxMemory, UNINITIALIZED_MEMORY_VALUE != maxMemory);
    }

    /** Asserts that both memory getters of {@code status} return {@link #UNINITIALIZED_MEMORY_VALUE}. */
    private static void assertMemoryUninitialized(ClusterStatus status) {
        long usedMemory = status.getUsedMemory();
        long maxMemory = status.getMaxMemory();
        Assert.assertEquals("JobTracker used-memory is " + usedMemory + ", expected " + UNINITIALIZED_MEMORY_VALUE, UNINITIALIZED_MEMORY_VALUE, usedMemory);
        Assert.assertEquals("JobTracker max-memory is " + maxMemory + ", expected " + UNINITIALIZED_MEMORY_VALUE, UNINITIALIZED_MEMORY_VALUE, maxMemory);
    }

    /** Minimal {@link JobInProgress} used only as the owner of reserved slots. */
    static class FakeJobInProgress
    extends JobInProgress {
        public FakeJobInProgress(JobID jId, JobConf jobConf, JobTracker jt) throws IOException {
            super(jId, jobConf, jt);
        }
    }

    /**
     * Scheduler that never hands out tasks. Depending on
     * {@link #setUnreserveSlots(boolean)} each {@code assignTasks} call either
     * reserves a growing number of slots for {@code fakeJob} on the calling
     * tracker or releases that job's reservations.
     */
    static class FakeTaskScheduler
    extends JobQueueTaskScheduler {
        /** Number of slots most recently reserved on each tracker. */
        private final Map<TaskTracker, Integer> reservedCounts = new HashMap<TaskTracker, Integer>();
        private boolean unreserveSlots;

        /** Publishes this instance through the enclosing test's static {@code scheduler} field. */
        public FakeTaskScheduler() {
            scheduler = this;
        }

        /** Selects whether subsequent {@code assignTasks} calls unreserve instead of reserve. */
        void setUnreserveSlots(boolean shouldUnreserve) {
            this.unreserveSlots = shouldUnreserve;
        }

        /**
         * Reserves or unreserves map and reduce slots on {@code tt} for
         * {@code fakeJob} and returns an empty task list.
         *
         * <p>NOTE(review): intended to replace the assignTasks hook inherited
         * from JobQueueTaskScheduler; the supertype signature is not visible in
         * this file, so no {@code @Override} is asserted here.
         *
         * @param tt tracker asking for work
         * @return always a new, empty list
         */
        public List<Task> assignTasks(TaskTracker tt) {
            if (this.unreserveSlots) {
                tt.unreserveSlots(TaskType.MAP, fakeJob);
                tt.unreserveSlots(TaskType.REDUCE, fakeJob);
            } else {
                // First call for a tracker reserves 1 slot, each later call one more.
                Integer previous = this.reservedCounts.get(tt);
                int currCount = previous == null ? 1 : previous + 1;
                this.reservedCounts.put(tt, currCount);
                tt.reserveSlots(TaskType.MAP, fakeJob, currCount);
                tt.reserveSlots(TaskType.REDUCE, fakeJob, currCount);
            }
            return new ArrayList<Task>();
        }
    }
}

