IDEA自定义OutputFormat案例实操

需求:
过滤输入的log日志,包含atguigu的网站输出到e:/atguigu.log,不包含atguigu的网站输出到e:/other.log。

需求分析:
1.创建类MyRecordWriter继承RecordWriter
2.创建两个输出流,分别对应两个输出文件

MyRecordWriter类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
package com.atguigu.outputformat;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.charset.StandardCharsets;


public class MyRecordWriter extends RecordWriter<LongWritable, Text> {

    // One stream per target file; both are opened in initialize() and
    // closed in close().
    private FSDataOutputStream atguigu;
    private FSDataOutputStream other;

    /**
     * Resolves the job's configured output directory from the framework
     * context and opens the two target streams (atguigu.log / other.log)
     * underneath it.
     *
     * @param taskAttemptContext framework-supplied job configuration context
     * @throws IOException if either output stream cannot be created
     */
    public void initialize(TaskAttemptContext taskAttemptContext) throws IOException {
        // Standard idiom: read the framework's configured output directory.
        String outdir = taskAttemptContext.getConfiguration().get(FileOutputFormat.OUTDIR);
        // Standard idiom: obtain the FileSystem for this job's configuration.
        FileSystem fileSystem = FileSystem.get(taskAttemptContext.getConfiguration());
        atguigu = fileSystem.create(new Path(outdir + "/atguigu.log"));
        other = fileSystem.create(new Path(outdir + "/other.log"));
    }

    /**
     * Writes one KV pair; called once per record. Lines containing
     * "atguigu" go to atguigu.log, all other lines go to other.log.
     *
     * @param longWritable record key (byte offset; not used for routing)
     * @param text         record value (one log line)
     * @throws IOException          if the underlying write fails
     * @throws InterruptedException declared by the RecordWriter contract
     */
    @Override
    public void write(LongWritable longWritable, Text text) throws IOException, InterruptedException {
        String out = text.toString() + "\n";
        // Route the line to the matching stream.
        FSDataOutputStream target = out.contains("atguigu") ? atguigu : other;
        // Fix: encode with an explicit charset instead of the platform
        // default, so output bytes do not depend on the JVM's default
        // encoding (pre-Java-18 getBytes() uses the platform charset).
        target.write(out.getBytes(StandardCharsets.UTF_8));
    }

    /**
     * Closes both output streams; IOUtils.closeStream is null-safe and
     * swallows close errors.
     *
     * @param taskAttemptContext task context (unused)
     * @throws IOException          declared by the RecordWriter contract
     * @throws InterruptedException declared by the RecordWriter contract
     */
    @Override
    public void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        IOUtils.closeStream(atguigu);
        IOUtils.closeStream(other);
    }
}

MyOutputFormat类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
package com.atguigu.outputformat;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class MyOutputFormat extends FileOutputFormat<LongWritable, Text> {

    /**
     * Creates the custom RecordWriter for this task and hands it the
     * framework context so it can locate the job's output directory and
     * open its output streams.
     *
     * @param taskAttemptContext framework-supplied task context
     * @return a writer that splits records into atguigu.log / other.log
     * @throws IOException          if the writer cannot open its streams
     * @throws InterruptedException declared by the OutputFormat contract
     */
    @Override
    public RecordWriter<LongWritable, Text> getRecordWriter(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        MyRecordWriter writer = new MyRecordWriter();
        // Pass the context along so the writer can resolve OUTDIR.
        writer.initialize(taskAttemptContext);
        return writer;
    }
}

OutputDriver类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
package com.atguigu.outputformat;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class OutputDriver {

    /**
     * Configures and submits the MapReduce job that filters log lines
     * through the custom MyOutputFormat, then exits with 0 on success
     * and 1 on failure.
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(OutputDriver.class);

        // Route all output through the custom format, which splits
        // records into atguigu.log and other.log under the output dir.
        job.setOutputFormatClass(MyOutputFormat.class);

        FileInputFormat.setInputPaths(job, new Path("F:\\input"));
        FileOutputFormat.setOutputPath(job, new Path("F:\\output"));

        boolean succeeded = job.waitForCompletion(true);
        System.exit(succeeded ? 0 : 1);
    }
}