Map端实现Join算法
Map端实现Join算法
数据
链接:https://pan.baidu.com/s/1DAXmP9cskwXB5uNSA0bugA
提取码:2zn8
order.txt
格式:
日期\t商品id\t下单日期
2019-11-11 200300500 201911110101
2019-11-11 200300505 201911110102
2019-11-11 200300550 201911110103
2019-11-11 200300500 201911110104
produce.txt
格式:
商品id\t商品名称
200300500 华为手机
200300505 电脑
200300550 小米手机
分析
map端实现join通常把小文件用缓存加载到map里,通过join的相同字段来匹配。
这里两份文本都有商品id,所以map的泛型为<string,string>左边商品id,右边商品名称(如果有多条信息,整行文本也许)。
每读取到一行大文本的数据,都获得它的商品id,再从map获取商品名称。
最后拼接输出即可,连reduce都不用了。
这里的大文件指order.txt,小文件指produce.txt,因为商品的属性比订单的属性少。
最终效果应该是这样的:
输出文件:part-r-00000
2019-11-11 200300500 201911110101 华为手机
2019-11-11 200300505 201911110102 电脑
2019-11-11 200300550 201911110103 小米手机
2019-11-11 200300500 201911110104 华为手机
代码
package com.czxy.demo01;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
public class MapJoinDriver extends Configured implements Tool {
@Override
public int run(String[] args) throws Exception {
//这玩意必须在job之前弄
DistributedCache.addCacheFile(new URI("hdfs://192.168.100.201:8020/produce.txt"),super.getConf());
//实例化job
Job job = Job.getInstance(super.getConf());
//设置job能在集群上运行
job.setJarByClass(MapJoinDriver.class);
//设置输入的类型
job.setInputFormatClass(TextInputFormat.class);
//设置输入的路径
TextInputFormat.addInputPath(job,new Path("E:\\input\\mapjoin\\order.txt"));
//设置mapper的类
job.setMapperClass(MapMapper.class);
//设置mapper输出的key的类型
job.setMapOutputKeyClass(Text.class);
//设置mapper输出的value的类型
job.setMapOutputValueClass(NullWritable.class);
//map端join算法不需要用到reduce,所以不用设置reducer的类和输出类型
//设置输出类型
job.setOutputFormatClass(TextOutputFormat.class);
//设置输出路径
TextOutputFormat.setOutputPath(job,new Path("E:\\output\\mapjoin\\"));
//判断是否运行成功,成功返回0,失败返回1
boolean b = job.waitForCompletion(true);
return b?0:1;
}
public static void main(String[] args) throws Exception{
//运行代码
ToolRunner.run(new Configuration(),new MapJoinDriver(),args);
}
public static class MapMapper extends Mapper<LongWritable, Text,Text, NullWritable> {
//装小文件数据的map
Map<String,String> map = new HashMap<>();
//节约内存,所以map中用到的line放这儿了。
String line = null;
@Override
protected void setup(Context context) throws IOException, InterruptedException {
//读取缓存文件列表
URI[] cacheFiles = DistributedCache.getCacheFiles(context.getConfiguration());
//得到缓存文件的文件系统
FileSystem fileSystem = FileSystem.get(cacheFiles[0], context.getConfiguration());
//用缓存文件的指定文件系统,打开指定位置的文件
FSDataInputStream open = fileSystem.open(new Path(cacheFiles[0]));
//用BufferReader可以实现逐行读取,它依赖于InputStreamReader
//InputStreamReader又依赖于FSDataInputStream
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(open));
//死循环是为了读干净
while (true){
//读取一行
String str = bufferedReader.readLine();
//如果为空那就说明读取到尽头了,没数据了,那就退出
if (str == null){
break;
}
//如果没退出就执行到这里了
String[] split = str.split("\t");
//给map装载数据,左边商品id,右边商品名称
//如果商品属性大于2个,则右边装整行文本,在map中获取后分割
map.put(split[0],split[1]);
}
}
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//读取到大文件的一行
String[] split = value.toString().split("\t");
//读取并拼接数据
line = value.toString() +"\t"+ map.get(split[1]) ;
//把拼接好的数据写出到文件
//因为没有reduce,所以这里会直接写出到文件
context.write(new Text(line),NullWritable.get());
}
}
}
关键点分析
DistributedCache.addCacheFile(new URI("hdfs://192.168.100.201:8020/produce.txt"),super.getConf());
1,分布式缓存读取文件,必须在实例化Job之前弄
public static void main(String[] args) throws Exception{
ToolRunner.run(new Configuration(),new MapJoinDriver(),args);
}
1,上面实例化job,DistributedCache都是用的super.getConf();
2,只需要在启动类的run方法中第一个参数增加一个参数为configuration。
URI[] cacheFiles = DistributedCache.getCacheFiles(context.getConfiguration());
//这样直接从缓存拿,不会报错
FileSystem fileSystem = FileSystem.get(cacheFiles[0], context.getConfiguration());
1,运用FileSystem.get(cacheFiles[0], context.getConfiguration());来获取缓存中的文件。
2,再使用FileSystem.open()方法,从缓存读取,而不是直接从指定位置读取。
URI[] cacheFiles = DistributedCache.getCacheFiles(context.getConfiguration());
FileSystem fileSystem = FileSystem.get(cacheFiles[0], context.getConfiguration());
FSDataInputStream open = fileSystem.open(new Path(cacheFiles[0]));
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(open));
注意这里的执行过程:
1,FileSystem.get()拿到缓存中的数据
2,FSDataInputStream open = fileSystem.open(new Path(cacheFiles[0])); 这一步必须拿到缓存中数据后才能进行,否则它会读取目标路径文件系统上的文件
(这时,如果cacheFiles[0]是集群上的文件,就会报错FSwrong,所以务必先拿到缓存,再open)
3,InputStreamReader 依赖于 FSDataInputStream
4,BufferedReader 依赖于InputStreamReader
5,BufferedReader可以实现一行一行的读取,正是我们需要的方法(readline())