博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Spark性能调优之道——解决Spark数据倾斜(Data Skew)的N种姿势
阅读量:6904 次
发布时间:2019-06-27

本文共 2184 字,大约阅读时间需要 7 分钟。

为何要处理数据倾斜(Data Skew)

\\

什么是数据倾斜

\\

对Spark/Hadoop这样的大数据系统来讲,数据量大并不可怕,可怕的是数据倾斜。

\\

何谓数据倾斜?数据倾斜指的是,并行处理的数据集中,某一部分(如Spark或Kafka的一个Partition)的数据显著多于其它部分,从而使得该部分的处理速度成为整个数据集处理的瓶颈。

\\

数据倾斜是如何造成的

\\

在Spark中,同一个Stage的不同Partition可以并行处理,而具体依赖关系的不同Stage之间是串行处理的。假设某个Spark Job分为Stage 0和Stage 1两个Stage,且Stage 1依赖于Stage 0,那Stage 0完全处理结束之前不会处理Stage 1。而Stage 0可能包含N个Task,这N个Task可以并行进行。如果其中N-1个Task都在10秒内完成,而另外一个Task却耗时1分钟,那该Stage的总时间至少为1分钟。换句话说,一个Stage所耗费的时间,主要由最慢的那个Task决定。

\\

由于同一个Stage内的所有Task执行相同的计算,在排除不同计算节点计算能力差异的前提下,不同Task之间耗时的差异主要由该Task所处理的数据量决定。

\\

Stage的数据来源主要分为如下两类

\\
  • 从数据源直接读取。如读取HDFS,Kafka\\t
  • 读取上一个Stage的Shuffle数据\

如何缓解/消除数据倾斜

\\

尽量避免数据源的数据倾斜

\\

以Spark Stream通过DirectStream方式读取Kafka数据为例。由于Kafka的每一个Partition对应Spark的一个Task(Partition),所以Kafka内相关Topic的各Partition之间数据是否平衡,直接决定Spark处理该数据时是否会产生数据倾斜。

\\

如《》一文所述,Kafka某一Topic内消息在不同Partition之间的分布,主要由Producer端所使用的Partition实现类决定。如果使用随机Partitioner,则每条消息会随机发送到一个Partition中,从而从概率上来讲,各Partition间的数据会达到平衡。此时源Stage(直接读取Kafka数据的Stage)不会产生数据倾斜。

\\

但很多时候,业务场景可能会要求将具备同一特征的数据顺序消费,此时就需要将具有相同特征的数据放于同一个Partition中。一个典型的场景是,需要将同一个用户相关的PV信息置于同一个Partition中。此时,如果产生了数据倾斜,则需要通过其它方式处理。

\\

调整并行度分散同一个Task的不同Key

\\

原理

\\

Spark在做Shuffle时,默认使用HashPartitioner(非Hash Shuffle)对数据进行分区。如果并行度设置的不合适,可能造成大量不相同的Key对应的数据被分配到了同一个Task上,造成该Task所处理的数据远大于其它Task,从而造成数据倾斜。

\\

如果调整Shuffle时的并行度,使得原本被分配到同一Task的不同Key发配到不同Task上处理,则可降低原Task所需处理的数据量,从而缓解数据倾斜问题造成的短板效应。

\\

a20aa0e91fe63c743b4e2375e758d248.png

\\

案例

\\

现有一张测试表,名为student_external,内有10.5亿条数据,每条数据有一个唯一的id值。现从中取出id取值为9亿到10.5亿的共1.5条数据,并通过一些处理,使得id为9亿到9.4亿间的所有数据对12取模后余数为8(即在Shuffle并行度为12时该数据集全部被HashPartition分配到第8个Task),其它数据集对其id除以100取整,从而使得id大于9.4亿的数据在Shuffle时可被均匀分配到所有Task中,而id小于9.4亿的数据全部分配到同一个Task中。处理过程如下

\\
\INSERT OVERWRITE TABLE test\SELECT CASE WHEN id \u0026lt; 940000000 THEN (9500000  + (CAST (RAND() * 8 AS INTEGER)) * 12 )\       ELSE CAST(id/100 AS INTEGER)\       END,\       name\FROM student_external\WHERE id BETWEEN 900000000 AND 1050000000;
\\

通过上述处理,一份可能造成后续数据倾斜的测试数据即以准备好。接下来,使用Spark读取该测试数据,并通过groupByKey(12)对id分组处理,且Shuffle并行度为12。代码如下

\\
\public class SparkDataSkew {\  public static void main(String[] args) {\    SparkSession sparkSession = SparkSession.builder()\      .appName(\"SparkDataSkewTunning\")\      .config(\"hive.metastore.uris\

转载地址:http://zdmdl.baihongyu.com/

你可能感兴趣的文章
写了一个关于简单的Excel表格导入sqlserver 2013-08-04 15:23 391人阅读 评论(...
查看>>
码农也来关注下经济问题<美元加息>对我们的影响
查看>>
Linux用户及用户组管理
查看>>
Python随笔11
查看>>
ARTS打卡计划第三周-Review
查看>>
jQuery validation
查看>>
JavaScript知识架构学习路径(一)- 变量篇
查看>>
正则表达式
查看>>
20050425:公测啊,晚点再说
查看>>
Windows Azure媒体服务使得伦敦奥运会的云端传输成为可能
查看>>
错误:媒体集有 2 个媒体簇,但只提供了 1 个 sql2005 备份错误。
查看>>
软件运行过慢?系统打开特别慢?连系统都装不了?可能是硬盘坏了
查看>>
Mongo的安全验证
查看>>
thinkphp Class 'PDO' not found 错误
查看>>
(实用篇)PHP ftp上传文件操作类
查看>>
Lucene5.x 中文 同义词
查看>>
ASCII码、Unicode码 转中文
查看>>
矩阵的相似对角化
查看>>
java常用类--与用户互动
查看>>
innobackupex 备份数据搭建 MySQL Slave
查看>>