From c85cf7127c3d59b029bd48935ceecd1093335e9b Mon Sep 17 00:00:00 2001 From: zy <370308065@qq.com> Date: Wed, 27 Nov 2019 10:49:59 +0800 Subject: [PATCH] =?UTF-8?q?=E6=8F=90=E4=BA=A4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ...3.2对飞机的点击次数实时统计.md | 106 +++++++++++++++++- 1 file changed, 104 insertions(+), 2 deletions(-) diff --git a/chapter3/3.2对飞机的点击次数实时统计.md b/chapter3/3.2对飞机的点击次数实时统计.md index 23784ec..1b14bf8 100644 --- a/chapter3/3.2对飞机的点击次数实时统计.md +++ b/chapter3/3.2对飞机的点击次数实时统计.md @@ -11,6 +11,103 @@ Kafka 中发布订阅的对象是 Topic。我们可以为每类数据创建一

+### 3.1.2 Kafka安装 + + +首先需要在你的计算机上安装 ZooKeeper 框架,请访问以下链接并下载最新版本的 ZooKeeper。 + +http://zookeeper.apache.org/releases.html + +现在,最新版本的 ZooKeeper 是 3.4.6(ZooKeeper-3.4.6.tar.gz)。 + +使用以下命令提取tar文件 +``` +$ cd opt/ +$ tar -zxf zookeeper-3.4.6.tar.gz +$ cd zookeeper-3.4.6 +$ mkdir data +``` + +使用命令vi “conf/zoo.cfg"打开名为 conf/zoo.cfg 的配置文件,并将所有以下参数设置为起点。 +``` +$ vi conf/zoo.cfg +tickTime=2000 +dataDir=/path/to/zookeeper/data +clientPort=2181 +initLimit=5 +syncLimit=2 +``` +一旦配置文件成功保存并再次返回终端,你可以启动 zookeeper 服务器。 + +可以使用如下命令启动 ZooKeeper 服务器: +``` +$ bin/zkServer.sh start +``` + +执行此命令后,你将得到如下所示的响应 + +``` +$ JMX enabled by default +$ Using config: /Users/../zookeeper-3.4.6/bin/../conf/zoo.cfg +$ Starting zookeeper ... STARTED +``` + + +``` +$ bin/zkCli.sh +``` +输入上面的命令后,你将被连接到zookeeper服务器,并将获得以下响应。 + +``` +Connecting to localhost:2181 +................ +................ +................ +Welcome to ZooKeeper! +................ +................ +WATCHER:: +WatchedEvent state:SyncConnected type: None path:null +[zk: localhost:2181(CONNECTED) 0] +``` +连接服务器并执行所有操作后,可以使用以下命令停止 zookeeper 服务器 - +``` +$ bin/zkServer.sh stop +``` +现在你已经在你的机器上成功安装了 Java 和 ZooKeeper 。 让我们看看安装 Apache Kafka 的步骤。 + + +要在您的机器上安装Kafka, +https://www.apache.org/dyn/closer.cgi?path=/kafka/0.9.0.0/kafka_2.11-0.9.0.0.tgz 把 kafka_2.11_0.9.0.0.tgz 将下载到您的计算机上。 + +使用以下命令提取tar文件 +``` +$ cd opt/ +$ tar -zxf kafka_2.11.0.9.0.0 tar.gz +$ cd kafka_2.11.0.9.0.0 +``` +现在您已经在你的机器上下载了最新版本的 Kafka。 + +你可以通过给出以下命令来启动服务器 +``` +$ bin/kafka-server-start.sh config/server.properties +``` + +服务器启动后,您会在屏幕上看到以下响应: + +``` +$ bin/kafka-server-start.sh config/server.properties +[2016-01-02 15:37:30,410] INFO KafkaConfig values: +request.timeout.ms = 30000 +log.roll.hours = 168 +inter.broker.protocol.version = 0.9.0.X +log.preallocate = false +security.inter.broker.protocol = PLAINTEXT +``` +执行所有操作后,可以使用以下命令停止服务器 +``` +$ bin/kafka-server-stop.sh config/server.properties +``` @@ -24,9 +121,14 @@ Kafka 中发布订阅的对象是 Topic。我们可以为每类数据创建一 - 将数据结果持久化到 kafka 中,跳转到步骤一 +首先安装程序需要的 python 库 +``` +pip install pyspark +pip install kafka-python +``` -首先我们必须导入必要的类并创建一个本地SparkSession +然后我们必须导入必要的类并创建一个本地SparkSession ``` from pyspark.sql import SparkSession @@ -55,7 +157,7 @@ df = spark \ #通过`DataFrame`创建`planeNumber`表 df.createOrReplaceTempView("planeNumber") #获取查询数据 -sql=spark.sql("select count(*) num,value from yy group by value order by num desc ") +sql=spark.sql("select count(*) num,value from planeNumber group by value order by num desc ") ``` 我们对流数据进行了查询,剩下的就是实际开始接收数据并计算次数了。由于 kafka 只能存一个列值'value',为此我们要拼接查询结果,把 value 和 num 通过 `_` 拼接在一起,并设置别名 'value',然后我们将其设置outputMode("complete")为在每次更新计数时将完整的结果集(由指定)存到 kafka 的 'jun' 的 Topic 里 。最后使用开始流计算start()。