• -------------------------------------------------------------
  • ====================================

使用Kafka Stream处理数据

kafka dewbay 5年前 (2019-04-12) 2921次浏览 已收录 0个评论 扫描二维码

Step 8: 使用 Kafka Stream 来处理数据

Kafka Streamkafka的客户端库,用于实时流处理和分析存储在kafka broker 的数据,这个快速入门示例将演示如何运行一个流应用程序。一个 WordCountDemo 的例子(为了方便阅读,使用的是 java8 lambda 表达式)

KTable wordCounts = textLines
    // Split each text line, by whitespace, into words.
    .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("W+")))

    // Ensure the words are available as record keys for the next aggregate operation.
    .map((key, value) -> new KeyValue<>(value, value))

    // Count the occurrences of each word (record key) and store the results into a table named "Counts".
    .countByKey("Counts")

它实现了 wordcount 算法,从输入的文本计算出一个词出现的次数。然而,不像其他的 WordCount 的例子,你可能会看到,在有限的数据之前,执行的演示应用程序的行为略有不同,因为它的目的是在一个无限的操作,数据流。类似的有界变量,它是一种动态算法,跟踪和更新的单词计数。然而,由于它必须假设潜在的无界输入数据,它会定期输出其当前状态和结果,同时继续处理更多的数据,因为它不知道什么时候它处理过的“所有”的输入数据。

现在准备输入数据到kafka的 topic 中,随后kafka Stream应用处理这个 topic 的数据。

> echo -e "all streams lead to kafka\nhello kafka streams\njoin kafka summit" > file-input.txt

接下来,使用控制台的 producer 将输入的数据发送到指定的 topic(streams-file-input)中,(在实践中,stream 数据可能会持续流入,其中kafka的应用将启动并运行)

> bin/kafka-topics.sh --create \
            --zookeeper localhost:2181 \
            --replication-factor 1 \
            --partitions 1 \
            --topic streams-file-input


> cat /tmp/file-input.txt | ./bin/kafka-console-producer --broker-list localhost:9092 --topic streams-file-input

现在,我们运行 WordCount demo 处理输入的数据:

> ./bin/kafka-run-class org.apache.kafka.streams.examples.wordcount.WordCountDemo

不会有任何的 STDOUT 输出,除了日志,结果不断地写回另一个 topic(streams-wordcount-output),demo 运行几秒,然后,不像典型的流处理应用程序,自动终止。

现在我们检查 WordCountDemo 应用,从输出的 topic 读取。

> ./bin/kafka-console-consumer --zookeeper localhost:2181 
            --topic streams-wordcount-output 
            --from-beginning 
            --formatter kafka.tools.DefaultMessageFormatter 
            --property print.key=true 
            --property print.key=true 
            --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer 
            --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer

输出数据打印到控台(你可以使用 Ctrl-C 停止):

all     1
streams 1
lead    1
to      1
kafka   1
hello   1
kafka   2
streams 2
join    1
kafka   3
summit  1
^C

第一列是 message 的 key,第二列是 message 的 value,要注意,输出的实际是一个连续的更新流,其中每条数据(即:原始输出的每行)是一个单词的最新的 count,又叫记录键“kafka”。对于同一个 key 有多个记录,每个记录之后是前一个的更新。

作者:無名
链接:http://orchome.com/300#
来源:OrcHome
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。


露水湾 , 版权所有丨如未注明 , 均为原创丨本网站采用BY-NC-SA协议进行授权
转载请注明原文链接:使用Kafka Stream处理数据
喜欢 (1)
[]
分享 (0)
关于作者:
发表我的评论
取消评论

表情 贴图 加粗 删除线 居中 斜体 签到

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址