/*
 * Decompiled with CFR 0.152.
 */
package com.facebook.presto.hive.$internal.org.apache.hadoop.hdfs.server.datanode;

import com.facebook.presto.hive.;
import com.facebook.presto.hive.$internal.org.apache.commons.logging.Log;
import com.facebook.presto.hive.$internal.org.apache.commons.logging.LogFactory;
import com.facebook.presto.hive.$internal.org.apache.hadoop.conf.Configuration;
import com.facebook.presto.hive.$internal.org.apache.hadoop.conf.Configured;
import com.facebook.presto.hive.$internal.org.apache.hadoop.hdfs.HDFSPolicyProvider;
import com.facebook.presto.hive.$internal.org.apache.hadoop.hdfs.protocol.Block;
import com.facebook.presto.hive.$internal.org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import com.facebook.presto.hive.$internal.org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
import com.facebook.presto.hive.$internal.org.apache.hadoop.hdfs.protocol.DatanodeID;
import com.facebook.presto.hive.$internal.org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import com.facebook.presto.hive.$internal.org.apache.hadoop.hdfs.protocol.FSConstants;
import com.facebook.presto.hive.$internal.org.apache.hadoop.hdfs.protocol.LocatedBlock;
import com.facebook.presto.hive.$internal.org.apache.hadoop.hdfs.protocol.UnregisteredDatanodeException;
import com.facebook.presto.hive.$internal.org.apache.hadoop.hdfs.server.common.HdfsConstants;
import com.facebook.presto.hive.$internal.org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
import com.facebook.presto.hive.$internal.org.apache.hadoop.hdfs.server.common.Storage;
import com.facebook.presto.hive.$internal.org.apache.hadoop.hdfs.server.datanode.BlockSender;
import com.facebook.presto.hive.$internal.org.apache.hadoop.hdfs.server.datanode.DataBlockScanner;
import com.facebook.presto.hive.$internal.org.apache.hadoop.hdfs.server.datanode.DataStorage;
import com.facebook.presto.hive.$internal.org.apache.hadoop.hdfs.server.datanode.DataXceiverServer;
import com.facebook.presto.hive.$internal.org.apache.hadoop.hdfs.server.datanode.FSDataset;
import com.facebook.presto.hive.$internal.org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface;
import com.facebook.presto.hive.$internal.org.apache.hadoop.hdfs.server.datanode.UpgradeManagerDatanode;
import com.facebook.presto.hive.$internal.org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
import com.facebook.presto.hive.$internal.org.apache.hadoop.hdfs.server.namenode.NameNode;
import com.facebook.presto.hive.$internal.org.apache.hadoop.hdfs.server.protocol.BlockCommand;
import com.facebook.presto.hive.$internal.org.apache.hadoop.hdfs.server.protocol.BlockMetaDataInfo;
import com.facebook.presto.hive.$internal.org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import com.facebook.presto.hive.$internal.org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import com.facebook.presto.hive.$internal.org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import com.facebook.presto.hive.$internal.org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
import com.facebook.presto.hive.$internal.org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import com.facebook.presto.hive.$internal.org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
import com.facebook.presto.hive.$internal.org.apache.hadoop.http.HttpServer;
import com.facebook.presto.hive.$internal.org.apache.hadoop.io.IOUtils;
import com.facebook.presto.hive.$internal.org.apache.hadoop.io.Text;
import com.facebook.presto.hive.$internal.org.apache.hadoop.ipc.RPC;
import com.facebook.presto.hive.$internal.org.apache.hadoop.ipc.RemoteException;
import com.facebook.presto.hive.$internal.org.apache.hadoop.ipc.Server;
import com.facebook.presto.hive.$internal.org.apache.hadoop.net.DNS;
import com.facebook.presto.hive.$internal.org.apache.hadoop.net.NetUtils;
import com.facebook.presto.hive.$internal.org.apache.hadoop.security.SecurityUtil;
import com.facebook.presto.hive.$internal.org.apache.hadoop.security.authorize.ConfiguredPolicy;
import com.facebook.presto.hive.$internal.org.apache.hadoop.security.authorize.PolicyProvider;
import com.facebook.presto.hive.$internal.org.apache.hadoop.util.Daemon;
import com.facebook.presto.hive.$internal.org.apache.hadoop.util.DiskChecker;
import com.facebook.presto.hive.$internal.org.apache.hadoop.util.ReflectionUtils;
import com.facebook.presto.hive.$internal.org.apache.hadoop.util.StringUtils;
import java.io.BufferedOutputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.net.UnknownHostException;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.security.NoSuchAlgorithmException;
import java.security.SecureRandom;
import java.util.AbstractList;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;

public class DataNode
extends Configured
implements InterDatanodeProtocol,
ClientDatanodeProtocol,
FSConstants,
Runnable {
    /** Class-wide logger. */
    public static final Log LOG = LogFactory.getLog(DataNode.class);
    /** Template for client-trace entries; it has seven {@code %s} slots, named in the string itself. */
    public static final String DN_CLIENTTRACE_FORMAT = "src: %s, dest: %s, bytes: %s, op: %s, cliID: %s, srvID: %s, blockid: %s";
    // No initializer on the declaration; it is assigned outside the lines visible here.
    static final Log ClientTraceLog;
    /** RPC proxy to the namenode; created in startDataNode and released in shutdown. */
    public DatanodeProtocol namenode = null;
    /** Block storage backend; an FSDataset or a reflectively created simulated dataset (see startDataNode). */
    public FSDatasetInterface data = null;
    /** Registration record sent to the namenode; replaced by the namenode's answer in register(). */
    public DatanodeRegistration dnRegistration = null;
    /** Loop flag read by run/offerService/handshake/register; cleared by shutdown and handleDiskError. */
    volatile boolean shouldRun = true;
    // Blocks queued by notifyNamenodeReceivedBlock and reported to the namenode from offerService.
    // Also used as the monitor that offerService waits on between heartbeats.
    private LinkedList<Block> receivedBlockList = new LinkedList();
    // NOTE(review): not referenced in the lines visible here; usage must be confirmed elsewhere in the class.
    private final Map<Block, Block> ongoingRecovery = new HashMap<Block, Block>();
    // Deletion hints kept in step with receivedBlockList (one hint is appended per received block).
    private LinkedList<String> delHints = new LinkedList();
    /** Empty deletion hint. The decompiled code also uses this constant wherever it compares a storage ID to "". */
    public static final String EMPTY_DEL_HINT = "";
    // Counter whose current value is sent with every heartbeat.
    AtomicInteger xmitsInProgress = new AtomicInteger();
    // Daemon thread wrapping the DataXceiverServer; started in run().
    Daemon dataXceiverServer = null;
    // Thread group of the xceiver server; its active count is reported as the xceiver count.
    ThreadGroup threadGroup = null;
    // Milliseconds between block reports ("dfs.blockreport.intervalMsec").
    long blockReportInterval;
    // Timestamp (ms) used to decide when the next block report is due.
    long lastBlockReport = 0L;
    // When true, the next block report re-randomizes lastBlockReport (see offerService).
    boolean resetBlockReportTime = true;
    // Milliseconds to delay the first block report ("dfs.blockreport.initialDelay" seconds * 1000).
    long initialBlockReportDelay = 0L;
    // Timestamp (ms) of the most recent heartbeat attempt.
    long lastHeartbeat = 0L;
    // Milliseconds between heartbeats ("dfs.heartbeat.interval" seconds * 1000).
    long heartBeatInterval;
    // On-disk storage descriptor; created in startDataNode and unlocked in shutdown.
    private DataStorage storage = null;
    // Embedded HTTP server for the datanode servlets.
    private HttpServer infoServer = null;
    // Metrics sink; created at the end of startDataNode.
    DataNodeMetrics myMetrics;
    // Static although assigned per instance in startDataNode; returned by getNameNodeAddr().
    private static InetSocketAddress nameNodeAddr;
    // Address the data-transfer server socket is bound to.
    private InetSocketAddress selfAddr;
    // Most recently constructed DataNode; set in the constructor and returned by getDataNode().
    private static DataNode datanodeObject;
    // Thread executing run(); created in runDatanodeDaemon.
    private Thread dataNodeThread = null;
    // Host name advertised to the namenode ("slave.host.name" or the DNS default host).
    String machineName;
    // Name given to dataNodeThread; assigned outside the lines visible here.
    private static String dnThreadName;
    // "dfs.socket.timeout" in milliseconds.
    int socketTimeout;
    // "dfs.datanode.socket.write.timeout" in ms; a positive value selects channel-based sockets.
    int socketWriteTimeout = 0;
    // "dfs.datanode.transferTo.allowed".
    boolean transferToAllowed = true;
    // "dfs.write.packet.size".
    int writePacketSize = 0;
    /** Periodic block verifier; stays null when verification is disabled (see startDataNode). */
    public DataBlockScanner blockScanner = null;
    /** Thread running blockScanner; started lazily from offerService. */
    public Daemon blockScannerThread = null;
    // Shared random source; assigned outside the lines visible here.
    private static final Random R;
    /** RPC server started in startDataNode with this object as its handler. */
    public Server ipcServer;
    // Constructed with this DataNode; shut down in shutdown().
    UpgradeManagerDatanode upgradeManager = new UpgradeManagerDatanode(this);
    // NOTE(review): presumably the byte length of a data packet header; confirm against the packet code.
    public static final int PKT_HEADER_LEN = 21;

    /**
     * Turns an address string into a socket address by delegating to
     * {@code NetUtils.createSocketAddr(String)}.
     *
     * @param target address string handed to {@code NetUtils} unchanged
     * @return the socket address produced by {@code NetUtils}
     * @throws IOException declared for callers of this legacy entry point
     * @deprecated this method only forwards to {@code NetUtils.createSocketAddr(String)}
     */
    @Deprecated
    public static InetSocketAddress createSocketAddr(String target) throws IOException {
        InetSocketAddress resolved = NetUtils.createSocketAddr(target);
        return resolved;
    }

    /**
     * Returns the current time as reported by {@link System#currentTimeMillis()}.
     *
     * @return milliseconds since the epoch
     */
    static long now() {
        final long currentMillis = System.currentTimeMillis();
        return currentMillis;
    }

    /**
     * Creates a datanode and runs its startup sequence immediately.
     *
     * <p>The new instance is published through the static {@code datanodeObject}
     * reference before startup begins. If startup throws an {@link IOException},
     * {@link #shutdown()} is called and the same exception is rethrown.
     *
     * @param conf configuration handed to the superclass and to {@code startDataNode}
     * @param dataDirs data directories handed to {@code startDataNode}
     * @throws IOException if {@code startDataNode} fails
     */
    DataNode(Configuration conf, AbstractList<File> dataDirs) throws IOException {
        super(conf);
        datanodeObject = this;
        try {
            this.startDataNode(conf, dataDirs);
        } catch (IOException startupFailure) {
            // Tear down whatever startup managed to create before reporting the failure.
            this.shutdown();
            throw startupFailure;
        }
    }

    /**
     * Brings this datanode up: resolves its host name, connects to the namenode and
     * performs the version handshake, initializes storage, binds the data-transfer
     * socket, and starts the HTTP info server and the IPC server.
     *
     * <p>The xceiver daemon is only created here; it is started later in {@code run()}.
     *
     * @param conf configuration that all settings below are read from
     * @param dataDirs data directories passed to {@code DataStorage.recoverTransitionRead}
     *        (unused when the simulated dataset is enabled)
     * @throws IOException if any startup step fails; the constructor reacts by calling
     *         {@code shutdown()}
     */
    void startDataNode(Configuration conf, AbstractList<File> dataDirs) throws IOException {
        // An explicitly configured host name wins; otherwise ask DNS for the default host.
        if (conf.get("slave.host.name") != null) {
            this.machineName = conf.get("slave.host.name");
        }
        if (this.machineName == null) {
            this.machineName = DNS.getDefaultHost(conf.get("dfs.datanode.dns.interface", "default"), conf.get("dfs.datanode.dns.nameserver", "default"));
        }
        // This local shadows the static field of the same name; the field is assigned further down.
        InetSocketAddress nameNodeAddr = NameNode.getAddress(conf);
        this.socketTimeout = conf.getInt("dfs.socket.timeout", 60000);
        this.socketWriteTimeout = conf.getInt("dfs.datanode.socket.write.timeout", 480000);
        this.transferToAllowed = conf.getBoolean("dfs.datanode.transferTo.allowed", true);
        this.writePacketSize = conf.getInt("dfs.write.packet.size", 65536);
        String address = NetUtils.getServerAddress(conf, "dfs.datanode.bindAddress", "dfs.datanode.port", "dfs.datanode.address");
        InetSocketAddress socAddr = NetUtils.createSocketAddr(address);
        int tmpPort = socAddr.getPort();
        this.storage = new DataStorage();
        // Registered under the configured port for now; the name is updated once the socket is bound.
        this.dnRegistration = new DatanodeRegistration(this.machineName + ":" + tmpPort);
        this.namenode = (DatanodeProtocol)RPC.waitForProxy(DatanodeProtocol.class, 19L, nameNodeAddr, conf);
        // Fetch namespace info and check that build versions match.
        NamespaceInfo nsInfo = this.handshake();
        HdfsConstants.StartupOption startOpt = DataNode.getStartupOption(conf);
        assert (startOpt != null) : "Startup option must be set.";
        boolean simulatedFSDataset = conf.getBoolean("dfs.datanode.simulateddatastorage", false);
        if (simulatedFSDataset) {
            // Simulated storage: make up a storage ID and fill in the storage info by hand.
            DataNode.setNewStorageID(this.dnRegistration);
            this.dnRegistration.storageInfo.layoutVersion = -18;
            this.dnRegistration.storageInfo.namespaceID = nsInfo.namespaceID;
            conf.set("StorageId", this.dnRegistration.getStorageID());
            try {
                this.data = (FSDatasetInterface)ReflectionUtils.newInstance(Class.forName("com.facebook.presto.hive.$internal.org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset"), conf);
            }
            catch (ClassNotFoundException e) {
                // The exception is flattened into the message text; it is not attached as a cause.
                throw new IOException(StringUtils.stringifyException(e));
            }
        } else {
            // Real storage: recover or upgrade the data directories, then open the dataset on them.
            this.storage.recoverTransitionRead(nsInfo, dataDirs, startOpt);
            this.dnRegistration.setStorageInfo(this.storage);
            this.data = new FSDataset(this.storage, conf);
        }
        // A positive write timeout selects a channel-backed server socket.
        ServerSocket ss = this.socketWriteTimeout > 0 ? ServerSocketChannel.open().socket() : new ServerSocket();
        Server.bind(ss, socAddr, 0);
        ss.setReceiveBufferSize(131072);
        // Re-read the port actually bound to and advertise that one.
        tmpPort = ss.getLocalPort();
        this.selfAddr = new InetSocketAddress(ss.getInetAddress().getHostAddress(), tmpPort);
        this.dnRegistration.setName(this.machineName + ":" + tmpPort);
        // NOTE(review): the message says "info server", but tmpPort is the port of the socket bound just above.
        LOG.info("Opened info server at " + tmpPort);
        this.threadGroup = new ThreadGroup("dataXceiverServer");
        this.dataXceiverServer = new Daemon(this.threadGroup, new DataXceiverServer(ss, conf, this));
        this.threadGroup.setDaemon(true);
        this.blockReportInterval = conf.getLong("dfs.blockreport.intervalMsec", 3600000L);
        // The initial delay is configured in seconds and stored in milliseconds.
        this.initialBlockReportDelay = conf.getLong("dfs.blockreport.initialDelay", 0L) * 1000L;
        if (this.initialBlockReportDelay >= this.blockReportInterval) {
            // The check is >=, so a delay equal to the interval is also reset although the message says "greater than".
            this.initialBlockReportDelay = 0L;
            LOG.info("dfs.blockreport.initialDelay is greater than dfs.blockreport.intervalMsec. Setting initial delay to 0 msec:");
        }
        // The heartbeat interval is configured in seconds and stored in milliseconds.
        this.heartBeatInterval = conf.getLong("dfs.heartbeat.interval", 3L) * 1000L;
        DataNode.nameNodeAddr = nameNodeAddr;
        // Block verification runs only for a non-negative scan period and an FSDataset backend.
        String reason = null;
        if (conf.getInt("dfs.datanode.scan.period.hours", 0) < 0) {
            reason = "verification is turned off by configuration";
        } else if (!(this.data instanceof FSDataset)) {
            reason = "verifcation is supported only with FSDataset";
        }
        if (reason == null) {
            this.blockScanner = new DataBlockScanner(this, (FSDataset)this.data, conf);
        } else {
            LOG.info("Periodic Block Verification is disabled because " + reason + ".");
        }
        // HTTP info server; the boolean argument is true when the configured port is 0.
        String infoAddr = NetUtils.getServerAddress(conf, "dfs.datanode.info.bindAddress", "dfs.datanode.info.port", "dfs.datanode.http.address");
        InetSocketAddress infoSocAddr = NetUtils.createSocketAddr(infoAddr);
        String infoHost = infoSocAddr.getHostName();
        int tmpInfoPort = infoSocAddr.getPort();
        this.infoServer = new HttpServer("datanode", infoHost, tmpInfoPort, tmpInfoPort == 0, conf);
        if (conf.getBoolean("dfs.https.enable", false)) {
            boolean needClientAuth = conf.getBoolean("dfs.https.need.client.auth", false);
            InetSocketAddress secInfoSocAddr = NetUtils.createSocketAddr(conf.get("dfs.datanode.https.address", infoHost + ":" + 0));
            Configuration sslConf = new Configuration(false);
            sslConf.addResource(conf.get("dfs.https.server.keystore.resource", "ssl-server.xml"));
            this.infoServer.addSslListener(secInfoSocAddr, sslConf, needClientAuth);
        }
        // NOTE(review): the leading-dot class references below look like a decompiler artifact
        // (an elided qualifier) and are not valid Java as written; confirm the intended classes.
        this.infoServer.addInternalServlet(null, "/streamFile/*", .StreamFile.class);
        this.infoServer.addInternalServlet(null, "/getFileChecksum/*", .FileChecksumServlets.GetServlet.class);
        // blockScanner may be null here when verification is disabled.
        this.infoServer.setAttribute("datanode.blockScanner", this.blockScanner);
        this.infoServer.addServlet(null, "/blockScannerReport", DataBlockScanner.Servlet.class);
        this.infoServer.start();
        this.dnRegistration.setInfoPort(this.infoServer.getPort());
        this.myMetrics = new DataNodeMetrics(conf, this.dnRegistration.getStorageID());
        // Install the service-level authorization policy when authorization is enabled.
        if (conf.getBoolean("hadoop.security.authorization", false)) {
            PolicyProvider policyProvider = ReflectionUtils.newInstance(conf.getClass("hadoop.security.authorization.policyprovider", HDFSPolicyProvider.class, PolicyProvider.class), conf);
            SecurityUtil.setPolicy(new ConfiguredPolicy(conf, policyProvider));
        }
        // NOTE(review): no default is supplied for this key here, so a missing entry passes null
        // to NetUtils.createSocketAddr; presumably a default resource defines it. Confirm.
        InetSocketAddress ipcAddr = NetUtils.createSocketAddr(conf.get("dfs.datanode.ipc.address"));
        this.ipcServer = RPC.getServer(this, ipcAddr.getHostName(), ipcAddr.getPort(), conf.getInt("dfs.datanode.handler.count", 3), false, conf);
        this.ipcServer.start();
        this.dnRegistration.setIpcPort(this.ipcServer.getListenerAddress().getPort());
        LOG.info("dnRegistration = " + this.dnRegistration);
    }

    /**
     * Creates an unconnected socket for outgoing connections.
     *
     * @return a channel-backed socket when {@code socketWriteTimeout} is positive,
     *         otherwise a plain {@link Socket}
     * @throws IOException if the socket channel cannot be opened
     */
    protected Socket newSocket() throws IOException {
        if (this.socketWriteTimeout > 0) {
            return SocketChannel.open().socket();
        }
        return new Socket();
    }

    /**
     * Asks the namenode for its namespace info and checks build-version compatibility.
     *
     * <p>The version request is retried once per second on socket timeouts for as long
     * as {@code shouldRun} is set. A build-version mismatch is logged, reported to the
     * namenode as an error and turned into an {@link IOException}.
     *
     * @return the namespace info returned by the namenode
     * @throws IOException on a build-version mismatch or any non-timeout RPC failure
     */
    private NamespaceInfo handshake() throws IOException {
        NamespaceInfo info = new NamespaceInfo();
        while (this.shouldRun) {
            try {
                info = this.namenode.versionRequest();
            } catch (SocketTimeoutException timeout) {
                LOG.info("Problem connecting to server: " + this.getNameNodeAddr());
                try {
                    Thread.sleep(1000L);
                } catch (InterruptedException ignored) {
                    // interruption only cuts the pause short; the loop re-checks shouldRun
                }
                continue;
            }
            break;
        }

        if (info.getBuildVersion().equals(Storage.getBuildVersion())) {
            assert (-18 == info.getLayoutVersion()) : "Data-node and name-node layout versions must be the same.Expected: -18 actual " + info.getLayoutVersion();
            return info;
        }

        // Versions differ: log it, tell the namenode, then fail the startup.
        String mismatch = "Incompatible build versions: namenode BV = " + info.getBuildVersion() + "; datanode BV = " + Storage.getBuildVersion();
        LOG.fatal(mismatch);
        try {
            this.namenode.errorReport(this.dnRegistration, 0, mismatch);
        } catch (SocketTimeoutException timeout) {
            LOG.info("Problem connecting to server: " + this.getNameNodeAddr());
        }
        throw new IOException(mismatch);
    }

    /**
     * Returns the instance stored in the static {@code datanodeObject} reference,
     * which the constructor sets to the object being constructed.
     *
     * @return the recorded instance, or {@code null} if none has been constructed
     */
    public static DataNode getDataNode() {
        DataNode current = datanodeObject;
        return current;
    }

    /**
     * Creates an RPC proxy for talking to another datanode over {@code InterDatanodeProtocol}.
     *
     * <p>The target address is built from the datanode's host and IPC port. The address
     * is logged at debug level, matching the {@code isDebugEnabled()} guard. The original
     * emitted the message at info level inside that guard.
     *
     * @param datanodeid identifies the remote datanode (its host and IPC port are used)
     * @param conf configuration passed to the RPC layer
     * @return a proxy implementing {@code InterDatanodeProtocol}
     * @throws IOException if the proxy cannot be created
     */
    public static InterDatanodeProtocol createInterDataNodeProtocolProxy(DatanodeID datanodeid, Configuration conf) throws IOException {
        InetSocketAddress addr = NetUtils.createSocketAddr(datanodeid.getHost() + ":" + datanodeid.getIpcPort());
        if (InterDatanodeProtocol.LOG.isDebugEnabled()) {
            InterDatanodeProtocol.LOG.debug("InterDatanodeProtocol addr=" + addr);
        }
        return (InterDatanodeProtocol)RPC.getProxy(InterDatanodeProtocol.class, 3L, addr, conf);
    }

    /**
     * Returns the namenode address recorded by {@code startDataNode}.
     *
     * <p>The value lives in a static field, so it is shared by every instance.
     *
     * @return the namenode address, or {@code null} if it has not been set yet
     */
    public InetSocketAddress getNameNodeAddr() {
        InetSocketAddress address = nameNodeAddr;
        return address;
    }

    /**
     * Returns the address of this datanode's bound data-transfer socket.
     *
     * @return the address stored by {@code startDataNode}, or {@code null} before startup
     */
    public InetSocketAddress getSelfAddr() {
        InetSocketAddress address = this.selfAddr;
        return address;
    }

    /**
     * Returns this datanode's metrics object.
     *
     * @return the metrics instance created during startup, or {@code null} before that
     */
    DataNodeMetrics getMetrics() {
        DataNodeMetrics metrics = this.myMetrics;
        return metrics;
    }

    /**
     * Returns a fixed placeholder string. The namenode proxy is not consulted.
     *
     * @return the literal {@code "<namenode>"}
     */
    public String getNamenode() {
        final String placeholder = "<namenode>";
        return placeholder;
    }

    /**
     * Generates a fresh storage ID and stores it in the given registration.
     *
     * <p>The ID has the form {@code DS-<random>-<ip>-<port>-<currentTimeMillis>}. The IP
     * falls back to {@code "unknownIP"} when the default interface cannot be resolved.
     * The random part falls back to the shared {@code Random} when the SHA1PRNG
     * algorithm is unavailable.
     *
     * @param dnReg registration whose {@code storageID} field is overwritten
     */
    public static void setNewStorageID(DatanodeRegistration dnReg) {
        String ipAddress;
        try {
            ipAddress = DNS.getDefaultIP("default");
        } catch (UnknownHostException unresolved) {
            LOG.warn("Could not find ip address of \"default\" inteface.");
            ipAddress = "unknownIP";
        }

        int randomPart;
        try {
            randomPart = SecureRandom.getInstance("SHA1PRNG").nextInt(Integer.MAX_VALUE);
        } catch (NoSuchAlgorithmException unavailable) {
            LOG.warn("Could not use SecureRandom");
            randomPart = R.nextInt(Integer.MAX_VALUE);
        }

        StringBuilder id = new StringBuilder("DS-");
        id.append(randomPart).append("-");
        id.append(ipAddress).append("-");
        id.append(dnReg.getPort()).append("-");
        id.append(System.currentTimeMillis());
        dnReg.storageID = id.toString();
    }

    /**
     * Registers this datanode with the namenode and reconciles storage IDs.
     *
     * <p>A storage ID is generated first if the registration has none. Registration is
     * retried once per second on socket timeouts while {@code shouldRun} is set. If
     * local storage has no ID yet, the ID from the namenode's answer is stored and
     * written out. If the two IDs still differ afterwards, registration fails.
     * Finally the first block report is scheduled.
     *
     * @throws IOException if the storage IDs are inconsistent or a non-timeout RPC error occurs
     */
    private void register() throws IOException {
        // EMPTY_DEL_HINT is the empty string; it stands in for "no storage ID" below.
        if (this.dnRegistration.getStorageID().equals(EMPTY_DEL_HINT)) {
            DataNode.setNewStorageID(this.dnRegistration);
        }

        while (this.shouldRun) {
            try {
                this.dnRegistration.name = this.machineName + ":" + this.dnRegistration.getPort();
                this.dnRegistration = this.namenode.register(this.dnRegistration);
            } catch (SocketTimeoutException timeout) {
                LOG.info("Problem connecting to server: " + this.getNameNodeAddr());
                try {
                    Thread.sleep(1000L);
                } catch (InterruptedException ignored) {
                    // interruption only shortens the pause before the next attempt
                }
                continue;
            }
            break;
        }

        assert (EMPTY_DEL_HINT.equals(this.storage.getStorageID()) && !EMPTY_DEL_HINT.equals(this.dnRegistration.getStorageID()) || this.storage.getStorageID().equals(this.dnRegistration.getStorageID())) : "New storageID can be assigned only if data-node is not formatted";

        boolean storageHasNoId = this.storage.getStorageID().equals(EMPTY_DEL_HINT);
        if (storageHasNoId) {
            // Adopt the ID from the registration and persist it.
            this.storage.setStorageID(this.dnRegistration.getStorageID());
            this.storage.writeAll();
            LOG.info("New storage id " + this.dnRegistration.getStorageID() + " is assigned to data-node " + this.dnRegistration.getName());
        }

        boolean idsMatch = this.storage.getStorageID().equals(this.dnRegistration.getStorageID());
        if (!idsMatch) {
            throw new IOException("Inconsistent storage IDs. Name-node returned " + this.dnRegistration.getStorageID() + ". Expecting " + this.storage.getStorageID());
        }

        this.scheduleBlockReport(this.initialBlockReportDelay);
    }

    /**
     * Stops this datanode and releases its resources.
     *
     * <p>Steps, in order: stop the HTTP info server; stop the IPC server; clear
     * {@code shouldRun}; kill and join the xceiver server after its thread group has
     * drained; stop the namenode proxy; shut down the upgrade manager; stop the block
     * scanner thread; unlock storage; interrupt and join the main datanode thread; shut
     * down the dataset and the metrics.
     *
     * <p>Most members are null-checked because the constructor calls this method when
     * startup fails part-way. Interruptions and the storage-unlock failure are ignored.
     */
    public void shutdown() {
        if (this.infoServer != null) {
            try {
                this.infoServer.stop();
            }
            catch (Exception e) {
                LOG.warn("Exception shutting down DataNode", e);
            }
        }
        if (this.ipcServer != null) {
            this.ipcServer.stop();
        }
        // From here on every loop that checks shouldRun (run, offerService, handshake, register) will exit.
        this.shouldRun = false;
        if (this.dataXceiverServer != null) {
            ((DataXceiverServer)this.dataXceiverServer.getRunnable()).kill();
            this.dataXceiverServer.interrupt();
            if (this.threadGroup != null) {
                // Keep interrupting the group once a second until it reports no active threads.
                // There is no upper bound on this wait.
                while (true) {
                    this.threadGroup.interrupt();
                    LOG.info("Waiting for threadgroup to exit, active threads is " + this.threadGroup.activeCount());
                    if (this.threadGroup.activeCount() == 0) break;
                    try {
                        Thread.sleep(1000L);
                    }
                    catch (InterruptedException e) {}
                }
            }
            try {
                this.dataXceiverServer.join();
            }
            catch (InterruptedException ie) {
                // empty catch block
            }
        }
        // NOTE(review): namenode is not null-checked here, and it is null when startup failed
        // before the proxy was created; presumably RPC.stopProxy tolerates null. Confirm.
        RPC.stopProxy(this.namenode);
        if (this.upgradeManager != null) {
            this.upgradeManager.shutdownUpgrade();
        }
        if (this.blockScannerThread != null) {
            this.blockScannerThread.interrupt();
            try {
                // Wait at most one hour (3,600,000 ms) for the scanner thread to finish.
                this.blockScannerThread.join(3600000L);
            }
            catch (InterruptedException ie) {
                // empty catch block
            }
        }
        if (this.storage != null) {
            try {
                this.storage.unlockAll();
            }
            catch (IOException ie) {
                // empty catch block
            }
        }
        if (this.dataNodeThread != null) {
            // NOTE(review): run() and offerService() call shutdown() themselves. If they run on
            // dataNodeThread, this thread interrupts itself and join() should then throw
            // InterruptedException at once, which is swallowed below. Confirm.
            this.dataNodeThread.interrupt();
            try {
                this.dataNodeThread.join();
            }
            catch (InterruptedException interruptedException) {
                // empty catch block
            }
        }
        if (this.data != null) {
            this.data.shutdown();
        }
        if (this.myMetrics != null) {
            this.myMetrics.shutdown();
        }
    }

    /**
     * Inspects an I/O failure for a disk problem.
     *
     * <p>If the exception message starts with {@code "No space left on device"}, a
     * {@code DiskOutOfSpaceException} is thrown. Otherwise the data directories are
     * checked through {@link #checkDiskError()}.
     *
     * @param e the I/O exception that triggered the check
     * @throws IOException the out-of-space exception, or whatever the directory check throws
     */
    protected void checkDiskError(IOException e) throws IOException {
        String message = e.getMessage();
        boolean outOfSpace = message != null && message.startsWith("No space left on device");
        if (outOfSpace) {
            throw new DiskChecker.DiskOutOfSpaceException("No space left on device");
        }
        this.checkDiskError();
    }

    /**
     * Runs the dataset's data-directory check. A {@code DiskErrorException} from that
     * check is passed to {@code handleDiskError}, which begins shutting the datanode down.
     *
     * @throws IOException if the check fails with anything other than a {@code DiskErrorException}
     */
    protected void checkDiskError() throws IOException {
        try {
            this.data.checkDataDir();
        } catch (DiskChecker.DiskErrorException diskFailure) {
            String detail = diskFailure.getMessage();
            this.handleDiskError(detail);
        }
    }

    /**
     * Reacts to a disk failure: logs it, clears {@code shouldRun} so the service loops
     * stop, and reports the error to the namenode.
     *
     * <p>The report is best effort. A failure to deliver it does not stop the shutdown,
     * but it is now logged instead of being dropped silently.
     *
     * @param errMsgr description of the disk error; logged and sent to the namenode
     */
    private void handleDiskError(String errMsgr) {
        LOG.warn("DataNode is shutting down.\n" + errMsgr);
        this.shouldRun = false;
        try {
            this.namenode.errorReport(this.dnRegistration, 1, errMsgr);
        }
        catch (IOException reportFailure) {
            // The node is stopping either way; record why the namenode was not told.
            LOG.warn("Failed to report disk error to the namenode", reportFailure);
        }
    }

    /**
     * Returns the number of active threads in the xceiver thread group.
     *
     * @return the group's active thread count, or 0 when the group has not been created
     */
    int getXceiverCount() {
        if (this.threadGroup == null) {
            return 0;
        }
        return this.threadGroup.activeCount();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Main service loop; runs until {@code shouldRun} is cleared.
     *
     * <p>Each pass sends a heartbeat when one is due and processes the returned commands.
     * It then reports newly received blocks, sends a block report when one is due,
     * starts the block scanner once the upgrade has completed, and finally waits on
     * {@code receivedBlockList} until the next heartbeat or a new block arrives.
     *
     * <p>A {@code RemoteException} naming an unregistered, disallowed or
     * incorrect-version datanode shuts the node down and ends the loop. Any other
     * {@code IOException} is logged and the loop continues.
     *
     * @throws Exception declared by the signature; {@code IOException}s are handled inside the loop
     */
    public void offerService() throws Exception {
        LOG.info("using BLOCKREPORT_INTERVAL of " + this.blockReportInterval + "msec" + " Initial delay: " + this.initialBlockReportDelay + "msec");
        while (this.shouldRun) {
            try {
                LinkedList<String> linkedList;
                long startTime = DataNode.now();
                // Heartbeat: sends capacity, usage, remaining space, transfers in progress and xceiver count.
                if (startTime - this.lastHeartbeat > this.heartBeatInterval) {
                    this.lastHeartbeat = startTime;
                    DatanodeCommand[] cmds = this.namenode.sendHeartbeat(this.dnRegistration, this.data.getCapacity(), this.data.getDfsUsed(), this.data.getRemaining(), this.xmitsInProgress.get(), this.getXceiverCount());
                    this.myMetrics.heartbeats.inc(DataNode.now() - startTime);
                    // A false result (a shutdown command was processed) skips the rest of this pass.
                    if (!this.processCommand(cmds)) continue;
                }
                Block[] blockArray = null;
                String[] delHintArray = null;
                // Snapshot both queues while holding both monitors. The lock order
                // (receivedBlockList, then delHints) matches notifyNamenodeReceivedBlock.
                LinkedList<Block> linkedList2 = this.receivedBlockList;
                synchronized (linkedList2) {
                    linkedList = this.delHints;
                    synchronized (linkedList) {
                        int numBlocks = this.receivedBlockList.size();
                        if (numBlocks > 0) {
                            if (numBlocks != this.delHints.size()) {
                                LOG.warn("Panic: receiveBlockList and delHints are not of the same length");
                            }
                            blockArray = this.receivedBlockList.toArray(new Block[numBlocks]);
                            delHintArray = this.delHints.toArray(new String[numBlocks]);
                        }
                    }
                }
                if (blockArray != null) {
                    if (delHintArray == null || delHintArray.length != blockArray.length) {
                        LOG.warn("Panic: block array & delHintArray are not the same");
                    }
                    // The RPC is made outside the monitors, so receivers can keep enqueueing meanwhile.
                    this.namenode.blockReceived(this.dnRegistration, blockArray, delHintArray);
                    linkedList2 = this.receivedBlockList;
                    synchronized (linkedList2) {
                        linkedList = this.delHints;
                        synchronized (linkedList) {
                            // Remove only what was just reported. remove(Object) deletes the first
                            // equal element, so repeated hint strings are removed by value, not position.
                            for (int i = 0; i < blockArray.length; ++i) {
                                this.receivedBlockList.remove(blockArray[i]);
                                this.delHints.remove(delHintArray[i]);
                            }
                        }
                    }
                }
                // Full block report when the interval has elapsed.
                if (startTime - this.lastBlockReport > this.blockReportInterval) {
                    long brStartTime = DataNode.now();
                    Block[] bReport = this.data.getBlockReport();
                    DatanodeCommand cmd = this.namenode.blockReport(this.dnRegistration, BlockListAsLongs.convertToArrayLongs(bReport));
                    long brTime = DataNode.now() - brStartTime;
                    this.myMetrics.blockReports.inc(brTime);
                    LOG.info("BlockReport of " + bReport.length + " blocks got processed in " + brTime + " msecs");
                    if (this.resetBlockReportTime) {
                        // Back-date the report by a random offset within one interval.
                        // Note the narrowing cast of the interval to int.
                        this.lastBlockReport = startTime - (long)R.nextInt((int)this.blockReportInterval);
                        this.resetBlockReportTime = false;
                    } else {
                        // Advance by a whole number of intervals so the report phase is kept.
                        this.lastBlockReport += (DataNode.now() - this.lastBlockReport) / this.blockReportInterval * this.blockReportInterval;
                    }
                    this.processCommand(cmd);
                }
                // Start the block scanner lazily, once, and only after the upgrade has completed.
                if (this.blockScanner != null && this.blockScannerThread == null && this.upgradeManager.isUpgradeCompleted()) {
                    LOG.info("Starting Periodic block scanner.");
                    this.blockScannerThread = new Daemon(this.blockScanner);
                    this.blockScannerThread.start();
                }
                // Sleep until the next heartbeat is due, unless blocks are already waiting.
                // notifyNamenodeReceivedBlock wakes this wait early.
                long waitTime = this.heartBeatInterval - (System.currentTimeMillis() - this.lastHeartbeat);
                LinkedList<Block> linkedList3 = this.receivedBlockList;
                synchronized (linkedList3) {
                    if (waitTime > 0L && this.receivedBlockList.size() == 0) {
                        try {
                            this.receivedBlockList.wait(waitTime);
                        }
                        catch (InterruptedException ie) {
                            // empty catch block
                        }
                    }
                }
            }
            catch (RemoteException re) {
                String reClass = re.getClassName();
                // NOTE(review): ".DisallowedDatanodeException" looks like a decompiler artifact
                // (an elided qualifier) and is not valid Java as written; confirm the intended class.
                if (UnregisteredDatanodeException.class.getName().equals(reClass) || .DisallowedDatanodeException.class.getName().equals(reClass) || IncorrectVersionException.class.getName().equals(reClass)) {
                    LOG.warn("DataNode is shutting down: " + StringUtils.stringifyException(re));
                    this.shutdown();
                    return;
                }
                LOG.warn(StringUtils.stringifyException(re));
            }
            catch (IOException e) {
                LOG.warn(StringUtils.stringifyException(e));
            }
        }
    }

    /**
     * Processes a batch of namenode commands in order.
     *
     * <p>An {@link IOException} from one command is logged and the next command is
     * still processed. Processing stops as soon as a command returns {@code false}.
     *
     * @param cmds commands to process; may be {@code null}
     * @return {@code false} if a command returned {@code false}, otherwise {@code true}
     */
    private boolean processCommand(DatanodeCommand[] cmds) {
        if (cmds == null) {
            return true;
        }
        for (int idx = 0; idx < cmds.length; idx++) {
            boolean keepGoing;
            try {
                keepGoing = this.processCommand(cmds[idx]);
            } catch (IOException ioe) {
                LOG.warn("Error processing datanode Command", ioe);
                continue;
            }
            if (!keepGoing) {
                return false;
            }
        }
        return true;
    }

    /**
     * Executes one namenode command, dispatching on its numeric action code.
     *
     * @param cmd the command to execute; {@code null} is accepted and ignored
     * @return {@code false} only for the shutdown action (code 3), otherwise {@code true}
     * @throws IOException if the underlying operation fails
     */
    private boolean processCommand(DatanodeCommand cmd) throws IOException {
        if (cmd == null) {
            return true;
        }
        // The block-related actions (1, 2 and 6) read the command through this view.
        BlockCommand blockCmd = null;
        if (cmd instanceof BlockCommand) {
            blockCmd = (BlockCommand)cmd;
        }
        switch (cmd.getAction()) {
            case 1: {
                // copy blocks to other datanodes
                this.transferBlocks(blockCmd.getBlocks(), blockCmd.getTargets());
                this.myMetrics.blocksReplicated.inc(blockCmd.getBlocks().length);
                break;
            }
            case 2: {
                // delete blocks locally
                Block[] doomed = blockCmd.getBlocks();
                try {
                    if (this.blockScanner != null) {
                        this.blockScanner.deleteBlocks(doomed);
                    }
                    this.data.invalidate(doomed);
                } catch (IOException invalidateFailure) {
                    // check the data directories before passing the failure on
                    this.checkDiskError();
                    throw invalidateFailure;
                }
                this.myMetrics.blocksRemoved.inc(doomed.length);
                break;
            }
            case 3: {
                // shut down; the only action that makes callers stop
                this.shutdown();
                return false;
            }
            case 4: {
                LOG.info("DatanodeCommand action: DNA_REGISTER");
                if (this.shouldRun) {
                    this.register();
                }
                break;
            }
            case 5: {
                this.storage.finalizeUpgrade();
                break;
            }
            case 101: {
                this.processDistributedUpgradeCommand((UpgradeCommand)cmd);
                break;
            }
            case 6: {
                this.recoverBlocks(blockCmd.getBlocks(), blockCmd.getTargets());
                break;
            }
            default: {
                LOG.warn("Unknown DatanodeCommand action: " + cmd.getAction());
            }
        }
        return true;
    }

    /**
     * Passes a distributed-upgrade command to the upgrade manager.
     *
     * @param comm the upgrade command received from the namenode
     * @throws IOException if the upgrade manager fails to process the command
     */
    private void processDistributedUpgradeCommand(UpgradeCommand comm) throws IOException {
        assert (this.upgradeManager != null) : "DataNode.upgradeManager is null.";
        UpgradeManagerDatanode manager = this.upgradeManager;
        manager.processUpgradeCommand(comm);
    }

    /**
     * Starts a distributed upgrade when the upgrade manager reports one as pending.
     *
     * <p>The manager is taken from the instance returned by {@code getDataNode()}. Its
     * upgrade state is reset to {@code false} (keeping the current upgrade version)
     * before the upgrade is started.
     *
     * @throws IOException if the upgrade manager fails
     */
    private void startDistributedUpgradeIfNeeded() throws IOException {
        UpgradeManagerDatanode manager = DataNode.getDataNode().upgradeManager;
        assert (manager != null) : "DataNode.upgradeManager is null.";
        if (manager.getUpgradeState()) {
            manager.setUpgradeState(false, manager.getUpgradeVersion());
            manager.startUpgrade();
        }
    }

    /**
     * Starts an asynchronous transfer of one block to the given targets.
     *
     * <p>An invalid block is reported to the namenode as an error. A block whose on-disk
     * length is shorter than the length recorded in {@code block} is reported as bad.
     * Neither is transferred. Otherwise, if there is at least one target, a daemon
     * thread running a {@code DataTransfer} is started.
     *
     * @param block the block to send
     * @param xferTargets datanodes to send the block to
     * @throws IOException if querying the dataset or reporting to the namenode fails
     */
    private void transferBlock(Block block, DatanodeInfo[] xferTargets) throws IOException {
        if (!this.data.isValidBlock(block)) {
            String complaint = "Can't send invalid block " + block;
            LOG.info(complaint);
            this.namenode.errorReport(this.dnRegistration, 2, complaint);
            return;
        }

        long storedLength = this.data.getLength(block);
        if (block.getNumBytes() > storedLength) {
            // Report this node's replica as bad instead of replicating a truncated block.
            DatanodeInfo[] self = new DatanodeInfo[]{new DatanodeInfo(this.dnRegistration)};
            LocatedBlock[] badBlocks = new LocatedBlock[]{new LocatedBlock(block, self)};
            this.namenode.reportBadBlocks(badBlocks);
            LOG.info("Can't replicate block " + block + " because on-disk length " + storedLength + " is shorter than NameNode recorded length " + block.getNumBytes());
            return;
        }

        if (xferTargets.length == 0) {
            return;
        }
        if (LOG.isInfoEnabled()) {
            StringBuilder targetNames = new StringBuilder();
            for (DatanodeInfo target : xferTargets) {
                targetNames.append(target.getName());
                targetNames.append(" ");
            }
            LOG.info(this.dnRegistration + " Starting thread to transfer block " + block + " to " + targetNames);
        }
        Daemon transferThread = new Daemon(new DataTransfer(xferTargets, block, this));
        transferThread.start();
    }

    /**
     * Transfers each block to its matching target list, one block at a time.
     *
     * <p>An {@link IOException} for one block is logged and does not stop the remaining
     * transfers.
     *
     * @param blocks blocks to transfer
     * @param xferTargets target datanodes for each block, indexed like {@code blocks}
     */
    private void transferBlocks(Block[] blocks, DatanodeInfo[][] xferTargets) {
        for (int idx = 0; idx < blocks.length; idx++) {
            Block current = blocks[idx];
            try {
                this.transferBlock(current, xferTargets[idx]);
            } catch (IOException failure) {
                LOG.warn("Failed to transfer block " + current, failure);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Queues a received block and its deletion hint, then wakes every thread
     * waiting on the received-block list.
     *
     * <p>Both queues are updated while holding the lock on
     * {@code receivedBlockList} first and {@code delHints} second.
     *
     * @param block   the block that was received; must not be {@code null}
     * @param delHint the deletion hint paired with the block; must not be
     *                {@code null}
     * @throws IllegalArgumentException if either argument is {@code null}
     */
    protected void notifyNamenodeReceivedBlock(Block block, String delHint) {
        if (block == null) {
            throw new IllegalArgumentException("Block is null");
        }
        if (delHint == null) {
            throw new IllegalArgumentException("delHint is null");
        }
        synchronized (receivedBlockList) {
            synchronized (delHints) {
                receivedBlockList.add(block);
                delHints.add(delHint);
                receivedBlockList.notifyAll();
            }
        }
    }

    /**
     * Thread body of the data node: starts the transfer server, then keeps
     * starting any pending distributed upgrade and offering service until
     * {@code shouldRun} is cleared, and finally shuts the node down.
     *
     * <p>Any exception from an iteration is logged; if the node should keep
     * running, the loop pauses for five seconds before retrying.
     */
    @Override
    public void run() {
        LOG.info(dnRegistration + "In DataNode.run, data = " + data);
        dataXceiverServer.start();

        while (shouldRun) {
            try {
                startDistributedUpgradeIfNeeded();
                offerService();
            } catch (Exception failure) {
                LOG.error("Exception: " + StringUtils.stringifyException(failure));
                if (shouldRun) {
                    try {
                        Thread.sleep(5000L);
                    } catch (InterruptedException ignored) {
                        // Interrupt is ignored here; the loop condition re-checks shouldRun.
                    }
                }
            }
        }

        LOG.info(dnRegistration + ":Finishing DataNode in: " + data);
        shutdown();
    }

    /**
     * Registers the given data node and launches it on a new daemon thread
     * named by {@code dnThreadName}. Does nothing when {@code dn} is
     * {@code null}.
     *
     * @param dn the data node to start, or {@code null}
     * @throws IOException propagated from {@code register()}
     */
    public static void runDatanodeDaemon(DataNode dn) throws IOException {
        if (dn == null) {
            return;
        }
        dn.register();
        Thread worker = new Thread((Runnable) dn, dnThreadName);
        dn.dataNodeThread = worker;
        worker.setDaemon(true);
        worker.start();
    }

    /**
     * Tells whether the data node's thread exists and is still alive.
     *
     * @param dn the data node to inspect
     * @return {@code true} if {@code dn} has a thread and that thread is alive
     */
    static boolean isDatanodeUp(DataNode dn) {
        if (dn.dataNodeThread == null) {
            return false;
        }
        return dn.dataNodeThread.isAlive();
    }

    /**
     * Builds a data node from command-line arguments and a configuration
     * without starting its thread.
     *
     * <p>Prints usage and returns {@code null} when the arguments cannot be
     * parsed. Exits the JVM when the unsupported {@code dfs.network.script}
     * property is set. Also records the thread name derived from
     * {@code dfs.data.dir} in {@code dnThreadName}.
     *
     * @param args command-line arguments; may be {@code null}
     * @param conf configuration to use; a fresh one is created when
     *             {@code null}
     * @return the new data node, or {@code null} if the arguments are invalid
     *         or {@code makeInstance} yields none
     * @throws IOException propagated from {@code makeInstance}
     */
    public static DataNode instantiateDataNode(String[] args, Configuration conf) throws IOException {
        Configuration effectiveConf = (conf != null) ? conf : new Configuration();

        if (!parseArguments(args, effectiveConf)) {
            printUsage();
            return null;
        }

        if (effectiveConf.get("dfs.network.script") != null) {
            LOG.error("This configuration for rack identification is not supported anymore. RackID resolution is handled by the NameNode.");
            System.exit(-1);
        }

        String[] dataDirs = effectiveConf.getStrings("dfs.data.dir");
        dnThreadName = "DataNode: [" + StringUtils.arrayToString(dataDirs) + "]";
        return makeInstance(dataDirs, effectiveConf);
    }

    /**
     * Instantiates a data node and starts it as a daemon thread.
     *
     * @param args command-line arguments
     * @param conf configuration to use; may be {@code null}
     * @return the data node, or {@code null} if none could be instantiated
     * @throws IOException propagated from instantiation or registration
     */
    public static DataNode createDataNode(String[] args, Configuration conf) throws IOException {
        DataNode created = instantiateDataNode(args, conf);
        runDatanodeDaemon(created);
        return created;
    }

    /**
     * Blocks until the data node thread terminates. Returns immediately when
     * no thread has been started.
     *
     * <p>If the waiting thread is interrupted, the wait ends and the caller's
     * interrupt status is restored so code further up the stack can still
     * observe the interruption.
     */
    void join() {
        if (this.dataNodeThread == null) {
            return;
        }
        try {
            this.dataNodeThread.join();
        } catch (InterruptedException ie) {
            // Thread.join() clears the interrupt status when it throws; set it
            // again instead of silently discarding the interruption.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Creates a data node over the usable directories among {@code dataDirs}.
     *
     * <p>Each directory is checked with {@code DiskChecker.checkDir}; ones that
     * fail are logged as warnings and skipped. A {@code null} array is treated
     * the same as an empty one, so an unset directory list results in the
     * "all invalid" error instead of a {@link NullPointerException}.
     *
     * @param dataDirs candidate storage directory paths; may be {@code null}
     * @param conf     configuration passed to the new data node
     * @return a data node using the valid directories, or {@code null} when
     *         none of the directories is usable
     * @throws IOException propagated from the {@code DataNode} constructor
     */
    public static DataNode makeInstance(String[] dataDirs, Configuration conf) throws IOException {
        ArrayList<File> dirs = new ArrayList<File>();
        if (dataDirs != null) {
            for (String dirName : dataDirs) {
                File dir = new File(dirName);
                try {
                    DiskChecker.checkDir(dir);
                    dirs.add(dir);
                } catch (DiskChecker.DiskErrorException e) {
                    LOG.warn("Invalid directory in dfs.data.dir: " + e.getMessage());
                }
            }
        }
        if (!dirs.isEmpty()) {
            return new DataNode(conf, dirs);
        }
        LOG.error("All directories in dfs.data.dir are invalid.");
        return null;
    }

    /**
     * Describes this data node by its dataset, registration name, storage ID
     * and the current number of transfers in progress.
     *
     * @return a single-line description of this data node
     */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("DataNode{data=");
        text.append(data);
        text.append(", localName='").append(dnRegistration.getName()).append("'");
        text.append(", storageID='").append(dnRegistration.getStorageID()).append("'");
        text.append(", xmitsInProgress=").append(xmitsInProgress.get());
        text.append("}");
        return text.toString();
    }

    /** Prints the command-line usage summary to standard error. */
    private static void printUsage() {
        String[] usageLines = {
            "Usage: java DataNode",
            "           [-rollback]",
        };
        for (String line : usageLines) {
            System.err.println(line);
        }
    }

    /**
     * Parses the command-line arguments and stores the resulting startup
     * option in {@code conf}.
     *
     * <p>Recognised (case-insensitively): {@code -rollback} and
     * {@code -regular}; the last one seen wins and the default is
     * {@code REGULAR}. The retired {@code -r}/{@code --rack} options log an
     * error and exit the JVM. Any other argument makes parsing fail, in which
     * case {@code conf} is left untouched.
     *
     * @param args command-line arguments; {@code null} is treated as empty
     * @param conf configuration that receives the startup option
     * @return {@code true} if every argument was recognised
     */
    private static boolean parseArguments(String[] args, Configuration conf) {
        HdfsConstants.StartupOption selected = HdfsConstants.StartupOption.REGULAR;
        if (args != null) {
            for (String arg : args) {
                if ("-r".equalsIgnoreCase(arg) || "--rack".equalsIgnoreCase(arg)) {
                    LOG.error("-r, --rack arguments are not supported anymore. RackID resolution is handled by the NameNode.");
                    System.exit(-1);
                } else if ("-rollback".equalsIgnoreCase(arg)) {
                    selected = HdfsConstants.StartupOption.ROLLBACK;
                } else if ("-regular".equalsIgnoreCase(arg)) {
                    selected = HdfsConstants.StartupOption.REGULAR;
                } else {
                    return false;
                }
            }
        }
        setStartupOption(conf, selected);
        return true;
    }

    /**
     * Stores the startup option in the configuration under
     * {@code dfs.datanode.startup}.
     *
     * @param conf configuration to update
     * @param opt  startup option to record
     */
    private static void setStartupOption(Configuration conf, HdfsConstants.StartupOption opt) {
        String optionName = opt.toString();
        conf.set("dfs.datanode.startup", optionName);
    }

    /**
     * Reads the startup option stored under {@code dfs.datanode.startup},
     * falling back to {@code REGULAR} when the property is absent.
     *
     * @param conf configuration to read
     * @return the configured startup option
     */
    static HdfsConstants.StartupOption getStartupOption(Configuration conf) {
        String fallback = HdfsConstants.StartupOption.REGULAR.toString();
        String configured = conf.get("dfs.datanode.startup", fallback);
        return HdfsConstants.StartupOption.valueOf(configured);
    }

    /**
     * Re-bases {@code lastBlockReport} and flags the block-report time for
     * reset.
     *
     * <p>With a positive {@code delay}, {@code lastBlockReport} is set to the
     * current time minus the report interval, plus a random offset in
     * {@code [0, delay)}. Otherwise it is set to {@code lastHeartbeat} minus
     * the report interval.
     *
     * <p>The random bound is clamped to {@link Integer#MAX_VALUE}: narrowing a
     * larger {@code delay} with a plain cast could yield a non-positive bound,
     * which {@code Random.nextInt(int)} rejects with an
     * {@link IllegalArgumentException}.
     *
     * @param delay upper bound, in milliseconds, for the random offset; values
     *              {@code <= 0} select the heartbeat-based schedule
     */
    public void scheduleBlockReport(long delay) {
        if (delay > 0L) {
            int bound = (int) Math.min(delay, (long) Integer.MAX_VALUE);
            this.lastBlockReport = System.currentTimeMillis() - (this.blockReportInterval - (long) R.nextInt(bound));
        } else {
            this.lastBlockReport = this.lastHeartbeat - this.blockReportInterval;
        }
        this.resetBlockReportTime = true;
    }

    /**
     * Returns the dataset held in this node's {@code data} field.
     *
     * @return this node's dataset
     */
    public FSDatasetInterface getFSDataset() {
        return data;
    }

    /**
     * Command-line entry point: logs the startup message, creates and starts a
     * data node, and waits for its thread to finish.
     *
     * <p>Any {@link Throwable} is logged and terminates the JVM with exit code
     * {@code -1}.
     *
     * @param args command-line arguments
     */
    public static void main(String[] args) {
        try {
            StringUtils.startupShutdownMessage(DataNode.class, args, LOG);
            DataNode node = createDataNode(args, null);
            if (node == null) {
                return;
            }
            node.join();
        } catch (Throwable failure) {
            LOG.error(StringUtils.stringifyException(failure));
            System.exit(-1);
        }
    }

    /**
     * Looks up the locally stored copy of a block and returns its metadata
     * together with the block scanner's last scan time.
     *
     * <p>The stored block's metadata is validated before the result is
     * returned.
     *
     * @param block the block to look up (only its ID is used for the lookup)
     * @return metadata of the stored block, or {@code null} if the dataset has
     *         no block with that ID
     * @throws IOException propagated from the dataset
     */
    @Override
    public BlockMetaDataInfo getBlockMetaDataInfo(Block block) throws IOException {
        if (LOG.isDebugEnabled()) {
            LOG.debug("block=" + block);
        }

        Block stored = data.getStoredBlock(block.getBlockId());
        if (stored == null) {
            return null;
        }

        long lastScanTime = blockScanner.getLastScanTime(stored);
        BlockMetaDataInfo info = new BlockMetaDataInfo(stored, lastScanTime);
        if (LOG.isDebugEnabled()) {
            LOG.debug("getBlockMetaDataInfo successful block=" + stored + " length " + stored.getNumBytes() + " genstamp " + stored.getGenerationStamp());
        }

        data.validateBlockMetadata(stored);
        return info;
    }

    /**
     * Starts a daemon thread that recovers the given blocks one after another.
     *
     * <p>Each block is recovered without keeping its length and with the file
     * closed afterwards. An {@link IOException} for one block is logged as a
     * warning and the remaining blocks are still attempted.
     *
     * @param blocks  the blocks to recover
     * @param targets per-block data nodes; {@code targets[i]} belongs to
     *                {@code blocks[i]}
     * @return the already-started daemon performing the recovery
     */
    public Daemon recoverBlocks(final Block[] blocks, final DatanodeInfo[][] targets) {
        Runnable recoveryTask = new Runnable() {

            @Override
            public void run() {
                for (int idx = 0; idx < blocks.length; idx++) {
                    try {
                        DataNode.logRecoverBlock("NameNode", blocks[idx], targets[idx]);
                        DataNode.this.recoverBlock(blocks[idx], false, targets[idx], true);
                    } catch (IOException failure) {
                        LOG.warn("recoverBlocks FAILED, blocks[" + idx + "]=" + blocks[idx], failure);
                    }
                }
            }
        };
        Daemon recoveryDaemon = new Daemon(this.threadGroup, recoveryTask);
        recoveryDaemon.start();
        return recoveryDaemon;
    }

    /**
     * Replaces {@code oldblock} with {@code newblock} in the local dataset and
     * optionally finalizes the new block.
     *
     * <p>When {@code finalize} is set, the block is finalized, the
     * blocks-written metric is incremented and the received block is queued
     * with an empty deletion hint.
     *
     * @param oldblock the block as currently stored
     * @param newblock the block that replaces it
     * @param finalize whether to finalize {@code newblock} after the update
     * @throws IOException propagated from the dataset
     */
    @Override
    public void updateBlock(Block oldblock, Block newblock, boolean finalize) throws IOException {
        LOG.info("oldblock=" + oldblock + "(length=" + oldblock.getNumBytes() + "), newblock=" + newblock + "(length=" + newblock.getNumBytes() + "), datanode=" + dnRegistration.getName());
        data.updateBlock(oldblock, newblock);
        if (!finalize) {
            return;
        }
        data.finalizeBlock(newblock);
        myMetrics.blocksWritten.inc();
        notifyNamenodeReceivedBlock(newblock, EMPTY_DEL_HINT);
        LOG.info("Received block " + newblock + " of size " + newblock.getNumBytes() + " as part of lease recovery.");
    }

    /**
     * Returns the version this node speaks for the named protocol.
     *
     * @param protocol      fully qualified protocol class name
     * @param clientVersion the caller's protocol version (not consulted here)
     * @return {@code 3} for both {@code InterDatanodeProtocol} and
     *         {@code ClientDatanodeProtocol}
     * @throws IOException if {@code protocol} names neither of those
     */
    @Override
    public long getProtocolVersion(String protocol, long clientVersion) throws IOException {
        if (protocol.equals(InterDatanodeProtocol.class.getName())) {
            return 3L;
        } else if (protocol.equals(ClientDatanodeProtocol.class.getName())) {
            return 3L;
        }
        String receiver = getClass().getSimpleName();
        throw new IOException("Unknown protocol to " + receiver + ": " + protocol);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Recovers a block by collecting the replicas held on the given data nodes
     * and synchronizing them.
     *
     * <p>The block is registered in {@code ongoingRecovery} for the duration of
     * the call and always removed again; a second request for a block that is
     * already registered is rejected. Replicas that are missing or carry an
     * older generation stamp than {@code block} are skipped. With
     * {@code keepLength}, only replicas whose length equals the block's length
     * take part; otherwise all remaining replicas take part and the block's
     * length is set to the shortest replica length before synchronizing.
     *
     * @param block       the block to recover; its length may be modified when
     *                    {@code keepLength} is {@code false}
     * @param keepLength  whether participating replicas must match the block's
     *                    current length
     * @param datanodeids the data nodes expected to hold a replica
     * @param closeFile   passed through to {@code syncBlock}
     * @return the result of {@code syncBlock}
     * @throws IOException if the block is already being recovered, if every
     *                     data node failed and no replica was collected, or if
     *                     synchronization fails
     */
    private LocatedBlock recoverBlock(Block block, boolean keepLength, DatanodeID[] datanodeids, boolean closeFile) throws IOException {
        synchronized (this.ongoingRecovery) {
            Block tmp = new Block();
            // NOTE(review): generation stamp 1 is presumably the wildcard stamp so the
            // lookup matches any generation of this block - confirm against Block.equals.
            tmp.set(block.getBlockId(), block.getNumBytes(), 1L);
            if (this.ongoingRecovery.get(tmp) != null) {
                String msg = "Block " + block + " is already being recovered, " + " ignoring this request to recover it.";
                LOG.info(msg);
                throw new IOException(msg);
            }
            this.ongoingRecovery.put(block, block);
        }
        try {
            List<BlockRecord> syncList = new ArrayList<BlockRecord>();
            long minlength = Long.MAX_VALUE;
            int errorCount = 0;
            for (DatanodeID id : datanodeids) {
                try {
                    // Either this node itself or a proxy to a remote one; both are only
                    // used through the inter-datanode protocol interface.
                    InterDatanodeProtocol datanode = this.dnRegistration.equals(id) ? this : DataNode.createInterDataNodeProtocolProxy(id, this.getConf());
                    BlockMetaDataInfo info = datanode.getBlockMetaDataInfo(block);
                    if (info == null || info.getGenerationStamp() < block.getGenerationStamp()) {
                        continue;
                    }
                    if (keepLength) {
                        if (info.getNumBytes() == block.getNumBytes()) {
                            syncList.add(new BlockRecord(id, datanode, new Block(info)));
                        }
                    } else {
                        syncList.add(new BlockRecord(id, datanode, new Block(info)));
                        if (info.getNumBytes() < minlength) {
                            minlength = info.getNumBytes();
                        }
                    }
                } catch (IOException e) {
                    ++errorCount;
                    InterDatanodeProtocol.LOG.warn("Failed to getBlockMetaDataInfo for block (=" + block + ") from datanode (=" + id + ")", e);
                }
            }
            if (syncList.isEmpty() && errorCount > 0) {
                throw new IOException("All datanodes failed: block=" + block + ", datanodeids=" + Arrays.asList(datanodeids));
            }
            if (!keepLength) {
                block.setNumBytes(minlength);
            }
            return this.syncBlock(block, syncList, closeFile);
        } finally {
            synchronized (this.ongoingRecovery) {
                this.ongoingRecovery.remove(block);
            }
        }
    }

    /**
     * Brings the collected replicas of a block to a new generation stamp and
     * commits the outcome to the name node.
     *
     * <p>With an empty {@code syncList}, the name node is told to commit with
     * zero generation stamp and length, the delete flag set and no data nodes,
     * and {@code null} is returned. Otherwise a new generation stamp is
     * obtained, every replica is asked to update to it, and the data nodes
     * that succeeded are committed to the name node.
     *
     * @param block     the block being recovered
     * @param syncList  replicas that take part in the synchronization
     * @param closeFile forwarded to each replica update and to the commit
     * @return the updated block with the data nodes that accepted it, or
     *         {@code null} when {@code syncList} is empty
     * @throws IOException if no replica could be updated, or propagated from
     *                     the name node
     */
    private LocatedBlock syncBlock(Block block, List<BlockRecord> syncList, boolean closeFile) throws IOException {
        if (LOG.isDebugEnabled()) {
            LOG.debug("block=" + block + ", (length=" + block.getNumBytes() + "), syncList=" + syncList + ", closeFile=" + closeFile);
        }
        if (syncList.isEmpty()) {
            namenode.commitBlockSynchronization(block, 0L, 0L, closeFile, true, DatanodeID.EMPTY_ARRAY);
            return null;
        }

        List<DatanodeID> updatedNodes = new ArrayList<DatanodeID>();
        long newStamp = namenode.nextGenerationStamp(block);
        Block updated = new Block(block.getBlockId(), block.getNumBytes(), newStamp);
        for (BlockRecord record : syncList) {
            try {
                record.datanode.updateBlock(record.block, updated, closeFile);
                updatedNodes.add(record.id);
            } catch (IOException failure) {
                InterDatanodeProtocol.LOG.warn("Failed to updateBlock (newblock=" + updated + ", datanode=" + record.id + ")", failure);
            }
        }

        if (updatedNodes.isEmpty()) {
            StringBuilder attempted = new StringBuilder();
            for (BlockRecord record : syncList) {
                attempted.append("\n  ").append(record.id);
            }
            throw new IOException("Cannot recover " + block + ", none of these " + syncList.size() + " datanodes success {" + attempted + "\n}");
        }

        DatanodeID[] committed = updatedNodes.toArray(new DatanodeID[updatedNodes.size()]);
        namenode.commitBlockSynchronization(block, updated.getGenerationStamp(), updated.getNumBytes(), closeFile, false, committed);
        DatanodeInfo[] locations = new DatanodeInfo[committed.length];
        for (int idx = 0; idx < committed.length; idx++) {
            locations[idx] = new DatanodeInfo(committed[idx]);
        }
        return new LocatedBlock(updated, locations);
    }

    /**
     * Client-facing block recovery: logs the request and recovers the block
     * across the given targets without closing the file.
     *
     * @param block      the block to recover
     * @param keepLength whether participating replicas must match the block's
     *                   current length
     * @param targets    the data nodes expected to hold a replica
     * @return the recovered block and its locations, as produced by the
     *         internal recovery
     * @throws IOException propagated from the internal recovery
     */
    @Override
    public LocatedBlock recoverBlock(Block block, boolean keepLength, DatanodeInfo[] targets) throws IOException {
        logRecoverBlock("Client", block, targets);
        LocatedBlock recovered = recoverBlock(block, keepLength, targets, false);
        return recovered;
    }

    /**
     * Logs, at info level, who requested recovery of a block and the names of
     * the target data nodes as a comma-separated list.
     *
     * <p>A {@code null} or empty {@code targets} array is logged as an empty
     * list, so this logging helper never fails with an index or null-pointer
     * error of its own.
     *
     * @param who     label for the requester
     * @param block   the block being recovered
     * @param targets the data nodes involved; may be {@code null} or empty
     */
    private static void logRecoverBlock(String who, Block block, DatanodeID[] targets) {
        StringBuilder msg = new StringBuilder();
        if (targets != null) {
            for (int i = 0; i < targets.length; ++i) {
                if (i > 0) {
                    msg.append(", ");
                }
                msg.append(targets[i].getName());
            }
        }
        LOG.info(who + " calls recoverBlock(block=" + block + ", targets=[" + msg + "])");
    }

    // Class initialization: registers the HDFS configuration resources and
    // initializes the static fields ClientTraceLog, datanodeObject and R.
    static {
        // hdfs-default.xml is registered before hdfs-site.xml.
        // NOTE(review): presumably later resources override earlier ones, so this
        // order matters - confirm before reordering.
        Configuration.addDefaultResource("hdfs-default.xml");
        Configuration.addDefaultResource("hdfs-site.xml");
        // Logger named after this class with a ".clienttrace" suffix.
        ClientTraceLog = LogFactory.getLog(DataNode.class.getName() + ".clienttrace");
        // No data node instance is recorded yet.
        datanodeObject = null;
        // Random source stored in R (scheduleBlockReport draws its offset from it).
        R = new Random();
    }

    /**
     * Immutable triple tying a replica of a block to the data node that holds
     * it and the protocol handle used to reach that node.
     */
    private static class BlockRecord {
        /** Identity of the data node holding the replica. */
        final DatanodeID id;
        /** Handle used to call the data node. */
        final InterDatanodeProtocol datanode;
        /** The replica as reported by the data node. */
        final Block block;

        /**
         * @param nodeId  identity of the data node
         * @param handle  protocol handle for that data node
         * @param replica the replica held there
         */
        BlockRecord(DatanodeID nodeId, InterDatanodeProtocol handle, Block replica) {
            id = nodeId;
            datanode = handle;
            block = replica;
        }

        /** Renders the record as {@code block:<block> node:<id>}. */
        @Override
        public String toString() {
            StringBuilder text = new StringBuilder("block:");
            text.append(block).append(" node:").append(id);
            return text.toString();
        }
    }

    /**
     * Task that sends one locally stored block to the first of a list of
     * target data nodes, passing the remaining targets along in the request
     * header.
     *
     * <p>While a transfer runs it is counted in the enclosing node's
     * {@code xmitsInProgress}; the counter is incremented once on entry and
     * decremented exactly once on exit, whether the transfer succeeds or fails.
     */
    class DataTransfer implements Runnable {
        /** Target data nodes; the first one is connected to directly. */
        DatanodeInfo[] targets;
        /** The block to send. */
        Block b;
        /** The data node handed to the block sender. */
        DataNode datanode;

        /**
         * @param targets  the data nodes that should receive the block
         * @param b        the block to send
         * @param datanode the data node handed to the block sender
         * @throws IOException declared for callers; not thrown by this body
         */
        public DataTransfer(DatanodeInfo[] targets, Block b, DataNode datanode) throws IOException {
            this.targets = targets;
            this.b = b;
            this.datanode = datanode;
        }

        /**
         * Connects to the first target, writes the request header and streams
         * the block. An {@link IOException} is logged as a warning rather than
         * propagated. The sender, stream and socket are always closed.
         */
        @Override
        public void run() {
            DataNode.this.xmitsInProgress.getAndIncrement();
            Socket sock = null;
            DataOutputStream out = null;
            BlockSender blockSender = null;
            try {
                InetSocketAddress curTarget = NetUtils.createSocketAddr(this.targets[0].getName());
                sock = DataNode.this.newSocket();
                NetUtils.connect(sock, curTarget, DataNode.this.socketTimeout);
                // Read and write timeouts grow with the number of targets.
                sock.setSoTimeout(this.targets.length * DataNode.this.socketTimeout);
                long writeTimeout = DataNode.this.socketWriteTimeout + 5000 * (this.targets.length - 1);
                OutputStream baseStream = NetUtils.getOutputStream(sock, writeTimeout);
                out = new DataOutputStream(new BufferedOutputStream(baseStream, FSConstants.SMALL_BUFFER_SIZE));
                blockSender = new BlockSender(this.b, 0L, this.b.getNumBytes(), false, false, false, this.datanode);
                DatanodeInfo srcNode = new DatanodeInfo(DataNode.this.dnRegistration);

                // Request header.
                // NOTE(review): 14 and 80 are presumably the data-transfer protocol
                // version and the write-block opcode, inlined by the decompiler -
                // confirm against the protocol constants.
                out.writeShort(14);
                out.writeByte(80);
                out.writeLong(this.b.getBlockId());
                out.writeLong(this.b.getGenerationStamp());
                out.writeInt(0);
                out.writeBoolean(false);
                Text.writeString(out, DataNode.EMPTY_DEL_HINT);
                out.writeBoolean(true);
                srcNode.write(out);
                // Remaining targets (everything after the one we are connected to).
                out.writeInt(this.targets.length - 1);
                for (int i = 1; i < this.targets.length; ++i) {
                    this.targets[i].write(out);
                }

                blockSender.sendBlock(out, baseStream, null);
                LOG.info(DataNode.this.dnRegistration + ":Transmitted block " + this.b + " to " + curTarget);
            } catch (IOException ie) {
                LOG.warn(DataNode.this.dnRegistration + ":Failed to transfer " + this.b + " to " + this.targets[0].getName() + " got " + StringUtils.stringifyException(ie));
            } finally {
                // Single point of cleanup: decrementing or closing anywhere else as
                // well would count a successful transfer twice.
                DataNode.this.xmitsInProgress.getAndDecrement();
                IOUtils.closeStream(blockSender);
                IOUtils.closeStream(out);
                IOUtils.closeSocket(sock);
            }
        }
    }
}

