Apache HBase Integration with MapReduce: Reading One Table and Writing to Another
HBase data is ultimately stored on HDFS, and HBase has built-in MapReduce support: an MR job can read HBase tables directly as its input and write its results straight back into HBase.
Requirement: read the data of one HBase table and write it into another HBase table.
Note: HBase provides the TableMapper and TableReducer base classes for reading from and writing to HBase.
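For reference, both helper classes live in org.apache.hadoop.hbase.mapreduce. TableMapper fixes the map input types to ImmutableBytesWritable (the rowkey) and Result (one row), so you only choose the output types; TableReducer fixes the reduce output value type to Mutation (a Put or Delete). Their declarations in HBase 1.x look like this:

public abstract class TableMapper<KEYOUT, VALUEOUT>
        extends Mapper<ImmutableBytesWritable, Result, KEYOUT, VALUEOUT> {
}

public abstract class TableReducer<KEYIN, VALUEIN, KEYOUT>
        extends Reducer<KEYIN, VALUEIN, KEYOUT, Mutation> {
}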
Task: copy the name and age columns of column family f1 from table myuser into column family f1 of table myuser2.
Step 1: create the table myuser2
Note: the column family name must match the one in the myuser table.
hbase(main):016:0> list
TABLE
myuser
user
2 row(s) in 0.0060 seconds
=> ["myuser", "user"]
hbase(main):017:0> create 'myuser2','f1'
0 row(s) in 1.3370 seconds
=> Hbase::Table - myuser2
hbase(main):018:0> list
TABLE
myuser
myuser2
user
3 row(s) in 0.0060 seconds
=> ["myuser", "myuser2", "user"]
Step 2: create a Maven project and add the dependencies
<repositories>
    <repository>
        <id>cloudera</id>
        <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
    </repository>
</repositories>
<dependencies>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>2.6.0-mr1-cdh5.14.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-client</artifactId>
        <version>1.2.0-cdh5.14.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-server</artifactId>
        <version>1.2.0-cdh5.14.0</version>
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.testng</groupId>
        <artifactId>testng</artifactId>
        <version>6.14.3</version>
        <scope>test</scope>
    </dependency>
</dependencies>
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.0</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
                <encoding>UTF-8</encoding>
            </configuration>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>2.2</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <filters>
                            <filter>
                                <!-- strip signature files so the shaded jar is not rejected -->
                                <artifact>*:*</artifact>
                                <excludes>
                                    <exclude>META-INF/*.SF</exclude>
                                    <exclude>META-INF/*.DSA</exclude>
                                    <exclude>META-INF/*.RSA</exclude>
                                </excludes>
                            </filter>
                        </filters>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
Step 3: write the MR program
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;

public class HBaseMR extends Configured implements Tool {

    public static class HBaseMapper extends TableMapper<Text, Put> {
        /**
         * @param key   the rowkey of the current row
         * @param value all cells of the current row, wrapped in a Result
         */
        @Override
        protected void map(ImmutableBytesWritable key, Result value, Context context)
                throws IOException, InterruptedException {
            String rowKey = Bytes.toString(key.get());
            Put put = new Put(key.get());
            // keep only f1:name and f1:age
            for (Cell cell : value.rawCells()) {
                if ("f1".equals(Bytes.toString(CellUtil.cloneFamily(cell)))) {
                    String qualifier = Bytes.toString(CellUtil.cloneQualifier(cell));
                    if ("name".equals(qualifier) || "age".equals(qualifier)) {
                        put.add(cell);
                    }
                }
            }
            // skip rows that have neither column; HBase rejects empty Puts
            if (!put.isEmpty()) {
                context.write(new Text(rowKey), put);
            }
        }
    }

    public static class HBaseReducer extends TableReducer<Text, Put, ImmutableBytesWritable> {
        @Override
        protected void reduce(Text key, Iterable<Put> values, Context context)
                throws IOException, InterruptedException {
            // TableOutputFormat ignores the output key, so it may be null
            for (Put value : values) {
                context.write(null, value);
            }
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(super.getConf(), "hbaseMr");
        job.setJarByClass(this.getClass());

        Scan scan = new Scan();
        scan.setCaching(500);        // fetch 500 rows per RPC
        scan.setCacheBlocks(false);  // don't pollute the block cache in MR scans

        // use TableMapReduceUtil to initialize the mapper: read from myuser
        TableMapReduceUtil.initTableMapperJob(TableName.valueOf("myuser"), scan,
                HBaseMapper.class, Text.class, Put.class, job);
        // use TableMapReduceUtil to initialize the reducer: write to myuser2
        TableMapReduceUtil.initTableReducerJob("myuser2", HBaseReducer.class, job);
        job.setNumReduceTasks(1);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        // picks up hbase-site.xml from the classpath
        Configuration configuration = HBaseConfiguration.create();
        int run = ToolRunner.run(configuration, new HBaseMR(), args);
        System.exit(run);
    }
}
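One practical note before running: the job only copies rows that already exist, so the source table myuser must hold data under f1. If you need sample rows, here is a hedged sketch (the class name is illustrative; the two rows mirror rows 0001 and 0005 of the scan output in step 4, and ages are written as 4-byte ints via Bytes.toBytes(int), which is why the shell later shows them as raw bytes):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

import java.util.Arrays;

public class SeedMyuser {
    public static void main(String[] args) throws Exception {
        // assumes hbase-site.xml is on the classpath; otherwise set
        // hbase.zookeeper.quorum on the Configuration by hand
        Configuration conf = HBaseConfiguration.create();
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Table table = connection.getTable(TableName.valueOf("myuser"))) {
            Put p1 = new Put(Bytes.toBytes("0001"));
            p1.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("name"), Bytes.toBytes("曹操"));
            p1.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("age"), Bytes.toBytes(30)); // \x00\x00\x00\x1E
            Put p2 = new Put(Bytes.toBytes("0005"));
            p2.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("name"), Bytes.toBytes("诸葛亮"));
            p2.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("age"), Bytes.toBytes(28)); // \x00\x00\x00\x1C
            table.put(Arrays.asList(p1, p2));
        }
    }
}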
Step 4: package and run
Package the project as hbase02.jar, then submit it:
yarn jar hbase02.jar com.czxy.demo01.HBaseMR
When the job finishes, check the result in the hbase shell:
hbase(main):001:0> list
TABLE
myuser
myuser2
user
3 row(s) in 0.3430 seconds
=> ["myuser", "myuser2", "user"]
hbase(main):002:0> scan 'myuser2'
ROW     COLUMN+CELL
 0001   column=f1:age, timestamp=1576629006005, value=\x00\x00\x00\x1E
 0001   column=f1:name, timestamp=1576629006005, value=\xE6\x9B\xB9\xE6\x93\x8D
 0002   column=f1:age, timestamp=1576629006005, value=\x00\x00\x00\x1E
 0002   column=f1:name, timestamp=1576629006005, value=\xE6\x9B\xB9\xE6\x93\x8D
 0003   column=f1:age, timestamp=1576629006005, value=\x00\x00\x00
 0003   column=f1:name, timestamp=1576629006005, value=\xE5\x88\x98\xE5\xA4\x87
 0004   column=f1:age, timestamp=1576629006005, value=\x00\x00\x00#
 0004   column=f1:name, timestamp=1576629006005, value=\xE5\xAD\x99\xE6\x9D\x83
 0005   column=f1:age, timestamp=1576629006005, value=\x00\x00\x00\x1C
 0005   column=f1:name, timestamp=1576629006005, value=\xE8\xAF\xB8\xE8\x91\x9B\xE4\xBA\xAE
 0006   column=f1:age, timestamp=1576629006005, value=\x00\x00\x00\x1B
 0006   column=f1:name, timestamp=1576629006005, value=\xE5\x8F\xB8\xE9\xA9\xAC\xE6\x87\xBF
 0007   column=f1:age, timestamp=1576629006005, value=\x00\x00\x00\x1C
 0007   column=f1:name, timestamp=1576629006005, value=xiaobubu\xE2\x80\x94\xE5\x90\x95\xE5\xB8\x83
7 row(s) in 0.2840 seconds
hbase(main):003:0>
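The values above look garbled only because the shell prints raw cell bytes: the names are UTF-8 sequences (e.g. \xE6\x9B\xB9\xE6\x93\x8D is 曹操) and the ages are 4-byte ints. They decode back with Bytes.toString / Bytes.toInt; a quick illustration (the class name is illustrative):

import org.apache.hadoop.hbase.util.Bytes;

public class DecodeCell {
    public static void main(String[] args) {
        // the bytes the shell displays as \x00\x00\x00\x1E
        byte[] age = {0x00, 0x00, 0x00, 0x1E};
        System.out.println(Bytes.toInt(age)); // 30

        // the bytes the shell displays as \xE6\x9B\xB9\xE6\x93\x8D
        byte[] name = {(byte) 0xE6, (byte) 0x9B, (byte) 0xB9, (byte) 0xE6, (byte) 0x93, (byte) 0x8D};
        System.out.println(Bytes.toString(name)); // 曹操
    }
}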
Alternatively, you can set the environment variables yourself, i.e., package only your own code into the uploaded jar and take the dependency jars from the cluster's classpath (hbase mapredcp prints the jars that HBase MapReduce jobs need, which spares you from shading HBase into your jar):
export HADOOP_HOME=/export/servers/hadoop-2.6.0-cdh5.14.0/
export HBASE_HOME=/export/servers/hbase-1.2.0-cdh5.14.0/
export HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase mapredcp`
yarn jar original-hbaseStudy-1.0-SNAPSHOT.jar com.czxy.demo01.HBaseMR