此方法使用反射來生成包含特定類型的對象的RDD的模式。 Spark SQL的Scala接口支持將包含案例類的RDD自動轉(zhuǎn)換為DataFrame。 case類定義了表的模式。 case類的參數(shù)的名稱使用反射讀取,它們成為列的名稱。
案例類也可以嵌套或包含復(fù)雜類型,如序列或數(shù)組。 此RDD可以隱式地轉(zhuǎn)換為DataFrame,然后注冊為表。 表可以在后續(xù)的SQL語句中使用。
例
讓我們考慮一個名為employeeee.txt的文本文件中的員工記錄示例。 通過從文本文件讀取數(shù)據(jù)并使用默認SQL函數(shù)將其轉(zhuǎn)換為DataFrame來創(chuàng)建RDD。
給定數(shù)據(jù) - 查看在運行spark shell點的當(dāng)前相應(yīng)目錄中名為employees.txt的文件的以下數(shù)據(jù)。
1201, satish, 251202, krishna, 281203, amith, 391204, javed, 231205, prudvi, 23以下示例說明如何使用“反射”生成模式。
$ spark-shell
scala> val sqlContext = new org.apache.spark.sql.SQLContext(sc)
scala> import sqlContext.implicts._
接下來,我們必須使用案例類定義員工記錄數(shù)據(jù)的模式。 以下命令用于根據(jù)給定數(shù)據(jù)(id,name,age)聲明Case Class。
scala> case class Employee(id: Int, name: String, age: Int)defined class Employee
scala>val empl=sc.textFile("employee.txt").map(_.split(",")).map(e?employee(e(0).trim.toInt,e(1),e(2).trim.toInt)).toDF()
empl: org.apache.spark.sql.DataFrame = [id: int, name: string, age: int]
使用以下命令將DataFrame數(shù)據(jù)存儲到名為employeeee的表中。 在這個命令之后,我們可以應(yīng)用所有類型的SQL語句。
scala> empl.registerTempTable("employee")員工表已準(zhǔn)備就緒。 讓我們現(xiàn)在使用SQLContext.sql()方法在表上傳遞一些sql查詢。
scala> val allrecords = sqlContext.sql("SELeCT * FROM employee")要查看allrecords DataFrame的結(jié)果數(shù)據(jù),請使用以下命令。
scala> allrecords.show()輸出
+------+---------+----+| id | name |age |+------+---------+----+| 1201 | satish | 25 || 1202 | krishna | 28 || 1203 | amith | 39 || 1204 | javed | 23 || 1205 | prudvi | 23 |+------+---------+----+
scala> val agefilter = sqlContext.sql("SELeCT * FROM employee WHERE ageC>=20 AND age <= 35")要查看agefilter數(shù)據(jù)幀的結(jié)果數(shù)據(jù),請使用以下命令。
scala> agefilter.show()輸出:
<console>:25, took 0.112757 s+------+---------+----+| id | name |age |+------+---------+----+| 1201 | satish | 25 || 1202 | krishna | 28 || 1204 | javed | 23 || 1205 | prudvi | 23 |+------+---------+----+
前兩個查詢是針對整個表DataFrame傳遞的。 現(xiàn)在讓我們嘗試通過對其應(yīng)用Transform來從結(jié)果DataFrame獲取數(shù)據(jù)。
使用列索引從agefilter 數(shù)據(jù)幀獲取ID值
以下語句用于從agefilter RDD結(jié)果中獲取ID值,使用字段索引。
scala> agefilter.map(t=>"ID: "+t(0)).collect().foreach(println)
<console>:25, took 0.093844 sID: 1201ID: 1202ID: 1204ID: 1205
這種基于反射的方法可以獲得更加簡潔的代碼,并且在編寫Spark應(yīng)用程序時更好地了解其概要。
更多建議: