MapReduce 自定义InputFormat 实现小文件合并

MapReduce 自定义InputFormat 实现小文件合并

1 需求

无论hdfs还是mapreduce,对于小文件都有损效率,实践中,又难免面临处理大量小文件的场景,此时,就需要有相应解决方案, 将多个小文件合并成一个文件 SequenceFile.SequenceFile 里面存储着多个文件。存储的形式为文件名称为 key,文件内容为 value。

2 分析

小文件的优化无非以下几种方式:
1、在数据采集的时候,就将小文件或小批数据合成大文件再上传HDFS
2、在业务处理之前,在HDFS上使用mapreduce程序对小文件进行合并
3、在mapreduce处理时,可采用combineInputFormat提高效率

3 实现

本节实现的是上述第二种方式
程序的核心机制:
自定义一个InputFormat
改写RecordReader,实现一次读取一个完整文件封装为KV
在输出时使用SequenceFileOutPutFormat输出合并文件

准备数据

链接:https://pan.baidu.com/s/1qkpeuec7M826gBZOScZJlA
提取码:no2x

下载hello.txt和world.txt

hello.txt

helko
klwejlkwflwekjlfkwe
lwekfjwelwejfw
sldkfjsf
乐山大佛健康是福
lksdjfs
拉克丝京东方克里斯多夫

world.txt

lsdfjksldflsdkfjskldf
sldkfjskldf
lsdkfjsldfk
lskdjflskdf
lksdjfklsdjf
了深刻的房间里快速的减肥

Java代码

CustomFileInputFormat

package com.czxy.day20191119.demo01;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import java.io.IOException;

public class CustomFileInputFormat extends FileInputFormat<NullWritable, BytesWritable> {

   //直接返回文件不可切割,保证一个文件是一个完整的一行

    @Override
    protected boolean isSplitable(JobContext context, Path filename) {
        return false;
    }

    @Override
    public RecordReader<NullWritable, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {

        CustomFileRecordReader customFileRecordReader = new CustomFileRecordReader();
        customFileRecordReader.initialize(split, context);
        return customFileRecordReader;
    }
}

CustomFileRecordReader

package com.czxy.day20191119.demo01;

import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.IOException;

public class CustomFileRecordReader extends RecordReader<NullWritable, BytesWritable> {

    private FileSplit fileSplit;
    private Configuration configuration;
    private BytesWritable bytesWritable = new BytesWritable();
    private boolean processed = false;
    /**
     * 初始化的方法 带了两个参数,第一个inputSplit,封装了我们的数据都在这里面
     * context与我们mapreduce当中的context类似 ,上下文对象
     *
     * @param split
     * @param context
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {

        this.fileSplit = (FileSplit) split;
        this.configuration = context.getConfiguration();
    }

    //往下读文件,如果返回true,表示已经读取过了
    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (!processed) {
            //获取文件切片的路径,可以通过filesystem读取这个文件,读成一个流
            Path path = fileSplit.getPath();
            //获取我们的FileSystem
            FileSystem fileSystem = null;
            //通过FileSystem来获取我们的文件输入流
            FSDataInputStream inputStream = null;
            try {
                fileSystem = FileSystem.get(configuration);
                inputStream = fileSystem.open(path);
                //初始化一个字节数组,长度为读取内容的大小
                byte[] contents = new byte[(int) fileSplit.getLength()];
                //将我们文件的输入流读取到byte[]当中
                IOUtils.readFully(inputStream, contents, 0, contents.length);
                //将我们字节数组中的内容全部转换到我们的BytesWritable当中进行封装
                bytesWritable.set(contents, 0, contents.length);
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                fileSystem.close();
                if (null != inputStream) {
                    IOUtils.closeQuietly(inputStream);
                }
            }
            processed=true;
            return true;
        } else {
            return false;
        }
    }
    //获取当前返回的key
    @Override
    public NullWritable getCurrentKey() throws IOException, InterruptedException {
        return NullWritable.get();
    }

    //获取当前返回的value值
    @Override
    public BytesWritable getCurrentValue() throws IOException, InterruptedException {
        return bytesWritable;
    }

    //获取读取的进度
    @Override
    public float getProgress() throws IOException, InterruptedException {
        return processed ? 1.0f : 0.0f;
    }

    //关闭清理方法
    @Override
    public void close() throws IOException {
    }
}

CustomInputFormatDriver

package com.czxy.day20191119.demo01;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class CustomInputFormatDriver extends Configured implements Tool{
    @Override
    public int run(String[] strings) throws Exception {

        Job job = Job.getInstance(super.getConf(), "CustomInputFormatDriver");

        job.setInputFormatClass(CustomFileInputFormat.class);
        CustomFileInputFormat.addInputPath(job,new Path("E:\\cache\\mapReduceTestCache\\20191119\\demo01"));

        job.setMapperClass(CustomMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(BytesWritable.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(BytesWritable.class);

        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        SequenceFileOutputFormat.setOutputPath(job,new Path("E:\\cache\\mapReduceResultCache\\20191119\\demo01_01"));

        boolean b = job.waitForCompletion(true);
        return b?0:1;
    }
    public static void main(String[] args) throws Exception {
        int run = ToolRunner.run(new Configuration(), new CustomInputFormatDriver(), args);
        System.exit(run);
    }
}

CustomMapper

package com.czxy.day20191119.demo01;

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import java.io.IOException;

public class CustomMapper extends Mapper<NullWritable,BytesWritable,Text,BytesWritable> {
    @Override
    protected void map(NullWritable key, BytesWritable value, Context context) throws IOException, InterruptedException {

         //获取文件名,将我们的k2,v2 定义成Text文件名  BytesWritable 文件内容
         FileSplit inputSplit = (FileSplit) context.getInputSplit();
         String name = inputSplit.getPath().getName();
         context.write(new Text(name),value);
    }
}

运行结果:

成功合并成一个大文件了。

www.zeeklog.com  - MapReduce 自定义InputFormat 实现小文件合并

总结

这个FileInputFormat需要RecordReader
它们俩就像FileOutputFormat和RecordWirter一样
紧紧结合在一起,一起出现。

FileInputFormat的自定义类中,
重写initialize方法是很重要的,因为这里边有 InputSplit
这东西把你的数据都封装在它里面。而且还有context上下文对象,可以从这里获取configuration

所以弄俩全局变量,在initialize方法里给他俩放全局变量里
private FileSplit fileSplit;
private Configuration configuration;

然后在NextKeyValue 方法里进行业务逻辑的编写,

initialize
初始化方法比较重要,需要理解

nextKeyValue
这个一般是写业务逻辑的地方

getCurrentKey
返回当前的key ,感觉没啥用

getCurrentValue
获取当前的value,同样感觉没啥用

getProgress
获取读取的进度,这次没用上,估计以后也用不上,肯定还没有getCurrent的key,value实用一点

close
关闭并清理,估计是做一些收尾的工作。

再来看一下FileInputFormat的RecordReader方法都干了啥
@Override
public RecordReader<NullWritable, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {

    CustomFileRecordReader customFileRecordReader = new CustomFileRecordReader();
    customFileRecordReader.initialize(split, context);
    return customFileRecordReader;
}
你看看,它就初始化了一个自定义的RecordReader对象,
然后调用了一下初始化方法,
然后就把这个东西返回了,几乎啥都没做。

你需要记住的,就是FileInputFormat和RecordReader是分不开的一个整体,要用的时候肯定是一起用
在FileInputFormat里边定义,在RecordReader里边写业务逻辑。就是这样。