Hands-On: Implementing WritableComparable Sorting in IDEA

Requirement: take the files produced by the serialization case study and output them again, sorted by total flow in descending order.

The trick is that the framework always sorts map output by key after each MapTask finishes; by making the flow bean the key, the records are sorted by total flow for free.

The FlowBean class implements the WritableComparable interface (the comparison interface the framework invokes polymorphically):

package com.atguigu.writablecomparable;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class FlowBean implements WritableComparable<FlowBean> {
    private long upFlow;
    private long downFlow;
    private long sumFlow;

    public FlowBean() {
        // Reflection requires a public no-argument constructor
    }

    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + sumFlow;
    }

    public void set(long upFlow, long downFlow) {
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow + downFlow;
    }

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }

    public long getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }

    /**
     * Serialization method.
     * @param dataOutput the data sink provided by the framework
     * @throws IOException
     */
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        // The field order must match readFields exactly
        dataOutput.writeLong(upFlow);
        dataOutput.writeLong(downFlow);
        dataOutput.writeLong(sumFlow);
    }

    /**
     * Deserialization method.
     * @param dataInput the data source provided by the framework
     * @throws IOException
     */
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        // The field order must match write exactly
        upFlow = dataInput.readLong();
        downFlow = dataInput.readLong();
        sumFlow = dataInput.readLong();
    }

    @Override
    public int compareTo(FlowBean o) {
        // Sort by total flow in descending order
        return Long.compare(o.sumFlow, this.sumFlow);
    }
}
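As a quick local sanity check (a minimal sketch that is not part of the original job; the class name is made up), the descending compareTo can be exercised by sorting a plain list, because WritableComparable extends java.lang.Comparable:

package com.atguigu.writablecomparable;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class FlowBeanSortCheck {
    public static void main(String[] args) {
        FlowBean small = new FlowBean();
        small.set(10, 20);       // sumFlow = 30
        FlowBean medium = new FlowBean();
        medium.set(100, 200);    // sumFlow = 300
        FlowBean large = new FlowBean();
        large.set(500, 500);     // sumFlow = 1000

        List<FlowBean> beans = new ArrayList<>();
        Collections.addAll(beans, small, medium, large);

        Collections.sort(beans); // uses FlowBean.compareTo
        // Prints totals 1000, 300, 30: descending, just as the shuffle will order them
        for (FlowBean bean : beans) {
            System.out.println(bean);
        }
    }
}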

The SortMapper class:
package com.atguigu.writablecomparable;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class SortMapper extends Mapper<LongWritable, Text, FlowBean, Text> {

    // Reuse the same objects across map() calls to avoid per-record allocation
    private FlowBean flow = new FlowBean();
    private Text phone = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Input line: phone \t upFlow \t downFlow \t sumFlow
        String[] fields = value.toString().split("\t");
        phone.set(fields[0]);
        flow.setUpFlow(Long.parseLong(fields[1]));
        flow.setDownFlow(Long.parseLong(fields[2]));
        flow.setSumFlow(Long.parseLong(fields[3]));

        // Emit the bean as the key so the shuffle sorts by total flow
        context.write(flow, phone);
    }
}
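To make the parsing concrete, here is how one record from the previous job's output would be split (the numbers are made up for illustration):

String line = "13736230513\t2481\t24681\t27162";   // hypothetical record
String[] fields = line.split("\t");
// fields[0] = "13736230513" (phone), fields[1] = "2481" (upFlow),
// fields[2] = "24681" (downFlow), fields[3] = "27162" (sumFlow)

The mapper then inverts the previous job's layout, emitting the FlowBean as the key and the phone as the value, so that the shuffle sorts on total flow.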

The SortReducer class:
package com.atguigu.writablecomparable;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class SortReducer extends Reducer<FlowBean, Text, Text, FlowBean> {

    @Override
    protected void reduce(FlowBean key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        // Swap key and value back so each output line reads: phone, then flow fields
        for (Text value : values) {
            context.write(value, key);
        }
    }
}
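Note that the loop matters: two phones with exactly the same total flow compare as equal, so the framework groups them into a single reduce call, and each value must be written out separately or records would be lost.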

The SortDriver class:
package com.atguigu.writablecomparable;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class SortDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Job job = Job.getInstance(new Configuration());

        job.setJarByClass(SortDriver.class);
        job.setMapperClass(SortMapper.class);
        job.setReducerClass(SortReducer.class);

        // Map output (FlowBean, Text) differs from the final output (Text, FlowBean),
        // so both pairs of types must be declared
        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // The input is the output directory of the previous serialization job
        FileInputFormat.setInputPaths(job, new Path("F:/output"));
        FileOutputFormat.setOutputPath(job, new Path("F:/output2"));

        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}
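The input and output paths are hardcoded for local testing. To run the job elsewhere, a common variant (a sketch, assuming the usual two-argument convention) takes them from the command line instead:

FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));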

Requirement: building on the previous one, add a custom partitioner class that assigns partitions by the province prefix of the phone number.

The MyPartitioner2 class:

package com.atguigu.writablecomparable2;

import com.atguigu.writablecomparable.FlowBean;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class MyPartitioner2 extends Partitioner<FlowBean, Text> {
    @Override
    public int getPartition(FlowBean flowBean, Text text, int numPartitions) {
        // Route each record by the first three digits of the phone number (the value)
        switch (text.toString().substring(0, 3)) {
            case "136":
                return 0;
            case "137":
                return 1;
            case "138":
                return 2;
            case "139":
                return 3;
            default:
                return 4;
        }
    }
}
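Because getPartition can return any of the values 0 through 4, the driver below must run with exactly five reduce tasks; with fewer than five (other than one, where partitioning is bypassed), the job fails at runtime with an illegal-partition error.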

The SortDriver class, updated to register the partitioner:
package com.atguigu.writablecomparable2;

import com.atguigu.writablecomparable.FlowBean;
import com.atguigu.writablecomparable.SortMapper;
import com.atguigu.writablecomparable.SortReducer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class SortDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Job job = Job.getInstance(new Configuration());

        job.setJarByClass(SortDriver.class);
        job.setMapperClass(SortMapper.class);
        job.setReducerClass(SortReducer.class);

        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(Text.class);

        // Register the custom partitioner and run one reduce task per partition
        job.setPartitionerClass(MyPartitioner2.class);
        job.setNumReduceTasks(5);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        FileInputFormat.setInputPaths(job, new Path("F:/output"));
        FileOutputFormat.setOutputPath(job, new Path("F:/output2"));

        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}
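With five reduce tasks, the output directory contains five files, part-r-00000 through part-r-00004, one per phone-number prefix, and the records inside each file are still sorted by total flow in descending order, since sorting happens per reducer.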