Apache HBase 通过bulkload方式批量加载数据到HBase中

Apache HBase 通过bulkload方式批量加载数据到HBase中

在大量数据需要写入HBase时,通常有put方式和bulkLoad两种方式。

  • 1、put方式为单条插入,在put数据时会先将数据的更新操作信息和数据信息写入WAL,在写入到WAL后,数据就会被放到MemStore中,当MemStore满后数据就会被flush到磁盘(即形成HFile文件),在这种写操作过程会涉及到flush、split、compaction等操作,容易造成节点不稳定,数据导入慢,耗费资源等问题,在海量数据的导入过程极大的消耗了系统性能,避免这些问题最好的方法就是使用BulkLoad的方式来加载数据到HBase中。
val put = new Put(rowKeyByts)
put.addColumn(cf, column, value)
put.addColumn(cf, column, value)
put.addColumn(cf, column, value)
put.addColumn(cf, column, value)

table.put(put)
www.zeeklog.com  - Apache HBase 通过bulkload方式批量加载数据到HBase中


2、BulkLoader利用HBase数据按照HFile格式存储在HDFS的原理,使用MapReduce直接批量生成HFile格式文件后,RegionServers再将HFile文件移动到相应的Region目录下。

www.zeeklog.com  - Apache HBase 通过bulkload方式批量加载数据到HBase中


上图的解释

  • Extract

异构数据源数据导入到 HDFS 之上。

  • Transform

通过用户代码,可以是 MR 或者 Spark 任务将数据转化为 HFile。

  • Load

HFile 通过 loadIncrementalHFiles 调用将 HFile 放置到 Region 对应的 HDFS 目录上,该过程可能涉及到文件切分。

优点:
1、不会触发WAL预写日志,当表还没有数据时进行数据导入不会产生Flush和Split。
2、减少接口调用的消耗,是一种快速写入的优化方式。

Spark读写HBase之使用Spark自带的API以及使用Bulk Load将大量数据导入HBase:
https://www.jianshu.com/p/b6c5a5ba30af

Bulkload过程主要包括三部分:

从数据源(通常是文本文件或其他的数据库)提取数据并上传到HDFS

抽取数据到HDFS。和Hbase并没有关系,所以大家可以选用自己擅长的方式进行。

利用MapReduce作业处理事先准备的数据,生成HFile文件

这一步需要一个MapReduce作业,并且大多数情况下还需要我们自己编写Map函数,而Reduce函数不需要我们考虑,由HBase提供。
该作业需要使用rowkey(行键)作为输出Key; KeyValue、Put或者Delete作为输出Value。
MapReduce作业需要使用HFileOutputFormat2来生成HBase数据文件。
为了有效的导入数据,需要配置HFileOutputFormat2使得每一个输出文件都在一个合适的区域中。为了达到这个目的,MapReduce作业会使用Hadoop的TotalOrderPartitioner类根据表的key值将输出分割开来。
HFileOutputFormat2的方法configureIncrementalLoad()会自动的完成上面的工作。

告诉RegionServers数据的位置并导入数据
这一步是最简单的,通常需要使用LoadIncrementalHFiles(更为人所熟知是completebulkload工具),将文件在HDFS上的位置传递给它,它就会利用RegionServer将数据导入到相应的区域。

Bulkload

使用HBase的javaAPI或者使用sqoop将数据写入或者导入到HBase中,这些方式不是慢就是在导入的过程的占用Region资料导致效率低下。

而Bulkload方式通过MR的程序,将数据直接转换成HBase的最终存储格式HFile,然后直接load数据到HBase中即可。

bulkload优点:

1.导入过程不占用Region资源

2.能快速导入海量的数据

3.节省内存

HFile

HBase中每张Table在根目录(/HBase)下用一个文件夹存储,Table名为文件夹名,在Table文件夹下每个Region同样用一个文件夹存储,每个Region文件夹下的每个列族也用文件夹存储,而每个列族下存储的就是一些HFile文件,HFile就是HBase数据在HFDS下存储格式,所以HBase存储文件最终在hdfs上面的表现形式就是HFile,如果我们可以直接将数据转换为HFile的格式,那么我们的HBase就可以直接读取加载HFile格式的文件,就可以直接读取了

普通读写与bulkload方式对比

HBase数据正常读写流程

www.zeeklog.com  - Apache HBase 通过bulkload方式批量加载数据到HBase中

使用bulkload的方式将我们的数据直接生成HFile格式,然后直接加载到HBase的表当中去

www.zeeklog.com  - Apache HBase 通过bulkload方式批量加载数据到HBase中

JavaAPI举个例子

需求:将我们hdfs上面的这个路径/hbase/input/user.txt的数据文件,转换成HFile格式,然后load到myuser3这张表里面去
这个文件内容很简单,但是暂不给出,请结合下面的代码及运行结果分析理解。

创建myuser3表,并执行如下代码

create 'myuser3','f1'
package com.czxy.demo04;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import javax.management.ImmutableDescriptor;
import java.io.IOException;

public class BulkLoadMR extends Configured implements Tool {

    //编写将hdfs数据转换成HFile文件的代码
    public static class createHFile extends Mapper<LongWritable, Text,ImmutableBytesWritable, Put>{

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String data = value.toString();
            String[] split = data.split("\t");
            String rowkey = split[0];
            String name = split[1];
            String age = split[2];
            //封装put
            Put put = new Put(rowkey.getBytes());

            put.addColumn("f1".getBytes(),"name".getBytes(),name.getBytes());
            put.addColumn("f1".getBytes(),"age".getBytes(),age.getBytes());
            context.write(new ImmutableBytesWritable(rowkey.getBytes()),put);


        }
    }


    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("hbase.zookeeper.quorum","hadoop01:2181,hadoop02:2181,hadoop03:2181");

        Connection connection = ConnectionFactory.createConnection(conf);
        TableName tableName = TableName.valueOf("myuser3");
        Table myuser3 = connection.getTable(tableName);
        RegionLocator myuser3RegionLocator = connection.getRegionLocator(tableName);


        //实例一个job
        Job job = Job.getInstance(conf,"BulkLoad");
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job,new Path("hdfs://hadoop01:8020/hbase/input/user.txt"));

        job.setJarByClass(BulkLoadMR.class);
        job.setMapperClass(createHFile.class);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(Put.class);

        job.setOutputFormatClass(HFileOutputFormat2.class);
        HFileOutputFormat2.configureIncrementalLoad(job,myuser3,myuser3RegionLocator);
        HFileOutputFormat2.setOutputPath(job,new Path("hdfs://hadoop01:8020/tmp/output"));

        boolean b = job.waitForCompletion(true);
        return b?0:1;
    }

    public static void main(String[] args) throws Exception{
        ToolRunner.run(new BulkLoadMR(),args);
    }


}

运行结果:生成了HFile文件

[root@hadoop01 ~]# hadoop fs -ls /tmp/output/f1
Found 1 items
-rw-r--r--   3 Administrator supergroup       1308 2019-12-19 16:52 /tmp/output/f1/b7be1f99900842ee88090bb30cec8175

最后移动HFile文件到对应region里
将我们的输出路径下面的HFile文件,加载到我们的hbase表当中去

package com.czxy.demo04;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;

public class LoadDatas {
    public static void main(String[] args) throws Exception{
        Configuration conf = new Configuration();
        conf.set("hbase.zookeeper.property.clientPort","2181");
        conf.set("hbase.zookeeper.quorum","hadoop01,hadoop02,hadoop03");

        Connection connection = ConnectionFactory.createConnection(conf);
        Admin admin = connection.getAdmin();

        Table myuser3 = connection.getTable(TableName.valueOf("myuser3"));

        LoadIncrementalHFiles load = new LoadIncrementalHFiles(conf);
        load.doBulkLoad(new Path("hdfs://hadoop01:8020/tmp/output"),admin,myuser3,connection.getRegionLocator(TableName.valueOf("myuser3")));

    }
}

运行前:

hbase(main):001:0> scan 'myuser3'
ROW                   COLUMN+CELL                                               
0 row(s) in 0.4670 seconds

运行后:

hbase(main):002:0> scan 'myuser3'
ROW                   COLUMN+CELL                                               
 0007                 column=f1:age, timestamp=1576745579632, value=18          
 0007                 column=f1:name, timestamp=1576745579632, value=zhangsan   
 0008                 column=f1:age, timestamp=1576745579632, value=25          
 0008                 column=f1:name, timestamp=1576745579632, value=lisi       
 0009                 column=f1:age, timestamp=1576745579632, value=20          
 0009                 column=f1:name, timestamp=1576745579632, value=wangwu     
3 row(s) in 0.1310 seconds