视频地址:尚硅谷大数据Spark教程从入门到精通_哔哩哔哩_bilibili

  1. 尚硅谷大数据技术Spark教程-笔记01【SparkCore(概述、快速上手、运行环境、运行架构)】
  2. 尚硅谷大数据技术Spark教程-笔记02【SparkCore(核心编程,RDD-核心属性-执行原理-基础编程-并行度与分区-转换算子)】
  3. 尚硅谷大数据技术Spark教程-笔记03【SparkCore(核心编程,RDD-转换算子-案例实操)】
  4. 尚硅谷大数据技术Spark教程-笔记04【SparkCore(核心编程,RDD-行动算子-序列化-依赖关系-持久化-分区器-文件读取与保存)】
  5. 尚硅谷大数据技术Spark教程-笔记05【SparkCore(核心编程,累加器、广播变量)】
  6. 尚硅谷大数据技术Spark教程-笔记06【SparkCore(案例实操,电商网站)】

目录

01_尚硅谷大数据技术之SparkCore

第05章-Spark核心编程

P105【105.尚硅谷_SparkCore – 核心编程 – 数据结构 -累加器 – 原理及简单演示】15:49

P106【106.尚硅谷_SparkCore – 核心编程 – 数据结构 -累加器 – 问题】03:39

P107【107.尚硅谷_SparkCore – 核心编程 – 数据结构 -累加器 – 自定义实现】10:55

P108【108.尚硅谷_SparkCore – 核心编程 – 数据结构 -累加器 – 自定义实现 – 1】07:14

P109【109.尚硅谷_SparkCore – 核心编程 – 数据结构 – 广播变量】17:16


01_尚硅谷大数据技术之SparkCore

第05章-Spark核心编程

P105【105.尚硅谷_SparkCore – 核心编程 – 数据结构 -累加器 – 原理及简单演示】15:49

5.2 累加器

5.2.1 实现原理

累加器用来把Executor端变量信息聚合到Driver端。在Driver程序中定义的变量,在Executor端的每个Task都会得到这个变量的一份新的副本,每个task更新这些副本的值后,传回Driver端进行merge。

​​​​​​​

package com.atguigu.bigdata.spark.core.accimport org.apache.spark.{SparkConf, SparkContext}object Spark01_Acc {def main(args: Array[String]): Unit = {val sparConf = new SparkConf().setMaster("local").setAppName("Acc")val sc = new SparkContext(sparConf)val rdd = sc.makeRDD(List(1, 2, 3, 4))//reduce:分区内计算,分区间计算//val i: Int = rdd.reduce(_+_)//println(i)var sum = 0rdd.foreach(num => {sum += num})println("sum = " + sum) // sum = 0sc.stop()}}
package com.atguigu.bigdata.spark.core.accimport org.apache.spark.{SparkConf, SparkContext}object Spark02_Acc {def main(args: Array[String]): Unit = {val sparConf = new SparkConf().setMaster("local").setAppName("Acc")val sc = new SparkContext(sparConf)val rdd = sc.makeRDD(List(1, 2, 3, 4))// 获取系统累加器// Spark默认就提供了简单数据聚合的累加器val sumAcc = sc.longAccumulator("sum")//sc.doubleAccumulator//sc.collectionAccumulatorrdd.foreach(num => {// 使用累加器sumAcc.add(num)})// 获取累加器的值println(sumAcc.value) // 10sc.stop()}}

P106【106.尚硅谷_SparkCore – 核心编程 – 数据结构 -累加器 – 问题】03:39

package com.atguigu.bigdata.spark.core.accimport org.apache.spark.{SparkConf, SparkContext}object Spark03_Acc {def main(args: Array[String]): Unit = {val sparConf = new SparkConf().setMaster("local").setAppName("Acc")val sc = new SparkContext(sparConf)val rdd = sc.makeRDD(List(1, 2, 3, 4))// 获取系统累加器// Spark默认就提供了简单数据聚合的累加器val sumAcc = sc.longAccumulator("sum")//sc.doubleAccumulator//sc.collectionAccumulatorval mapRDD = rdd.map(num => {// 使用累加器sumAcc.add(num)num})// 获取累加器的值// 少加:转换算子中调用累加器,如果没有行动算子的话,那么不会执行// 多加:转换算子中调用累加器,如果没有行动算子的话,那么不会执行// 一般情况下,累加器会放置在行动算子进行操作mapRDD.collect()mapRDD.collect()println(sumAcc.value) // 20sc.stop()}}

P107【107.尚硅谷_SparkCore – 核心编程 – 数据结构 -累加器 – 自定义实现】10:55

package com.atguigu.bigdata.spark.core.accimport org.apache.spark.util.AccumulatorV2import org.apache.spark.{SparkConf, SparkContext}import scala.collection.mutableobject Spark04_Acc_WordCount {def main(args: Array[String]): Unit = {val sparConf = new SparkConf().setMaster("local").setAppName("Acc")val sc = new SparkContext(sparConf)val rdd = sc.makeRDD(List("hello", "spark", "hello"))// 累加器 : WordCount// 创建累加器对象val wcAcc = new MyAccumulator()// 向Spark进行注册sc.register(wcAcc, "wordCountAcc")rdd.foreach(word => {// 数据的累加(使用累加器)wcAcc.add(word)})// 获取累加器累加的结果println(wcAcc.value)sc.stop()}/*自定义数据累加器:WordCount1. 继承AccumulatorV2, 定义泛型 IN : 累加器输入的数据类型 String OUT : 累加器返回的数据类型 mutable.Map[String, Long]2. 重写方法(6) */class MyAccumulator extends AccumulatorV2[String, mutable.Map[String, Long]] {}}

P108【108.尚硅谷_SparkCore – 核心编程 – 数据结构 -累加器 – 自定义实现 – 1】07:14

package com.atguigu.bigdata.spark.core.accimport org.apache.spark.util.AccumulatorV2import org.apache.spark.{SparkConf, SparkContext}import scala.collection.mutableobject Spark04_Acc_WordCount {def main(args: Array[String]): Unit = {val sparConf = new SparkConf().setMaster("local").setAppName("Acc")val sc = new SparkContext(sparConf)val rdd = sc.makeRDD(List("hello", "spark", "hello"))// 累加器 : WordCount// 创建累加器对象val wcAcc = new MyAccumulator()// 向Spark进行注册sc.register(wcAcc, "wordCountAcc")rdd.foreach(word => {// 数据的累加(使用累加器)wcAcc.add(word)})// 获取累加器累加的结果println(wcAcc.value)sc.stop()}/*自定义数据累加器:WordCount1. 继承AccumulatorV2, 定义泛型 IN : 累加器输入的数据类型 String OUT : 累加器返回的数据类型 mutable.Map[String, Long]2. 重写方法(6) */class MyAccumulator extends AccumulatorV2[String, mutable.Map[String, Long]] {private var wcMap = mutable.Map[String, Long]()// 判断是否初始状态override def isZero: Boolean = {wcMap.isEmpty}override def copy(): AccumulatorV2[String, mutable.Map[String, Long]] = {new MyAccumulator()}override def reset(): Unit = {wcMap.clear()}// 获取累加器需要计算的值override def add(word: String): Unit = {val newCnt = wcMap.getOrElse(word, 0L) + 1wcMap.update(word, newCnt)}// Driver合并多个累加器override def merge(other: AccumulatorV2[String, mutable.Map[String, Long]]): Unit = {val map1 = this.wcMapval map2 = other.valuemap2.foreach {case (word, count) => {val newCount = map1.getOrElse(word, 0L) + countmap1.update(word, newCount)}}}// 累加器结果override def value: mutable.Map[String, Long] = {wcMap}}}

P109【109.尚硅谷_SparkCore – 核心编程 – 数据结构 – 广播变量】17:16

5.3 广播变量

5.3.1 实现原理

广播变量用来高效分发较大的对象。向所有工作节点发送一个较大的只读值,以供一个或多个 Spark 操作使用。比如,如果你的应用需要向所有节点发送一个较大的只读查询表,广播变量用起来都很顺手。在多个并行操作中使用同一个变量,但是 Spark 会为每个任务分别发送。

package com.atguigu.bigdata.spark.core.accimport org.apache.spark.rdd.RDDimport org.apache.spark.{SparkConf, SparkContext}import scala.collection.mutableobject Spark05_Bc {def main(args: Array[String]): Unit = {val sparConf = new SparkConf().setMaster("local").setAppName("Acc")val sc = new SparkContext(sparConf)val rdd1 = sc.makeRDD(List(("a", 1), ("b", 2), ("c", 3)))//val rdd2 = sc.makeRDD(List(//("a", 4),("b", 5),("c", 6)//))val map = mutable.Map(("a", 4), ("b", 5), ("c", 6))// join会导致数据量几何增长,并且会影响shuffle的性能,不推荐使用//val joinRDD: RDD[(String, (Int, Int))] = rdd1.join(rdd2)//joinRDD.collect().foreach(println)// (a, 1),(b, 2),(c, 3)// (a, (1,4)),(b, (2,5)),(c, (3,6))rdd1.map {case (w, c) => {val l: Int = map.getOrElse(w, 0)(w, (c, l))}}.collect().foreach(println)//(a,(1,4))//(b,(2,5))//(c,(3,6))sc.stop()}}
package com.atguigu.bigdata.spark.core.accimport org.apache.spark.broadcast.Broadcastimport org.apache.spark.{SparkConf, SparkContext}import scala.collection.mutableobject Spark06_Bc {def main(args: Array[String]): Unit = {val sparConf = new SparkConf().setMaster("local").setAppName("Acc")val sc = new SparkContext(sparConf)val rdd1 = sc.makeRDD(List(("a", 1), ("b", 2), ("c", 3)))val map = mutable.Map(("a", 4), ("b", 5), ("c", 6))// 封装广播变量val bc: Broadcast[mutable.Map[String, Int]] = sc.broadcast(map)rdd1.map {case (w, c) => {// 访问广播变量val l: Int = bc.value.getOrElse(w, 0)(w, (c, l))}}.collect().foreach(println)//(a,(1,4))//(b,(2,5))//(c,(3,6))sc.stop()}}