You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

3.7 KiB

2.1 Spark SQL入门

Spark SQLSpark用来处理结构化数据的一个模块。Spark SQL为了支持结构化数据的处理,它提供了两个编程抽象分别叫做DataFramesDataSets

2.1.1 DataFramesDatasets和RDD的关系

RDD :仅表示数据集,RDD没有元数据,也就是说没有字段信息。

DataFrames:由于RDD的局限性,Spark产生了DataFramesDataFrame=RDD+SchemaSchema也就是字段信息。DataFrames是一种特殊类型的 DatasetsDataSet[Row] = DataFrame

Datasets:可以理解为强类型的DataFrames,也就是说每一个record存储的是一个强类型值而不是一个Row。但是Python不支持Datasets API

了解完以上关系后,我们开始编写Spark SQL,从何开始呢?答案就是SparkSession

2.1.2 什么是SparkSession

SparkSessionSpark SQL的入口。要创建基本的SparkSession,只需使用SparkSession.builder()

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

有了SparkSession,下一步就是创建DataFrames

2.1.3 创建DataFrames

使用SparkSession可以从现有RDDHive表或Spark数据源(jsonparquetjdbcorclibsvmcsvtext)等格式文件创建DataFrame

以下示例为读取Json文件创建DataFramedf =spark.read.json("/people.json")

people.json数据如下:

{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}

有了DataFrames之后,我们就可以对数据进行相应操作.

2.1.4使用DataFrames

Python中,可以通过属性df.age或通过索引df ['age'](推荐使用)访问DataFrame的列。 举例如下:

#打印Schema信息
df.printSchema()

root
|-- age: long (nullable = true)
|-- name: string (nullable = true)

#选择姓名列
df.select("name").show()
 +-------+
 |   name|
 +-------+
 |Michael|
 |   Andy|
 | Justin|
 +-------+

我们也可以通过编写SQL语句的方式执行上述操作。

2.1.5 通过SQL语句的方式

#首先注册df为一个视图
df.createOrReplaceTempView("p")
#通过spark.sql("SQL语句")执行SQL语句
sqlDF = spark.sql("SELECT name FROM p")
sqlDF.show()
 +-------+
 |   name|
 +-------+
 |Michael|
 |   Andy|
 | Justin|
 +-------+

2.1.6将处理结果保存

以下示例可将结果以parquet格式文件保存到F:\\test\\anamesAndAges路径。

#写入并保存到指定路径
df.select("name", "age").write.format("parquet").save("F:\\test\\anamesAndAges")

2.1.7 保存模式介绍

save方法支持设置保存模式。

类型如下:

所有语言 说明
"error" or "errorifexists" (默认) DataFrame保存到数据源时,如果数据已存在,则会引发异常。
"append" Dataset保存到数据源时,如果数据/表已存在则DataFrame的内容应附加到现有数据。
"overwrite" 覆盖模式意味着在将DataFrame保存到数据源时,如果数据/表已经存在,则预期现有数据将被DataFrame的内容覆盖。
"ignore" 忽略模式意味着在将DataFrame保存到数据源时,如果数据已存在,则预期保存操作不会保存DataFrame的内容而不会更改现有数据。这与CREATE TABLE IF NOT EXISTSSQL中的类似。

例:覆盖原有数据并保存到F:\\test路径下


df.select("name", "age").write.mode("overwrite").format("parquet").save("F:\\test")