需求:在每个订单中,找出最贵的商品。
需求分析:
可以将Map阶段读取到的所有订单数据按照id升序排序,如果id相同再按照金额降序排序,发送到Reduce。
在Reduce端利用groupingComparator将订单id相同的kv聚合成组,然后取第一个即是该订单中最贵商品。
OrderBean类:
package com.atguigu.groupingcomparator;
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * Composite map-output key holding one order line: order id, product id
 * and sale price.
 *
 * Sort order: ascending by order id, then descending by price within the
 * same order — so the most expensive product of each order is the first
 * record the reducer sees for that group.
 */
public class OrderBean implements WritableComparable<OrderBean> {

    private String orderId;   // order id
    private String productId; // product id
    private double price;     // sale price

    @Override
    public String toString() {
        // Tab-separated output, matching the input file layout.
        return orderId + '\t' + productId + '\t' + price;
    }

    public String getOrderId() {
        return orderId;
    }

    public void setOrderId(String orderId) {
        this.orderId = orderId;
    }

    public String getProductId() {
        return productId;
    }

    public void setProductId(String productId) {
        this.productId = productId;
    }

    public double getPrice() {
        return price;
    }

    public void setPrice(double price) {
        this.price = price;
    }

    /**
     * Primary sort: order id ascending; ties broken by price descending
     * (note the swapped operands in Double.compare).
     */
    @Override
    public int compareTo(OrderBean other) {
        int byOrderId = this.orderId.compareTo(other.orderId);
        if (byOrderId != 0) {
            return byOrderId;
        }
        return Double.compare(other.price, this.price);
    }

    /** Serialization: field order must match {@link #readFields}. */
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(orderId);
        dataOutput.writeUTF(productId);
        dataOutput.writeDouble(price);
    }

    /** Deserialization: reads fields in the exact order they were written. */
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.orderId = dataInput.readUTF();
        this.productId = dataInput.readUTF();
        this.price = dataInput.readDouble();
    }
}
OrderComparator类:
23package com.atguigu.groupingcomparator;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
public class OrderComparator extends WritableComparator {
/**
* 需要有两个空对象,不然反序列化时没有对象接收数据,会报空指针异常
* 所以在构造函数阶段就设置createInstanes为true,生成空对象,后面才可以进行对象属性的比较
*/
protected OrderComparator() {
super(OrderBean.class,true);
}
@Override
public int compare(WritableComparable a, WritableComparable b) {
OrderBean oa = (OrderBean) a;
OrderBean ob = (OrderBean) b;
return oa.getOrderId().compareTo(ob.getOrderId());
}
}
OrderMapper类:
package com.atguigu.groupingcomparator;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;

/**
 * Parses one tab-separated input line (orderId, productId, price) into an
 * OrderBean key. The value is NullWritable because all payload travels in
 * the key.
 */
public class OrderMapper extends Mapper<LongWritable, Text, OrderBean, NullWritable> {

    // Single bean reused for every record to avoid per-line allocation;
    // the framework serializes it immediately on context.write().
    private final OrderBean bean = new OrderBean();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] cols = value.toString().split("\t");
        bean.setOrderId(cols[0]);
        bean.setProductId(cols[1]);
        bean.setPrice(Double.parseDouble(cols[2]));
        context.write(bean, NullWritable.get());
    }
}
OrderReducer类:
package com.atguigu.groupingcomparator;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;

/**
 * Emits the first record of each order group. Because the shuffle sorted
 * records within an order by price descending, that first record is the
 * most expensive product of the order.
 */
public class OrderReducer extends Reducer<OrderBean, NullWritable, OrderBean, NullWritable> {

    @Override
    protected void reduce(OrderBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        /**
         * On entry, key already holds the group's first (sorted) OrderBean.
         * There is in fact only ONE key object and ONE value object: the
         * framework re-deserializes into the same reused instances as the
         * values iterator advances, which is how "iterating values" walks
         * every record of the group.
         */
        context.write(key, NullWritable.get());
        /**
         * To emit the top two per group instead, iterate the values —
         * advancing the iterator also updates `key` in place:
         */
        // Iterator<NullWritable> iterator = values.iterator();
        // for (int i = 0;i<2;++i) {
        //     if(iterator.hasNext()) {
        //         context.write(key,iterator.next());
        //     }
        // }
    }
}
OrderDriver类:
package com.atguigu.groupingcomparator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;

/**
 * Job driver: wires OrderMapper, OrderComparator (grouping) and
 * OrderReducer together and submits the job.
 *
 * Usage: OrderDriver [inputPath outputPath]
 * If no arguments are given, the original hard-coded local paths are used
 * for backward compatibility. Note the output path must not already exist,
 * or the job fails at submission.
 */
public class OrderDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // Generalization: accept input/output on the command line instead
        // of only the hard-coded Windows paths; defaults preserve the
        // original behavior when run without arguments.
        String inputPath = args.length >= 2 ? args[0] : "f:\\input";
        String outputPath = args.length >= 2 ? args[1] : "f:\\output";

        Job job = Job.getInstance();
        job.setJarByClass(OrderDriver.class);

        job.setMapperClass(OrderMapper.class);
        job.setReducerClass(OrderReducer.class);

        // The composite bean is the key; values carry no payload.
        job.setMapOutputKeyClass(OrderBean.class);
        job.setMapOutputValueClass(NullWritable.class);

        // Group by order id only, so each reduce() sees one whole order.
        job.setGroupingComparatorClass(OrderComparator.class);

        job.setOutputKeyClass(OrderBean.class);
        job.setOutputValueClass(NullWritable.class);

        FileInputFormat.setInputPaths(job, new Path(inputPath));
        FileOutputFormat.setOutputPath(job, new Path(outputPath));

        boolean succeeded = job.waitForCompletion(true);
        System.exit(succeeded ? 0 : 1);
    }
}