Writing a ReduceJoin Example by Hand in IDEA

Requirement:
Join the order table order (id, pid, amount) with the product table pd (pid, pname) on pid, merging the product name into the order records.
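
For concreteness, a hypothetical pair of tab-separated input files matching that schema might look like this (the values are made up for illustration, not the original dataset):

order.txt (id, pid, amount):
1001	01	1
1002	02	2
1003	03	3
1004	01	4

pd.txt (pid, pname):
01	xiaomi
02	huawei
03	gree

The joined result the job should then produce (id, pname, amount; grouped by pid, with the order inside one pid group not guaranteed):

1001	xiaomi	1
1004	xiaomi	4
1002	huawei	2
1003	gree	3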

The OrderBean class:

package com.atguigu.bean;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class OrderBean implements WritableComparable<OrderBean> {
    private String id;
    private String pid;
    private int amount;
    private String pname;

    @Override
    public String toString() {
        return id + "\t" + pname + "\t" + amount;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getPid() {
        return pid;
    }

    public void setPid(String pid) {
        this.pid = pid;
    }

    public int getAmount() {
        return amount;
    }

    public void setAmount(int amount) {
        this.amount = amount;
    }

    public String getPname() {
        return pname;
    }

    public void setPname(String pname) {
        this.pname = pname;
    }

    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(id);
        dataOutput.writeUTF(pid);
        dataOutput.writeInt(amount);
        dataOutput.writeUTF(pname);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.id = dataInput.readUTF();
        this.pid = dataInput.readUTF();
        this.amount = dataInput.readInt();
        this.pname = dataInput.readUTF();
    }

    @Override
    public int compareTo(OrderBean o) {
        int compare = this.pid.compareTo(o.pid); // primary order: pid, so all records for one product sit together
        if (compare == 0) {
            // secondary order: pname descending, so records with an empty pname
            // (the order.txt records) sort behind the pd.txt record
            return o.pname.compareTo(this.pname);
        } else {
            return compare;
        }
    }
}
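
The compareTo above does double duty: the primary comparison on pid brings all records for one product together, and the descending comparison on pname guarantees that the pd.txt record (non-empty pname) is the first record of each group. A tiny standalone check, a sketch with made-up values rather than part of the project, makes the ordering visible:

public class OrderBeanSortCheck {
    public static void main(String[] args) {
        OrderBean pd = new OrderBean();    // stands in for a pd.txt record
        pd.setPid("01");
        pd.setPname("xiaomi");
        pd.setId("");
        pd.setAmount(0);

        OrderBean order = new OrderBean(); // stands in for an order.txt record
        order.setPid("01");
        order.setPname("");                // empty pname, exactly as the mapper sets it
        order.setId("1001");
        order.setAmount(1);

        // Prints a negative number: same pid, and "" < "xiaomi" under the
        // descending pname comparison, so the pd-style bean sorts first.
        System.out.println(pd.compareTo(order));
    }
}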

The RJMapper class:

package com.atguigu.reducejoin;

import com.atguigu.bean.OrderBean;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.IOException;

public class RJMapper extends Mapper<LongWritable, Text, OrderBean, NullWritable> {
    private OrderBean orderbean = new OrderBean();
    private String filename;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        FileSplit fs = (FileSplit) context.getInputSplit(); // the input split this MapTask is processing
        filename = fs.getPath().getName();                  // name of the file the split belongs to
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] fields = value.toString().split("\t");
        if (filename.equals("order.txt")) {
            orderbean.setId(fields[0]);
            orderbean.setPid(fields[1]);
            orderbean.setAmount(Integer.parseInt(fields[2]));
            orderbean.setPname("");        // no product name available in order records yet
        } else if (filename.equals("pd.txt")) {
            orderbean.setPid(fields[0]);
            orderbean.setPname(fields[1]);
            orderbean.setId("");
            orderbean.setAmount(0);
        }

        context.write(orderbean, NullWritable.get());
    }
}
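
Two details in this mapper are easy to miss. First, a single orderbean instance is reused for every record; that is safe because context.write serializes the key immediately, so the next map() call may overwrite the same object. Second, setting pname to the empty string for order records, rather than leaving it null, matters twice over: writeUTF would throw a NullPointerException on null, and the empty string is exactly what pushes order records behind the pd record in the descending pname sort.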

The RJComparator class:

package com.atguigu.reducejoin;

import com.atguigu.bean.OrderBean;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class RJComparator extends WritableComparator {

    protected RJComparator() {
        super(OrderBean.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        OrderBean oa = (OrderBean) a;
        OrderBean ob = (OrderBean) b;
        return oa.getPid().compareTo(ob.getPid()); // group by pid only
    }
}
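
During the shuffle's sort phase, records are still ordered by the full OrderBean.compareTo (pid, then pname descending). The grouping comparator is consulted only when the reducer walks the sorted output: because it compares pid alone, consecutive keys with the same pid are treated as one group, so the pd record and all of its order records arrive together in a single reduce() call.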

The RJReducer class:

package com.atguigu.reducejoin;

import com.atguigu.bean.OrderBean;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.Iterator;

public class RJReducer extends Reducer<OrderBean, NullWritable, OrderBean, NullWritable> {

    @Override
    protected void reduce(OrderBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        /*
         * All records in one reduce() call share the same pid (that is what the
         * grouping comparator compares). Thanks to the secondary sort, the group's
         * first record is the pd.txt record; the order.txt records follow.
         */
        Iterator<NullWritable> iterator = values.iterator();
        iterator.next();                 // advance to the first record: the pd.txt record carrying pid and pname
        String pname = key.getPname();   // remember the product name

        while (iterator.hasNext()) {
            iterator.next();             // advance: key now holds the next order.txt record
            key.setPname(pname);         // fill in the joined product name
            context.write(key, NullWritable.get());
        }
    }
}
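
The loop above leans on a Hadoop subtlety: the framework reuses a single key object and deserializes each successive record of the group into it, so every iterator.next() silently rewrites key; that is why reading pname before the loop, then setting it back inside the loop, works at all. If you prefer to make that explicit, a variant like the following sketch (the class name RJCopyingReducer is hypothetical, not part of the original project) copies each order record into a fresh bean before writing:

package com.atguigu.reducejoin;

import com.atguigu.bean.OrderBean;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.Iterator;

public class RJCopyingReducer extends Reducer<OrderBean, NullWritable, OrderBean, NullWritable> {

    @Override
    protected void reduce(OrderBean key, Iterable<NullWritable> values, Context context)
            throws IOException, InterruptedException {
        Iterator<NullWritable> iterator = values.iterator();
        iterator.next();                 // first record of the group: the pd.txt record
        String pname = key.getPname();   // its product name

        while (iterator.hasNext()) {
            iterator.next();             // Hadoop has now deserialized the next order.txt record into key
            OrderBean out = new OrderBean();  // explicit copy instead of mutating key in place
            out.setId(key.getId());
            out.setPid(key.getPid());
            out.setAmount(key.getAmount());
            out.setPname(pname);         // fill in the joined product name
            context.write(out, NullWritable.get());
        }
    }
}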

The RJDriver class:

package com.atguigu.reducejoin;

import com.atguigu.bean.OrderBean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class RJDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Job job = Job.getInstance(new Configuration());

        job.setJarByClass(RJDriver.class);
        job.setMapperClass(RJMapper.class);
        job.setReducerClass(RJReducer.class);

        job.setMapOutputKeyClass(OrderBean.class);
        job.setMapOutputValueClass(NullWritable.class);

        // group map output keys by pid only, so each product's records
        // reach a single reduce() call
        job.setGroupingComparatorClass(RJComparator.class);

        job.setOutputKeyClass(OrderBean.class);
        job.setOutputValueClass(NullWritable.class);

        FileInputFormat.setInputPaths(job, new Path("F:\\input"));
        FileOutputFormat.setOutputPath(job, new Path("F:\\output"));

        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}
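
Two practical notes for running this from IDEA: F:\input must contain both order.txt and pd.txt, since the mapper branches on the file name, and F:\output must not exist yet, because FileOutputFormat refuses to write into an existing output directory.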