/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdfs;

import com.google.common.io.NullOutputStream;
import java.io.Closeable;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.PrintWriter;
import java.net.InetSocketAddress;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.util.ReflectionUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/**
 * Exercises the interaction between the DataNode's data-transfer threads
 * ("xceivers") and the DFS client's peer cache when connections are kept
 * alive after a read.
 *
 * <p>Each test runs against a single-DataNode {@link MiniDFSCluster} that is
 * started in {@link #setup()} and shut down in {@link #teardown()}.
 */
public class TestDataTransferKeepalive {
    Configuration conf = new HdfsConfiguration();
    private MiniDFSCluster cluster;
    private FileSystem fs;
    private InetSocketAddress dnAddr;
    private DataNode dn;
    private DFSClient dfsClient;
    private static final Path TEST_FILE = new Path("/test");
    /** Value (ms) configured for "dfs.datanode.socket.reuse.keepalive" in {@link #setup()}. */
    private static final int KEEPALIVE_TIMEOUT = 1000;
    /** Value (ms) configured for "dfs.datanode.socket.write.timeout" in {@link #testSlowReader()}. */
    private static final int WRITE_TIMEOUT = 3000;

    /**
     * Starts a one-DataNode mini cluster with a short keepalive, empties the
     * client's peer cache and records the DataNode and its transfer address.
     *
     * @throws Exception if the cluster cannot be started
     */
    @Before
    public void setup() throws Exception {
        this.conf.setInt("dfs.datanode.socket.reuse.keepalive", KEEPALIVE_TIMEOUT);
        this.conf.setInt("dfs.client.max.block.acquire.failures", 0);
        this.cluster = new MiniDFSCluster.Builder(this.conf).numDataNodes(1).build();
        this.fs = this.cluster.getFileSystem();
        this.dfsClient = ((DistributedFileSystem) this.fs).dfs;
        // Start every test from an empty cache so the size assertions are exact.
        this.dfsClient.peerCache.clear();
        String poolId = this.cluster.getNamesystem().getBlockPoolId();
        this.dn = this.cluster.getDataNodes().get(0);
        DatanodeRegistration dnReg = DataNodeTestUtils.getDNRegistrationForBP(this.dn, poolId);
        this.dnAddr = NetUtils.createSocketAddr(dnReg.getXferAddr());
    }

    /**
     * Shuts the mini cluster down. The null check matters because JUnit still
     * runs {@code @After} methods when {@code @Before} threw; without it a
     * failed cluster start would surface as a NullPointerException here.
     */
    @After
    public void teardown() {
        if (this.cluster != null) {
            this.cluster.shutdown();
            this.cluster = null;
        }
    }

    /**
     * Reads a small file, waits past the keepalive timeout and checks that
     * the DataNode's xceiver is gone while the client still holds the (now
     * dead) peer in its cache; reading from that peer must report EOF.
     *
     * @throws Exception on any unexpected cluster or I/O failure
     */
    @Test(timeout=30000L)
    public void testKeepaliveTimeouts() throws Exception {
        DFSTestUtil.createFile(this.fs, TEST_FILE, 1L, (short) 1, 0L);

        // Writing the file leaves nothing cached and no xceiver running.
        Assert.assertEquals(0L, this.dfsClient.peerCache.size());
        this.assertXceiverCount(0);

        // One read leaves one cached peer and one xceiver on the DataNode.
        DFSTestUtil.readFile(this.fs, TEST_FILE);
        Assert.assertEquals(1L, this.dfsClient.peerCache.size());
        this.assertXceiverCount(1);

        // Sleep for twice the keepalive so the DataNode side has timed out.
        Thread.sleep(KEEPALIVE_TIMEOUT * 2);
        this.assertXceiverCount(0);

        // The client-side cache entry is still present at this point.
        Assert.assertEquals(1L, this.dfsClient.peerCache.size());

        // Take the peer out of the cache; reading from it must hit EOF.
        Peer peer = this.dfsClient.peerCache.get(this.dn.getDatanodeId(), false);
        Assert.assertNotNull(peer);
        try {
            Assert.assertEquals(-1L, peer.getInputStream().read());
        } finally {
            // The peer is no longer owned by the cache, so release it here.
            peer.close();
        }
    }

    /**
     * Restarts the DataNode with a short write timeout and a long keepalive,
     * opens a large file, reads a single byte and then stops reading. The
     * DataNode's xceiver is expected to disappear once the write timeout has
     * passed, even though the client stream stays open.
     *
     * @throws Exception on any unexpected cluster or I/O failure
     */
    @Test(timeout=30000L)
    public void testSlowReader() throws Exception {
        final long fileLength = 8L * 1024 * 1024;

        // Restart the DataNode so the changed timeouts take effect.
        MiniDFSCluster.DataNodeProperties props = this.cluster.stopDataNode(0);
        props.conf.setInt("dfs.datanode.socket.write.timeout", WRITE_TIMEOUT);
        props.conf.setInt("dfs.datanode.socket.reuse.keepalive", 120000);
        Assert.assertTrue(this.cluster.restartDataNode(props, true));
        this.cluster.triggerHeartbeats();
        // The restart produced a new DataNode instance; refresh the reference.
        this.dn = this.cluster.getDataNodes().get(0);

        DFSTestUtil.createFile(this.fs, TEST_FILE, fileLength, (short) 1, 0L);
        FSDataInputStream stm = this.fs.open(TEST_FILE);
        try {
            stm.read();
            this.assertXceiverCount(1);

            // Poll until no xceiver is left: first wait just past the write
            // timeout, then re-check every 100 ms.
            // NOTE(review): the counter is advanced after the interval has
            // been lowered to 100, so the first (long) sleep is charged as
            // 100 ms against the 5000 ms budget. Kept as-is to preserve the
            // test's existing timing tolerance - confirm this is intended.
            long sleepTime = WRITE_TIMEOUT + 100;
            long totalSleepTime = 0L;
            while (this.getXceiverCountWithoutServer() > 0 && totalSleepTime < 5000L) {
                Thread.sleep(sleepTime);
                sleepTime = 100L;
                totalSleepTime += sleepTime;
            }
            this.assertXceiverCount(0);
        } finally {
            IOUtils.closeStream(stm);
        }
    }

    /**
     * Opens five streams on the same file, drains them all and closes them so
     * that five peers end up in the client cache. After the keepalive has
     * expired on the DataNode, a fresh read must still succeed.
     *
     * @throws Exception on any unexpected cluster or I/O failure
     */
    @Test(timeout=30000L)
    public void testManyClosedSocketsInCache() throws Exception {
        DFSTestUtil.createFile(this.fs, TEST_FILE, 1L, (short) 1, 0L);

        InputStream[] stms = new InputStream[5];
        try {
            // Open everything first so the streams are live at the same time.
            for (int i = 0; i < stms.length; ++i) {
                stms[i] = this.fs.open(TEST_FILE);
            }
            for (InputStream stm : stms) {
                IOUtils.copyBytes(stm, new NullOutputStream(), 1024);
            }
        } finally {
            IOUtils.cleanup(null, stms);
        }

        DFSClient client = ((DistributedFileSystem) this.fs).dfs;
        Assert.assertEquals(5L, client.peerCache.size());

        // Wait past the keepalive so the DataNode drops all of its xceivers.
        Thread.sleep(KEEPALIVE_TIMEOUT + 500L);
        this.assertXceiverCount(0);

        // The client has not noticed yet: all five peers are still cached.
        Assert.assertEquals(5L, client.peerCache.size());

        // A new read has to work despite the cached peers being dead.
        DFSTestUtil.readFile(this.fs, TEST_FILE);
    }

    /**
     * Fails the test, after printing a thread dump to stderr, when the number
     * of xceivers reported by {@link #getXceiverCountWithoutServer()} differs
     * from {@code expected}.
     *
     * @param expected the xceiver count the caller requires
     */
    private void assertXceiverCount(int expected) {
        int count = this.getXceiverCountWithoutServer();
        if (count != expected) {
            ReflectionUtils.printThreadInfo(new PrintWriter(System.err), "Thread dumps");
            Assert.fail("Expected " + expected + " xceivers, found " + count);
        }
    }

    /**
     * Returns the DataNode's reported xceiver count minus one.
     * Presumably the subtracted unit is the always-running server thread
     * (per the method name) - verify against {@code DataNode#getXceiverCount}.
     *
     * @return {@code dn.getXceiverCount() - 1}
     */
    private int getXceiverCountWithoutServer() {
        return this.dn.getXceiverCount() - 1;
    }
}

