MapReduce 自定义OutputFormat 实现多路径输出

MapReduce 自定义OutputFormat 实现多路径输出

先跑一遍代码,再看总结

自定义outputFormat

需求

现在有一些订单的评论数据,需求,将订单的好评与差评进行区分开来,将最终的数据分开到不同的文件夹下面去,数据内容参见资料文件夹,其中数据第九个字段表示好评,中评,差评。0:好评,1:中评,2:差评

分析

程序的关键点是要在一个mapreduce程序中根据数据的不同输出两类结果到不同目录,这类灵活的输出需求可以通过自定义outputformat来实现

实现

实现要点:
1、在mapreduce中访问外部资源
2、自定义outputformat,改写其中的recordwriter,改写具体输出数据的方法write()

数据准备

链接:https://pan.baidu.com/s/17F-QvPTiakaLa65fLpn6cQ
提取码:g96a

下载ordercomment.csv,内容长这样:

www.zeeklog.com  - MapReduce 自定义OutputFormat 实现多路径输出

Java代码

CustomFileOutputFormat

package com.czxy.day20191119.demo02;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class CustomFileOutputFormat  extends FileOutputFormat<Text,NullWritable>{
    /**
     *
     * 这个方法需要自定义一个RecordWriter,它俩结合,一个定义,一个判断写入,联合起来才能把这件事干好。
     * FileOutputFormat定义格式,RecordWriter写业务逻辑。
     *
     * @param taskAttemptContext
     * @return
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        //获取我们的configuration
        Configuration configuration = taskAttemptContext.getConfiguration();

        //
        FileSystem fileSystem = FileSystem.get(configuration);

        //好评写入的路径,这个是写好评用的流
        FSDataOutputStream fsDataOutputStreamGood = fileSystem.create(new Path("E:\\cache\\mapReduceResultCache\\20191119\\demo02\\out_good_comment01\\1.txt"));

        //差评写入的路径,这个是写差评用的流
        FSDataOutputStream fsDataOutputStreamBad = fileSystem.create(new Path("E:\\cache\\mapReduceResultCache\\20191119\\demo02\\out_bad_comment01\\1.txt"));

        //自定义RecordWriter,用这玩意才能写数据
        //也就是说,每次自定义一个FileOutputFormat,就得定义一个RecordWriter。
        CustomRecordWriter customRecordWriter = new CustomRecordWriter(fsDataOutputStreamGood, fsDataOutputStreamBad);


        return customRecordWriter;
    }






}

CustomMapper

package com.czxy.day20191119.demo02;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;

public class CustomMapper extends Mapper<LongWritable,Text,Text,NullWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        context.write(value,NullWritable.get());
    }



}

CustomRecordWriter

package com.czxy.day20191119.demo02;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

import java.io.IOException;

public class CustomRecordWriter extends RecordWriter<Text,NullWritable> {

    private FSDataOutputStream out1;
    private FSDataOutputStream out2;

    public CustomRecordWriter() {

    }

    /**
     * 在自定义FileOutputFormat类中,定义输出流,就可以定义输出流的路径。
     * 在那边指定路径,在RecordWriter类里执行写入。
     *
     * @param out1
     * @param out2
     */

    public CustomRecordWriter(FSDataOutputStream out1, FSDataOutputStream out2) {
        this.out1 = out1;
        this.out2 = out2;
    }

    //通过这个方法把文件往外写,决定了我们的文件写入到哪个路径下
    @Override
    public void write(Text key, NullWritable value) throws IOException, InterruptedException {
        //把在CustomFileOutputFormat里面定义的两个FSDataOutputStream传入到这里来,
        //在这里对我们的文件进行判断,如果类型是0,写入到一个文件中,如果评论类型是1和2写入到另一个文件中


        //0是好评 1是中评和差评
        if (key.toString().split("\t")[9].equals("0")){
            //好评
            out1.write(key.toString().getBytes());
            out1.write("\r\n".getBytes());
        }else {
            //中评和差评
            out2.write(key.toString().getBytes());
            out2.write("\r\n".getBytes());
        }


    }


    @Override
    public void close(TaskAttemptContext context) throws IOException, InterruptedException {
        if (null!=out1){
            IOUtils.closeQuietly(out1);
        }

        if (null!=out2){
            IOUtils.closeQuietly(out2);
        }
    }
}

Driver

package com.czxy.day20191119.demo02;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * 这就是一个普通的Driver类,但是它没有指定Reduce
 * 而是指定了一个自定义的FileOutputFormat类,同时还自定义了RecordWriter类,
 * 它不在map端处理数据,而是把数据放到RecordWriter里面处理了。
 * RecordWriter使用了FileOutputFormat定义的两个输出流。
 * 它们是不可分割的整体。
 */
public class Driver extends Configured implements Tool{
    @Override
    public int run(String[] strings) throws Exception {

        Job job=Job.getInstance(super.getConf(),"CustomOutputFormatDriver");

        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job,new Path("E:\\cache\\mapReduceTestCache\\20191119\\demo02"));

        job.setMapperClass(CustomMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);

        job.setOutputFormatClass(CustomFileOutputFormat.class);
        CustomFileOutputFormat.setOutputPath(job, new Path("E:\\cache\\mapReduceResultCache\\20191119\\demo02_01"));

        boolean b = job.waitForCompletion(true);
        return b?0:1;
    }

    public static void main(String[] args) throws Exception {
        /**
         * ToolRunner有很多的类型
         * 又见到一种新的= =
         */
        int run = ToolRunner.run(new Configuration(), new Driver(), args);
        System.exit(run);
    }
}

运行结果

www.zeeklog.com  - MapReduce 自定义OutputFormat 实现多路径输出
www.zeeklog.com  - MapReduce 自定义OutputFormat 实现多路径输出


自定义FileOutputFormat实现了分类输出到不同路径。

总结

FileOutputFormat里边可以自定义多个输出流,每个输出流都指定一个路径。
这样就可以实现输出到多个路径。
而且,它自定义了RecordWriter,并且在里边实现了业务逻辑。
而在Mapper里一点代码都没有写,这说明,有些代码可以写到自定义的
RecordWriter里面,增加了代码的灵活性。

需要注意的是,两个输出流都是在FileOutputFormat里边定义并初始化的。
而在RecordWriter里边,直接就使用了。

RW里重写了write方法,从而实现了业务逻辑,这点需要记忆。
而在FileOutputFormat 里边,重写了RecordWriter方法,并且这个方法还返回一个RecordWriter。
这里自定义了RecordWriter,所以可以实现业务逻辑。
所以需要记忆的是FileOutputFormat要是自定义的话,通过重写RecordWriter就可以实现创建多个不同输出路径的输出流。

其它功能暂时没发现,先把这些记住。