|
|
|
@ -0,0 +1,116 @@
|
|
|
|
|
## 2.1 使用Spark SQL统计各个研发单位研制战斗机占比
|
|
|
|
|
|
|
|
|
|
### 2.1.1 什么是Spark SQL
|
|
|
|
|
|
|
|
|
|
`Spark SQL`是用来操作结构化和半结构化数据的接口。
|
|
|
|
|
当每条存储记录共用已知的字段集合,数据符合此条件时,`Spark SQL`就会使得针对这些数据的读取和查询变得更加简单高效。具体来说,`Spark SQL`提供了以下三大功能:
|
|
|
|
|
(1) `Spark SQL`可以从各种结构化数据源(例如`JSON`、`Parquet`等)中读取数据。
|
|
|
|
|
|
|
|
|
|
(2) `Spark SQL`不仅支持在`Spark```程序内使用`SQL`语句进行数据查询,也支持从类似商业智能软件`Tableau`这样的外部工具中通过标准数据库连接器(`JDBC/ODBC`)连接`sparkSQL`进行查询。
|
|
|
|
|
|
|
|
|
|
(3) 当在`Spark`程序内使用`Spark SQL`时,`Spark SQL`支持`SQL`与常规的`Python/Java/Scala`代码高度整合,包括连接`RDD`与`SQL`表、公开的自定义`SQL`函数接口等。
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
我们编写Spark SQL代码时从何开始呢?答案就是SparkSession。
|
|
|
|
|
### 2.1.2 什么是SparkSession
|
|
|
|
|
|
|
|
|
|
`Spark`中所有功能的入口点都是`SparkSession`类。要创建基本的`SparkSession`,只需使用`SparkSession.builder()`。
|
|
|
|
|
|
|
|
|
|
```
|
|
|
|
|
from pyspark.sql import SparkSession
|
|
|
|
|
|
|
|
|
|
spark = SparkSession \
|
|
|
|
|
.builder \
|
|
|
|
|
.appName("Python Spark SQL basic example") \
|
|
|
|
|
.config("spark.some.config.option", "some-value") \
|
|
|
|
|
.getOrCreate()
|
|
|
|
|
//打印spark版本号
|
|
|
|
|
print(spark.version);
|
|
|
|
|
```
|
|
|
|
|
|
|
|
|
|
有了SparkSession,下一步就是创建DataFrames。
|
|
|
|
|
|
|
|
|
|
### 2.1.3 创建DataFrames
|
|
|
|
|
使用`SparkSession`可以从现有`RDD`,`Hive`表或`Spark`数据源(`json`,`parquet`,`jdbc`,`orc`,`libsvm`,`csv`,`text`)等格式文件创建DataFrame。
|
|
|
|
|
|
|
|
|
|
以下示例为读取Json文件创建DataFrame。
|
|
|
|
|
`df =spark.read.json("examples/src/main/resources/people.json")`
|
|
|
|
|
people.json数据如下:
|
|
|
|
|
|
|
|
|
|
```json
|
|
|
|
|
{"name":"Michael"}
|
|
|
|
|
{"name":"Andy", "age":30}
|
|
|
|
|
{"name":"Justin", "age":19}
|
|
|
|
|
```
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
有了DataFrames之后,我们就可以对数据进行相应操作;
|
|
|
|
|
### 2.1.4使用DataFrames
|
|
|
|
|
在Python中,可以通过属性(df.age)或通过索引(df ['age'])(推荐使用)访问DataFrame的列。
|
|
|
|
|
举例如下:
|
|
|
|
|
|
|
|
|
|
```python
|
|
|
|
|
#打印Schema信息
|
|
|
|
|
df.printSchema()
|
|
|
|
|
|
|
|
|
|
root
|
|
|
|
|
|-- age: long (nullable = true)
|
|
|
|
|
|-- name: string (nullable = true)
|
|
|
|
|
|
|
|
|
|
#选择姓名列
|
|
|
|
|
df.select("name").show()
|
|
|
|
|
+-------+
|
|
|
|
|
| name|
|
|
|
|
|
+-------+
|
|
|
|
|
|Michael|
|
|
|
|
|
| Andy|
|
|
|
|
|
| Justin|
|
|
|
|
|
+-------+
|
|
|
|
|
```
|
|
|
|
|
我们也可以通过编写SQL语句的方式执行上述操作。
|
|
|
|
|
|
|
|
|
|
### 2.1.5 通过SQL语句的方式
|
|
|
|
|
```python
|
|
|
|
|
#首先注册df为一个临时视图
|
|
|
|
|
df.createOrReplaceTempView("p")
|
|
|
|
|
#通过spark.sql("SQL语句")执行SQL语句
|
|
|
|
|
sqlDF = spark.sql("SELECT name FROM p")
|
|
|
|
|
sqlDF.show()
|
|
|
|
|
+-------+
|
|
|
|
|
| name|
|
|
|
|
|
+-------+
|
|
|
|
|
|Michael|
|
|
|
|
|
| Andy|
|
|
|
|
|
| Justin|
|
|
|
|
|
+-------+
|
|
|
|
|
```
|
|
|
|
|
### 2.1.6将处理结果保存到路径
|
|
|
|
|
|
|
|
|
|
`DataFrameWriter`用于将数据集写入外部存储系统的接口(例如文件系统,键值存储等)。
|
|
|
|
|
|
|
|
|
|
使用`DataFrameWriter.save(String path)`,就可以将`Dataset`的内容保存在指定的路径中。
|
|
|
|
|
|
|
|
|
|
```
|
|
|
|
|
//写入并保存到指定路径
|
|
|
|
|
df.select("name", "age").write.format("parquet").save("F:\\test\\anamesAndAges")
|
|
|
|
|
|
|
|
|
|
```
|
|
|
|
|
### 2.1.7 保存模式介绍
|
|
|
|
|
|
|
|
|
|
`save()`方法支持设置保存模式,使用`DataFrameWriter.mode(SaveMode saveMode)`可用于指定将`Dataset`保存到数据源的预期行为,指定如何处理现有数据(例如,执行时设置类型为`Overwrite`,则数据将在写出新数据之前被删除。)但需要注意的这些保存模式不使用任何锁定并且不是原子的。
|
|
|
|
|
|
|
|
|
|
`SaveMode` 类型如下:
|
|
|
|
|
|
|
|
|
|
| `Scala/Java` | 含义 |
|
|
|
|
|
| ------------ | ------------ |
|
|
|
|
|
| `SaveMode.ErrorIfExists` (默认) | 将`Dataset`保存到数据源时,如果数据已存在,则会引发异常。 |
|
|
|
|
|
| `SaveMode.Append` | 将`Dataset`保存到数据源时,如果数据/表已存在,则Dataset的内容应附加到现有数据。 |
|
|
|
|
|
| `SaveMode.Overwrite` | 覆盖模式意味着在将`Dataset`保存到数据源时,如果数据/表已经存在,则预期现有数据将被`Dataset`的内容覆盖。 |
|
|
|
|
|
| `SaveMode.Ignore` | 忽略模式意味着在将`Dataset`保存到数据源时,如果数据已存在,则预期保存操作不会保存`Dataset`的内容而不会更改现有数据。这与`CREATE TABLE IF NOT EXISTSSQL`中的类似。 |
|
|
|
|
|
|
|
|
|
|
```
|
|
|
|
|
//覆盖原有数据并写入到F:\\test\\anamesAndAges路径上
|
|
|
|
|
df.select("name", "age").write.mode("overwrite").format("parquet").save("F:\\test\\anamesAndAges")
|
|
|
|
|
```
|
|
|
|
|
|
|
|
|
|
|