master
zy 6 years ago
parent 4e691c06d4
commit c85cf7127c

@ -11,6 +11,103 @@ Kafka 中发布订阅的对象是 Topic。我们可以为每类数据创建一
<img style="border: 2px solid #ddd;padding: 5px; background: #fff;" src="https://www.educoder.net/api/attachments/293503" alt="" height="100%" width="100%" /> <img style="border: 2px solid #ddd;padding: 5px; background: #fff;" src="https://www.educoder.net/api/attachments/293503" alt="" height="100%" width="100%" />
</p> </p>
### 3.1.2 Kafka安装
首先需要在你的计算机上安装 ZooKeeper 框架,请访问以下链接并下载最新版本的 ZooKeeper。
http://zookeeper.apache.org/releases.html
现在,最新版本的 ZooKeeper 是 3.4.6(ZooKeeper-3.4.6.tar.gz)。
使用以下命令提取tar文件
```
$ cd opt/
$ tar -zxf zookeeper-3.4.6.tar.gz
$ cd zookeeper-3.4.6
$ mkdir data
```
使用命令vi “conf/zoo.cfg"打开名为 conf/zoo.cfg 的配置文件,并将所有以下参数设置为起点。
```
$ vi conf/zoo.cfg
tickTime=2000
dataDir=/path/to/zookeeper/data
clientPort=2181
initLimit=5
syncLimit=2
```
一旦配置文件成功保存并再次返回终端,你可以启动 zookeeper 服务器。
可以使用如下命令启动 ZooKeeper 服务器:
```
$ bin/zkServer.sh start
```
执行此命令后,你将得到如下所示的响应
```
$ JMX enabled by default
$ Using config: /Users/../zookeeper-3.4.6/bin/../conf/zoo.cfg
$ Starting zookeeper ... STARTED
```
```
$ bin/zkCli.sh
```
输入上面的命令后你将被连接到zookeeper服务器并将获得以下响应。
```
Connecting to localhost:2181
................
................
................
Welcome to ZooKeeper!
................
................
WATCHER::
WatchedEvent state:SyncConnected type: None path:null
[zk: localhost:2181(CONNECTED) 0]
```
连接服务器并执行所有操作后,可以使用以下命令停止 zookeeper 服务器 -
```
$ bin/zkServer.sh stop
```
现在你已经在你的机器上成功安装了 Java 和 ZooKeeper 。 让我们看看安装 Apache Kafka 的步骤。
要在您的机器上安装Kafka
https://www.apache.org/dyn/closer.cgi?path=/kafka/0.9.0.0/kafka_2.11-0.9.0.0.tgz 把 kafka_2.11_0.9.0.0.tgz 将下载到您的计算机上。
使用以下命令提取tar文件
```
$ cd opt/
$ tar -zxf kafka_2.11.0.9.0.0 tar.gz
$ cd kafka_2.11.0.9.0.0
```
现在您已经在你的机器上下载了最新版本的 Kafka。
你可以通过给出以下命令来启动服务器
```
$ bin/kafka-server-start.sh config/server.properties
```
服务器启动后,您会在屏幕上看到以下响应:
```
$ bin/kafka-server-start.sh config/server.properties
[2016-01-02 15:37:30,410] INFO KafkaConfig values:
request.timeout.ms = 30000
log.roll.hours = 168
inter.broker.protocol.version = 0.9.0.X
log.preallocate = false
security.inter.broker.protocol = PLAINTEXT
```
执行所有操作后,可以使用以下命令停止服务器
```
$ bin/kafka-server-stop.sh config/server.properties
```
@ -24,9 +121,14 @@ Kafka 中发布订阅的对象是 Topic。我们可以为每类数据创建一
- 将数据结果持久化到 kafka 中,跳转到步骤一 - 将数据结果持久化到 kafka 中,跳转到步骤一
首先安装程序需要的 python 库
```
pip install pyspark
pip install kafka-python
```
首先我们必须导入必要的类并创建一个本地SparkSession 然后我们必须导入必要的类并创建一个本地SparkSession
``` ```
from pyspark.sql import SparkSession from pyspark.sql import SparkSession
@ -55,7 +157,7 @@ df = spark \
#通过`DataFrame`创建`planeNumber`表 #通过`DataFrame`创建`planeNumber`表
df.createOrReplaceTempView("planeNumber") df.createOrReplaceTempView("planeNumber")
#获取查询数据 #获取查询数据
sql=spark.sql("select count(*) num,value from yy group by value order by num desc ") sql=spark.sql("select count(*) num,value from planeNumber group by value order by num desc ")
``` ```
我们对流数据进行了查询,剩下的就是实际开始接收数据并计算次数了。由于 kafka 只能存一个列值'value',为此我们要拼接查询结果,把 value 和 num 通过 `_` 拼接在一起,并设置别名 'value',然后我们将其设置outputMode("complete")为在每次更新计数时将完整的结果集(由指定)存到 kafka 的 'jun' 的 Topic 里 。最后使用开始流计算start()。 我们对流数据进行了查询,剩下的就是实际开始接收数据并计算次数了。由于 kafka 只能存一个列值'value',为此我们要拼接查询结果,把 value 和 num 通过 `_` 拼接在一起,并设置别名 'value',然后我们将其设置outputMode("complete")为在每次更新计数时将完整的结果集(由指定)存到 kafka 的 'jun' 的 Topic 里 。最后使用开始流计算start()。

Loading…
Cancel
Save