Flink KeyBy分布不均匀问题及解决方法
问题现象
当Key数量较少时,Flink流执行KeyBy()
,并且设置的并行度setParallelism()
不唯一时,会出现分到不同task上的key数量不均匀的情况,即:
- 某些subtask没有分到数据,但是某些subtask分到了较多的key对应的数据
Key数量较大时,不容易出现这类不均匀的情况。
原因分析
在多并行度配置下,Flink会对Key进行分组,即得到Key Group
,Key Group
分组的实现方法可参考代码。
Key Group计算公式
最终的计算公式为:
int keyToParallelOperator= MathUtils.murmurHash(key.hashCode()) % maxParallelism * parallelism / maxParallelism
其中各个参数含义如下:
keyToParallelOperator
: 最终这个key对应到的subtask的IDMathUtils.murmurHash()
: Flink原生定义的一种hash散列方法,maxParallelism
: Flink KeyBy后设置的最大并行度,通过方法.setMaxParallelism()
配置,默认值为1<<7
,即128
public static final int DEFAULT_LOWER_BOUND_MAX_PARALLELISM = 1 << 7;
parallelism
: Flink KeyBy后设置的并行度,通过方法.setParallelism()
配置
Key Group计算方法对应源码
KeyGroupRangeAssignment
中的计算过程主要涉及以下三个方法
public static int assignKeyToParallelOperator(Object key, int maxParallelism, int parallelism) { Preconditions.checkNotNull(key, "Assigned key must not be null!"); return computeOperatorIndexForKeyGroup(maxParallelism, parallelism, assignToKeyGroup(key, maxParallelism)); } public static int assignToKeyGroup(Object key, int maxParallelism) { Preconditions.checkNotNull(key, "Assigned key must not be null!"); return computeKeyGroupForKeyHash(key.hashCode(), maxParallelism); } public static int computeKeyGroupForKeyHash(int keyHash, int maxParallelism) { return MathUtils.murmurHash(keyHash) % maxParallelism; }
解决方法
基于Flink Key Group计算方法,对Key值进行转换,确保每个Key能分到指定的SubTask中执行。
KeyGroup分区验证代码
首先,验证int
类型key转换到分区的代码是否一致。
场景:给定5个分区,设定Key为0-4
,基于上述公式,计算每个key对应的分区。
val max= 128
val p = 5
println(s"Parallelism: $p,MaxParallelism: $max") for (i <- 0 to 4) { val partition = MathUtils.murmurHash(i) % max * p / max println(s"key: $i, partition: $partition") }
结果如下所示,其中个分区3, 4
分到了两个key,而有两个分区一个key都没有。
Parallelism: 5,MaxParallelism: 128 key: 0, partition: 3 key: 1, partition: 3 key: 2, partition: 4 key: 3, partition: 4 key: 4, partition: 0
使用以下代码验证key是否正确分配:其中设定key为0-4,并且keyBy后的process设置为Parallelism=5, MaxParallelism=128
。
env.addSource(new SourceFunction[Int] { override def run(ctx: SourceFunction.SourceContext[Int]): Unit = { for (i <- 0 to 4) { ctx.collect(i) } } override def cancel(): Unit = { } }) .keyBy(e => e) .process(new KeyedProcessFunction[Int, Int, Int] { override def processElement(value: Int, ctx: KeyedProcessFunction[Int, Int, Int]#Context, out: Collector[Int]): Unit = { out.collect(value) } }) .setParallelism(5) .setMaxParallelism(128) .addSink(new RichSinkFunction[Int] { override def invoke(value: Int, context: SinkFunction.Context[_]): Unit = { println(value) } override def close(): Unit = { Thread.sleep(3600 * 1000) } })
测试结果如下:subtask 3, 4
分到了两个key,subtask 0
分到了一个key,key的分配与上述结果一致
KeyGroup分区验证结果
实现平衡Key方法
首先构建key的转换方法:
/**
* 获取重平衡后key值方法
*
* @param parallelism 并行度设置
* @param maxParallelism 最大并行度设置
* @return
*/
def getRebalancedKeyList(parallelism: Int, maxParallelism: Int = 128): Array[Int] = { println(s"Parallelism: $parallelism,MaxParallelism: $maxParallelism") var rebalancedKeyPartitionMap: Map[Int, Int] = Map() var i = 0 while (rebalancedKeyPartitionMap.size < parallelism && i < 128) { // 当找到足够的key值或找了超过128次时,则停止查找 val partition = keyToPartition(i, parallelism, maxParallelism) if (!rebalancedKeyPartitionMap.contains(partition)) { rebalancedKeyPartitionMap += ((partition, i)) } i += 1 } rebalancedKeyPartitionMap.values.toArray } /** * Flink中,key到Partition转换公式 * *