Hadoop MapReduce 数据排序实战
数据排序是数据处理流程中的基础环节,无论是学生成绩排名还是建立索引,往往都需要先对原始数据进行有序化处理。这个案例的逻辑与数据去重类似,都是通过对输入数据的初步清洗和重组,为后续操作铺平道路。
需求分析
我们需要处理一个包含多行数字的输入文件。每行代表一条数据,目标是在输出中生成两列:第一列是该数字在整体排序后的位次(Rank),第二列是原始数值本身。
假设输入包含三个文件,内容如下:
file1
2 32 654 32 15 756 65223
file2
5956 22 650 92
file3
26 54 6
期望的输出结果应体现全局排序后的序号与数值的对应关系:
1 2
2 6
3 15
...
14 65223
核心思路
MapReduce 框架本身具备强大的排序能力,默认会根据 Key 的值进行排序。如果我们把要排序的数字封装成 IntWritable 作为 Key 输出,框架就会自动按数字大小排列;如果是 Text 类型,则按字典序排列。
在这个场景中,我们采用 IntWritable 作为 Key。Mapper 阶段将读取到的数字转换为整数并写入 Context,Value 可以设为任意占位符(例如 1)。Reducer 接收到 <Key, ValueList> 后,利用 ValueList 的长度判断该数字出现的次数,同时维护一个全局计数器来生成位次。
需要注意的是,本示例未配置 Combiner。虽然 Combiner 能优化性能,但在这种简单的计数排序场景下,直接使用 Mapper 和 Reducer 足以完成任务,且逻辑更清晰。
代码实现
1. 升序排序
升序是 MapReduce 的默认行为,关键在于正确设置 Key 的类型。Mapper 负责解析文本,Reducer 负责分配序号。
package com.mk.mapreduce;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
java.io.IOException;
java.net.URI;
{
<LongWritable, Text, IntWritable, IntWritable> {
IOException, InterruptedException {
(Integer.parseInt(value.toString().trim()));
context.write(v, ());
}
}
<IntWritable, IntWritable, IntWritable, IntWritable> {
;
IOException, InterruptedException {
(IntWritable v : values) {
context.write( (count++), key);
}
}
}
Exception {
;
;
;
();
(System.getProperty().toLowerCase().contains()) {
conf.set(, );
}
FileSystem.get(URI.create(uri), conf);
(output);
fileSystem.delete(path, );
(conf, );
job.setJar();
job.setJarByClass(Sort.class);
job.setMapperClass(SortMapper.class);
job.setReducerClass(SortReducer.class);
job.setMapOutputKeyClass(IntWritable.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPaths(job, uri + input);
FileOutputFormat.setOutputPath(job, (uri + output));
job.waitForCompletion();
System.out.println(job.getJobName() + + ret);
}
}

