一种Hudi on Flink动态同步元数据变化的方法

一种Hudi on Flink动态同步元数据变化的方法

一、背景

一个需求,需要同步MySQL数据到Hive,包括DDL与DML,所以需要动态同步元数据变化。

二、官方Schema Evolution例子

从Hudi官方文档Schema Evolution(https://hudi.apache.org/docs/next/schema_evolution)可知通过Hudi可实现源端添加列、int到long列类型转换等DDL操作同步到目标端,且该文档提供了一个Spark+Hudi写数据的例子,先定义一个Schema,写入了3条数据;然后定义newSchema,比schema多一个newField字段,且intToLong字段类型由Integer变为了Long,又upsert了3条数据到同一个Hudi表中。最终查询出该Hudi表的结构与newSchema一致,即使用新的schema写数据,实现了元数据的更新。

由于多种原因(省略一万字)选择了Flink+Hudi,且为了实现一些逻辑更自由,选择了DataStream API而不是Flink SQL,在群里从大佬处了解到Hudi中使用DataStream API操作的类org.apache.hudi.streamer.HoodieFlinkStreamer。通过两次启动任务传入的修改后的Avro Schema,也能实现官方文档Schema Evolution中例子类似的功能。

但是按这种方式,只能通过重新定义schema并重启Flink任务,才能将源表新增的列同步到目标Hive表中,无法在启动任务后自动同步schema中未定义的源表中新增的列。所以需要对HoodieFlinkStreamer的功能进行改进。先说方案,经过对HoodieFlinkStreamer分析,其中用到的一些主要Function(Source、Map、Sink等)都是在最初定义时传入参数配置类,要么在构造方法中、要么在open方法中,根据配置(包含schema)生成deserialer、converter、writeClient等对数据进行反序列化、转换、保存的实例,由于是根据最初schema生成的实例,即使数据中有新增的字段,转换后新增的字段也没有保留。所以采用的方式是,在各个Function处理数据的方法中,判断如果数据中的schema与当前Function中的schema不一致(一种简单的优化),就使用数据中的schema重新生成这些deserialer、converter、writeClient,这样数据经过处理后,就有新增的字段。方法很简单,主要是需要了解一下HoodieFlinkStreamer的处理流程。

四、HoodieFlinkStreamer流程浅析及扩展方法

调试环境:可先在Idea中建个Flink Demo/QuickStart项目,导入Hudi的hudi-flink-bundle模块、及对应的Hadoop、Hive相关依赖。先在Idea中操作方便调试、以及分析依赖冲突等问题。当前使用分支release-0.10.0,Hive版本2.1.1-cdh6.3.0,hadoop版本3.0.0-cdh6.3.0。

扩展可将流程中关键的类(及必须依赖的类)从Hudi源码拷贝出来(或集成,视各类的依赖关系而定),修改相应逻辑,再使用修改后的类作为处理函数

经过亿些分析及调试后,梳理出HoodieFlinkStreamer的大致流程如下图。

www.zeeklog.com  - 一种Hudi on Flink动态同步元数据变化的方法

4.1 FlinkKafkaConsumer

  • Function功能说明

数据来源,从Kafka中读取数据,HoodieFlinkStreamer类中,指定的反序列化类为org.apache.flink.formats.json.JsonRowDataDeserializationSchema,将Json数据转换为org.apache.flink.table.data.RowData

  • 处理逻辑扩展

对source函数扩展,主要就是修改反序列化类,数据可选择Debezium发送到Kafka中的带Schema格式的Json数据,反序列化类使用org.apache.flink.formats.json.debezium.DebeziumJsonDeserializationSchema并修改部分逻辑,在DebeziumJsonDeserializationSchemadeserialize(byte[] message, Collector<RowData> out)方法中,先通过message获取到数据中的schema,再参考构造方法重新生成成员变量this.jsonDeserializer、this.metadataConverters。

  • 输出扩展

DebeziumJsonDeserializationSchema本身输出的是实现了RowData接口的org.apache.flink.table.data.GenericRowData(逻辑在emitRow方法中),改为通过复制或继承GenericRowData定义的SchemaWithRowData,添加一个字符串成员变量保存当前数据的schema,以便下游的函数能根据schema重新生成数据处理实例。

4.2 RowDataToHoodieFunction

  • Function功能说明

将DataStream转换为后面HudiAPI操作需要的DataStream。

  • 处理逻辑扩展

O map(I i)方法中,首先根据上游发来的SchemaWithRowData中的schema,参考open方法重新生成this.converter等成员变量。

RowDataToHoodieFunction类中有一个org.apache.flink.configuration.Configuration类型的成员变量config,保存了任务配置的参数,后面流程中函数基本都有这个成员变量,且很多函数也从该配置中读取schema信息,所以在更新schema时,可以首先设置this.config.setString(FlinkOptions.SOURCE_AVRO_SCHEMA, schema);(任务启动通过--source-avro-schema传入参数,所以schema存在config的FlinkOptions.SOURCE_AVRO_SCHEMA这个key中)。后续不再赘述。

  • 输出扩展

与前面类似,为了让下游函数获取到schema,在toHoodieRecord方法中,修改返回值为继承了HoodieRecord类的带有schema信息的自定义类SchemaWithHoodieRecord

4.3 StreamWriteFunction

(其实前面还有个BucketAssignerFunction,看起来没有直接修改或转换当前从流中接收到的数据的各字段值,只是设置了location。也添加了更新schema逻辑,重新生成了bucketAssigner成员变量。)

  • Function功能说明

将流中的数据写入HDFS。数据缓存在this.buckets中,由bufferRecord方法的注释可知,缓存的记录数大于FlinkOptions.WRITE_BATCH_SIZE配置的值、或缓冲区大小大于FlinkOptions.WRITE_TASK_MAX_SIZE时,调用flushBucket将缓存的数据写入文件。

在每次checkpoint时,snapshotState方法也会调用flushRemaining方法将缓存的记录写入文件。

  • 处理逻辑扩展

仍然在processElement方法中,首先通过接收到的SchemaWithHoodieRecord中的schema信息,更新this.writeClient,先关闭再重新生成。

  • 输出扩展

Hudi官方代码中,只在processElement中调用了bufferRecord(所以图中画的虚线)。为了让下游的compact和clean函数接收到新的schema,可直接转发:out.collect(value);(可优化,比如只传schema)

改到当前Function为止,数据已经能写入文件(MOR表的log文件、COW表的parquet文件),但是在Hive中查询不出来

结合StreamWriteFunction类的注释,及一些日志,及一些调试分析。了解到数据写入文件后,会通知StreamWriteOperatorCoordinator保存hudi表相关参数,如提交instant更新Timeline相关记录,相关元数据等,应该就是操作hudi表目录下的.hoodie目录。StreamWriteFunctionflushBucketflushRemaining方法最后调用this.eventGateway.sendEventToCoordinator(event);org.apache.hudi.sink.event.WriteMetadataEvent发到StreamWriteOperatorCoordinatorStreamWriteOperatorCoordinatororg.apache.hudi.sink.common.WriteOperatorFactorygetCoordinatorProvider方法中实例化,也是传入的初始配置,为了能保存最新的元数据,所以也要将schema发过去,在StreamWriteOperatorCoordinator中主要使用WriteMetadataEventwriteStatuses成员变量,所以将schema存在writeStatuses中。

4.4 StreamWriteOperatorCoordinator

  • Function功能说明

主要逻辑在notifyCheckpointComplete方法中,即每次checkpoint完成后执行,总体分为2部分,commitInstant和hive同步。

commitInstant方法中,从eventBuffer中读取WriteMetadataEventwriteStatus,若前面的步骤中真的有数据处理,这里获取到的writeResults不为空,则调用doCommit方法提交相关信息。

如果commitInstant真的提交了数据,返回true,则会调用syncHiveIfEnabled方法执行hive同步操作。最终其实调用到HiveSyncToolsyncHoodieTable方法,从这个方法可以看到Hive同步支持的一些功能,自动建数据库、自动建数据表、自动建分区;元数据同步功能通过将hudi表最新的提交中的元数据从Hive metastore查出的表的元数据对比,如果不同则将元数据变化同步到Hive metastore中。

  • 处理逻辑扩展

上面提到,commitInstant方法中,如果writeResults不为空,则会调用doCommit方法,所以在调用doCommit之前添加更新schema的逻辑,从自定义的SchemaWithWriteStatus中读取schema,参考start方法的逻辑重新生成writeClient。

StreamWriteOperatorCoordinator修改后,在之前数据已经写入文件的基础上,新增的字段、修改的类型已经能同步到hudi表(指.hoodie目录)及hive metastore中,在Hive中COW表也能正常查询。但是对于MOR表,新增的字段只是写到了增量日志文件中,读优化表(_ro)查不到新增字段的数据,所以还要修改Compaction处理类。

4.5 Compaction及Clean类

如上面流程图,compaction分三步:生成压缩计划、执行压缩、提交压缩执行结果。都是类似的操作,先是CompactionPlanOperator接收到DataStream<SchemaWithHoodieRecord>,更新元数据,后续的CompactionPlanEventCompactionCommiEvent也可带上schema,更新各自的table、writeClient等成员。CleanFunction也类似。

到目前为止,MOR表的读优化表在Hive也能查询到新增列的数据,历史parquet文件中没有新增字段,查询结果中新增字段为null。但是实时表(_rt)查询还有点问题。如果查询rt表涉及历史的parquet文件(没有新增字段,至于为什么肯定是Parquet文件,后面会说到通过调试发现,如果以后发现有其他情况再补充),则会报类似这样的错误:

"Field new_col2 not found in log schema. Query cannot proceed! Derived Schema Fields: ..."

五、MOR rt表查询bug解决

5.1 分析

在Hudi源码中搜索该报错信息,找到两个位置,实时表对应的位置是org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils#generateProjectionSchema

public static Schema generateProjectionSchema(Schema writeSchema, Map<String, Schema.Field> schemaFieldsMap,
                                                List<String> fieldNames) {
    /**
     * ......
     */
    List<Schema.Field> projectedFields = new ArrayList<>();
    for (String fn : fieldNames) {
      Schema.Field field = schemaFieldsMap.get(fn.toLowerCase());
      if (field == null) {
        throw new HoodieException("Field " + fn + " not found in log schema. Query cannot proceed! "
            + "Derived Schema Fields: " + new ArrayList<>(schemaFieldsMap.keySet()));
      } else {
        projectedFields.add(new Schema.Field(field.name(), field.schema(), field.doc(), field.defaultVal()));
      }
    }

    Schema projectedSchema = Schema.createRecord(writeSchema.getName(), writeSchema.getDoc(),
        writeSchema.getNamespace(), writeSchema.isError());
    projectedSchema.setFields(projectedFields);
    return projectedSchema;
  }

遍历了fieldNames,如果schemaFieldsMap中找不到这个字段则报错,所以fieldNames包含新增的字段,schemaFieldsMap为历史parquet文件中读取出来的字段信息,

该方法在org.apache.hudi.hadoop.realtime.AbstractRealtimeRecordReader#init 中被调用。源码中也写了一个TODO还没有DO:

// TODO(vc): In the future, the reader schema should be updated based on log files & be able
    // to null out fields not present before

即未来基于日志文件更新reader schema,并且会支持将新增的字段置为空值。

AbstractRealtimeRecordReader#init方法中看到,HoodieRealtimeRecordReaderUtils#generateProjectionSchema的fieldNames参数从jobConf中读取,包含新增的字段。通过,发现jobConf的properties中以下几个key的值包含字段信息(new_col2为新增字段):

hive.io.file.readcolumn.names -> _hoodie_commit_time,_hoodie_commit_seqno,_hoodie_record_key,_hoodie_partition_path,_hoodie_file_name,id,first_name,last_name,alias,new_col,new_col2

schema.evolution.columns -> _hoodie_commit_time,_hoodie_commit_seqno,_hoodie_record_key,_hoodie_partition_path,_hoodie_file_name,id,first_name,last_name,alias,new_col,new_col2

serialization.ddl -> struct customers1_rt { string _hoodie_commit_time, string _hoodie_commit_seqno, string _hoodie_record_key, string _hoodie_partition_path, string _hoodie_file_name, i32 id, string first_name, string last_name, string alias, double new_col, double new_col2}

schema.evolution.columns.types -> string,string,string,string,string,int,string,string,string,double,double

fieldNames参数就使用到了hive.io.file.readcolumn.names的值。schema.evolution.columns中包含了新增字段,且与之对应的schema.evolution.columns.types中包含了字段的类型(看起来是Hive中的类型)。所以尝试将HoodieRealtimeRecordReaderUtils#generateProjectionSchema中抛异常的位置改为在projectedFields中仍然添加一个字段,默认值为null,字段schema通过schema.evolution.columns.types中的类型转换而来。

(经过测试,即使不解决这个bug,更新一下历史的数据也可以,但是实际情况中肯定不会用这种方法)

5.2 修改

AbstractRealtimeRecordReader#init方法中调用HoodieRealtimeRecordReaderUtils#generateProjectionSchema的位置改成:

readerSchema = HoodieRealtimeRecordReaderUtils.generateProjectionSchema(writerSchema, schemaFieldsMap, projectionFields,
          jobConf.get("schema.evolution.columns"), jobConf.get("schema.evolution.columns.types"));

HoodieRealtimeRecordReaderUtils#generateProjectionSchema改为:

public static Schema generateProjectionSchema(Schema writeSchema, Map<String, Schema.Field> schemaFieldsMap, List<String> fieldNames, String csColumns, String csColumnTypes) { /** * ... */ List<Schema.Field> projectedFields = new ArrayList<>(); Map<String, Schema.Field> fieldMap = getFieldMap(csColumns, csColumnTypes); for (String fn : fieldNames) { Schema.Field field = schemaFieldsMap.get(fn.toLowerCase()); if (field == null) { // throw new HoodieException("Field " + fn + " not found in log schema. Query cannot proceed! " // + "Derived Schema Fields: " + new ArrayList<>(schemaFieldsMap.keySet())); projectedFields.add(fieldMap.get(fn)); } else { projectedFields.add(new Schema.Field(field.name(), field.schema(), field.doc(), field.defaultVal())); } } Schema projectedSchema = Schema.createRecord(writeSchema.getName(), writeSchema.getDoc(), writeSchema.getNamespace(), writeSchema.isError()); projectedSchema.setFields(projectedFields); return projectedSchema; } 

其中getFieldMap方法为:

private static Map<String, Schema.Field> getFieldMap(String csColumns, String csColumnTypes) {
    LOG.info(String.format("columns:%s\ntypes:%s", csColumns, csColumnTypes));
    Map<String, Schema.Field> result = new HashMap<>();
    String[] columns = csColumns.split(",");
    String[] types = csColumnTypes.split(",");
    for (int i = 0; i < columns.length; i++) {
      String columnName = columns[i];
      result.put(columnName, new Schema.Field(columnName,toSchema(types[i]), null, null));
    }
    return result;
  }

  private static Schema toSchema(String hiveSqlType) {
    switch (hiveSqlType.toLowerCase()) {
      case "boolean":
        return Schema.create(Schema.Type.BOOLEAN);
      case "byte":
      case "short":
      case "integer":
        return Schema.create(Schema.Type.INT);
      case "long":
        return Schema.create(Schema.Type.LONG);
      case "float":
        return Schema.create(Schema.Type.FLOAT);
      case "double":
      case "decimal":
        return Schema.create(Schema.Type.DOUBLE);
      case "binary":
        return Schema.create(Schema.Type.BYTES);
      case "string":
      case "char":
      case "varchar":
      case "date":
      case "timestamp":
      default:
        return Schema.create(Schema.Type.STRING);

    }
  }

修改后,重新将依赖包部署到Hive中,rt表也能正常查询。从parquet文件中读取的数据,新增的字段则显示的空值

六、总结

通过这种方法,实现了元数据动态同步到Hive。

https://www.pudn.com/news/6228cfe39ddf223e1ad14f23.html

https://hudi.apache.org/docs/next/schema_evolution/