You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

121 lines
3.7 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

## 2.1 Spark SQL入门
`Spark SQL`是`Spark`用来处理结构化数据的一个模块。`Spark SQL`为了支持结构化数据的处理,它提供了两个编程抽象分别叫做`DataFrames`和`DataSets`。
### 2.1.1 DataFramesDatasets和RDD的关系
`RDD` :仅表示数据集,`RDD`没有元数据,也就是说没有字段信息。
`DataFrames`:由于`RDD`的局限性,`Spark`产生了`DataFrames``DataFrame=RDD+Schema``Schema`也就是字段信息。`DataFrames`是一种特殊类型的 `Datasets``DataSet[Row] = DataFrame`。
`Datasets`:可以理解为强类型的`DataFrames`,也就是说每一个`record`存储的是一个强类型值而不是一个`Row`。但是`Python`不支持`Datasets API`。
了解完以上关系后,我们开始编写`Spark SQL`,从何开始呢?答案就是`SparkSession`。
### 2.1.2 什么是SparkSession
`SparkSession`是`Spark SQL`的入口。要创建基本的`SparkSession`,只需使用`SparkSession.builder()`。
```
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.appName("Python Spark SQL basic example") \
.config("spark.some.config.option", "some-value") \
.getOrCreate()
```
有了`SparkSession`,下一步就是创建`DataFrames`。
### 2.1.3 创建DataFrames
使用`SparkSession`可以从现有`RDD``Hive`表或`Spark`数据源(`json``parquet``jdbc``orc``libsvm``csv``text`)等格式文件创建`DataFrame`。
以下示例为读取`Json`文件创建`DataFrame`。
`df =spark.read.json("/people.json")`
`people.json`数据如下:
```json
{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}
```
有了`DataFrames`之后,我们就可以对数据进行相应操作.
### 2.1.4使用DataFrames
在`Python`中,可以通过属性`df.age`或通过索引`df ['age']`(推荐使用)访问`DataFrame`的列。
举例如下:
```python
#打印Schema信息
df.printSchema()
root
|-- age: long (nullable = true)
|-- name: string (nullable = true)
#选择姓名列
df.select("name").show()
+-------+
| name|
+-------+
|Michael|
| Andy|
| Justin|
+-------+
```
我们也可以通过编写`SQL`语句的方式执行上述操作。
### 2.1.5 通过SQL语句的方式
```python
#首先注册df为一个视图
df.createOrReplaceTempView("p")
#通过spark.sql("SQL语句")执行SQL语句
sqlDF = spark.sql("SELECT name FROM p")
sqlDF.show()
+-------+
| name|
+-------+
|Michael|
| Andy|
| Justin|
+-------+
```
### 2.1.6将处理结果保存
以下示例可将结果以`parquet`格式文件保存到`F:\\test\\anamesAndAges`路径。
```
#写入并保存到指定路径
df.select("name", "age").write.format("parquet").save("F:\\test\\anamesAndAges")
```
### 2.1.7 保存模式介绍
`save`方法支持设置保存模式。
类型如下:
| 所有语言 | 说明 |
| ------------ | ------------ |
| `"error" or "errorifexists"` (默认) | 将`DataFrame`保存到数据源时,如果数据已存在,则会引发异常。 |
| ` "append" ` | 将`Dataset`保存到数据源时,如果数据/表已存在则DataFrame的内容应附加到现有数据。 |
| `"overwrite"` | 覆盖模式意味着在将`DataFrame`保存到数据源时,如果数据/表已经存在,则预期现有数据将被`DataFrame`的内容覆盖。 |
| `"ignore"` | 忽略模式意味着在将`DataFrame`保存到数据源时,如果数据已存在,则预期保存操作不会保存`DataFrame`的内容而不会更改现有数据。这与`CREATE TABLE IF NOT EXISTSSQL`中的类似。 |
例:覆盖原有数据并保存到`F:\\test`路径下
```
df.select("name", "age").write.mode("overwrite").format("parquet").save("F:\\test")
```