From 1e6efc3d3f3dd32d6295d96734a64cfc1cdc0dd9 Mon Sep 17 00:00:00 2001 From: zy <370308065@qq.com> Date: Tue, 26 Nov 2019 13:59:35 +0800 Subject: [PATCH] =?UTF-8?q?=E6=8F=90=E4=BA=A4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- SUMMARY.md | 4 +- chapter3/3.1Spark结构化流快速入门.md | 59 +++++ ...的查询次数进行实时统计分析.md | 237 ------------------ ...据的查询次数进行可视化展示.md | 86 ------- ...3.2对飞机的点击次数实时统计.md | 99 ++++++++ 5 files changed, 160 insertions(+), 325 deletions(-) create mode 100644 chapter3/3.1Spark结构化流快速入门.md delete mode 100644 chapter3/3.1对军事类数据的查询次数进行实时统计分析.md delete mode 100644 chapter3/3.2对军事类数据的查询次数进行可视化展示.md create mode 100644 chapter3/3.2对飞机的点击次数实时统计.md diff --git a/SUMMARY.md b/SUMMARY.md index 3373a90..42c2796 100644 --- a/SUMMARY.md +++ b/SUMMARY.md @@ -14,8 +14,8 @@ * [第三章 SparkStreaming流数据计算与分析](/chapter3/3流数据计算与分析简介.md) - * [3.1 对军事类数据的查询次数进行实时统计分析](/chapter3/3.1对军事类数据的查询次数进行实时统计分析.md) - * [3.2 对军事类数据的查询次数进行可视化展示](/chapter3/3.2对军事类数据的查询次数进行可视化展示.md) + * [3.1 对军事类数据的查询次数进行实时统计分析](/chapter3/3.1Spark结构化流快速入门.md) + * [3.2 对军事类数据的查询次数进行可视化展示](/chapter3/3.2对飞机的点击次数实时统计.md) * [第四章 Spark图数据计算与分析实战](/chapter4/4Spark图数据计算简介.md) * [4.1 SparkGraphX定义图结构](/chapter4/4.1SparkGraphX定义图结构.md) * [4.2 SparkGraphX军用物资运输路线规划](/chapter4/4.2SparkGraphX军用物资运输路线规划.md) diff --git a/chapter3/3.1Spark结构化流快速入门.md b/chapter3/3.1Spark结构化流快速入门.md new file mode 100644 index 0000000..d3e12a2 --- /dev/null +++ b/chapter3/3.1Spark结构化流快速入门.md @@ -0,0 +1,59 @@ +## 3.1 Spark结构化流快速入门 +Spark Streaming是核心Spark API的扩展,可实现实时数据流的可伸缩,高吞吐量,容错流处理。数据可以从像kafka,Flume,Kinesis,或TCP套接字许多来源摄入,并且可以使用与像高级别功能表达复杂的算法来处理map,reduce,join和window。最后,可以将处理后的数据推送到文件系统,数据库和实时仪表板。实际上,你可以在数据流上应用Spark的 机器学习和 图形处理算法。 +![](https://www.educoder.net/api/attachments/504914) + +在编写自己的Spark Streaming程序之前,让我们快速简单对Spark Streaming进行入门。 + +### 3.1.1 一个简单的例子 + +假设我们要计算从侦听TCP套接字的数据服务器接收到的文本数据中的单词数。 + +首先,我们必须导入必要的类并创建一个本地SparkSession,这是与Spark相关的所有功能的起点。 + +``` +from pyspark.sql import SparkSession +from pyspark.sql.functions import explode +from pyspark.sql.functions import split + +spark = SparkSession \ + .builder \ + .appName("StructuredNetworkWordCount") \ + .getOrCreate() +``` + +接下来,让我们创建一个流数据框架,该数据框架表示从侦听localhost:9999的服务器接收的文本数据,并转换该数据框架以计算字数。 +``` +lines = spark \ + .readStream \ + .format("socket") \ + .option("host", "localhost") \ + .option("port", 9999) \ + .load() + +# Split the lines into words +words = lines.select( + explode( + split(lines.value, " ") + ).alias("word") +) + +# Generate running word count +wordCounts = words.groupBy("word").count() +``` + +此lines DataFrame表示一个包含流文本数据的无界表。该表包含一列名为“值”的字符串,流文本数据中的每一行都成为表中的一行。请注意,由于我们正在设置转换,并且尚未开始转换,因此当前未接收到任何数据。接下来,我们使用了两个内置的SQL函数-split和explode,将每一行拆分为多行,每行各有一个单词。另外,我们使用函数alias将新列命名为“ word”。最后,我们wordCounts通过对数据集中的唯一值进行分组并对其进行计数来定义DataFrame。请注意,这是一个流数据帧,它表示流的运行字数。 + +现在,我们对流数据进行了查询。剩下的就是实际开始接收数据并计算计数了。为此,我们将其设置outputMode("complete")为在每次更新计数时将完整的计数集(由指定)打印到控制台。然后使用开始流计算start()。 + +``` + # Start running the query that prints the running counts to the console +query = wordCounts \ + .writeStream \ + .outputMode("complete") \ + .format("console") \ + .start() + +query.awaitTermination() +``` + +执行此代码后,流计算将在后台开始。该query对象是该活动流查询的句柄,我们已决定使用来等待查询终止,awaitTermination()以防止在该查询处于活动状态时退出该过程。 \ No newline at end of file diff --git a/chapter3/3.1对军事类数据的查询次数进行实时统计分析.md b/chapter3/3.1对军事类数据的查询次数进行实时统计分析.md deleted file mode 100644 index 89b78e2..0000000 --- a/chapter3/3.1对军事类数据的查询次数进行实时统计分析.md +++ /dev/null @@ -1,237 +0,0 @@ -## 3.1 对军事类数据的查询次数进行实时统计分析 -Spark Streaming 是一套优秀的实时计算框架。其良好的可扩展性、高吞吐量以及容错机制能够满足我们很多的场景应用。本关结合我们的应用场景,介结我们如何使用 Spark Streaming处理数据。 - -### 3.1.1 概述 - -Spark Streaming 有高扩展性、高吞吐量和容错能力强的特点。Spark Streaming 支持的数据输入源很多,例如:Kafka、Flume、Twitter、ZeroMQ 和简单的 TCP 套接字等等。数据输入后可以用 Spark 的高度抽象原语如:map、reduce、join、window 等进行运算。而结果也能保存在很多地方,如 HDFS,数据库等。另外 Spark Streaming 也能和 MLlib(机器学习)以及 Graphx 完美融合。其架构见下图: - -

- -

- -Spark Streaming 其优秀的特点给我们带来很多的应用场景。本文中,将通过从kafka获取用户行为来为大家进行介绍。场景流程如下: -- 读取 Kafka 实时数据; - -- Spark Streaming 对数据进行处理; - -- 将数据结果持久化到kafka中,跳转到步骤一。 - - - -### 3.1.3 基本概念 - -结构化流(Structed streaming)中的关键思想是将实时数据流视为连续追加的表 - -

- -

- - - -程序模型: - -每个触发(Triggers)间隔(例如,每`1`秒)将新行附加到输入表,经查询产生结果表,并最终写入外部Sink。 - -

- -

- -### 3.1.4 创建流式Dataframe的说明 -`Spark2.0`以后,`DataFrames`和`Datasets`不仅可以用于`sparksql`中表示静态有界的数据,而且可以在`Structer streaming`中表示流式无界数据。我们可以通过`SparkSession.readStream()`创建流式`DataFrames`; - -### 3.1.5 输入源(Input Sources) -文件源(File source) - -将目录中写入的文件作为数据流读取。支持的文件格式为`text`,`csv`,`json`,`orc`,`parquet`。 - -| 参数 | 说明 | -| ------------ | ------------ | -| path | 输入文件目录 | -| maxFilesPerTrigger | 每个触发器中要考虑的最大文件数(默认值:无最大值) | -|latestFirst|是否先处理最新的新文件,当存在大量积压的文件时有用(默认值:false)| -|fileNameOnly|例:是否认定以下两个文件是否是一个文件 “file:///dataset.txt” ,“s3:// a / dataset.txt“ ,默认false,不属于同一个文件| - - -Socket源(Socket Source) -用于测试,从`socket`连接中读取`UTF8`文本数据。 - -| 参数 | 说明 | -| ------------ | ------------ | -| host | 必须指定,连接的主机 | -| port |必须指定,连接的端口 | - -Rate源(Rate Source) -用于测试,以每秒指定的行数生成数据,每个输出行包含一个`timestamp`和`value`。 - - -Kafka源(Kafka Source) -从`Kafka`中读取数据 -示例: -``` -val spark: SparkSession = ... - -// 从socket读数据 -socketDF = spark - .readStream - .format("socket") - .option("host", "localhost") - .option("port", 9999) - .load() - -// 从目录读取csv文件 -userSchema = new StructType().add("name", "string").add("age", "integer") -csvDF = spark - .readStream - .option("sep", ";") - .schema(userSchema) - .csv("/path/to/directory") // 等同于 format("csv").load("/path/to/directory") - -``` - -### 3.1.6 启动流式查询说明 - -使用`result.writeStream()`且指定以下一项或者某几项: - -- 输出`sink`的详细信息:比如`Data format`, `location`等 - -- 输出模式:`Append`模式(默认的),`Complete`模式,`Update`模式 - -| 模式 | 说明| -| ------------ | ------------ | -| Append模式 | 自上次触发后添加到结果表的新行才会输出到`sink`。只支持那些添加到结果表中的行永远不会更改的查询。因此,此模式保证每行仅输出一次。`select`,` where`,`map`,`flatMap`,`filter`,`join`,等会支持追加模式。 | -| Complete模式 | 每次触发后,整个结果表将输出到接收器。支持聚合查询 | -|Update模式|仅将结果表中自上次触发后更新的行输出到接收器| - - -- 查询名称:可选配置,指定查询的唯一名称以进行标识。 - -- 触发间隔:指定触发间隔。 - -- 检查点位置:对于可以保证端到端容错的某些输出接收器,需要指定系统写入检查点信息的目录。 - -### 3.1.7 输出Sink -以下是几种内置输出`sink` - -File sink:将输出结果保存到一个指定目录 - -``` -writeStream - .format("parquet") // 可以是"orc", "json","csv"等. - .option("path", "path/to/destination/dir") - .start() -``` - -Kafka sink:将输出结果保存到一个或者多个`topic` -``` -writeStream - .format("kafka") - .option("kafka.bootstrap.servers", "host1:port1,host2:port2") - .option("topic", "updates") - .start() -``` -Foreach sink:对输出中的记录进行任意计算,可用于自定义`sink` -``` -writeStream - .foreach(...) - .start() -``` -Console sink :用于调试,每次触发时将输出打印到控制台 -``` -writeStream - .format("console") - .start() -``` -Memory sink :用于调试,输出存储在内存中 -``` -writeStream - .format("memory") - .queryName("tableName") - .start() -``` - -### 3.1.8 触发器(Triggers) - -定义了流式数据处理的时间,查询时是使用具有固定批处理间隔的微批量查询还是作为连续处理查询来执行。有以下几种方式: - - -| 触发器类型 |说明 | -| ------------ | ------------ | -| 不指定(默认) | 查询将以微批处理模式执行,一旦前一个微批处理完成处理,将立即生成微批处理。 | -| 固定微批处理(Fixed interval micro-batches) | 查询将以微批处理模式执行,其中微批处理将以用户指定时间间隔启动。如果先前的微批次在该间隔内完成,将等待该间隔结束,然后开始下一个微批次。如果前一个微批次需要的时间长于完成的间隔(即如果错过了间隔边界),则下一个微批次将在前一个完成后立即开始(即,它不会等待下一个间隔边界) )。如果没有可用的新数据,则不会启动微批次。 | -| 一次微批处理(One-time micro-batch) | 执行*仅一个*微批处理所有可用数据,然后自行停止。 | -|连续固定检查点间隔(Continuous with fixed checkpoint interval)|处于试验阶段,将以新的低延迟,连续处理模式执行。| - - -``` -// 默认 trigger -df.writeStream - .format("console") - .start() - -// 固定微批处理 -df.writeStream - .format("console") - .trigger(Trigger.ProcessingTime("2 seconds")) - .start() - -// 一次微批处理 -df.writeStream - .format("console") - .trigger(Trigger.Once()) - .start() - -// 连续固定检查点间隔 -df.writeStream - .format("console") - .trigger(Trigger.Continuous("1 second")) - .start() -``` - -### 3.1.9 Spark Streaming用户行为分析 -我们要从kafka获取数据, 需要先创建`Sparksession` - -- 创建`Sparksession` - -``` -spark = SparkSession \ - .builder \ - .master("local[2]") \ - .appName("getUserInfo") \ - .getOrCreate() -``` - -- 从kafka主题`topic1`创建流式`DataFrame` - -``` -df = spark \ - .readStream \ - .format("kafka") \ - .option("kafka.bootstrap.servers", "121.40.96.250:9092") \ - .option("subscribe", "topic1") \ - .load() -``` - -- 创建临时表,通过sparksql获取用户查询次数 -``` -#通过`DataFrame`创建`yy`表 -df.createOrReplaceTempView("yy") -#获取查询数据 -sql=spark.sql("select count(*) from yy ") -``` - -- 每五秒启动一次流式查询,并把最后结果存储到kafka -``` -query = sql \ - .selectExpr("CAST(text AS STRING)as value")\ - .writeStream \ - .format("kafka") \ - .outputMode("complete")\ - .option("kafka.bootstrap.servers", "121.40.96.250:9092")\ - .option("topic", "jun")\ - .trigger(processingTime='5 seconds')\ - .start() -``` - -- 等待停止指令 - -`query.awaitTermination()` diff --git a/chapter3/3.2对军事类数据的查询次数进行可视化展示.md b/chapter3/3.2对军事类数据的查询次数进行可视化展示.md deleted file mode 100644 index d8e08d7..0000000 --- a/chapter3/3.2对军事类数据的查询次数进行可视化展示.md +++ /dev/null @@ -1,86 +0,0 @@ -## 3.1 对军事类数据的查询次数进行可视化展示 -Spark Streaming 是一套优秀的实时计算框架。其良好的可扩展性、高吞吐量以及容错机制能够满足我们很多的场景应用。上一关我们介绍了如何实时获取数据,并处理数据,本关结合上一关的场景,介结我们如何把实时数据进行可视化。 - -### 3.1.1 Apache Kafka 概述 - -类 JMS 消息队列,结合 JMS 中的两种模式,可以有多个消费者主动拉取数据,在 JMS 中只有点对点模式才有消费者主动拉取数据。 - -Kafka 是一个生产-消费模型。 - -Producer :消息生产者,就是向 Kafka Broker 发消息的客户端。 - -Consumer :消息消费者,向 Kafka Broker 取消息的客户端。 - -Topic :我们可以理解为一个队列。 - -Consumer Group (CG):这是 Kafka 用来实现一个 Topic 消息的广播(发给所有的 Consumer )和单播(发给任意一个 Consumer )的手段。一个 Topic 可以有多个CG。Topic 的消息会复制(不是真的复制,是概念上的)到所有的 CG ,但每个 Partion 只会把消息发给该 CG 中的一个 Consumer 。如果需要实现广播,只要每个 Consumer 有一个独立的 CG 就可以了。要实现单播只要所有的 Consumer 在同一个 CG。用CG 还可以将 Consumer 进行自由的分组而不需要多次发送消息到不同的 Topic。 - -Broker :一台 Kafka 服务器就是一个 Broker 。一个集群由多个Broker组成。一个 Broker 可以容纳多个 Topic。 - -Partition :为了实现扩展性,一个非常大的 Topic 可以分布到多个Broker(即服务器)上,一个 Topic 可以分为多个 Partition ,每个 Partition 是一个有序的队列。Partition 中的每条消息都会被分配一个有序的 Id( Offset )。Kafka 只保证按一个 Partition 中的顺序将消息发给 Consumer ,不保证一个 Topic 的整体(多个 Partition间)的顺序。 - -Offset :Kafka 的存储文件都是按照 Offset . index 来命名,用Offset 做名字的好处是方便查找。例如你想找位于`2049`的位置,只要找到 2048 . index 的文件即可。当然 the first offset 就是 00000000000 . index。 - - - -### 3.1.2 kafka应用场景 -- 日志收集:一个公司可以用 Kafka 收集各种服务的 Log ,通过Kafka 以统一接口服务的方式开放给各种 Consumer ,例如 Hadoop 、Hbase 、Solr 等。 -- 消息系统:解耦和生产者和消费者、缓存消息等。 -- 用户活动跟踪:Kafka 经常被用来记录 Web 用户或者 App 用户的各种活动,如浏览网页、搜索、点击等活动,这些活动信息被各个服务器发布到 Kafka 的Topic 中,然后订阅者通过订阅这些 Topic 来做实时的监控分析,或者装载到 Hadoop 、数据仓库中做离线分析和挖掘。 -- 运营指标:Kafka 也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告。 -- 流式处理:比如 Spark streaming 和 Storm、Flink。 -- 事件源。 - -### 3.1.3 Kafka 架构组件 -Kafka 中发布订阅的对象是 Topic。我们可以为每类数据创建一个 Topic ,把向 Topic 发布消息的客户端称作 Producer ,从 Topic 订阅消息的客户端称作 Consumer 。Producers 和 Consumers 可以同时从多个 Topic 读写数据。一个 Kafka 集群由一个或多个 Broker 服务器组成,它负责持久化和备份具体的 Kafka 消息。 - -

- -

- - -### 3.1.4 kafka 常用命令 -- 查看当前服务器中的所有 Topic -`bin/kafka-topics.sh --list --zookeeper zk01:2181` -- 创建 Topic -`./kafka-topics.sh --create --zookeeper zk01:2181 --replication-factor 1 --partitions 3 --topic first` - - *说明:`replication-factor` 是指副本数量,`partitions` 是指分区数量* - -- 删除 Topic -`bin/kafka-topics.sh --delete --zookeeper zk01:2181 --topic test` -需要 `server.properties` 中设置 `delete.topic.enable = true` 否则只是标记删除或者直接重启。 -- 通过 Shell 命令发送消息 -`kafka-console-producer.sh --broker-list kafka01:9092 --topic demo` -- 通过 Shell 消费消息 -`bin/kafka-console-consumer.sh --zookeeper zk01:2181 --from-beginning --topic test1` -- 查看消费位置 -`kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper zk01:2181 --group testGroup` -- 查看某个 Topic 的详情 -` kafka-topics.sh --topic test --describe --zookeeper zk01:2181` - -说明 :此处的 `zk01` 是 Zookeeper 的 IP 地址, `kafka01` 是 Broker 的 IP 地址 - -### 3.1.5 使用kafka获取数据 - -安装 kafka 库 -``` -pip install kafka -pip install kafka-python -``` - -代码里导入库 - -``` -from kafka import KafkaConsumer -``` - - -从121.40.96.250:9092的kafka上,获取主题为`jun`的数据: - -``` -consumer = KafkaConsumer('jun', bootstrap_servers=['121.40.96.250:9092']) - -``` - - diff --git a/chapter3/3.2对飞机的点击次数实时统计.md b/chapter3/3.2对飞机的点击次数实时统计.md new file mode 100644 index 0000000..dedd178 --- /dev/null +++ b/chapter3/3.2对飞机的点击次数实时统计.md @@ -0,0 +1,99 @@ +## 3.1 对军事类数据进行实时统计分析 + +Spark Streaming 其优秀的特点给我们带来很多的应用场景。本文中,将通过从 kafka 获取数据来进行介绍。 + +### Kafka是什么 +Kafka 是一种高吞吐量的分布式发布订阅消息系统,基于 zookeeper 协调的分布式消息系统,它的最大的特性就是可以实时的处理大量数据以满足各种需求场景。 + +Kafka 中发布订阅的对象是 Topic。我们可以为每类数据创建一个 Topic ,把向 Topic 发布消息的客户端称作 Producer ,从 Topic 订阅消息的客户端称作 Consumer 。Producers 和 Consumers 可以同时从多个 Topic 读写数据。一个 Kafka 集群由一个或多个 Broker 服务器组成,它负责持久化和备份具体的 Kafka 消息。 + +

+ +

+ + + + + +### 3.1.9 Spark Streaming用户点击行为分析 + +接下来我们通过对飞机点击次数实时统计,场景流程如下: +- 读取 Kafka 实时数据; + +- Spark Streaming 对数据进行处理; + +- 将数据结果持久化到 kafka 中,跳转到步骤一 + +首先我们需要在 kafka 存入数据,创建 kafka Producer 代码如下: + +``` +import json +import time +import random +from kafka import KafkaProducer + +# Producer 配置 kafka 地址 、序列化 及编码格式 +producer = KafkaProducer(bootstrap_servers=['121.40.96.250:9092'], + key_serializer=str.encode, + value_serializer=lambda v: json.dumps(v).encode('utf-8')) + +f = ['J20', 'F35', 'F15', 'Y200', 'HY', 'S30', 'J16'] +for i in range(0, 10000): + #向 Topic 为 'topic1' 发布随机消息 + future = producer.send('topic1', key='null', value=random.sample(f, 1)[0], partition=0) + time.sleep(1) +producer.flush() +producer.close() +``` + +然后我们必须导入必要的类并创建一个本地SparkSession + +``` +from pyspark.sql import SparkSession + +spark = SparkSession \ + .builder \ + .master("local[2]") \ + .appName("getUserInfo") \ + .getOrCreate() +``` + +接下来,让我们创建一个流数据框架,该数据框架表示从 kafka Topic 订阅消息。 而 订阅 的Topic 则是 Producer 中发布消息使用的 'topic1'。 + +``` +df = spark \ + .readStream \ + .format("kafka") \ + .option("kafka.bootstrap.servers", "121.40.96.250:9092") \ + .option("subscribe", "topic1") \ + .load() +``` + +创建临时表,通过 sparksql 统计流文本数据中相同名称飞机的出现次数,并按照降序排序。 + +``` +#通过`DataFrame`创建`planeNumber`表 +df.createOrReplaceTempView("planeNumber") +#获取查询数据 +sql=spark.sql("select count(*) key,value from yy group by value order by key desc ") +``` + +我们对流数据进行了查询,剩下的就是实际开始接收数据并计算次数了。为此,我们将其设置outputMode("complete")为在每次更新计数时将完整的结果集(由指定)存到 kafka 的 'jun' 的 Topic 里 。然后使用开始流计算start()。 + +- 每1秒启动一次流式查询,并把最后结果存储到kafka +``` +query = sql \ + .selectExpr("CONCAT(value,'_',key)as value") \ + .writeStream \ + .format("kafka") \ + .outputMode("Complete")\ + .option("checkpointLocation", "/") \ + .option("kafka.bootstrap.servers", "121.40.96.250:9092")\ + .option("topic", "jun")\ + .trigger(processingTime='1 seconds')\ + .start() +``` + +- 等待停止指令 + +`query.awaitTermination()`