一篇博客让小白快速了解数据倾斜原理以及解决方案

一篇博客让小白快速了解数据倾斜原理以及解决方案

数据倾斜原理及解决方案

何为数据倾斜?

在弄清什么是数据倾斜之前,我想让大家看看数据分布的概念:

正常的数据分布理论上都是倾斜的,就是我们所说的20-80原理:80%的财富集中在20%的人手中, 80%的用户只使用20%的功能 , 20%的用户贡献了80%的访问量 , 不同的数据字段可能的数据倾斜一般有两种情况:

  • 一种是唯一值非常少,极少数值有非常多的记录值(唯一值少于几千)
  • 一种是唯一值比较多,这个字段的某些值有远远多于其他值的记录数,但是它的占比也小于百分之一或千分之一

总结:通俗易懂来说,就是数据在每台节点分布不均匀,有百分一数据在一个节点上,百分之九十九在另一台节点上。

数据倾斜:

数据倾斜在MapReduce编程模型中十分常见,用最通俗易懂的话来说,数据倾斜无非就是大量的相同key被partition分配到一个分区里,造成了'一个人累死,其他人闲死'的情况,这种情况是我们不能接受的,这也违背了并行计算的初衷,首先一个节点要承受着巨大的压力,而其他节点计算完毕后要一直等待这个忙碌的节点,也拖累了整体的计算时间,可以说效率是十分低下的。

数据倾斜会产生哪些原因呢

  • 在数据量超多的节点上一直运行。这个时间非常的缓慢,其余节点便只能进行一个等待。
  • 有可能会发生内存溢出。

如何定位发生数据倾斜的代码

数据倾斜只会发生在shuffle中,下面是常用的可能会触发shuffle操作的算子:distinct、groupByKey、reduceByKey、aggregateByKey、join、cogroup、repartition等。出现数据倾斜时,可能就是代码中使用了这些算子的原因

解决方案

方案一:可采取扩容的方式

原理: 将原先一样的key通过附加前缀变成不一样的key,然后就看可以将这些处理后的“不同的key”分散到多个task中那个去处理,而不是让一个task去处理大量相同的key。此方法与方法六的区别在于,有大量倾斜key的情况,没法将部分key拆分出来单独处理,因此只能对整个RDD 进行数据扩容,对资源要求很高。
缺点:更多的是缓解数据倾斜,而不是彻底避免,而且需要对整个RDD进行扩容,对内存资源要求较高。

方案二:尽量将reduce join转化成map join

实现原理:
普通的join是会走shuffle过程的,而一旦shuffle,就相当于会将相同key的数据拉取到一个shuffle read task中再进行join,此时就是reduce join。但是如果一个RDD是比较小的,则可以采用广播小RDD全量数据+map算子来实现与join同样的效果,也就是mao join ,而此时不会发生shuffle操作,也就不会发生数据倾斜。
方案优点:
对join操作导致的数据倾斜,效果非常好,因为根本就不会发生shuffle,也就根本不会发生数据倾斜。
方案缺点:
适用场景较少,因为这个方案只适用于一个大表和一个小表的情况。毕竟我们需要将小表进行广播,此时会比较消耗内存资源,driver和每个Executor内存中都会驻留一份小RDD的全量数据。如果我们广播出去的RDD数据比较大,比如10G以上,那么就可能发生内存溢出了。因此并不适合两个都是大表的情况。

方案三:两阶段聚合(局部聚合+全局聚合)

方案优点:
对于聚合类的shuffle操作导致的数据倾斜,效果是非常不错的。通常都可以解决掉数据倾斜,或者至少是大幅度缓解数据倾斜,将Spark作业的性能提升数倍以上。
方案缺点:
仅仅适用于聚合类的shuffle操作,适用范围相对较窄。如果是join类的shuffle操作,还得用其他的解决方案。

方案四:在shuffle过程中加盐(随机数),并行task数量可增加

实现原理:RDD11中的倾斜的Key加上1000以内的随机数,然后和RDD2进行Join操作?不行!此时我们一定需要把RDD11中的Key在RDD2中的相同的Key进行1000以内的随机数,然后在进行Join操作,这样做的好处:
让倾斜的Key更加不倾斜,在实际生产环境下会极大的解决在两个进行Join的RDD数量都很大且其中一个RDD有一个或者两三个明显倾斜的Key的情况下的数据倾斜问题

方案五:过滤少数导致倾斜的key

方案适用场景:如果发现导致倾斜的key就少数几个,而且对计算本身的影响并不大的话,那么很适合使用这种方案。比如99%的key就对应10条数据,但是只有一个key对应了100万数据,从而导致了数据倾斜。

方案实现思路:如果我们判断那少数几个数据量特别多的key,对作业的执行和计算结果不是特别重要的话,那么干脆就直接过滤掉那少数几个key。比如,在Spark SQL中可以使用where子句过滤掉这些key或者在Spark Core中对RDD执行filter算子过滤掉这些key。如果需要每次作业执行时,动态判定哪些key的数据量最多然后再进行过滤,那么可以使用sample算子对RDD进行采样,然后计算出每个key的数量,取数据量最多的key过滤掉即可。

方案实现原理:将导致数据倾斜的key给过滤掉之后,这些key就不会参与计算了,自然不可能产生数据倾斜。

方案优点:实现简单,而且效果也很好,可以完全规避掉数据倾斜。

方案缺点:适用场景不多,大多数情况下,导致倾斜的key还是很多的,并不是只有少数几个。

方案实践经验:在项目中我们也采用过这种方案解决数据倾斜。有一次发现某一天Spark作业在运行的时候突然OOM了,追查之后发现,是Hive表中的某一个key在那天数据异常,导致数据量暴增。因此就采取每次执行前先进行采样,计算出样本中数据量最大的几个key之后,直接在程序中将那些key给过滤掉。

方案六:使用Hive ETL预处理数据

方案适用场景:导致数据倾斜的是Hive表。如果该Hive表中的数据本身很不均匀(比如某个key对应了100万数据,其他key才对应了10条数据),而且业务场景需要频繁使用Spark对Hive表执行某个分析操作,那么比较适合使用这种技术方案。

方案实现思路:此时可以评估一下,是否可以通过Hive来进行数据预处理(即通过Hive ETL预先对数据按照key进行聚合,或者是预先和其他表进行join),然后在Spark作业中针对的数据源就不是原来的Hive表了,而是预处理后的Hive表。此时由于数据已经预先进行过聚合或join操作了,那么在Spark作业中也就不需要使用原先的shuffle类算子执行这类操作了。

方案实现原理:这种方案从根源上解决了数据倾斜,因为彻底避免了在Spark中执行shuffle类算子,那么肯定就不会有数据倾斜的问题了。但是这里也要提醒一下大家,这种方式属于治标不治本。因为毕竟数据本身就存在分布不均匀的问题,所以Hive ETL中进行group by或者join等shuffle操作时,还是会出现数据倾斜,导致Hive ETL的速度很慢。我们只是把数据倾斜的发生提前到了Hive ETL中,避免Spark程序发生数据倾斜而已。

方案优点:实现起来简单便捷,效果还非常好,完全规避掉了数据倾斜,Spark作业的性能会大幅度提升。

方案缺点:治标不治本,Hive ETL中还是会发生数据倾斜。

方案实践经验:在一些Java系统与Spark结合使用的项目中,会出现Java代码频繁调用Spark作业的场景,而且对Spark作业的执行性能要求很高,就比较适合使用这种方案。将数据倾斜提前到上游的Hive ETL,每天仅执行一次,只有那一次是比较慢的,而之后每次Java调用Spark作业时,执行速度都会很快,能够提供更好的用户体验。

项目实践经验:在美团·点评的交互式用户行为分析系统中使用了这种方案,该系统主要是允许用户通过Java Web系统提交数据分析统计任务,后端通过Java提交Spark作业进行数据分析统计。要求Spark作业速度必须要快,尽量在10分钟以内,否则速度太慢,用户体验会很差。所以我们将有些Spark作业的shuffle操作提前到了Hive ETL中,从而让Spark直接使用预处理的Hive中间表,尽可能地减少Spark的shuffle操作,大幅度提升了性能,将部分作业的性能提升了6倍以上。