/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.yarn.server.resourcemanager.recovery;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ha.ClientBaseWithFixes;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.ZKUtil;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStoreTestBase;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.junit.Assert;
import org.junit.Test;

public class TestZKRMStateStoreZKClientConnections
extends ClientBaseWithFixes {
    /**
     * Wait budget of 3000 (presumably milliseconds -- confirm against the
     * watcher API). The watcher waits in the tests below pass the same number
     * as a literal.
     */
    private static final int ZK_OP_WAIT_TIME = 3000;

    /**
     * Logger shared by every test in this class and by the nested harness.
     * Declared static and final so it is created once and cannot be reassigned.
     */
    private static final Log LOG = LogFactory.getLog(TestZKRMStateStoreZKClientConnections.class);

    /**
     * Checks that a read issued while the ZooKeeper server is stopped finishes
     * without an exception once the server has been started again.
     *
     * <p>The read runs on a separate thread. Any exception it hits is recorded
     * in a flag, and the main thread asserts on that flag after joining the
     * reader.
     *
     * <p>NOTE(review): the reader thread was previously never started, so this
     * test could not fail. Now that it runs, confirm that the store's retry
     * policy can ride out the outage below: the server stays down for 2000 ms
     * while {@code zk-timeout-ms} is 1000, and the sibling disconnect test
     * expects a "Wait for ZKClient creation timed out" failure once the
     * timeout elapses with the server down.
     *
     * @throws Exception if store setup, server control or the join fails
     */
    @Test(timeout=20000L)
    public void testZKClientRetry() throws Exception {
        TestZKClient zkClientTester = new TestZKClient();
        final String path = "/test";
        YarnConfiguration conf = new YarnConfiguration();
        conf.setInt("yarn.resourcemanager.zk-timeout-ms", 1000);
        conf.setLong("yarn.resourcemanager.zk-retry-interval-ms", 100L);
        final ZKRMStateStore store = (ZKRMStateStore)zkClientTester.getRMStateStore((Configuration)conf);
        RMStateStoreTestBase.TestDispatcher dispatcher = new RMStateStoreTestBase.TestDispatcher();
        store.setRMDispatcher((Dispatcher)dispatcher);
        // Create the znode while the server is still up, as the sibling tests
        // do, so the read below has something to fetch.
        store.createWithRetries(path, null, (List)ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        final AtomicBoolean assertionFailedInThread = new AtomicBoolean(false);
        this.stopServer();
        Thread clientThread = new Thread(){

            @Override
            public void run() {
                try {
                    store.getDataWithRetries(path, true);
                }
                catch (Exception e) {
                    TestZKRMStateStoreZKClientConnections.this.LOG.error((Object)"Read during ZooKeeper server restart failed", (Throwable)e);
                    assertionFailedInThread.set(true);
                }
            }
        };
        // Without start() the join below returns immediately and the flag can
        // never be set, which made the final assertion meaningless.
        clientThread.start();
        Thread.sleep(2000L);
        this.startServer();
        clientThread.join();
        Assert.assertFalse((boolean)assertionFailedInThread.get());
    }

    /**
     * Drives the store through a server outage and recovery.
     *
     * <p>A znode is created and written while the server is up. The server is
     * then stopped, and a read is expected to fail with a message containing
     * "Wait for ZKClient creation timed out". After the server is restarted and
     * the watcher reports a connection, a read must return the bytes written
     * before the outage.
     *
     * @throws Exception if setup, server control or the watcher waits fail
     */
    @Test(timeout=20000L)
    public void testZKClientDisconnectAndReconnect() throws Exception {
        final String znode = "/test";
        final String payload = "newBytes";
        TestZKClient client = new TestZKClient();

        YarnConfiguration yarnConf = new YarnConfiguration();
        yarnConf.setInt("yarn.resourcemanager.zk-timeout-ms", 100);
        ZKRMStateStore stateStore = (ZKRMStateStore)client.getRMStateStore(yarnConf);
        stateStore.setRMDispatcher(new RMStateStoreTestBase.TestDispatcher());

        // Seed the znode while the server is still reachable.
        stateStore.createWithRetries(znode, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        stateStore.getDataWithRetries(znode, true);
        stateStore.setDataWithRetries(znode, payload.getBytes(), 0);

        // Take the server away and wait for the watcher to see the disconnect.
        this.stopServer();
        client.watcher.waitForDisconnected(3000L);
        try {
            stateStore.getDataWithRetries(znode, true);
            Assert.fail("Expected ZKClient time out exception");
        }
        catch (Exception timedOut) {
            String message = timedOut.getMessage();
            Assert.assertTrue(message.contains("Wait for ZKClient creation timed out"));
        }

        // Bring the server back; the store must serve the earlier write again.
        this.startServer();
        client.watcher.waitForConnected(3000L);
        byte[] restored = null;
        try {
            restored = stateStore.getDataWithRetries(znode, true);
        }
        catch (Exception restoreFailure) {
            String error = "ZKRMStateStore Session restore failed";
            this.LOG.error(error, restoreFailure);
            Assert.fail(error);
        }
        Assert.assertEquals(payload, new String(restored));
    }

    /**
     * Checks that the store still serves data after a session-expired event.
     *
     * <p>Setting {@code forExpire} makes the harness's store replace the next
     * watch event it processes with an {@code Expired} event and then wait on
     * the shared barrier. This test waits on the same barrier, reads the znode
     * and expects the bytes it wrote earlier.
     *
     * @throws Exception if setup or the barrier wait fails
     */
    @Test(timeout=20000L)
    public void testZKSessionTimeout() throws Exception {
        final String znode = "/test";
        TestZKClient client = new TestZKClient();

        YarnConfiguration yarnConf = new YarnConfiguration();
        yarnConf.setInt("yarn.resourcemanager.zk-timeout-ms", 100);
        ZKRMStateStore stateStore = (ZKRMStateStore)client.getRMStateStore(yarnConf);
        stateStore.setRMDispatcher(new RMStateStoreTestBase.TestDispatcher());

        // Arm the harness before any operation that can produce a watch event.
        client.forExpire = true;
        stateStore.createWithRetries(znode, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        stateStore.getDataWithRetries(znode, true);
        stateStore.setDataWithRetries(znode, "bytes".getBytes(), 0);

        // Rendezvous with the watcher side once it has processed the expiry.
        client.syncBarrier.await();
        try {
            String stored = new String(stateStore.getDataWithRetries(znode, false));
            Assert.assertEquals("bytes", stored);
        }
        catch (Exception sessionFailure) {
            String error = "New session creation failed";
            this.LOG.error(error, sessionFailure);
            Assert.fail(error);
        }
    }

    /**
     * Checks that the store's working znode cannot be deleted when the store
     * is configured with the ACL {@code world:anyone:rwca}.
     *
     * <p>The store is built from the configuration before the delete is
     * attempted. Previously no store was created, so the harness's
     * {@code store} field was still null and the resulting
     * NullPointerException was swallowed as if it were the expected rejection.
     *
     * <p>NOTE(review): ZooKeeper documents the DELETE permission as applying
     * to a node's children, so the rejection observed here may have a cause
     * other than the configured ACL (for example the znode having children).
     * Confirm which exception is actually thrown.
     *
     * @throws Exception if the store cannot be created from the configuration
     */
    @Test(timeout=20000L)
    public void testSetZKAcl() throws Exception {
        TestZKClient zkClientTester = new TestZKClient();
        YarnConfiguration conf = new YarnConfiguration();
        conf.set("yarn.resourcemanager.zk-acl", "world:anyone:rwca");
        // Build the store outside the try block so that a setup failure fails
        // the test instead of being mistaken for the expected rejection.
        zkClientTester.getRMStateStore(conf);
        try {
            zkClientTester.store.zkClient.delete(zkClientTester.store.znodeWorkingPath, -1);
            Assert.fail("Shouldn't be able to delete path");
        }
        catch (RuntimeException unexpected) {
            // Programming errors such as a NullPointerException must not count
            // as the delete being rejected.
            throw unexpected;
        }
        catch (Exception expected) {
            // Expected: the delete was refused with a checked exception.
        }
    }

    /**
     * Checks that building a store from a malformed ACL string fails with
     * {@link ZKUtil.BadAclFormatException}.
     *
     * <p>Any other exception, or no exception at all, fails the test.
     */
    @Test(timeout=20000L)
    public void testInvalidZKAclConfiguration() {
        TestZKClient client = new TestZKClient();
        YarnConfiguration yarnConf = new YarnConfiguration();
        yarnConf.set("yarn.resourcemanager.zk-acl", "randomstring&*");
        try {
            client.getRMStateStore(yarnConf);
        }
        catch (ZKUtil.BadAclFormatException expected) {
            // The malformed ACL was rejected with the right exception type.
            return;
        }
        catch (Exception wrongType) {
            String error = "Incorrect exception on BadAclFormat";
            this.LOG.error(error, wrongType);
            Assert.fail(error);
            return;
        }
        // Reaching this point means the store accepted the bad ACL.
        Assert.fail("ZKRMStateStore created with bad ACL");
    }

    /**
     * Test harness that builds a {@link ZKRMStateStore} wired to a watcher
     * owned by the test, and that can replace one watch event with a
     * session-expired event on request.
     */
    class TestZKClient {
        /*
         * store and forExpire are written by the test method and read from the
         * watcher callback. The two-party barrier awaited both by the test
         * method and by processWatchEvent shows those run on different
         * threads, so both fields are volatile to make the writes visible to
         * the callback.
         */
        volatile ZKRMStateStore store;
        /** When true, the next processed watch event is replaced by an Expired event. */
        volatile boolean forExpire = false;
        TestForwardingWatcher watcher;
        /** Rendezvous point between the test method and the watcher side. */
        CyclicBarrier syncBarrier = new CyclicBarrier(2);

        TestZKClient() {
        }

        /**
         * Points the configuration at the test server and the {@code /Test}
         * parent path, then creates the watcher and the store.
         *
         * @param conf configuration to complete and hand to the store; it is
         *             modified in place
         * @return the newly created store, also kept in {@link #store}
         * @throws Exception if the store cannot be initialised or started
         */
        public RMStateStore getRMStateStore(Configuration conf) throws Exception {
            String workingZnode = "/Test";
            conf.set("yarn.resourcemanager.zk-address", TestZKRMStateStoreZKClientConnections.this.hostPort);
            conf.set("yarn.resourcemanager.zk-state-store.parent-path", workingZnode);
            // The watcher must exist before the store is constructed, because
            // the store hands it to createClient in getNewZooKeeper.
            this.watcher = new TestForwardingWatcher();
            this.store = new TestZKRMStateStore(conf, workingZnode);
            return this.store;
        }

        /**
         * Watcher that first lets the base class record the event and then
         * forwards it to the store, if one has been assigned yet.
         */
        private class TestForwardingWatcher
        extends ClientBaseWithFixes.CountdownWatcher {
            private TestForwardingWatcher() {
            }

            /**
             * Records the event in the base class, then forwards it to the
             * store. Failures while forwarding are logged and not rethrown.
             *
             * @param event the event delivered to this watcher
             */
            public void process(WatchedEvent event) {
                super.process(event);
                try {
                    // Events that arrive before the store field is assigned
                    // are not forwarded.
                    if (TestZKClient.this.store != null) {
                        TestZKClient.this.store.processWatchEvent(event);
                    }
                }
                catch (Throwable t) {
                    TestZKRMStateStoreZKClientConnections.this.LOG.error((Object)("Failed to process watcher event " + event + ": " + StringUtils.stringifyException((Throwable)t)));
                }
            }
        }

        /**
         * Store subclass that creates its ZooKeeper client with the harness
         * watcher and can simulate a session expiry.
         */
        protected class TestZKRMStateStore
        extends ZKRMStateStore {
            /**
             * Initialises and starts the store, then asserts that its working
             * path equals the requested one.
             *
             * @param conf         configuration passed to {@code init}
             * @param workingZnode expected value of {@code znodeWorkingPath}
             * @throws Exception if initialisation or start-up fails
             */
            public TestZKRMStateStore(Configuration conf, String workingZnode) throws Exception {
                this.init(conf);
                this.start();
                Assert.assertTrue((boolean)this.znodeWorkingPath.equals(workingZnode));
            }

            /**
             * Creates a client for the test server using the harness watcher
             * and a timeout argument of 100.
             * NOTE(review): presumably overrides the store's client factory;
             * the superclass declaration is not visible here.
             */
            public ZooKeeper getNewZooKeeper() throws IOException, InterruptedException {
                return TestZKRMStateStoreZKClientConnections.this.createClient(TestZKClient.this.watcher, TestZKRMStateStoreZKClientConnections.this.hostPort, 100);
            }

            /**
             * Forwards the event to the superclass. If {@code forExpire} is
             * set, an Expired event is forwarded instead, the flag is cleared
             * and this call waits on the shared barrier.
             *
             * @param event the event received from the watcher
             * @throws Exception if the superclass handler or the barrier wait fails
             */
            public synchronized void processWatchEvent(WatchedEvent event) throws Exception {
                if (TestZKClient.this.forExpire) {
                    WatchedEvent expiredEvent = new WatchedEvent(Watcher.Event.EventType.None, Watcher.Event.KeeperState.Expired, null);
                    super.processWatchEvent(expiredEvent);
                    // Clear the flag before the rendezvous so only one event
                    // is replaced.
                    TestZKClient.this.forExpire = false;
                    TestZKClient.this.syncBarrier.await();
                } else {
                    super.processWatchEvent(event);
                }
            }
        }
    }
}

