Apache HBase 通过bulkload方式批量加载数据到HBase中
在大量数据需要写入HBase时,通常有put方式和bulkLoad两种方式。
- 1、put方式为单条插入,在put数据时会先将数据的更新操作信息和数据信息写入WAL,在写入到WAL后,数据就会被放到MemStore中,当MemStore满后数据就会被flush到磁盘(即形成HFile文件),在这种写操作过程会涉及到flush、split、compaction等操作,容易造成节点不稳定,数据导入慢,耗费资源等问题,在海量数据的导入过程极大的消耗了系统性能,避免这些问题最好的方法就是使用BulkLoad的方式来加载数据到HBase中。
val put = new Put(rowKeyByts)
put.addColumn(cf, column, value)
put.addColumn(cf, column, value)
put.addColumn(cf, column, value)
put.addColumn(cf, column, value)
table.put(put)
2、BulkLoader利用HBase数据按照HFile格式存储在HDFS的原理,使用MapReduce直接批量生成HFile格式文件后,RegionServers再将HFile文件移动到相应的Region目录下。
上图的解释
- Extract
异构数据源数据导入到 HDFS 之上。
- Transform
通过用户代码,可以是 MR 或者 Spark 任务将数据转化为 HFile。
- Load
HFile 通过 loadIncrementalHFiles 调用将 HFile 放置到 Region 对应的 HDFS 目录上,该过程可能涉及到文件切分。
优点:
1、不会触发WAL预写日志,当表还没有数据时进行数据导入不会产生Flush和Split。
2、减少接口调用的消耗,是一种快速写入的优化方式。
Spark读写HBase之使用Spark自带的API以及使用Bulk Load将大量数据导入HBase:
https://www.jianshu.com/p/b6c5a5ba30af
Bulkload过程主要包括三部分:
从数据源(通常是文本文件或其他的数据库)提取数据并上传到HDFS
。
抽取数据到HDFS。和Hbase并没有关系,所以大家可以选用自己擅长的方式进行。
利用MapReduce作业处理事先准备的数据,生成HFile文件
。
这一步需要一个MapReduce作业,并且大多数情况下还需要我们自己编写Map函数,而Reduce函数不需要我们考虑,由HBase提供。
该作业需要使用rowkey(行键)作为输出Key; KeyValue、Put或者Delete作为输出Value。
MapReduce作业需要使用HFileOutputFormat2来生成HBase数据文件。
为了有效的导入数据,需要配置HFileOutputFormat2使得每一个输出文件都在一个合适的区域中。为了达到这个目的,MapReduce作业会使用Hadoop的TotalOrderPartitioner类根据表的key值将输出分割开来。
HFileOutputFormat2的方法configureIncrementalLoad()会自动的完成上面的工作。
告诉RegionServers数据的位置并导入数据
。
这一步是最简单的,通常需要使用LoadIncrementalHFiles(更为人所熟知是completebulkload工具),将文件在HDFS上的位置传递给它,它就会利用RegionServer将数据导入到相应的区域。
Bulkload
使用HBase的javaAPI或者使用sqoop将数据写入或者导入到HBase中,这些方式不是慢就是在导入的过程的占用Region资料导致效率低下。
而Bulkload方式通过MR的程序,将数据直接转换成HBase的最终存储格式HFile,然后直接load数据到HBase中即可。
bulkload优点:
1.导入过程不占用Region资源
2.能快速导入海量的数据
3.节省内存
HFile
HBase中每张Table在根目录(/HBase)下用一个文件夹存储,Table名为文件夹名,在Table文件夹下每个Region同样用一个文件夹存储,每个Region文件夹下的每个列族也用文件夹存储,而每个列族下存储的就是一些HFile文件,HFile就是HBase数据在HFDS下存储格式,所以HBase存储文件最终在hdfs上面的表现形式就是HFile,如果我们可以直接将数据转换为HFile的格式,那么我们的HBase就可以直接读取加载HFile格式的文件,就可以直接读取了
普通读写与bulkload方式对比
HBase数据正常读写流程
使用bulkload的方式将我们的数据直接生成HFile格式,然后直接加载到HBase的表当中去
JavaAPI举个例子
需求:将我们hdfs上面的这个路径/hbase/input/user.txt的数据文件,转换成HFile格式,然后load到myuser3这张表里面去
这个文件内容很简单,但是暂不给出,请结合下面的代码及运行结果分析理解。
创建myuser3表,并执行如下代码
create 'myuser3','f1'
package com.czxy.demo04;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import javax.management.ImmutableDescriptor;
import java.io.IOException;
public class BulkLoadMR extends Configured implements Tool {
//编写将hdfs数据转换成HFile文件的代码
public static class createHFile extends Mapper<LongWritable, Text,ImmutableBytesWritable, Put>{
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String data = value.toString();
String[] split = data.split("\t");
String rowkey = split[0];
String name = split[1];
String age = split[2];
//封装put
Put put = new Put(rowkey.getBytes());
put.addColumn("f1".getBytes(),"name".getBytes(),name.getBytes());
put.addColumn("f1".getBytes(),"age".getBytes(),age.getBytes());
context.write(new ImmutableBytesWritable(rowkey.getBytes()),put);
}
}
@Override
public int run(String[] args) throws Exception {
Configuration conf = new Configuration();
conf.set("hbase.zookeeper.quorum","hadoop01:2181,hadoop02:2181,hadoop03:2181");
Connection connection = ConnectionFactory.createConnection(conf);
TableName tableName = TableName.valueOf("myuser3");
Table myuser3 = connection.getTable(tableName);
RegionLocator myuser3RegionLocator = connection.getRegionLocator(tableName);
//实例一个job
Job job = Job.getInstance(conf,"BulkLoad");
job.setInputFormatClass(TextInputFormat.class);
TextInputFormat.addInputPath(job,new Path("hdfs://hadoop01:8020/hbase/input/user.txt"));
job.setJarByClass(BulkLoadMR.class);
job.setMapperClass(createHFile.class);
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setMapOutputValueClass(Put.class);
job.setOutputFormatClass(HFileOutputFormat2.class);
HFileOutputFormat2.configureIncrementalLoad(job,myuser3,myuser3RegionLocator);
HFileOutputFormat2.setOutputPath(job,new Path("hdfs://hadoop01:8020/tmp/output"));
boolean b = job.waitForCompletion(true);
return b?0:1;
}
public static void main(String[] args) throws Exception{
ToolRunner.run(new BulkLoadMR(),args);
}
}
运行结果:生成了HFile文件
[root@hadoop01 ~]# hadoop fs -ls /tmp/output/f1
Found 1 items
-rw-r--r-- 3 Administrator supergroup 1308 2019-12-19 16:52 /tmp/output/f1/b7be1f99900842ee88090bb30cec8175
最后移动HFile文件到对应region里
将我们的输出路径下面的HFile文件,加载到我们的hbase表当中去
package com.czxy.demo04;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
public class LoadDatas {
public static void main(String[] args) throws Exception{
Configuration conf = new Configuration();
conf.set("hbase.zookeeper.property.clientPort","2181");
conf.set("hbase.zookeeper.quorum","hadoop01,hadoop02,hadoop03");
Connection connection = ConnectionFactory.createConnection(conf);
Admin admin = connection.getAdmin();
Table myuser3 = connection.getTable(TableName.valueOf("myuser3"));
LoadIncrementalHFiles load = new LoadIncrementalHFiles(conf);
load.doBulkLoad(new Path("hdfs://hadoop01:8020/tmp/output"),admin,myuser3,connection.getRegionLocator(TableName.valueOf("myuser3")));
}
}
运行前:
hbase(main):001:0> scan 'myuser3'
ROW COLUMN+CELL
0 row(s) in 0.4670 seconds
运行后:
hbase(main):002:0> scan 'myuser3'
ROW COLUMN+CELL
0007 column=f1:age, timestamp=1576745579632, value=18
0007 column=f1:name, timestamp=1576745579632, value=zhangsan
0008 column=f1:age, timestamp=1576745579632, value=25
0008 column=f1:name, timestamp=1576745579632, value=lisi
0009 column=f1:age, timestamp=1576745579632, value=20
0009 column=f1:name, timestamp=1576745579632, value=wangwu
3 row(s) in 0.1310 seconds