Requirement:
Table 4-4  Order data table t_order

id      pid   amount
1001    01    1
1002    02    2
1003    03    3
1004    01    4
1005    02    5
1006    03    6
Table 4-5  Product information table t_product

pid   pname
01    Xiaomi
02    Huawei
03    Gree
Merge the data from the product information table into the order data table according to the product pid.
Table 4-6  Final data format

id      pname    amount
1001    Xiaomi   1
1004    Xiaomi   4
1002    Huawei   2
1005    Huawei   5
1003    Gree     3
1006    Gree     6
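Concretely, the two tab-separated input files assumed by the MJDriver code below would look like this (order.txt is the big order table, pd.txt the small product table; both file names come from the paths used in the driver):

order.txt:
1001	01	1
1002	02	2
1003	03	3
1004	01	4
1005	02	5
1006	03	6

pd.txt:
01	Xiaomi
02	Huawei
03	Gree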
OrderBean class:
package com.atguigu.bean;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class OrderBean implements WritableComparable<OrderBean> {

    private String id;
    private String pid;
    private int amount;
    private String pname = ""; // default to "" so write() never hits a null

    @Override
    public String toString() {
        return id + "\t" + pname + "\t" + amount;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getPid() {
        return pid;
    }

    public void setPid(String pid) {
        this.pid = pid;
    }

    public int getAmount() {
        return amount;
    }

    public void setAmount(int amount) {
        this.amount = amount;
    }

    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(id);
        dataOutput.writeUTF(pid);
        dataOutput.writeInt(amount);
        dataOutput.writeUTF(pname);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.id = dataInput.readUTF();
        this.pid = dataInput.readUTF();
        this.amount = dataInput.readInt();
        this.pname = dataInput.readUTF();
    }

    @Override
    public int compareTo(OrderBean o) {
        int compare = this.pid.compareTo(o.pid); // group by pid first
        if (compare == 0) {
            // then sort by pname in descending order, so that records with
            // an empty pname sort after the record carrying the product name
            return o.pname.compareTo(this.pname);
        } else {
            return compare;
        }
    }
}
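Before wiring the bean into a job, it can help to confirm that write() and readFields() round-trip correctly. A minimal sketch, runnable in a plain JVM with no cluster (the class name OrderBeanRoundTrip is made up for this check):

package com.atguigu.bean;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class OrderBeanRoundTrip {
    public static void main(String[] args) throws IOException {
        OrderBean in = new OrderBean();
        in.setId("1001");
        in.setPid("01");
        in.setAmount(1);
        in.setPname("Xiaomi");

        // serialize the bean exactly as Hadoop would during a shuffle
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        in.write(new DataOutputStream(bytes));

        // deserialize into a fresh bean and print it
        OrderBean out = new OrderBean();
        out.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
        System.out.println(out); // expected: 1001	Xiaomi	1
    }
}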
MJMapper class:
package com.atguigu.mapjoin;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

public class MJMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    private Map<String, String> pMap = new HashMap<>(); // holds the contents of pd.txt (pid -> pname)
    private Text k = new Text();

    /**
     * The cached file must be loaded into memory before map() runs.
     * @param context
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        URI[] cacheFiles = context.getCacheFiles();
        String path = cacheFiles[0].getPath().toString();
        BufferedReader bufferedReader = new BufferedReader(new FileReader(path));
        String line;
        while (StringUtils.isNotEmpty(line = bufferedReader.readLine())) {
            String[] fields = line.split("\t");
            pMap.put(fields[0], fields[1]); // pid -> pname
        }
        IOUtils.closeStream(bufferedReader);
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] fields = value.toString().split("\t");
        String pname = pMap.get(fields[1]);   // look up pname by pid
        if (pname == null) pname = "NULL";    // pid missing from the product table
        k.set(fields[0] + "\t" + pname + "\t" + fields[2]); // id, pname, amount
        context.write(k, NullWritable.get());
    }
}
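The FileReader in setup() assumes the cached file sits on the local file system, which is true in local mode. If pd.txt lives on HDFS instead, a common variation is to open the cached URI through the Hadoop FileSystem API. A sketch of that alternative setup(), under that assumption:

// Extra imports needed: org.apache.hadoop.fs.FileSystem,
// org.apache.hadoop.fs.Path, java.io.InputStreamReader.
@Override
protected void setup(Context context) throws IOException, InterruptedException {
    URI[] cacheFiles = context.getCacheFiles();
    // resolve the file through whatever file system the URI points at
    FileSystem fs = FileSystem.get(context.getConfiguration());
    try (BufferedReader reader = new BufferedReader(
            new InputStreamReader(fs.open(new Path(cacheFiles[0]))))) {
        String line;
        while (StringUtils.isNotEmpty(line = reader.readLine())) {
            String[] fields = line.split("\t");
            pMap.put(fields[0], fields[1]);
        }
    }
}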
MJDriver class:
package com.atguigu.mapjoin;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.net.URI;

public class MJDriver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Job job = Job.getInstance();
        job.setJarByClass(MJDriver.class);
        job.setMapperClass(MJMapper.class);

        // With 0 reduce tasks there is no shuffle or reduce phase;
        // map output goes straight to the OutputFormat.
        job.setNumReduceTasks(0);

        // Distribute the small table so every mapper can cache it in memory.
        job.addCacheFile(URI.create("file:///F:/input/pd.txt"));

        FileInputFormat.setInputPaths(job, new Path("F:/input/order.txt"));
        FileOutputFormat.setOutputPath(job, new Path("F:/output"));

        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}
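Note that because the job is map-only, the output file (F:/output/part-m-00000 for a single input split) keeps the rows in order.txt's original order (1001, 1002, 1003, ...) rather than grouped by pname as Table 4-6 shows; the joined content is the same, only the row ordering differs, since no sort or shuffle ever runs.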