/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.mapred;

import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.concurrent.CountDownLatch;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.examples.SleepJob;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.mapred.ClusterStatus;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobID;
import org.apache.hadoop.mapred.JobInProgress;
import org.apache.hadoop.mapred.JobPriority;
import org.apache.hadoop.mapred.JobStatus;
import org.apache.hadoop.mapred.JobTracker;
import org.apache.hadoop.mapred.JobTrackerInstrumentation;
import org.apache.hadoop.mapred.MiniMRCluster;
import org.apache.hadoop.mapred.QueueManager;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.UtilsForTests;
import org.apache.hadoop.mapreduce.Cluster;
import org.apache.hadoop.security.UserGroupInformation;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;

public class TestRecoveryManager {
    private static final Log LOG = LogFactory.getLog(TestRecoveryManager.class);
    private static final Path TEST_DIR = new Path("/tmp");
    private JobConf conf;
    private FileSystem fs;
    private MiniDFSCluster dfs;
    private MiniMRCluster mr;

    static void mkdir(FileSystem fs, String dir) throws IOException {
        Path p = new Path(dir);
        fs.mkdirs(p);
        fs.setPermission(p, new FsPermission(511));
    }

    @Before
    public void setUp() throws IOException {
        this.conf = new JobConf();
        this.dfs = new MiniDFSCluster((Configuration)this.conf, 1, true, null);
        this.fs = this.dfs.getFileSystem();
        this.conf.set("mapreduce.jobtracker.staging.root.dir", "/user");
        this.conf.set("mapred.system.dir", "/mapred");
        Path mapredSysDir = new Path(this.conf.get("mapred.system.dir"));
        this.fs.mkdirs(mapredSysDir);
        this.fs.setPermission(mapredSysDir, new FsPermission(448));
        this.fs.setOwner(mapredSysDir, UserGroupInformation.getCurrentUser().getUserName(), "mrgroup");
        TestRecoveryManager.mkdir(this.fs, "/user");
        TestRecoveryManager.mkdir(this.fs, "/mapred");
        TestRecoveryManager.mkdir(this.fs, "/tmp");
    }

    private void startCluster() throws IOException {
        this.startCluster(this.conf);
    }

    private void startCluster(JobConf conf) throws IOException {
        this.mr = new MiniMRCluster(1, this.dfs.getFileSystem().getUri().toString(), 1, null, null, conf);
    }

    @After
    public void tearDown() {
        ClusterStatus status = this.mr.getJobTrackerRunner().getJobTracker().getClusterStatus(false);
        if (status.getJobTrackerStatus() == Cluster.JobTrackerStatus.RUNNING) {
            this.mr.shutdown();
        }
        if (this.dfs != null) {
            this.dfs.shutdown();
        }
    }

    @Ignore
    @Test(timeout=120000L)
    public void testJobTrackerRestartsWithMissingJobFile() throws Exception {
        LOG.info((Object)"Testing jobtracker restart with faulty job");
        this.startCluster();
        String signalFile = new Path(TEST_DIR, "signal").toString();
        JobConf job1 = this.mr.createJobConf();
        UtilsForTests.configureWaitingJobConf(job1, new Path(TEST_DIR, "input"), new Path(TEST_DIR, "output1"), 2, 0, "test-recovery-manager", signalFile, signalFile);
        RunningJob rJob1 = new JobClient(job1).submitJob(job1);
        LOG.info((Object)("Submitted job " + rJob1.getID()));
        while (rJob1.mapProgress() < 0.5f) {
            LOG.info((Object)("Waiting for job " + rJob1.getID() + " to be 50% done"));
            UtilsForTests.waitFor(100L);
        }
        JobConf job2 = this.mr.createJobConf();
        UtilsForTests.configureWaitingJobConf(job2, new Path(TEST_DIR, "input"), new Path(TEST_DIR, "output2"), 2, 0, "test-recovery-manager", signalFile, signalFile);
        RunningJob rJob2 = new JobClient(job2).submitJob(job2);
        LOG.info((Object)("Submitted job " + rJob2.getID()));
        while (rJob2.mapProgress() < 0.5f) {
            LOG.info((Object)("Waiting for job " + rJob2.getID() + " to be 50% done"));
            UtilsForTests.waitFor(100L);
        }
        LOG.info((Object)"Stopping jobtracker");
        String sysDir = this.mr.getJobTrackerRunner().getJobTracker().getSystemDir();
        this.mr.stopJobTracker();
        Path jobFile = new Path(sysDir, rJob1.getID().toString() + "/" + "job-info");
        LOG.info((Object)("Deleting job token file : " + jobFile.toString()));
        Assert.assertTrue((boolean)this.fs.delete(jobFile, false));
        FSDataOutputStream out = this.fs.create(jobFile);
        out.write(1);
        out.close();
        this.mr.getJobTrackerConf().setBoolean("mapred.jobtracker.restart.recover", true);
        LOG.info((Object)"Starting jobtracker");
        this.mr.startJobTracker();
        JobTracker jobtracker = this.mr.getJobTrackerRunner().getJobTracker();
        ClusterStatus status = jobtracker.getClusterStatus(false);
        Assert.assertEquals((String)"JobTracker crashed!", (Object)Cluster.JobTrackerStatus.RUNNING, (Object)status.getJobTrackerStatus());
        JobInProgress jip = jobtracker.getJob(rJob2.getID());
        while (!jip.isComplete()) {
            LOG.info((Object)("Waiting for job " + rJob2.getID() + " to be successful"));
            this.fs.create(new Path(TEST_DIR, "signal"));
            UtilsForTests.waitFor(100L);
        }
        Assert.assertTrue((String)"Job should be successful", (boolean)rJob2.isSuccessful());
    }

    @Test(timeout=120000L)
    public void testJobResubmission() throws Exception {
        LOG.info((Object)"Testing Job Resubmission");
        this.startCluster();
        String signalFile = new Path(TEST_DIR, "signal").toString();
        this.mr.getJobTrackerConf().setBoolean("mapred.jobtracker.restart.recover", true);
        JobTracker jobtracker = this.mr.getJobTrackerRunner().getJobTracker();
        JobConf job1 = this.mr.createJobConf();
        UtilsForTests.configureWaitingJobConf(job1, new Path(TEST_DIR, "input"), new Path(TEST_DIR, "output3"), 2, 0, "test-resubmission", signalFile, signalFile);
        JobClient jc1 = new JobClient(job1);
        RunningJob rJob1 = jc1.submitJob(job1);
        LOG.info((Object)("Submitted first job " + rJob1.getID()));
        while (rJob1.mapProgress() < 0.5f) {
            LOG.info((Object)("Waiting for job " + rJob1.getID() + " to be 50% done"));
            UtilsForTests.waitFor(100L);
        }
        JobConf job2 = this.mr.createJobConf();
        String signalFile1 = new Path(TEST_DIR, "signal1").toString();
        UtilsForTests.configureWaitingJobConf(job2, new Path(TEST_DIR, "input"), new Path(TEST_DIR, "output4"), 20, 0, "test-recovery-manager", signalFile1, signalFile1);
        job2.setBoolean("mapred.job.restart.recover", false);
        RunningJob rJob2 = new JobClient(job2).submitJob(job2);
        LOG.info((Object)("Submitted job " + rJob2.getID()));
        JobInProgress jip2 = jobtracker.getJob(rJob2.getID());
        while (!jip2.inited()) {
            LOG.info((Object)("Waiting for job " + jip2.getJobID() + " to be inited"));
            UtilsForTests.waitFor(100L);
        }
        LOG.info((Object)"Stopping jobtracker");
        this.mr.stopJobTracker();
        LOG.info((Object)"Starting jobtracker");
        this.mr.startJobTracker();
        UtilsForTests.waitForJobTracker(jc1);
        jobtracker = this.mr.getJobTrackerRunner().getJobTracker();
        Assert.assertEquals((String)"Resubmission failed ", (long)1L, (long)jobtracker.getAllJobs().length);
        JobInProgress jip = jobtracker.getJob(rJob1.getID());
        this.fs.create(new Path(TEST_DIR, "signal"));
        while (!jip.isComplete()) {
            LOG.info((Object)("Waiting for job " + rJob1.getID() + " to be successful"));
            UtilsForTests.waitFor(100L);
        }
        Assert.assertTrue((String)"Task should be successful", (boolean)rJob1.isSuccessful());
    }

    @Test
    public void testJobResubmissionAsDifferentUser() throws Exception {
        LOG.info((Object)"Testing Job Resubmission as a different user to the jobtracker");
        this.startCluster();
        String signalFile = new Path(TEST_DIR, "signal").toString();
        this.mr.getJobTrackerConf().setBoolean("mapred.jobtracker.restart.recover", true);
        JobTracker jobtracker = this.mr.getJobTrackerRunner().getJobTracker();
        final JobConf job1 = this.mr.createJobConf();
        UtilsForTests.configureWaitingJobConf(job1, new Path(TEST_DIR, "input"), new Path(TEST_DIR, "output3"), 2, 0, "test-resubmission", signalFile, signalFile);
        UserGroupInformation ugi = UserGroupInformation.createUserForTesting((String)"bob", (String[])new String[]{"users"});
        job1.setUser(ugi.getUserName());
        JobClient jc = new JobClient(job1);
        RunningJob rJob1 = (RunningJob)ugi.doAs((PrivilegedExceptionAction)new PrivilegedExceptionAction<RunningJob>(){

            @Override
            public RunningJob run() throws IOException {
                JobClient jc = new JobClient(job1);
                return jc.submitJob(job1);
            }
        });
        LOG.info((Object)("Submitted first job " + rJob1.getID()));
        while (rJob1.mapProgress() < 0.5f) {
            LOG.info((Object)("Waiting for job " + rJob1.getID() + " to be 50% done"));
            UtilsForTests.waitFor(100L);
        }
        LOG.info((Object)"Stopping jobtracker");
        this.mr.stopJobTracker();
        LOG.info((Object)"Starting jobtracker");
        this.mr.startJobTracker();
        UtilsForTests.waitForJobTracker(jc);
        jobtracker = this.mr.getJobTrackerRunner().getJobTracker();
        Assert.assertEquals((String)"Resubmission failed ", (long)1L, (long)jobtracker.getAllJobs().length);
        JobInProgress jip = jobtracker.getJob(rJob1.getID());
        this.fs.create(new Path(TEST_DIR, "signal"));
        while (!jip.isComplete()) {
            LOG.info((Object)("Waiting for job " + rJob1.getID() + " to be successful"));
            UtilsForTests.waitFor(100L);
        }
        rJob1 = jc.getJob(rJob1.getID());
        Assert.assertTrue((String)"Task should be successful", (boolean)rJob1.isSuccessful());
    }

    @Test
    public void testJobTrackerRestartBeforeJobFinalization() throws Exception {
        LOG.info((Object)"Testing Job Resubmission");
        this.conf.setBoolean("mapred.jobtracker.restart.recover", true);
        this.conf.setClass("mapred.jobtracker.instrumentation", TestJobTrackerInstrumentation.class, JobTrackerInstrumentation.class);
        this.startCluster(this.conf);
        JobTracker jobtracker = this.mr.getJobTrackerRunner().getJobTracker();
        SleepJob job = new SleepJob();
        job.setConf((Configuration)this.mr.createJobConf());
        JobConf job1 = job.setupJobConf(1, 0, 1L, 1, 1L, 1);
        JobClient jc = new JobClient(job1);
        RunningJob rJob1 = jc.submitJob(job1);
        LOG.info((Object)("Submitted first job " + rJob1.getID()));
        TestJobTrackerInstrumentation.finalizeCall.await();
        LOG.info((Object)"Stopping jobtracker");
        this.mr.stopJobTracker();
        LOG.info((Object)"Starting jobtracker");
        this.mr.startJobTracker();
        UtilsForTests.waitForJobTracker(jc);
        jobtracker = this.mr.getJobTrackerRunner().getJobTracker();
        Assert.assertEquals((String)"Resubmission failed ", (long)1L, (long)jobtracker.getAllJobs().length);
        JobInProgress jip = jobtracker.getJob(rJob1.getID());
        while (!jip.isComplete()) {
            LOG.info((Object)("Waiting for job " + rJob1.getID() + " to be successful"));
            UtilsForTests.waitFor(100L);
        }
        Assert.assertTrue((String)"Task should be successful", (boolean)rJob1.isSuccessful());
    }

    @Test(timeout=120000L)
    public void testJobTrackerRestartWithBadJobs() throws Exception {
        LOG.info((Object)"Testing recovery-manager");
        this.startCluster();
        String signalFile = new Path(TEST_DIR, "signal").toString();
        this.mr.getJobTrackerConf().setBoolean("mapred.jobtracker.restart.recover", true);
        JobTracker jobtracker = this.mr.getJobTrackerRunner().getJobTracker();
        JobConf job1 = this.mr.createJobConf();
        job1.setJobPriority(JobPriority.HIGH);
        UtilsForTests.configureWaitingJobConf(job1, new Path(TEST_DIR, "input"), new Path(TEST_DIR, "output4"), 30, 0, "test-recovery-manager", signalFile, signalFile);
        JobClient jc = new JobClient(job1);
        RunningJob rJob1 = jc.submitJob(job1);
        LOG.info((Object)("Submitted first job " + rJob1.getID()));
        while (rJob1.mapProgress() < 0.5f) {
            LOG.info((Object)("Waiting for job " + rJob1.getID() + " to be 50% done"));
            UtilsForTests.waitFor(100L);
        }
        JobConf job2 = this.mr.createJobConf();
        String signalFile1 = new Path(TEST_DIR, "signal1").toString();
        UtilsForTests.configureWaitingJobConf(job2, new Path(TEST_DIR, "input"), new Path(TEST_DIR, "output5"), 20, 0, "test-recovery-manager", signalFile1, signalFile1);
        RunningJob rJob2 = new JobClient(job2).submitJob(job2);
        LOG.info((Object)("Submitted job " + rJob2.getID()));
        JobInProgress jip = jobtracker.getJob(rJob2.getID());
        while (!jip.inited()) {
            LOG.info((Object)("Waiting for job " + jip.getJobID() + " to be inited"));
            UtilsForTests.waitFor(100L);
        }
        final JobConf job3 = this.mr.createJobConf();
        UserGroupInformation ugi3 = UserGroupInformation.createUserForTesting((String)"abc", (String[])new String[]{"users"});
        UtilsForTests.configureWaitingJobConf(job3, new Path(TEST_DIR, "input"), new Path(TEST_DIR, "output6"), 1, 0, "test-recovery-manager", signalFile, signalFile);
        RunningJob rJob3 = (RunningJob)ugi3.doAs((PrivilegedExceptionAction)new PrivilegedExceptionAction<RunningJob>(){

            @Override
            public RunningJob run() throws IOException {
                return new JobClient(job3).submitJob(job3);
            }
        });
        LOG.info((Object)("Submitted job " + rJob3.getID() + " with different user"));
        jip = jobtracker.getJob(rJob3.getID());
        while (!jip.inited()) {
            LOG.info((Object)("Waiting for job " + jip.getJobID() + " to be inited"));
            UtilsForTests.waitFor(100L);
        }
        LOG.info((Object)"Stopping jobtracker");
        this.mr.stopJobTracker();
        this.mr.getJobTrackerConf().setBoolean("mapred.jobtracker.restart.recover", true);
        this.mr.getJobTrackerConf().setInt("mapred.jobtracker.maxtasks.per.job", 25);
        this.mr.getJobTrackerConf().setBoolean("mapred.acls.enabled", true);
        UserGroupInformation ugi = UserGroupInformation.getLoginUser();
        this.mr.getJobTrackerConf().set(QueueManager.toFullPropertyName((String)"default", (String)QueueManager.QueueACL.SUBMIT_JOB.getAclName()), ugi.getUserName());
        LOG.info((Object)"Starting jobtracker");
        this.mr.startJobTracker();
        UtilsForTests.waitForJobTracker(jc);
        jobtracker = this.mr.getJobTrackerRunner().getJobTracker();
        Assert.assertEquals((String)"Recovery manager failed to tolerate job failures", (long)1L, (long)jobtracker.getAllJobs().length);
        JobStatus status = jobtracker.getJobStatus(rJob1.getID());
        Assert.assertNull((String)"Faulty job should not be resubmitted", (Object)status);
        jip = jobtracker.getJob(rJob2.getID());
        Assert.assertFalse((String)"Job should be running", (boolean)jip.isComplete());
        status = jobtracker.getJobStatus(rJob3.getID());
        Assert.assertNull((String)"Job should be missing because of ACL changed", (Object)status);
        while (!jip.isComplete()) {
            LOG.info((Object)("Waiting for job " + rJob2.getID() + " to be successful"));
            this.fs.create(new Path(TEST_DIR, "signal1"));
            UtilsForTests.waitFor(100L);
        }
        Assert.assertTrue((String)"Job should be successful", (boolean)rJob2.isSuccessful());
    }

    @Test(timeout=120000L)
    public void testRestartCount() throws Exception {
        LOG.info((Object)"Testing Job Restart Count");
        this.startCluster();
        String signalFile = new Path(TEST_DIR, "signal").toString();
        this.mr.getJobTrackerConf().setBoolean("mapred.jobtracker.restart.recover", true);
        JobTracker jobtracker = this.mr.getJobTrackerRunner().getJobTracker();
        JobConf job1 = this.mr.createJobConf();
        job1.setJobPriority(JobPriority.HIGH);
        UtilsForTests.configureWaitingJobConf(job1, new Path(TEST_DIR, "input"), new Path(TEST_DIR, "output7"), 30, 0, "test-restart", signalFile, signalFile);
        JobClient jc = new JobClient(job1);
        RunningJob rJob1 = jc.submitJob(job1);
        LOG.info((Object)("Submitted first job " + rJob1.getID()));
        JobInProgress jip = jobtracker.getJob(rJob1.getID());
        while (!jip.inited()) {
            LOG.info((Object)("Waiting for job " + jip.getJobID() + " to be inited"));
            UtilsForTests.waitFor(100L);
        }
        for (int i = 1; i <= 2; ++i) {
            LOG.info((Object)("Stopping jobtracker for " + i + " time"));
            this.mr.stopJobTracker();
            LOG.info((Object)("Starting jobtracker for " + i + " time"));
            this.mr.startJobTracker();
            UtilsForTests.waitForJobTracker(jc);
            jobtracker = this.mr.getJobTrackerRunner().getJobTracker();
            Assert.assertEquals((String)"Recovery manager failed to recover restart count", (long)0L, (long)jip.getNumRestarts());
        }
        rJob1.killJob();
        JobConf job2 = this.mr.createJobConf();
        UtilsForTests.configureWaitingJobConf(job2, new Path(TEST_DIR, "input"), new Path(TEST_DIR, "output8"), 50, 0, "test-restart-manager", signalFile, signalFile);
        RunningJob rJob2 = jc.submitJob(job2);
        LOG.info((Object)("Submitted first job after restart" + rJob2.getID()));
        jip = jobtracker.getJob(rJob2.getID());
        Assert.assertEquals((String)"Restart count for new job is incorrect", (long)0L, (long)jip.getNumRestarts());
        LOG.info((Object)"Stopping jobtracker for testing the fs errors");
        this.mr.stopJobTracker();
        Path rFile = jobtracker.recoveryManager.getRestartCountFile();
        this.fs.delete(rFile, false);
        FSDataOutputStream out = this.fs.create(rFile);
        out.writeBoolean(true);
        out.close();
        LOG.info((Object)"Starting jobtracker with fs errors");
        this.mr.startJobTracker();
        MiniMRCluster.JobTrackerRunner runner = this.mr.getJobTrackerRunner();
        Assert.assertFalse((String)"JobTracker is still alive", (boolean)runner.isActive());
    }

    @Test(timeout=120000L)
    public void testJobTrackerInfoCreation() throws Exception {
        LOG.info((Object)"Testing jobtracker.info file");
        this.startCluster();
        String namenode = this.dfs.getFileSystem().getUri().getHost() + ":" + this.dfs.getFileSystem().getUri().getPort();
        this.dfs.shutdownDataNodes();
        JobConf conf = new JobConf();
        FileSystem.setDefaultUri((Configuration)conf, (String)namenode);
        conf.set("mapred.job.tracker", "localhost:0");
        conf.set("mapred.job.tracker.http.address", "127.0.0.1:0");
        JobTracker jobtracker = new JobTracker(conf);
        boolean failed = false;
        try {
            jobtracker.recoveryManager.updateRestartCount();
        }
        catch (IOException ioe) {
            failed = true;
        }
        Assert.assertTrue((String)"JobTracker created info files without datanodes!!!", (boolean)failed);
        Path restartFile = jobtracker.recoveryManager.getRestartCountFile();
        Path tmpRestartFile = jobtracker.recoveryManager.getTempRestartCountFile();
        DistributedFileSystem fs = this.dfs.getFileSystem();
        Assert.assertFalse((String)"Info file exists after update failure", (boolean)fs.exists(restartFile));
        Assert.assertFalse((String)"Temporary restart-file exists after update failure", (boolean)fs.exists(restartFile));
        this.dfs.startDataNodes((Configuration)conf, 1, true, null, null, null, null);
        this.dfs.waitActive();
        failed = false;
        try {
            jobtracker.recoveryManager.updateRestartCount();
        }
        catch (IOException ioe) {
            failed = true;
        }
        Assert.assertFalse((String)"JobTracker failed to create info files with datanodes!", (boolean)failed);
    }

    public static class TestJobTrackerInstrumentation
    extends JobTrackerInstrumentation {
        static CountDownLatch finalizeCall = new CountDownLatch(1);

        public TestJobTrackerInstrumentation(JobTracker jt, JobConf conf) {
            super(jt, conf);
        }

        public void finalizeJob(JobConf conf, JobID id) {
            if (finalizeCall.getCount() == 0L) {
                return;
            }
            finalizeCall.countDown();
            throw new IllegalStateException("Controlled error finalizing job");
        }
    }
}

