## 2.1 Spark SQL入门 `Spark SQL`是`Spark`用来处理结构化数据的一个模块。`Spark SQL`为了支持结构化数据的处理,它提供了两个编程抽象分别叫做`DataFrames`和`DataSets`。 ### 2.1.1 DataFrames,Datasets和RDD的关系 `RDD` :仅表示数据集,`RDD`没有元数据,也就是说没有字段信息。 `DataFrames`:由于`RDD`的局限性,`Spark`产生了`DataFrames`,`DataFrame=RDD+Schema`,`Schema`也就是字段信息。`DataFrames`是一种特殊类型的 `Datasets`,`DataSet[Row] = DataFrame`。 `Datasets`:可以理解为强类型的`DataFrames`,也就是说每一个`record`存储的是一个强类型值而不是一个`Row`。但是`Python`不支持`Datasets API`。 了解完以上关系后,我们开始编写`Spark SQL`,从何开始呢?答案就是`SparkSession`。 ### 2.1.2 什么是SparkSession `SparkSession`是`Spark SQL`的入口。要创建基本的`SparkSession`,只需使用`SparkSession.builder()`。 ``` from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .appName("Python Spark SQL basic example") \ .config("spark.some.config.option", "some-value") \ .getOrCreate() ``` 有了`SparkSession`,下一步就是创建`DataFrames`。 ### 2.1.3 创建DataFrames 使用`SparkSession`可以从现有`RDD`,`Hive`表或`Spark`数据源(`json`,`parquet`,`jdbc`,`orc`,`libsvm`,`csv`,`text`)等格式文件创建`DataFrame`。 以下示例为读取`Json`文件创建`DataFrame`。 `df =spark.read.json("/people.json")` `people.json`数据如下: ```json {"name":"Michael"} {"name":"Andy", "age":30} {"name":"Justin", "age":19} ``` 有了`DataFrames`之后,我们就可以对数据进行相应操作. ### 2.1.4使用DataFrames 在`Python`中,可以通过属性`df.age`或通过索引`df ['age']`(推荐使用)访问`DataFrame`的列。 举例如下: ```python #打印Schema信息 df.printSchema() root |-- age: long (nullable = true) |-- name: string (nullable = true) #选择姓名列 df.select("name").show() +-------+ | name| +-------+ |Michael| | Andy| | Justin| +-------+ ``` 我们也可以通过编写`SQL`语句的方式执行上述操作。 ### 2.1.5 通过SQL语句的方式 ```python #首先注册df为一个视图 df.createOrReplaceTempView("p") #通过spark.sql("SQL语句")执行SQL语句 sqlDF = spark.sql("SELECT name FROM p") sqlDF.show() +-------+ | name| +-------+ |Michael| | Andy| | Justin| +-------+ ``` ### 2.1.6将处理结果保存 以下示例可将结果以`parquet`格式文件保存到`F:\\test\\anamesAndAges`路径。 ``` #写入并保存到指定路径 df.select("name", "age").write.format("parquet").save("F:\\test\\anamesAndAges") ``` ### 2.1.7 保存模式介绍 `save`方法支持设置保存模式。 类型如下: | 所有语言 | 说明 | | ------------ | ------------ | | `"error" or "errorifexists"` (默认) | 将`DataFrame`保存到数据源时,如果数据已存在,则会引发异常。 | | ` "append" ` | 将`Dataset`保存到数据源时,如果数据/表已存在,则DataFrame的内容应附加到现有数据。 | | `"overwrite"` | 覆盖模式意味着在将`DataFrame`保存到数据源时,如果数据/表已经存在,则预期现有数据将被`DataFrame`的内容覆盖。 | | `"ignore"` | 忽略模式意味着在将`DataFrame`保存到数据源时,如果数据已存在,则预期保存操作不会保存`DataFrame`的内容而不会更改现有数据。这与`CREATE TABLE IF NOT EXISTSSQL`中的类似。 | 例:覆盖原有数据并保存到`F:\\test`路径下 ``` df.select("name", "age").write.mode("overwrite").format("parquet").save("F:\\test") ```