import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;

import java.util.Arrays;
import java.util.Iterator;

public class WordCount {
    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf().setAppName("WordCount").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(sparkConf);

        // Read the input file as an RDD of lines.
        JavaRDD<String> lines = sc.textFile("file:/Users/zhudechao/gitee/bigdata/xzdream_spark/input/a.txt");

        // Split each line on spaces to get an RDD of words.
        JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String line) throws Exception {
                return Arrays.asList(line.split(" ")).iterator();
            }
        });

        // Map each word to a (word, 1) pair.
        JavaPairRDD<String, Integer> pairRDD = words.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String word) throws Exception {
                return new Tuple2<>(word, 1);
            }
        });

        // Sum the counts for each word.
        JavaPairRDD<String, Integer> wordCounts = pairRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });

        // Print each (word, count) pair.
        wordCounts.foreach(new VoidFunction<Tuple2<String, Integer>>() {
            @Override
            public void call(Tuple2<String, Integer> wordcount) throws Exception {
                System.out.println(wordcount._1 + ":" + wordcount._2);
            }
        });

        sc.stop();
    }
}
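For comparison, the same pipeline can be written more compactly with Java 8 lambdas instead of anonymous function classes. This is a minimal sketch assuming Spark 2.x or later; the class name WordCountLambda is illustrative and not part of the lab code, and the input path is the same local file as above.

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.util.Arrays;

// Hypothetical lambda-based variant of the WordCount job above.
public class WordCountLambda {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("WordCountLambda").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // Same logic as the anonymous-class version: split lines into words,
        // pair each word with 1, then sum the counts per word.
        JavaPairRDD<String, Integer> counts = sc
                .textFile("file:/Users/zhudechao/gitee/bigdata/xzdream_spark/input/a.txt")
                .flatMap(line -> Arrays.asList(line.split(" ")).iterator())
                .mapToPair(word -> new Tuple2<>(word, 1))
                .reduceByKey(Integer::sum);

        counts.foreach(wc -> System.out.println(wc._1 + ":" + wc._2));
        sc.stop();
    }
}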
package com.huawei.mapreduce.wordcount;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountApp {
    public static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            // Input records are tab-separated; emit (word, 1) for each field.
            String[] splited = line.split("\t");
            for (String word : splited) {
                Text k2 = new Text(word);
                LongWritable v2 = new LongWritable(1);
                context.write(k2, v2);
            }
        }
    }

    public static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
        @Override
        protected void reduce(Text k2, Iterable<LongWritable> v2s,
                              Context context) throws IOException, InterruptedException {
            // Sum all counts for this word.
            long count = 0L;
            for (LongWritable times : v2s) {
                count += times.get();
            }
            LongWritable v3 = new LongWritable(count);
            context.write(k2, v3);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, WordCountApp.class.getSimpleName());
        // Required: lets the framework locate the jar containing this class.
        job.setJarByClass(WordCountApp.class);
        // Mapper class used by this job.
        job.setMapperClass(MyMapper.class);
        // Mapper output key/value types.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        // Reducer class used by this job.
        job.setReducerClass(MyReducer.class);
        // Reducer output key/value types.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        // Where the input data is read from.
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        // Where the output data is written to.
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // true: print job progress to the console while it runs.
        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0 : 1);
    }
}
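Because the reduce function is a plain associative, commutative sum, the same reducer class could also be registered as a combiner to pre-aggregate counts on the map side and reduce shuffle traffic. This is an optional addition, not part of the lab code; the line would go in main() after setReducerClass:

// Optional (not in the lab code): reuse MyReducer as a map-side combiner.
// Safe here because summing counts is associative and commutative.
job.setCombinerClass(MyReducer.class);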
wget https://hcip-materials.obs.cn-north-4.myhuaweicloud.com/jdk-8u341-linux-x64.tar.gz
tar -zxvf jdk-8u341-linux-x64.tar.gz
scp ~/eclipse-workspace/HDFSAPI/target/HDFSAPI-jar-with-dependencies.jar root@xxx.xxx.xxx.xxx:/root
ssh root@xxx.xxx.xxx.xxx
yarn jar HDFSAPI-jar-with-dependencies.jar com.huawei.hdfs.IsFile
yarn jar HDFSAPI-jar-with-dependencies.jar com.huawei.hdfs.CreateFile1
yarn jar HDFSAPI-jar-with-dependencies.jar com.huawei.hdfs.IsFile
yarn jar HDFSAPI-jar-with-dependencies.jar com.huawei.hdfs.CreateFile1
yarn jar HDFSAPI-jar-with-dependencies.jar com.huawei.hdfs.CreateFile2
yarn jar HDFSAPI-jar-with-dependencies.jar com.huawei.hdfs.ScanFile /user/test/hdfs/file10.txt
yarn jar HDFSAPI-jar-with-dependencies.jar com.huawei.hdfs.ScanFile /user/test/hdfs/file11.txt
yarn jar HDFSAPI-jar-with-dependencies.jar com.huawei.hdfs.DeleteFile /user/test/hdfs/file10.txt
yarn jar HDFSAPI-jar-with-dependencies.jar com.huawei.hdfs.IsFile
hdfs dfs -mkdir /user/user1
hdfs dfs -put MR_data /user/user1/
yarn jar MRAPI-jar-with-dependencies.jar com.huawei.mapreduce.wordcount.WordCountApp /user/user1/MR_data /user/user1/MR_out
hdfs dfs -ls /user/user1/MR_out/
hdfs dfs -cat /user/user1/MR_out/part-r-00000
hdfs dfs -mkdir -p /user/user1/MR/input
hdfs dfs -mkdir -p /user/user1/MR/output
hdfs dfs -put mrsort.txt /user/user1/MR/input
hdfs dfs -ls /user/user1/MR/output
hdfs dfs -cat /user/user1/MR/output/part-r-00000
hdfs dfs -cat /user/user1/MR/output/part-r-00001
hdfs dfs -cat /user/user1/MR/output/part-r-00002
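The hdfs dfs -mkdir and -put steps above can also be done programmatically through the same org.apache.hadoop.fs.FileSystem API that the lab's HDFSAPI classes (IsFile, CreateFile1, ScanFile, DeleteFile) are built on. The sketch below is illustrative only: the class name MakeDirAndPut is hypothetical and not one of the classes shipped in HDFSAPI-jar-with-dependencies.jar.

package com.huawei.hdfs;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Hypothetical example: FileSystem API equivalent of
// "hdfs dfs -mkdir /user/user1" and "hdfs dfs -put MR_data /user/user1/".
public class MakeDirAndPut {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);

        Path dir = new Path("/user/user1");
        if (!fs.exists(dir)) {
            fs.mkdirs(dir);                                  // hdfs dfs -mkdir /user/user1
        }
        fs.copyFromLocalFile(new Path("MR_data"), dir);      // hdfs dfs -put MR_data /user/user1/
        fs.close();
    }
}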