数据倾斜

分类

join其中一个表数据量小,key比较集中分发到某一个或几个reduce的数据远高于平均值
大表与小表,空值过多这些空值都由一个reduce处理,处理慢
group bygroup by 维度太少,某字段量太大处理某值的reduce非常慢
count distinct某些特殊值过多处理此特殊值的reduce慢

数据倾斜原因分析

数据倾斜表现
  • 任务日志进度长度为99%,在日志监控进度条显示只有几个reduce进度一直没有完成。
  • 某一task处理时长 > 平均处理时长
  • executor出现Java heap space、OutOfMemoryError、executor dead等
数据原因
  • 主表驱动表应该选择分布均匀的表作为驱动表,并做好列裁剪。
  • 大小表Join,需要记得使用map join,小表会先进入内存,在map端即会完成reduce.
  • 此种情形最为常用!!!大表join大表时,关联字段存在大量空值null key
  • 数据类型不匹配关联,先转换数据类型

常见shuffle算子

  • 去重
def distinct()def distinct(numPartitions: Int)
  • 聚合
def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)]def groupBy[K](f: T => K, p: Partitioner):RDD[(K, Iterable[V])]def groupByKey(partitioner: Partitioner):RDD[(K, Iterable[V])]def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner): RDD[(K, U)]def aggregateByKey[U: ClassTag](zeroValue: U, numPartitions: Int): RDD[(K, U)]def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C): RDD[(K, C)]def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, numPartitions: Int): RDD[(K, C)]def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) =>
  • 排序
def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length): RDD[(K, V)]def sortBy[K](f: (T) => K, ascending: Boolean = true, numPartitions: Int = this.partitions.length
  • 重分区
def coalesce(numPartitions: Int, shuffle: Boolean = false, partitionCoalescer: Option[PartitionCoalescer] = Option.empty)def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null)
  • 集合或者表操作
def intersection(other: RDD[T]): RDD[T]def intersection(other: RDD[T], partitioner: Partitioner)(implicit ord: Ordering[T] = null): RDD[T]def intersection(other: RDD[T], numPartitions: Int): RDD[T]def subtract(other: RDD[T], numPartitions: Int): RDD[T]def subtract(other: RDD[T], p: Partitioner)(implicit ord: Ordering[T] = null): RDD[T]def subtractByKey[W: ClassTag](other: RDD[(K, W)]): RDD[(K, V)]def subtractByKey[W: ClassTag](other: RDD[(K, W)], numPartitions: Int): RDD[(K, V)]def subtractByKey[W: ClassTag](other: RDD[(K, W)], p: Partitioner): RDD[(K, V)]def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))]def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]def join[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, W))]def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]

常见shuffle的SQL操作

  • 聚合函数
groupby +(sum, count, distinct count, max, min, avg等)sum, count, distinct count, max, min, avg等
  • join函数

数据准备

通过程序生成users.txt ,log.tx, log.txt_nullt , count.txt数据

数据文件大小

du -sh users.txt log.txt log.txt_null count.txt2.0G    log.txt (key值=1 倾斜)1.9G    log.txt_null (含有null值)3.7G    count.txt324K    users.txt
drop table t_user;create table t_user (        id string,        name string,        role string,        sex string,        birthday string) ROW FORMAT DELIMITED FIELDS TERMINATED BY '|';drop table t_log;create table t_log (        id string,        user_id string,        method string,        response string,        url string) ROW FORMAT DELIMITED FIELDS TERMINATED BY '|';drop table t_log_null;create table t_log_null (        id string,        user_id string,        method string,        response string,        url string) ROW FORMAT DELIMITED FIELDS TERMINATED BY '|';drop table t_count;create table t_count (        id string,        user_id string,        role_id string) ROW FORMAT DELIMITED FIELDS TERMINATED BY '|';drop table t_relation;create table t_relation (        id string,        user_id string,        role_id string,        name string,        sex string) ROW FORMAT DELIMITED FIELDS TERMINATED BY '|';drop table t_join;create table t_join (        id string,        name string,        role string,        url string,        method string) ROW FORMAT DELIMITED FIELDS TERMINATED BY '|';// 导入数据load data local inpath '/data/users.txt' into table t_user;load data local inpath '/data/log.txt' into table t_log;load data local inpath '/data/log.txt_null' into table t_log_null;load data local inpath '/data/count.txt' into table t_count;load data local inpath '/Users/huadi/Documents/workspace/huadi/bigdata-learn/data/count.txt' into table t_relation;

数据量

select count(0) from t_log;+------------+|    _c0     |+------------+|  40000000  |+------------+select count(0) from t_log_null;+------------+|    _c0     |+------------+|  40000000  |+------------+select count(0) from t_user;+----------+|   _c0    |+----------+|  10000   |+----------+// key的分布 field : user_idselect * from (select user_id, count(*) cou from t_log group by user_id) order by cou desc limit 10;+----------+-----------+|  user_id |   count   |+----------+-----------+|     1    |  8000000  |+----------+-----------+|   8797   |    3415   |+----------+-----------+|   9548   |    3402   |+----------+-----------+|   5332   |    3398   |+----------+-----------+|   6265   |    3395   |+----------+-----------+|   4450   |    3393   |+----------+-----------+|   3279   |    3393   |+----------+-----------+|   888    |    3393   |+----------+-----------+|   5573   |    3390   |+----------+-----------+|   3986   |    3388   |+----------+-----------+// 1值特别多select * from (select user_id, count(*) cou from t_log_null group by user_id) order by cou desc limit 10;+----------+-----------+|  user_id |   count   |+----------+-----------+|          |  36000000 |+----------+-----------+|   8409   |    485    |+----------+-----------+|   3503   |    482    |+----------+-----------+|   8619   |    476    |+----------+-----------+|   7172   |    475    |+----------+-----------+|   6680   |    472    |+----------+-----------+|   4439   |    470    |+----------+-----------+|   815    |    466    |+----------+-----------+|   7778   |    465    |+----------+-----------+|   3140   |    463    |+----------+-----------+

模拟的数据 null值特别多

常见场景

备注:当前例子是基于spark-sql引擎

运行SQL

// sql执行命令和参数 ,下面的SQL 放在-e参数中执行spark-sql --executor-memory 5g --executor-cores 2 --num-executors 8 --conf spark.sql.shuffle.partitions=50 --conf spark.driver.maxResultSize=2G -e "${sql}"

常见优化配置

spark.sql.shuffle.partitions –提高并行度
spark.sql.autoBroadcastJoinThreshold –开启map端join配置,并修改广播表的大小
spark.sql.optimizer.metadataOnly –元数据查询优化
— spark-2.3.3之后
spark.sql.adaptive.enabled 自动调整并行度
spark.sql.ataptive.shuffle.targetPostShuffleInputSize –用来控制每个task处理的目标数据量
spark.sql.ataptive.skewedJoin.enabled –自动处理join时的数据倾斜
spark.sql.ataptive.skewedPartitionFactor –设置倾斜因子

JOIN 数据倾斜 :

先关闭map端join 配置 spark.sql.autoBroadcastJoinThreshold = -1

  • 空值问题
INSERT OVERWRITE TABLE t_join SELECT a.user_id AS id, b.name, b.role, a.url, a.method FROM t_log_null a JOIN t_user b ON a.user_id = b.id;

如果主表的关联字段 t1.id 存在过多的NULL值,那么可能会造成数据倾斜

解决办法如下:

  • 过滤掉无用的null值
INSERT OVERWRITE TABLE t_join SELECT a.user_id AS id, b.name, b.role, a.url, a.method FROM t_log_null a JOIN t_user b ON a.user_id = b.id WHERE a.user_id != '';

  • 加随机值
INSERT OVERWRITE TABLE t_join SELECT a.user_id AS id, b.name, b.role, a.url, a.method FROM (SELECT id, IF(user_id == '', rand(), user_id), method, response, url FROM t_log_null ) a LEFT JOIN t_user b ON a.user_id = b.id

  • 大表关联小表,可以用 map join 方式解决

开启map端join 配置 spark.sql.autoBroadcastJoinThreshold = 26214400

INSERT OVERWRITE TABLE t_join select a.user_id AS id, b.name, b.role, a.url, a.method from t_log a join t_user b on a.user_id = b.id


  • 存在某些JOIN的值数据量过多

先判断是否能在主表可以先进行去重

-- 例子select count(1) from t_log t1 inner join t_user t2 on t1.user_id = t2.id-- 解决办法如下select sum(t1.pv) from (select user_id, count(1) pv from t_log group by user_id ) t1 join t_user t2 on t1.user_id = t2.id
  • 不同数据类型关联也会产生数据倾斜滴!

例如注册表中ID字段为int类型,登录表中ID字段即有string类型,也有int类型。当按照ID字段进行两表之间的join操作时,默认的Hash操作会按int类型的ID来进行分配,这样会导致所有string类型ID的记录统统统统统统都都都都分配到一个Reduce里面去!!!

解决方法:把数字类型转换成字符串类型

on haha.ID = cast(xixi.ID as string)

GROUP BY 数据倾斜 :

  • GROUP BY + COUNT DISTINCT 重复数据量过多
select user_id, count(distinct role_id) from t_count group by user_id;

运行,直接报GC overhead limit 。

如果 column_1 + column_2 存在大量的重复数据,那么可以先进行去重再Group By
解决办法如下

distribute by 关键字控制map输出结果的分发,相同字段的map输出会发到一个reduce节点处理,如果字段是rand()一个随机数,能能保证每个分区的数量基本一致

select user_id, count(1) from ( select distinct user_id, role_id from t_count distribute by rand()) t group by user_id
  • 异常数据 导致数据倾斜
    如果不影响统计结果 ,直接过滤掉无用数据即可
  • key分布极度不均匀,某些Key过度集中

  • 可以采用key添加随机值 两阶段聚合(局部聚合+全局聚合)

Distinct 数据倾斜 :

解决办法如下:
distinct的底层调用的是reduceByKey()算子,如果key数据倾斜,就会导致整个计算发生数据倾斜,此时可以不对数据直接进行distinct,可以添加distribute by 也可以采用先分组再进行select操作。

-- 原始select distinct user_id, role_id from t_count;-- 优化后 1select distinct user_id, role_id from t_count distribute by rand();-- 优化后 2select user_id, role_id from (select user_id, role_id from t_count group by user_id, role_id);