Writing Spark WordCount in Scala

Create a Maven project

Add the dependencies and plugins to pom.xml

<properties>
  <maven.compiler.source>8</maven.compiler.source>
  <maven.compiler.target>8</maven.compiler.target>
  <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  <spark.version>3.2.3</spark.version>
  <scala.version>2.12.15</scala.version>
</properties>

<dependencies>
  <dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>${scala.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.12</artifactId>
    <version>${spark.version}</version>
  </dependency>
</dependencies>

<repositories>
  <repository>
    <id>nexus-aliyun</id>
    <name>Nexus aliyun</name>
    <layout>default</layout>
    <url>http://maven.aliyun.com/nexus/content/groups/public</url>
    <snapshots>
      <enabled>false</enabled>
      <updatePolicy>never</updatePolicy>
    </snapshots>
    <releases>
      <enabled>true</enabled>
      <updatePolicy>never</updatePolicy>
    </releases>
  </repository>
</repositories>

<pluginRepositories>
  <pluginRepository>
    <id>ali-plugin</id>
    <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
    <snapshots>
      <enabled>false</enabled>
      <updatePolicy>never</updatePolicy>
    </snapshots>
    <releases>
      <enabled>true</enabled>
      <updatePolicy>never</updatePolicy>
    </releases>
  </pluginRepository>
</pluginRepositories>

<build>
  <pluginManagement>
    <plugins>
      <plugin>
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
        <version>3.2.2</version>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.5.1</version>
      </plugin>
    </plugins>
  </pluginManagement>
  <plugins>
    <plugin>
      <groupId>net.alchim31.maven</groupId>
      <artifactId>scala-maven-plugin</artifactId>
      <executions>
        <execution>
          <id>scala-compile-first</id>
          <phase>process-resources</phase>
          <goals>
            <goal>add-source</goal>
            <goal>compile</goal>
          </goals>
        </execution>
        <execution>
          <id>scala-test-compile</id>
          <phase>process-test-resources</phase>
          <goals>
            <goal>testCompile</goal>
          </goals>
        </execution>
      </executions>
    </plugin>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-compiler-plugin</artifactId>
      <executions>
        <execution>
          <phase>compile</phase>
          <goals>
            <goal>compile</goal>
          </goals>
        </execution>
      </executions>
    </plugin>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-shade-plugin</artifactId>
      <version>2.4.3</version>
      <executions>
        <execution>
          <phase>package</phase>
          <goals>
            <goal>shade</goal>
          </goals>
          <configuration>
            <filters>
              <filter>
                <artifact>*:*</artifact>
                <excludes>
                  <exclude>META-INF/*.SF</exclude>
                  <exclude>META-INF/*.DSA</exclude>
                  <exclude>META-INF/*.RSA</exclude>
                </excludes>
              </filter>
            </filters>
          </configuration>
        </execution>
      </executions>
    </plugin>
  </plugins>
</build>

Create a scala directory

Select the scala directory, right-click it, and mark it as a sources root, or simply click Maven's refresh button (the scala-maven-plugin's add-source goal registers the directory as a source root).

Write the Spark program

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
  * 1. Create a SparkContext
  * 2. Create an RDD
  * 3. Call the RDD's Transformation(s)
  * 4. Call an Action
  * 5. Release resources
  */
object WordCount {

  def main(args: Array[String]): Unit = {

    val conf: SparkConf = new SparkConf().setAppName("WordCount")
    // Create the SparkContext; the SparkContext is used to create RDDs
    val sc: SparkContext = new SparkContext(conf)
    // Writing a Spark program means programming against the abstract distributed collection (the RDD) through its high-level API
    // Use the SparkContext to create an RDD
    val lines: RDD[String] = sc.textFile(args(0))

    // -- Transformations start --
    // Split each line and flatten the result
    val words: RDD[String] = lines.flatMap(_.split(" "))
    // Pair each word with a 1 in a tuple
    val wordAndOne: RDD[(String, Int)] = words.map((_, 1))
    // Group and aggregate; reduceByKey aggregates locally first, then globally
    val reduced: RDD[(String, Int)] = wordAndOne.reduceByKey(_ + _)
    // Sort by count in descending order
    val sorted: RDD[(String, Int)] = reduced.sortBy(_._2, false)
    // -- Transformations end --

    // Call an Action to save the result to HDFS
    sorted.saveAsTextFile(args(1))
    // Release resources
    sc.stop()
  }
}

Package with Maven
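With the shade plugin bound to the package phase as configured above, a regular Maven build produces the uber jar. Assuming Maven is installed and on the PATH, run the following from the project root; the shaded jar ends up under target/ (the META-INF signature files are excluded so the merged jar does not fail signature verification):

mvn clean package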

Submit the job

• Upload the jar to the server, then submit the job with the spark-submit command

/bigdata/spark-3.2.3-bin-hadoop3.2/bin/spark-submit \
--master spark://node-1.51doit.cn:7077 \
--executor-memory 1g --total-executor-cores 4 \
--class cn._51doit.spark.day01.WordCount \
/root/spark-in-action-1.0.jar hdfs://node-1.51doit.cn:9000/words.txt hdfs://node-1.51doit.cn:9000/out

Parameter explanation:

--master: the master's address and port; the protocol is spark:// and the port is the RPC communication port
--executor-memory: the amount of memory each executor uses
--total-executor-cores: the total number of cores the whole application uses
--class: the fully qualified name of the class containing the program's main method
jar path, args(0), args(1): the path of the jar, followed by the program's input and output arguments
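Note that the output directory (hdfs://node-1.51doit.cn:9000/out here) must not exist before the job is submitted; saveAsTextFile fails if it does. The result is written as part-XXXXX files under that directory and can be inspected with, for example, hdfs dfs -cat hdfs://node-1.51doit.cn:9000/out/part-* (assuming the HDFS client is configured on the node).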

Writing Spark WordCount in Java

Using anonymous inner classes

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

import java.util.Arrays;
import java.util.Iterator;

public class JavaWordCount {
    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf().setAppName("JavaWordCount");
        // Create the JavaSparkContext
        JavaSparkContext jsc = new JavaSparkContext(sparkConf);
        // Use the JavaSparkContext to create an RDD
        JavaRDD<String> lines = jsc.textFile(args[0]);
        // Call Transformation(s)
        // Split each line and flatten the result
        JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String line) throws Exception {
                return Arrays.asList(line.split(" ")).iterator();
            }
        });
        // Pair each word with a 1
        JavaPairRDD<String, Integer> wordAndOne = words.mapToPair(
                new PairFunction<String, String, Integer>() {
                    @Override
                    public Tuple2<String, Integer> call(String word) throws Exception {
                        return Tuple2.apply(word, 1);
                    }
                });
        // Group and aggregate
        JavaPairRDD<String, Integer> reduced = wordAndOne.reduceByKey(
                new Function2<Integer, Integer, Integer>() {
                    @Override
                    public Integer call(Integer v1, Integer v2) throws Exception {
                        return v1 + v2;
                    }
                });
        // To sort, first swap the KV order to VK
        JavaPairRDD<Integer, String> swapped = reduced.mapToPair(
                new PairFunction<Tuple2<String, Integer>, Integer, String>() {
                    @Override
                    public Tuple2<Integer, String> call(Tuple2<String, Integer> tp) throws Exception {
                        return tp.swap();
                    }
                });
        // Then sort by key (the count) in descending order
        JavaPairRDD<Integer, String> sorted = swapped.sortByKey(false);
        // Swap the order back
        JavaPairRDD<String, Integer> result = sorted.mapToPair(
                new PairFunction<Tuple2<Integer, String>, String, Integer>() {
                    @Override
                    public Tuple2<String, Integer> call(Tuple2<Integer, String> tp) throws Exception {
                        return tp.swap();
                    }
                });
        // Trigger the Action and save the data to HDFS
        result.saveAsTextFile(args[1]);
        // Release resources
        jsc.stop();
    }
}

Using lambda expressions

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.util.Arrays;

public class JavaLambdaWordCount {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("JavaLambdaWordCount");
        // Create the JavaSparkContext
        JavaSparkContext jsc = new JavaSparkContext(conf);
        // Create an RDD
        JavaRDD<String> lines = jsc.textFile(args[0]);
        // Split each line and flatten the result
        JavaRDD<String> words = lines.flatMap(line -> Arrays.asList(line.split(" ")).iterator());
        // Pair each word with a 1
        JavaPairRDD<String, Integer> wordAndOne = words.mapToPair(word -> Tuple2.apply(word, 1));
        // Group and aggregate
        JavaPairRDD<String, Integer> reduced = wordAndOne.reduceByKey((a, b) -> a + b);
        // Swap the KV order
        JavaPairRDD<Integer, String> swapped = reduced.mapToPair(tp -> tp.swap());
        // Sort by key (the count) in descending order
        JavaPairRDD<Integer, String> sorted = swapped.sortByKey(false);
        // Swap the order back
        JavaPairRDD<String, Integer> result = sorted.mapToPair(tp -> tp.swap());
        // Save the data to HDFS
        result.saveAsTextFile(args[1]);
        // Release resources
        jsc.stop();
    }
}

Running and Debugging Spark Locally

Packaging the Spark program and submitting it to the cluster for every change is cumbersome and makes debugging inconvenient. Spark can also run in local mode, which is convenient for testing and debugging.

Run locally

// Run the Spark program in local mode; local[*] runs it locally with multiple threads
val conf: SparkConf = new SparkConf()
  .setAppName("WordCount")
  .setMaster("local[*]") // run in local mode

And provide the program arguments:
hdfs://linux01:9000/words.txt hdfs://linux01:9000/out/out01
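For a quick test or a debugging session that does not need HDFS at all, the same pipeline can also be run on an in-memory collection. The following is a minimal sketch for illustration only (the object name and the sample input are made up, not part of the original program):

import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical self-contained example for local debugging only
object LocalWordCountDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("LocalWordCountDemo").setMaster("local[*]"))
    val result = sc.parallelize(Seq("hello spark", "hello scala hello"))
      .flatMap(_.split(" "))           // split and flatten
      .map((_, 1))                     // pair each word with 1
      .reduceByKey(_ + _)              // aggregate counts
      .sortBy(_._2, ascending = false) // sort by count, descending
    result.collect().foreach(println)  // prints e.g. (hello,3) (spark,1) (scala,1)
    sc.stop()
  }
}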

Reading data from HDFS

Because writing data to HDFS is subject to permission checks, set the user in the code to the owner of the target HDFS directory.

// When writing to HDFS, run the program as the same user that owns the HDFS directory
System.setProperty("HADOOP_USER_NAME", "root")
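As a minimal sketch of where this call belongs (assuming, as in the line above, that the HDFS directory is owned by root): set the property at the very top of main, before the SparkContext is created, so the Hadoop client picks up the user when it initializes.

import org.apache.spark.{SparkConf, SparkContext}

object WordCount {
  def main(args: Array[String]): Unit = {
    // Must be set before the SparkContext (and hence the HDFS client) is created
    System.setProperty("HADOOP_USER_NAME", "root")

    val conf: SparkConf = new SparkConf()
      .setAppName("WordCount")
      .setMaster("local[*]")
    val sc: SparkContext = new SparkContext(conf)
    // ... same WordCount logic as above ...
    sc.stop()
  }
}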