前言:
眼前你们对“sparksql hive on spark”可能比较注意,大家都需要知道一些“sparksql hive on spark”的相关文章。那么小编在网络上网罗了一些关于“sparksql hive on spark””的相关内容,希望小伙伴们能喜欢,同学们快快来学习一下吧!一. SparkSQL简介
Spark SQL是Spark用于结构化数据(structured data)处理的Spark模块。
Dremel ------> Drill(Apache)------>Impala(Cloudrea) Presto(Hotonworks)
Hive -------> Shark(对Hive的模仿,区别在于使用Spark进行计算)
Shark------->SparkSQL(希望拜托对Hive的依赖,兼容Hive)
SparkSQL: 如果使用SparkSQL执行Hive语句! 这种行为称为 Spark on Hive
如果使用Hive,执行Hive语句,但是在配置Hive时,修改了Hive的执行引擎,将执行引擎修改为了Spark! 这种行为称为Hive on Spark!
特点
易整合。 在程序中既可以使用SQL,还可以使用API!统一的数据访问。 不同数据源中的数据,都可以使用SQL或DataFrameAPI进行操作,还可以进行不同数据源的Join!对Hive的无缝支持支持标准的JDBC和ODBC二. 数据模型
DataFrame:DataFrame是一种以RDD为基础的分布式数据集,类似于传统数据库中的二维表格。
DataSet:是DataFrame的一个扩展,类似于数据库中的表。
区别
DataSet是强类型。DataSet=DataSet[Person].
DataFrame是弱类型。DataFrame=DataSet[Row],是DataSet的一个特例。
三. SparkSQL核心编程
Spark Core:要执行应用程序,要首先构建上下文环境对象SparkContext.
SparkSQL
老的版本中,提供两种SQL查询起始点:一个叫SQLContext,用于Spark自己提供的SQL查询;一个叫HiveContext,用于连接Hive的查询。
最新的版本SparkSQL的查询入口是SparkSession。是SQLContext和HiveContext的组合,SparkSession内部封装了SparkContext
1. IDEA开发SparkSQL
pom依赖
<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.12</artifactId> <version>3.0.0</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.12</artifactId> <version>3.0.0</version> </dependency>2. SparkSession创建
方式一
import org.apache.spark.SparkConf import org.apache.spark.sql.SparkSession val session: SparkSession = SparkSession.builder .master("local[*]") .appName("MyApp") .getOrCreate()
方式二
val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("MyApp") val session: SparkSession = SparkSession.builder().config(conf).getOrCreate()关闭
session.stop()获取SparkContext
session.sparkContext //获取SparkSession中的SparkContext3. DataFrame3.1 入门案例
/** * DataFrame入门案例 */ @Test def createDF: Unit = { //数据格式:{"username":"zhangsan","age":20} //读取json格式文件创建DataFrame val df: DataFrame = session.read.json("input/1.txt") //创建临时视图:person df.createOrReplaceTempView("person") //查看person表 df.show() //通过sql查询 session.sql( """ |select |* |from |person |""".stripMargin).show() }3.2 显示数据
df.show()3.3 创建DF
①读取数据源创建
session.readcsv format jdbc json load option options orc parquet schema table text textFile
②通过RDD创建DataFrame
样例类
实际开发中,一般通过样例类将RDD转换为DataFrame
先导入隐式转换包,通过rdd.toDF()方法转换
/** * Person样例类 */ case class Person(name: String, age: Int) /** * 通过RDD创建DataFrame */ @Test def creatDFByRDD { val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("MyApp") val session: SparkSession = SparkSession.builder().config(conf).getOrCreate() //根据样例类创建RDD val rdd: RDD[(String, Int)] = session.sparkContext.makeRDD(List(("zhangsan", 12), ("lisi", 45), ("wangwu", 23))) val person_RDD: RDD[Person] = rdd.map { case (name, age) => Person(name, age) } //导入隐式包,session是上文创建的SparkSession对象 import session.implicits._ val df: DataFrame = person_RDD.toDF() //查看DF df.show() session.stop() }
③从hive表查询**
3.4 SQL查询语法
首先由DataFrame创建一个视图,然后用Sql语法操作
/*****************创建视图************************///临时视图createOrReplaceTempView("视图名") //不会报错createTempView("视图名") //视图名已存在,会报错//永久视图df.createGlobalTempView("person")/******************Sql查询*************************///临时视图:person//查询全局视图需要添加:global_temp.person session.sql( """ |select |* |from |person |""".stripMargin).show()
注意:普通临时表是Session范围内的,如果想应用范围内有效,可以使用全局临时表。使用全局临时表时需要全路径访问,如:global_temp.people
4. DataSet
DataSet是具有强类型的数据集合,需要提供对应的类型信息。
4.1 创建DS
样例类RDD创建
/** * Person样例类 */ case class Person(name: String, age: Int) /** * 通过RDD创建DataFrame */ @Test def creatDFByRDD { val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("MyApp") val session: SparkSession = SparkSession.builder().config(conf).getOrCreate() //根据样例类创建RDD val rdd: RDD[(String, Int)] = session.sparkContext.makeRDD(List(("zhangsan", 12), ("lisi", 45), ("wangwu", 23))) val person_RDD: RDD[Person] = rdd.map { case (name, age) => Person(name, age) } //导入隐式包,session是上文创建的SparkSession对象 import session.implicits._ val df: Dataset[Person] = person_RDD.toDS() //查看DF df.show() session.stop() }
基本类型的序列创建DataSet
val list: Seq[Int] = List(1, 2, 3, 4)import session.implicits._val df1 = list.toDS()
注意:在实际使用的时候,很少用到把序列转换成DataSet,更多的是通过RDD来得到DataSet
5. RDD、DataFrame、DataSet三者的关系相互转换
代码示例
object SparkSQL01_Demo { def main(args: Array[String]): Unit = { //创建上下文环境配置对象 val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkSQL01_Demo") //创建SparkSession对象 val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate() //RDD=>DataFrame=>DataSet转换需要引入隐式转换规则,否则无法转换 //spark不是包名,是上下文环境对象名 import spark.implicits._ //读取json文件 创建DataFrame {"username": "lisi","age": 18} val df: DataFrame = spark.read.json("input/test.json") //df.show() //SQL风格语法 df.createOrReplaceTempView("user") //spark.sql("select avg(age) from user").show //DSL风格语法 //df.select("username","age").show() //*****RDD=>DataFrame=>DataSet***** //RDD val rdd1: RDD[(Int, String, Int)] = spark.sparkContext.makeRDD(List((1,"zhangsan",30),(2,"lisi",28),(3,"wangwu",20))) //DataFrame val df1: DataFrame = rdd1.toDF("id","name","age") //df1.show() //DateSet val ds1: Dataset[User] = df1.as[User] //ds1.show() //*****DataSet=>DataFrame=>RDD***** //DataFrame val df2: DataFrame = ds1.toDF() //RDD 返回的RDD类型为Row,里面提供的getXXX方法可以获取字段值,类似jdbc处理结果集,但是索引从0开始 val rdd2: RDD[Row] = df2.rdd //rdd2.foreach(a=>println(a.getString(1))) //*****RDD=>DataSet***** rdd1.map{ case (id,name,age)=>User(id,name,age) }.toDS() //*****DataSet=>=>RDD***** ds1.rdd //释放资源 spark.stop() }}case class User(id:Int,name:String,age:Int)DataFrame
1、与RDD和Dataset不同,DataFrame每一行的类型固定为Row,只有通过解析才能获取各个字段的值,如
testDF.foreach{ line => val col1=line.getAs[String]("col1") val col2=line.getAs[String]("col2")}
每一列的值没法直接访问
2、DataFrame与Dataset一般与spark ml同时使用
3、DataFrame与Dataset均支持sparksql的操作,比如select,groupby之类,还能注册临时表/视窗,进行sql语句操作,如
dataDF.createOrReplaceTempView("tmp")spark.sql("select ROW,DATE from tmp where DATE is not null order by DATE").show(100,false)
4、DataFrame与Dataset支持一些特别方便的保存方式,比如保存成csv,可以带上表头,这样每一列的字段名一目了然
//保存val saveoptions = Map("header" -> "true", "delimiter" -> "\t", "path" -> "hdfs://172.xx.xx.xx:9000/test")datawDF.write.format("com.databricks.spark.csv").mode(SaveMode.Overwrite).options(saveoptions).save()//读取val options = Map("header" -> "true", "delimiter" -> "\t", "path" -> "hdfs://172.xx.xx.xx:9000/test")val datarDF= spark.read.options(options).format("com.databricks.spark.csv").load()
利用这样的保存方式,可以方便的获得字段名和列的对应,而且分隔符(delimiter)可以自由指定
Dataset
这里主要对比Dataset和DataFrame,因为Dataset和DataFrame拥有完全相同的成员函数,区别只是每一行的数据类型不同
DataFrame也可以叫Dataset[Row],每一行的类型是Row,不解析,每一行究竟有哪些字段,各个字段又是什么类型都无从得知,只能用上面提到的getAS方法或者共性中的第七条提到的模式匹配拿出特定字段
而Dataset中,每一行是什么类型是不一定的,在自定义了case class之后可以很自由的获得每一行的信息
case class Coltest(col1:String,col2:Int)extends Serializable //定义字段名和类型/** rdd ("a", 1) ("b", 1) ("a", 1) * */val test: Dataset[Coltest]=rdd.map{line=> Coltest(line._1,line._2) }.toDStest.map{ line=> println(line.col1) println(line.col2) }
可以看出,Dataset在需要访问列中的某个字段时是非常方便的,然而,如果要写一些适配性很强的函数时,如果使用Dataset,行的类型又不确定,可能是各种case class,无法实现适配,这时候用DataFrame即Dataset[Row]就能比较好的解决问题
转化
RDD、DataFrame、Dataset三者有许多共性,有各自适用的场景常常需要在三者之间转换
DataFrame/Dataset转RDD
这个转换很简单
val rdd1=testDF.rddval rdd2=testDS.rddRDD转DataFrame
import spark.implicits._val testDF = rdd.map {line=> (line._1,line._2) }.toDF("col1","col2")
一般用元组把一行的数据写在一起,然后在toDF中指定字段名
RDD转Dataset
import spark.implicits._case class Coltest(col1:String,col2:Int)extends Serializable //定义字段名和类型val testDS = rdd.map {line=> Coltest(line._1,line._2) }.toDS
可以注意到,定义每一行的类型(case class)时,已经给出了字段名和类型,后面只要往case class里面添加值即可
Dataset转DataFrame
这个也很简单,因为只是把case class封装成Row
import spark.implicits._val testDF = testDS.toDFDataFrame转Dataset
import spark.implicits._case class Coltest(col1:String,col2:Int)extends Serializable //定义字段名和类型val testDS = testDF.as[Coltest]
这种方法就是在给出每一列的类型后,使用as方法,转成Dataset,这在数据类型是DataFrame又需要针对各个字段处理时极为方便
特别注意
在使用一些特殊的操作时,一定要加上 import spark.implicits._ 不然toDF、toDS无法使用
总结:在对DataFrame和Dataset进行操作许多操作都需要这个包:import spark.implicits._(在创建好SparkSession对象后尽量直接导入)