From 457830afd770e75a4ecb61ba4cdba2ef986d2261 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=85=A2=E6=85=A2?= <905907915@qq.com> Date: Wed, 6 Nov 2019 10:59:58 +0800 Subject: [PATCH] sparksql --- ...各个研发单位研制战斗机占比.md | 114 ++---------------- ...2.2对战斗机飞行性能进行分析.md | 17 +++ .../2结构化数据分析与处理简介.md | 108 +++++++++++++++++ 3 files changed, 134 insertions(+), 105 deletions(-) diff --git a/chapter2/2.1统计各个研发单位研制战斗机占比.md b/chapter2/2.1统计各个研发单位研制战斗机占比.md index 8140572..0054c28 100644 --- a/chapter2/2.1统计各个研发单位研制战斗机占比.md +++ b/chapter2/2.1统计各个研发单位研制战斗机占比.md @@ -1,116 +1,20 @@ ## 2.1 使用Spark SQL统计各个研发单位研制战斗机占比 -### 2.1.1 什么是Spark SQL +### 2.1.1 数据源 -`Spark SQL`是用来操作结构化和半结构化数据的接口。 -当每条存储记录共用已知的字段集合,数据符合此条件时,`Spark SQL`就会使得针对这些数据的读取和查询变得更加简单高效。具体来说,`Spark SQL`提供了以下三大功能: -(1) `Spark SQL`可以从各种结构化数据源(例如`JSON`、`Parquet`等)中读取数据。 +本教程提供一份全球战斗机相关指标参数的json数据。 + 其中一条数据如下: + `{"发动机数量":"双发","武器装备":"(1)机炮:30 mm机炮 150发; (2)导弹:鹰击-62反舰巡航导弹,鹰击-83反舰导弹,鹰击-91反舰导弹,鹰击-9多用途导弹,雷电-10反辐射导弹,霹雳-8空空导弹,霹雳-11空空导弹,霹雳-12中程空空导弹; (3)炸弹:雷霆2-雷射导引弹,雷石6-滑翔炸弹,200A反机场炸弹,通用炸弹500千克,1500千克。","发动机":"AL-31F涡扇发动机","机长":"21.19米","名称":"歼-16战机","乘员":"2人","关注度":"(5分)","研发单位":"中国沈阳飞机公司","气动布局":"后掠翼","机高":"5.9米","最大飞行速度":"1,438千米每小时","翼展":"14.7米","最大航程":"4,288千米","飞行速度":"超音速","首飞时间":"2011年10月17日"}` -(2) `Spark SQL`不仅支持在`Spark```程序内使用`SQL`语句进行数据查询,也支持从类似商业智能软件`Tableau`这样的外部工具中通过标准数据库连接器(`JDBC/ODBC`)连接`sparkSQL`进行查询。 +每条数据可能有不同数量的名称,名称的值可能为空。 -(3) 当在`Spark`程序内使用`Spark SQL`时,`Spark SQL`支持`SQL`与常规的`Python/Java/Scala`代码高度整合,包括连接`RDD`与`SQL`表、公开的自定义`SQL`函数接口等。 +### 2.1.2 统计指标说明 +统计出全球各研发单位研制的战斗机在全球所有战斗机中的占比,原始数据中战斗机为空的不计入总数。 -我们编写Spark SQL代码时从何开始呢?答案就是SparkSession。 -### 2.1.2 什么是SparkSession +### 2.1.3 结果数据保存 -`Spark`中所有功能的入口点都是`SparkSession`类。要创建基本的`SparkSession`,只需使用`SparkSession.builder()`。 +统计出指标后将结果保存到以`json`格式保存到本地目录。 -``` -from pyspark.sql import SparkSession - -spark = SparkSession \ - .builder \ - .appName("Python Spark SQL basic example") \ - .config("spark.some.config.option", "some-value") \ - .getOrCreate() - //打印spark版本号 - print(spark.version); -``` - -有了SparkSession,下一步就是创建DataFrames。 - -### 2.1.3 创建DataFrames -使用`SparkSession`可以从现有`RDD`,`Hive`表或`Spark`数据源(`json`,`parquet`,`jdbc`,`orc`,`libsvm`,`csv`,`text`)等格式文件创建DataFrame。 - -以下示例为读取Json文件创建DataFrame。 -`df =spark.read.json("examples/src/main/resources/people.json")` -people.json数据如下: - -```json -{"name":"Michael"} -{"name":"Andy", "age":30} -{"name":"Justin", "age":19} -``` - - -有了DataFrames之后,我们就可以对数据进行相应操作; -### 2.1.4使用DataFrames -在Python中,可以通过属性(df.age)或通过索引(df ['age'])(推荐使用)访问DataFrame的列。 -举例如下: - -```python -#打印Schema信息 -df.printSchema() - -root -|-- age: long (nullable = true) -|-- name: string (nullable = true) - -#选择姓名列 -df.select("name").show() - +-------+ - | name| - +-------+ - |Michael| - | Andy| - | Justin| - +-------+ -``` -我们也可以通过编写SQL语句的方式执行上述操作。 - -### 2.1.5 通过SQL语句的方式 -```python -#首先注册df为一个临时视图 -df.createOrReplaceTempView("p") -#通过spark.sql("SQL语句")执行SQL语句 -sqlDF = spark.sql("SELECT name FROM p") -sqlDF.show() - +-------+ - | name| - +-------+ - |Michael| - | Andy| - | Justin| - +-------+ -``` -### 2.1.6将处理结果保存到路径 - -`DataFrameWriter`用于将数据集写入外部存储系统的接口(例如文件系统,键值存储等)。 - -使用`DataFrameWriter.save(String path)`,就可以将`Dataset`的内容保存在指定的路径中。 - -``` -//写入并保存到指定路径 -df.select("name", "age").write.format("parquet").save("F:\\test\\anamesAndAges") - -``` -### 2.1.7 保存模式介绍 - -`save()`方法支持设置保存模式,使用`DataFrameWriter.mode(SaveMode saveMode)`可用于指定将`Dataset`保存到数据源的预期行为,指定如何处理现有数据(例如,执行时设置类型为`Overwrite`,则数据将在写出新数据之前被删除。)但需要注意的这些保存模式不使用任何锁定并且不是原子的。 - -`SaveMode` 类型如下: - -| `Scala/Java` | 含义 | -| ------------ | ------------ | -| `SaveMode.ErrorIfExists` (默认) | 将`Dataset`保存到数据源时,如果数据已存在,则会引发异常。 | -| `SaveMode.Append` | 将`Dataset`保存到数据源时,如果数据/表已存在,则Dataset的内容应附加到现有数据。 | -| `SaveMode.Overwrite` | 覆盖模式意味着在将`Dataset`保存到数据源时,如果数据/表已经存在,则预期现有数据将被`Dataset`的内容覆盖。 | -| `SaveMode.Ignore` | 忽略模式意味着在将`Dataset`保存到数据源时,如果数据已存在,则预期保存操作不会保存`Dataset`的内容而不会更改现有数据。这与`CREATE TABLE IF NOT EXISTSSQL`中的类似。 | - -``` -//覆盖原有数据并写入到F:\\test\\anamesAndAges路径上 -df.select("name", "age").write.mode("overwrite").format("parquet").save("F:\\test\\anamesAndAges") -``` diff --git a/chapter2/2.2对战斗机飞行性能进行分析.md b/chapter2/2.2对战斗机飞行性能进行分析.md index e69de29..cd50a05 100644 --- a/chapter2/2.2对战斗机飞行性能进行分析.md +++ b/chapter2/2.2对战斗机飞行性能进行分析.md @@ -0,0 +1,17 @@ +## 2.2 使用Spark SQL统计战斗机飞行性能 + +### 2.1.1 数据源 + +本教程提供一份全球战斗机相关指标参数的json数据。 + 其中一条数据如下: + `{"发动机数量":"双发","武器装备":"(1)机炮:30 mm机炮 150发; (2)导弹:鹰击-62反舰巡航导弹,鹰击-83反舰导弹,鹰击-91反舰导弹,鹰击-9多用途导弹,雷电-10反辐射导弹,霹雳-8空空导弹,霹雳-11空空导弹,霹雳-12中程空空导弹; (3)炸弹:雷霆2-雷射导引弹,雷石6-滑翔炸弹,200A反机场炸弹,通用炸弹500千克,1500千克。","发动机":"AL-31F涡扇发动机","机长":"21.19米","名称":"歼-16战机","乘员":"2人","关注度":"(5分)","研发单位":"中国沈阳飞机公司","气动布局":"后掠翼","机高":"5.9米","最大飞行速度":"1,438千米每小时","翼展":"14.7米","最大航程":"4,288千米","飞行速度":"超音速","首飞时间":"2011年10月17日"}` + +每条数据可能有不同数量的名称,名称的值可能为空。 + +### 2.1.2 统计指标说明 + +统计出全球飞行速度排名前三的战斗机。 + +### 2.1.3 结果数据保存 + +统计出指标后将结果保存到以`json`格式保存到本地目录。 diff --git a/chapter2/2结构化数据分析与处理简介.md b/chapter2/2结构化数据分析与处理简介.md index e69de29..1c1403b 100644 --- a/chapter2/2结构化数据分析与处理简介.md +++ b/chapter2/2结构化数据分析与处理简介.md @@ -0,0 +1,108 @@ +# 结构化数据分析与处理 + +## 2.1 什么是Spark SQL + +SparkSQL是spark用来处理结构化的一个模块,它提供一个抽象的数据集DataFrame,并且是作为分布式SQL查询引擎的应用。 +我们之前已经学习了hive,它将HiveSQL转换成MR,然后提交到集群上去执行,减少编写MR查询的复杂性,但是因为采用计算框架,所以执行效率比较慢,所以spark SQL就应运而生。 + + + +我们编写Spark SQL代码时从何开始呢?答案就是SparkSession。 +### 2.2 什么是SparkSession + +`Spark`中所有功能的入口点都是`SparkSession`类。要创建基本的`SparkSession`,只需使用`SparkSession.builder()`。 + +``` +from pyspark.sql import SparkSession + +spark = SparkSession \ + .builder \ + .appName("Python Spark SQL basic example") \ + .config("spark.some.config.option", "some-value") \ + .getOrCreate() + //打印spark版本号 + print(spark.version); +``` + +有了SparkSession,下一步就是创建DataFrames。 + +### 2.3 创建DataFrames +使用`SparkSession`可以从现有`RDD`,`Hive`表或`Spark`数据源(`json`,`parquet`,`jdbc`,`orc`,`libsvm`,`csv`,`text`)等格式文件创建DataFrame。 + +以下示例为读取Json文件创建DataFrame。 +`df =spark.read.json("examples/src/main/resources/people.json")` +people.json数据如下: + +```json +{"name":"Michael"} +{"name":"Andy", "age":30} +{"name":"Justin", "age":19} +``` + + +有了DataFrames之后,我们就可以对数据进行相应操作; +### 2.4使用DataFrames +在Python中,可以通过属性(df.age)或通过索引(df ['age'])(推荐使用)访问DataFrame的列。 +举例如下: + +```python +#打印Schema信息 +df.printSchema() + +root +|-- age: long (nullable = true) +|-- name: string (nullable = true) + +#选择姓名列 +df.select("name").show() + +-------+ + | name| + +-------+ + |Michael| + | Andy| + | Justin| + +-------+ +``` +我们也可以通过编写SQL语句的方式执行上述操作。 + +### 2.5 通过SQL语句的方式 +```python +#首先注册df为一个临时视图 +df.createOrReplaceTempView("p") +#通过spark.sql("SQL语句")执行SQL语句 +sqlDF = spark.sql("SELECT name FROM p") +sqlDF.show() + +-------+ + | name| + +-------+ + |Michael| + | Andy| + | Justin| + +-------+ +``` +### 2.6将处理结果保存 + +以下示例可将结果以`parquet`格式文件保存到`F:\\test\\anamesAndAges`路径。 + +``` +//写入并保存到指定路径 +df.select("name", "age").write.format("parquet").save("F:\\test\\anamesAndAges") + +``` +### 2.7 保存模式介绍 + +`save()`方法支持设置保存模式。 + +`SaveMode` 类型如下: + +| 所有语言 | 含义 | +| ------------ | ------------ | +| `"error" or "errorifexists"` (默认) | 将`Dataset`保存到数据源时,如果数据已存在,则会引发异常。 | +| ` "append" ` | 将`Dataset`保存到数据源时,如果数据/表已存在,则Dataset的内容应附加到现有数据。 | +| `"overwrite"` | 覆盖模式意味着在将`Dataset`保存到数据源时,如果数据/表已经存在,则预期现有数据将被`Dataset`的内容覆盖。 | +| `"ignore"` | 忽略模式意味着在将`Dataset`保存到数据源时,如果数据已存在,则预期保存操作不会保存`Dataset`的内容而不会更改现有数据。这与`CREATE TABLE IF NOT EXISTSSQL`中的类似。 | + +``` +//覆盖原有数据并写入到F:\\test\\anamesAndAges路径上 +df.select("name", "age").write.mode("overwrite").format("parquet").save("F:\\test\\anamesAndAges") +``` \ No newline at end of file