|
|
|
|
|
|
|
|
|
|
|
|
|
- Persist the result data back to Kafka, then jump back to step one
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
First, we need to import the necessary classes and create a local SparkSession:
|
|
|
|
|
|
|
|
|
|
```
from pyspark.sql import SparkSession

# Create a local SparkSession
spark = SparkSession \
    .builder \
    .getOrCreate()
```
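A practical note: the Kafka source and sink used in this walkthrough are not bundled with PySpark itself, so the connector package must be on the classpath when the session starts. One way is to request it through the session builder; a minimal sketch, where the artifact coordinates are an assumption and must match your Spark and Scala versions:

```
from pyspark.sql import SparkSession

# Sketch: pull in the Kafka connector when the session is created.
# NOTE: the artifact version below is an assumption; it must match
# your Spark/Scala installation.
spark = SparkSession \
    .builder \
    .config("spark.jars.packages",
            "org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1") \
    .getOrCreate()
```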
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
Next, let's create a streaming DataFrame that represents the messages subscribed from a Kafka topic; the subscribed topic is set to `topic1`, the same topic the Producer publishes to.
|
|
|
|
|
|
|
|
|
|
```
# Subscribe to 'topic1' and read each message value as a string
df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "121.40.96.250:9092") \
    .option("subscribe", "topic1") \
    .load() \
    .selectExpr("CAST(value AS STRING)")

# Create the `planeNumber` temp view from the `DataFrame`
df.createOrReplaceTempView("planeNumber")

# Query the data: count occurrences of each value, most frequent first
sql = spark.sql("select count(*) num, value from planeNumber group by value order by num desc")
```
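For comparison, the same aggregation can also be written with the DataFrame API instead of Spark SQL; a minimal sketch, assuming the same `df` as above:

```
from pyspark.sql.functions import col

# Equivalent to the SQL above: count rows per distinct value,
# sorted by the count in descending order.
agg = df.groupBy("value") \
    .count() \
    .withColumnRenamed("count", "num") \
    .orderBy(col("num").desc())
```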
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
We have defined the query over the streaming data; all that remains is to actually start receiving data and computing the counts. Since Kafka stores only a single 'value' column per record, we need to concatenate the query results: value and num are joined together with `_` and the result is aliased as 'value'. We then set outputMode("complete") so that the complete result set is written to the Kafka topic 'jun' every time the counts are updated, and finally kick off the stream computation with start().
|
|
|
|
|
|
|
|
|
|
- Trigger the streaming query every 1 second and store the final results to Kafka
|
|
|
|
|
```
# Kafka needs a single string 'value' column, so concatenate value and num.
# The checkpoint path below is illustrative; any writable directory works.
query = sql \
    .selectExpr("CONCAT(value, '_', num) AS value") \
    .writeStream \
    .format("kafka") \
    .outputMode("complete") \
    .option("kafka.bootstrap.servers", "121.40.96.250:9092") \
    .option("topic", "jun") \
    .option("checkpointLocation", "/tmp/plane-count-checkpoint") \
    .trigger(processingTime='1 seconds') \
    .start()
```
|
|
|
|
|
- Wait for the stop instruction
|
|
|
|
|
|
|
|
|
|
`query.awaitTermination()`
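While awaitTermination() blocks, the query handle can still be inspected or stopped from another thread or an interactive shell; a minimal sketch using the standard StreamingQuery methods:

```
# Inspect the running query (e.g. from another thread or a shell).
print(query.status)        # whether a new batch is currently being processed
print(query.lastProgress)  # metrics for the most recent micro-batch

# Stop the query gracefully instead of killing the process.
query.stop()
```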
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
To actually run the real-time count of aircraft clicks, we also need to create a Kafka Producer to publish data. The topic it publishes to must match the subscription topic above. The Producer code is as follows:
|
|
|
|
|
|
|
|
|
|
```
import json
import time
import random

from kafka import KafkaProducer

# Producer configuration: Kafka address, serializers, and encoding
producer = KafkaProducer(bootstrap_servers=['121.40.96.250:9092'],
                         key_serializer=str.encode,
                         value_serializer=lambda v: json.dumps(v).encode('utf-8'))

f = ['J20', 'F35', 'F15', 'Y200', 'HY', 'S30', 'J16']
for i in range(0, 10000):
    # Publish a random message to the topic 'topic1'
    future = producer.send('topic1', key='null', value=random.sample(f, 1)[0], partition=0)
    time.sleep(1)

producer.flush()
producer.close()
```
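Note that send() is asynchronous and returns a future; when per-message delivery confirmation matters, the future can be blocked on. A minimal sketch using kafka-python's future API:

```
from kafka.errors import KafkaError

# Block until the broker acknowledges the record (or raise on failure).
try:
    record_metadata = future.get(timeout=10)
    print(record_metadata.topic, record_metadata.partition, record_metadata.offset)
except KafkaError as err:
    print('delivery failed:', err)
```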
|
|
|
|
|
|
|
|
|
|
Running the streaming query produces console output like the following:
|
|
|
|
|
```
[Stage 223:===========> (41 + 2) / 200]
[Stage 223:==============> (53 + 2) / 200]
[Stage 223:==================> (70 + 2) / 200]
[Stage 223:=======================> (88 + 2) / 200]
[Stage 223:===========================> (105 + 2) / 200]
[Stage 223:===============================> (120 + 2) / 200]
[Stage 223:====================================> (137 + 2) / 200]
[Stage 223:=========================================> (155 + 2) / 200]
[Stage 223:===========================================> (165 + 2) / 200]
[Stage 223:================================================> (184 + 2) / 200]

[Stage 226:=================> (65 + 2) / 200]
[Stage 226:======================> (83 + 2) / 200]
[Stage 226:========================> (91 + 2) / 200]
[Stage 226:=============================> (113 + 2) / 200]
[Stage 226:===================================> (135 + 2) / 200]
[Stage 226:=========================================> (157 + 2) / 200]
[Stage 226:===============================================> (179 + 2) / 200]
[Stage 226:====================================================>(197 + 2) / 200]
```
|
|
|
|
|
|
|
|
|
|
Because the results are stored in Kafka, there is no visible result output on the console. Next, we can subscribe to the Kafka topic `jun` to retrieve the data:
|
|
|
|
|
```
from kafka import KafkaConsumer

if __name__ == "__main__":
    # Subscribe to the 'jun' topic where the streaming query writes its results
    consumer = KafkaConsumer('jun', bootstrap_servers=['121.40.96.250:9092'])
    for msg in consumer:
        print(msg.value.decode('utf-8'))
```
|
|
|
|
|
|
|
|
|
|
Partial console output is shown below:
|
|
|
|
|
```
"J16"_140
"J20"_121
"HY"_112
"Y200"_116
"F35"_106
"S30"_107
"F15"_99
"J20"_122
"J16"_140
"Y200"_117
"HY"_114
"F35"_106
"S30"_108
"F15"_101
"J16"_141
"J20"_122
"HY"_114
"Y200"_118
"S30"_110
"F35"_109
"F15"_101
"J16"_143
"J20"_124
"Y200"_119
"HY"_115
"F15"_101
"S30"_110
"F35"_110
"J16"_143
"J20"_124
"Y200"_119
"HY"_117
"F35"_112
"S30"_111
"F15"_103
"J16"_144
"J20"_124
"HY"_119
"Y200"_120
"S30"_111
"F35"_112
"F15"_106
```
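The quotes around each name come from the producer's json.dumps serialization, and the number after the underscore is the running count. A minimal sketch of splitting a record back apart on the consumer side (the rsplit on `_` is an assumption based on the CONCAT format used in the query):

```
import json

raw = '"J16"_140'               # one record as written by the streaming query
name, num = raw.rsplit('_', 1)  # split on the last underscore
name = json.loads(name)         # undo the JSON quoting added by the producer
print(name, int(num))           # -> J16 140
```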
|