Reduce-side join and map-side join implementations
1. Reduce-side join implementation
1. Requirement:
Order table t_order:
id | date | pid | amount |
---|---|---|---|
1001 | 20150710 | P0001 | 2 |
1002 | 20150710 | P0001 | 3 |
1002 | 20150710 | P0002 | 3 |
Product table t_product:
id | pname | category_id | price |
---|---|---|---|
P0001 | 小米5 | 1000 | 2000 |
P0002 | 锤子T1 | 1000 | 3000 |
Assume the data volume is huge and both tables are stored as files on HDFS. We need a MapReduce program to implement the following SQL query:
select a.id, a.date, b.pname, b.category_id, b.price
from t_order a join t_product b on a.pid = b.id
2. Implementation approach:
Use the join condition (the product id) as the map output key, tag each record with the file it came from, and send all records that share a key to the same reduce task, where the two sides are stitched together.
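For reference, a minimal pair of input files matching the tables above. The file names are assumptions; the mapper below only requires that the order file's name contain "orders":
orders.txt:
```
1001,20150710,P0001,2
1002,20150710,P0001,3
1002,20150710,P0002,3
```
product.txt:
```
P0001,小米5,1000,2000
P0002,锤子T1,1000,3000
```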
Step 1: Define the OrderJoinBean
```java
package com.czxy.demo07;
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class OrderJoinBean implements Writable {
private String id;
private String date;
private String pid;
private String amount;
private String name;
private String categoryId;
private String price;
@Override
public String toString() {
return id+"\t"+date+"\t"+pid+"\t"+amount+"\t"+name+"\t"+categoryId+"\t"+price;
}
public OrderJoinBean() {
}
public OrderJoinBean(String id, String date, String pid, String amount, String name, String categoryId, String price) {
this.id = id;
this.date = date;
this.pid = pid;
this.amount = amount;
this.name = name;
this.categoryId = categoryId;
this.price = price;
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getDate() {
return date;
}
public void setDate(String date) {
this.date = date;
}
public String getPid() {
return pid;
}
public void setPid(String pid) {
this.pid = pid;
}
public String getAmount() {
return amount;
}
public void setAmount(String amount) {
this.amount = amount;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getCategoryId() {
return categoryId;
}
public void setCategoryId(String categoryId) {
this.categoryId = categoryId;
}
public String getPrice() {
return price;
}
public void setPrice(String price) {
this.price = price;
}
@Override
public void write(DataOutput out) throws IOException {
//writeUTF(null) would throw a NullPointerException, so null fields are
//encoded as the literal string "null"; the reducer relies on this marker
out.writeUTF(id+"");
out.writeUTF(date+"");
out.writeUTF(pid+"");
out.writeUTF(amount+"");
out.writeUTF(name+"");
out.writeUTF(categoryId+"");
out.writeUTF(price+"");
}
@Override
public void readFields(DataInput in) throws IOException {
this.id = in.readUTF();
this.date = in.readUTF();
this.pid = in.readUTF();
this.amount = in.readUTF();
this.name = in.readUTF();
this.categoryId = in.readUTF();
this.price = in.readUTF();
}
}
```
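As a quick sanity check (not part of the original example), a small hypothetical harness shows how null fields survive a Writable round trip as the string "null":
```java
import java.io.*;

public class OrderJoinBeanCheck {
    public static void main(String[] args) throws IOException {
        OrderJoinBean in = new OrderJoinBean("1001", "20150710", "P0001", "2", null, null, null);
        // serialize into a byte buffer, as Hadoop would between map and reduce
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        in.write(new DataOutputStream(bytes));

        // deserialize into a fresh bean
        OrderJoinBean out = new OrderJoinBean();
        out.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

        // prints: 1001	20150710	P0001	2	null	null	null
        System.out.println(out);
    }
}
```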
Step 2: Define the Mapper class
```java
package com.czxy.demo07;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import java.io.IOException;
public class OrderJoinMap extends Mapper<LongWritable,Text, Text,OrderJoinBean> {
//the bean is reused across map() calls; each map task reads a split of a
//single file, so the fields belonging to the other table simply stay null
private OrderJoinBean orderJoinBean = new OrderJoinBean();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//tell the two source files apart by their file names
String[] split = value.toString().split(",");
FileSplit inputSplit = (FileSplit) context.getInputSplit();
String name = inputSplit.getPath().getName();
System.out.println("input file name: "+name);
if(name.contains("orders")){
//order record: the join key (product id) is in column 3
orderJoinBean.setId(split[0]);
orderJoinBean.setDate(split[1]);
orderJoinBean.setPid(split[2]);
orderJoinBean.setAmount(split[3]);
context.write(new Text(split[2]),orderJoinBean);
}else{
//product record: the join key (product id) is in column 1
orderJoinBean.setName(split[1]);
orderJoinBean.setCategoryId(split[2]);
orderJoinBean.setPrice(split[3]);
context.write(new Text(split[0]),orderJoinBean);
}
}
}
```
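With the sample files, the map phase emits pairs keyed by product id, so both sides of the join land in the same reduce group, e.g.:
```
P0001 -> 1001	20150710	P0001	2	null	null	null
P0001 -> 1002	20150710	P0001	3	null	null	null
P0001 -> null	null	null	null	小米5	1000	2000
```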
Step 3: Define the Reducer class
```java
package com.czxy.demo07;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
public class OrderJoinReduce extends Reducer<Text,OrderJoinBean,OrderJoinBean, NullWritable> {
@Override
protected void reduce(Text key, Iterable<OrderJoinBean> values, Context context) throws IOException, InterruptedException {
//all records with the same pid arrive here; one product record can match
//several order records, so buffer the orders and emit one joined row per order
List<OrderJoinBean> orders = new ArrayList<OrderJoinBean>();
String name = null;
String categoryId = null;
String price = null;
for (OrderJoinBean value : values) {
//order records carry a real id; product records were serialized with id "null"
if(null != value.getId() && !"null".equals(value.getId())){
//copy the order, because Hadoop reuses the value object between iterations
orders.add(new OrderJoinBean(value.getId(),value.getDate(),value.getPid(),value.getAmount(),null,null,null));
}else{
name = value.getName();
categoryId = value.getCategoryId();
price = value.getPrice();
}
}
for (OrderJoinBean order : orders) {
order.setName(name);
order.setCategoryId(categoryId);
order.setPrice(price);
context.write(order,NullWritable.get());
}
}
}
```
Step 4: Write the driver (main entry point)
```java
package com.czxy.demo07;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class OrderJoinMain extends Configured implements Tool {
@Override
public int run(String[] args) throws Exception {
Job job = Job.getInstance(super.getConf(), OrderJoinMain.class.getSimpleName());
job.setJarByClass(OrderJoinMain.class);
job.setInputFormatClass(TextInputFormat.class);
//the input path must cover both input files (orders and product),
//so point it at the directory containing them rather than a single file
TextInputFormat.addInputPath(job,new Path("file:///E:\\cache\\mapReduceTestCache"));
job.setMapperClass(OrderJoinMap.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(OrderJoinBean.class);
job.setReducerClass(OrderJoinReduce.class);
job.setOutputKeyClass(OrderJoinBean.class);
job.setOutputValueClass(NullWritable.class);
job.setOutputFormatClass(TextOutputFormat.class);
TextOutputFormat.setOutputPath(job,new Path("file:///E:\\cache\\mapReduceResultCache\\TestResult01"));
boolean b = job.waitForCompletion(true);
return b?0:1;
}
public static void main(String[] args) throws Exception {
int exitCode = ToolRunner.run(new Configuration(),new OrderJoinMain(),args);
System.exit(exitCode);
}
}
```
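With the sample data above, the joined output should look like this (groups follow the sorted keys; row order within a group is not guaranteed):
```
1001	20150710	P0001	2	小米5	1000	2000
1002	20150710	P0001	3	小米5	1000	2000
1002	20150710	P0002	3	锤子T1	1000	3000
```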
Drawback: with this approach the join runs entirely in the reduce phase, so the reducers carry almost all of the processing load while the map nodes stay lightly loaded; resource utilization is poor, and a hot join key easily causes data skew in the reduce phase.
Solution: perform the join on the map side.
2. Map-side join implementation
1. How it works
This approach applies when one of the joined tables is small.
Distribute the small table to every map node; each map task then joins the big-table records it reads against its local in-memory copy and emits the final result directly. This greatly increases the parallelism of the join and speeds up processing, since there is no shuffle or reduce bottleneck.
2. Example implementation
Load the small table in the mapper ahead of time and perform the join there. In real scenarios the same idea extends to loading a database table once per task (a sketch of that variant follows the JoinMap class below).
Step 1: Define the map-join Mapper (JoinMap)
```java
package com.czxy.demo08;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.HashMap;
public class JoinMap extends Mapper<LongWritable, Text,Text,Text> {
//in-memory copy of the small table: product id -> remaining product fields
HashMap<String,String> b_tab = new HashMap<String, String>();
String line = null;
/*
setup() runs once per map task: read the cached small-table file and
load it into the in-memory map
*/
@Override
public void setup(Context context) throws IOException, InterruptedException {
URI[] cacheFiles = DistributedCache.getCacheFiles(context.getConfiguration());
FileSystem fileSystem = FileSystem.get(cacheFiles[0], context.getConfiguration());
FSDataInputStream open = fileSystem.open(new Path(cacheFiles[0]));
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(open));
while ((line = bufferedReader.readLine())!=null){
String[] split = line.split(",");
b_tab.put(split[0],split[1]+"\t"+split[2]+"\t"+split[3]);
}
//close only the stream; closing the shared FileSystem instance could break
//other HDFS access in the same JVM
IOUtils.closeStream(bufferedReader);
}
@Override
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//this map task reads its own split of the big table (the orders data on HDFS)
String[] fields = value.toString().split(",");
String orderId = fields[0];
String date = fields[1];
String pdId = fields[2];
String amount = fields[3];
//look up the product details in the in-memory map (null if the pid is unknown)
String productInfo = b_tab.get(pdId);
context.write(new Text(orderId), new Text(date + "\t" + productInfo+"\t"+amount));
}
}
```
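As mentioned above, when the small table lives in a relational database rather than a file, it can be loaded once per map task in setup() instead. A minimal sketch of that variant; the JDBC URL, credentials, and class name are placeholder assumptions, not part of the original example:
```java
package com.czxy.demo08;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;
import java.sql.*;
import java.util.HashMap;

public class DbJoinMap extends Mapper<LongWritable, Text, Text, Text> {
    private final HashMap<String, String> b_tab = new HashMap<String, String>();

    @Override
    public void setup(Context context) throws IOException {
        //load the product table from the database once per map task
        //(connection details below are placeholders)
        try (Connection conn = DriverManager.getConnection(
                "jdbc:mysql://localhost:3306/shop", "user", "pass");
             Statement st = conn.createStatement();
             ResultSet rs = st.executeQuery(
                     "select id, pname, category_id, price from t_product")) {
            while (rs.next()) {
                //key: product id; value: the remaining product fields
                b_tab.put(rs.getString("id"), rs.getString("pname") + "\t"
                        + rs.getString("category_id") + "\t" + rs.getString("price"));
            }
        } catch (SQLException e) {
            throw new IOException("failed to load t_product", e);
        }
    }

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] fields = value.toString().split(",");
        //join on the product id, exactly as in JoinMap
        String productInfo = b_tab.get(fields[2]);
        context.write(new Text(fields[0]),
                new Text(fields[1] + "\t" + productInfo + "\t" + fields[3]));
    }
}
```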
Step 2: Write the driver
```java
package com.czxy.demo08;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.net.URI;
public class MapSideJoin extends Configured implements Tool {
@Override
public int run(String[] args) throws Exception {
Configuration conf = super.getConf();
//note: the cache file must be placed on HDFS; a local path cannot be loaded here
DistributedCache.addCacheFile(new URI("hdfs://192.168.100.201:8020/cachefile/pdts.txt"),conf);
Job job = Job.getInstance(conf, MapSideJoin.class.getSimpleName());
job.setJarByClass(MapSideJoin.class);
job.setInputFormatClass(TextInputFormat.class);
TextInputFormat.addInputPath(job,new Path("file:///E:\\cache\\mapReduceTestCache\\Test.txt"));
job.setMapperClass(JoinMap.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
//the join completes on the map side, so run a map-only job
job.setNumReduceTasks(0);
job.setOutputFormatClass(TextOutputFormat.class);
TextOutputFormat.setOutputPath(job,new Path("file:///E:\\cache\\mapReduceResultCache\\TestResult01"));
boolean b = job.waitForCompletion(true);
return b?0:1;
}
public static void main(String[] args) throws Exception {
Configuration configuration = new Configuration();
int exitCode = ToolRunner.run(configuration,new MapSideJoin(),args);
System.exit(exitCode);
}
}
```
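Note: DistributedCache has been deprecated since Hadoop 2. On newer releases the same wiring can be done through the Job and Context APIs; a sketch of only the lines that change (same paths as above):
```java
// in the driver, instead of DistributedCache.addCacheFile(...):
job.addCacheFile(new URI("hdfs://192.168.100.201:8020/cachefile/pdts.txt"));

// in JoinMap.setup(), instead of DistributedCache.getCacheFiles(...):
URI[] cacheFiles = context.getCacheFiles();
```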