|
|
|
@ -0,0 +1,230 @@
|
|
|
|
|
## 3.1 对军事类数据的查询次数进行实时统计分析
|
|
|
|
|
Spark Streaming 是一套优秀的实时计算框架。其良好的可扩展性、高吞吐量以及容错机制能够满足我们很多的场景应用。本关结合我们的应用场景,介结我们如何使用 Spark Streaming处理数据。
|
|
|
|
|
|
|
|
|
|
### 3.1.1 概述
|
|
|
|
|
|
|
|
|
|
Spark Streaming 有高扩展性、高吞吐量和容错能力强的特点。Spark Streaming 支持的数据输入源很多,例如:Kafka、Flume、Twitter、ZeroMQ 和简单的 TCP 套接字等等。数据输入后可以用 Spark 的高度抽象原语如:map、reduce、join、window 等进行运算。而结果也能保存在很多地方,如 HDFS,数据库等。另外 Spark Streaming 也能和 MLlib(机器学习)以及 Graphx 完美融合。其架构见下图:
|
|
|
|
|
|
|
|
|
|

|
|
|
|
|
|
|
|
|
|
Spark Streaming 其优秀的特点给我们带来很多的应用场景。本文中,将通过从kafka获取用户行为来为大家进行介绍。场景流程如下:
|
|
|
|
|
- 读取 Kafka 实时数据;
|
|
|
|
|
|
|
|
|
|
- Spark Streaming 对数据进行处理;
|
|
|
|
|
|
|
|
|
|
- 将数据结果持久化到kafka中,跳转到步骤一。
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
### 3.1.3 基本概念
|
|
|
|
|
|
|
|
|
|
结构化流(Structed streaming)中的关键思想是将实时数据流视为连续追加的表
|
|
|
|
|
|
|
|
|
|

|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
程序模型:
|
|
|
|
|
|
|
|
|
|
每个触发(Triggers)间隔(例如,每`1`秒)将新行附加到输入表,经查询产生结果表,并最终写入外部Sink。
|
|
|
|
|
|
|
|
|
|

|
|
|
|
|
|
|
|
|
|
### 3.1.4 创建流式Dataframe的说明
|
|
|
|
|
`Spark2.0`以后,`DataFrames`和`Datasets`不仅可以用于`sparksql`中表示静态有界的数据,而且可以在`Structer streaming`中表示流式无界数据。我们可以通过`SparkSession.readStream()`创建流式`DataFrames`;
|
|
|
|
|
|
|
|
|
|
### 3.1.5 输入源(Input Sources)
|
|
|
|
|
文件源(File source)
|
|
|
|
|
|
|
|
|
|
将目录中写入的文件作为数据流读取。支持的文件格式为`text`,`csv`,`json`,`orc`,`parquet`。
|
|
|
|
|
|
|
|
|
|
| 参数 | 说明 |
|
|
|
|
|
| ------------ | ------------ |
|
|
|
|
|
| path | 输入文件目录 |
|
|
|
|
|
| maxFilesPerTrigger | 每个触发器中要考虑的最大文件数(默认值:无最大值) |
|
|
|
|
|
|latestFirst|是否先处理最新的新文件,当存在大量积压的文件时有用(默认值:false)|
|
|
|
|
|
|fileNameOnly|例:是否认定以下两个文件是否是一个文件 “file:///dataset.txt” ,“s3:// a / dataset.txt“ ,默认false,不属于同一个文件|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
Socket源(Socket Source)
|
|
|
|
|
用于测试,从`socket`连接中读取`UTF8`文本数据。
|
|
|
|
|
|
|
|
|
|
| 参数 | 说明 |
|
|
|
|
|
| ------------ | ------------ |
|
|
|
|
|
| host | 必须指定,连接的主机 |
|
|
|
|
|
| port |必须指定,连接的端口 |
|
|
|
|
|
|
|
|
|
|
Rate源(Rate Source)
|
|
|
|
|
用于测试,以每秒指定的行数生成数据,每个输出行包含一个`timestamp`和`value`。
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
Kafka源(Kafka Source)
|
|
|
|
|
从`Kafka`中读取数据
|
|
|
|
|
示例:
|
|
|
|
|
```
|
|
|
|
|
val spark: SparkSession = ...
|
|
|
|
|
|
|
|
|
|
// 从socket读数据
|
|
|
|
|
socketDF = spark
|
|
|
|
|
.readStream
|
|
|
|
|
.format("socket")
|
|
|
|
|
.option("host", "localhost")
|
|
|
|
|
.option("port", 9999)
|
|
|
|
|
.load()
|
|
|
|
|
|
|
|
|
|
// 从目录读取csv文件
|
|
|
|
|
userSchema = new StructType().add("name", "string").add("age", "integer")
|
|
|
|
|
csvDF = spark
|
|
|
|
|
.readStream
|
|
|
|
|
.option("sep", ";")
|
|
|
|
|
.schema(userSchema)
|
|
|
|
|
.csv("/path/to/directory") // 等同于 format("csv").load("/path/to/directory")
|
|
|
|
|
|
|
|
|
|
```
|
|
|
|
|
|
|
|
|
|
### 3.1.6 启动流式查询说明
|
|
|
|
|
|
|
|
|
|
使用`result.writeStream()`且指定以下一项或者某几项:
|
|
|
|
|
|
|
|
|
|
- 输出`sink`的详细信息:比如`Data format`, `location`等
|
|
|
|
|
|
|
|
|
|
- 输出模式:`Append`模式(默认的),`Complete`模式,`Update`模式
|
|
|
|
|
|
|
|
|
|
| 模式 | 说明|
|
|
|
|
|
| ------------ | ------------ |
|
|
|
|
|
| Append模式 | 自上次触发后添加到结果表的新行才会输出到`sink`。只支持那些添加到结果表中的行永远不会更改的查询。因此,此模式保证每行仅输出一次。`select`,` where`,`map`,`flatMap`,`filter`,`join`,等会支持追加模式。 |
|
|
|
|
|
| Complete模式 | 每次触发后,整个结果表将输出到接收器。支持聚合查询 |
|
|
|
|
|
|Update模式|仅将结果表中自上次触发后更新的行输出到接收器|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
- 查询名称:可选配置,指定查询的唯一名称以进行标识。
|
|
|
|
|
|
|
|
|
|
- 触发间隔:指定触发间隔。
|
|
|
|
|
|
|
|
|
|
- 检查点位置:对于可以保证端到端容错的某些输出接收器,需要指定系统写入检查点信息的目录。
|
|
|
|
|
|
|
|
|
|
### 3.1.7 输出Sink
|
|
|
|
|
以下是几种内置输出`sink`
|
|
|
|
|
|
|
|
|
|
File sink:将输出结果保存到一个指定目录
|
|
|
|
|
|
|
|
|
|
```
|
|
|
|
|
writeStream
|
|
|
|
|
.format("parquet") // 可以是"orc", "json","csv"等.
|
|
|
|
|
.option("path", "path/to/destination/dir")
|
|
|
|
|
.start()
|
|
|
|
|
```
|
|
|
|
|
|
|
|
|
|
Kafka sink:将输出结果保存到一个或者多个`topic`
|
|
|
|
|
```
|
|
|
|
|
writeStream
|
|
|
|
|
.format("kafka")
|
|
|
|
|
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
|
|
|
|
|
.option("topic", "updates")
|
|
|
|
|
.start()
|
|
|
|
|
```
|
|
|
|
|
Foreach sink:对输出中的记录进行任意计算,可用于自定义`sink`
|
|
|
|
|
```
|
|
|
|
|
writeStream
|
|
|
|
|
.foreach(...)
|
|
|
|
|
.start()
|
|
|
|
|
```
|
|
|
|
|
Console sink :用于调试,每次触发时将输出打印到控制台
|
|
|
|
|
```
|
|
|
|
|
writeStream
|
|
|
|
|
.format("console")
|
|
|
|
|
.start()
|
|
|
|
|
```
|
|
|
|
|
Memory sink :用于调试,输出存储在内存中
|
|
|
|
|
```
|
|
|
|
|
writeStream
|
|
|
|
|
.format("memory")
|
|
|
|
|
.queryName("tableName")
|
|
|
|
|
.start()
|
|
|
|
|
```
|
|
|
|
|
|
|
|
|
|
### 3.1.8 触发器(Triggers)
|
|
|
|
|
|
|
|
|
|
定义了流式数据处理的时间,查询时是使用具有固定批处理间隔的微批量查询还是作为连续处理查询来执行。有以下几种方式:
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 触发器类型 |说明 |
|
|
|
|
|
| ------------ | ------------ |
|
|
|
|
|
| 不指定(默认) | 查询将以微批处理模式执行,一旦前一个微批处理完成处理,将立即生成微批处理。 |
|
|
|
|
|
| 固定微批处理(Fixed interval micro-batches) | 查询将以微批处理模式执行,其中微批处理将以用户指定时间间隔启动。如果先前的微批次在该间隔内完成,将等待该间隔结束,然后开始下一个微批次。如果前一个微批次需要的时间长于完成的间隔(即如果错过了间隔边界),则下一个微批次将在前一个完成后立即开始(即,它不会等待下一个间隔边界) )。如果没有可用的新数据,则不会启动微批次。 |
|
|
|
|
|
| 一次微批处理(One-time micro-batch) | 执行*仅一个*微批处理所有可用数据,然后自行停止。 |
|
|
|
|
|
|连续固定检查点间隔(Continuous with fixed checkpoint interval)|处于试验阶段,将以新的低延迟,连续处理模式执行。|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
```
|
|
|
|
|
// 默认 trigger
|
|
|
|
|
df.writeStream
|
|
|
|
|
.format("console")
|
|
|
|
|
.start()
|
|
|
|
|
|
|
|
|
|
// 固定微批处理
|
|
|
|
|
df.writeStream
|
|
|
|
|
.format("console")
|
|
|
|
|
.trigger(Trigger.ProcessingTime("2 seconds"))
|
|
|
|
|
.start()
|
|
|
|
|
|
|
|
|
|
// 一次微批处理
|
|
|
|
|
df.writeStream
|
|
|
|
|
.format("console")
|
|
|
|
|
.trigger(Trigger.Once())
|
|
|
|
|
.start()
|
|
|
|
|
|
|
|
|
|
// 连续固定检查点间隔
|
|
|
|
|
df.writeStream
|
|
|
|
|
.format("console")
|
|
|
|
|
.trigger(Trigger.Continuous("1 second"))
|
|
|
|
|
.start()
|
|
|
|
|
```
|
|
|
|
|
|
|
|
|
|
### 3.1.9 Spark Streaming用户行为分析
|
|
|
|
|
我们要从kafka获取数据, 需要先创建`Sparksession`
|
|
|
|
|
|
|
|
|
|
- 创建`Sparksession`
|
|
|
|
|
|
|
|
|
|
```
|
|
|
|
|
spark = SparkSession \
|
|
|
|
|
.builder \
|
|
|
|
|
.master("local[2]") \
|
|
|
|
|
.appName("getUserInfo") \
|
|
|
|
|
.getOrCreate()
|
|
|
|
|
```
|
|
|
|
|
|
|
|
|
|
- 从kafka主题`topic1`创建流式`DataFrame`
|
|
|
|
|
|
|
|
|
|
```
|
|
|
|
|
df = spark \
|
|
|
|
|
.readStream \
|
|
|
|
|
.format("kafka") \
|
|
|
|
|
.option("kafka.bootstrap.servers", "121.40.96.250:9092") \
|
|
|
|
|
.option("subscribe", "topic1") \
|
|
|
|
|
.load()
|
|
|
|
|
```
|
|
|
|
|
|
|
|
|
|
- 创建临时表,通过sparksql获取用户查询次数
|
|
|
|
|
```
|
|
|
|
|
#通过`DataFrame`创建`yy`表
|
|
|
|
|
df.createOrReplaceTempView("yy")
|
|
|
|
|
#获取查询数据
|
|
|
|
|
sql=spark.sql("select count(*) from yy ")
|
|
|
|
|
```
|
|
|
|
|
|
|
|
|
|
- 每五秒启动一次流式查询,并把最后结果存储到kafka
|
|
|
|
|
```
|
|
|
|
|
query = sql \
|
|
|
|
|
.selectExpr("CAST(text AS STRING)as value")\
|
|
|
|
|
.writeStream \
|
|
|
|
|
.format("kafka") \
|
|
|
|
|
.outputMode("complete")\
|
|
|
|
|
.option("kafka.bootstrap.servers", "121.40.96.250:9092")\
|
|
|
|
|
.option("topic", "jun")\
|
|
|
|
|
.trigger(processingTime='5 seconds')\
|
|
|
|
|
.start()
|
|
|
|
|
```
|
|
|
|
|
|
|
|
|
|
- 等待停止指令
|
|
|
|
|
|
|
|
|
|
`query.awaitTermination()`
|