/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.mapred;

import com.google.common.base.Supplier;
import java.io.File;
import java.util.concurrent.TimeoutException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.ha.ClientBaseWithFixes;
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.ha.HealthMonitor;
import org.apache.hadoop.ha.TestNodeFencer;
import org.apache.hadoop.ha.ZKFCTestUtil;
import org.apache.hadoop.ha.ZKFailoverController;
import org.apache.hadoop.mapred.HAUtil;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobTrackerHADaemon;
import org.apache.hadoop.mapred.MiniMRHACluster;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.UtilsForTests;
import org.apache.hadoop.mapred.tools.MRHAAdmin;
import org.apache.hadoop.mapred.tools.MRZKFailoverController;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.MultithreadedTestUtil;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class TestMRZKFailoverController
extends ClientBaseWithFixes {
    private static final Log LOG = LogFactory.getLog(TestMRZKFailoverController.class);
    private static final Path TEST_DIR = new Path("/tmp/tst");
    private Configuration conf;
    private MiniMRHACluster cluster;
    private MultithreadedTestUtil.TestContext ctx;
    private ZKFCThread thr1;
    private ZKFCThread thr2;

    @Before
    public void setup() throws Exception {
        String logicalName = "logicaljt";
        this.conf = new Configuration();
        this.conf.set(HAUtil.addKeySuffixes((String)"ha.zookeeper.quorum", (String[])new String[]{logicalName}), this.hostPort);
        this.conf.set("mapred.ha.fencing.methods", TestNodeFencer.AlwaysSucceedFencer.class.getName());
        this.conf.setBoolean("mapred.ha.automatic-failover.enabled", true);
        this.conf.setInt("ipc.client.connection.maxidletime", 0);
        this.conf.setInt(HAUtil.addKeySuffixes((String)"mapred.ha.zkfc.port", (String[])new String[]{logicalName, "jt1"}), 10003);
        this.conf.setInt(HAUtil.addKeySuffixes((String)"mapred.ha.zkfc.port", (String[])new String[]{logicalName, "jt2"}), 10004);
        this.cluster = new MiniMRHACluster(this.conf);
        this.ctx = new MultithreadedTestUtil.TestContext();
        this.thr1 = new ZKFCThread(this.ctx, 0);
        this.ctx.addThread((MultithreadedTestUtil.TestingThread)this.thr1);
        Assert.assertEquals((long)0L, (long)this.thr1.zkfc.run(new String[]{"-formatZK"}));
        this.thr1.start();
        this.waitForHAState(0, HAServiceProtocol.HAServiceState.ACTIVE);
        this.thr2 = new ZKFCThread(this.ctx, 1);
        this.ctx.addThread((MultithreadedTestUtil.TestingThread)this.thr2);
        this.thr2.start();
        this.cluster.startTaskTracker(0, 1);
        this.cluster.waitActive();
        ZKFCTestUtil.waitForHealthState((ZKFailoverController)this.thr1.zkfc, (HealthMonitor.State)HealthMonitor.State.SERVICE_HEALTHY, (MultithreadedTestUtil.TestContext)this.ctx);
        ZKFCTestUtil.waitForHealthState((ZKFailoverController)this.thr2.zkfc, (HealthMonitor.State)HealthMonitor.State.SERVICE_HEALTHY, (MultithreadedTestUtil.TestContext)this.ctx);
    }

    @After
    public void shutdown() throws Exception {
        this.cluster.shutdown();
        if (this.thr1 != null) {
            this.thr1.interrupt();
        }
        if (this.thr2 != null) {
            this.thr2.interrupt();
        }
        if (this.ctx != null) {
            this.ctx.stop();
        }
    }

    @Test(timeout=60000L)
    public void testFailoverWhileRunningJob() throws Exception {
        LOG.info((Object)"Running job failover test");
        FileUtil.fullyDelete((File)new File("/tmp/tst"));
        JobConf job1 = new JobConf(this.conf);
        String signalFile = new Path(TEST_DIR, "signal").toString();
        UtilsForTests.configureWaitingJobConf(job1, new Path(TEST_DIR, "input"), new Path(TEST_DIR, "output3"), 2, 0, "test-resubmission", signalFile, signalFile);
        JobClient jc = new JobClient(job1);
        RunningJob rJob1 = jc.submitJob(job1);
        while (rJob1.mapProgress() < 0.5f) {
            LOG.info((Object)("Waiting for job " + rJob1.getID() + " to be 50% done: " + rJob1.mapProgress()));
            UtilsForTests.waitFor(500L);
        }
        LOG.info((Object)("Waiting for job " + rJob1.getID() + " to be 50% done: " + rJob1.mapProgress()));
        LOG.info((Object)"Shutting down jt1");
        this.cluster.shutdownJobTracker(0);
        LocalFileSystem fs = FileSystem.getLocal((Configuration)this.conf);
        fs.create(new Path(TEST_DIR, "signal"));
        while (!rJob1.isComplete()) {
            LOG.info((Object)("Waiting for job " + rJob1.getID() + " to be successful: " + rJob1.mapProgress()));
            UtilsForTests.waitFor(500L);
        }
        Assert.assertTrue((String)"Job should be successful", (boolean)rJob1.isSuccessful());
    }

    @Test(timeout=60000L)
    public void testManualFailover() throws Exception {
        LOG.info((Object)"Running manual failover test");
        this.thr2.zkfc.getLocalTarget().getZKFCProxy(this.conf, 15000).gracefulFailover();
        this.waitForHAState(0, HAServiceProtocol.HAServiceState.STANDBY);
        this.waitForHAState(1, HAServiceProtocol.HAServiceState.ACTIVE);
        this.thr1.zkfc.getLocalTarget().getZKFCProxy(this.conf, 15000).gracefulFailover();
        this.waitForHAState(0, HAServiceProtocol.HAServiceState.ACTIVE);
        this.waitForHAState(1, HAServiceProtocol.HAServiceState.STANDBY);
    }

    @Test(timeout=60000L)
    public void testManualFailoverWithMRHAAdmin() throws Exception {
        LOG.info((Object)"Running manual failover test with MRHAAdmin");
        MRHAAdmin tool = new MRHAAdmin();
        tool.setConf(this.conf);
        Assert.assertEquals((long)0L, (long)tool.run(new String[]{"-failover", "jt1", "jt2"}));
        this.waitForHAState(0, HAServiceProtocol.HAServiceState.STANDBY);
        this.waitForHAState(1, HAServiceProtocol.HAServiceState.ACTIVE);
        Assert.assertEquals((long)0L, (long)tool.run(new String[]{"-failover", "jt2", "jt1"}));
        this.waitForHAState(0, HAServiceProtocol.HAServiceState.ACTIVE);
        this.waitForHAState(1, HAServiceProtocol.HAServiceState.STANDBY);
    }

    private void waitForHAState(int jtidx, final HAServiceProtocol.HAServiceState state) throws TimeoutException, InterruptedException {
        final JobTrackerHADaemon jtHaDaemon = this.cluster.getJobTrackerHaDaemon(jtidx);
        GenericTestUtils.waitFor((Supplier)new Supplier<Boolean>(){

            public Boolean get() {
                try {
                    return jtHaDaemon.getServiceStatus().getState() == state;
                }
                catch (Exception e) {
                    e.printStackTrace();
                    return false;
                }
            }
        }, (int)50, (int)5000);
    }

    private class ZKFCThread
    extends MultithreadedTestUtil.TestingThread {
        private final MRZKFailoverController zkfc;

        public ZKFCThread(MultithreadedTestUtil.TestContext ctx, int idx) {
            super(ctx);
            this.zkfc = MRZKFailoverController.create((Configuration)TestMRZKFailoverController.this.cluster.getJobTrackerHaDaemon(idx).getConf());
        }

        public void doWork() throws Exception {
            try {
                Assert.assertEquals((long)0L, (long)this.zkfc.run(new String[0]));
            }
            catch (InterruptedException interruptedException) {
                // empty catch block
            }
        }
    }
}

