master
zy 6 years ago
parent 2a33ac4cd1
commit 1e6efc3d3f

@ -14,8 +14,8 @@
* [第三章 SparkStreaming流数据计算与分析](/chapter3/3流数据计算与分析简介.md)
* [3.1 对军事类数据的查询次数进行实时统计分析](/chapter3/3.1对军事类数据的查询次数进行实时统计分析.md)
* [3.2 对军事类数据的查询次数进行可视化展示](/chapter3/3.2对军事类数据的查询次数进行可视化展示.md)
* [3.1 对军事类数据的查询次数进行实时统计分析](/chapter3/3.1Spark结构化流快速入门.md)
* [3.2 对军事类数据的查询次数进行可视化展示](/chapter3/3.2对飞机的点击次数实时统计.md)
* [第四章 Spark图数据计算与分析实战](/chapter4/4Spark图数据计算简介.md)
* [4.1 SparkGraphX定义图结构](/chapter4/4.1SparkGraphX定义图结构.md)
* [4.2 SparkGraphX军用物资运输路线规划](/chapter4/4.2SparkGraphX军用物资运输路线规划.md)

@ -0,0 +1,59 @@
## 3.1 Spark结构化流快速入门
Spark Streaming是核心Spark API的扩展可实现实时数据流的可伸缩高吞吐量容错流处理。数据可以从像kafkaFlumeKinesis或TCP套接字许多来源摄入并且可以使用与像高级别功能表达复杂的算法来处理mapreducejoin和window。最后可以将处理后的数据推送到文件系统数据库和实时仪表板。实际上你可以在数据流上应用Spark的 机器学习和 图形处理算法。
![](https://www.educoder.net/api/attachments/504914)
在编写自己的Spark Streaming程序之前让我们快速简单对Spark Streaming进行入门。
### 3.1.1 一个简单的例子
假设我们要计算从侦听TCP套接字的数据服务器接收到的文本数据中的单词数。
首先我们必须导入必要的类并创建一个本地SparkSession这是与Spark相关的所有功能的起点。
```
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split
spark = SparkSession \
.builder \
.appName("StructuredNetworkWordCount") \
.getOrCreate()
```
接下来让我们创建一个流数据框架该数据框架表示从侦听localhost9999的服务器接收的文本数据并转换该数据框架以计算字数。
```
lines = spark \
.readStream \
.format("socket") \
.option("host", "localhost") \
.option("port", 9999) \
.load()
# Split the lines into words
words = lines.select(
explode(
split(lines.value, " ")
).alias("word")
)
# Generate running word count
wordCounts = words.groupBy("word").count()
```
此lines DataFrame表示一个包含流文本数据的无界表。该表包含一列名为“值”的字符串流文本数据中的每一行都成为表中的一行。请注意由于我们正在设置转换并且尚未开始转换因此当前未接收到任何数据。接下来我们使用了两个内置的SQL函数-split和explode将每一行拆分为多行每行各有一个单词。另外我们使用函数alias将新列命名为“ word”。最后我们wordCounts通过对数据集中的唯一值进行分组并对其进行计数来定义DataFrame。请注意这是一个流数据帧它表示流的运行字数。
现在我们对流数据进行了查询。剩下的就是实际开始接收数据并计算计数了。为此我们将其设置outputMode("complete")为在每次更新计数时将完整的计数集由指定打印到控制台。然后使用开始流计算start()。
```
# Start running the query that prints the running counts to the console
query = wordCounts \
.writeStream \
.outputMode("complete") \
.format("console") \
.start()
query.awaitTermination()
```
执行此代码后流计算将在后台开始。该query对象是该活动流查询的句柄我们已决定使用来等待查询终止awaitTermination()以防止在该查询处于活动状态时退出该过程。

@ -1,237 +0,0 @@
## 3.1 对军事类数据的查询次数进行实时统计分析
Spark Streaming 是一套优秀的实时计算框架。其良好的可扩展性、高吞吐量以及容错机制能够满足我们很多的场景应用。本关结合我们的应用场景,介结我们如何使用 Spark Streaming处理数据。
### 3.1.1 概述
Spark Streaming 有高扩展性、高吞吐量和容错能力强的特点。Spark Streaming 支持的数据输入源很多例如Kafka、Flume、Twitter、ZeroMQ 和简单的 TCP 套接字等等。数据输入后可以用 Spark 的高度抽象原语如map、reduce、join、window 等进行运算。而结果也能保存在很多地方,如 HDFS数据库等。另外 Spark Streaming 也能和 MLlib(机器学习)以及 Graphx 完美融合。其架构见下图:
<p align="center" >
<img style="border: 2px solid #ddd;padding: 5px; background: #fff;" src="https://www.educoder.net/api/attachments/462367" alt="" height="100%" width="100%" />
</p>
Spark Streaming 其优秀的特点给我们带来很多的应用场景。本文中将通过从kafka获取用户行为来为大家进行介绍。场景流程如下
- 读取 Kafka 实时数据;
- Spark Streaming 对数据进行处理;
- 将数据结果持久化到kafka中跳转到步骤一。
### 3.1.3 基本概念
结构化流Structed streaming中的关键思想是将实时数据流视为连续追加的表
<p align="center" >
<img style="border: 2px solid #ddd;padding: 5px; background: #fff;" src="https://www.educoder.net/api/attachments/375945" alt="" height="100%" width="100%" />
</p>
程序模型:
每个触发(Triggers)间隔(例如,每`1`秒将新行附加到输入表经查询产生结果表并最终写入外部Sink。
<p align="center" >
<img style="border: 2px solid #ddd;padding: 5px; background: #fff;" src="https://www.educoder.net/api/attachments/375947" alt="" height="100%" width="100%" />
</p>
### 3.1.4 创建流式Dataframe的说明
`Spark2.0`以后,`DataFrames`和`Datasets`不仅可以用于`sparksql`中表示静态有界的数据,而且可以在`Structer streaming`中表示流式无界数据。我们可以通过`SparkSession.readStream()`创建流式`DataFrames`
### 3.1.5 输入源Input Sources
文件源File source
将目录中写入的文件作为数据流读取。支持的文件格式为`text``csv``json``orc``parquet`。
| 参数 | 说明 |
| ------------ | ------------ |
| path | 输入文件目录 |
| maxFilesPerTrigger | 每个触发器中要考虑的最大文件数(默认值:无最大值) |
|latestFirst|是否先处理最新的新文件当存在大量积压的文件时有用默认值false|
|fileNameOnly|例:是否认定以下两个文件是否是一个文件 “file///dataset.txt” “s3// a / dataset.txt“ 默认false不属于同一个文件|
Socket源Socket Source
用于测试,从`socket`连接中读取`UTF8`文本数据。
| 参数 | 说明 |
| ------------ | ------------ |
| host | 必须指定,连接的主机 |
| port |必须指定,连接的端口 |
Rate源Rate Source
用于测试,以每秒指定的行数生成数据,每个输出行包含一个`timestamp`和`value`。
Kafka源Kafka Source
从`Kafka`中读取数据
示例:
```
val spark: SparkSession = ...
// 从socket读数据
socketDF = spark
.readStream
.format("socket")
.option("host", "localhost")
.option("port", 9999)
.load()
// 从目录读取csv文件
userSchema = new StructType().add("name", "string").add("age", "integer")
csvDF = spark
.readStream
.option("sep", ";")
.schema(userSchema)
.csv("/path/to/directory") // 等同于 format("csv").load("/path/to/directory")
```
### 3.1.6 启动流式查询说明
使用`result.writeStream()`且指定以下一项或者某几项:
- 输出`sink`的详细信息:比如`Data format`, `location`
- 输出模式:`Append`模式(默认的)`Complete`模式,`Update`模式
| 模式 | 说明|
| ------------ | ------------ |
| Append模式 | 自上次触发后添加到结果表的新行才会输出到`sink`。只支持那些添加到结果表中的行永远不会更改的查询。因此,此模式保证每行仅输出一次。`select`` where``map``flatMap``filter``join`,等会支持追加模式。 |
| Complete模式 | 每次触发后,整个结果表将输出到接收器。支持聚合查询 |
|Update模式|仅将结果表中自上次触发后更新的行输出到接收器|
- 查询名称:可选配置,指定查询的唯一名称以进行标识。
- 触发间隔:指定触发间隔。
- 检查点位置:对于可以保证端到端容错的某些输出接收器,需要指定系统写入检查点信息的目录。
### 3.1.7 输出Sink
以下是几种内置输出`sink`
File sink将输出结果保存到一个指定目录
```
writeStream
.format("parquet") // 可以是"orc", "json","csv"等.
.option("path", "path/to/destination/dir")
.start()
```
Kafka sink将输出结果保存到一个或者多个`topic`
```
writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "updates")
.start()
```
Foreach sink:对输出中的记录进行任意计算,可用于自定义`sink`
```
writeStream
.foreach(...)
.start()
```
Console sink :用于调试,每次触发时将输出打印到控制台
```
writeStream
.format("console")
.start()
```
Memory sink :用于调试,输出存储在内存中
```
writeStream
.format("memory")
.queryName("tableName")
.start()
```
### 3.1.8 触发器Triggers
定义了流式数据处理的时间,查询时是使用具有固定批处理间隔的微批量查询还是作为连续处理查询来执行。有以下几种方式:
| 触发器类型 |说明 |
| ------------ | ------------ |
| 不指定(默认) | 查询将以微批处理模式执行,一旦前一个微批处理完成处理,将立即生成微批处理。 |
| 固定微批处理Fixed interval micro-batches | 查询将以微批处理模式执行,其中微批处理将以用户指定时间间隔启动。如果先前的微批次在该间隔内完成,将等待该间隔结束,然后开始下一个微批次。如果前一个微批次需要的时间长于完成的间隔(即如果错过了间隔边界),则下一个微批次将在前一个完成后立即开始(即,它不会等待下一个间隔边界) )。如果没有可用的新数据,则不会启动微批次。 |
| 一次微批处理One-time micro-batch | 执行*仅一个*微批处理所有可用数据,然后自行停止。 |
|连续固定检查点间隔(Continuous with fixed checkpoint interval)|处于试验阶段,将以新的低延迟,连续处理模式执行。|
```
// 默认 trigger
df.writeStream
.format("console")
.start()
// 固定微批处理
df.writeStream
.format("console")
.trigger(Trigger.ProcessingTime("2 seconds"))
.start()
// 一次微批处理
df.writeStream
.format("console")
.trigger(Trigger.Once())
.start()
// 连续固定检查点间隔
df.writeStream
.format("console")
.trigger(Trigger.Continuous("1 second"))
.start()
```
### 3.1.9 Spark Streaming用户行为分析
我们要从kafka获取数据, 需要先创建`Sparksession`
- 创建`Sparksession`
```
spark = SparkSession \
.builder \
.master("local[2]") \
.appName("getUserInfo") \
.getOrCreate()
```
- 从kafka主题`topic1`创建流式`DataFrame`
```
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "121.40.96.250:9092") \
.option("subscribe", "topic1") \
.load()
```
- 创建临时表通过sparksql获取用户查询次数
```
#通过`DataFrame`创建`yy`表
df.createOrReplaceTempView("yy")
#获取查询数据
sql=spark.sql("select count(*) from yy ")
```
- 每五秒启动一次流式查询,并把最后结果存储到kafka
```
query = sql \
.selectExpr("CAST(text AS STRING)as value")\
.writeStream \
.format("kafka") \
.outputMode("complete")\
.option("kafka.bootstrap.servers", "121.40.96.250:9092")\
.option("topic", "jun")\
.trigger(processingTime='5 seconds')\
.start()
```
- 等待停止指令
`query.awaitTermination()`

@ -1,86 +0,0 @@
## 3.1 对军事类数据的查询次数进行可视化展示
Spark Streaming 是一套优秀的实时计算框架。其良好的可扩展性、高吞吐量以及容错机制能够满足我们很多的场景应用。上一关我们介绍了如何实时获取数据,并处理数据,本关结合上一关的场景,介结我们如何把实时数据进行可视化。
### 3.1.1 Apache Kafka 概述
类 JMS 消息队列,结合 JMS 中的两种模式,可以有多个消费者主动拉取数据,在 JMS 中只有点对点模式才有消费者主动拉取数据。
Kafka 是一个生产-消费模型。
Producer :消息生产者,就是向 Kafka Broker 发消息的客户端。
Consumer :消息消费者,向 Kafka Broker 取消息的客户端。
Topic :我们可以理解为一个队列。
Consumer Group CG这是 Kafka 用来实现一个 Topic 消息的广播(发给所有的 Consumer )和单播(发给任意一个 Consumer )的手段。一个 Topic 可以有多个CG。Topic 的消息会复制(不是真的复制,是概念上的)到所有的 CG ,但每个 Partion 只会把消息发给该 CG 中的一个 Consumer 。如果需要实现广播,只要每个 Consumer 有一个独立的 CG 就可以了。要实现单播只要所有的 Consumer 在同一个 CG。用CG 还可以将 Consumer 进行自由的分组而不需要多次发送消息到不同的 Topic。
Broker :一台 Kafka 服务器就是一个 Broker 。一个集群由多个Broker组成。一个 Broker 可以容纳多个 Topic。
Partition :为了实现扩展性,一个非常大的 Topic 可以分布到多个Broker即服务器一个 Topic 可以分为多个 Partition ,每个 Partition 是一个有序的队列。Partition 中的每条消息都会被分配一个有序的 Id Offset 。Kafka 只保证按一个 Partition 中的顺序将消息发给 Consumer ,不保证一个 Topic 的整体(多个 Partition间的顺序。
Offset Kafka 的存储文件都是按照 Offset . index 来命名用Offset 做名字的好处是方便查找。例如你想找位于`2049`的位置,只要找到 2048 . index 的文件即可。当然 the first offset 就是 00000000000 . index。
### 3.1.2 kafka应用场景
- 日志收集:一个公司可以用 Kafka 收集各种服务的 Log 通过Kafka 以统一接口服务的方式开放给各种 Consumer ,例如 Hadoop 、Hbase 、Solr 等。
- 消息系统:解耦和生产者和消费者、缓存消息等。
- 用户活动跟踪Kafka 经常被用来记录 Web 用户或者 App 用户的各种活动,如浏览网页、搜索、点击等活动,这些活动信息被各个服务器发布到 Kafka 的Topic 中,然后订阅者通过订阅这些 Topic 来做实时的监控分析,或者装载到 Hadoop 、数据仓库中做离线分析和挖掘。
- 运营指标Kafka 也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告。
- 流式处理:比如 Spark streaming 和 Storm、Flink。
- 事件源。
### 3.1.3 Kafka 架构组件
Kafka 中发布订阅的对象是 Topic。我们可以为每类数据创建一个 Topic ,把向 Topic 发布消息的客户端称作 Producer ,从 Topic 订阅消息的客户端称作 Consumer 。Producers 和 Consumers 可以同时从多个 Topic 读写数据。一个 Kafka 集群由一个或多个 Broker 服务器组成,它负责持久化和备份具体的 Kafka 消息。
<p align="center" >
<img style="border: 2px solid #ddd;padding: 5px; background: #fff;" src="https://www.educoder.net/api/attachments/293503" alt="" height="100%" width="100%" />
</p>
### 3.1.4 kafka 常用命令
- 查看当前服务器中的所有 Topic
`bin/kafka-topics.sh --list --zookeeper zk01:2181`
- 创建 Topic
`./kafka-topics.sh --create --zookeeper zk01:2181 --replication-factor 1 --partitions 3 --topic first`
*说明:`replication-factor` 是指副本数量,`partitions` 是指分区数量*
- 删除 Topic
`bin/kafka-topics.sh --delete --zookeeper zk01:2181 --topic test`
需要 `server.properties` 中设置 `delete.topic.enable = true` 否则只是标记删除或者直接重启。
- 通过 Shell 命令发送消息
`kafka-console-producer.sh --broker-list kafka01:9092 --topic demo`
- 通过 Shell 消费消息
`bin/kafka-console-consumer.sh --zookeeper zk01:2181 --from-beginning --topic test1`
- 查看消费位置
`kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper zk01:2181 --group testGroup`
- 查看某个 Topic 的详情
` kafka-topics.sh --topic test --describe --zookeeper zk01:2181`
说明 :此处的 `zk01` 是 Zookeeper 的 IP 地址, `kafka01` 是 Broker 的 IP 地址
### 3.1.5 使用kafka获取数据
安装 kafka 库
```
pip install kafka
pip install kafka-python
```
代码里导入库
```
from kafka import KafkaConsumer
```
从121.40.96.250:9092的kafka上获取主题为`jun`的数据:
```
consumer = KafkaConsumer('jun', bootstrap_servers=['121.40.96.250:9092'])
```

@ -0,0 +1,99 @@
## 3.1 对军事类数据进行实时统计分析
Spark Streaming 其优秀的特点给我们带来很多的应用场景。本文中,将通过从 kafka 获取数据来进行介绍。
### Kafka是什么
Kafka 是一种高吞吐量的分布式发布订阅消息系统,基于 zookeeper 协调的分布式消息系统,它的最大的特性就是可以实时的处理大量数据以满足各种需求场景。
Kafka 中发布订阅的对象是 Topic。我们可以为每类数据创建一个 Topic ,把向 Topic 发布消息的客户端称作 Producer ,从 Topic 订阅消息的客户端称作 Consumer 。Producers 和 Consumers 可以同时从多个 Topic 读写数据。一个 Kafka 集群由一个或多个 Broker 服务器组成,它负责持久化和备份具体的 Kafka 消息。
<p align="center" >
<img style="border: 2px solid #ddd;padding: 5px; background: #fff;" src="https://www.educoder.net/api/attachments/293503" alt="" height="100%" width="100%" />
</p>
### 3.1.9 Spark Streaming用户点击行为分析
接下来我们通过对飞机点击次数实时统计,场景流程如下:
- 读取 Kafka 实时数据;
- Spark Streaming 对数据进行处理;
- 将数据结果持久化到 kafka 中,跳转到步骤一
首先我们需要在 kafka 存入数据,创建 kafka Producer 代码如下:
```
import json
import time
import random
from kafka import KafkaProducer
# Producer 配置 kafka 地址 、序列化 及编码格式
producer = KafkaProducer(bootstrap_servers=['121.40.96.250:9092'],
key_serializer=str.encode,
value_serializer=lambda v: json.dumps(v).encode('utf-8'))
f = ['J20', 'F35', 'F15', 'Y200', 'HY', 'S30', 'J16']
for i in range(0, 10000):
#向 Topic 为 'topic1' 发布随机消息
future = producer.send('topic1', key='null', value=random.sample(f, 1)[0], partition=0)
time.sleep(1)
producer.flush()
producer.close()
```
然后我们必须导入必要的类并创建一个本地SparkSession
```
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.master("local[2]") \
.appName("getUserInfo") \
.getOrCreate()
```
接下来,让我们创建一个流数据框架,该数据框架表示从 kafka Topic 订阅消息。 而 订阅 的Topic 则是 Producer 中发布消息使用的 'topic1'。
```
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "121.40.96.250:9092") \
.option("subscribe", "topic1") \
.load()
```
创建临时表,通过 sparksql 统计流文本数据中相同名称飞机的出现次数,并按照降序排序。
```
#通过`DataFrame`创建`planeNumber`表
df.createOrReplaceTempView("planeNumber")
#获取查询数据
sql=spark.sql("select count(*) key,value from yy group by value order by key desc ")
```
我们对流数据进行了查询剩下的就是实际开始接收数据并计算次数了。为此我们将其设置outputMode("complete")为在每次更新计数时将完整的结果集(由指定)存到 kafka 的 'jun' 的 Topic 里 。然后使用开始流计算start()。
- 每1秒启动一次流式查询,并把最后结果存储到kafka
```
query = sql \
.selectExpr("CONCAT(value,'_',key)as value") \
.writeStream \
.format("kafka") \
.outputMode("Complete")\
.option("checkpointLocation", "/") \
.option("kafka.bootstrap.servers", "121.40.96.250:9092")\
.option("topic", "jun")\
.trigger(processingTime='1 seconds')\
.start()
```
- 等待停止指令
`query.awaitTermination()`
Loading…
Cancel
Save