Map端实现Join算法

Map端实现Join算法

Map端实现Join算法

数据

链接:https://pan.baidu.com/s/1DAXmP9cskwXB5uNSA0bugA
提取码:2zn8

order.txt
格式:
日期\t商品id\t下单日期

2019-11-11	200300500	201911110101
2019-11-11	200300505	201911110102
2019-11-11	200300550	201911110103
2019-11-11	200300500	201911110104

produce.txt
格式:
商品id\t商品名称

200300500	华为手机
200300505	电脑
200300550	小米手机

分析

map端实现join通常把小文件用缓存加载到map里,通过join的相同字段来匹配。
这里两份文本都有商品id,所以map的泛型为<string,string>左边商品id,右边商品名称(如果有多条信息,整行文本也许)。
每读取到一行大文本的数据,都获得它的商品id,再从map获取商品名称。
最后拼接输出即可,连reduce都不用了。

这里的大文件指order.txt,小文件指produce.txt,因为商品的属性比订单的属性少

最终效果应该是这样的:
输出文件:part-r-00000

2019-11-11	200300500	201911110101 华为手机
2019-11-11	200300505	201911110102 电脑
2019-11-11	200300550	201911110103 小米手机
2019-11-11	200300500	201911110104 华为手机

代码

package com.czxy.demo01;


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

public class MapJoinDriver extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        //这玩意必须在job之前弄
        DistributedCache.addCacheFile(new URI("hdfs://192.168.100.201:8020/produce.txt"),super.getConf());
		//实例化job
        Job job = Job.getInstance(super.getConf());
		//设置job能在集群上运行
        job.setJarByClass(MapJoinDriver.class);
		//设置输入的类型
        job.setInputFormatClass(TextInputFormat.class);
        //设置输入的路径
        TextInputFormat.addInputPath(job,new Path("E:\\input\\mapjoin\\order.txt"));
		//设置mapper的类
        job.setMapperClass(MapMapper.class);
        //设置mapper输出的key的类型
        job.setMapOutputKeyClass(Text.class);
        //设置mapper输出的value的类型
        job.setMapOutputValueClass(NullWritable.class);
        
		//map端join算法不需要用到reduce,所以不用设置reducer的类和输出类型
		
		//设置输出类型
        job.setOutputFormatClass(TextOutputFormat.class);
        //设置输出路径
        TextOutputFormat.setOutputPath(job,new Path("E:\\output\\mapjoin\\"));
		
		//判断是否运行成功,成功返回0,失败返回1
        boolean b = job.waitForCompletion(true);
        return b?0:1;

    }
    public static void main(String[] args) throws Exception{
    	//运行代码
        ToolRunner.run(new Configuration(),new MapJoinDriver(),args);
    }




    public static class MapMapper extends Mapper<LongWritable, Text,Text, NullWritable> {
		//装小文件数据的map
        Map<String,String> map = new HashMap<>();
        //节约内存,所以map中用到的line放这儿了。
      	String line = null;
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
			//读取缓存文件列表
            URI[] cacheFiles = DistributedCache.getCacheFiles(context.getConfiguration());
        	//得到缓存文件的文件系统
            FileSystem fileSystem = FileSystem.get(cacheFiles[0], context.getConfiguration());
           	//用缓存文件的指定文件系统,打开指定位置的文件
            FSDataInputStream open = fileSystem.open(new Path(cacheFiles[0]));
			//用BufferReader可以实现逐行读取,它依赖于InputStreamReader
			//InputStreamReader又依赖于FSDataInputStream
            BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(open));

			//死循环是为了读干净
            while (true){
            	//读取一行
                String str = bufferedReader.readLine();
                //如果为空那就说明读取到尽头了,没数据了,那就退出
                if (str == null){
                    break;
                }
				//如果没退出就执行到这里了
                String[] split = str.split("\t");
                //给map装载数据,左边商品id,右边商品名称
                //如果商品属性大于2个,则右边装整行文本,在map中获取后分割
                map.put(split[0],split[1]);

            }



        }

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        	//读取到大文件的一行
            String[] split = value.toString().split("\t");
			//读取并拼接数据
            line = value.toString() +"\t"+ map.get(split[1]) ;
			//把拼接好的数据写出到文件
			//因为没有reduce,所以这里会直接写出到文件
            context.write(new Text(line),NullWritable.get());

        }
    }

}

关键点分析

DistributedCache.addCacheFile(new URI("hdfs://192.168.100.201:8020/produce.txt"),super.getConf());

1,分布式缓存读取文件,必须在实例化Job之前弄

 public static void main(String[] args) throws Exception{
        ToolRunner.run(new Configuration(),new MapJoinDriver(),args);
    }

1,上面实例化job,DistributedCache都是用的super.getConf();
2,只需要在启动类的run方法中第一个参数增加一个参数为configuration。

URI[] cacheFiles = DistributedCache.getCacheFiles(context.getConfiguration());
//这样直接从缓存拿,不会报错
FileSystem fileSystem = FileSystem.get(cacheFiles[0], context.getConfiguration());

1,运用FileSystem.get(cacheFiles[0], context.getConfiguration());来获取缓存中的文件。
2,再使用FileSystem.open()方法,从缓存读取,而不是直接从指定位置读取。

URI[] cacheFiles = DistributedCache.getCacheFiles(context.getConfiguration());
        
FileSystem fileSystem = FileSystem.get(cacheFiles[0], context.getConfiguration());
           
FSDataInputStream open = fileSystem.open(new Path(cacheFiles[0]));

BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(open));

注意这里的执行过程:
1,FileSystem.get()拿到缓存中的数据
2,FSDataInputStream open = fileSystem.open(new Path(cacheFiles[0])); 这一步必须拿到缓存中数据后才能进行,否则它会读取目标路径文件系统上的文件
(这时,如果cacheFiles[0]是集群上的文件,就会报错FSwrong,所以务必先拿到缓存,再open)
3,InputStreamReader 依赖于 FSDataInputStream
4,BufferedReader 依赖于InputStreamReader
5,BufferedReader可以实现一行一行的读取,正是我们需要的方法(readline())