Chen Debra
Efficient Management of MapReduce Tasks with DolphinScheduler

MapReduce is a programming model for processing and generating large datasets, designed for parallel computation over terabyte-scale data. This article provides a detailed overview of how DolphinScheduler handles MapReduce tasks, covering the differences between GenericOptionsParser and raw args, a complete explanation of the hadoop jar command parameters, MapReduce code examples, and instructions for configuring and running MapReduce tasks in DolphinScheduler.

Differences between GenericOptionsParser and args

GenericOptionsParser is used as follows:

GenericOptionsParser optionParser = new GenericOptionsParser(conf, args);
String[] remainingArgs = optionParser.getRemainingArgs();

A look at the source code of GenericOptionsParser shows that it works in two steps:

  1. Constructor:

    public GenericOptionsParser(Configuration conf, String[] args) 
          throws IOException {
        this(conf, new Options(), args); 
    }
    
  2. Parsing Options:

    private boolean parseGeneralOptions(Options opts, String[] args) throws IOException {
        opts = buildGeneralOptions(opts);
        CommandLineParser parser = new GnuParser();
        boolean parsed = false;
        try {
            commandLine = parser.parse(opts, preProcessForWindows(args), true);
            processGeneralOptions(commandLine);
            parsed = true;
        } catch (ParseException e) {
            LOG.warn("options parsing failed: " + e.getMessage());
            HelpFormatter formatter = new HelpFormatter();
            formatter.printHelp("general options are: ", opts);
        }
        return parsed;
    }
    

    GenericOptionsParser recognizes options such as -fs, -jt, -D, -libjars, -files, -archives, and -tokenCacheFile and places their values into Hadoop’s Configuration.

With plain args, by contrast, these options (-fs, -jt, -D, etc.) must be parsed manually.
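To make the difference concrete, here is a minimal driver sketch (the class name and job wiring are illustrative; it uses the WCMapper shown later in this article) that lets GenericOptionsParser consume the generic options and then reads only the remaining positional arguments:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordCountDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Generic options (-D, -files, -libjars, ...) are applied to conf here;
        // only the leftover positional arguments (input/output paths) remain.
        String[] remainingArgs = new GenericOptionsParser(conf, args).getRemainingArgs();

        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(WordCountDriver.class);
        job.setMapperClass(WCMapper.class);
        // A reducer would be wired in the same way with job.setReducerClass(...)
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(remainingArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(remainingArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}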

Complete Hadoop jar Command Parameters

hadoop jar wordcount.jar org.myorg.WordCount \
    -fs hdfs://namenode.example.com:8020 \
    -jt resourcemanager.example.com:8032 \
    -D mapreduce.job.queuename=default \
    -libjars /path/to/dependency1.jar,/path/to/dependency2.jar \
    -files /path/to/file1.txt,/path/to/file2.txt \
    -archives /path/to/archive1.zip,/path/to/archive2.tar.gz \
    -tokenCacheFile /path/to/credential.file \
    /input /output

This command:

  1. Submits the job to the specified HDFS.
  2. Uses the specified YARN ResourceManager.
  3. Queues the job under default.
  4. Adds dependencies and files.
  5. Distributes archives and credentials.
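Note that these generic options are only honored if the main class actually parses them, either via GenericOptionsParser as shown earlier or, more idiomatically, via ToolRunner. A minimal sketch of what a main class like org.myorg.WordCount might look like (names and wiring illustrative):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class WordCount extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        // getConf() already contains everything set via -fs, -jt, -D, etc.
        Configuration conf = getConf();
        // ... build and submit the Job as usual, using args[0]/args[1] as input/output ...
        return 0;
    }

    public static void main(String[] args) throws Exception {
        // ToolRunner applies GenericOptionsParser internally before calling run()
        System.exit(ToolRunner.run(new Configuration(), new WordCount(), args));
    }
}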

MapReduce Examples

Classic WordCount Example

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WCMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private final IntWritable one = new IntWritable(1);
    private final Text word = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Split each input line on whitespace and emit a (word, 1) pair per token
        String[] fields = value.toString().split("\\s+");
        for (String field : fields) {
            word.set(field);
            context.write(word, one);
        }
    }
}

// Other classes omitted for brevity
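The remaining classes are omitted above; for completeness, a minimal reducer to pair with WCMapper could look like this (a sketch, not the article's original code):

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WCReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    private final IntWritable result = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // Sum all counts emitted for this word by the mappers
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        result.set(sum);
        context.write(key, result);
    }
}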

File Distribution Example

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class ConfigMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
    private final List<String> whiteList = new ArrayList<>();

    @Override
    protected void setup(Context context) throws IOException {
        URI[] files = context.getCacheFiles();
        if (files != null && files.length > 0) {
            // Files distributed via -files (or Job#addCacheFile) are symlinked
            // into the task's working directory, so they can be opened by name
            try (BufferedReader reader = new BufferedReader(new FileReader("white.txt"))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    whiteList.add(line);
                }
            }
        }
    }
}
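For this to work, white.txt has to be shipped with the job, either via -files /path/to/white.txt on the hadoop jar command line, or programmatically in the driver (a sketch; the HDFS path is illustrative):

// In the driver: ship white.txt with the job; the '#white.txt' fragment
// sets the symlink name in the task's working directory
job.addCacheFile(new URI("hdfs:///config/white.txt#white.txt"));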

Using MapReduce with DolphinScheduler

Setting up the Yarn test Queue

In capacity-scheduler.xml:

<property>
    <name>yarn.scheduler.capacity.root.queues</name>
    <value>default, test</value>
</property>
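Defining the queue list alone is not enough: the CapacityScheduler also requires a capacity for every queue, and sibling capacities must add up to 100. A minimal sketch (the 70/30 split is just an example):

<property>
    <name>yarn.scheduler.capacity.root.default.capacity</name>
    <value>70</value>
</property>
<property>
    <name>yarn.scheduler.capacity.root.test.capacity</name>
    <value>30</value>
</property>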

Refresh with yarn rmadmin -refreshQueues.

Execution Results

[Screenshot: MapReduce task execution results in DolphinScheduler]

Yarn Job

[Screenshot: the corresponding job in the Yarn web UI]

Source Code Analysis

org.apache.dolphinscheduler.plugin.task.mr.MapReduceArgsUtils#buildArgs

String others = param.getOthers();
// If the queue is not specified via the -D mapreduce.job.queuename option,
// the queue name is taken from the "Yarn Queue" field on the page.
if (StringUtils.isEmpty(others) || !others.contains(MR_YARN_QUEUE)) {
    String yarnQueue = param.getYarnQueue();
    if (StringUtils.isNotEmpty(yarnQueue)) {
        args.add(String.format("%s%s=%s", D, MR_YARN_QUEUE, yarnQueue));
    }
}

// "others" holds the optional parameters field from the page,
// where -conf, -archives, -files, -libjars, and -D can be specified.
if (StringUtils.isNotEmpty(others)) {
    args.add(others);
}
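For example, if the task's "Yarn Queue" field is set to test and the optional parameters field does not already contain mapreduce.job.queuename, buildArgs appends a flag of the following form to the generated hadoop jar command line (illustrative):

-Dmapreduce.job.queuename=test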
