结构修改

master
慢慢 6 years ago
parent 80558409e0
commit 98dc381ef7

@ -6,8 +6,12 @@
* [1.2 Spark单机版环境搭建](/chapter1/1.2Spark单机版环境搭建.md)
* [1.3 Spark完全分布式环境搭建](/chapter1/1.3Spark完全分布式环境搭建.md)
* [第二章 Spark结构化数据分析与处理](/chapter2/2结构化数据分析与处理简介.md)
* [2.1 SparkSQL统计各个研发单位研制战斗机占比](/chapter2/2.1统计各个研发单位研制战斗机占比.md)
* [2.2 SparkSQL对战斗机飞行性能进行分析](/chapter2/2.2对战斗机飞行性能进行分析.md)
* [2.1SparkSQL入门](/chapter2/2.1SparkSQL入门.md)
* [2.2 Spark SQL对战斗机飞行性能进行分析](/chapter2/2.2对战斗机飞行性能进行分析.md)
* [2.3 Spark SQL对战斗机飞行性能进行分析](/chapter2/2.3对战斗机飞行性能进行分析.md)
* [第三章 SparkStreaming流数据计算与分析](/chapter3/3流数据计算与分析简介.md)
* [3.1 对军事类数据的查询次数进行实时统计分析](/chapter3/3.1对军事类数据的查询次数进行实时统计分析.md)
* [3.2 对军事类数据的查询次数进行可视化展示](/chapter3/3.2对军事类数据的查询次数进行可视化展示.md)

@ -1,13 +1,30 @@
[TOC]
---
## 1.3 Spark分布式环境搭建
我们已经掌握了`Spark`单机版安装,那么分布式集群怎么搭建呢? 接下来我们学习`Standalone`分布式集群搭建。
### 1.3.1 Spark分布式安装模式
Spark分布式环境安装目前有四种模式
1.`Standalone``Spark`自带的简单群资源管理器,安装较为简单,不需要依赖`Hadoop`
2.`Hadoop YARN`:使用`yarn`作为集群资源管理,安装需要依赖`Hadoop`
3.`Apache Mesos`:不常用;
4.`Kubernetes`:不常用。
本地学习测试我们常用`Standalone`模式,生产环境常使用`yarn`模式。
### 1.3.2 示例集群信息
以下表格为本教程所用示例集群节点信息:
@ -19,7 +36,7 @@
### 1.3.1 下载Spark安装包
### 1.3.3 下载Spark安装包
到`Spark`官网:<a href="https://archive.apache.org/dist/spark/spark-2.3.4/" target="view_frame">下载地址</span> </a> 下载
`Hadoop`版本为`2.7``Spark`版本为`2.3.4`的`spark`安装包。
@ -31,11 +48,11 @@
### 1.3.2 解压安装包
### 1.3.4 解压安装包
首先选择`master`节点安装`Spark`,将下载的`spark-2.3.4-bin-hadoop2.7.tgz `安装包上传至该节点的`/home/hadoop/soft/`目录下,然后执行`tar zxvf spark-2.3.4-bin-hadoop2.7.tgz `命令进行解压。
### 1.3.3 配置环境变量
### 1.3.5 配置环境变量
我们将`Spark`的根目录配置到`/etc/profile`中(在文件末尾添加)。
@ -44,7 +61,7 @@
该步骤所有节点均可执行
### 1.3.4 修改 spark-env.sh 配置文件
### 1.3.6 修改 spark-env.sh 配置文件
首先生成一份`spark-env.sh`文件:
@ -67,14 +84,14 @@ export SPARK_MASTER_IP=master节点IP
```
### 1.3.5 修改 slaves 文件
### 1.3.7 修改 slaves 文件
首先生成一份`slaves`文件
切换到`conf`目录下:
执行命令:`mv slaves.template slaves`
修改`slaves`文件:
执行命令:`vi slaves`,在该文件中加入作为`worker`节点`ip`。
执行命令:`vi slaves`,在该文件中加入作为`worker`节点`ip`或映射主机名
```java
master
@ -82,15 +99,15 @@ worker1
worker2
```
### 1.3.6 分发安装包
### 1.3.8 分发安装包
把master节点的`spark`安装包分发到`worker1`节点和`worker2`节点(可通过`linux`的`scp`命令)。
### 1.3.7 启动spark
### 1.3.9 启动spark
切换到`master`节点安装目录的`/sbin`目录下
执行命令启动`Spark`集群:`./start-all.sh`
### 1.3.7 检查webUI界面
### 1.3.10 检查webUI界面
启动完成后,在浏览器输入如下地址:`http://master地址:8888/`,可见如下页面。

@ -0,0 +1,115 @@
## 2.1 Spark SQL入门
`Spark SQL`是`Spark`用来处理结构化数据的一个模块。`Spark SQL`为了支持结构化数据的处理,它提供了两个编程抽象分别叫做`DataFrame`和`DataSet`。
### 2.1.1 DataFrameDataset和RDD的关系
`RDD` :仅表示数据集,`RDD`没有元数据,也就是说没有字段信息。
`DataFrame`:由于`RDD`的局限性,`Spark`产生了`DataFrame``DataFrame=RDD+Schema``Schema`也就是字段信息。`DataFrame`是一种特殊类型的 `Dataset``DataSet[Row] = DataFrame`。
`Dataset`:可以理解为强类型的`DataFrame`,但是`Python`不支持`Dataset API`。
了解完以上关系后,我们开始编写`Spark SQL`,从何开始呢?答案就是`SparkSession`。
### 2.1.2 什么是SparkSession
`SparkSession`是`Spark SQL`的入口。要创建基本的`SparkSession`,只需使用`SparkSession.builder()`。
```
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.appName("Python Spark SQL basic example") \
.config("spark.some.config.option", "some-value") \
.getOrCreate()
```
有了`SparkSession`,下一步就是创建`DataFrames`。
### 2.1.3 创建DataFrames
使用`SparkSession`可以从现有`RDD``Hive`表或`Spark`数据源(`json``parquet``jdbc``orc``libsvm``csv``text`)等格式文件创建`DataFrame`。
以下示例为读取`Json`文件创建`DataFrame`。
`df =spark.read.json("/people.json")`
`people.json`数据如下:
```json
{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}
```
有了`DataFrames`之后,我们就可以对数据进行相应操作.
### 2.1.4使用DataFrames
在`Python`中,可以通过属性`df.age`或通过索引`df ['age']`(推荐使用)访问`DataFrame`的列。
举例如下:
```python
#打印Schema信息
df.printSchema()
root
|-- age: long (nullable = true)
|-- name: string (nullable = true)
#选择姓名列
df.select("name").show()
+-------+
| name|
+-------+
|Michael|
| Andy|
| Justin|
+-------+
```
我们也可以通过编写`SQL`语句的方式执行上述操作。
### 2.1.5 通过SQL语句的方式
```python
#首先注册df为一个临时视图
df.createOrReplaceTempView("p")
#通过spark.sql("SQL语句")执行SQL语句
sqlDF = spark.sql("SELECT name FROM p")
sqlDF.show()
+-------+
| name|
+-------+
|Michael|
| Andy|
| Justin|
+-------+
```
### 2.1.6将处理结果保存
以下示例可将结果以`parquet`格式文件保存到`F:\\test\\anamesAndAges`路径。
```
#写入并保存到指定路径
df.select("name", "age").write.format("parquet").save("F:\\test\\anamesAndAges")
```
### 2.1.7 保存模式介绍
`save`方法支持设置保存模式。
类型如下:
| 所有语言 | 说明 |
| ------------ | ------------ |
| `"error" or "errorifexists"` (默认) | 将`DataFrame`保存到数据源时,如果数据已存在,则会引发异常。 |
| ` "append" ` | 将`Dataset`保存到数据源时,如果数据/表已存在则DataFrame的内容应附加到现有数据。 |
| `"overwrite"` | 覆盖模式意味着在将`DataFrame`保存到数据源时,如果数据/表已经存在,则预期现有数据将被`DataFrame`的内容覆盖。 |
| `"ignore"` | 忽略模式意味着在将`DataFrame`保存到数据源时,如果数据已存在,则预期保存操作不会保存`DataFrame`的内容而不会更改现有数据。这与`CREATE TABLE IF NOT EXISTSSQL`中的类似。 |
```
#覆盖原有数据并写入到F:\\test\\anamesAndAges路径上
df.select("name", "age").write.mode("overwrite").format("parquet").save("F:\\test\\anamesAndAges")
```

@ -1,8 +1,8 @@
## 2.2 使用Spark SQL统计战斗机飞行性能
### 2.1.1 数据源
### 2.2.1 数据源
本教程提供一份全球战斗机相关指标参数的json数据。
本教程提供一份全球战斗机相关指标参数的`json`数据(见第二章节`data`目录)
其中一条数据如下:
`{"发动机数量":"双发","武器装备":"1机炮30 mm机炮 150发 2导弹鹰击-62反舰巡航导弹鹰击-83反舰导弹鹰击-91反舰导弹鹰击-9多用途导弹雷电-10反辐射导弹霹雳-8空空导弹霹雳-11空空导弹霹雳-12中程空空导弹 3炸弹雷霆2-雷射导引弹雷石6-滑翔炸弹200A反机场炸弹通用炸弹500千克1500千克。","发动机":"AL-31F涡扇发动机","机长":"21.19米","名称":"歼-16战机","乘员":"2人","关注度":"(5分)","研发单位":"中国沈阳飞机公司","气动布局":"后掠翼","机高":"5.9米","最大飞行速度":"1,438千米每小时","翼展":"14.7米","最大航程":"4,288千米","飞行速度":"超音速","首飞时间":"2011年10月17日"}`
@ -16,3 +16,38 @@
### 2.1.3 结果数据保存
统计出指标后将结果以`json`格式保存到本地目录。
### 2.1.4处理步骤
1.创建`SparkSession`
```
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.appName("Python Spark SQL basic example") \
.config("spark.sql.crossJoin.enabled", "true") \
.getOrCreate()
```
2.读取所给`json`数据创建`DataFrame`
`df =spark.read.json("/jun.json")`
3.创建视图;
`df.createOrReplaceTempView("table1")`
4.编写`sql`语句计算指标;
`sqlDF = spark.sql("sql语句")`
5.将处理结果保存到本地目录;
`sqlDF.write.format("json").save("保存路径")`
6.停止`SparkSession`。
`spark.stop()`

@ -1,20 +1,22 @@
## 2.1 使用Spark SQL统计各个研发单位研制战斗机占比
## 2.3 使用Spark SQL统计各个研发单位研制战斗机占比
### 2.1.1 数据源
### 2.3.1 数据源
本教程提供一份全球战斗机相关指标参数的json数据。
本教程提供一份全球战斗机相关指标参数的`json`数据(见第二章节`data`目录)
其中一条数据如下:
`{"发动机数量":"双发","武器装备":"1机炮30 mm机炮 150发 2导弹鹰击-62反舰巡航导弹鹰击-83反舰导弹鹰击-91反舰导弹鹰击-9多用途导弹雷电-10反辐射导弹霹雳-8空空导弹霹雳-11空空导弹霹雳-12中程空空导弹 3炸弹雷霆2-雷射导引弹雷石6-滑翔炸弹200A反机场炸弹通用炸弹500千克1500千克。","发动机":"AL-31F涡扇发动机","机长":"21.19米","名称":"歼-16战机","乘员":"2人","关注度":"(5分)","研发单位":"中国沈阳飞机公司","气动布局":"后掠翼","机高":"5.9米","最大飞行速度":"1,438千米每小时","翼展":"14.7米","最大航程":"4,288千米","飞行速度":"超音速","首飞时间":"2011年10月17日"}`
每条`json`数据里可能有不同数量的成员,成员的值可能为空。
### 2.1.2 统计指标说明
### 2.3.2 统计指标说明
统计出全球各研发单位研制的战斗机在全球所有战斗机中的占比(以百分号显示,并保留两位小数,如:`0.12%`),原始数据中战斗机为空的不计入计算。
### 2.1.3 结果数据保存
### 2.3.3 结果数据保存
统计出指标后将结果以`json`格式保存到本地目录。
### 2.3.4处理步骤
同上小节步骤

@ -1,112 +1,12 @@
# 结构化数据分析与处理
# 第二章 结构化数据分析与处理
## 2.1 什么是Spark SQL
在大数据领域,统计分析处理结构化数据可以使用`Hive`等工具,但是`Hive`依赖的`MapReduce`计算过程中大量的中间磁盘落地过程消耗了大量的`I/O`,运行效率较低。恰好,基于内存计算的`Spark SQL`解决了这些问题。
`Spark SQL`是`Spark`用来处理结构化数据的一个模块,它提供一个抽象的数据集`DataFrame`,并且是作为分布式`SQL`查询引擎的应用。
或许你之前学习过`Hive`,我们知道它将`HQL`转换成`MR`,然后提交到集群上去执行,减少了编写`MR`查询的复杂性但是执行效率比较慢恰好Spark SQL的出现解决了这些问题
`Spark SQL`支持`Java`、`Scala`和`Python`语言,其中使用`Scala`开发是主流,但是本教程为顺应特殊需求,我们使用`Python`
我们编写`Spark SQL`代码时从何开始呢?答案就是`SparkSession`。
### 2.2 什么是SparkSession
`SparkSession`是`Spark SQL`的入口。要创建基本的`SparkSession`,只需使用`SparkSession.builder()`。
```
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.appName("Python Spark SQL basic example") \
.config("spark.some.config.option", "some-value") \
.getOrCreate()
```
有了`SparkSession`,下一步就是创建`DataFrames`。
### 2.3 创建DataFrames
使用`SparkSession`可以从现有`RDD``Hive`表或`Spark`数据源(`json``parquet``jdbc``orc``libsvm``csv``text`)等格式文件创建`DataFrame`。
以下示例为读取`Json`文件创建`DataFrame`。
`df =spark.read.json("/people.json")`
`people.json`数据如下:
```json
{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}
```
有了`DataFrames`之后,我们就可以对数据进行相应操作.
### 2.4使用DataFrames
在`Python`中,可以通过属性`df.age`或通过索引`df ['age']`(推荐使用)访问`DataFrame`的列。
举例如下:
```python
#打印Schema信息
df.printSchema()
root
|-- age: long (nullable = true)
|-- name: string (nullable = true)
#选择姓名列
df.select("name").show()
+-------+
| name|
+-------+
|Michael|
| Andy|
| Justin|
+-------+
```
我们也可以通过编写`SQL`语句的方式执行上述操作。
### 2.5 通过SQL语句的方式
```python
#首先注册df为一个临时视图
df.createOrReplaceTempView("p")
#通过spark.sql("SQL语句")执行SQL语句
sqlDF = spark.sql("SELECT name FROM p")
sqlDF.show()
+-------+
| name|
+-------+
|Michael|
| Andy|
| Justin|
+-------+
```
### 2.6将处理结果保存
以下示例可将结果以`parquet`格式文件保存到`F:\\test\\anamesAndAges`路径。
```
#写入并保存到指定路径
df.select("name", "age").write.format("parquet").save("F:\\test\\anamesAndAges")
```
### 2.7 保存模式介绍
`save`方法支持设置保存模式。
类型如下:
| 所有语言 | 说明 |
| ------------ | ------------ |
| `"error" or "errorifexists"` (默认) | 将`DataFrame`保存到数据源时,如果数据已存在,则会引发异常。 |
| ` "append" ` | 将`Dataset`保存到数据源时,如果数据/表已存在则DataFrame的内容应附加到现有数据。 |
| `"overwrite"` | 覆盖模式意味着在将`DataFrame`保存到数据源时,如果数据/表已经存在,则预期现有数据将被`DataFrame`的内容覆盖。 |
| `"ignore"` | 忽略模式意味着在将`DataFrame`保存到数据源时,如果数据已存在,则预期保存操作不会保存`DataFrame`的内容而不会更改现有数据。这与`CREATE TABLE IF NOT EXISTSSQL`中的类似。 |
```
#覆盖原有数据并写入到F:\\test\\anamesAndAges路径上
df.select("name", "age").write.mode("overwrite").format("parquet").save("F:\\test\\anamesAndAges")
```
Loading…
Cancel
Save