|
|
|
|
|
|
|
|
## 2.1 Spark SQL入门
|
|
|
`Spark SQL`是`Spark`用来处理结构化数据的一个模块。`Spark SQL`为了支持结构化数据的处理,它提供了两个编程抽象分别叫做`DataFrames`和`DataSets`。
|
|
|
|
|
|
|
|
|
### 2.1.1 DataFrames,Datasets和RDD的关系
|
|
|
|
|
|
`RDD` :仅表示数据集,`RDD`没有元数据,也就是说没有字段信息。
|
|
|
|
|
|
`DataFrames`:由于`RDD`的局限性,`Spark`产生了`DataFrames`,`DataFrame=RDD+Schema`,`Schema`也就是字段信息。`DataFrames`是一种特殊类型的 `Datasets`,`DataSet[Row] = DataFrame`。
|
|
|
|
|
|
`Datasets`:可以理解为强类型的`DataFrames`,也就是说每一个`record`存储的是一个强类型值而不是一个`Row`。但是`Python`不支持`Datasets API`。
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
了解完以上关系后,我们开始编写`Spark SQL`,从何开始呢?答案就是`SparkSession`。
|
|
|
### 2.1.2 什么是SparkSession
|
|
|
|
|
|
`SparkSession`是`Spark SQL`的入口。要创建基本的`SparkSession`,只需使用`SparkSession.builder()`。
|
|
|
|
|
|
```
|
|
|
from pyspark.sql import SparkSession
|
|
|
|
|
|
spark = SparkSession \
|
|
|
.builder \
|
|
|
.appName("Python Spark SQL basic example") \
|
|
|
.config("spark.some.config.option", "some-value") \
|
|
|
.getOrCreate()
|
|
|
|
|
|
```
|
|
|
|
|
|
|
|
|
有了`SparkSession`,下一步就是创建`DataFrames`。
|
|
|
|
|
|
### 2.1.3 创建DataFrames
|
|
|
使用`SparkSession`可以从现有`RDD`,`Hive`表或`Spark`数据源(`json`,`parquet`,`jdbc`,`orc`,`libsvm`,`csv`,`text`)等格式文件创建`DataFrame`。
|
|
|
|
|
|
以下示例为读取`Json`文件创建`DataFrame`。
|
|
|
`df =spark.read.json("/people.json")`
|
|
|
|
|
|
`people.json`数据如下:
|
|
|
|
|
|
```json
|
|
|
{"name":"Michael"}
|
|
|
{"name":"Andy", "age":30}
|
|
|
{"name":"Justin", "age":19}
|
|
|
```
|
|
|
|
|
|
|
|
|
有了`DataFrames`之后,我们就可以对数据进行相应操作.
|
|
|
|
|
|
### 2.1.4使用DataFrames
|
|
|
在`Python`中,可以通过属性`df.age`或通过索引`df ['age']`(推荐使用)访问`DataFrame`的列。
|
|
|
举例如下:
|
|
|
|
|
|
```python
|
|
|
#打印Schema信息
|
|
|
df.printSchema()
|
|
|
|
|
|
root
|
|
|
|-- age: long (nullable = true)
|
|
|
|-- name: string (nullable = true)
|
|
|
|
|
|
#选择姓名列
|
|
|
df.select("name").show()
|
|
|
+-------+
|
|
|
| name|
|
|
|
+-------+
|
|
|
|Michael|
|
|
|
| Andy|
|
|
|
| Justin|
|
|
|
+-------+
|
|
|
```
|
|
|
|
|
|
我们也可以通过编写`SQL`语句的方式执行上述操作。
|
|
|
|
|
|
### 2.1.5 通过SQL语句的方式
|
|
|
```python
|
|
|
#首先注册df为一个视图
|
|
|
df.createOrReplaceTempView("p")
|
|
|
#通过spark.sql("SQL语句")执行SQL语句
|
|
|
sqlDF = spark.sql("SELECT name FROM p")
|
|
|
sqlDF.show()
|
|
|
+-------+
|
|
|
| name|
|
|
|
+-------+
|
|
|
|Michael|
|
|
|
| Andy|
|
|
|
| Justin|
|
|
|
+-------+
|
|
|
```
|
|
|
### 2.1.6将处理结果保存
|
|
|
|
|
|
以下示例可将结果以`parquet`格式文件保存到`F:\\test\\anamesAndAges`路径。
|
|
|
|
|
|
```
|
|
|
#写入并保存到指定路径
|
|
|
df.select("name", "age").write.format("parquet").save("F:\\test\\anamesAndAges")
|
|
|
|
|
|
```
|
|
|
### 2.1.7 保存模式介绍
|
|
|
|
|
|
`save`方法支持设置保存模式。
|
|
|
|
|
|
类型如下:
|
|
|
|
|
|
| 所有语言 | 说明 |
|
|
|
| ------------ | ------------ |
|
|
|
| `"error" or "errorifexists"` (默认) | 将`DataFrame`保存到数据源时,如果数据已存在,则会引发异常。 |
|
|
|
| ` "append" ` | 将`Dataset`保存到数据源时,如果数据/表已存在,则DataFrame的内容应附加到现有数据。 |
|
|
|
| `"overwrite"` | 覆盖模式意味着在将`DataFrame`保存到数据源时,如果数据/表已经存在,则预期现有数据将被`DataFrame`的内容覆盖。 |
|
|
|
| `"ignore"` | 忽略模式意味着在将`DataFrame`保存到数据源时,如果数据已存在,则预期保存操作不会保存`DataFrame`的内容而不会更改现有数据。这与`CREATE TABLE IF NOT EXISTSSQL`中的类似。 |
|
|
|
|
|
|
例:覆盖原有数据并保存到`F:\\test`路径下
|
|
|
|
|
|
```
|
|
|
|
|
|
df.select("name", "age").write.mode("overwrite").format("parquet").save("F:\\test")
|
|
|
``` |