MapReduce 自定义InputFormat 实现小文件合并
1 需求
无论hdfs还是mapreduce,对于小文件都有损效率,实践中,又难免面临处理大量小文件的场景,此时,就需要有相应解决方案, 将多个小文件合并成一个文件 SequenceFile.SequenceFile 里面存储着多个文件。存储的形式为文件名称为 key,文件内容为 value。
2 分析
小文件的优化无非以下几种方式:
1、在数据采集的时候,就将小文件或小批数据合成大文件再上传HDFS
2、在业务处理之前,在HDFS上使用mapreduce程序对小文件进行合并
3、在mapreduce处理时,可采用combineInputFormat提高效率
3 实现
本节实现的是上述第二种方式
程序的核心机制:
自定义一个InputFormat
改写RecordReader,实现一次读取一个完整文件封装为KV
在输出时使用SequenceFileOutPutFormat输出合并文件
准备数据
链接:https://pan.baidu.com/s/1qkpeuec7M826gBZOScZJlA
提取码:no2x
下载hello.txt和world.txt
hello.txt
helko
klwejlkwflwekjlfkwe
lwekfjwelwejfw
sldkfjsf
乐山大佛健康是福
lksdjfs
拉克丝京东方克里斯多夫
world.txt
lsdfjksldflsdkfjskldf
sldkfjskldf
lsdkfjsldfk
lskdjflskdf
lksdjfklsdjf
了深刻的房间里快速的减肥
Java代码
CustomFileInputFormat
package com.czxy.day20191119.demo01;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import java.io.IOException;
public class CustomFileInputFormat extends FileInputFormat<NullWritable, BytesWritable> {
//直接返回文件不可切割,保证一个文件是一个完整的一行
@Override
protected boolean isSplitable(JobContext context, Path filename) {
return false;
}
@Override
public RecordReader<NullWritable, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
CustomFileRecordReader customFileRecordReader = new CustomFileRecordReader();
customFileRecordReader.initialize(split, context);
return customFileRecordReader;
}
}
CustomFileRecordReader
package com.czxy.day20191119.demo01;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import java.io.IOException;
public class CustomFileRecordReader extends RecordReader<NullWritable, BytesWritable> {
private FileSplit fileSplit;
private Configuration configuration;
private BytesWritable bytesWritable = new BytesWritable();
private boolean processed = false;
/**
* 初始化的方法 带了两个参数,第一个inputSplit,封装了我们的数据都在这里面
* context与我们mapreduce当中的context类似 ,上下文对象
*
* @param split
* @param context
* @throws IOException
* @throws InterruptedException
*/
@Override
public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
this.fileSplit = (FileSplit) split;
this.configuration = context.getConfiguration();
}
//往下读文件,如果返回true,表示已经读取过了
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
if (!processed) {
//获取文件切片的路径,可以通过filesystem读取这个文件,读成一个流
Path path = fileSplit.getPath();
//获取我们的FileSystem
FileSystem fileSystem = null;
//通过FileSystem来获取我们的文件输入流
FSDataInputStream inputStream = null;
try {
fileSystem = FileSystem.get(configuration);
inputStream = fileSystem.open(path);
//初始化一个字节数组,长度为读取内容的大小
byte[] contents = new byte[(int) fileSplit.getLength()];
//将我们文件的输入流读取到byte[]当中
IOUtils.readFully(inputStream, contents, 0, contents.length);
//将我们字节数组中的内容全部转换到我们的BytesWritable当中进行封装
bytesWritable.set(contents, 0, contents.length);
} catch (IOException e) {
e.printStackTrace();
} finally {
fileSystem.close();
if (null != inputStream) {
IOUtils.closeQuietly(inputStream);
}
}
processed=true;
return true;
} else {
return false;
}
}
//获取当前返回的key
@Override
public NullWritable getCurrentKey() throws IOException, InterruptedException {
return NullWritable.get();
}
//获取当前返回的value值
@Override
public BytesWritable getCurrentValue() throws IOException, InterruptedException {
return bytesWritable;
}
//获取读取的进度
@Override
public float getProgress() throws IOException, InterruptedException {
return processed ? 1.0f : 0.0f;
}
//关闭清理方法
@Override
public void close() throws IOException {
}
}
CustomInputFormatDriver
package com.czxy.day20191119.demo01;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class CustomInputFormatDriver extends Configured implements Tool{
@Override
public int run(String[] strings) throws Exception {
Job job = Job.getInstance(super.getConf(), "CustomInputFormatDriver");
job.setInputFormatClass(CustomFileInputFormat.class);
CustomFileInputFormat.addInputPath(job,new Path("E:\\cache\\mapReduceTestCache\\20191119\\demo01"));
job.setMapperClass(CustomMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(BytesWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(BytesWritable.class);
job.setOutputFormatClass(SequenceFileOutputFormat.class);
SequenceFileOutputFormat.setOutputPath(job,new Path("E:\\cache\\mapReduceResultCache\\20191119\\demo01_01"));
boolean b = job.waitForCompletion(true);
return b?0:1;
}
public static void main(String[] args) throws Exception {
int run = ToolRunner.run(new Configuration(), new CustomInputFormatDriver(), args);
System.exit(run);
}
}
CustomMapper
package com.czxy.day20191119.demo01;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import java.io.IOException;
public class CustomMapper extends Mapper<NullWritable,BytesWritable,Text,BytesWritable> {
@Override
protected void map(NullWritable key, BytesWritable value, Context context) throws IOException, InterruptedException {
//获取文件名,将我们的k2,v2 定义成Text文件名 BytesWritable 文件内容
FileSplit inputSplit = (FileSplit) context.getInputSplit();
String name = inputSplit.getPath().getName();
context.write(new Text(name),value);
}
}
运行结果:
成功合并成一个大文件了。
总结
这个FileInputFormat需要RecordReader
它们俩就像FileOutputFormat和RecordWirter一样
紧紧结合在一起,一起出现。
FileInputFormat的自定义类中,
重写initialize方法是很重要的,因为这里边有 InputSplit
这东西把你的数据都封装在它里面。而且还有context上下文对象,可以从这里获取configuration
所以弄俩全局变量,在initialize方法里给他俩放全局变量里
private FileSplit fileSplit;
private Configuration configuration;
然后在NextKeyValue 方法里进行业务逻辑的编写,
initialize
初始化方法比较重要,需要理解
nextKeyValue
这个一般是写业务逻辑的地方
getCurrentKey
返回当前的key ,感觉没啥用
getCurrentValue
获取当前的value,同样感觉没啥用
getProgress
获取读取的进度,这次没用上,估计以后也用不上,肯定还没有getCurrent的key,value实用一点
close
关闭并清理,估计是做一些收尾的工作。
再来看一下FileInputFormat的RecordReader方法都干了啥
@Override
public RecordReader<NullWritable, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
CustomFileRecordReader customFileRecordReader = new CustomFileRecordReader();
customFileRecordReader.initialize(split, context);
return customFileRecordReader;
}
你看看,它就初始化了一个自定义的RecordReader对象,
然后调用了一下初始化方法,
然后就把这个东西返回了,几乎啥都没做。
你需要记住的,就是FileInputFormat和RecordReader是分不开的一个整体,要用的时候肯定是一起用
在FileInputFormat里边定义,在RecordReader里边写业务逻辑。就是这样。