master
慢慢 6 years ago
parent 2e9ad5d607
commit 457830afd7

@ -1,116 +1,20 @@
## 2.1 使用Spark SQL统计各个研发单位研制战斗机占比
### 2.1.1 什么是Spark SQL
### 2.1.1 数据源
`Spark SQL`是用来操作结构化和半结构化数据的接口
当每条存储记录共用已知的字段集合,数据符合此条件时,`Spark SQL`就会使得针对这些数据的读取和查询变得更加简单高效。具体来说,`Spark SQL`提供了以下三大功能
(1) `Spark SQL`可以从各种结构化数据源(例如`JSON`、`Parquet`等)中读取数据。
本教程提供一份全球战斗机相关指标参数的json数据
其中一条数据如下
`{"发动机数量":"双发","武器装备":"1机炮30 mm机炮 150发 2导弹鹰击-62反舰巡航导弹鹰击-83反舰导弹鹰击-91反舰导弹鹰击-9多用途导弹雷电-10反辐射导弹霹雳-8空空导弹霹雳-11空空导弹霹雳-12中程空空导弹 3炸弹雷霆2-雷射导引弹雷石6-滑翔炸弹200A反机场炸弹通用炸弹500千克1500千克。","发动机":"AL-31F涡扇发动机","机长":"21.19米","名称":"歼-16战机","乘员":"2人","关注度":"(5分)","研发单位":"中国沈阳飞机公司","气动布局":"后掠翼","机高":"5.9米","最大飞行速度":"1,438千米每小时","翼展":"14.7米","最大航程":"4,288千米","飞行速度":"超音速","首飞时间":"2011年10月17日"}`
(2) `Spark SQL`不仅支持在`Spark```程序内使用`SQL`语句进行数据查询,也支持从类似商业智能软件`Tableau`这样的外部工具中通过标准数据库连接器(`JDBC/ODBC`)连接`sparkSQL`进行查询
每条数据可能有不同数量的名称,名称的值可能为空
(3) 当在`Spark`程序内使用`Spark SQL`时,`Spark SQL`支持`SQL`与常规的`Python/Java/Scala`代码高度整合,包括连接`RDD`与`SQL`表、公开的自定义`SQL`函数接口等。
### 2.1.2 统计指标说明
统计出全球各研发单位研制的战斗机在全球所有战斗机中的占比,原始数据中战斗机为空的不计入总数。
我们编写Spark SQL代码时从何开始呢答案就是SparkSession。
### 2.1.2 什么是SparkSession
### 2.1.3 结果数据保存
`Spark`中所有功能的入口点都是`SparkSession`类。要创建基本的`SparkSession`,只需使用`SparkSession.builder()`
统计出指标后将结果保存到以`json`格式保存到本地目录
```
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.appName("Python Spark SQL basic example") \
.config("spark.some.config.option", "some-value") \
.getOrCreate()
//打印spark版本号
print(spark.version);
```
有了SparkSession下一步就是创建DataFrames。
### 2.1.3 创建DataFrames
使用`SparkSession`可以从现有`RDD``Hive`表或`Spark`数据源(`json``parquet``jdbc``orc``libsvm``csv``text`等格式文件创建DataFrame。
以下示例为读取Json文件创建DataFrame。
`df =spark.read.json("examples/src/main/resources/people.json")`
people.json数据如下
```json
{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}
```
有了DataFrames之后我们就可以对数据进行相应操作
### 2.1.4使用DataFrames
在Python中可以通过属性df.age或通过索引df ['age']推荐使用访问DataFrame的列。
举例如下:
```python
#打印Schema信息
df.printSchema()
root
|-- age: long (nullable = true)
|-- name: string (nullable = true)
#选择姓名列
df.select("name").show()
+-------+
| name|
+-------+
|Michael|
| Andy|
| Justin|
+-------+
```
我们也可以通过编写SQL语句的方式执行上述操作。
### 2.1.5 通过SQL语句的方式
```python
#首先注册df为一个临时视图
df.createOrReplaceTempView("p")
#通过spark.sql("SQL语句")执行SQL语句
sqlDF = spark.sql("SELECT name FROM p")
sqlDF.show()
+-------+
| name|
+-------+
|Michael|
| Andy|
| Justin|
+-------+
```
### 2.1.6将处理结果保存到路径
`DataFrameWriter`用于将数据集写入外部存储系统的接口(例如文件系统,键值存储等)。
使用`DataFrameWriter.save(String path)`,就可以将`Dataset`的内容保存在指定的路径中。
```
//写入并保存到指定路径
df.select("name", "age").write.format("parquet").save("F:\\test\\anamesAndAges")
```
### 2.1.7 保存模式介绍
`save()`方法支持设置保存模式,使用`DataFrameWriter.mode(SaveMode saveMode)`可用于指定将`Dataset`保存到数据源的预期行为,指定如何处理现有数据(例如,执行时设置类型为`Overwrite`,则数据将在写出新数据之前被删除。)但需要注意的这些保存模式不使用任何锁定并且不是原子的。
`SaveMode` 类型如下:
| `Scala/Java` | 含义 |
| ------------ | ------------ |
| `SaveMode.ErrorIfExists` (默认) | 将`Dataset`保存到数据源时,如果数据已存在,则会引发异常。 |
| `SaveMode.Append` | 将`Dataset`保存到数据源时,如果数据/表已存在则Dataset的内容应附加到现有数据。 |
| `SaveMode.Overwrite` | 覆盖模式意味着在将`Dataset`保存到数据源时,如果数据/表已经存在,则预期现有数据将被`Dataset`的内容覆盖。 |
| `SaveMode.Ignore` | 忽略模式意味着在将`Dataset`保存到数据源时,如果数据已存在,则预期保存操作不会保存`Dataset`的内容而不会更改现有数据。这与`CREATE TABLE IF NOT EXISTSSQL`中的类似。 |
```
//覆盖原有数据并写入到F:\\test\\anamesAndAges路径上
df.select("name", "age").write.mode("overwrite").format("parquet").save("F:\\test\\anamesAndAges")
```

@ -0,0 +1,17 @@
## 2.2 使用Spark SQL统计战斗机飞行性能
### 2.1.1 数据源
本教程提供一份全球战斗机相关指标参数的json数据。
其中一条数据如下:
`{"发动机数量":"双发","武器装备":"1机炮30 mm机炮 150发 2导弹鹰击-62反舰巡航导弹鹰击-83反舰导弹鹰击-91反舰导弹鹰击-9多用途导弹雷电-10反辐射导弹霹雳-8空空导弹霹雳-11空空导弹霹雳-12中程空空导弹 3炸弹雷霆2-雷射导引弹雷石6-滑翔炸弹200A反机场炸弹通用炸弹500千克1500千克。","发动机":"AL-31F涡扇发动机","机长":"21.19米","名称":"歼-16战机","乘员":"2人","关注度":"(5分)","研发单位":"中国沈阳飞机公司","气动布局":"后掠翼","机高":"5.9米","最大飞行速度":"1,438千米每小时","翼展":"14.7米","最大航程":"4,288千米","飞行速度":"超音速","首飞时间":"2011年10月17日"}`
每条数据可能有不同数量的名称,名称的值可能为空。
### 2.1.2 统计指标说明
统计出全球飞行速度排名前三的战斗机。
### 2.1.3 结果数据保存
统计出指标后将结果保存到以`json`格式保存到本地目录。

@ -0,0 +1,108 @@
# 结构化数据分析与处理
## 2.1 什么是Spark SQL
SparkSQL是spark用来处理结构化的一个模块它提供一个抽象的数据集DataFrame,并且是作为分布式SQL查询引擎的应用。
我们之前已经学习了hive,它将HiveSQL转换成MR然后提交到集群上去执行减少编写MR查询的复杂性但是因为采用计算框架所以执行效率比较慢所以spark SQL就应运而生。
我们编写Spark SQL代码时从何开始呢答案就是SparkSession。
### 2.2 什么是SparkSession
`Spark`中所有功能的入口点都是`SparkSession`类。要创建基本的`SparkSession`,只需使用`SparkSession.builder()`。
```
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.appName("Python Spark SQL basic example") \
.config("spark.some.config.option", "some-value") \
.getOrCreate()
//打印spark版本号
print(spark.version);
```
有了SparkSession下一步就是创建DataFrames。
### 2.3 创建DataFrames
使用`SparkSession`可以从现有`RDD``Hive`表或`Spark`数据源(`json``parquet``jdbc``orc``libsvm``csv``text`等格式文件创建DataFrame。
以下示例为读取Json文件创建DataFrame。
`df =spark.read.json("examples/src/main/resources/people.json")`
people.json数据如下
```json
{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}
```
有了DataFrames之后我们就可以对数据进行相应操作
### 2.4使用DataFrames
在Python中可以通过属性df.age或通过索引df ['age']推荐使用访问DataFrame的列。
举例如下:
```python
#打印Schema信息
df.printSchema()
root
|-- age: long (nullable = true)
|-- name: string (nullable = true)
#选择姓名列
df.select("name").show()
+-------+
| name|
+-------+
|Michael|
| Andy|
| Justin|
+-------+
```
我们也可以通过编写SQL语句的方式执行上述操作。
### 2.5 通过SQL语句的方式
```python
#首先注册df为一个临时视图
df.createOrReplaceTempView("p")
#通过spark.sql("SQL语句")执行SQL语句
sqlDF = spark.sql("SELECT name FROM p")
sqlDF.show()
+-------+
| name|
+-------+
|Michael|
| Andy|
| Justin|
+-------+
```
### 2.6将处理结果保存
以下示例可将结果以`parquet`格式文件保存到`F:\\test\\anamesAndAges`路径。
```
//写入并保存到指定路径
df.select("name", "age").write.format("parquet").save("F:\\test\\anamesAndAges")
```
### 2.7 保存模式介绍
`save()`方法支持设置保存模式。
`SaveMode` 类型如下:
| 所有语言 | 含义 |
| ------------ | ------------ |
| `"error" or "errorifexists"` (默认) | 将`Dataset`保存到数据源时,如果数据已存在,则会引发异常。 |
| ` "append" ` | 将`Dataset`保存到数据源时,如果数据/表已存在则Dataset的内容应附加到现有数据。 |
| `"overwrite"` | 覆盖模式意味着在将`Dataset`保存到数据源时,如果数据/表已经存在,则预期现有数据将被`Dataset`的内容覆盖。 |
| `"ignore"` | 忽略模式意味着在将`Dataset`保存到数据源时,如果数据已存在,则预期保存操作不会保存`Dataset`的内容而不会更改现有数据。这与`CREATE TABLE IF NOT EXISTSSQL`中的类似。 |
```
//覆盖原有数据并写入到F:\\test\\anamesAndAges路径上
df.select("name", "age").write.mode("overwrite").format("parquet").save("F:\\test\\anamesAndAges")
```
Loading…
Cancel
Save