Hands-On Combiner Example in IDEA

Requirement: during the counting process, perform a local aggregation on the output of each MapTask to reduce the amount of data transferred over the network, i.e., use the Combiner feature. The experiment reuses the WordCount example; the only change needed is in the WcDriver class, where job.setCombinerClass() is called to specify the Combiner class.
Because the Combiner here also aggregates words and adds up their counts, which is exactly what WcReducer does, we can simply write job.setCombinerClass(WcReducer.class).

The WcDriver class:

package com.atguigu.wordcount;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class WcDriver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // Standard boilerplate for a MapReduce driver
        // 1. Get a Job instance; it acts like the Context for the whole MapReduce pipeline
        Job job = Job.getInstance(new Configuration());

        // 2. Set the classpath (jar) by the driver class
        job.setJarByClass(WcDriver.class);

        // 3. Set the Mapper and Reducer classes
        job.setMapperClass(WcMapper.class);
        job.setReducerClass(WcReducer.class);

        // 4. Set the output types of the Mapper and Reducer
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // Set the Combiner (reusing the Reducer, since it just sums counts)
        job.setCombinerClass(WcReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // 5. Set the input and output paths
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 6. Submit the job and exit with its status
        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}
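
The WcReducer class itself is not shown in the original listing. For reference, a standard WordCount reducer that can double as the Combiner would look roughly like the sketch below (assumed to live in the same com.atguigu.wordcount package; this is a typical implementation, not code taken from the post):

package com.atguigu.wordcount;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class WcReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    private final IntWritable result = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // Sum the counts for this word. When used as the Combiner, this runs on each
        // MapTask's local output; when used as the Reducer, it runs on the shuffled,
        // merged output.
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        result.set(sum);
        context.write(key, result);
    }
}

Reusing the Reducer as the Combiner works here because summation is commutative and associative, and the Reducer's input and output types are both <Text, IntWritable>, so applying it once per MapTask and again after the shuffle does not change the final result.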