/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.mapred;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Formatter;
import java.util.Iterator;
import java.util.List;
import java.util.StringTokenizer;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobID;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.MiniMRCluster;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.TaskReport;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.WordCount;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskCounter;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormatCounter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormatCounter;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

public class TestJobCounters {
    // Directory holding the job input files; assigned in initPaths().
    private static Path IN_DIR = null;
    // Parent directory for the per-test job output directories; assigned in initPaths().
    private static Path OUT_DIR = null;
    // Root working directory of this test class; assigned in initPaths(), deleted in cleanup().
    private static Path testdir = null;
    // The five candidate input file paths under IN_DIR; the entries are filled in by initPaths().
    private static Path[] inFiles = new Path[5];

    /**
     * Checks the file-related counters of a job.
     *
     * <p>{@code FileInputFormatCounter.BYTES_READ} must be non-zero and equal to
     * {@code fileBytesRead}; {@code FileOutputFormatCounter.BYTES_WRITTEN} only has to be
     * non-zero. Each map-output counter is checked for being non-zero, and only when the
     * matching argument is non-negative.
     *
     * @param counter counters to inspect
     * @param fileBytesRead exact value expected for the bytes-read counter
     * @param fileBytesWritten not consulted by this method
     * @param mapOutputBytes negative to skip the {@code MAP_OUTPUT_BYTES} check
     * @param mapOutputMaterializedBytes negative to skip the
     *        {@code MAP_OUTPUT_MATERIALIZED_BYTES} check
     */
    private void validateFileCounters(org.apache.hadoop.mapred.Counters counter, long fileBytesRead, long fileBytesWritten, long mapOutputBytes, long mapOutputMaterializedBytes) {
        Assert.assertTrue(counter.findCounter(FileInputFormatCounter.BYTES_READ).getValue() != 0L);
        Assert.assertEquals(fileBytesRead, counter.findCounter(FileInputFormatCounter.BYTES_READ).getValue());

        Assert.assertTrue(counter.findCounter(FileOutputFormatCounter.BYTES_WRITTEN).getValue() != 0L);

        boolean checkMapOutputBytes = mapOutputBytes >= 0L;
        if (checkMapOutputBytes) {
            Assert.assertTrue(counter.findCounter(TaskCounter.MAP_OUTPUT_BYTES).getValue() != 0L);
        }

        boolean checkMaterializedBytes = mapOutputMaterializedBytes >= 0L;
        if (checkMaterializedBytes) {
            Assert.assertTrue(counter.findCounter(TaskCounter.MAP_OUTPUT_MATERIALIZED_BYTES).getValue() != 0L);
        }
    }

    /**
     * Checks the counters addressed through the nested {@code Counter} enums of
     * {@code FileInputFormat} and {@code FileOutputFormat}.
     *
     * <p>Bytes read and bytes written must match the expected values exactly. Each map-output
     * counter is checked for being non-zero, and only when the matching argument is
     * non-negative.
     *
     * @param counter counters to inspect
     * @param fileBytesRead exact value expected for {@code BYTES_READ}
     * @param fileBytesWritten exact value expected for {@code BYTES_WRITTEN}
     * @param mapOutputBytes negative to skip the {@code MAP_OUTPUT_BYTES} check
     * @param mapOutputMaterializedBytes negative to skip the
     *        {@code MAP_OUTPUT_MATERIALIZED_BYTES} check
     */
    private void validateOldFileCounters(org.apache.hadoop.mapred.Counters counter, long fileBytesRead, long fileBytesWritten, long mapOutputBytes, long mapOutputMaterializedBytes) {
        // NOTE(review): each exact-value assertion appears twice with the same enum. Presumably
        // the second of each pair once targeted a differently-qualified Counter enum and the
        // qualification was lost during decompilation -- confirm against the original source.
        Assert.assertEquals(fileBytesRead, counter.findCounter(FileInputFormat.Counter.BYTES_READ).getValue());
        Assert.assertEquals(fileBytesRead, counter.findCounter(FileInputFormat.Counter.BYTES_READ).getValue());

        Assert.assertEquals(fileBytesWritten, counter.findCounter(FileOutputFormat.Counter.BYTES_WRITTEN).getValue());
        Assert.assertEquals(fileBytesWritten, counter.findCounter(FileOutputFormat.Counter.BYTES_WRITTEN).getValue());

        boolean checkMapOutputBytes = mapOutputBytes >= 0L;
        if (checkMapOutputBytes) {
            Assert.assertTrue(counter.findCounter(TaskCounter.MAP_OUTPUT_BYTES).getValue() != 0L);
        }

        boolean checkMaterializedBytes = mapOutputMaterializedBytes >= 0L;
        if (checkMaterializedBytes) {
            Assert.assertTrue(counter.findCounter(TaskCounter.MAP_OUTPUT_MATERIALIZED_BYTES).getValue() != 0L);
        }
    }

    /**
     * Asserts exact values for the record-level task counters.
     *
     * @param counter counters to inspect
     * @param spillRecCnt expected {@code SPILLED_RECORDS}
     * @param mapInputRecords expected {@code MAP_INPUT_RECORDS}
     * @param mapOutputRecords expected {@code MAP_OUTPUT_RECORDS}
     */
    private void validateCounters(org.apache.hadoop.mapred.Counters counter, long spillRecCnt, long mapInputRecords, long mapOutputRecords) {
        long actualSpilled = counter.findCounter(TaskCounter.SPILLED_RECORDS).getCounter();
        Assert.assertEquals(spillRecCnt, actualSpilled);

        long actualMapInput = counter.findCounter(TaskCounter.MAP_INPUT_RECORDS).getCounter();
        Assert.assertEquals(mapInputRecords, actualMapInput);

        long actualMapOutput = counter.findCounter(TaskCounter.MAP_OUTPUT_RECORDS).getCounter();
        Assert.assertEquals(mapOutputRecords, actualMapOutput);
    }

    /**
     * Deletes {@code inpFile} if it exists; does nothing when it is absent.
     *
     * @param inpFile file to remove
     * @param conf configuration used to resolve the file system of {@code inpFile}
     * @throws IOException if the file exists but the file system reports the delete as failed
     */
    private void removeWordsFile(Path inpFile, Configuration conf) throws IOException {
        FileSystem fileSystem = inpFile.getFileSystem(conf);
        if (!fileSystem.exists(inpFile)) {
            return;
        }
        boolean deleted = fileSystem.delete(inpFile, false);
        if (!deleted) {
            throw new IOException("Failed to delete " + inpFile);
        }
    }

    /**
     * Writes the word fixture to {@code inpFile}, unless the file already exists.
     *
     * <p>The content is five identical replicas of 1024 lines. Each line holds four
     * space-terminated words of the form {@code zymurgyNNNN}, numbered consecutively from
     * 0001 to 4096 within a replica, followed by a newline.
     *
     * @param inpFile file to create
     * @param conf configuration used to resolve the file system of {@code inpFile}
     * @throws IOException if the file cannot be created or written
     */
    private static void createWordsFile(Path inpFile, Configuration conf) throws IOException {
        final int replicas = 5;
        final int numLines = 1024;
        final int numWordsPerLine = 4;
        final String word = "zymurgy";

        FileSystem fs = inpFile.getFileSystem(conf);
        if (fs.exists(inpFile)) {
            return;
        }

        FSDataOutputStream out = fs.create(inpFile);
        try {
            // The formatter appends into this buffer, which is reset for every line.
            StringBuilder lineBuffer = new StringBuilder();
            Formatter fmt = new Formatter(lineBuffer);
            for (int replica = 0; replica < replicas; replica++) {
                for (int lineNo = 0; lineNo < numLines; lineNo++) {
                    int firstWordIndex = 1 + lineNo * numWordsPerLine;
                    lineBuffer.setLength(0);
                    for (int offset = 0; offset < numWordsPerLine; offset++) {
                        fmt.format("%s%04d ", word, firstWordIndex + offset);
                    }
                    lineBuffer.append("\n");
                    out.writeBytes(fmt.toString());
                }
            }
        } finally {
            out.close();
        }
    }

    /**
     * Returns the length of {@code path} on the local file system, plus the length of its
     * sibling {@code .<name>.crc} file when that file exists.
     *
     * @param path file to measure
     * @return combined length in bytes
     * @throws IOException if a file status cannot be obtained
     */
    private static long getFileSize(Path path) throws IOException {
        LocalFileSystem localFs = FileSystem.getLocal(new Configuration());
        long total = localFs.getFileStatus(path).getLen();

        Path checksumFile = new Path(path.getParent(), "." + path.getName() + ".crc");
        if (localFs.exists(checksumFile)) {
            total += localFs.getFileStatus(checksumFile).getLen();
        }
        return total;
    }

    /**
     * Prepares the working directory shared by all tests in this class.
     *
     * <p>Resolves {@code testdir}, {@code IN_DIR} and {@code OUT_DIR} below the directory named
     * by the {@code test.build.data} system property (default {@code /tmp}), removes any
     * existing test directory, creates the input directory, fills in all five entries of
     * {@code inFiles} and creates the first three of those files. The remaining two are
     * created or removed by the individual tests.
     *
     * @throws IOException if the old directory cannot be deleted, the input directory cannot
     *         be created, or an input file cannot be written
     */
    @BeforeClass
    public static void initPaths() throws IOException {
        Configuration conf = new Configuration();
        Path testRootDir = new Path(System.getProperty("test.build.data", "/tmp"));
        testdir = new Path(testRootDir, "spilledRecords.countertest");
        IN_DIR = new Path(testdir, "in");
        OUT_DIR = new Path(testdir, "out");

        LocalFileSystem fs = FileSystem.getLocal(conf);
        // testdir is already set above; it is no longer re-assigned inside the condition.
        if (fs.exists(testdir) && !fs.delete(testdir, true)) {
            throw new IOException("Could not delete " + testdir);
        }
        if (!fs.mkdirs(IN_DIR)) {
            throw new IOException("Mkdirs failed to create " + IN_DIR);
        }

        for (int i = 0; i < inFiles.length; ++i) {
            inFiles[i] = new Path(IN_DIR, "input5_2k_" + i);
        }

        // Only the first three inputs exist up front.
        final int initialFileCount = 3;
        for (int i = 0; i < initialFileCount; ++i) {
            createWordsFile(inFiles[i], conf);
        }
    }

    /**
     * Recursively deletes the test directory if it still exists. The result of the delete
     * call is not checked.
     *
     * @throws IOException if the file system cannot be resolved or queried
     */
    @AfterClass
    public static void cleanup() throws IOException {
        FileSystem fs = testdir.getFileSystem(new Configuration());
        if (!fs.exists(testdir)) {
            return;
        }
        fs.delete(testdir, true);
    }

    /**
     * Builds the base {@link JobConf} for the old-API word-count jobs: {@code WordCount}
     * mapper, combiner and reducer, one reduce task, {@code Text}/{@code IntWritable} output,
     * and the sort/spill settings the counter expectations rely on.
     *
     * @return a freshly created job configuration
     * @throws IOException declared for callers; nothing in this method throws it directly
     */
    public static JobConf createConfiguration() throws IOException {
        JobConf jobConf = new JobConf(TestJobCounters.class);

        // Job wiring.
        jobConf.setMapperClass(WordCount.MapClass.class);
        jobConf.setCombinerClass(WordCount.Reduce.class);
        jobConf.setReducerClass(WordCount.Reduce.class);
        jobConf.setNumReduceTasks(1);
        jobConf.setOutputKeyClass(Text.class);
        jobConf.setOutputValueClass(IntWritable.class);

        // Sort and spill tuning.
        jobConf.setInt("mapreduce.task.io.sort.mb", 1);
        jobConf.set("mapreduce.map.sort.spill.percent", "0.50");
        jobConf.setInt("mapreduce.map.combine.minspills", 3);
        return jobConf;
    }

    /**
     * Builds the base {@link Job} for the new-API word-count jobs: {@link NewMapTokenizer} as
     * mapper, {@link NewSummer} as combiner and reducer, one reduce task, and the same
     * sort/spill settings as {@link #createConfiguration()}.
     *
     * @return a freshly created job
     * @throws IOException if the job instance cannot be created
     */
    public static Job createJob() throws IOException {
        Job job = Job.getInstance(new Configuration());

        // Job wiring.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setMapperClass(NewMapTokenizer.class);
        job.setCombinerClass(NewSummer.class);
        job.setReducerClass(NewSummer.class);
        job.setNumReduceTasks(1);

        // Sort and spill tuning.
        Configuration jobSettings = job.getConfiguration();
        jobSettings.setInt("mapreduce.task.io.sort.mb", 1);
        jobSettings.set("mapreduce.map.sort.spill.percent", "0.50");
        jobSettings.setInt("mapreduce.map.combine.minspills", 3);

        // Presumably keeps every input file in a single split -- TODO confirm.
        FileInputFormat.setMinInputSplitSize(job, Long.MAX_VALUE);
        return job;
    }

    /**
     * Old-API job over three input files with a merge factor of 2. Checks the record
     * counters, the file counters and the old-style file counters.
     */
    @Test
    public void testOldCounterA() throws Exception {
        JobConf jobConf = createConfiguration();
        jobConf.setNumMapTasks(3);
        jobConf.setInt("mapreduce.task.io.sort.factor", 2);

        // Make sure only inFiles[0..2] are present.
        removeWordsFile(inFiles[3], jobConf);
        removeWordsFile(inFiles[4], jobConf);

        long inputSize = 0L;
        for (int i = 0; i <= 2; ++i) {
            inputSize += getFileSize(inFiles[i]);
        }

        org.apache.hadoop.mapred.FileInputFormat.setInputPaths(jobConf, IN_DIR);
        org.apache.hadoop.mapred.FileOutputFormat.setOutputPath(jobConf, new Path(OUT_DIR, "outputO0"));

        RunningJob finishedJob = JobClient.runJob(jobConf);
        org.apache.hadoop.mapred.Counters counters = finishedJob.getCounters();
        validateCounters(counters, 73728L, 15360L, 61440L);
        validateFileCounters(counters, inputSize, 0L, 0L, 0L);
        validateOldFileCounters(counters, inputSize, 61928L, 0L, 0L);
    }

    /**
     * Old-API job over four input files with a merge factor of 2. Checks the record
     * counters and the file counters.
     */
    @Test
    public void testOldCounterB() throws Exception {
        JobConf jobConf = createConfiguration();

        // Make sure exactly inFiles[0..3] are present.
        createWordsFile(inFiles[3], jobConf);
        removeWordsFile(inFiles[4], jobConf);

        long inputSize = 0L;
        for (int i = 0; i <= 3; ++i) {
            inputSize += getFileSize(inFiles[i]);
        }

        jobConf.setNumMapTasks(4);
        jobConf.setInt("mapreduce.task.io.sort.factor", 2);
        org.apache.hadoop.mapred.FileInputFormat.setInputPaths(jobConf, IN_DIR);
        org.apache.hadoop.mapred.FileOutputFormat.setOutputPath(jobConf, new Path(OUT_DIR, "outputO1"));

        RunningJob finishedJob = JobClient.runJob(jobConf);
        org.apache.hadoop.mapred.Counters counters = finishedJob.getCounters();
        validateCounters(counters, 98304L, 20480L, 81920L);
        validateFileCounters(counters, inputSize, 0L, 0L, 0L);
    }

    /**
     * Old-API job over all five input files with a merge factor of 3. Checks the record
     * counters and the file counters.
     */
    @Test
    public void testOldCounterC() throws Exception {
        JobConf jobConf = createConfiguration();

        // Make sure all five inputs are present.
        createWordsFile(inFiles[3], jobConf);
        createWordsFile(inFiles[4], jobConf);

        long inputSize = 0L;
        for (int i = 0; i <= 4; ++i) {
            inputSize += getFileSize(inFiles[i]);
        }

        jobConf.setNumMapTasks(4);
        jobConf.setInt("mapreduce.task.io.sort.factor", 3);
        org.apache.hadoop.mapred.FileInputFormat.setInputPaths(jobConf, IN_DIR);
        org.apache.hadoop.mapred.FileOutputFormat.setOutputPath(jobConf, new Path(OUT_DIR, "outputO2"));

        RunningJob finishedJob = JobClient.runJob(jobConf);
        org.apache.hadoop.mapred.Counters counters = finishedJob.getCounters();
        validateCounters(counters, 122880L, 25600L, 102400L);
        validateFileCounters(counters, inputSize, 0L, 0L, 0L);
    }

    /**
     * Old-API job with zero reduce tasks over three input files. Expects no spilled records
     * and skips the map-output byte checks by passing negative values.
     */
    @Test
    public void testOldCounterD() throws Exception {
        JobConf jobConf = createConfiguration();
        jobConf.setNumMapTasks(3);
        jobConf.setInt("mapreduce.task.io.sort.factor", 2);
        jobConf.setNumReduceTasks(0);

        // Make sure only inFiles[0..2] are present.
        removeWordsFile(inFiles[3], jobConf);
        removeWordsFile(inFiles[4], jobConf);

        long inputSize = 0L;
        for (int i = 0; i <= 2; ++i) {
            inputSize += getFileSize(inFiles[i]);
        }

        org.apache.hadoop.mapred.FileInputFormat.setInputPaths(jobConf, IN_DIR);
        org.apache.hadoop.mapred.FileOutputFormat.setOutputPath(jobConf, new Path(OUT_DIR, "outputO3"));

        RunningJob finishedJob = JobClient.runJob(jobConf);
        org.apache.hadoop.mapred.Counters counters = finishedJob.getCounters();
        validateCounters(counters, 0L, 15360L, 61440L);
        validateFileCounters(counters, inputSize, 0L, -1L, -1L);
    }

    /**
     * New-API job over three input files with a merge factor of 2. Checks the record
     * counters and the file counters on the downgraded counter set.
     */
    @Test
    public void testNewCounterA() throws Exception {
        Job wordCountJob = createJob();
        Configuration jobSettings = wordCountJob.getConfiguration();
        jobSettings.setInt("mapreduce.task.io.sort.factor", 2);

        // Make sure only inFiles[0..2] are present.
        removeWordsFile(inFiles[3], jobSettings);
        removeWordsFile(inFiles[4], jobSettings);

        long inputSize = 0L;
        for (int i = 0; i <= 2; ++i) {
            inputSize += getFileSize(inFiles[i]);
        }

        FileInputFormat.setInputPaths(wordCountJob, IN_DIR);
        FileOutputFormat.setOutputPath(wordCountJob, new Path(OUT_DIR, "outputN0"));
        Assert.assertTrue(wordCountJob.waitForCompletion(true));

        org.apache.hadoop.mapred.Counters counters = org.apache.hadoop.mapred.Counters.downgrade(wordCountJob.getCounters());
        validateCounters(counters, 73728L, 15360L, 61440L);
        validateFileCounters(counters, inputSize, 0L, 0L, 0L);
    }

    /**
     * New-API job over four input files with a merge factor of 2. Checks the record
     * counters and the file counters on the downgraded counter set.
     */
    @Test
    public void testNewCounterB() throws Exception {
        Job wordCountJob = createJob();
        Configuration jobSettings = wordCountJob.getConfiguration();
        jobSettings.setInt("mapreduce.task.io.sort.factor", 2);

        // Make sure exactly inFiles[0..3] are present.
        createWordsFile(inFiles[3], jobSettings);
        removeWordsFile(inFiles[4], jobSettings);

        long inputSize = 0L;
        for (int i = 0; i <= 3; ++i) {
            inputSize += getFileSize(inFiles[i]);
        }

        FileInputFormat.setInputPaths(wordCountJob, IN_DIR);
        FileOutputFormat.setOutputPath(wordCountJob, new Path(OUT_DIR, "outputN1"));
        Assert.assertTrue(wordCountJob.waitForCompletion(true));

        org.apache.hadoop.mapred.Counters counters = org.apache.hadoop.mapred.Counters.downgrade(wordCountJob.getCounters());
        validateCounters(counters, 98304L, 20480L, 81920L);
        validateFileCounters(counters, inputSize, 0L, 0L, 0L);
    }

    /**
     * New-API job over all five input files with a merge factor of 3. Checks the record
     * counters and the file counters on the downgraded counter set.
     */
    @Test
    public void testNewCounterC() throws Exception {
        Job wordCountJob = createJob();
        Configuration jobSettings = wordCountJob.getConfiguration();
        jobSettings.setInt("mapreduce.task.io.sort.factor", 3);

        // Make sure all five inputs are present.
        createWordsFile(inFiles[3], jobSettings);
        createWordsFile(inFiles[4], jobSettings);

        long inputSize = 0L;
        for (int i = 0; i <= 4; ++i) {
            inputSize += getFileSize(inFiles[i]);
        }

        FileInputFormat.setInputPaths(wordCountJob, IN_DIR);
        FileOutputFormat.setOutputPath(wordCountJob, new Path(OUT_DIR, "outputN2"));
        Assert.assertTrue(wordCountJob.waitForCompletion(true));

        org.apache.hadoop.mapred.Counters counters = org.apache.hadoop.mapred.Counters.downgrade(wordCountJob.getCounters());
        validateCounters(counters, 122880L, 25600L, 102400L);
        validateFileCounters(counters, inputSize, 0L, 0L, 0L);
    }

    /**
     * New-API job with zero reduce tasks over three input files. Expects no spilled records
     * and skips the map-output byte checks by passing negative values.
     */
    @Test
    public void testNewCounterD() throws Exception {
        Job wordCountJob = createJob();
        Configuration jobSettings = wordCountJob.getConfiguration();
        jobSettings.setInt("mapreduce.task.io.sort.factor", 2);
        wordCountJob.setNumReduceTasks(0);

        // Make sure only inFiles[0..2] are present.
        removeWordsFile(inFiles[3], jobSettings);
        removeWordsFile(inFiles[4], jobSettings);

        long inputSize = 0L;
        for (int i = 0; i <= 2; ++i) {
            inputSize += getFileSize(inFiles[i]);
        }

        FileInputFormat.setInputPaths(wordCountJob, IN_DIR);
        FileOutputFormat.setOutputPath(wordCountJob, new Path(OUT_DIR, "outputN3"));
        Assert.assertTrue(wordCountJob.waitForCompletion(true));

        org.apache.hadoop.mapred.Counters counters = org.apache.hadoop.mapred.Counters.downgrade(wordCountJob.getCounters());
        validateCounters(counters, 0L, 15360L, 61440L);
        validateFileCounters(counters, inputSize, 0L, -1L, -1L);
    }

    /**
     * Populates a counter set by hand through the nested {@code Counter} enums and
     * {@code TaskCounter}, then runs both file-counter validators against it. No job is
     * executed.
     */
    @Test
    public void testOldCounters() throws Exception {
        final long bytesRead = 100L;
        final long bytesWritten = 200L;
        final long mapOutput = 100L;

        org.apache.hadoop.mapred.Counters counters = new org.apache.hadoop.mapred.Counters();
        counters.incrCounter(FileInputFormat.Counter.BYTES_READ, bytesRead);
        counters.incrCounter(FileOutputFormat.Counter.BYTES_WRITTEN, bytesWritten);
        counters.incrCounter(TaskCounter.MAP_OUTPUT_BYTES, mapOutput);
        counters.incrCounter(TaskCounter.MAP_OUTPUT_MATERIALIZED_BYTES, mapOutput);

        validateFileCounters(counters, bytesRead, bytesWritten, mapOutput, mapOutput);
        validateOldFileCounters(counters, bytesRead, bytesWritten, mapOutput, mapOutput);
    }

    /**
     * Reads the {@code COMMITTED_HEAP_BYTES} counter of one task of a job.
     *
     * @param client client used to fetch the task reports
     * @param id job whose reports are fetched
     * @param numReports number of reports the job is expected to have for {@code type}
     * @param taskId index into the report array of the task to read
     * @param type {@code MAP} or {@code REDUCE}; for any other type no reports are fetched
     *        and the not-null assertion fails
     * @return the committed heap bytes reported by the selected task
     * @throws Exception if fetching the reports fails
     */
    private long getTaskCounterUsage(JobClient client, JobID id, int numReports, int taskId, TaskType type) throws Exception {
        TaskReport[] taskReports;
        if (type == TaskType.MAP) {
            taskReports = client.getMapTaskReports(id);
        } else if (type == TaskType.REDUCE) {
            taskReports = client.getReduceTaskReports(id);
        } else {
            taskReports = null;
        }

        Assert.assertNotNull("No reports found for task type '" + type.name() + "' in job " + id, taskReports);
        Assert.assertEquals("Mismatch in task id", numReports, taskReports.length);

        org.apache.hadoop.mapred.Counters taskCounters = taskReports[taskId].getCounters();
        return taskCounters.getCounter(TaskCounter.COMMITTED_HEAP_BYTES);
    }

    /**
     * Submits a one-map, one-reduce job using {@link MemoryLoaderMapper} and
     * {@link MemoryLoaderReducer}, waits for it and asserts that it succeeded.
     *
     * @param conf base configuration that is copied for the job
     * @param testRootDir directory under which the {@code out} directory is (re)created
     * @param heapOptions value stored under {@code mapred.child.java.opts}
     * @param targetMapValue value stored under {@code map.memory-loader.target-value}
     * @param targetReduceValue value stored under {@code reduce.memory-loader.target-value}
     * @param fs file system used to delete a previous output directory
     * @param client client used to submit the job
     * @param inDir job input path
     * @return the completed job
     * @throws IOException if submission, waiting or the output cleanup fails
     */
    private static RunningJob runHeapUsageTestJob(JobConf conf, Path testRootDir, String heapOptions, long targetMapValue, long targetReduceValue, FileSystem fs, JobClient client, Path inDir) throws IOException {
        JobConf heapJobConf = new JobConf(conf);

        // One attempt of one map and one reduce task.
        heapJobConf.setNumMapTasks(1);
        heapJobConf.setNumReduceTasks(1);
        heapJobConf.setMapperClass(MemoryLoaderMapper.class);
        heapJobConf.setReducerClass(MemoryLoaderReducer.class);
        heapJobConf.setInputFormat(TextInputFormat.class);
        heapJobConf.setOutputKeyClass(LongWritable.class);
        heapJobConf.setOutputValueClass(Text.class);
        heapJobConf.setMaxMapAttempts(1);
        heapJobConf.setMaxReduceAttempts(1);

        // Child JVM options and the targets read back by the memory loaders.
        heapJobConf.set("mapred.child.java.opts", heapOptions);
        heapJobConf.setLong("map.memory-loader.target-value", targetMapValue);
        heapJobConf.setLong("reduce.memory-loader.target-value", targetReduceValue);

        org.apache.hadoop.mapred.FileInputFormat.setInputPaths(heapJobConf, inDir);
        Path outputDir = new Path(testRootDir, "out");
        fs.delete(outputDir, true);
        org.apache.hadoop.mapred.FileOutputFormat.setOutputPath(heapJobConf, outputDir);

        RunningJob submitted = client.submitJob(heapJobConf);
        submitted.waitForCompletion();
        JobID submittedId = submitted.getID();
        Assert.assertTrue("Job " + submittedId + " failed!", submitted.isSuccessful());
        return submitted;
    }

    /**
     * Runs two jobs on a {@link MiniMRCluster} and compares their committed-heap counters.
     *
     * <p>The first job runs with memory-loader targets of 0. The second uses targets 256 MiB
     * above the heap usage the first job reported, so its map and reduce tasks must each
     * report strictly more committed heap than those of the first job.
     */
    @Test
    public void testHeapUsageCounter() throws Exception {
        final String heapOptions = "-Xms32m -Xmx1G";
        final long extraLoad = 256L * 1024L * 1024L;

        JobConf localConf = new JobConf();
        LocalFileSystem fileSystem = FileSystem.getLocal(localConf);
        Path rootDir = new Path(System.getProperty("test.build.data", "/tmp"));
        Path testRootDir = new Path(rootDir, "testHeapUsageCounter");
        fileSystem.delete(testRootDir, true);
        fileSystem.setWorkingDirectory(testRootDir);
        fileSystem.deleteOnExit(testRootDir);

        MiniMRCluster mrCluster = new MiniMRCluster(1, fileSystem.getUri().toString(), 1);
        try {
            JobConf clusterConf = mrCluster.createJobConf();
            JobClient jobClient = new JobClient(clusterConf);
            Path inDir = new Path(testRootDir, "in");
            createWordsFile(inDir, clusterConf);

            // First job: the loaders have nothing to do.
            RunningJob lowMemJob = runHeapUsageTestJob(clusterConf, testRootDir, heapOptions, 0L, 0L, fileSystem, jobClient, inDir);
            JobID lowMemJobID = lowMemJob.getID();
            long lowMapHeap = getTaskCounterUsage(jobClient, lowMemJobID, 1, 0, TaskType.MAP);
            System.out.println("Job1 (low memory job) map task heap usage: " + lowMapHeap);
            long lowReduceHeap = getTaskCounterUsage(jobClient, lowMemJobID, 1, 0, TaskType.REDUCE);
            System.out.println("Job1 (low memory job) reduce task heap usage: " + lowReduceHeap);

            // Second job: the loaders must grow the heap beyond what the first job reported.
            RunningJob highMemJob = runHeapUsageTestJob(clusterConf, testRootDir, heapOptions, lowMapHeap + extraLoad, lowReduceHeap + extraLoad, fileSystem, jobClient, inDir);
            JobID highMemJobID = highMemJob.getID();
            long highMapHeap = getTaskCounterUsage(jobClient, highMemJobID, 1, 0, TaskType.MAP);
            System.out.println("Job2 (high memory job) map task heap usage: " + highMapHeap);
            long highReduceHeap = getTaskCounterUsage(jobClient, highMemJobID, 1, 0, TaskType.REDUCE);
            System.out.println("Job2 (high memory job) reduce task heap usage: " + highReduceHeap);

            Assert.assertTrue("Incorrect map heap usage reported by the map task", lowMapHeap < highMapHeap);
            Assert.assertTrue("Incorrect reduce heap usage reported by the reduce task", lowReduceHeap < highReduceHeap);
        } finally {
            mrCluster.shutdown();
            try {
                fileSystem.delete(testRootDir, true);
            } catch (IOException ignored) {
                // Best-effort cleanup; a failed delete must not mask the test outcome.
            }
        }
    }

    public static class NewSummer
    extends org.apache.hadoop.mapreduce.Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values, Reducer.Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            this.result.set(sum);
            context.write((Object)key, (Object)this.result);
        }
    }

    public static class NewMapTokenizer
    extends Mapper<Object, Text, Text, IntWritable> {
        private static final IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(Object key, Text value, Mapper.Context context) throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                this.word.set(itr.nextToken());
                context.write((Object)this.word, (Object)one);
            }
        }
    }

    static class MemoryLoaderReducer
    extends MapReduceBase
    implements Reducer<WritableComparable, Writable, WritableComparable, Writable> {
        static final String TARGET_VALUE = "reduce.memory-loader.target-value";
        private static MemoryLoader loader = null;

        MemoryLoaderReducer() {
        }

        public void reduce(WritableComparable key, Iterator<Writable> val, OutputCollector<WritableComparable, Writable> output, Reporter reporter) throws IOException {
            Assert.assertNotNull((String)"Reducer not configured!", (Object)loader);
            loader.load();
            output.collect((Object)key, (Object)key);
        }

        public void configure(JobConf conf) {
            loader = new MemoryLoader(conf.getLong(TARGET_VALUE, -1L));
        }
    }

    static class MemoryLoaderMapper
    extends MapReduceBase
    implements org.apache.hadoop.mapred.Mapper<WritableComparable, Writable, WritableComparable, Writable> {
        static final String TARGET_VALUE = "map.memory-loader.target-value";
        private static MemoryLoader loader = null;

        MemoryLoaderMapper() {
        }

        public void map(WritableComparable key, Writable val, OutputCollector<WritableComparable, Writable> output, Reporter reporter) throws IOException {
            Assert.assertNotNull((String)"Mapper not configured!", (Object)loader);
            loader.load();
            output.collect((Object)key, (Object)val);
        }

        public void configure(JobConf conf) {
            loader = new MemoryLoader(conf.getLong(TARGET_VALUE, -1L));
        }
    }

    /**
     * Grows the JVM heap by retaining large random strings until
     * {@code Runtime.totalMemory()} reaches a target value.
     */
    static class MemoryLoader {
        // Number of characters allocated per loading step (10 * 1024 * 1024).
        private static final int DEFAULT_UNIT_LOAD_SIZE = 0xA00000;
        private final long targetValue;
        // Keeps the allocated strings reachable so they are not garbage collected.
        private final List<String> loadObjects = new ArrayList<String>();

        /**
         * @param targetValue total-memory threshold in bytes; {@link #load()} is a no-op when
         *        the JVM's total memory is already at or above it
         */
        MemoryLoader(long targetValue) {
            this.targetValue = targetValue;
        }

        /**
         * Allocates {@code DEFAULT_UNIT_LOAD_SIZE}-character strings, pausing 100 ms after
         * each one, until the JVM's total memory is no longer below the target.
         *
         * <p>An interrupt received while pausing does not stop the loading, but the thread's
         * interrupt status is restored before this method returns.
         */
        void load() {
            boolean interrupted = false;
            try {
                while (Runtime.getRuntime().totalMemory() < this.targetValue) {
                    System.out.println("Loading memory with " + DEFAULT_UNIT_LOAD_SIZE + " characters. Current usage : " + Runtime.getRuntime().totalMemory());
                    this.loadObjects.add(RandomStringUtils.random(DEFAULT_UNIT_LOAD_SIZE));
                    try {
                        Thread.sleep(100L);
                    } catch (InterruptedException e) {
                        // Remember the interrupt; re-asserting it here would make every
                        // following sleep return immediately.
                        interrupted = true;
                    }
                }
            } finally {
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
}

