diff --git a/chapter2/2.1统计各个研发单位研制战斗机占比.md b/chapter2/2.1统计各个研发单位研制战斗机占比.md index e69de29..8140572 100644 --- a/chapter2/2.1统计各个研发单位研制战斗机占比.md +++ b/chapter2/2.1统计各个研发单位研制战斗机占比.md @@ -0,0 +1,116 @@ +## 2.1 使用Spark SQL统计各个研发单位研制战斗机占比 + +### 2.1.1 什么是Spark SQL + +`Spark SQL`是用来操作结构化和半结构化数据的接口。 +当每条存储记录共用已知的字段集合,数据符合此条件时,`Spark SQL`就会使得针对这些数据的读取和查询变得更加简单高效。具体来说,`Spark SQL`提供了以下三大功能: +(1) `Spark SQL`可以从各种结构化数据源(例如`JSON`、`Parquet`等)中读取数据。 + +(2) `Spark SQL`不仅支持在`Spark```程序内使用`SQL`语句进行数据查询,也支持从类似商业智能软件`Tableau`这样的外部工具中通过标准数据库连接器(`JDBC/ODBC`)连接`sparkSQL`进行查询。 + +(3) 当在`Spark`程序内使用`Spark SQL`时,`Spark SQL`支持`SQL`与常规的`Python/Java/Scala`代码高度整合,包括连接`RDD`与`SQL`表、公开的自定义`SQL`函数接口等。 + + +我们编写Spark SQL代码时从何开始呢?答案就是SparkSession。 +### 2.1.2 什么是SparkSession + +`Spark`中所有功能的入口点都是`SparkSession`类。要创建基本的`SparkSession`,只需使用`SparkSession.builder()`。 + +``` +from pyspark.sql import SparkSession + +spark = SparkSession \ + .builder \ + .appName("Python Spark SQL basic example") \ + .config("spark.some.config.option", "some-value") \ + .getOrCreate() + //打印spark版本号 + print(spark.version); +``` + +有了SparkSession,下一步就是创建DataFrames。 + +### 2.1.3 创建DataFrames +使用`SparkSession`可以从现有`RDD`,`Hive`表或`Spark`数据源(`json`,`parquet`,`jdbc`,`orc`,`libsvm`,`csv`,`text`)等格式文件创建DataFrame。 + +以下示例为读取Json文件创建DataFrame。 +`df =spark.read.json("examples/src/main/resources/people.json")` +people.json数据如下: + +```json +{"name":"Michael"} +{"name":"Andy", "age":30} +{"name":"Justin", "age":19} +``` + + +有了DataFrames之后,我们就可以对数据进行相应操作; +### 2.1.4使用DataFrames +在Python中,可以通过属性(df.age)或通过索引(df ['age'])(推荐使用)访问DataFrame的列。 +举例如下: + +```python +#打印Schema信息 +df.printSchema() + +root +|-- age: long (nullable = true) +|-- name: string (nullable = true) + +#选择姓名列 +df.select("name").show() + +-------+ + | name| + +-------+ + |Michael| + | Andy| + | Justin| + +-------+ +``` +我们也可以通过编写SQL语句的方式执行上述操作。 + +### 2.1.5 通过SQL语句的方式 +```python +#首先注册df为一个临时视图 +df.createOrReplaceTempView("p") +#通过spark.sql("SQL语句")执行SQL语句 +sqlDF = spark.sql("SELECT name FROM p") +sqlDF.show() + +-------+ + | name| + +-------+ + |Michael| + | Andy| + | Justin| + +-------+ +``` +### 2.1.6将处理结果保存到路径 + +`DataFrameWriter`用于将数据集写入外部存储系统的接口(例如文件系统,键值存储等)。 + +使用`DataFrameWriter.save(String path)`,就可以将`Dataset`的内容保存在指定的路径中。 + +``` +//写入并保存到指定路径 +df.select("name", "age").write.format("parquet").save("F:\\test\\anamesAndAges") + +``` +### 2.1.7 保存模式介绍 + +`save()`方法支持设置保存模式,使用`DataFrameWriter.mode(SaveMode saveMode)`可用于指定将`Dataset`保存到数据源的预期行为,指定如何处理现有数据(例如,执行时设置类型为`Overwrite`,则数据将在写出新数据之前被删除。)但需要注意的这些保存模式不使用任何锁定并且不是原子的。 + +`SaveMode` 类型如下: + +| `Scala/Java` | 含义 | +| ------------ | ------------ | +| `SaveMode.ErrorIfExists` (默认) | 将`Dataset`保存到数据源时,如果数据已存在,则会引发异常。 | +| `SaveMode.Append` | 将`Dataset`保存到数据源时,如果数据/表已存在,则Dataset的内容应附加到现有数据。 | +| `SaveMode.Overwrite` | 覆盖模式意味着在将`Dataset`保存到数据源时,如果数据/表已经存在,则预期现有数据将被`Dataset`的内容覆盖。 | +| `SaveMode.Ignore` | 忽略模式意味着在将`Dataset`保存到数据源时,如果数据已存在,则预期保存操作不会保存`Dataset`的内容而不会更改现有数据。这与`CREATE TABLE IF NOT EXISTSSQL`中的类似。 | + +``` +//覆盖原有数据并写入到F:\\test\\anamesAndAges路径上 +df.select("name", "age").write.mode("overwrite").format("parquet").save("F:\\test\\anamesAndAges") +``` + +