摘要:介紹是一個分布式的流數(shù)據(jù)平臺,可發(fā)布訂閱消息流,使用進行集群管理。啟動一個,拉取消息參數(shù)表示從頭開始讀取數(shù)據(jù),如果不設(shè)置,則只讀取最新的數(shù)據(jù)。消息發(fā)布者,方式,負(fù)責(zé)發(fā)布消息到。表明消息,被同一內(nèi)的均分了。
介紹
Kafka是一個分布式的流數(shù)據(jù)平臺,可發(fā)布、訂閱消息流,使用zookeeper進行集群管理。也可作為一個消息隊列中間件,類似于RabbitMQ,ActiveMQ,ZeroMQ等。由LinkdIn開源,用Scala語言實現(xiàn)。
Kafka有如下特點:
kafka利用線性存儲來進行硬盤讀寫,速度快;
以時間復(fù)雜度為O(1)的方式提供消息持久化能力,即使對TB級以上數(shù)據(jù)也能保證常數(shù)時間的訪問性能。因此不清除存儲的數(shù)據(jù)并不會影響性能;
zero-copy Gzip和Snappy壓縮
已消費的消息不會自動刪除
考慮到高效性,對事務(wù)的支持較弱。
應(yīng)用場景 安裝使用// 從官網(wǎng)下載最新版本,這里為:kafka_2.11-1.0.0.tgz
// 解壓
$ tar -xzf kafka_2.11-1.0.0.tgz $ cd kafka_2.11-1.0.0
// Kafka用到了zookeeper,所以需要啟動zookeeper(新版本內(nèi)置了zookeeper,如果讀者已有其他zookeeper啟動了,這步可以略過)
$ bin/zookeeper-server-start.sh config/zookeeper.properties
// 修改配置文件,并啟動kafka server:
config/server.properties中的zookeeper.connect默認(rèn)為localhost:2181,可以修改為其他的zookeeper地址。多個地址間,通過逗號分隔,如:"127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"。
默認(rèn)為9092端口,通過修改“l(fā)isteners=PLAINTEXT://:9092” 來指定其他端口或IP。
配置好后,啟動kafka server:
$ bin/kafka-server-start.sh config/server.properties
配置文件目錄下還有consumer.properties和producer.properties,按默認(rèn)即可。
// 創(chuàng)建一個topic,topic名稱為test
$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
可以通過命令:bin/kafka-topics.sh --list --zookeeper localhost:2181查看當(dāng)前所有的topic.
// 通過producer發(fā)送消息
$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test This is a message This is another message
往test中發(fā)送數(shù)據(jù)。
// 啟動一個consumer,拉取消息
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning This is a message This is another message
--from-beginning參數(shù)表示從頭開始讀取數(shù)據(jù),如果不設(shè)置,則只讀取最新的數(shù)據(jù)。
// 可以再啟動一個Server
$ bin/kafka-server-start.sh config/server-1.properties &
這里的server-1.properties是拷貝的server.properties,主要修改如下幾個參數(shù):
# broker id,整數(shù),和其他broker不能重復(fù) broker.id=2 # 指定端口為9094,因為在同一臺機器上,需要避免端口沖突。這里沒有配置IP,默認(rèn)為本機 listeners=PLAINTEXT://:9094 # 日志文件路徑,即topic數(shù)據(jù)的存儲位置。不同的broker,指定不同的路徑。 log.dir=/tmp/kafka-logs-2示意圖
Producer1和Producer2往Topic A中發(fā)送消息,Consumer1/2/3/4/5 從Topic中接收消息。
Kafka Cluster包含兩個Server,分別為Server1,Server2。
Topic A包含4個Partition,為:P0, P1, P3, P4,平均分配到Server1和Server2上。
一個Broker就是一個server。多個Broker構(gòu)成一個kafka集群,同時對外提供服務(wù),如果某個節(jié)點down掉,則重新分配。
注意:集群和主從熱備不同,對于主從熱備,同時只有一個節(jié)點提供服務(wù),其他節(jié)點待命狀態(tài)。
消息發(fā)布者,Push方式,負(fù)責(zé)發(fā)布消息到Kafka broker。
Consumer消費者,Pull方式,消費消息。每個consumer屬于一個特定的consuer group。
主題(Topic)通過對消息指定主題可以將消息分類,Consumer可以只關(guān)注特定Topic中的消息。
查看總共有多少個Topic:
$ bin/kafka-topics.sh --list --zookeeper localhost:2181
查看某個topic的情況(分區(qū)、副本數(shù)等),這里查看topic為test的信息:
$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test消費者組(Consumer Group)
每個Consumer都會歸到一個Group中。某個Partition中的消息,可以被多個Group消費,但只能被Group中的一個Consumer消費。所以,如果要對多個Consumer進行消息廣播,則這些Consumer需要放在不同的Group中。
當(dāng)一個Consumer進程或線程掛掉,它所訂閱的Partition會被重新分配到該Group內(nèi)的其他Consumer上。如果Consumer A訂閱了多個Partition,那么當(dāng)該Group內(nèi)新增Consumer B時,會從Consumer A中分配出一個Partition給Consumer B。
為了維持Consumer 與 Consumer Group的關(guān)系,需要Consumer周期性的發(fā)送heartbeat到coordinator(協(xié)調(diào)者,在早期版本,以zookeeper作為協(xié)調(diào)者。后期版本則以某個broker作為協(xié)調(diào)者)。當(dāng)Consumer由于某種原因不能發(fā)Heartbeat到coordinator時,并且時間超過session.timeout.ms時,就會認(rèn)為該consumer已退出,它所訂閱的partition會分配到同一group 內(nèi)的其它的consumer上。而這個過程,被稱為rebalance。
位移(Offset)Offset是針對Partition的,它用來記錄消費到Partition中的哪條消息了。
Consumer并不維護Offset,而是由Consumer所在的Group維護。因此,Group中的一個Consumer消費了某個Partition中的消息,那么該組的其他Consumer就不能重復(fù)消費該條消息了,因為Offset已經(jīng)+1了。
上圖中,Consumer A和Consumer B屬于不同的Group。Consumer A所在的Group,在該Partition的Offset=9,表示下次該Group獲取消息時是從9開始獲取;同理,Consumer B所在的Group在該Partition的Offset=11,下次該Group的Consumer獲取消息時,從11開始獲取。
分區(qū)(Partition)Partition是物理上的概念,每個Partition對應(yīng)一個文件夾(默認(rèn)在/tmp/kafka-logs下,通過server.properties中l(wèi)og.dirs配置)。一個topic可以對應(yīng)多個partition,consumer訂閱的其實就是partition。
上圖表示一個Topic,指定了3個分區(qū)。在向該Topic寫數(shù)據(jù)時,會根據(jù)均衡策略,往相應(yīng)的分區(qū)中寫。這3個分區(qū)中的數(shù)據(jù)是不一樣的,它們的數(shù)據(jù)總和,構(gòu)成該Topic的數(shù)據(jù)。
每個分區(qū)中的數(shù)據(jù),保證嚴(yán)格的寫入順序。
分區(qū)會自動根據(jù)均衡策略分配到多個broker上。比如有2個broker(或者叫Server):broker1, broker2,創(chuàng)建一個包含4個partition且replication-factor(副本)為1的topic,那么對于該topic,每個broker會被分配2個partition。如下圖:
有兩個Group:Group A和Group B,其中Group A包含C1、C2兩個Consumer;Group B包含C3,C4,C5,C6四個Consumer。
如果向該Topic寫入4條信息:M1, M2, M3, M4。那么各個Consumer收到的消息是(一種情況):
C1:M1, M3 C2:M2, M4 C3:M1 C4:M3 C5:M2 C6:M4
C1,C2各接收到2條消息,它們的和為:M1,M2,M3,M4。
C3,C4,C5,C6各接收到1條消息,它們的和為:M1,M2,M3,M4。
表明Topic消息,被同一Group內(nèi)的Consumer均分了。因為每次向Topic中寫入消息時,會被均分至各個Partition,然后各Consumer收到自己所訂閱Partition的消息。同時,這里也說明了同一個partition內(nèi)的消息只能被同一個組中的一個consumer消費。
注:如果replication-factor為3,那么每個broker會有6(即2x3)個partition。
另外,創(chuàng)建topic時,在當(dāng)前的所有broker間進行均分,分好后就不會變了。假設(shè)把上述broker1停掉,它的分區(qū)不會轉(zhuǎn)到broker2上。producer在寫消息時,不會再寫入broker2中的分區(qū)。
那么,原先訂閱broker2分區(qū)的consumer,不能接收消息了。提示:
WARN [Consumer clientId=consumer-1, groupId=g4] Connection to node 1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
對于一個Topic的partition數(shù),增加Broker(即服務(wù)節(jié)點)并不會增加partition的數(shù)量。
驗證:
查看topic信息
$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
再啟動一個新的Broker:
$ bin/kafka-server-start.sh ../config/server-1.properties
啟動后,再用上一步的命令看topic信息,partition數(shù)量并未改變。
并且,如果group g1上有兩個consumer,始終只會有一個consumer能收到該topic的消息,另一個一直處于空閑狀態(tài)(光占著資源不做事)。所以,Topic的Partition數(shù),要大于等于Consumer數(shù)量。
默認(rèn)組的疑問
可能讀者會有疑問,通過命令:
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test
執(zhí)行多次,創(chuàng)建了多個consumer,這些consumer屬于默認(rèn)的一個組,但是卻能同時收到一個topic的信息。和上述所說的“一個Topic中的消息,只能被group中的一個consumer消費”有沖突。
其實,不指定group名稱,的確會分配默認(rèn)的group,但每次分配的名稱是不一樣的,即這里創(chuàng)建的consumer是屬于不同的group的。可以通過命令查看所有g(shù)roup:
$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list Note: This will not show information about old Zookeeper-based consumers. console-consumer-94070 console-consumer-27823 console-consumer-4826 console-consumer-47050
可以看出,這里的group名稱是不一樣的。
consumer數(shù)量和group數(shù)量
對于一個topic,如果group中consumer數(shù)量比partition數(shù)量多,那么多余的consumer會空閑。這是因為,group中的某個consumer一旦訂閱了某個partition,則會一直占用并消費該partition中的信息。除非該consumer退出,否則該partition不會被該組的其他consumer占用。所以會導(dǎo)致多余的consumer空閑,一直收不到消息。
可以通過命令,查看consumer和partition的對應(yīng)關(guān)系:
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group g4
一個topic可以對應(yīng)多個partition,但一個partition只能對應(yīng)一個topic。
數(shù)據(jù)文件分段Kafka解決查詢效率的手段之一是將數(shù)據(jù)文件分段,比如有100條Message,它們的offset是從0到99。假設(shè)將數(shù)據(jù)文件分成5段,第一段為0-19,第二段為20-39,以此類推,每段放在一個多帶帶的數(shù)據(jù)文件里面,數(shù)據(jù)文件以該段中最小的offset命名。這樣在查找指定offset的Message的時候,用二分查找就可以定位到該Message在哪個段中。
上圖展示文件傳輸?shù)絊ocket的常規(guī)方式,步驟:
操作系統(tǒng)將文件數(shù)據(jù)從磁盤讀入到內(nèi)核空間的頁緩存;
應(yīng)用程序?qū)?shù)據(jù)從內(nèi)核空間讀入到用戶空間緩存中;
應(yīng)用程序?qū)?shù)據(jù)寫回到內(nèi)核空間到socket緩存中;
操作系統(tǒng)將數(shù)據(jù)從socket緩沖區(qū)復(fù)制到網(wǎng)卡緩沖區(qū),以便將數(shù)據(jù)經(jīng)網(wǎng)絡(luò)發(fā)出。
上圖展示零拷貝方式傳輸文件到Socket,少了文件緩存到用戶空間,再由用戶空間到內(nèi)核空間的操作。
Kafka采用零拷貝的方式。
通過Replication Factor指定副本的數(shù)量,這樣,如果一個Partition出現(xiàn)了問題,那么可以從副本中恢復(fù)了。
Kafka Manager安裝和使用如果不喜歡通過命令行操作,也可以通過圖形化管理界面,比如yahoo開源的Kafka Manager。
地址:https://github.com/yahoo/kafk...
這里以CentOS7為例,進行編譯、運行說明。
注:Kafka Manager的編譯需要javac,需要安裝jdk環(huán)境。最新版的需要jdk8版本。
CentOS7默認(rèn)安裝了OpenJDK,將其卸載,從Oracle官網(wǎng)下載jdk8文件,然后安裝。
// github上下載kafka manager源碼
$ git clone https://github.com/yahoo/kafk...
$ cd kafka-manager
// 修改配置文件中zookeeper地址
配置文件:conf/application.conf
kafka-manager.zkhosts="127.0.0.1:2181"
如果有多個zookeeper,通過逗號分隔,如:
kafka-manager.zkhosts="my.zookeeper.host.com:2181,other.zookeeper.host.com:2181"
// 將源碼編譯打包成zip包
$ ./sbt clean dist
這一步用到了javac,運行完后,會在當(dāng)前目錄下生成target文件夾。生成zip包地址:
target/universal/kafka-manager-1.3.3.16.zip
// 進入zip所在目錄,解壓zip包,啟動服務(wù)(默認(rèn)9000端口)
$ cd target/universal
$ unzip kafka-manager-1.3.3.16
$ ./kafka-manager-1.3.3.16/bin/kafka-manager
// 打開Kafka Manager頁面
瀏覽器輸入地址:http://192.168.0.12:9000/ (這里的IP需要替換成讀者自己的IP)
很簡潔的一個頁面,第一次打開,什么都沒有。
// 添加一個Cluster
Cluster Name: 名稱隨意,比如MyFirstCluster
Cluster Zookeeper Hosts: zookeeper的地址,比如:192.168.0.12:2181
Kafka Version: 筆者選的0.11
勾選“Enable JMX Polling”。注意:勾選了該項,啟動kafka server前,需要設(shè)置JMX_PORT變量,如:
$ JMX_PORT=9999 $ bin/zookeeper-server-start.sh config/zookeeper.properties
保存后,就可以通過MyFirstCluster,查看Broker, Topic, Partition, Consumer等信息了。
注:如果查看不了Consumer信息,提示“Please enable consumer polling here.”,需要勾選一個配置。如:
提示信息:
修改Cluster:
勾選中“Poll consumer information”
保存。具體的管理功能,可以通過操作頁面進一步挖掘。
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://specialneedsforspecialkids.com/yun/70973.html
摘要:關(guān)閉套接字和上下文備注說明如何利用使用首先下載所需的包,解壓以后將和文件放到自己電腦中的安裝路徑中的文件夾下,最后需要將之前解壓后的包放在項目的中或者資源下載鏈接密碼項目源碼下載鏈接鏈接密碼 在講ZeroMQ前先給大家講一下什么是消息隊列。 消息隊列簡介: 消息隊列中間件是分布式系統(tǒng)中重要的組件,主要解決應(yīng)用耦合,異步消息,流量削鋒等問題。實現(xiàn)高性能,高可用,可伸縮和最終一致性架構(gòu)。是...
閱讀 1887·2021-11-15 11:46
閱讀 1077·2021-10-26 09:49
閱讀 1820·2021-10-14 09:42
閱讀 3374·2021-09-26 09:55
閱讀 827·2019-08-30 13:58
閱讀 1024·2019-08-29 16:40
閱讀 3462·2019-08-26 10:27
閱讀 601·2019-08-23 18:18