需求:
手机号以136、137、138、139开头的记录分别放到4个独立的文件中,其他开头的统一放到另一个文件中。
程序可复用IDEA手写自定义bean对象实现序列化接口Writable中的FlowBean、FlowMapper、FlowReducer类,只需重写Partitioner类和Driver类
MyPartitioner类:
package com.atguigu.parition;
import com.atguigu.flow.FlowBean;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
/**
 * Assigns each map output record to a partition based on the first three
 * characters of its key, which is treated as a phone number.
 *
 * <p>The partitioner operates on the records emitted by the Mapper, so its
 * type parameters are the Mapper's output key and value types.
 *
 * <p>Prefixes 136, 137, 138 and 139 map to partitions 0 to 3; every other key
 * (including keys shorter than three characters) maps to partition 4.
 */
public class MyPartitioner extends Partitioner<Text, FlowBean> {

    /** Number of leading characters of the key that decide the partition. */
    private static final int PREFIX_LENGTH = 3;

    /** Catch-all partition for keys that match none of the known prefixes. */
    private static final int DEFAULT_PARTITION = 4;

    /**
     * Chooses the partition for one record.
     *
     * @param text     the record key; its string form is read as a phone number
     * @param flowBean the record value; not consulted
     * @param i        the partition count passed in by the framework; not
     *                 consulted. This method returns indices 0 to 4, so the job
     *                 is expected to run with five reduce tasks.
     * @return the partition index, from 0 to 4
     */
    @Override
    public int getPartition(Text text, FlowBean flowBean, int i) {
        String phone = text.toString();

        // A key shorter than the prefix cannot match any known prefix; send it to
        // the catch-all partition instead of letting substring() throw.
        if (phone.length() < PREFIX_LENGTH) {
            return DEFAULT_PARTITION;
        }

        // Switching on a String needs Java 7 or later. Maven's default language
        // level is 1.5, so the compiler level must be raised (in IDEA, the
        // Alt+Enter quick fix can update the configuration automatically).
        switch (phone.substring(0, PREFIX_LENGTH)) {
            case "136":
                return 0;
            case "137":
                return 1;
            case "138":
                return 2;
            case "139":
                return 3;
            default:
                return DEFAULT_PARTITION;
        }
    }
}
PartitionerDriver类:
package com.atguigu.parition;
import com.atguigu.flow.FlowBean;
import com.atguigu.flow.FlowMapper;
import com.atguigu.flow.FlowReducer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
/**
 * Driver that configures and submits the MapReduce job which splits flow
 * records into five output partitions using {@link MyPartitioner}.
 */
public class PartitionerDriver {

    /** Input path used when no paths are given on the command line. */
    private static final String DEFAULT_INPUT_PATH = "F:/input";

    /** Output path used when no paths are given on the command line. */
    private static final String DEFAULT_OUTPUT_PATH = "F:/output";

    /**
     * Builds the job, runs it to completion and exits with status 0 on success
     * or 1 on failure.
     *
     * @param args optional; when at least two arguments are supplied,
     *             {@code args[0]} is the input path and {@code args[1]} is the
     *             output path. Otherwise the built-in default paths are used.
     * @throws IOException            propagated from job setup or submission
     * @throws ClassNotFoundException propagated from {@code waitForCompletion}
     * @throws InterruptedException   propagated from {@code waitForCompletion}
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // Use command-line paths only when both are present; otherwise keep the defaults.
        boolean pathsSupplied = args != null && args.length >= 2;
        String inputPath = pathsSupplied ? args[0] : DEFAULT_INPUT_PATH;
        String outputPath = pathsSupplied ? args[1] : DEFAULT_OUTPUT_PATH;

        // 1. Obtain the job instance.
        Job job = Job.getInstance(new Configuration());

        // 2. Set the class used to locate the job jar.
        job.setJarByClass(PartitionerDriver.class);

        // 3. Set the Mapper and Reducer.
        job.setMapperClass(FlowMapper.class);
        job.setReducerClass(FlowReducer.class);

        // Set the number of reduce tasks and the custom partitioner explicitly;
        // five reduce tasks match the five partition indices MyPartitioner returns.
        job.setNumReduceTasks(5);
        job.setPartitionerClass(MyPartitioner.class);

        // 4. Set the map output types and the final output types.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // 5. Set the input and output paths.
        FileInputFormat.setInputPaths(job, new Path(inputPath));
        FileOutputFormat.setOutputPath(job, new Path(outputPath));

        // 6. Submit the job and wait for it to finish.
        boolean succeeded = job.waitForCompletion(true);
        System.exit(succeeded ? 0 : 1);
    }
}