master
zy 6 years ago
parent c85cf7127c
commit ab9d1dad42

@ -21,7 +21,7 @@ spark = SparkSession \
.getOrCreate() .getOrCreate()
``` ```
接下来让我们创建一个流数据框架该数据框架表示从侦听localhost9999的服务器接收的文本数据并转换该数据框架以计算字数。 接下来,让我们创建一个流数据框架,该数据框架表示从侦听`localhost9999`的服务器接收的文本数据,并转换该数据框架以计算字数。
``` ```
lines = spark \ lines = spark \
.readStream \ .readStream \
@ -41,9 +41,9 @@ words = lines.select(
wordCounts = words.groupBy("word").count() wordCounts = words.groupBy("word").count()
``` ```
此lines DataFrame表示一个包含流文本数据的无界表。该表包含一列名为“值”的字符串流文本数据中的每一行都成为表中的一行。请注意由于我们正在设置转换并且尚未开始转换因此当前未接收到任何数据。接下来我们使用了两个内置的SQL函数-split和explode将每一行拆分为多行每行各有一个单词。另外我们使用函数alias将新列命名为“ word”。最后我们wordCounts通过对数据集中的唯一值进行分组并对其进行计数来定义DataFrame。请注意这是一个流数据帧它表示流的运行字数。 lines DataFrame 表示一个包含流文本数据的无界表。该表包含一列名为“值”的字符串,流文本数据中的每一行都成为表中的一行。请注意,由于我们正在设置转换,并且尚未开始转换,因此当前未接收到任何数据。接下来,我们使用了两个内置的 SQL 函数`split`和`explode`,将每一行拆分为多行,每行各有一个单词。另外,我们使用函数`alias`将新列命名为'word'。最后,我们 `wordCounts` 通过对数据集中的唯一值进行分组并对其进行计数来定义 DataFrame 。请注意,这是一个流数据帧,它表示流的运行字数。
现在我们对流数据进行了查询。剩下的就是实际开始接收数据并计算计数了。为此我们将其设置outputMode("complete")为在每次更新计数时将完整的计数集由指定打印到控制台。然后使用开始流计算start()。 现在,我们对流数据进行了查询。剩下的就是实际开始接收数据并计算计数了。为此,我们将其设置`outputMode("complete")`为在每次更新计数时将完整的计数集(由指定)打印到控制台。然后使用开始流计算`start()`
``` ```
# Start running the query that prints the running counts to the console # Start running the query that prints the running counts to the console
@ -56,7 +56,7 @@ query = wordCounts \
query.awaitTermination() query.awaitTermination()
``` ```
执行此代码后流计算将在后台开始执行。该query对象是该活动流查询的句柄可以使用它来终止查询awaitTermination()用来防止在该查询处于活动状态时退出该过程。 执行此代码后,流计算将在后台开始执行。该 query 对象是该活动流查询的句柄,可以使用它来终止查询,`awaitTermination()`用来防止在该查询处于活动状态时退出该过程。
要实际执行此示例代码,你还需要通过使用以下命令将 Netcat在大多数类Unix系统中找到的一个小实用程序作为数据服务器运行。 要实际执行此示例代码,你还需要通过使用以下命令将 Netcat在大多数类Unix系统中找到的一个小实用程序作为数据服务器运行。
``` ```
@ -65,7 +65,7 @@ apache spark
apache hadoop apache hadoop
``` ```
示例代码侦听localhost9999的服务器输出如下 示例代码侦听`localhost9999`的服务器输出如下:
``` ```
------------------------------------------- -------------------------------------------
Batch: 0 Batch: 0

@ -28,7 +28,7 @@ $ cd zookeeper-3.4.6
$ mkdir data $ mkdir data
``` ```
使用命令vi “conf/zoo.cfg"打开名为 conf/zoo.cfg 的配置文件,并将所有以下参数设置为起点。 使用 vi 命令打开名为 conf/zoo.cfg 的配置文件,并将所有以下参数设置为起点。
``` ```
$ vi conf/zoo.cfg $ vi conf/zoo.cfg
tickTime=2000 tickTime=2000
@ -151,7 +151,7 @@ df = spark \
.load() .load()
``` ```
创建临时表,通过 sparksql 统计流文本数据中相同名称飞机的出现次数,并按照降序排序。 创建临时表,通过 Sparksql 统计流文本数据中相同名称飞机的出现次数,并按照降序排序。
``` ```
#通过`DataFrame`创建`planeNumber`表 #通过`DataFrame`创建`planeNumber`表
@ -160,9 +160,10 @@ df.createOrReplaceTempView("planeNumber")
sql=spark.sql("select count(*) num,value from planeNumber group by value order by num desc ") sql=spark.sql("select count(*) num,value from planeNumber group by value order by num desc ")
``` ```
我们对流数据进行了查询,剩下的就是实际开始接收数据并计算次数了。由于 kafka 只能存一个列值'value',为此我们要拼接查询结果,把 value 和 num 通过 `_` 拼接在一起,并设置别名 'value',然后我们将其设置outputMode("complete")为在每次更新计数时将完整的结果集(由指定)存到 kafka 的 'jun' 的 Topic 里 。最后使用开始流计算start()。 我们对流数据进行了查询,剩下的就是实际开始接收数据并计算次数了。由于 kafka 只能存一个列值'value',为此我们要拼接查询结果,把 value 和 num 通过 `_` 拼接在一起,并设置别名 'value',然后我们将其设置`outputMode("complete")`为在每次更新计数时将完整的结果集(由指定)存到 kafka 的 'jun' 的 Topic 里 。最后使用开始流计算`start()`
- 每1秒启动一次流式查询,并把最后结果存储到kafka - 每1秒启动一次流式查询,并把最后结果存储到kafka
``` ```
query = sql \ query = sql \
.selectExpr("CONCAT(value,'_',num)as value") \ .selectExpr("CONCAT(value,'_',num)as value") \

Loading…
Cancel
Save