3.7 KiB
2.1 Spark SQL入门
Spark SQL
是Spark
用来处理结构化数据的一个模块。Spark SQL
为了支持结构化数据的处理,它提供了两个编程抽象分别叫做DataFrames
和DataSets
。
2.1.1 DataFrames,Datasets和RDD的关系
RDD
:仅表示数据集,RDD
没有元数据,也就是说没有字段信息。
DataFrames
:由于RDD
的局限性,Spark
产生了DataFrames
,DataFrame=RDD+Schema
,Schema
也就是字段信息。DataFrames
是一种特殊类型的 Datasets
,DataSet[Row] = DataFrame
。
Datasets
:可以理解为强类型的DataFrames
,也就是说每一个record
存储的是一个强类型值而不是一个Row
。但是Python
不支持Datasets API
。
了解完以上关系后,我们开始编写Spark SQL
,从何开始呢?答案就是SparkSession
。
2.1.2 什么是SparkSession
SparkSession
是Spark SQL
的入口。要创建基本的SparkSession
,只需使用SparkSession.builder()
。
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.appName("Python Spark SQL basic example") \
.config("spark.some.config.option", "some-value") \
.getOrCreate()
有了SparkSession
,下一步就是创建DataFrames
。
2.1.3 创建DataFrames
使用SparkSession
可以从现有RDD
,Hive
表或Spark
数据源(json
,parquet
,jdbc
,orc
,libsvm
,csv
,text
)等格式文件创建DataFrame
。
以下示例为读取Json
文件创建DataFrame
。
df =spark.read.json("/people.json")
people.json
数据如下:
{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}
有了DataFrames
之后,我们就可以对数据进行相应操作.
2.1.4使用DataFrames
在Python
中,可以通过属性df.age
或通过索引df ['age']
(推荐使用)访问DataFrame
的列。
举例如下:
#打印Schema信息
df.printSchema()
root
|-- age: long (nullable = true)
|-- name: string (nullable = true)
#选择姓名列
df.select("name").show()
+-------+
| name|
+-------+
|Michael|
| Andy|
| Justin|
+-------+
我们也可以通过编写SQL
语句的方式执行上述操作。
2.1.5 通过SQL语句的方式
#首先注册df为一个视图
df.createOrReplaceTempView("p")
#通过spark.sql("SQL语句")执行SQL语句
sqlDF = spark.sql("SELECT name FROM p")
sqlDF.show()
+-------+
| name|
+-------+
|Michael|
| Andy|
| Justin|
+-------+
2.1.6将处理结果保存
以下示例可将结果以parquet
格式文件保存到F:\\test\\anamesAndAges
路径。
#写入并保存到指定路径
df.select("name", "age").write.format("parquet").save("F:\\test\\anamesAndAges")
2.1.7 保存模式介绍
save
方法支持设置保存模式。
类型如下:
所有语言 | 说明 |
---|---|
"error" or "errorifexists" (默认) |
将DataFrame 保存到数据源时,如果数据已存在,则会引发异常。 |
"append" |
将Dataset 保存到数据源时,如果数据/表已存在,则DataFrame的内容应附加到现有数据。 |
"overwrite" |
覆盖模式意味着在将DataFrame 保存到数据源时,如果数据/表已经存在,则预期现有数据将被DataFrame 的内容覆盖。 |
"ignore" |
忽略模式意味着在将DataFrame 保存到数据源时,如果数据已存在,则预期保存操作不会保存DataFrame 的内容而不会更改现有数据。这与CREATE TABLE IF NOT EXISTSSQL 中的类似。 |
例:覆盖原有数据并保存到F:\\test
路径下
df.select("name", "age").write.mode("overwrite").format("parquet").save("F:\\test")