MapReduce 自定义OutputFormat 实现多路径输出
先跑一遍代码,再看总结
自定义outputFormat
需求
现在有一些订单的评论数据,需求,将订单的好评与差评进行区分开来,将最终的数据分开到不同的文件夹下面去,数据内容参见资料文件夹,其中数据第九个字段表示好评,中评,差评。0:好评,1:中评,2:差评
分析
程序的关键点是要在一个mapreduce程序中根据数据的不同输出两类结果到不同目录,这类灵活的输出需求可以通过自定义outputformat来实现
实现
实现要点:
1、在mapreduce中访问外部资源
2、自定义outputformat,改写其中的recordwriter,改写具体输出数据的方法write()
数据准备
链接:https://pan.baidu.com/s/17F-QvPTiakaLa65fLpn6cQ
提取码:g96a
下载ordercomment.csv,内容长这样:
Java代码
CustomFileOutputFormat
package com.czxy.day20191119.demo02;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class CustomFileOutputFormat extends FileOutputFormat<Text,NullWritable>{
/**
*
* 这个方法需要自定义一个RecordWriter,它俩结合,一个定义,一个判断写入,联合起来才能把这件事干好。
* FileOutputFormat定义格式,RecordWriter写业务逻辑。
*
* @param taskAttemptContext
* @return
* @throws IOException
* @throws InterruptedException
*/
@Override
public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
//获取我们的configuration
Configuration configuration = taskAttemptContext.getConfiguration();
//
FileSystem fileSystem = FileSystem.get(configuration);
//好评写入的路径,这个是写好评用的流
FSDataOutputStream fsDataOutputStreamGood = fileSystem.create(new Path("E:\\cache\\mapReduceResultCache\\20191119\\demo02\\out_good_comment01\\1.txt"));
//差评写入的路径,这个是写差评用的流
FSDataOutputStream fsDataOutputStreamBad = fileSystem.create(new Path("E:\\cache\\mapReduceResultCache\\20191119\\demo02\\out_bad_comment01\\1.txt"));
//自定义RecordWriter,用这玩意才能写数据
//也就是说,每次自定义一个FileOutputFormat,就得定义一个RecordWriter。
CustomRecordWriter customRecordWriter = new CustomRecordWriter(fsDataOutputStreamGood, fsDataOutputStreamBad);
return customRecordWriter;
}
}
CustomMapper
package com.czxy.day20191119.demo02;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class CustomMapper extends Mapper<LongWritable,Text,Text,NullWritable> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
context.write(value,NullWritable.get());
}
}
CustomRecordWriter
package com.czxy.day20191119.demo02;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import java.io.IOException;
public class CustomRecordWriter extends RecordWriter<Text,NullWritable> {
private FSDataOutputStream out1;
private FSDataOutputStream out2;
public CustomRecordWriter() {
}
/**
* 在自定义FileOutputFormat类中,定义输出流,就可以定义输出流的路径。
* 在那边指定路径,在RecordWriter类里执行写入。
*
* @param out1
* @param out2
*/
public CustomRecordWriter(FSDataOutputStream out1, FSDataOutputStream out2) {
this.out1 = out1;
this.out2 = out2;
}
//通过这个方法把文件往外写,决定了我们的文件写入到哪个路径下
@Override
public void write(Text key, NullWritable value) throws IOException, InterruptedException {
//把在CustomFileOutputFormat里面定义的两个FSDataOutputStream传入到这里来,
//在这里对我们的文件进行判断,如果类型是0,写入到一个文件中,如果评论类型是1和2写入到另一个文件中
//0是好评 1是中评和差评
if (key.toString().split("\t")[9].equals("0")){
//好评
out1.write(key.toString().getBytes());
out1.write("\r\n".getBytes());
}else {
//中评和差评
out2.write(key.toString().getBytes());
out2.write("\r\n".getBytes());
}
}
@Override
public void close(TaskAttemptContext context) throws IOException, InterruptedException {
if (null!=out1){
IOUtils.closeQuietly(out1);
}
if (null!=out2){
IOUtils.closeQuietly(out2);
}
}
}
Driver
package com.czxy.day20191119.demo02;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
* 这就是一个普通的Driver类,但是它没有指定Reduce
* 而是指定了一个自定义的FileOutputFormat类,同时还自定义了RecordWriter类,
* 它不在map端处理数据,而是把数据放到RecordWriter里面处理了。
* RecordWriter使用了FileOutputFormat定义的两个输出流。
* 它们是不可分割的整体。
*/
public class Driver extends Configured implements Tool{
@Override
public int run(String[] strings) throws Exception {
Job job=Job.getInstance(super.getConf(),"CustomOutputFormatDriver");
job.setInputFormatClass(TextInputFormat.class);
TextInputFormat.addInputPath(job,new Path("E:\\cache\\mapReduceTestCache\\20191119\\demo02"));
job.setMapperClass(CustomMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(NullWritable.class);
job.setOutputFormatClass(CustomFileOutputFormat.class);
CustomFileOutputFormat.setOutputPath(job, new Path("E:\\cache\\mapReduceResultCache\\20191119\\demo02_01"));
boolean b = job.waitForCompletion(true);
return b?0:1;
}
public static void main(String[] args) throws Exception {
/**
* ToolRunner有很多的类型
* 又见到一种新的= =
*/
int run = ToolRunner.run(new Configuration(), new Driver(), args);
System.exit(run);
}
}
运行结果
自定义FileOutputFormat实现了分类输出到不同路径。
总结
FileOutputFormat里边可以自定义多个输出流,每个输出流都指定一个路径。
这样就可以实现输出到多个路径。
而且,它自定义了RecordWriter,并且在里边实现了业务逻辑。
而在Mapper里一点代码都没有写,这说明,有些代码可以写到自定义的
RecordWriter里面,增加了代码的灵活性。
需要注意的是,两个输出流都是在FileOutputFormat里边定义并初始化的。
而在RecordWriter里边,直接就使用了。
RW里重写了write方法,从而实现了业务逻辑,这点需要记忆。
而在FileOutputFormat 里边,重写了RecordWriter方法,并且这个方法还返回一个RecordWriter。
这里自定义了RecordWriter,所以可以实现业务逻辑。
所以需要记忆的是FileOutputFormat要是自定义的话,通过重写RecordWriter就可以实现创建多个不同输出路径的输出流。
其它功能暂时没发现,先把这些记住。