IDEA上T倒排索引案例

IIMapper1类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package com.atguigu.invertindex;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.IOException;

public class IIMapper1 extends Mapper<LongWritable, Text,Text, IntWritable> {
private Text k = new Text();
private IntWritable v = new IntWritable(1);

private String filename;

@Override
protected void setup(Context context) throws IOException, InterruptedException {
FileSplit fs = (FileSplit) context.getInputSplit();
filename = fs.getPath().getName();//获取文件名
}

@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] words = value.toString().split(" ");
for (String word : words) {
k.set(word + "--" + filename);
context.write(k,v);
}

}
}

IIReducer1类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
package com.atguigu.invertindex;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class IIReducer1 extends Reducer<Text, IntWritable,Text,IntWritable> {
private IntWritable v = new IntWritable();

@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable value : values) {
sum += value.get();
}
v.set(sum);

context.write(key,v);
}
}

IIMapper2类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package com.atguigu.invertindex;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;


import java.io.IOException;

public class IIMapper2 extends Mapper<LongWritable,Text,Text, Text> {
private Text k = new Text();
private Text v = new Text();

@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] split = value.toString().split("--");
k.set(split[0]);
String[] fields = split[1].split("\t");
v.set(fields[0] + "-->" + fields[1]);

context.write(k,v);
}
}

IIReducer2类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
package com.atguigu.invertindex;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;


public class IIReducer2 extends Reducer<Text,Text,Text, Text> {
private Text v = new Text();
private StringBuilder sb = new StringBuilder();

@Override
protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
sb.delete(0,sb.length());//sb清零
for (Text value : values) {
sb.append(value.toString()).append(" ");
}
v.set(sb.toString());

context.write(key,v);

}
}

IIDriver类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
package com.atguigu.invertindex;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class IIDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Job job1 = Job.getInstance(new Configuration());
job1.setJarByClass(IIDriver.class);
job1.setMapperClass(IIMapper1.class);
job1.setReducerClass(IIReducer1.class);

job1.setMapOutputKeyClass(Text.class);
job1.setMapOutputValueClass(IntWritable.class);
job1.setOutputKeyClass(Text.class);
job1.setOutputValueClass(IntWritable.class);

FileInputFormat.setInputPaths(job1,new Path("F:/input"));
FileOutputFormat.setOutputPath(job1,new Path("F:/output"));
boolean b = job1.waitForCompletion(true);
if (b) {
Job job2 = Job.getInstance(new Configuration());
job2.setJarByClass(IIDriver.class);
job2.setMapperClass(IIMapper2.class);
job2.setReducerClass(IIReducer2.class);

job2.setMapOutputKeyClass(Text.class);
job2.setMapOutputValueClass(Text.class);
job2.setOutputKeyClass(Text.class);
job2.setOutputValueClass(Text.class);

FileInputFormat.setInputPaths(job2, new Path("F:/output"));
FileOutputFormat.setOutputPath(job2, new Path("F:/output1"));
boolean b2 = job2.waitForCompletion(true);
System.exit(b2 ? 0 : 1);
}
}
}