国产xxxx99真实实拍_久久不雅视频_高清韩国a级特黄毛片_嗯老师别我我受不了了小说

資訊專欄INFORMATION COLUMN

學習kafka教程(二)

Prasanta / 1106人閱讀

摘要:目標了解會使用過程首先示例代碼以上它實現了算法,該算法從輸入文本計算單詞出現的直方圖。

歡迎關注公眾號:n平方
如有問題或建議,請后臺留言,我會盡力解決你的問題。

本文主要介紹【KafkaStreams】

簡介

Kafka Streams編寫關鍵任務實時應用程序和微服務的最簡單方法,是一個用于構建應用程序和微服務的客戶端庫,其中輸入和輸出數據存儲在Kafka集群中。它結合了在客戶端編寫和部署標準Java和Scala應用程序的簡單性和Kafka服務器端集群技術的優點。

Kafka Streams是一個用于構建關鍵任務實時應用程序和微服務的客戶端庫,其中輸入和/或輸出數據存儲在Kafka集群中。Kafka Streams結合了在客戶端編寫和部署標準Java和Scala應用程序的簡單性和Kafka服務器端集群技術的優點,使這些應用程序具有高度可伸縮性、靈活性、容錯性、分布式等等。

目標

了解kafka Streams

會使用kafka Streams

過程

1.首先WordCountDemo示例代碼(Java8以上)

// Serializers/deserializers (serde) for String and Long types
final Serde stringSerde = Serdes.String();
final Serde longSerde = Serdes.Long();
 
// Construct a `KStream` from the input topic "streams-plaintext-input", where message values
// represent lines of text (for the sake of this example, we ignore whatever may be stored
// in the message keys).
KStream textLines = builder.stream("streams-plaintext-input",
    Consumed.with(stringSerde, stringSerde);
 
KTable wordCounts = textLines
    // Split each text line, by whitespace, into words.
    .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("W+")))
 
    // Group the text words as message keys
    .groupBy((key, value) -> value)
 
    // Count the occurrences of each word (message key).
    .count()
 
// Store the running counts as a changelog stream to the output topic.
wordCounts.toStream().to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));

它實現了WordCount算法,該算法從輸入文本計算單詞出現的直方圖。然而,與您以前可能看到的對有界數據進行操作的其他WordCount示例不同,WordCount演示應用程序的行為略有不同,因為它被設計為對無限、無界的數據流進行操作。與有界變量類似,它是一種有狀態算法,用于跟蹤和更新單詞的計數。然而,由于它必須假定輸入數據可能是無界的,因此它將周期性地輸出當前狀態和結果,同時繼續處理更多的數據,因為它不知道何時處理了“所有”輸入數據。

2.安裝并啟動zookeeper和kafka

bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties

3.創建主題
接下來,我們創建名為streams-plain -input的輸入主題和名為streams-wordcount-output的輸出主題:

bin/kafka-topics.sh --create 
    --zookeeper localhost:2181 
    --replication-factor 1 
    --partitions 1 
    --topic streams-plaintext-input
Created topic "streams-plaintext-input"

我們創建啟用壓縮的輸出主題,因為輸出流是一個變更日志流.

bin/kafka-topics.sh --create 
    --zookeeper localhost:2181 
    --replication-factor 1 
    --partitions 1 
    --topic streams-wordcount-output 
    --config cleanup.policy=compact
Created topic "streams-wordcount-output"

創建的主題也可以使用相同的kafka主題進行描述

bin/kafka-topics.sh --zookeeper localhost:2181 --describe

4.啟動Wordcount應用程序

bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo

a)演示應用程序將從輸入主題流(明文輸入)中讀取,對每個讀取的消息執行WordCount算法的計算,并不斷將其當前結果寫入輸出主題流(WordCount -output)。因此,除了日志條目之外,不會有任何STDOUT輸出,因為結果是用Kafka寫回去的。

b)現在我們可以在一個多帶帶的終端上啟動控制臺生成器,向這個主題寫入一些輸入數據和檢查輸出的WordCount演示應用程序從其輸出主題與控制臺消費者在一個多帶帶的終端.

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 
    --topic streams-wordcount-output 
    --from-beginning 
    --formatter kafka.tools.DefaultMessageFormatter 
    --property print.key=true 
    --property print.value=true 
    --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer 
    --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer

c)輸入端:現在讓我們使用控制臺生成器將一些消息寫入輸入主題流——純文本輸入,方法是輸入一行文本,然后單擊。這將發送新消息輸入主題,消息鍵為空和消息值是剛才輸入的字符串編碼的文本行。

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-plaintext-input

此時你可以在控制臺輸入如下字符:

all streams lead to kafka

d))輸出端:此消息將由Wordcount應用程序處理,以下輸出數據將寫入streams-wordcount-output主題并由控制臺使用者打印:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 
    --topic streams-wordcount-output 
    --from-beginning 
    --formatter kafka.tools.DefaultMessageFormatter 
    --property print.key=true 
    --property print.value=true 
    --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer 
    --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer

這個時候會接收到剛剛在控制臺輸入的單詞統計結果:

all     1
streams 1
lead    1
to      1
kafka   1

如此類推:你可以在輸入端輸入單詞,對應的在輸出端就會有統計結果。

小結:
可以看到,Wordcount應用程序的輸出實際上是連續的更新流,其中每個輸出記錄(即上面原始輸出中的每一行)是單個單詞的更新計數,也就是記錄鍵,如“kafka”。對于具有相同鍵的多個記錄,后面的每個記錄都是前一個記錄的更新。

下面的兩個圖說明了幕后的本質。第一列顯示KTable的當前狀態的演變,該狀態為count計算單詞出現的次數。第二列顯示KTable的狀態更新所產生的更改記錄,這些記錄被發送到輸出Kafka主題流-wordcount-output。

最后

本人水平有限,歡迎各位建議以及指正。順便關注一下公眾號唄,會經常更新文章的哦。

文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。

轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/73235.html

相關文章

  • 寫這么多系列博客,怪不得找不到女朋友

    摘要:前提好幾周沒更新博客了,對不斷支持我博客的童鞋們說聲抱歉了。熟悉我的人都知道我寫博客的時間比較早,而且堅持的時間也比較久,一直到現在也是一直保持著更新狀態。 showImg(https://segmentfault.com/img/remote/1460000014076586?w=1920&h=1080); 前提 好幾周沒更新博客了,對不斷支持我博客的童鞋們說聲:抱歉了!。自己這段時...

    JerryWangSAP 評論0 收藏0

發表評論

0條評論

Prasanta

|高級講師

TA的文章

閱讀更多
最新活動
閱讀需要支付1元查看
<