Spark SQL 使用反射推斷模式

2018-12-20 15:03 更新

此方法使用反射來生成包含特定類型的對象的RDD的模式。 Spark SQL的Scala接口支持將包含案例類的RDD自動轉(zhuǎn)換為DataFrame。 case類定義了表的模式。 case類的參數(shù)的名稱使用反射讀取,它們成為列的名稱。

案例類也可以嵌套或包含復(fù)雜類型,如序列或數(shù)組。 此RDD可以隱式地轉(zhuǎn)換為DataFrame,然后注冊為表。 表可以在后續(xù)的SQL語句中使用。

讓我們考慮一個名為employeeee.txt的文本文件中的員工記錄示例。 通過從文本文件讀取數(shù)據(jù)并使用默認SQL函數(shù)將其轉(zhuǎn)換為DataFrame來創(chuàng)建RDD。

給定數(shù)據(jù) - 查看在運行spark shell點的當(dāng)前相應(yīng)目錄中名為employees.txt的文件的以下數(shù)據(jù)。

1201, satish, 251202, krishna, 281203, amith, 391204, javed, 231205, prudvi, 23
以下示例說明如何使用“反射”生成模式。

啟動Spark Shell

使用以下命令啟動Spark Shell。
$ spark-shell

創(chuàng)建SQLContext

使用以下命令生成SQLContext。 這里,scmeans是SparkContext對象。

scala> val sqlContext = new org.apache.spark.sql.SQLContext(sc)

導(dǎo)入SQL函數(shù)

使用以下命令導(dǎo)入用于將RDD隱式轉(zhuǎn)換為DataFrame的所有SQL函數(shù)。

scala> import sqlContext.implicts._

創(chuàng)建Case Class

接下來,我們必須使用案例類定義員工記錄數(shù)據(jù)的模式。 以下命令用于根據(jù)給定數(shù)據(jù)(id,name,age)聲明Case Class。

scala> case class Employee(id: Int, name: String, age: Int)defined class Employee

創(chuàng)建RDD和應(yīng)用轉(zhuǎn)換

使用以下命令生成RDD namedemplby,從中讀取數(shù)據(jù)fromemployee.txt并使用Map函數(shù)將其轉(zhuǎn)換為DataFrame。
這里,定義了兩個映射函數(shù)。 一個是將文本記錄分割成字段(.map(_。split(“,”)))和第二個映射函數(shù)用于將單個字段(id,name,age)轉(zhuǎn)換為一個case類對象 0).trim.toInt,e(1),e(2).trim.toInt))。
最后,toDF()方法用于將具有模式的案例類對象轉(zhuǎn)換為DataFrame。

scala>val empl=sc.textFile("employee.txt").map(_.split(",")).map(e?employee(e(0).trim.toInt,e(1),e(2).trim.toInt)).toDF()
輸出:
empl: org.apache.spark.sql.DataFrame = [id: int, name: string, age: int]

將DataFrame數(shù)據(jù)存儲在表中

使用以下命令將DataFrame數(shù)據(jù)存儲到名為employeeee的表中。 在這個命令之后,我們可以應(yīng)用所有類型的SQL語句。

scala> empl.registerTempTable("employee")
員工表已準(zhǔn)備就緒。 讓我們現(xiàn)在使用SQLContext.sql()方法在表上傳遞一些sql查詢。
選擇DataFrame上的查詢
使用以下命令從theemployableable中選擇所有記錄。 這里,我們使用變量allrecords來捕獲所有記錄數(shù)據(jù)。 要顯示這些記錄,請調(diào)用show()方法。

scala> val allrecords = sqlContext.sql("SELeCT * FROM employee")
要查看allrecords DataFrame的結(jié)果數(shù)據(jù),請使用以下命令。

scala> allrecords.show()
輸出

+------+---------+----+|  id  |  name   |age |+------+---------+----+| 1201 | satish  | 25 || 1202 | krishna | 28 || 1203 | amith   | 39 || 1204 | javed   | 23 || 1205 | prudvi  | 23 |+------+---------+----+

子句SQL查詢數(shù)據(jù)幀

使用以下命令在表中應(yīng)用where語句。 這里,變量agefilter存儲年齡在20和35之間的雇員的記錄。

scala> val agefilter = sqlContext.sql("SELeCT * FROM employee WHERE ageC>=20 AND age <= 35")
要查看agefilter數(shù)據(jù)幀的結(jié)果數(shù)據(jù),請使用以下命令。

scala> agefilter.show()
輸出:

<console>:25, took 0.112757 s+------+---------+----+|  id  |  name   |age |+------+---------+----+| 1201 | satish  | 25 || 1202 | krishna | 28 || 1204 | javed   | 23 || 1205 | prudvi  | 23 |+------+---------+----+

前兩個查詢是針對整個表DataFrame傳遞的。 現(xiàn)在讓我們嘗試通過對其應(yīng)用Transform來從結(jié)果DataFrame獲取數(shù)據(jù)。
使用列索引從agefilter 數(shù)據(jù)幀獲取ID值
以下語句用于從agefilter RDD結(jié)果中獲取ID值,使用字段索引。

scala> agefilter.map(t=>"ID: "+t(0)).collect().foreach(println)

輸出

<console>:25, took 0.093844 sID: 1201ID: 1202ID: 1204ID: 1205

這種基于反射的方法可以獲得更加簡潔的代碼,并且在編寫Spark應(yīng)用程序時更好地了解其概要。


以上內(nèi)容是否對您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號
微信公眾號

編程獅公眾號