|
|
|
@ -2,12 +2,12 @@
|
|
|
|
|
|
|
|
|
|
## 2.1 什么是Spark SQL
|
|
|
|
|
|
|
|
|
|
SparkSQL是spark用来处理结构化的一个模块,它提供一个抽象的数据集DataFrame,并且是作为分布式SQL查询引擎的应用。
|
|
|
|
|
我们之前已经学习了hive,它将HiveSQL转换成MR,然后提交到集群上去执行,减少编写MR查询的复杂性,但是因为采用计算框架,所以执行效率比较慢,所以spark SQL就应运而生。
|
|
|
|
|
`Spark SQL`是`Spark`用来处理结构化的一个模块,它提供一个抽象的数据集`DataFrame`,并且是作为分布式`SQL`查询引擎的应用。
|
|
|
|
|
或许你之前学习过`Hive`,我们知道它将`HiveSQL`转换成`MR`,然后提交到集群上去执行,减少编写`MR`查询的复杂性,但是因为采用计算框架,执行效率比较慢,所以`Spark SQL`就应运而生。
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
我们编写Spark SQL代码时从何开始呢?答案就是SparkSession。
|
|
|
|
|
我们编写`Spark SQL`代码时从何开始呢?答案就是`SparkSession`。
|
|
|
|
|
### 2.2 什么是SparkSession
|
|
|
|
|
|
|
|
|
|
`Spark`中所有功能的入口点都是`SparkSession`类。要创建基本的`SparkSession`,只需使用`SparkSession.builder()`。
|
|
|
|
@ -24,14 +24,15 @@ spark = SparkSession \
|
|
|
|
|
print(spark.version);
|
|
|
|
|
```
|
|
|
|
|
|
|
|
|
|
有了SparkSession,下一步就是创建DataFrames。
|
|
|
|
|
有了`SparkSession`,下一步就是创建`DataFrames`。
|
|
|
|
|
|
|
|
|
|
### 2.3 创建DataFrames
|
|
|
|
|
使用`SparkSession`可以从现有`RDD`,`Hive`表或`Spark`数据源(`json`,`parquet`,`jdbc`,`orc`,`libsvm`,`csv`,`text`)等格式文件创建DataFrame。
|
|
|
|
|
使用`SparkSession`可以从现有`RDD`,`Hive`表或`Spark`数据源(`json`,`parquet`,`jdbc`,`orc`,`libsvm`,`csv`,`text`)等格式文件创建`DataFrame`。
|
|
|
|
|
|
|
|
|
|
以下示例为读取Json文件创建DataFrame。
|
|
|
|
|
`df =spark.read.json("examples/src/main/resources/people.json")`
|
|
|
|
|
people.json数据如下:
|
|
|
|
|
以下示例为读取`Json`文件创建`DataFrame`。
|
|
|
|
|
`df =spark.read.json("/people.json")`
|
|
|
|
|
|
|
|
|
|
`people.json`数据如下:
|
|
|
|
|
|
|
|
|
|
```json
|
|
|
|
|
{"name":"Michael"}
|
|
|
|
@ -40,9 +41,10 @@ people.json数据如下:
|
|
|
|
|
```
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
有了DataFrames之后,我们就可以对数据进行相应操作;
|
|
|
|
|
有了`DataFrames`之后,我们就可以对数据进行相应操作.
|
|
|
|
|
|
|
|
|
|
### 2.4使用DataFrames
|
|
|
|
|
在Python中,可以通过属性(df.age)或通过索引(df ['age'])(推荐使用)访问DataFrame的列。
|
|
|
|
|
在`Python`中,可以通过属性`df.age`或通过索引`df ['age']`(推荐使用)访问`DataFrame`的列。
|
|
|
|
|
举例如下:
|
|
|
|
|
|
|
|
|
|
```python
|
|
|
|
@ -63,7 +65,8 @@ df.select("name").show()
|
|
|
|
|
| Justin|
|
|
|
|
|
+-------+
|
|
|
|
|
```
|
|
|
|
|
我们也可以通过编写SQL语句的方式执行上述操作。
|
|
|
|
|
|
|
|
|
|
我们也可以通过编写`SQL`语句的方式执行上述操作。
|
|
|
|
|
|
|
|
|
|
### 2.5 通过SQL语句的方式
|
|
|
|
|
```python
|
|
|
|
@ -85,24 +88,24 @@ sqlDF.show()
|
|
|
|
|
以下示例可将结果以`parquet`格式文件保存到`F:\\test\\anamesAndAges`路径。
|
|
|
|
|
|
|
|
|
|
```
|
|
|
|
|
//写入并保存到指定路径
|
|
|
|
|
#写入并保存到指定路径
|
|
|
|
|
df.select("name", "age").write.format("parquet").save("F:\\test\\anamesAndAges")
|
|
|
|
|
|
|
|
|
|
```
|
|
|
|
|
### 2.7 保存模式介绍
|
|
|
|
|
|
|
|
|
|
`save()`方法支持设置保存模式。
|
|
|
|
|
`save`方法支持设置保存模式。
|
|
|
|
|
|
|
|
|
|
`SaveMode` 类型如下:
|
|
|
|
|
类型如下:
|
|
|
|
|
|
|
|
|
|
| 所有语言 | 含义 |
|
|
|
|
|
| 所有语言 | 说明 |
|
|
|
|
|
| ------------ | ------------ |
|
|
|
|
|
| `"error" or "errorifexists"` (默认) | 将`Dataset`保存到数据源时,如果数据已存在,则会引发异常。 |
|
|
|
|
|
| ` "append" ` | 将`Dataset`保存到数据源时,如果数据/表已存在,则Dataset的内容应附加到现有数据。 |
|
|
|
|
|
| `"overwrite"` | 覆盖模式意味着在将`Dataset`保存到数据源时,如果数据/表已经存在,则预期现有数据将被`Dataset`的内容覆盖。 |
|
|
|
|
|
| `"ignore"` | 忽略模式意味着在将`Dataset`保存到数据源时,如果数据已存在,则预期保存操作不会保存`Dataset`的内容而不会更改现有数据。这与`CREATE TABLE IF NOT EXISTSSQL`中的类似。 |
|
|
|
|
|
| `"error" or "errorifexists"` (默认) | 将`DataFrame`保存到数据源时,如果数据已存在,则会引发异常。 |
|
|
|
|
|
| ` "append" ` | 将`Dataset`保存到数据源时,如果数据/表已存在,则DataFrame的内容应附加到现有数据。 |
|
|
|
|
|
| `"overwrite"` | 覆盖模式意味着在将`DataFrame`保存到数据源时,如果数据/表已经存在,则预期现有数据将被`DataFrame`的内容覆盖。 |
|
|
|
|
|
| `"ignore"` | 忽略模式意味着在将`DataFrame`保存到数据源时,如果数据已存在,则预期保存操作不会保存`DataFrame`的内容而不会更改现有数据。这与`CREATE TABLE IF NOT EXISTSSQL`中的类似。 |
|
|
|
|
|
|
|
|
|
|
```
|
|
|
|
|
//覆盖原有数据并写入到F:\\test\\anamesAndAges路径上
|
|
|
|
|
#覆盖原有数据并写入到F:\\test\\anamesAndAges路径上
|
|
|
|
|
df.select("name", "age").write.mode("overwrite").format("parquet").save("F:\\test\\anamesAndAges")
|
|
|
|
|
```
|