摘要:目標了解會使用過程首先示例代碼以上它實現了算法,該算法從輸入文本計算單詞出現的直方圖。
歡迎關注公眾號:n平方
如有問題或建議,請后臺留言,我會盡力解決你的問題。
本文主要介紹【KafkaStreams】
簡介Kafka Streams編寫關鍵任務實時應用程序和微服務的最簡單方法,是一個用于構建應用程序和微服務的客戶端庫,其中輸入和輸出數據存儲在Kafka集群中。它結合了在客戶端編寫和部署標準Java和Scala應用程序的簡單性和Kafka服務器端集群技術的優點。
Kafka Streams是一個用于構建關鍵任務實時應用程序和微服務的客戶端庫,其中輸入和/或輸出數據存儲在Kafka集群中。Kafka Streams結合了在客戶端編寫和部署標準Java和Scala應用程序的簡單性和Kafka服務器端集群技術的優點,使這些應用程序具有高度可伸縮性、靈活性、容錯性、分布式等等。
目標了解kafka Streams
會使用kafka Streams
過程1.首先WordCountDemo示例代碼(Java8以上)
// Serializers/deserializers (serde) for String and Long types final SerdestringSerde = Serdes.String(); final Serde longSerde = Serdes.Long(); // Construct a `KStream` from the input topic "streams-plaintext-input", where message values // represent lines of text (for the sake of this example, we ignore whatever may be stored // in the message keys). KStream textLines = builder.stream("streams-plaintext-input", Consumed.with(stringSerde, stringSerde); KTable wordCounts = textLines // Split each text line, by whitespace, into words. .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("W+"))) // Group the text words as message keys .groupBy((key, value) -> value) // Count the occurrences of each word (message key). .count() // Store the running counts as a changelog stream to the output topic. wordCounts.toStream().to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));
它實現了WordCount算法,該算法從輸入文本計算單詞出現的直方圖。然而,與您以前可能看到的對有界數據進行操作的其他WordCount示例不同,WordCount演示應用程序的行為略有不同,因為它被設計為對無限、無界的數據流進行操作。與有界變量類似,它是一種有狀態算法,用于跟蹤和更新單詞的計數。然而,由于它必須假定輸入數據可能是無界的,因此它將周期性地輸出當前狀態和結果,同時繼續處理更多的數據,因為它不知道何時處理了“所有”輸入數據。
2.安裝并啟動zookeeper和kafka
bin/zookeeper-server-start.sh config/zookeeper.properties bin/kafka-server-start.sh config/server.properties
3.創建主題
接下來,我們創建名為streams-plain -input的輸入主題和名為streams-wordcount-output的輸出主題:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic streams-plaintext-input Created topic "streams-plaintext-input"
我們創建啟用壓縮的輸出主題,因為輸出流是一個變更日志流.
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic streams-wordcount-output --config cleanup.policy=compact Created topic "streams-wordcount-output"
創建的主題也可以使用相同的kafka主題進行描述
bin/kafka-topics.sh --zookeeper localhost:2181 --describe
4.啟動Wordcount應用程序
bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo
a)演示應用程序將從輸入主題流(明文輸入)中讀取,對每個讀取的消息執行WordCount算法的計算,并不斷將其當前結果寫入輸出主題流(WordCount -output)。因此,除了日志條目之外,不會有任何STDOUT輸出,因為結果是用Kafka寫回去的。
b)現在我們可以在一個多帶帶的終端上啟動控制臺生成器,向這個主題寫入一些輸入數據和檢查輸出的WordCount演示應用程序從其輸出主題與控制臺消費者在一個多帶帶的終端.
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic streams-wordcount-output --from-beginning --formatter kafka.tools.DefaultMessageFormatter --property print.key=true --property print.value=true --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
c)輸入端:現在讓我們使用控制臺生成器將一些消息寫入輸入主題流——純文本輸入,方法是輸入一行文本,然后單擊。這將發送新消息輸入主題,消息鍵為空和消息值是剛才輸入的字符串編碼的文本行。
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-plaintext-input
此時你可以在控制臺輸入如下字符:
all streams lead to kafka
d))輸出端:此消息將由Wordcount應用程序處理,以下輸出數據將寫入streams-wordcount-output主題并由控制臺使用者打印:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic streams-wordcount-output --from-beginning --formatter kafka.tools.DefaultMessageFormatter --property print.key=true --property print.value=true --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
這個時候會接收到剛剛在控制臺輸入的單詞統計結果:
all 1 streams 1 lead 1 to 1 kafka 1
如此類推:你可以在輸入端輸入單詞,對應的在輸出端就會有統計結果。
小結:
可以看到,Wordcount應用程序的輸出實際上是連續的更新流,其中每個輸出記錄(即上面原始輸出中的每一行)是單個單詞的更新計數,也就是記錄鍵,如“kafka”。對于具有相同鍵的多個記錄,后面的每個記錄都是前一個記錄的更新。
下面的兩個圖說明了幕后的本質。第一列顯示KTable的當前狀態的演變,該狀態為count計算單詞出現的次數。第二列顯示KTable的狀態更新所產生的更改記錄,這些記錄被發送到輸出Kafka主題流-wordcount-output。
最后本人水平有限,歡迎各位建議以及指正。順便關注一下公眾號唄,會經常更新文章的哦。
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/73235.html
摘要:前提好幾周沒更新博客了,對不斷支持我博客的童鞋們說聲抱歉了。熟悉我的人都知道我寫博客的時間比較早,而且堅持的時間也比較久,一直到現在也是一直保持著更新狀態。 showImg(https://segmentfault.com/img/remote/1460000014076586?w=1920&h=1080); 前提 好幾周沒更新博客了,對不斷支持我博客的童鞋們說聲:抱歉了!。自己這段時...
閱讀 3247·2021-09-22 15:58
閱讀 1717·2019-08-30 14:17
閱讀 1716·2019-08-28 18:05
閱讀 1505·2019-08-26 13:33
閱讀 683·2019-08-26 12:20
閱讀 606·2019-08-26 12:18
閱讀 3192·2019-08-26 11:59
閱讀 1401·2019-08-26 10:36