Apache HBase with MapReduce: Reading Data from One Table and Writing It to Another

All HBase data ultimately lives on HDFS, and HBase ships with native MapReduce support: a MapReduce job can read HBase tables directly as its input and write its results straight back into HBase.

Goal: read the data from one HBase table and write it into a second HBase table.

Note: the TableMapper and TableReducer base classes handle reading rows from and writing mutations to HBase.
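
For orientation, these are the (abridged) signatures of the two base classes in org.apache.hadoop.hbase.mapreduce. TableMapper pins the map input types to the row key and the row's cells, and TableReducer pins the reduce output value type to a Mutation (a Put or a Delete):

// Abridged from the HBase 1.x API
public abstract class TableMapper<KEYOUT, VALUEOUT>
        extends Mapper<ImmutableBytesWritable, Result, KEYOUT, VALUEOUT> {
}

public abstract class TableReducer<KEYIN, VALUEIN, KEYOUT>
        extends Reducer<KEYIN, VALUEIN, KEYOUT, Mutation> {
}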

The concrete task: copy the name and age columns of column family f1 from the myuser table into column family f1 of the myuser2 table.

Step 1: Create the myuser2 table

Note: the column family name must match the column family name of the myuser table.

hbase(main):016:0> list
TABLE                                                                                 
myuser                                                                                
user                                                                                  
2 row(s) in 0.0060 seconds

=> ["myuser", "user"]
hbase(main):017:0> create 'myuser2','f1'
0 row(s) in 1.3370 seconds

=> Hbase::Table - myuser2
hbase(main):018:0> list
TABLE                                                                                 
myuser                                                                                
myuser2                                                                               
user                                                                                  
3 row(s) in 0.0060 seconds

=> ["myuser", "myuser2", "user"]

Step 2: Create a Maven project and add the dependencies

	<repositories>
        <repository>
            <id>cloudera</id>
            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
        </repository>
    </repositories>

    <dependencies>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.6.0-mr1-cdh5.14.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>1.2.0-cdh5.14.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>1.2.0-cdh5.14.0</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.testng</groupId>
            <artifactId>testng</artifactId>
            <version>6.14.3</version>
            <scope>test</scope>
        </dependency>


    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.2</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
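
One note on the shade plugin configuration: the META-INF/*.SF, META-INF/*.DSA, and META-INF/*.RSA excludes strip the signature files out of any signed dependency jars. Without them, running the merged fat jar typically fails with a SecurityException complaining about an invalid signature file digest.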

Step 3: Develop the MapReduce program

package com.czxy.demo01;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class HBaseMR extends Configured implements Tool {

    public static class HBaseMapper extends TableMapper<Text, Put> {
        /**
         * @param key     the row key
         * @param value   a Result holding every cell of the current row
         * @param context
         * @throws IOException
         * @throws InterruptedException
         */
        @Override
        protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
            String rowKey = Bytes.toString(key.get());
            Put put = new Put(key.get());
            // Keep only the f1:name and f1:age cells
            for (Cell cell : value.rawCells()) {
                if ("f1".equals(Bytes.toString(CellUtil.cloneFamily(cell)))) {
                    String qualifier = Bytes.toString(CellUtil.cloneQualifier(cell));
                    if ("name".equals(qualifier) || "age".equals(qualifier)) {
                        put.add(cell);
                    }
                }
            }
            // Skip rows with neither column; writing an empty Put would fail
            if (!put.isEmpty()) {
                context.write(new Text(rowKey), put);
            }
        }
    }

    public static class HBaseReducer extends TableReducer<Text, Put, ImmutableBytesWritable> {
        @Override
        protected void reduce(Text key, Iterable<Put> values, Context context) throws IOException, InterruptedException {
            // Each Put already carries its own row key, so the output key can be null
            for (Put value : values) {
                context.write(null, value);
            }
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(super.getConf(), "hbaseMr");
        job.setJarByClass(this.getClass());

        Scan scan = new Scan();
        scan.setCaching(500);        // batch 500 rows per RPC to cut round trips
        scan.setCacheBlocks(false);  // don't pollute the block cache during a full scan

        // Use the TableMapReduceUtil helper to wire up the mapper ...
        TableMapReduceUtil.initTableMapperJob(TableName.valueOf("myuser"), scan, HBaseMapper.class, Text.class, Put.class, job);
        // ... and the reducer
        TableMapReduceUtil.initTableReducerJob("myuser2", HBaseReducer.class, job);

        job.setNumReduceTasks(1);

        boolean b = job.waitForCompletion(true);
        return b ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        // Create the HBase configuration (reads hbase-site.xml from the classpath)
        Configuration configuration = HBaseConfiguration.create();
        int run = ToolRunner.run(configuration, new HBaseMR(), args);
        System.exit(run);
    }
}
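
A note on the two helper calls: initTableMapperJob configures TableInputFormat with the given Scan, so the map tasks read the regions of myuser, and initTableReducerJob configures TableOutputFormat so the reducer's Puts land in myuser2; by default both also ship the required HBase jars with the job. The Scan settings matter for full-table jobs: setCaching(500) batches rows per RPC to reduce round trips, and setCacheBlocks(false) keeps the scan from evicting hot data out of the RegionServer block cache, the recommended setting for MapReduce scans.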

Step 4: Package and run

Package the project as hbase02.jar, then submit it with:

yarn jar hbase02.jar  com.czxy.demo01.HBaseMR

When the job finishes, check the result in the hbase shell; the data has been copied:

hbase(main):001:0> list
TABLE                                                                                 
myuser                                                                                
myuser2                                                                               
user                                                                                  
3 row(s) in 0.3430 seconds

=> ["myuser", "myuser2", "user"]
hbase(main):002:0> scan 'myuser2'
ROW                    COLUMN+CELL                                                    
 0001                  column=f1:age, timestamp=1576629006005, value=\x00\x00\x00\x1E 
 0001                  column=f1:name, timestamp=1576629006005, value=\xE6\x9B\xB9\xE6
                       \x93\x8D                                                       
 0002                  column=f1:age, timestamp=1576629006005, value=\x00\x00\x00\x1E 
 0002                  column=f1:name, timestamp=1576629006005, value=\xE6\x9B\xB9\xE6
                       \x93\x8D                                                       
 0003                  column=f1:age, timestamp=1576629006005, value=\x00\x00\x00     
 0003                  column=f1:name, timestamp=1576629006005, value=\xE5\x88\x98\xE5
                       \xA4\x87                                                       
 0004                  column=f1:age, timestamp=1576629006005, value=\x00\x00\x00#    
 0004                  column=f1:name, timestamp=1576629006005, value=\xE5\xAD\x99\xE6
                       \x9D\x83                                                       
 0005                  column=f1:age, timestamp=1576629006005, value=\x00\x00\x00\x1C 
 0005                  column=f1:name, timestamp=1576629006005, value=\xE8\xAF\xB8\xE8
                       \x91\x9B\xE4\xBA\xAE                                           
 0006                  column=f1:age, timestamp=1576629006005, value=\x00\x00\x00\x1B 
 0006                  column=f1:name, timestamp=1576629006005, value=\xE5\x8F\xB8\xE9
                       \xA9\xAC\xE6\x87\xBF                                           
 0007                  column=f1:age, timestamp=1576629006005, value=\x00\x00\x00\x1C 
 0007                  column=f1:name, timestamp=1576629006005, value=xiaobubu\xE2\x80
                       \x94\xE5\x90\x95\xE5\xB8\x83                                   
7 row(s) in 0.2840 seconds

hbase(main):003:0> 
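
The values display as raw bytes because HBase stores plain byte arrays: the name cells hold the UTF-8 bytes of the (here, Chinese) names, and the age cells were apparently written with Bytes.toBytes(int), so a value like \x00\x00\x00\x1E is the 4-byte big-endian encoding of 30.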

Alternatively, we can set the required environment variables ourselves, so that the jar we submit contains only our own compiled classes and the HBase dependencies are picked up from the classpath instead:

export HADOOP_HOME=/export/servers/hadoop-2.6.0-cdh5.14.0/
export HBASE_HOME=/export/servers/hbase-1.2.0-cdh5.14.0/
export HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase mapredcp`
yarn jar original-hbaseStudy-1.0-SNAPSHOT.jar  com.czxy.demo01.HBaseMR
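
Here hbase mapredcp prints the minimal set of HBase jars a MapReduce job needs, and exporting its output as HADOOP_CLASSPATH makes those jars available at submit time. That is why the thin original-hbaseStudy-1.0-SNAPSHOT.jar (the unshaded jar the shade plugin leaves behind, containing only our own classes) is enough.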