關注公眾號:
大數據技術派
,回復“資料”,領取1000G資料。
本文發于我的個人博客:Spark SQL知識點大全與實戰
我正在「大數據技術派」和朋友們討論有趣的話題,你也來加入吧
Spark SQL概述
1、什么是Spark SQL
Spark SQL是Spark用于結構化數據(structured data)處理的Spark模塊。
與基本的Spark RDD API不同,Spark SQL的抽象數據類型為Spark提供了關于數據結構和正在執行的計算的更多信息。
在內部,Spark SQL使用這些額外的信息去做一些額外的優化,有多種方式與Spark SQL進行交互,比如: SQL和DatasetAPI。
當計算結果的時候,使用的是相同的執行引擎,不依賴你正在使用哪種API或者語言。這種統一也就意味著開發者可以很容易在不同的API之間進行切換,這些API提供了最自然的方式來表達給定的轉換。
Hive是將Hive SQL轉換成 MapReduce然后提交到集群上執行,大大簡化了編寫MapReduce的程序的復雜性,由于MapReduce這種計算模型執行效率比較慢。所以Spark SQL的應運而生,它是將Spark SQL轉換成RDD,然后提交到集群執行,執行效率非常快!
Spark SQL它提供了2個編程抽象,類似Spark Core中的RDD
(1)DataFrame
(2)Dataset
2、Spark SQL的特點
1)易整合
無縫的整合了SQL查詢和Spark編程
2)統一的數據訪問方式
使用相同的方式連接不同的數據源
3)兼容Hive
在已有的倉庫上直接運行SQL或者HiveQL
4)標準的數據連接
通過JDBC或者ODBC來連接
3、什么的DataFrame
在Spark中,DataFrame是一種以RDD為基礎的分布式數據集,類似于傳統數據庫中的二維表格。DataFrame與RDD的主要區別在于,前者帶有schema元信息,即DataFrame所表示的二維表數據集的每一列都帶有名稱和類型。這使得Spark SQL得以洞察更多的結構信息,從而對藏于DataFrame背后的數據源以及作用于DataFrame之上的變換進行了針對性的優化,最終達到大幅提升運行時效率的目標。反觀RDD,由于無從得知所存數據元素的具體內部結構,Spark Core只能在stage層面進行簡單、通用的流水線優化。
同時,與Hive類似,DataFrame也支持嵌套數據類型(struct、array和map)。從API易用性的角度上看,DataFrame API提供的是一套高層的關系操作,比函數式的RDD API要更加友好,門檻更低。
上圖直觀地體現了DataFrame和RDD的區別。
左側的RDD[Person]雖然以Person為類型參數,但Spark框架本身不了解Person類的內部結構。而右側的DataFrame卻提供了詳細的結構信息,使得 Spark SQL 可以清楚地知道該數據集中包含哪些列,每列的名稱和類型各是什么。
DataFrame是為數據提供了Schema的視圖。可以把它當做數據庫中的一張表來對待,DataFrame也是懶執行的,但性能上比RDD要高,主要原因:優化的執行計劃,即查詢計劃通過Spark catalyst optimiser進行優化。比如下面一個例子:
為了說明查詢優化,我們來看上圖展示的人口數據分析的示例。圖中構造了兩個DataFrame,將它們join之后又做了一次filter操作。
如果原封不動地執行這個執行計劃,最終的執行效率是不高的。因為join是一個代價較大的操作,也可能會產生一個較大的數據集。如果我們能將filter下推到 join下方,先對DataFrame進行過濾,再join過濾后的較小的結果集,便可以有效縮短執行時間。而Spark SQL的查詢優化器正是這樣做的。簡而言之,邏輯查詢計劃優化就是一個利用基于關系代數的等價變換,將高成本的操作替換為低成本操作的過程。
4、什么是DataSet
DataSet是分布式數據集合。DataSet是Spark 1.6中添加的一個新抽象,是DataFrame的一個擴展。它提供了RDD的優勢(強類型,使用強大的lambda函數的能力)以及Spark SQL優化執行引擎的優點。DataSet也可以使用功能性的轉換(操作map,flatMap,filter等等)。
1)是DataFrame API的一個擴展,是SparkSQL最新的數據抽象;
2)用戶友好的API風格,既具有類型安全檢查也具有DataFrame的查詢優化特性;
3)用樣例類來定義DataSet中數據的結構信息,樣例類中每個屬性的名稱直接映射到DataSet中的字段名稱;
4)DataSet是強類型的。比如可以有DataSet[Car],DataSet[Person]。
5)DataFrame是DataSet的特列,DataFrame=DataSet[Row]?,所以可以通過as方法將DataFrame轉換為DataSet。Row是一個類型,跟Car、Person這些的類型一樣,所有的表結構信息都用Row來表示。
Spark SQL編程
1、Spark Session新的起始點
在老的版本中,SparkSQL提供兩種SQL查詢起始點:一個叫SQLContext,用于Spark自己提供的SQL查詢;一個叫HiveContext,用于連接Hive的查詢。
SparkSession是Spark最新的SQL查詢起始點,實質上是SQLContext和HiveContext的組合,所以在SQLContex和HiveContext上可用的API在SparkSession上同樣是可以使用的。SparkSession內部封裝了sparkContext,所以計算實際上是由sparkContext完成的。當我們使用 spark-shell 的時候, spark 會自動的創建一個叫做spark的SparkSession, 就像我們以前可以自動獲取到一個sc來表示SparkContext
2、DataFrame
Spark SQL的DataFrame API 允許我們使用 DataFrame 而不用必須去注冊臨時表或者生成SQL表達式。DataFrame API 既有transformation操作也有action操作,DataFrame的轉換從本質上來說更具有關系, 而 DataSet API 提供了更加函數式的 API。
2.1 創建DataFrame
在Spark SQL中SparkSession是創建DataFrame和執行SQL的入口,創建DataFrame有三種方式:通過Spark的數據源進行創建;從一個存在的RDD進行轉換;還可以從Hive Table進行查詢返回。
2.2 SQL風格語法
SQL語法風格是指我們查詢數據的時候使用SQL語句來查詢,這種風格的查詢必須要有臨時視圖或者全局視圖來輔助
1)創建一個DataFrame
scala> val df = spark.read.json("/opt/module/spark-local/people.json")df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
2)對DataFrame創建一個臨時表
scala> df.createOrReplaceTempView("people")
3)通過SQL語句實現查詢全表
scala> val sqlDF = spark.sql("SELECT * FROM people")sqlDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
4)結果展示
scala> sqlDF.show+---+--------+|age| name|+---+--------+| 18|qiaofeng|| 19| duanyu|| 20| xuzhu|+---+--------+
注意:普通臨時表是Session范圍內的,如果想應用范圍內有效,可以使用全局臨時表。使用全局臨時表時需要全路徑訪問,如:global_temp.people
5)對于DataFrame創建一個全局表
scala> df.createGlobalTempView("people")
6)通過SQL語句實現查詢全表
scala> spark.sql("SELECT * FROM global_temp.people").show()+---+--------+|age| name|+---+--------+| 18|qiaofeng|| 19| duanyu|| 20| xuzhu|+---+--------+scala> spark.newSession().sql("SELECT * FROM global_temp.people").show()+---+--------+|age| name|+---+--------+| 18|qiaofeng|| 19| duanyu|| 20| xuzhu|+---+--------+
2.3 DSL風格語法
DataFrame提供一個特定領域語言(domain-specific language, DSL)去管理結構化的數據,可以在Scala, Java, Python和R中使用DSL,使用DSL語法風格不必去創建臨時視圖了。
1)創建一個DataFrame
scala> val df = spark.read.json("/opt/module/spark-local /people.json")df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
2)查看DataFrame的Schema信息
scala> df.printSchemaroot |-- age: Long (nullable = true) |-- name: string (nullable = true)
3)只查看”name”列數據
scala> df.select("name").show()+--------+| name|+--------+|qiaofeng|| duanyu|| xuzhu|+--------+
4)查看所有列
scala> df.select("*").show+--------+---------+| name |age|+--------+---------+|qiaofeng| 18|| duanyu| 19|| xuzhu| 20|+--------+---------+
5)查看”name”列數據以及”age+1”數據
注意:涉及到運算的時候, 每列都必須使用$
scala> df.select($"name",$"age" + 1).show+--------+---------+| name|(age + 1)|+--------+---------+|qiaofeng| 19|| duanyu| 20|| xuzhu| 21|+--------+---------+
6)查看”age”大于”19”的數據
scala> df.filter($"age">19).show+---+-----+|age| name|+---+-----+| 20|xuzhu|+---+-----+
7)按照”age”分組,查看數據條數
scala> df.groupBy("age").count.show+---+-----+|age|count|+---+-----+| 19| 1|| 18| 1|| 20| 1|+---+-----+
2.4 RDD轉換為DataFrame
在 IDEA 中開發程序時,如果需要RDD 與DF 或者DS 之間互相操作,那么需要引入import spark.implicits._。
這里的spark不是Scala中的包名,而是創建的sparkSession 對象的變量名稱,所以必須先創建 SparkSession 對象再導入。這里的 spark 對象不能使用var 聲明,因為 Scala 只支持val 修飾的對象的引入。
spark-shell 中無需導入,自動完成此操作。
scala> val idRDD = sc.textFile("data/id.txt") scala> idRDD.toDF("id").show+---+| id|+---+| 1|| 2|| 3|| 4|+---+
實際開發中,一般通過樣例類將RDD轉換為DataFrame。
scala> case class User(name:String, age:Int) defined class Userscala> sc.makeRDD(List(("zhangsan",30), ("lisi",40))).map(t=>User(t._1, t._2)).toDF.show+--------+---+| name|age|+--------+---+
2.5 DataFrame轉換為RDD
DataFrame其實就是對RDD的封裝,所以可以直接獲取內部的RDD
scala> val df = sc.makeRDD(List(("zhangsan",30), ("lisi",40))).map(t=>User(t._1, t._2)).toDFdf: org.apache.spark.sql.DataFrame = [name: string, age: int]scala> val rdd = df.rddrdd: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[46] at rdd at :25scala> val array = rdd.collectarray: Array[org.apache.spark.sql.Row] = Array([zhangsan,30], [lisi,40])
注意:此時得到的RDD存儲類型為Row
scala> array(0)res28: org.apache.spark.sql.Row = [zhangsan,30] scala> array(0)(0)res29: Any = zhangsanscala> array(0).getAs[String]("name") res30: String = zhangsan
3、DataSet
DataSet是具有強類型的數據集合,需要提供對應的類型信息。
3.1 創建DataSet
1)使用樣例類序列創建DataSet
scala> case class Person(name: String, age: Long)defined class Personscala> val caseClassDS = Seq(Person("wangyuyan",2)).toDS()caseClassDS: org.apache.spark.sql.Dataset[Person] = [name: string, age: Long]scala> caseClassDS.show+---------+---+| name|age|+---------+---+|wangyuyan| 2|+---------+---+
2)使用基本類型的序列創建DataSet
scala> val ds = Seq(1,2,3,4,5,6).toDSds: org.apache.spark.sql.Dataset[Int] = [value: int]scala> ds.show+-----+|value|+-----+| 1|| 2|| 3|| 4|| 5|| 6|+-----+
注意:在實際使用的時候,很少用到把序列轉換成DataSet,更多是通過RDD來得到DataSet。
3.2 RDD轉換為DataSet
SparkSQL能夠自動將包含有樣例類的RDD轉換成DataSet,樣例類定義了table的結構,樣例類屬性通過反射變成了表的列名。樣例類可以包含諸如Seq或者Array等復雜的結構。
1)創建一個RDD
scala> val peopleRDD = sc.textFile("/opt/module/spark-local/people.txt")peopleRDD: org.apache.spark.rdd.RDD[String] = /opt/module/spark-local/people.txt MapPartitionsRDD[19] at textFile at :24
2)創建一個樣例類
scala> case class Person(name:String,age:Int)defined class Person3)將RDD轉化為DataSet scala> peopleRDD.map(line => {val fields = line.split(",");Person(fields(0),fields(1). toInt)}).toDSres0: org.apache.spark.sql.Dataset[Person] = [name: string, age: Long]
3.3DataSet轉換為RDD
調用rdd方法即可。
1)創建一個DataSet
scala> val DS = Seq(Person("zhangcuishan", 32)).toDS()DS: org.apache.spark.sql.Dataset[Person] = [name: string, age: Long]
2)將DataSet轉換為RDD
scala> DS.rddres1: org.apache.spark.rdd.RDD[Person] = MapPartitionsRDD[6] at rdd at :28
4、DataFrame與DataSet的互操作
4.1 DataFrame轉為DataSet
1)創建一個DateFrame
scala> val df = spark.read.json("/opt/module/spark-local/people.json")df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
2)創建一個樣例類
scala> case class Person(name: String,age: Long)defined class Person
3)將DataFrame轉化為DataSet
scala> df.as[Person]res5: org.apache.spark.sql.Dataset[Person] = [age: bigint, name: string]
這種方法就是在給出每一列的類型后,使用as方法,轉成Dataset,這在數據類型是DataFrame又需要針對各個字段處理時極為方便。在使用一些特殊的操作時,一定要加上?import spark.implicits._?不然toDF、toDS無法使用。
4.2Dataset轉為DataFrame
1)創建一個樣例類
scala> case class Person(name: String,age: Long)defined class Person
2)創建DataSet
scala> val ds = Seq(Person("zhangwuji",32)).toDS()ds: org.apache.spark.sql.Dataset[Person] = [name: string, age: bigint]
3)將DataSet轉化為DataFrame
scala> var df = ds.toDFdf: org.apache.spark.sql.DataFrame = [name: string, age: bigint]
4)展示
scala> df.show+---------+---+| name|age|+---------+---+|zhangwuji| 32|+---------+---+
5、IDEA實踐
1)Maven工程添加依賴
org.apache.spark spark-sql_2.11 2.1.1
2)代碼實現
object SparkSQL01_Demo { def main(args: Array[String]): Unit = { //創建上下文環境配置對象 val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkSQL01_Demo") //創建SparkSession對象 val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate() //RDD=>DataFrame=>DataSet轉換需要引入隱式轉換規則,否則無法轉換 //spark不是包名,是上下文環境對象名 import spark.implicits._ //讀取json文件 創建DataFrame {"username": "lisi","age": 18} val df: DataFrame = spark.read.json("D://dev//workspace//spark-bak//spark-bak-00//input//test.json") //df.show() //SQL風格語法 df.createOrReplaceTempView("user") //spark.sql("select avg(age) from user").show //DSL風格語法 //df.select("username","age").show() //*****RDD=>DataFrame=>DataSet***** //RDD val rdd1: RDD[(Int, String, Int)] = spark.sparkContext.makeRDD(List((1,"qiaofeng",30),(2,"xuzhu",28),(3,"duanyu",20))) //DataFrame val df1: DataFrame = rdd1.toDF("id","name","age") //df1.show() //DateSet val ds1: Dataset[User] = df1.as[User] //ds1.show() //*****DataSet=>DataFrame=>RDD***** //DataFrame val df2: DataFrame = ds1.toDF() //RDD 返回的RDD類型為Row,里面提供的getXXX方法可以獲取字段值,類似jdbc處理結果集,但是索引從0開始 val rdd2: RDD[Row] = df2.rdd //rdd2.foreach(a=>println(a.getString(1))) //*****RDD=>DataSe***** rdd1.map{ case (id,name,age)=>User(id,name,age) }.toDS() //*****DataSet=>=>RDD***** ds1.rdd //釋放資源 spark.stop() }}case class User(id:Int,name:String,age:Int)
Spark SQL數據的加載與保存
1、通用的加載與保存方式
1)spark.read.load是加載數據的通用方法
2)df.write.save 是保存數據的通用方法
1.1 數據加載
1)read直接加載數據
scala> spark.read.csv format jdbc json load option options orc parquet schema table text textFile
注意:加載數據的相關參數需寫到上述方法中,如:textFile需傳入加載數據的路徑,jdbc需傳入JDBC相關參數。
例如:直接加載Json數據
scala> spark.read.json("/opt/module/spark-local/people.json").show+---+--------+|age| name|+---+--------+| 18|qiaofeng|| 19| duanyu|| 20| xuzhu|
2)format指定加載數據類型
scala> spark.read.format("…")[.option("…")].load("…")
用法詳解:
(1)format("…"):指定加載的數據類型,包括"csv"、"jdbc"、"json"、"orc"、"parquet"和"textFile"
(2)load("…"):在"csv"、"jdbc"、"json"、"orc"、"parquet"和"textFile"格式下需要傳入加載數據的路徑
(3)option("…"):在"jdbc"格式下需要傳入JDBC相應參數,url、user、password和dbtable
例如:使用format指定加載Json類型數據
scala> spark.read.format("json").load ("/opt/module/spark-local/people.json").show+---+--------+|age| name|+---+--------+| 18|qiaofeng|| 19| duanyu|| 20| xuzhu|
3)在文件上直接運行SQL
前面的是使用read API先把文件加載到DataFrame然后再查詢,也可以直接在文件上進行查詢。
scala> spark.sql("select * from json.`/opt/module/spark-local/people.json`").show+---+--------+|age| name|+---+--------+| 18|qiaofeng|| 19| duanyu|| 20| xuzhu|+---+--------+|
說明:json表示文件的格式. 后面的文件具體路徑需要用反引號括起來。
1.2 保存數據
1)write直接保存數據
scala> df.write.csv jdbc json orc parquet textFile… …
注意:保存數據的相關參數需寫到上述方法中。如:textFile需傳入加載數據的路徑,jdbc需傳入JDBC相關參數。
例如:直接將df中數據保存到指定目錄
//默認保存格式為parquetscala> df.write.save("/opt/module/spark-local/output")//可以指定為保存格式,直接保存,不需要再調用save了scala> df.write.json("/opt/module/spark-local/output")
2)format指定保存數據類型
scala> df.write.format("…")[.option("…")].save("…")
用法詳解:
(1)format("…"):指定保存的數據類型,包括"csv"、"jdbc"、"json"、"orc"、"parquet"和"textFile"。
(2)save ("…"):在"csv"、"orc"、"parquet"和"textFile"格式下需要傳入保存數據的路徑。
(3)option("…"):在"jdbc"格式下需要傳入JDBC相應參數,url、user、password和dbtable
3)文件保存選項
保存操作可以使用 SaveMode, 用來指明如何處理數據,使用mode()方法來設置。有一點很重要: 這些 SaveMode 都是沒有加鎖的, 也不是原子操作。
SaveMode是一個枚舉類,其中的常量包括:
例如:使用指定format指定保存類型進行保存
df.write.mode("append").json("/opt/module/spark-local/output")
1.3 默認數據源
Spark SQL的默認數據源為Parquet格式。數據源為Parquet文件時,Spark SQL可以方便的執行所有的操作,不需要使用format。修改配置項spark.sql.sources.default,可修改默認數據源格式。
1)加載數據
val df = spark.read.load("/opt/module/spark-local/examples/src/main/resources/users.parquet").show+------+--------------+----------------+| name|favorite_color|favorite_numbers|+------+--------------+----------------+|Alyssa| null| [3, 9, 15, 20]|| Ben| red| []|+------+--------------+----------------+df: Unit = ()
2)保存數據
scala> var df = spark.read.json("/opt/module/spark-local/people.json")//保存為parquet格式scala> df.write.mode("append").save("/opt/module/spark-local/output")
2、JSON文件
Spark SQL能夠自動推測JSON數據集的結構,并將它加載為一個Dataset[Row]。可以通過SparkSession.read.json()去加載一個一個JSON文件。
注意:這個JSON文件不是一個傳統的JSON文件,每一行都得是一個JSON串。格式如下:
{"name":"Michael"}{"name":"Andy","age":30}{"name":"Justin","age":19}
1)導入隱式轉換
import spark.implicits._
2)加載JSON文件
val path = "/opt/module/spark-local/people.json"val peopleDF = spark.read.json(path)
3)創建臨時表
peopleDF.createOrReplaceTempView("people")
4)數據查詢
val teenagerNamesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19")teenagerNamesDF.show()+------+| name|+------+|Justin|+------+
3、MySQL
Spark SQL可以通過JDBC從關系型數據庫中讀取數據的方式創建DataFrame,通過對DataFrame一系列的計算后,還可以將數據再寫回關系型數據庫中。
如果使用spark-shell操作,可在啟動shell時指定相關的數據庫驅動路徑或者將相關的數據庫驅動放到spark的類路徑下。
bin/spark-shell --jars mysql-connector-java-5.1.27-bin.jar
這里演示在Idea中通過JDBC對Mysql進行操作
3.1 導入依賴
mysql mysql-connector-java 5.1.27
3.2 從JDBC讀數據
object SparkSQL02_Datasource { def main(args: Array[String]): Unit = { //創建上下文環境配置對象 val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkSQL01_Demo") //創建SparkSession對象 val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate() import spark.implicits._ //方式1:通用的load方法讀取 spark.read.format("jdbc") .option("url", "jdbc:mysql://hadoop202:3306/test") .option("driver", "com.mysql.jdbc.Driver") .option("user", "root") .option("password", "123456") .option("dbtable", "user") .load().show //方式2:通用的load方法讀取 參數另一種形式 spark.read.format("jdbc") .options(Map("url"->"jdbc:mysql://hadoop202:3306/test?user=root&password=123456", "dbtable"->"user","driver"->"com.mysql.jdbc.Driver")).load().show //方式3:使用jdbc方法讀取 val props: Properties = new Properties() props.setProperty("user", "root") props.setProperty("password", "123456") val df: DataFrame = spark.read.jdbc("jdbc:mysql://hadoop202:3306/test", "user", props) df.show //釋放資源 spark.stop() }}
3.3 向JDBC寫數據
object SparkSQL03_Datasource { def main(args: Array[String]): Unit = { //創建上下文環境配置對象 val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkSQL01_Demo") //創建SparkSession對象 val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate() import spark.implicits._ val rdd: RDD[User2] = spark.sparkContext.makeRDD(List(User2("lisi", 20), User2("zs", 30))) val ds: Dataset[User2] = rdd.toDS //方式1:通用的方式 format指定寫出類型 ds.write .format("jdbc") .option("url", "jdbc:mysql://hadoop202:3306/test") .option("user", "root") .option("password", "123456") .option("dbtable", "user") .mode(SaveMode.Append) .save() //方式2:通過jdbc方法 val props: Properties = new Properties() props.setProperty("user", "root") props.setProperty("password", "123456") ds.write.mode(SaveMode.Append).jdbc("jdbc:mysql://hadoop202:3306/test", "user", props) //釋放資源 spark.stop() }}case class User2(name: String, age: Long)
4、Hive
Apache Hive 是 Hadoop 上的 SQL 引擎,Spark SQL編譯時可以包含 Hive 支持,也可以不包含。
包含 Hive 支持的 Spark SQL 可以支持 Hive 表訪問、UDF (用戶自定義函數)以及 Hive 查詢語言(HiveQL/HQL)等。需要強調的一點是,如果要在 Spark SQL 中包含Hive 的庫,并不需要事先安裝 Hive。一般來說,最好還是在編譯Spark SQL時引入Hive支持,這樣就可以使用這些特性了。如果你下載的是二進制版本的 Spark,它應該已經在編譯時添加了 Hive 支持。
若要把 Spark SQL 連接到一個部署好的 Hive 上,你必須把 hive-site.xml 復制到 Spark的配置文件目錄中($SPARK_HOME/conf)。即使沒有部署好 Hive,Spark SQL 也可以運行,需要注意的是,如果你沒有部署好Hive,Spark SQL 會在當前的工作目錄中創建出自己的 Hive 元數據倉庫,叫作 metastore_db。此外,對于使用部署好的Hive,如果你嘗試使用 HiveQL 中的 CREATE TABLE (并非 CREATE EXTERNAL TABLE)語句來創建表,這些表會被放在你默認的文件系統中的 /user/hive/warehouse 目錄中(如果你的 classpath 中有配好的 hdfs-site.xml,默認的文件系統就是 HDFS,否則就是本地文件系統)。
spark-shell默認是Hive支持的;代碼中是默認不支持的,需要手動指定(加一個參數即可)。
4.1 使用內嵌Hive
如果使用 Spark 內嵌的 Hive, 則什么都不用做, 直接使用即可。
Hive 的元數據存儲在 derby 中, 倉庫地址:$SPARK_HOME/spark-warehouse。
scala> spark.sql("show tables").show+--------+---------+-----------+|database|tableName|isTemporary|+--------+---------+-----------++--------+---------+-----------+scala> spark.sql("create table aa(id int)")19/02/09 18:36:10 WARN HiveMetaStore: Location: file:/opt/module/spark-local/spark-warehouse/aa specified for non-external table:aares2: org.apache.spark.sql.DataFrame = []scala> spark.sql("show tables").show+--------+---------+-----------+|database|tableName|isTemporary|+--------+---------+-----------+| default| aa| false|+--------+---------+-----------+
向表中加載本地數據數據
scala> spark.sql("load data local inpath ./ids.txt into table aa")res8: org.apache.spark.sql.DataFrame = []scala> spark.sql("select * from aa").show+---+| id|+---+|100||101||102||103||104||105||106|+---+
在實際使用中, 幾乎沒有任何人會使用內置的 Hive。
4.2 外部Hive應用
如果Spark要接管Hive外部已經部署好的Hive,需要通過以下幾個步驟。
(1)確定原有Hive是正常工作的
(2)需要把hive-site.xml拷貝到spark的conf/目錄下
(3)如果以前hive-site.xml文件中,配置過Tez相關信息,注釋掉
(4)把Mysql的驅動copy到Spark的jars/目錄下
(5)需要提前啟動hive服務,hive/bin/hiveservices.sh start
(6)如果訪問不到hdfs,則需把core-site.xml和hdfs-site.xml拷貝到conf/目錄
啟動 spark-shell
scala> spark.sql("show tables").show+--------+---------+-----------+|database|tableName|isTemporary|+--------+---------+-----------+| default| emp| false|+--------+---------+-----------+scala> spark.sql("select * from emp").show19/02/09 19:40:28 WARN LazyStruct: Extra bytes detected at the end of the row! Ignoring similar problems.+-----+-------+---------+----+----------+------+------+------+|empno| ename| job| mgr| hiredate| sal| comm|deptno|+-----+-------+---------+----+----------+------+------+------+| 7369| SMITH| CLERK|7902|1980-12-17| 800.0| null| 20|| 7499| ALLEN| SALESMAN|7698| 1981-2-20|1600.0| 300.0| 30|| 7521| WARD| SALESMAN|7698| 1981-2-22|1250.0| 500.0| 30|| 7566| JONES| MANAGER|7839| 1981-4-2|2975.0| null| 20|| 7654| MARTIN| SALESMAN|7698| 1981-9-28|1250.0|1400.0| 30|| 7698| BLAKE| MANAGER|7839| 1981-5-1|2850.0| null| 30|| 7782| CLARK| MANAGER|7839| 1981-6-9|2450.0| null| 10|| 7788| SCOTT| ANALYST|7566| 1987-4-19|3000.0| null| 20|| 7839| KING|PRESIDENT|null|1981-11-17|5000.0| null| 10|| 7844| TURNER| SALESMAN|7698| 1981-9-8|1500.0| 0.0| 30|| 7876| ADAMS| CLERK|7788| 1987-5-23|1100.0| null| 20|| 7900| JAMES| CLERK|7698| 1981-12-3| 950.0| null| 30|| 7902| FORD| ANALYST|7566| 1981-12-3|3000.0| null| 20|| 7934| MILLER| CLERK|7782| 1982-1-23|1300.0| null| 10|| 7944|zhiling| CLERK|7782| 1982-1-23|1300.0| null| 50|+-----+-------+---------+----+----------+------+------+------+
4.3 運行Spark SQL CLI
Spark SQLCLI可以很方便的在本地運行Hive元數據服務以及從命令行執行查詢任務。在Spark目錄下執行如下命令啟動Spark SQ LCLI,直接執行SQL語句,類似Hive窗口。
bin/spark-sql
4.4 代碼中操作Hive
1)添加依賴
org.apache.spark spark-hive_2.11 2.1.1 org.apache.hive hive-exec 1.2.1
2)拷貝hive-site.xml到resources目錄
3)代碼實現
object SparkSQL08_Hive{ def main(args: Array[String]): Unit = { //創建上下文環境配置對象 val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkSQL01_Demo") val spark: SparkSession = SparkSession .builder() .enableHiveSupport() .master("local[*]") .appName("SQLTest") .getOrCreate() spark.sql("show tables").show() //釋放資源 spark.stop() }}
Spark SQL實戰
1、數據準備
Spark-sql操作所有的數據均來自Hive,首先在Hive中創建表,并導入數據。一共有3張表:1張用戶行為表,1張城市表,1張產品表。
CREATE TABLE `user_visit_action`( `date` string, `user_id` bigint, `session_id` string, `page_id` bigint, `action_time` string, `search_keyword` string, `click_category_id` bigint, `click_product_id` bigint, `order_category_ids` string, `order_product_ids` string, `pay_category_ids` string, `pay_product_ids` string, `city_id` bigint)row format delimited fields terminated by /t;load data local inpath /opt/module/data/user_visit_action.txt into table sparkpractice.user_visit_action;CREATE TABLE `product_info`( `product_id` bigint, `product_name` string, `extend_info` string)row format delimited fields terminated by /t;load data local inpath /opt/module/data/product_info.txt into table sparkpractice.product_info;CREATE TABLE `city_info`( `city_id` bigint, `city_name` string, `area` string)row format delimited fields terminated by /t;load data local inpath /opt/module/data/city_info.txt into table sparkpractice.city_info;
2、需求
2.1 需求簡介
這里的熱門商品是從點擊量的維度來看的,計算各個區域前三大熱門商品,并備注上每個商品在主要城市中的分布比例,超過兩個城市用其他顯示。
例如:
2.2 思路分析
1)使用sql來完成,碰到復雜的需求,可以使用udf或udaf
2)查詢出來所有的點擊記錄,并與city_info表連接,得到每個城市所在的地區,與Product_info表連接得到產品名稱
3)按照地區和商品名稱分組,統計出每個商品在每個地區的總點擊次數
4)每個地區內按照點擊次數降序排列
5)只取前三名,并把結果保存在數據庫中
6)城市備注需要自定義UDAF函數
2.3 代碼實現
1)UDAF函數定義
class AreaClickUDAF extends UserDefinedAggregateFunction { // 輸入數據的類型: 北京 String override def inputSchema: StructType = { StructType(StructField("city_name", StringType) :: Nil) // StructType(Array(StructField("city_name", StringType))) } // 緩存的數據的類型: 北京->1000, 天津->5000 Map, 總的點擊量 1000/? override def bufferSchema: StructType = { // MapType(StringType, LongType) 還需要標注 map的key的類型和value的類型 StructType(StructField("city_count", MapType(StringType, LongType)) :: StructField("total_count", LongType) :: Nil) } // 輸出的數據類型 "北京21.2%,天津13.2%,其他65.6%" String override def dataType: DataType = StringType // 相同的輸入是否應用有相同的輸出. override def deterministic: Boolean = true // 給存儲數據初始化 override def initialize(buffer: MutableAggregationBuffer): Unit = { //初始化map緩存 buffer(0) = Map[String, Long]() // 初始化總的點擊量 buffer(1) = 0L } // 分區內合并 Map[城市名, 點擊量] override def update(buffer: MutableAggregationBuffer, input: Row): Unit = { // 首先拿到城市名, 然后把城市名作為key去查看map中是否存在, 如果存在就把對應的值 +1, 如果不存在, 則直接0+1 val cityName = input.getString(0) // val map: collection.Map[String, Long] = buffer.getMap[String, Long](0) val map: Map[String, Long] = buffer.getAs[Map[String, Long]](0) buffer(0) = map + (cityName -> (map.getOrElse(cityName, 0L) + 1L)) // 碰到一個城市, 則總的點擊量要+1 buffer(1) = buffer.getLong(1) + 1L } // 分區間的合并 override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = { val map1 = buffer1.getAs[Map[String, Long]](0) val map2 = buffer2.getAs[Map[String, Long]](0) // 把map1的鍵值對與map2中的累積, 最后賦值給buffer1 buffer1(0) = map1.foldLeft(map2) { case (map, (k, v)) => map + (k -> (map.getOrElse(k, 0L) + v)) } buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1) } // 最終的輸出. "北京21.2%,天津13.2%,其他65.6%" override def evaluate(buffer: Row): Any = { val cityCountMap = buffer.getAs[Map[String, Long]](0) val totalCount = buffer.getLong(1) var citysRatio: List[CityRemark] = cityCountMap.toList.sortBy(-_._2).take(2).map { case (cityName, count) => { CityRemark(cityName, count.toDouble / totalCount) } } // 如果城市的個數超過2才顯示其他 if (cityCountMap.size > 2) { citysRatio = citysRatio :+ CityRemark("其他", citysRatio.foldLeft(1D)(_ - _.cityRatio)) } citysRatio.mkString(", ") }}case class CityRemark(cityName: String, cityRatio: Double) { val formatter = new DecimalFormat("0.00%") override def toString: String = s"$cityName:${formatter.format(cityRatio)}"}
2)具體實現
object SparkSQL04_TopN { def main(args: Array[String]): Unit = { val spark: SparkSession = SparkSession .builder() .master("local[2]") .appName("AreaClickApp") .enableHiveSupport() .getOrCreate() spark.sql("use sparkpractice") // 0 注冊自定義聚合函數 spark.udf.register("city_remark", new AreaClickUDAF) // 1. 查詢出所有的點擊記錄,并和城市表產品表做內連接 spark.sql( """ |select | c.*, | v.click_product_id, | p.product_name |from user_visit_action v join city_info c join product_info p on v.city_id=c.city_id and v.click_product_id=p.product_id |where click_product_id>-1 """.stripMargin).createOrReplaceTempView("t1") // 2. 計算每個區域, 每個產品的點擊量 spark.sql( """ |select | t1.area, | t1.product_name, | count(*) click_count, | city_remark(t1.city_name) |from t1 |group by t1.area, t1.product_name """.stripMargin).createOrReplaceTempView("t2") // 3. 對每個區域內產品的點擊量進行倒序排列 spark.sql( """ |select | *, | rank() over(partition by t2.area order by t2.click_count desc) rank |from t2 """.stripMargin).createOrReplaceTempView("t3") // 4. 每個區域取top3 spark.sql( """ |select | * |from t3 |where rank<=3 """.stripMargin).show //釋放資源 spark.stop() }}
猜你喜歡
Hive計算最大連續登陸天數
Hadoop 數據遷移用法詳解
Hbase修復工具Hbck
數倉建模分層理論
一文搞懂Hive的數據存儲與壓縮
大數據組件重點學習這幾個