diff --git a/chapter3/3.1Spark结构化流快速入门.md b/chapter3/3.1Spark结构化流快速入门.md index 841d59a..f80fb94 100644 --- a/chapter3/3.1Spark结构化流快速入门.md +++ b/chapter3/3.1Spark结构化流快速入门.md @@ -56,4 +56,36 @@ query = wordCounts \ query.awaitTermination() ``` -执行此代码后,流计算将在后台开始执行。该query对象是该活动流查询的句柄,可以使用它来终止查询,awaitTermination()用来防止在该查询处于活动状态时退出该过程。 \ No newline at end of file +执行此代码后,流计算将在后台开始执行。该query对象是该活动流查询的句柄,可以使用它来终止查询,awaitTermination()用来防止在该查询处于活动状态时退出该过程。 + +要实际执行此示例代码,你还需要通过使用以下命令将Netcat(在大多数类Unix系统中找到的一个小实用程序)作为数据服务器运行。 +``` +nc -lk 9999 +apache spark +apache hadoop +``` + +示例代码侦听localhost:9999的服务器输出如下: +``` +------------------------------------------- +Batch: 0 +------------------------------------------- ++------+-----+ +| value|count| ++------+-----+ +|apache| 1| +| spark| 1| ++------+-----+ + +------------------------------------------- +Batch: 1 +------------------------------------------- ++------+-----+ +| value|count| ++------+-----+ +|apache| 2| +| spark| 1| +|hadoop| 1| ++------+-----+ +... +``` \ No newline at end of file diff --git a/chapter3/3.2对飞机的点击次数实时统计.md b/chapter3/3.2对飞机的点击次数实时统计.md index 22bb7a6..5983d2a 100644 --- a/chapter3/3.2对飞机的点击次数实时统计.md +++ b/chapter3/3.2对飞机的点击次数实时统计.md @@ -24,29 +24,9 @@ Kafka 中发布订阅的对象是 Topic。我们可以为每类数据创建一 - 将数据结果持久化到 kafka 中,跳转到步骤一 -首先我们需要在 kafka 存入数据,创建 kafka Producer 代码如下: -``` -import json -import time -import random -from kafka import KafkaProducer -# Producer 配置 kafka 地址 、序列化 及编码格式 -producer = KafkaProducer(bootstrap_servers=['121.40.96.250:9092'], - key_serializer=str.encode, - value_serializer=lambda v: json.dumps(v).encode('utf-8')) - -f = ['J20', 'F35', 'F15', 'Y200', 'HY', 'S30', 'J16'] -for i in range(0, 10000): - #向 Topic 为 'topic1' 发布随机消息 - future = producer.send('topic1', key='null', value=random.sample(f, 1)[0], partition=0) - time.sleep(1) -producer.flush() -producer.close() -``` - -然后我们必须导入必要的类并创建一个本地SparkSession +首先我们必须导入必要的类并创建一个本地SparkSession ``` from pyspark.sql import SparkSession @@ -58,7 +38,7 @@ spark = SparkSession \ .getOrCreate() ``` -接下来,让我们创建一个流数据框架,该数据框架表示从 kafka Topic 订阅消息。 而 订阅 的Topic 则是 Producer 中发布消息使用的 'topic1'。 +接下来,让我们创建一个流数据框架,该数据框架表示从 kafka Topic 订阅消息。 而 订阅 的Topic 设置为 `topic1`。 ``` df = spark \ @@ -75,15 +55,15 @@ df = spark \ #通过`DataFrame`创建`planeNumber`表 df.createOrReplaceTempView("planeNumber") #获取查询数据 -sql=spark.sql("select count(*) key,value from yy group by value order by key desc ") +sql=spark.sql("select count(*) num,value from yy group by value order by num desc ") ``` -我们对流数据进行了查询,剩下的就是实际开始接收数据并计算次数了。为此,我们将其设置outputMode("complete")为在每次更新计数时将完整的结果集(由指定)存到 kafka 的 'jun' 的 Topic 里 。然后使用开始流计算start()。 +我们对流数据进行了查询,剩下的就是实际开始接收数据并计算次数了。由于 kafka 只能存一个列值'value',为此我们要拼接查询结果,把 value 和 num 通过 `_` 拼接在一起,并设置别名 'value',然后我们将其设置outputMode("complete")为在每次更新计数时将完整的结果集(由指定)存到 kafka 的 'jun' 的 Topic 里 。最后使用开始流计算start()。 - 每1秒启动一次流式查询,并把最后结果存储到kafka ``` query = sql \ - .selectExpr("CONCAT(value,'_',key)as value") \ + .selectExpr("CONCAT(value,'_',num)as value") \ .writeStream \ .format("kafka") \ .outputMode("Complete")\ @@ -97,3 +77,106 @@ query = sql \ - 等待停止指令 `query.awaitTermination()` + + +要想运行对飞机点击次数实时统计,我们还需要创建 kafka 的 Producer 来发布数据, 发布消息的 Topic 需要和 上面的订阅 Topic 一致, 创建 kafka Producer 代码如下: + +``` +import json +import time +import random +from kafka import KafkaProducer + +# Producer 配置 kafka 地址 、序列化 及编码格式 +producer = KafkaProducer(bootstrap_servers=['121.40.96.250:9092'], + key_serializer=str.encode, + value_serializer=lambda v: json.dumps(v).encode('utf-8')) + +f = ['J20', 'F35', 'F15', 'Y200', 'HY', 'S30', 'J16'] +for i in range(0, 10000): + #向 Topic 为 'topic1' 发布随机消息 + future = producer.send('topic1', key='null', value=random.sample(f, 1)[0], partition=0) + time.sleep(1) +producer.flush() +producer.close() +``` + +运行流查询代码控制台输出如下: +``` +[Stage 223:===========> (41 + 2) / 200] +[Stage 223:==============> (53 + 2) / 200] +[Stage 223:==================> (70 + 2) / 200] +[Stage 223:=======================> (88 + 2) / 200] +[Stage 223:===========================> (105 + 2) / 200] +[Stage 223:===============================> (120 + 2) / 200] +[Stage 223:====================================> (137 + 2) / 200] +[Stage 223:=========================================> (155 + 2) / 200] +[Stage 223:===========================================> (165 + 2) / 200] +[Stage 223:================================================> (184 + 2) / 200] + + +[Stage 226:=================> (65 + 2) / 200] +[Stage 226:======================> (83 + 2) / 200] +[Stage 226:========================> (91 + 2) / 200] +[Stage 226:=============================> (113 + 2) / 200] +[Stage 226:===================================> (135 + 2) / 200] +[Stage 226:=========================================> (157 + 2) / 200] +[Stage 226:===============================================> (179 + 2) / 200] +[Stage 226:====================================================>(197 + 2) / 200] +``` + +因为结果存储到 Kafka , 控制台上没有明显结果输出,接下来我们可以订阅 Kafka Topic 为 `jun` 的消息来获取数据。 +``` +from kafka import KafkaConsumer + +if __name__ == "__main__": + consumer = KafkaConsumer('jun', bootstrap_servers=['121.40.96.250:9092']) + for msg in consumer: + print(msg.value.decode('utf-8')) +``` + +控制台输出部分结果如下: +``` +"J16"_140 +"J20"_121 +"HY"_112 +"Y200"_116 +"F35"_106 +"S30"_107 +"F15"_99 +"J20"_122 +"J16"_140 +"Y200"_117 +"HY"_114 +"F35"_106 +"S30"_108 +"F15"_101 +"J16"_141 +"J20"_122 +"HY"_114 +"Y200"_118 +"S30"_110 +"F35"_109 +"F15"_101 +"J16"_143 +"J20"_124 +"Y200"_119 +"HY"_115 +"F15"_101 +"S30"_110 +"F35"_110 +"J16"_143 +"J20"_124 +"Y200"_119 +"HY"_117 +"F35"_112 +"S30"_111 +"F15"_103 +"J16"_144 +"J20"_124 +"HY"_119 +"Y200"_120 +"S30"_111 +"F35"_112 +"F15"_106 +``` \ No newline at end of file