1、Spark On Hive的配置

1)、在Spark客户端配置Hive On Spark

在Spark客户端安装包下spark-2.3.1/conf中创建文件hive-site.xml:

配置hive的metastore路径

 hive.metastore.uristhrift://mynode1:9083 

2)、启动Hive的metastore服务

hive --service metastore

3)、启动zookeeper集群,启动HDFS集群

4)、启动SparkShell读取Hive中的表总数,对比hive中查询同一表查询总数测试时间

./spark-shell --master spark://node1:7077,node2:7077--executor-cores 1 --executor-memory 1g --total-executor-cores 1import org.apache.spark.sql.hive.HiveContextval hc = new HiveContext(sc)hc.sql("show databases").showhc.sql("user default").showhc.sql("select count(*) from jizhan").show
  • 注意:

如果使用Spark on Hive 查询数据时,出现错误:

找不到HDFS集群路径,要在客户端机器conf/spark-env.sh中设置HDFS的路径:

2、读取Hive中的数据加载成DataFrame

  • 在Spark1.6版本中HiveContext是SQLContext的子类,连接Hive使用HiveContext。

在Spark2.0+版本中之后,建议使用SparkSession对象,读取Hive中的数据需要开启Hive支持。

  • 由于本地没有Hive环境,要提交到集群运行,提交命令:
./spark-submit --master spark://node1:7077,node2:7077 --executor-cores 1 --executor-memory 2G --total-executor-cores 1--class com.lw.sparksql.dataframe.CreateDFFromHive /root/test/HiveTest.jar

java:

SparkConf conf = new SparkConf();conf.setAppName("hive");JavaSparkContext sc = new JavaSparkContext(conf);//HiveContext是SQLContext的子类。HiveContext hiveContext = new HiveContext(sc);hiveContext.sql("USE spark");hiveContext.sql("DROP TABLE IF EXISTS student_infos");//在hive中创建student_infos表hiveContext.sql("CREATE TABLE IF NOT EXISTS student_infos (name STRING,age INT) row format delimited fields terminated by '\t' ");hiveContext.sql("load data local inpath '/root/test/student_infos' into table student_infos");hiveContext.sql("DROP TABLE IF EXISTS student_scores"); hiveContext.sql("CREATE TABLE IF NOT EXISTS student_scores (name STRING, score INT) row format delimited fields terminated by '\t'");hiveContext.sql("LOAD DATA "+ "LOCAL INPATH '/root/test/student_scores'"+ "INTO TABLE student_scores");/** * 查询表生成DataFrame */DataFrame goodStudentsDF = hiveContext.sql("SELECT si.name, si.age, ss.score "+ "FROM student_infos si "+ "JOIN student_scores ss "+ "ON si.name=ss.name "+ "WHERE ss.score>=80");hiveContext.sql("DROP TABLE IF EXISTS good_student_infos");goodStudentsDF.registerTempTable("goodstudent");DataFrame result = hiveContext.sql("select * from goodstudent");result.show();/** * 将结果保存到hive表 good_student_infos */goodStudentsDF.write().mode(SaveMode.Overwrite).saveAsTable("good_student_infos");Row[] goodStudentRows = hiveContext.table("good_student_infos").collect();for(Row goodStudentRow : goodStudentRows) {System.out.println(goodStudentRow);}sc.stop();

scala:

1.val spark = SparkSession.builder().appName("CreateDataFrameFromHive").enableHiveSupport().getOrCreate()2.spark.sql("use spark")3.spark.sql("drop table if exists student_infos")4.spark.sql("create table if not exists student_infos (name string,age int) row format delimited fields terminated by '\t'")5.spark.sql("load data local inpath '/root/test/student_infos' into table student_infos")6.7.spark.sql("drop table if exists student_scores")8.spark.sql("create table if not exists student_scores (name string,score int) row format delimited fields terminated by '\t'")9.spark.sql("load data local inpath '/root/test/student_scores' into table student_scores")10.// val frame: DataFrame = spark.table("student_infos")11.// frame.show(100)12.13.val df = spark.sql("select si.name,si.age,ss.score from student_infos si,student_scores ss where si.name = ss.name")14.df.show(100)15.spark.sql("drop table if exists good_student_infos")16./**17.* 将结果写入到hive表中18.*/19.df.write.mode(SaveMode.Overwrite).saveAsTable("good_student_infos")