diff --git a/chapter3/3.1Spark结构化流快速入门.md b/chapter3/3.1Spark结构化流快速入门.md index f80fb94..a3cfd5f 100644 --- a/chapter3/3.1Spark结构化流快速入门.md +++ b/chapter3/3.1Spark结构化流快速入门.md @@ -1,14 +1,14 @@ ## 3.1 Spark结构化流快速入门 -Spark Streaming是核心Spark API的扩展,可实现实时数据流的可伸缩,高吞吐量,容错流处理。数据可以从像kafka,Flume,Kinesis,或TCP套接字许多来源摄入,并且可以使用与像高级别功能表达复杂的算法来处理map,reduce,join和window。最后,可以将处理后的数据推送到文件系统,数据库和实时仪表板。实际上,你可以在数据流上应用Spark的 机器学习和 图形处理算法。 +Spark Streaming 是核心 Spark API 的扩展,可实现实时数据流的可伸缩,高吞吐量,容错流处理。数据可以从像 kafka,Flume,Kinesis,或 TCP 套接字许多来源摄入,并且可以使用与像高级别功能表达复杂的算法来处理 map,reduce,join和window。最后,可以将处理后的数据推送到文件系统,数据库和实时仪表板。实际上,你可以在数据流上应用 Spark 的 机器学习和 图形处理算法。 ![](https://www.educoder.net/api/attachments/504914) -在编写自己的Spark Streaming程序之前,让我们快速简单对Spark Streaming进行入门。 +在编写自己的 Spark Streaming 程序之前,让我们快速简单对 Spark Streaming 进行入门。 ### 3.1.1 一个简单的例子 -假设我们要计算从侦听TCP套接字的数据服务器接收到的文本数据中的单词数。 +假设我们要计算从侦听 TCP 套接字的数据服务器接收到的文本数据中的单词数。 -首先,我们必须导入必要的类并创建一个本地SparkSession,这是与Spark相关的所有功能的起点。 +首先,我们必须导入必要的类并创建一个本地 SparkSession ,这是与 Spark 相关的所有功能的起点。 ``` from pyspark.sql import SparkSession @@ -21,7 +21,7 @@ spark = SparkSession \ .getOrCreate() ``` -接下来,让我们创建一个流数据框架,该数据框架表示从侦听localhost:9999的服务器接收的文本数据,并转换该数据框架以计算字数。 +接下来,让我们创建一个流数据框架,该数据框架表示从侦听`localhost:9999`的服务器接收的文本数据,并转换该数据框架以计算字数。 ``` lines = spark \ .readStream \ @@ -41,9 +41,9 @@ words = lines.select( wordCounts = words.groupBy("word").count() ``` -此lines DataFrame表示一个包含流文本数据的无界表。该表包含一列名为“值”的字符串,流文本数据中的每一行都成为表中的一行。请注意,由于我们正在设置转换,并且尚未开始转换,因此当前未接收到任何数据。接下来,我们使用了两个内置的SQL函数-split和explode,将每一行拆分为多行,每行各有一个单词。另外,我们使用函数alias将新列命名为“ word”。最后,我们wordCounts通过对数据集中的唯一值进行分组并对其进行计数来定义DataFrame。请注意,这是一个流数据帧,它表示流的运行字数。 +此 lines DataFrame 表示一个包含流文本数据的无界表。该表包含一列名为“值”的字符串,流文本数据中的每一行都成为表中的一行。请注意,由于我们正在设置转换,并且尚未开始转换,因此当前未接收到任何数据。接下来,我们使用了两个内置的 SQL 函数`split`和`explode`,将每一行拆分为多行,每行各有一个单词。另外,我们使用函数`alias`将新列命名为'word'。最后,我们 `wordCounts` 通过对数据集中的唯一值进行分组并对其进行计数来定义 DataFrame 。请注意,这是一个流数据帧,它表示流的运行字数。 -现在,我们对流数据进行了查询。剩下的就是实际开始接收数据并计算计数了。为此,我们将其设置outputMode("complete")为在每次更新计数时将完整的计数集(由指定)打印到控制台。然后使用开始流计算start()。 +现在,我们对流数据进行了查询。剩下的就是实际开始接收数据并计算计数了。为此,我们将其设置`outputMode("complete")`为在每次更新计数时将完整的计数集(由指定)打印到控制台。然后使用开始流计算`start()`。 ``` # Start running the query that prints the running counts to the console @@ -56,16 +56,16 @@ query = wordCounts \ query.awaitTermination() ``` -执行此代码后,流计算将在后台开始执行。该query对象是该活动流查询的句柄,可以使用它来终止查询,awaitTermination()用来防止在该查询处于活动状态时退出该过程。 +执行此代码后,流计算将在后台开始执行。该 query 对象是该活动流查询的句柄,可以使用它来终止查询,`awaitTermination()`用来防止在该查询处于活动状态时退出该过程。 -要实际执行此示例代码,你还需要通过使用以下命令将Netcat(在大多数类Unix系统中找到的一个小实用程序)作为数据服务器运行。 +要实际执行此示例代码,你还需要通过使用以下命令将 Netcat(在大多数类Unix系统中找到的一个小实用程序)作为数据服务器运行。 ``` nc -lk 9999 apache spark apache hadoop ``` -示例代码侦听localhost:9999的服务器输出如下: +示例代码侦听`localhost:9999`的服务器输出如下: ``` ------------------------------------------- Batch: 0 diff --git a/chapter3/3.2对飞机的点击次数实时统计.md b/chapter3/3.2对飞机的点击次数实时统计.md index 1b14bf8..00bedc9 100644 --- a/chapter3/3.2对飞机的点击次数实时统计.md +++ b/chapter3/3.2对飞机的点击次数实时统计.md @@ -28,7 +28,7 @@ $ cd zookeeper-3.4.6 $ mkdir data ``` -使用命令vi “conf/zoo.cfg"打开名为 conf/zoo.cfg 的配置文件,并将所有以下参数设置为起点。 +使用 vi 命令打开名为 conf/zoo.cfg 的配置文件,并将所有以下参数设置为起点。 ``` $ vi conf/zoo.cfg tickTime=2000 @@ -128,7 +128,7 @@ pip install pyspark pip install kafka-python ``` -然后我们必须导入必要的类并创建一个本地SparkSession +然后我们必须导入必要的类并创建一个本地 SparkSession ``` from pyspark.sql import SparkSession @@ -151,7 +151,7 @@ df = spark \ .load() ``` -创建临时表,通过 sparksql 统计流文本数据中相同名称飞机的出现次数,并按照降序排序。 +创建临时表,通过 Sparksql 统计流文本数据中相同名称飞机的出现次数,并按照降序排序。 ``` #通过`DataFrame`创建`planeNumber`表 @@ -160,9 +160,10 @@ df.createOrReplaceTempView("planeNumber") sql=spark.sql("select count(*) num,value from planeNumber group by value order by num desc ") ``` -我们对流数据进行了查询,剩下的就是实际开始接收数据并计算次数了。由于 kafka 只能存一个列值'value',为此我们要拼接查询结果,把 value 和 num 通过 `_` 拼接在一起,并设置别名 'value',然后我们将其设置outputMode("complete")为在每次更新计数时将完整的结果集(由指定)存到 kafka 的 'jun' 的 Topic 里 。最后使用开始流计算start()。 +我们对流数据进行了查询,剩下的就是实际开始接收数据并计算次数了。由于 kafka 只能存一个列值'value',为此我们要拼接查询结果,把 value 和 num 通过 `_` 拼接在一起,并设置别名 'value',然后我们将其设置`outputMode("complete")`为在每次更新计数时将完整的结果集(由指定)存到 kafka 的 'jun' 的 Topic 里 。最后使用开始流计算`start()`。 - 每1秒启动一次流式查询,并把最后结果存储到kafka + ``` query = sql \ .selectExpr("CONCAT(value,'_',num)as value") \ @@ -178,7 +179,7 @@ query = sql \ - 等待停止指令 -`query.awaitTermination()` + `query.awaitTermination()` 要想运行对飞机点击次数实时统计,我们还需要创建 kafka 的 Producer 来发布数据, 发布消息的 Topic 需要和 上面的订阅 Topic 一致, 创建 kafka Producer 代码如下: